"""Read/Write image files, calling appropriate transport, archive and format plugins
"""
# Copyright (c) 2013-2022 Erling Andersen, Haukeland University Hospital, Bergen, Norway
import os.path
import logging
import mimetypes
import argparse
import fnmatch
import pathlib
import urllib.parse
import traceback as tb
from .formats import INPUT_ORDER_NONE, input_order_to_str, find_plugin, get_plugins_list
from .formats import CannotSort, NotImageError, UnknownInputError, WriteNotImplemented
from .transports import RootIsNotDirectory
from .archives import find_mimetype_plugin, ArchivePluginNotFound
[docs]
class NoTransportError(Exception):
pass
[docs]
class NoArchiveError(Exception):
pass
[docs]
class UnknownOptionType(Exception):
pass
logger = logging.getLogger(__name__)
[docs]
def read(urls, order=None, opts=None):
"""Read image data, calling appropriate transport, archive and format plugins
Args:
urls: list of urls or url to read (list of str, or str)
order: determine how to sort the images (default: auto-detect)
opts: input options (argparse.Namespace or dict)
Returns:
tuple of
- hdr: header instance
- si[tag,slice,rows,columns]: numpy array
Raises:
ValueError: When no sources are given.
UnknownOptionType: When opts cannot be made into a dict.
FileNotFoundError: When specified URL cannot be opened.
UnknownInputError: When the input format could not be determined.
CannotSort: When input data cannot be sorted.
"""
logger.debug("reader.read: urls {}".format(urls))
# transport,my_urls,files = sanitize_urls(urls)
# if len(my_urls) < 1:
# raise ValueError("No URL(s) where given")
# logger.debug("reader.read: transport {} my_urls {}".format(transport,my_urls))
sources = _get_sources(urls, mode='r', opts=opts)
if len(sources) < 1:
raise ValueError("No source(s) where given")
logger.debug("reader.read: sources {}".format(sources))
# Let in_opts be a dict from opts
if opts is None:
in_opts = {}
elif issubclass(type(opts), dict):
in_opts = opts
elif issubclass(type(opts), argparse.Namespace):
in_opts = vars(opts)
else:
raise UnknownOptionType('Unknown opts type ({}): {}'.format(type(opts),
opts))
# Let the calling party override a default input order
input_order = INPUT_ORDER_NONE
if 'input_order' in in_opts:
input_order = in_opts['input_order']
if order != 'none':
input_order = order
logger.info("Input order: {}.".format(input_order_to_str(input_order)))
# Pre-fetch DICOM template
pre_hdr = None
if 'template' in in_opts and in_opts['template']:
logger.debug("readdata.read template {}".format(in_opts['template']))
template_source = _get_sources(in_opts['template'], mode='r', opts=in_opts)
reader = find_plugin('dicom')
pre_hdr, _ = reader.read_files(template_source, input_order, in_opts)
# Pre-fetch DICOM geometry
geom_hdr = None
if 'geometry' in in_opts and in_opts['geometry']:
logger.debug("readdata.read geometry {}".format(in_opts['geometry']))
geometry_source = _get_sources(in_opts['geometry'], mode='r', opts=in_opts)
reader = find_plugin('dicom')
geom_hdr, _ = reader.read_files(geometry_source, input_order, in_opts)
# if pre_hdr is None:
# pre_hdr = {}
# _add_dicom_geometry(pre_hdr, geom_hdr)
# Call reader plugins in turn to read the image data
plugins = sorted_plugins_dicom_first(get_plugins_list())
logger.debug("readdata.read plugins length {}".format(len(plugins)))
summary = 'Summary of read plugins:'
for pname, ptype, pclass in plugins:
logger.debug("%20s (%8s) %s" % (pname, ptype, pclass.description))
reader = pclass()
try:
hdr, si = reader.read(sources, None, input_order, in_opts)
del reader
for source in sources:
logger.debug("readdata.read: close archive {}".format(source['archive']))
source['archive'].close()
if 'headers_only' in in_opts and in_opts['headers_only']:
pass
elif 'separate_series' not in in_opts or not in_opts['separate_series']:
hdr.add_template(pre_hdr)
hdr.add_geometry(geom_hdr)
return hdr, si
except (FileNotFoundError, CannotSort):
raise
except NotImageError as e:
logger.info("Giving up {}: {}".format(ptype, e))
summary = summary + '\n {}: {}'.format(ptype, e)
except Exception as e:
logger.info("Giving up (OTHER) {}: {}".format(ptype, e))
summary = summary + '\n {}: {}'.format(ptype, e)
for source in sources:
logger.debug("readdata.read: close archive {}".format(source['archive']))
source['archive'].close()
# All reader plugins failed - report
if issubclass(type(urls), list):
raise UnknownInputError('Could not determine input format of "{}": {}'.format(
urls[0], summary))
else:
raise UnknownInputError('Could not determine input format of "{}": {}'.format(
urls, summary))
# def _add_template(hdr, pre_hdr):
# if pre_hdr is not None:
# for key in pre_hdr:
# hdr[key] = copy.copy(pre_hdr[key])
# def _add_dicom_geometry(pre_hdr, geometry):
# """For each slice in geometry, use most of pre_hdr, adding a few attributes from geometry
# """
#
# #logger.debug("_add_dicom_geometry template %s geometry %s" % (
# # imagedata.formats.shape_to_str(self.shape),
# # imagedata.formats.shape_to_str(geometry.shape)))
# pre_hdr['sliceLocations'] = geometry['sliceLocations'].copy()
# pre_hdr['spacing'] = geometry['spacing'].copy()
# pre_hdr['orientation'] = geometry['orientation'].copy()
# pre_hdr['imagePositions'] = {}
# logger.debug("_add_dicom_geometry:")
# logger.debug("_add_dicom_geometry: geometry.imagePositions {}".format(
# geometry['imagePositions'].keys()))
# for k in geometry['imagePositions'].keys():
# pre_hdr['imagePositions'][k] = geometry['imagePositions'][k].copy()
# pre_hdr['axes'] = geometry['axes'].copy()
[docs]
def write(si, url, opts=None, formats=None):
"""Write image data, calling appropriate format plugins
Args:
si[tag,slice,rows,columns]: Series array
url: output destination url
opts: Output options (argparse.Namespace or dict)
formats: list of output formats, overriding opts.output_format (list or str)
Raises:
UnknownOptionType: When opts cannot be made into a dict.
TypeError: List of output format is not list().
ValueError: Wrong number of destinations given, or no way to write multidimensional image.
imagedata.formats.WriteNotImplemented: Cannot write this image format.
"""
def _replace_url(url, pattern, value):
if isinstance(url, str):
url = url.replace(pattern, value)
elif issubclass(type(url), pathlib.PurePath):
_pl = []
for _p in url.parts:
_pl.append(_p.replace(pattern, value))
url = pathlib.Path(*_pl)
return url
# logger.debug("write: directory_name(si): {}".format(directory_name(si)))
# Let out_opts be a dict from opts
if opts is None:
out_opts = {}
elif issubclass(type(opts), dict):
out_opts = opts
elif issubclass(type(opts), argparse.Namespace):
out_opts = vars(opts)
else:
raise UnknownOptionType('Unknown opts type ({}): {}'.format(type(opts),
opts))
if 'sernum' in out_opts and out_opts['sernum']:
si.seriesNumber = out_opts['sernum']
if 'serdes' in out_opts and out_opts['serdes']:
si.seriesDescription = out_opts['serdes']
if 'imagetype' in out_opts and out_opts['imagetype']:
si.imageType = out_opts['imagetype']
if 'frame' in out_opts and out_opts['frame']:
si.frameOfReferenceUID = out_opts['frame']
if 'SOPClassUID' in out_opts and out_opts['SOPClassUID']:
si.SOPClassUID = out_opts['SOPClassUID']
# Default output format is input format
try:
output_formats = [si.input_format]
except AttributeError:
output_formats = None
logger.debug("Default output format : {}".format(output_formats))
logger.debug("Overriding output formats: {}".format(formats))
logger.debug("Options: {}".format(out_opts))
if formats is not None:
if isinstance(formats, list):
output_formats = formats
elif isinstance(formats, str):
output_formats = [formats]
else:
raise TypeError("List of output format is not list() ({})".format(type(formats)))
elif 'output_format' in out_opts and len(out_opts['output_format']):
output_formats = out_opts['output_format']
if output_formats is None:
output_formats = ['dicom'] # Fall-back to dicom output
logger.info("Output formats: {}".format(output_formats))
# Determine output dtype
write_si = si
if 'dtype' in out_opts and out_opts['dtype'] is not None:
if out_opts['dtype'] != si.dtype:
# write_si = si.astype(str_to_dtype(out_opts['dtype']))
write_si = si.astype(out_opts['dtype'])
# Verify there is one destination only
# destinations = _get_sources(url, mode='w')
# if len(destinations) != 1:
# raise ValueError('Wrong number of destinations (%d) given' %
# len(destinations))
# Call plugin writers in turn to store the data
logger.debug("Available plugins {}".format(len(get_plugins_list())))
written = False
msg = ''
for pname, ptype, pclass in get_plugins_list():
if ptype in output_formats:
logger.debug("Attempt plugin {}".format(ptype))
# Create plugin to write data in specified format
writer = pclass()
logger.debug("readdata.write: Created writer plugin of type {}".format(type(writer)))
# local_url = url.replace('%p', ptype)
local_url = _replace_url(url, '%p', ptype)
destinations = _get_sources(local_url, mode='w', opts=out_opts)
if len(destinations) != 1:
raise ValueError('Wrong number of destinations (%d) given' %
len(destinations))
destination = destinations[0]
logger.debug('readdata.write: destination {}'.format(destination))
try:
if write_si.ndim == 4 and write_si.shape[0] > 1:
# 4D data
writer.write_4d_numpy(write_si, destination, out_opts)
elif write_si.ndim >= 2:
# 2D-3D data
writer.write_3d_numpy(write_si, destination, out_opts)
else:
raise ValueError("Don't know how to write image of shape {}".format(
write_si.shape))
written = True
del writer
except WriteNotImplemented:
raise
except Exception as e:
logger.info("Giving up (OTHER) {}: {}".format(ptype, e))
msg = msg + '\n{}: {}'.format(ptype, e)
msg = msg + '\n' + ''.join(tb.format_exception(None, e, e.__traceback__))
pass
destination['archive'].close()
if not written:
if len(msg) > 0:
raise IOError("Failed writing: {}".format(msg))
raise ValueError("No writer plugin was found for {}".format(output_formats))
if len(msg) > 0:
logger.error(msg)
# destination['archive'].close()
[docs]
def sorted_plugins_dicom_first(plugins):
"""Sort plugins such that any Nifti plugin is used early."""
for pname, ptype, pclass in plugins:
if ptype == 'nifti':
plugins.remove((pname, ptype, pclass))
plugins.insert(0, (pname, ptype, pclass))
break
"""Sort plugins such that any DICOM plugin is used first."""
for pname, ptype, pclass in plugins:
if ptype == 'dicom':
plugins.remove((pname, ptype, pclass))
plugins.insert(0, (pname, ptype, pclass))
break
return plugins
def _get_location_part(url):
"""Get location part of URL: scheme, netloc and path"""
if os.name == 'nt' and fnmatch.fnmatch(url, '[A-Za-z]:\\*'):
# Windows: Parse without x:, then reattach drive letter
url_tuple = urllib.parse.urlsplit(url[2:], scheme="file")
_path = url[:2] + url_tuple.path
else:
url_tuple = urllib.parse.urlsplit(url, scheme="file")
_path = url_tuple.path
# url_tuple = urllib.parse.urlsplit(url, scheme='file')
# Strip off query and fragment parts
location = urllib.parse.urlunsplit((
url_tuple.scheme,
url_tuple.netloc,
_path,
None,
None))
if location[:8] == 'file:///' and _path[0] != '/':
location = 'file://' + os.path.abspath(location[8:])
logger.debug('readdata._get_location_part: scheme %s' % url_tuple.scheme)
logger.debug('readdata._get_location_part: netloc %s' % url_tuple.netloc)
logger.debug('readdata._get_location_part: path %s' % _path)
logger.debug('readdata._get_location_part: location %s' % location)
return location
def _get_query_part(url):
"""Get query part of URL. This may contain file name"""
url_tuple = urllib.parse.urlsplit(url, scheme='file')
return url_tuple.query
def _get_archive(url, mode='r', opts=None):
"""Get archive plugin for given URL."""
if opts is None:
opts = {}
logger.debug('readdata._get_archive: url %s' % url)
url_tuple = urllib.parse.urlsplit(url, scheme="file")
if os.name == 'nt' and \
url_tuple.scheme == 'file' and \
fnmatch.fnmatch(url_tuple.netloc, '[A-Za-z]:\\*'):
# Windows: Parse without /x:, then re-attach drive letter
_path = url_tuple.netloc
else:
_path = url_tuple.path
# url_tuple = urllib.parse.urlsplit(url, scheme='file')
mimetype = mimetypes.guess_type(_path)[0]
archive = find_mimetype_plugin(
mimetype,
url,
mode,
# read_directory_only=mode[0] == 'r',
read_directory_only=False,
opts=opts)
logger.debug('readdata._get_archive: _mimetypes %s' % mimetype)
logger.debug('readdata._get_archive: archive %s' % archive.name)
return archive
def _common_prefix(level):
"""This unlike the os.path.commonprefix version
always returns path prefixes as it compares
path component wise
https://stackoverflow.com/questions/21498939
"""
cp = []
ls = [p.split(os.sep) for p in level]
ml = min(len(p) for p in ls)
for i in range(ml):
s = set(p[i] for p in ls)
if len(s) != 1:
break
cp.append(s.pop())
return os.sep.join(cp)
def _simplify_locations(locations):
"""Simplify locations by joining file:/// locations to a common prefix."""
logger.debug('readdata._simplify_locations: locations {}'.format(locations))
new_locations = {}
paths = []
for location in locations:
# On Windows, any backslash (os.sep) will be replaced by slash in URL
# url_tuple = urllib.parse.urlsplit(location.replace(os.sep, '/'), scheme='file')
if os.name == 'nt' and fnmatch.fnmatch(location, '[A-Za-z]:\\*'):
# Windows: Parse without x:, then reattach drive letter
url_tuple = urllib.parse.urlsplit(location[2:], scheme='file')
_path = location[:2] + url_tuple.path
else:
url_tuple = urllib.parse.urlsplit(location, scheme='file')
_path = url_tuple.path if len(url_tuple.path) > 0 else url_tuple.netloc
if url_tuple.scheme == 'file':
paths.append(_path)
else:
new_locations[location] = True
logger.debug('readdata._simplify_locations: paths {}'.format(paths))
if len(paths) > 0:
prefix = _common_prefix(paths)
logger.debug('readdata._simplify_locations: prefix {}'.format(prefix))
prefix_url = urllib.parse.urlunsplit((
'file',
'',
prefix,
None,
None))
# urlunsplit prepends file:/// when a Windows drive is present. Simplify to file://
if os.name == 'nt' and fnmatch.fnmatch(prefix_url, 'file:///[A-Za-z]:\\*'):
prefix_url = 'file://' + prefix_url[8:]
new_locations[prefix_url] = True
logger.debug('readdata._simplify_locations: new_locations {}'.format(new_locations))
return new_locations
def _get_sources(urls, mode, opts=None):
"""Determine transport, archive and file from each url.
Handle both single url, a url tuple, and a url list
Args:
urls: list, tuple or single string, e.g.:
file://dicom
transport: file, archive: fs, url: dicom
file://dicom.zip?query
transport: file, archive: zip, files: query
file://dicom.tar.gz?query
transport: file, archive: tgz, files: query
http://server:port/dicom.zip
transport: http, archive: zip
dicom://server:port/AET
transport: dicom, archive: fs
xnat://server:port/project/subject/experiment/scan
transport: xnat, archive: zip
mode: 'r' or 'w' for Read or Write
When mode = 'r', the urls must exist.
Returns:
sources: list of dict for each url
- 'archive' : archive plugin
- 'files' : list of file names or regexp. May be empty list.
"""
# Ensure the input is a list: my_urls
if opts is None:
opts = {}
if isinstance(urls, list):
source_urls = urls
elif isinstance(urls, tuple):
source_urls = list(urls)
else:
source_urls = [urls]
my_urls = []
for url in source_urls:
if issubclass(type(url), pathlib.PurePath):
my_urls.append(str(url.resolve()))
else:
my_urls.append(url)
# Scan my_urls to determine the locations of the inputs
locations = {}
for url in my_urls:
locations[_get_location_part(url)] = True
locations = _simplify_locations(locations)
# Set up sources for each location, and possibly add files
sources = []
for location in locations:
logger.debug('readdata._get_sources: location %s' % location)
source_location = location
source = {'files': []}
try:
source['archive'] = _get_archive(source_location, mode=mode, opts=opts)
except (RootIsNotDirectory,
ArchivePluginNotFound):
# Retry with parent directory
source_location, filename = os.path.split(source_location)
logger.debug('readdata._get_sources: retry location %s' % source_location)
source['archive'] = _get_archive(source_location, mode=mode, opts=opts)
for url in my_urls:
location_part = _get_location_part(url)
logger.debug('readdata._get_sources: compare _get_location_part %s location %s' %
(location_part, source_location))
query = _get_query_part(url)
logger.debug('readdata._get_sources: query %s' % query)
if location_part.startswith(source_location):
if source['archive'].use_query():
fname = query
else:
if query:
fname = query
else:
fname = location_part[len(source_location) + 1:]
# _get_query_part(url)
if len(fname) > 0:
source['files'].append(fname)
sources.append(source)
for source in sources:
logger.debug('readdata._get_sources: sources %s' % source)
return sources
[docs]
def str_to_dtype(s):
"""Convert dtype string to numpy dtype."""
return eval('np.' + s)