"""Read/Write files from a zipfile
"""
# Copyright (c) 2018-2024 Erling Andersen, Haukeland University Hospital, Bergen, Norway
from typing import Tuple, Union
import os
import os.path
import shutil
import tempfile
import io
import fnmatch
import urllib.parse
import logging
from abc import ABC
from .abstractarchive import AbstractArchive, Member
from ..transports import Transport
import zipfile
logger = logging.getLogger(__name__)
def list_files(startpath):
    """Print the directory tree below startpath, indented by depth.

    Debugging aid. Each directory is printed as ``name/`` and each file on
    its own line, indented four spaces per directory level below startpath.

    Args:
        startpath: Directory to walk (str).
    """
    for root, dirs, files in os.walk(startpath):
        # Every root yielded by os.walk() begins with startpath, so strip
        # only that leading part (str.replace would also remove later
        # occurrences) and count the platform's own path separator.
        level = root[len(startpath):].count(os.sep)
        indent = ' ' * 4 * level
        print('{}{}/'.format(indent, os.path.basename(root)))
        subindent = ' ' * 4 * (level + 1)
        for f in files:
            print('{}{}'.format(subindent, f))
class WriteFileIO(io.FileIO):
    """Local file object which is copied into the zip archive when closed.

    Data is written to a local temporary file. When the object is closed,
    the temporary file is added to the zip archive under the member's
    filename, and the temporary file is removed.
    """

    def __init__(self, archive, member, local_file):
        """Make a WriteFileIO object.

        Args:
            archive: ZipFile object
            member: member of the zip archive (its filename attribute is used)
            local_file: local temporary file, either a path name (str) or
                an object with a name attribute and a close() method
        """
        if isinstance(local_file, str):
            local_name = local_file
        else:
            local_name = local_file.name
        super(WriteFileIO, self).__init__(local_name, mode='wb')
        self.__archive = archive
        self.__filename = member.filename
        self.__local_file = local_file

    @property
    def local_file(self):
        """The local temporary file as given to the constructor."""
        return self.__local_file

    def close(self):
        """Close file, copy it to archive, then delete local file.

        Calling close() on an already closed object has no effect, so the
        file is added to the archive exactly once.
        """
        if self.closed:
            # Already copied to the archive and removed.
            return None
        logger.debug("ZipfileArchive.WriteFileIO.close:")
        ret = super(WriteFileIO, self).close()
        if isinstance(self.__local_file, str):
            local_name = self.__local_file
        else:
            # Also close the temporary file object we were handed.
            self.__local_file.close()
            local_name = self.__local_file.name
        try:
            logger.debug("ZipfileArchive.WriteFileIO.close: zip %s as %s",
                         local_name, self.__filename)
            self.__archive.write(local_name, self.__filename)
        finally:
            # Remove the temporary file even when the archive write fails.
            logger.debug("ZipfileArchive.WriteFileIO.close: remove %s",
                         local_name)
            os.remove(local_name)
        return ret

    def __enter__(self):
        """Enter context manager.
        """
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Leave context manager:
        Copy file to zip archive.
        Remove local file.
        """
        self.close()
class ZipfileArchive(AbstractArchive, ABC):
    """Read/write image files from a zipfile.

    The member list is read when the archive is opened. Members are
    extracted to a private temporary directory when they are accessed,
    and that directory is removed when the archive is closed.
    """

    name = "zip"
    description = "Read and write image files from a zipfile."
    authors = "Erling Andersen"
    version = "1.1.0"
    url = "www.helse-bergen.no"
    mimetypes = ['application/zip', 'application/x-zip-compressed']

    # Internal data
    # self.transport: file transport object.
    # self.__fp: zip file object in transport object
    # self.__archive: ZipFile object.
    # self.__path: path to the zip file using given transport.
    # self.__mode: 'r' or 'w': read or write access.
    # self.__tmpdir: Local directory where zip file is unpacked.
    # self.__files: dict of files in the zip archive.
    #   key is path name in the zip archive.
    #   value is a Member object:
    #     info['unpacked']: whether the file is unpacked in tmpdir (boolean)
    #     filename: path name in the zip archive
    #     fh: file handle when open, otherwise None
    #     local_file: local filename of unpacked file

    def __init__(self, transport=None, url=None, mode='r', read_directory_only=False, opts=None):
        """Open a zip archive.

        Args:
            transport: transport object used to open the zip file. When None,
                a Transport is created from the url.
            url: location of the zip file (str).
            mode: 'r' or 'w': read or write access.
            read_directory_only: passed on to Transport when one is created.
            opts: options, stored in self.opts.
        Raises:
            ValueError: when url is not given.
        """
        super(ZipfileArchive, self).__init__(
            self.name, self.description,
            self.authors, self.version, self.url, self.mimetypes)
        self.opts = opts
        logger.debug("ZipfileArchive.__init__ url: {}".format(url))
        # The url is needed to find the path of the zip file, also when a
        # transport is given. Check it before it is parsed.
        if url is None:
            raise ValueError('url not given')
        if os.name == 'nt' and fnmatch.fnmatch(url, '[A-Za-z]:\\*'):
            # Windows: Parse without x:, then reattach drive letter
            urldict = urllib.parse.urlsplit(url[2:], scheme="file")
            self.__path = url[:2] + urldict.path
        else:
            urldict = urllib.parse.urlsplit(url, scheme="file")
            self.__path = urldict.path if len(urldict.path) > 0 else urldict.netloc

        if transport is not None:
            self.transport = transport
        else:
            # Determine transport from url
            # netloc: where is zipfile
            # self.__path: zipfile name
            netloc = urldict.netloc + self.__path
            logger.debug('ZipfileArchive.__init__: scheme: %s, netloc: %s',
                         urldict.scheme, netloc)
            self.transport = Transport(
                urldict.scheme,
                netloc=urldict.netloc,
                root=urldict.path,
                mode=mode,
                read_directory_only=read_directory_only)

        self.__mode = mode
        self.__files = {}

        logger.debug("ZipfileArchive path: {}".format(self.__path))
        self.__fp = self.transport.open(
            self.__path, mode=self.__mode + "b")
        logger.debug("ZipfileArchive self.__fp: {}".format(type(self.__fp)))
        logger.debug("ZipfileArchive open zipfile mode %s", self.__mode)
        try:
            self.__archive = zipfile.ZipFile(
                self.__fp,
                mode=self.__mode,
                compression=zipfile.ZIP_DEFLATED)
        except Exception:
            # Do not leave the transport file open when it is not a zip file.
            self.__fp.close()
            raise

        # Members are extracted to this directory on demand
        self.__tmpdir = tempfile.mkdtemp()
        logger.debug("Extract zipfile {} to {}".format(
            self.__archive, self.__tmpdir))

        # Get filelist in self.__files; directory entries are skipped
        for fname in self.__archive.namelist():
            try:
                _is_dir = self.__archive.getinfo(fname).is_dir()
            except AttributeError:
                # ZipInfo without is_dir(): directories end with a slash
                _is_dir = fname.endswith('/')
            except Exception as e:
                logger.error('ZipfileArchive: {}'.format(e))
                raise
            if not _is_dir:
                self.__files[fname] = Member(fname,
                                             info={'unpacked': False}
                                             )

    def use_query(self):
        """Does the plugin need the ?query part of the url?"""
        return True

    @staticmethod
    def _wanted_patterns(files):
        """Normalize a files argument to a list of filename patterns.

        Args:
            files: None, a single pattern (str), or a sequence of patterns.
        Returns:
            None when every member is wanted (files is None, empty, or its
            first pattern is '*'). Otherwise a list of patterns, each with
            one trailing '/' removed.
        """
        if files is None:
            return None
        patterns = [files] if isinstance(files, str) else list(files)
        if not patterns or patterns[0] == '*':
            return None
        return [p[:-1] if p.endswith('/') else p for p in patterns]

    @staticmethod
    def _member_suffix(filename):
        """Suffix for a temporary file holding the named member.

        The suffix starts at the first dot of the base name, so dots in
        directory names never put a path separator into the suffix.

        Returns:
            The suffix including the dot (str), or None when the base name
            has no dot.
        """
        base = os.path.basename(filename)
        dot = base.find('.')
        return base[dot:] if dot >= 0 else None

    def getnames(self, files=None):
        """Get name list of the members.

        Args:
            files: List or single str of filename matches
        Returns:
            The members as a list of their names.
            When all members are requested, the list is sorted. Otherwise
            the names follow the order of the member dict, and a name is
            listed once for each pattern it matches.
        Raises:
            FileNotFoundError: When no matching file is found.
        """
        wanted_files = self._wanted_patterns(files)
        if wanted_files is None:
            logger.debug('ZipfileArchive.getnames: found files {}'.format(len(self.__files)))
            return sorted(self.__files.keys())

        filelist = []
        for filename in self.__files:
            logger.debug('ZipfileArchive.getnames: member {}'.format(filename))
            for required_filename in wanted_files:
                logger.debug('ZipfileArchive.getnames: required {}'.format(required_filename))
                # Match the name itself, or anything below it as a folder
                if fnmatch.fnmatchcase(filename, required_filename) or \
                        fnmatch.fnmatchcase(filename, required_filename + '/*'):
                    filelist.append(filename)
        logger.debug('ZipfileArchive.getnames: found files {}'.format(len(filelist)))
        if len(filelist) < 1:
            raise FileNotFoundError('No such file: {}'.format(wanted_files))
        return filelist

    def basename(self, filehandle: Member):
        """Basename of file.

        Examples:
            if archive.basename(filehandle) == "DICOMDIR":

        Args:
            filehandle: reference to member object
        Returns:
            Basename of file: str
        """
        return os.path.basename(filehandle.filename)

    @staticmethod
    def _longest_prefix(keys, required):
        """Longest common prefix between required and any of the keys.

        The comparison is character by character (os.path.commonprefix),
        not by path component.
        """
        prefix = ''
        for folder in keys:
            new_prefix = os.path.commonprefix([folder, required])
            if len(new_prefix) > len(prefix):
                prefix = new_prefix
        return prefix

    def _filehandle_in_files(self, filehandle):
        """Whether the longest prefix shared with a member name is itself a member.

        NOTE(review): an existing member whose name is a leading part of
        filehandle.filename (e.g. 'a' for 'ab') also gives True here -
        confirm that this is intended.
        """
        fname = filehandle.filename
        prefix = self._longest_prefix(self.__files.keys(), fname)
        return prefix in self.__files

    def open(self, member: Member, mode: str = 'rb'):
        """Open file.

        Reading: extract the member object to local file space.
        This is necessary to allow the seek() operation on open files.
        Writing: a local temporary file is opened. It is copied to the
        archive when the returned object is closed.

        Args:
            member (imagedata.archives.abstractarchive.Member): Handle to file.
                A filename (str) is also accepted.
            mode (str): Open mode.
        Returns:
            An IO object for the member.
        Raises:
            FileNotFoundError: when file is not found.
            PermissionError: When archive is read-only.
            ValueError: When mode does not start with 'r' or 'w'.
        """
        if isinstance(member, str):
            member = Member(member)
        logger.debug('ZipfileArchive.open: mode %s', mode)
        logger.debug('ZipfileArchive.open: member %s', member.filename)
        if mode.startswith('r'):
            if member.filename not in self.__files:
                raise FileNotFoundError(
                    'No such file: %s' % member.filename)
            member.local_file = self.__archive.extract(
                member.filename, path=self.__tmpdir)
            member.info['unpacked'] = True
            member.fh = open(member.local_file, mode=mode)
            return member.fh
        elif mode.startswith('w'):
            if self.__mode[0] == 'r':
                raise PermissionError(
                    'Cannot write on an archive opened for read')
            # Open local file for write
            local_file = tempfile.NamedTemporaryFile(
                suffix=self._member_suffix(member.filename), delete=False)
            logger.debug('ZipfileArchive.open: mode %s file %s',
                         mode, local_file)
            fh = WriteFileIO(self.__archive, member, local_file)
            # Update info on member file
            self.__files[member.filename] = Member(member.filename,
                                                   info={'unpacked': True},
                                                   fh=fh,
                                                   local_file=local_file
                                                   )
            return fh
        else:
            raise ValueError('Unknown mode "%s"' % mode)

    def getmembers(self, files=None):
        """Get the members of the archive.

        Args:
            files: List or single str of filename matches
        Returns:
            The members of the archive as a list of Filehandles.
            The list has the order of the member dict.
        Raises:
            FileNotFoundError: When any of the requested files is not found.
        """
        wanted_files = self._wanted_patterns(files)
        if wanted_files is None:
            return list(self.__files.values())

        found_match = [False] * len(wanted_files)
        filelist = []
        for filename, member in self.__files.items():
            for i, required_filename in enumerate(wanted_files):
                # Match the name itself, or anything below it as a folder
                if fnmatch.fnmatchcase(filename, required_filename) or \
                        fnmatch.fnmatchcase(filename, required_filename + '/*'):
                    filelist.append(member)
                    found_match[i] = True
        # Verify that all wanted files are found
        for required_filename, found in zip(wanted_files, found_match):
            if not found:
                raise FileNotFoundError('No such file: %s' % required_filename)
        if len(filelist) < 1:
            raise FileNotFoundError('No such file: %s' % files)
        return filelist

    def construct_filename(self,
                           tag: Union[Tuple, None],
                           query: str = None,
                           ) -> str:
        """Construct a filename with given scheme.

        Args:
            tag: a tuple giving the present position of the filename (tuple).
                When None, zeros are used: self.level of them when
                self.level is set, otherwise a single zero.
            query: from url query (str). When None, self.fallback is used.
                A query containing '%' is filled in with the % operator,
                other queries with str.format().
        Returns:
            A filename compatible with the given archive (str).
        """
        if query is None:
            query = self.fallback
        if tag is None:
            tag = (0,)
            if self.level:
                tag = tuple(0 for _ in range(self.level))
        if '%' in query:
            query = query % tag
        else:
            query = query.format(*tag)
        return query

    def new_local_file(self,
                       filename: str) -> WriteFileIO:
        """Create new local file.

        Args:
            filename: Preferred filename (str)
        Returns:
            Writable file object (WriteFileIO). The local_file property has
            the local filename. The file is copied to the archive when the
            object is closed.
        Raises:
            FileExistsError: when the file is already in the archive.
        """
        member = Member(filename)
        if self._filehandle_in_files(member):
            raise FileExistsError('File {} already exists'.format(filename))
        fd, member.local_file = tempfile.mkstemp(
            suffix=self._member_suffix(filename))
        # mkstemp() leaves an open OS-level descriptor. The file is reopened
        # by name below, so release the descriptor right away.
        os.close(fd)
        member.info['unpacked'] = True
        member.fh = WriteFileIO(self.__archive, member, member.local_file)
        self.__files[member.filename] = member
        return member.fh

    def to_localfile(self, member):
        """Access a member object through a local file.

        Args:
            member: handle to member file.
        Returns:
            filename to file guaranteed to be local.
        Raises:
            FileNotFoundError: when file is not found.
        """
        if not self._filehandle_in_files(member):
            raise FileNotFoundError('No such file: {}'.format(member.filename))
        if not member.info['unpacked']:
            # Extract to the archive's temporary directory, which is removed
            # by close(), not to the current working directory.
            member.local_file = self.__archive.extract(
                member.filename, path=self.__tmpdir)
            member.info['unpacked'] = True
            self.__files[member.filename] = member
        return member.local_file

    def add_localfile(self, local_file, filename):
        """Add a local file to the archive.

        Args:
            local_file: named local file
            filename: filename in the archive
        Raises:
            PermissionError: When archive is read-only.
        """
        if self.__mode[0] == 'r':
            raise PermissionError(
                'Cannot write on an archive opened for read')
        member = Member(filename, info={'unpacked': True},
                        local_file=local_file)
        self.__archive.write(local_file, arcname=filename)
        logger.debug('ZipfileArchive.add_localfile: local {} as {}'.format(
            local_file, filename))
        self.__files[filename] = member
        logger.debug('{}'.format(self.__archive.namelist()))

    def writedata(self, filename, data):
        """Write data to a named file in the archive.

        Args:
            filename: named file in the archive
            data: data to write
        Raises:
            PermissionError: When archive is read-only.
        """
        if self.__mode[0] == 'r':
            raise PermissionError(
                'Cannot write on an archive opened for read')
        member = Member(filename, info={'unpacked': False})
        self.__archive.writestr(filename, data)
        self.__files[filename] = member

    def close(self):
        """Close zip file.

        The zip file and its transport file are closed, the temporary
        directory is removed, and the transport is closed. The later steps
        are attempted even when an earlier one raises.
        """
        try:
            self.__archive.close()
        finally:
            try:
                self.__fp.close()
            finally:
                # The directory is already gone when close() is called twice
                if os.path.isdir(self.__tmpdir):
                    shutil.rmtree(self.__tmpdir)
                logger.debug('ZipfileArchive.close: {}'.format(self.__tmpdir))
                self.transport.close()

    def is_file(self, member):
        """Determine whether the named file is a single file.

        Args:
            member: file member
        Returns:
            whether named file is a single file (bool)
        """
        return member.filename in self.__files and self._filehandle_in_files(member)

    def exists(self, member):
        """Determine whether the named path exists.

        Args:
            member: member name.
        Returns:
            whether member exists (bool)
        """
        return member.filename in self.__files

    @property
    def root(self) -> str:
        """Archive root name.
        """
        return os.path.sep

    @property
    def base(self) -> Union[str, None]:
        """Archive base name. Always None for a zip archive.
        """
        return None

    @property
    def path(self) -> str:
        """Archive path. Always the empty string for a zip archive.
        """
        return ''

    def __enter__(self):
        """Enter context manager.
        """
        logger.debug("ZipfileArchive __enter__: {} mode {}".format(
            type(self.transport), self.__mode))
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Leave context manager, cleaning up any open files.
        """
        logger.debug('ZipfileArchive.__exit__:')
        self.close()