Source code for imagedata.archives.zipfilearchive

"""Read/Write image files from a zipfile
"""

# Copyright (c) 2018-2021 Erling Andersen, Haukeland University Hospital, Bergen, Norway

import os
import os.path
import shutil
import tempfile
import io
import fnmatch
import urllib.parse
import logging
from abc import ABC

import imagedata.archives
import imagedata.transports
from imagedata.archives.abstractarchive import AbstractArchive
import zipfile

logger = logging.getLogger(__name__)


def list_files(startpath):
    import os
    for root, dirs, files in os.walk(startpath):
        level = root.replace(startpath, '').count('/')
        indent = ' ' * 4 * level
        print('{}{}/'.format(indent, os.path.basename(root)))
        subindent = ' ' * 4 * (level + 1)
        for f in files:
            print('{}{}'.format(subindent, f))


[docs]class WriteFileIO(io.FileIO): """Local object making sure the new file is written to zip archive before closing.""" def __init__(self, archive, filename, localfile): """Make a WriteFileIO object. Args: archive: ZipFile object filename: path name in zip archive localfile: path name to local, temporary file """ super(WriteFileIO, self).__init__(localfile.name, mode='wb') self.__archive = archive self.__filename = filename self.__localfile = localfile
[docs] def close(self): """Close file, copy it to archive, then delete local file.""" logger.debug("ZipfileArchive.WriteFileIO.close:") ret = super(WriteFileIO, self).close() self.__localfile.close() logger.debug("ZipfileArchive.WriteFileIO.close: zip %s as %s" % (self.__localfile.name, self.__filename)) self.__archive.write(self.__localfile.name, self.__filename) logger.debug("ZipfileArchive.WriteFileIO.close: remove %s" % self.__localfile.name) os.remove(self.__localfile.name) return ret
def __enter__(self): """Enter context manager. """ logger.debug("ZipfileArchive.WriteFileIO __enter__: %s %s" % (self.__filename, self.__localfile.name)) return self def __exit__(self, exc_type, exc_val, exc_tb): """Leave context manager: Copy file to zip archive. Remove local file. """ self.close()
[docs]class ZipfileArchive(AbstractArchive, ABC): """Read/write image files from a zipfile.""" name = "zip" description = "Read and write image files from a zipfile." authors = "Erling Andersen" version = "1.0.0" url = "www.helse-bergen.no" mimetypes = ['application/zip', 'application/x-zip-compressed'] # Internal data # self.__transport: file transport object. # self.__fp: zip file object in transport object # self.__archive: ZipFile object. # self.__path: path to the zip file using given transport. # self.__mode: 'r' or 'w': read or write access. # self.__tmpdir: Local directory where zip file is unpacked. # self.__files: dict of files in the zip archive. # key is path name in the zip archive. # value is a dict of member info: # 'unpacked': whether the file is unpacked in tmpdir (boolean) # 'name': path name in the zip archive # 'fh': file handle when open, otherwise None # 'localfile': local filename of unpacked file def __init__(self, transport=None, url=None, mode='r', read_directory_only=False, opts=None): super(ZipfileArchive, self).__init__( self.name, self.description, self.authors, self.version, self.url, self.mimetypes) self.opts = opts logger.debug("ZipfileArchive.__init__ url: {}".format(url)) if os.name == 'nt' and fnmatch.fnmatch(url, '[A-Za-z]:\\*'): # Windows: Parse without x:, then reattach drive letter urldict = urllib.parse.urlsplit(url[2:], scheme="file") self.__path = url[:2] + urldict.path else: urldict = urllib.parse.urlsplit(url, scheme="file") self.__path = urldict.path if len(urldict.path) > 0 else urldict.netloc if transport is not None: self.__transport = transport elif url is None: raise ValueError('url not given') else: # Determine transport from url # netloc = urldict.netloc # netloc = urldict.path # netloc: where is zipfile # self.__path: zipfile name if urldict.scheme == 'xnat': netloc = urldict.netloc + self.__path # self.__path = urldict.path logger.debug('ZipfileArchive.__init__: scheme: %s, netloc: %s' % (urldict.scheme, netloc)) self.__transport = imagedata.transports.Transport( urldict.scheme, netloc=urldict.netloc, root=urldict.path, mode=mode, read_directory_only=read_directory_only) else: # netloc, self.__path = os.path.split(urldict.path) netloc, self.__path = os.path.split(self.__path) logger.debug('ZipfileArchive.__init__: scheme: %s, netloc: %s' % (urldict.scheme, netloc)) self.__transport = imagedata.transports.Transport( urldict.scheme, root=netloc, mode=mode, read_directory_only=read_directory_only) self.__mode = mode self.__files = {} logger.debug("ZipfileArchive path: {}".format(self.__path)) self.__fp = self.__transport.open( self.__path, mode=self.__mode + "b") logger.debug("ZipfileArchive self.__fp: {}".format(type(self.__fp))) logger.debug("ZipfileArchive open zipfile mode %s" % self.__mode) self.__archive = zipfile.ZipFile( self.__fp, mode=self.__mode, compression=zipfile.ZIP_DEFLATED) # Extract the archive self.__tmpdir = tempfile.mkdtemp() logger.debug("Extract zipfile {} to {}".format( self.__archive, self.__tmpdir)) # Get filelist in self.__files for fname in self.__archive.namelist(): # norm_fname = os.path.normpath(fname) try: _is_dir = self.__archive.getinfo(fname).is_dir() # Works with Python >= 3.6 except AttributeError: _is_dir = fname[-1] == '/' except Exception as e: logger.error('ZipfileArchive: {}'.format(e)) raise if not _is_dir: # member = {'unpacked': False, 'name': norm_fname, 'fh': None} # self.__files[norm_fname] = member member = {'unpacked': False, 'name': fname, 'fh': None} self.__files[fname] = member # logger.debug("ZipFile self.__files: {}".format(self.__files)) @property def transport(self): """Underlying transport plugin """ return self.__transport
[docs] def use_query(self): """Do the plugin need the ?query part of the url?""" return True
[docs] def getnames(self, files=None): """Get name list of the members. Returns: The members as a list of their names. It has the same order as the members of the archive. """ if files is None or \ (issubclass(type(files), str) and files == '*') or \ (issubclass(type(files), list) and len(files) > 0 and files[0] == '*'): logger.debug('ZipfileArchive.getnames: found files {}'.format(len(self.__files))) return sorted(self.__files.keys()) else: filelist = list() for filename in self.__files: logger.debug('ZipfileArchive.getnames: member {}'.format(filename)) for required_filename in files: logger.debug('ZipfileArchive.getnames: required {}'.format(required_filename)) if required_filename[-1] == '/': required_filename = required_filename[:-1] # if fnmatch.fnmatchcase(filename, os.path.normpath(required_filename)): # filelist.append(filename) # elif fnmatch.fnmatchcase(filename, os.path.normpath(required_filename) + '/*'): # filelist.append(filename) if fnmatch.fnmatchcase(filename, required_filename): filelist.append(filename) elif fnmatch.fnmatchcase(filename, required_filename + '/*'): filelist.append(filename) logger.debug('ZipfileArchive.getnames: found files {}'.format(len(filelist))) if len(filelist) < 1: raise FileNotFoundError('No such file: %s' % files) return filelist
[docs] def basename(self, filehandle): """Basename of file. Examples: if archive.basename(filehandle) == "DICOMDIR": Args: filehandle: reference to member object """ return os.path.basename(filehandle['name'])
@staticmethod def _longest_prefix(keys, required): prefix = '' for folder in keys: # new_prefix = os.path.commonprefix([folder, os.path.normpath(required)]) new_prefix = os.path.commonprefix([folder, required]) if len(new_prefix) > len(prefix): prefix = new_prefix return prefix def _filehandle_in_files(self, filehandle): fname = filehandle['name'] prefix = self._longest_prefix(self.__files.keys(), fname) return prefix in self.__files
[docs] def open(self, filehandle, mode='rb'): """Open file. Returns: A member object for member with filehandle. Extract the member object to local file space. This is necessary to allow the seek() operation on open files. """ logger.debug('ZipfileArchive.open: mode %s' % mode) logger.debug('ZipfileArchive.open: filehandle %s' % filehandle) if mode[0] == 'r': if filehandle['name'] not in self.__files: raise FileNotFoundError( 'No such file: %s' % filehandle['name']) filehandle['localfile'] = self.__archive.extract( filehandle['name'], path=self.__tmpdir) filehandle['unpacked'] = True filehandle['fh'] = open(filehandle['localfile'], mode=mode) return filehandle['fh'] elif mode[0] == 'w': if self.__mode[0] == 'r': raise PermissionError( 'Cannot write on an archive opened for read') # Open local file for write localfile = tempfile.NamedTemporaryFile(delete=False) logger.debug('ZipfileArchive.open: mode %s file %s' % ( mode, localfile)) fh = WriteFileIO(self.__archive, filehandle, localfile) member = {'unpacked': True, 'name': filehandle, 'fh': fh, 'localfile': localfile} self.__files[filehandle] = member return fh else: raise ValueError('Unknown mode "%s"' % mode)
[docs] def getmembers(self, files=None): """Get the members of the archive. Returns: The members of the archive as a list of member objects. The list has the same order as the members in the archive. """ if files is None or \ (issubclass(type(files), str) and files == '*') or \ (issubclass(type(files), list) and len(files) > 0 and files[0] == '*'): return self.__files else: # logger.debug('ZipfileArchive.getmembers: files {}'.format(len(files))) if issubclass(type(files), list): wanted_files = [] for file in files: # wanted_files.append(os.path.normpath(file)) if file[-1] == '/': file = file[:-1] wanted_files.append(file) else: # wanted_files = list((os.path.normpath(files),)) if files[-1] == '/': files = files[:-1] wanted_files = list((files,)) # logger.debug('ZipfileArchive.getmembers: wanted_files {}'.format(len(wanted_files))) found_match = [False for _ in range(len(wanted_files))] filelist = list() for filename in self.__files: for i, required_filename in enumerate(wanted_files): #if i == 0: # logger.debug('ZipfileArchive.getmembers: compare {} {} {}'.format(os.path.normpath(filename), required_filename, # os.path.normpath(required_filename))) if fnmatch.fnmatchcase(filename, required_filename): filelist.append(self.__files[filename]) found_match[i] = True # elif fnmatch.fnmatchcase(filename, required_filename + os.sep + '*'): elif fnmatch.fnmatchcase(filename, required_filename + '/*'): filelist.append(self.__files[filename]) found_match[i] = True # Verify that all wanted files are found for i, found in enumerate(found_match): if not found: raise FileNotFoundError('No such file: %s' % wanted_files[i]) if len(filelist) < 1: raise FileNotFoundError('No such file: %s' % files) return filelist
[docs] def to_localfile(self, filehandle): """Access a member object through a local file. """ if not self._filehandle_in_files(filehandle): raise FileNotFoundError( 'No such file: %s' % filehandle['name']) if not filehandle['unpacked']: filehandle['localfile'] = \ self.__archive.extract(filehandle['name']) filehandle['unpacked'] = True self.__files[filehandle['name']] = filehandle return filehandle['localfile']
[docs] def add_localfile(self, local_file, filename): """Add a local file to the archive. Args: local_file: named local file filename: filename in the archive Returns: filehandle to file in the archive """ if self.__mode[0] == 'r': raise PermissionError( 'Cannot write on an archive opened for read') member = {'unpacked': True, 'name': filename, 'fh': None, 'localfile': local_file} self.__archive.write(local_file, arcname=filename) logger.debug('ZipfileArchive.add_localfile: local {} as {}'.format( local_file, filename)) self.__files[filename] = member logger.debug('{}'.format(self.__archive.namelist()))
[docs] def writedata(self, filename, data): """Write data to a named file in the archive. Args: filename: named file in the archive data: data to write """ if self.__mode[0] == 'r': raise PermissionError( 'Cannot write on an archive opened for read') member = {'unpacked': False, 'name': filename, 'fh': None} self.__archive.writestr(filename, data) self.__files[filename] = member
[docs] def close(self): """Close zip file. """ self.__archive.close() self.__fp.close() shutil.rmtree(self.__tmpdir) logger.debug('ZipfileArchive.close: {}'.format(self.__tmpdir)) self.__transport.close()
[docs] def is_file(self, filehandle): """Determine whether the named file is a single file. """ pass
def __enter__(self): """Enter context manager. """ logger.debug("ZipfileArchive __enter__: {} mode {}".format(type(self.__transport), self.__mode)) return self def __exit__(self, exc_type, exc_val, exc_tb): """Leave context manager, cleaning up any open files. """ logger.debug('ZipfileArchive.__exit__:') self.close()