"""Read/Write local image files
"""
# Copyright (c) 2018 Erling Andersen, Haukeland University Hospital, Bergen, Norway
import os
import os.path
import fnmatch
import shutil
import urllib.parse
import logging
from abc import ABC
import imagedata.archives
import imagedata.transports
from imagedata.archives.abstractarchive import AbstractArchive
[docs]class ReadOnlyError(Exception):
pass
[docs]class WriteOnFile(Exception):
pass
[docs]class NoSuchFile(Exception):
pass
[docs]class FilesystemArchive(AbstractArchive, ABC):
"""Read/write local files."""
name = "filesystem"
description = "Read and write local image files."
authors = "Erling Andersen"
version = "1.0.0"
url = "www.helse-bergen.no"
mimetypes = ['*'] # Disregards MIME types
# self.__dirname: root directory
# self.__filelist: list of absolute filename
@staticmethod
def _get_transport(url, mode, read_directory_only):
"""Get transport plugin from url.
If the url addressess a missing file in read mode,
access the parent directory.
"""
url_tuple = urllib.parse.urlsplit(url, scheme='file')
logging.debug('FilesystemArchive._get_transport: scheme: %s, netloc: %s' %
(url_tuple.scheme, url_tuple.path))
try:
_transport = imagedata.transports.Transport(
url_tuple.scheme,
netloc=url_tuple.netloc,
root=url_tuple.path,
mode=mode,
read_directory_only=read_directory_only)
except imagedata.transports.RootDoesNotExist:
# Mode='r': location does not exist
raise
except imagedata.transports.RootIsNotDirectory:
# Mode='r': Retry with parent directory
parent, _ = os.path.split(url_tuple.path)
_transport = imagedata.transports.Transport(
url_tuple.scheme,
netloc=url_tuple.netloc,
root=parent,
mode=mode,
read_directory_only=read_directory_only)
return _transport
def __init__(self, transport=None, url=None, mode='r',
read_directory_only=True, opts=None):
super(FilesystemArchive, self).__init__(
self.name, self.description,
self.authors, self.version, self.url, self.mimetypes)
logging.debug("FilesystemArchive.__init__ url: {}".format(url))
urldict = urllib.parse.urlsplit(url, scheme="file")
if transport is not None:
self.__transport = transport
self.__path = urldict.path
elif url is None:
raise ValueError('url not given')
else:
# Determine transport from url
# self.__transport = self._get_transport(url, mode, read_directory_only)
# netloc = urldict.netloc
# netloc = urldict.path
# netloc: where is zipfile
# self.__path: zipfile name
# netloc, self.__path = os.path.split(urldict.path)
# logging.debug('FilesystemArchive.__init__: scheme: %s, netloc: %s path: %s' %
# (urldict.scheme, netloc, self.__path))
self.__path = urldict.path
logging.debug('FilesystemArchive.__init__: scheme: %s, path: %s' %
(urldict.scheme, self.__path))
self.__transport = imagedata.transports.Transport(
urldict.scheme,
netloc=urldict.netloc,
root=self.__path,
mode=mode,
read_directory_only=read_directory_only,
opts=opts)
self.__mode = mode
self.__files = {}
logging.debug("FilesystemArchive __init__: {}".format(type(transport)))
logging.debug("FilesystemArchive path: {}".format(self.__path))
# self.__fp = self.__transport.open(
# self.__path, mode=self.__mode + "b")
# logging.debug("FilesystemArchive self.__fp: {}".format(type(self.__fp)))
logging.debug("FilesystemArchive open zipfile mode %s" % self.__mode)
# If the URL refers to a single file, let directory_name refer to the
# directory and basename to the file
logging.debug("FilesystemArchive __init__ verify : {}".format(self.__path))
if os.path.isfile(self.__path):
self.__dirname = os.path.dirname(self.__path)
self.__basename = os.path.basename(self.__path)
self.__filelist = [self.__basename]
logging.debug("FilesystemArchive __init__ directory_name : {}".format(self.__dirname))
logging.debug("FilesystemArchive __init__ basename: {}".format(self.__basename))
# logging.debug("FilesystemArchive self.__filelist: {}".format(self.__filelist))
return
# The URL refers to a directory. Let directory_name refer to the directory
self.__dirname = self.__path
self.__basename = None
logging.debug("FilesystemArchive __init__ scan directory_name : {}".format(self.__dirname))
logging.debug("FilesystemArchive __init__ scan basename: {}".format(self.__basename))
self.__filelist = list()
# logging.debug("FilesystemArchive walk root: {}".format(self.__urldict.path))
for root, dirs, files in self.__transport.walk(self.__path):
# logging.debug("FilesystemArchive scan root: {} {}".format(root, files))
for filename in files:
# fname = os.path.join(self.__path, root, filename)
fname = os.path.join(root, filename)
# logging.debug("FilesystemArchive scan fname: {}".format(fname))
self.__filelist.append(fname)
# if transport.isfile(fname):
# if root.startswith(self.__dirname):
# root = root[len(self.__dirname)+1:] # Strip off directory_name
# self.__filelist[fname] = (root,filename)
# logging.debug(fname)
self._sort_filelist()
# logging.debug("FilesystemArchive self.__filelist: {}".format(self.__filelist))
@property
def transport(self):
"""Underlying transport plugin
"""
return self.__transport
[docs] def use_query(self):
"""Do the plugin need the ?query part of the url?"""
return False
[docs] def getnames(self, files=None):
"""Get name list of the members.
Return:
The members as a list of their names.
It has the same order as the members of the archive.
"""
if files:
filelist = list()
for filename in self.__filelist:
for required_filename in files:
if fnmatch.fnmatchcase(filename, os.path.normpath(required_filename)):
filelist.append(filename)
elif fnmatch.fnmatchcase(filename, os.path.normpath(required_filename)+'/*'):
filelist.append(filename)
if len(filelist) < 1:
raise FileNotFoundError('No such file: %s' % files)
return filelist
else:
return self.__filelist
[docs] def basename(self, filehandle):
"""Basename of file.
Examples:
if archive.basename(filehandle) == "DICOMDIR":
Args:
filehandle: reference to member object
"""
root, filename = filehandle
return os.path.basename(filename)
[docs] def open(self, filehandle, mode='rb'):
"""Open file.
Returns:
a member object for member with filehandle.
"""
# logging.debug("getmember: fname {}".format(filehandle))
return self.__transport.open(filehandle, mode)
[docs] def getmembers(self, files=None):
"""Get the members of the archive.
Returns:
The members of the archive as a list of member objects.
The list has the same order as the members in the archive.
"""
# logging.debug("getmembers: files {}".format(files))
if files:
if issubclass(type(files), list):
wanted_files = files
else:
wanted_files = list((files,))
found_match = [False for _ in range(len(wanted_files))]
filelist = list()
for filename in self.__filelist:
for i, required_filename in enumerate(wanted_files):
if fnmatch.fnmatchcase(filename, os.path.normpath(required_filename)):
filelist.append(filename)
found_match[i] = True
elif fnmatch.fnmatchcase(filename,os.path.normpath(required_filename)+'/*'):
filelist.append(filename)
found_match[i] = True
# Verify that all wanted files are found
for i, found in enumerate(found_match):
if not found:
raise FileNotFoundError('No such file: %s' % wanted_files[i])
if len(filelist) < 1:
raise FileNotFoundError('No such file: %s' % files)
return filelist
else:
return self.__filelist
[docs] def to_localfile(self, filehandle):
"""Access a member object through a local file.
"""
# logging.debug('FilesystemArchive to_localfile: filename %s' %
# filehandle)
return os.path.join(self.__path, filehandle)
[docs] def add_localfile(self, local_file, filename):
"""Add a local file to the archive.
Args:
local_file: named local file
filename: filename in the archive
Returns:
filehandle to file in the archive
"""
fname = os.path.join(self.__dirname, filename)
if fname not in self.__filelist:
# Ensure the directory exists,
# create it silently if not.
os.makedirs(
os.path.dirname(fname),
exist_ok=True)
shutil.copy(local_file, fname)
self.__filelist.append(fname)
self._sort_filelist()
else:
raise imagedata.archives.FileAlreadyExistsError(
'File %s already exists' %
os.path.join(
self.__path,
filename))
[docs] def writedata(self, filename, data):
"""Write data to a named file in the archive.
Args:
filename: named file in the archive
data: data to write
"""
if self.__mode[0] == 'r':
raise ReadOnlyError("Archive is read-only.")
if self.__basename is not None:
raise WriteOnFile("Do not know how to write a file to a file.")
fname = os.path.join(self.__dirname, filename)
logging.debug("writedata: fname {}".format(fname))
with self.__transport.open(fname, 'wb') as f:
f.write(data)
self.__filelist.append(fname)
self._sort_filelist()
[docs] def close(self):
"""Close function.
"""
self.__transport.close()
return
[docs] def is_file(self, filehandle):
"""Determine whether the named file is a single file.
"""
return self.__transport.isfile(filehandle)
def __enter__(self):
"""Enter context manager.
"""
logging.debug("FilesystemArchive __enter__: {} mode {}".format(type(self.__transport), self.__mode))
return self
def __exit__(self, exc_type, exc_val, exc_tb):
"""Leave context manager, cleaning up any open files.
"""
self.close()
def _sort_filelist(self):
"""Sort self.__filelist, usually after creation or insertion"""
self.__filelist = sorted(self.__filelist)