Source code for imagedata.readdata

"""Read/Write image files, calling appropriate transport, archive and format plugins
"""

# Copyright (c) 2013-2022 Erling Andersen, Haukeland University Hospital, Bergen, Norway

import os.path
import logging
import mimetypes
import argparse
import fnmatch
import pathlib
import urllib.parse
import traceback as tb
from typing import Dict, List, Tuple, Union
from .formats import INPUT_ORDER_NONE, input_order_to_str, find_plugin, get_plugins_list
from .formats import CannotSort, NotImageError, UnknownInputError, WriteNotImplemented
from .transports import RootIsNotDirectory
from .archives import find_mimetype_plugin, ArchivePluginNotFound


class NoTransportError(Exception):
    """Signals a missing transport (not raised in the part of the module shown here)."""


class NoArchiveError(Exception):
    """Signals a missing archive (not raised in the part of the module shown here)."""


class UnknownOptionType(Exception):
    """Raised by read() and write() when opts is neither None, a dict nor an argparse.Namespace."""


# Module-wide logger, named after this module.
logger = logging.getLogger(__name__)
def read(urls, order=None, opts=None, input_format=None):
    """Read image data, calling appropriate transport, archive and format plugins

    Args:
        urls: list of urls or url to read (list of str, or str)
        order: determine how to sort the images (default: auto-detect).
            Both None and 'none' mean "not specified"; the order then comes
            from opts['input_order'] when present, else INPUT_ORDER_NONE.
        opts: input options (argparse.Namespace or dict)
        input_format: specify a particular input format (default: auto-detect)
    Returns:
        tuple of
            - hdr: header instance (iterated by series UID in this function)
            - si[tag,slice,rows,columns]: numpy array
    Raises:
        ValueError: When no sources are given.
        UnknownOptionType: When opts cannot be made into a dict.
        FileNotFoundError: When specified URL cannot be opened.
        UnknownInputError: When the input format could not be determined.
        CannotSort: When input data cannot be sorted.
    """

    _name: str = '{}.{}'.format(__name__, read.__name__)
    logger.debug("{}: urls {}".format(_name, urls))
    sources = _get_sources(urls, mode='r', opts=opts)
    try:
        return _read_sources(sources, urls, order, opts, input_format, _name)
    finally:
        # Close the archives on every exit path: success, "all plugins failed",
        # and exceptions that propagate to the caller.
        for source in sources:
            logger.debug("{}: close archive {}".format(_name, source['archive']))
            source['archive'].close()


def _read_sources(sources, urls, order, opts, input_format, _name):
    """Try the reader plugins in turn on already opened sources; helper for read()."""

    if len(sources) < 1:
        raise ValueError("No source(s) where given")
    logger.debug("{}: sources {}".format(_name, sources))

    # Let in_opts be a dict from opts
    if opts is None:
        in_opts = {}
    elif isinstance(opts, dict):
        in_opts = opts
    elif isinstance(opts, argparse.Namespace):
        in_opts = vars(opts)
    else:
        raise UnknownOptionType('Unknown opts type ({}): {}'.format(type(opts), opts))

    if input_format is None and 'input_format' in in_opts:
        input_format = in_opts['input_format']

    # Let the calling party override a default input order.
    # An unspecified order (None or 'none') must not clobber the order given in opts.
    input_order = in_opts.get('input_order', INPUT_ORDER_NONE)
    if order is not None and order != 'none':
        input_order = order
    logger.info("{}: Input order: {}.".format(
        _name, input_order_to_str(input_order)))

    # Pre-fetch DICOM template and DICOM geometry
    pre_hdr = _prefetch_dicom_header(
        'template', in_opts, input_order, 'Template is not a single series', _name)
    geom_hdr = _prefetch_dicom_header(
        'geometry', in_opts, input_order, 'Geometry template is not a single series', _name)

    # Call reader plugins in turn to read the image data
    plugins = sorted_plugins_dicom_first(get_plugins_list(), input_format)
    logger.debug("{}: plugins length {}".format(_name, len(plugins)))
    summary = 'Summary of read plugins:'
    for pname, ptype, pclass in plugins:
        logger.debug("{}: {:20s} ({:8s}) {}".format(
            _name, pname, ptype, pclass.description))
        reader = pclass()
        try:
            hdr, si = reader.read(sources, None, input_order, in_opts)
            for series_uid in hdr:
                hdr[series_uid].add_template(pre_hdr)
                hdr[series_uid].add_geometry(geom_hdr)
            return hdr, si
        except (FileNotFoundError, CannotSort):
            # Consult the normalized in_opts: the raw opts may be None or a
            # Namespace, which would raise TypeError here and hide the real error.
            if not in_opts.get('skip_broken_series'):
                raise
        except NotImageError as e:
            logger.info("{}: Giving up {}: {}".format(_name, ptype, e))
            summary = summary + '\n {}: {}'.format(ptype, e)
        except Exception as e:
            # Deliberate best-effort: any other failure means "try the next plugin"
            logger.info("{}: Giving up (OTHER) {}: {}".format(_name, ptype, e))
            summary = summary + '\n {}: {}'.format(ptype, e)

    # All reader plugins failed - report
    first_url = urls[0] if isinstance(urls, list) else urls
    raise UnknownInputError('Could not determine input format of "{}": {}'.format(
        first_url, summary))


def _prefetch_dicom_header(key, in_opts, input_order, error_message, _name):
    """Read the single DICOM series named by in_opts[key]; return None when the option is unset."""

    if not in_opts.get(key):
        return None
    logger.debug("{}: {} {}".format(_name, key, in_opts[key]))
    prefetch_sources = _get_sources(in_opts[key], mode='r', opts=in_opts)
    try:
        reader = find_plugin('dicom')
        hdr, _ = reader.read(prefetch_sources, None, input_order, in_opts)
    finally:
        # Same clean-up as for the main sources in read()
        for source in prefetch_sources:
            source['archive'].close()
    if len(hdr) != 1:
        raise ValueError(error_message)
    return hdr[next(iter(hdr))]
# def _add_template(hdr, pre_hdr): # if pre_hdr is not None: # for key in pre_hdr: # hdr[key] = copy.copy(pre_hdr[key]) # def _add_dicom_geometry(pre_hdr, geometry): # """For each slice in geometry, use most of pre_hdr, adding a few attributes from geometry # """ # # #logger.debug("_add_dicom_geometry template %s geometry %s" % ( # # imagedata.formats.shape_to_str(self.shape), # # imagedata.formats.shape_to_str(geometry.shape))) # pre_hdr['sliceLocations'] = geometry['sliceLocations'].copy() # pre_hdr['spacing'] = geometry['spacing'].copy() # pre_hdr['orientation'] = geometry['orientation'].copy() # pre_hdr['imagePositions'] = {} # logger.debug("_add_dicom_geometry:") # logger.debug("_add_dicom_geometry: geometry.imagePositions {}".format( # geometry['imagePositions'].keys())) # for k in geometry['imagePositions'].keys(): # pre_hdr['imagePositions'][k] = geometry['imagePositions'][k].copy() # pre_hdr['axes'] = geometry['axes'].copy()
def write(si, url, opts=None, formats=None):
    """Write image data, calling appropriate format plugins

    Args:
        si[tag,slice,rows,columns]: Series array
        url: output destination url. Any '%p' is replaced by the plugin type.
        opts: Output options (argparse.Namespace or dict)
        formats: list of output formats, overriding opts.output_format (list or str)
    Raises:
        UnknownOptionType: When opts cannot be made into a dict.
        TypeError: List of output format is not list().
        ValueError: Wrong number of destinations given, or no writer plugin
            matches the requested output formats.
        IOError: When no selected writer plugin succeeded in writing the data.
        imagedata.formats.WriteNotImplemented: Cannot write this image format.
    """

    _name: str = '{}.{}'.format(__name__, write.__name__)

    def _replace_url(url, pattern, value):
        """Substitute pattern by value in a str url, or in each part of a pathlib path."""
        if isinstance(url, str):
            return url.replace(pattern, value)
        if isinstance(url, pathlib.PurePath):
            return pathlib.Path(*(part.replace(pattern, value) for part in url.parts))
        return url

    # Let out_opts be a dict from opts
    if opts is None:
        out_opts = {}
    elif isinstance(opts, dict):
        out_opts = opts
    elif isinstance(opts, argparse.Namespace):
        out_opts = vars(opts)
    else:
        raise UnknownOptionType('Unknown opts type ({}): {}'.format(type(opts), opts))

    # Options that, when set, override the corresponding attribute of si
    for option, attribute in (
            ('sernum', 'seriesNumber'),
            ('serdes', 'seriesDescription'),
            ('imagetype', 'imageType'),
            ('frame', 'frameOfReferenceUID'),
            ('SOPClassUID', 'SOPClassUID'),
    ):
        if option in out_opts and out_opts[option]:
            setattr(si, attribute, out_opts[option])

    # Default output format is input format
    try:
        output_formats = [si.input_format]
    except AttributeError:
        output_formats = None
    logger.debug("{}: Default    output format : {}".format(_name, output_formats))
    logger.debug("{}: Overriding output formats: {}".format(_name, formats))
    logger.debug("{}: Options: {}".format(_name, out_opts))
    if formats is not None:
        if isinstance(formats, list):
            output_formats = formats
        elif isinstance(formats, str):
            output_formats = [formats]
        else:
            raise TypeError("List of output format is not list() ({})".format(type(formats)))
    elif out_opts.get('output_format'):
        output_formats = out_opts['output_format']
        if isinstance(output_formats, str):
            # A bare string would be substring-matched against plugin types below
            output_formats = [output_formats]
    if output_formats is None:
        output_formats = ['dicom']  # Fall-back to dicom output
    logger.info("{}: Output formats: {}".format(_name, output_formats))

    # Determine output dtype
    write_si = si
    if 'dtype' in out_opts and out_opts['dtype'] is not None:
        if out_opts['dtype'] != si.dtype:
            write_si = si.astype(out_opts['dtype'])

    # Call plugin writers in turn to store the data
    logger.debug("{}: Available plugins {}".format(_name, len(get_plugins_list())))
    written = False
    msg = ''
    for pname, ptype, pclass in get_plugins_list():
        if ptype not in output_formats:
            continue
        logger.debug("{}: Attempt plugin {}".format(_name, ptype))
        # Create plugin to write data in specified format
        writer = pclass()
        logger.debug("{}: Created writer plugin of type {}".format(
            _name, type(writer)))
        local_url = _replace_url(url, '%p', ptype)
        destinations = _get_sources(local_url, mode='w', opts=out_opts)
        if len(destinations) != 1:
            raise ValueError('Wrong number of destinations (%d) given' %
                             len(destinations))
        destination = destinations[0]
        logger.debug('{}: destination {}'.format(_name, destination))
        try:
            if write_si.ndim == 4 and write_si.shape[0] > 1:
                # 4D data
                writer.write_4d_numpy(write_si, destination, out_opts)
            elif write_si.ndim >= 2:
                # 2D-3D data
                writer.write_3d_numpy(write_si, destination, out_opts)
            else:
                raise ValueError("Don't know how to write image of shape {}".format(
                    write_si.shape))
            written = True
        except WriteNotImplemented:
            raise
        except Exception as e:
            # Deliberate best-effort: remember the failure and try remaining plugins
            logger.info("{}: Giving up (OTHER) {}: {}".format(
                _name, ptype, e))
            msg = msg + '\n{}: {}'.format(ptype, e)
            msg = msg + '\n' + ''.join(tb.format_exception(None, e, e.__traceback__))
        finally:
            # Close the archive also when WriteNotImplemented propagates
            destination['archive'].close()

    if not written:
        if len(msg) > 0:
            raise IOError("Failed writing: {}".format(msg))
        raise ValueError("No writer plugin was found for {}".format(output_formats))
    if len(msg) > 0:
        # Some plugin succeeded, but at least one other failed
        logger.error("{}: {}".format(_name, msg))
# destination['archive'].close()
def sorted_plugins_dicom_first(plugins, input_format):
    """Order reader plugins such that DICOM is tried first and Nifti second.

    Args:
        plugins: list of (pname, ptype, pclass) tuples. The list is not modified.
        input_format: plugin type requested by the caller, or None.
    Returns:
        When input_format matches the ptype of a plugin: a list holding that
        plugin only. Otherwise a new list where the first 'dicom' plugin comes
        first, followed by the first 'nifti' plugin, then the remaining
        plugins in their original order.
    """

    if input_format is not None:
        for pname, ptype, pclass in plugins:
            if ptype == input_format:
                return [(pname, ptype, pclass)]

    # Work on a copy: the caller's list may be shared, and must keep its order
    ordered = list(plugins)
    # 'nifti' is promoted before 'dicom', so that 'dicom' ends up in front of it
    for preferred in ('nifti', 'dicom'):
        for index, (_pname, ptype, _pclass) in enumerate(ordered):
            if ptype == preferred:
                ordered.insert(0, ordered.pop(index))
                break
    return ordered
def _get_location_part(url):
    """Get location part of URL: scheme, netloc and path

    Query and fragment are stripped. The scheme defaults to 'file', and a
    relative file path is converted to an absolute path.
    """

    _name: str = '{}.{}'.format(__name__, _get_location_part.__name__)
    if os.name == 'nt' and fnmatch.fnmatch(url, '[A-Za-z]:\\*'):
        # Windows: Parse without x:, then reattach drive letter
        url_tuple = urllib.parse.urlsplit(url[2:], scheme="file")
        _path = url[:2] + url_tuple.path
    else:
        url_tuple = urllib.parse.urlsplit(url, scheme="file")
        _path = url_tuple.path
    # Strip off query and fragment parts
    location = urllib.parse.urlunsplit((
        url_tuple.scheme,
        url_tuple.netloc,
        _path,
        None,
        None))
    if location[:8] == 'file:///' and _path[0] != '/':
        # Relative file path: make it absolute
        location = 'file://' + os.path.abspath(location[8:])
    logger.debug('{}: scheme {}'.format(_name, url_tuple.scheme))
    logger.debug('{}: netloc {}'.format(_name, url_tuple.netloc))
    logger.debug('{}: path {}'.format(_name, _path))
    logger.debug('{}: location {}'.format(_name, location))
    return location


def _get_query_part(url):
    """Get query part of URL. This may contain file name"""

    url_tuple = urllib.parse.urlsplit(url, scheme='file')
    return url_tuple.query


def _get_archive(url, mode='r', opts=None):
    """Get archive plugin for given URL.

    The plugin is selected by find_mimetype_plugin() from the mimetype
    guessed from the path of the url.
    """

    _name: str = '{}.{}'.format(__name__, _get_archive.__name__)
    if opts is None:
        opts = {}
    logger.debug('{}: url {}'.format(_name, url))
    url_tuple = urllib.parse.urlsplit(url, scheme="file")
    if os.name == 'nt' and \
            url_tuple.scheme == 'file' and \
            fnmatch.fnmatch(url_tuple.netloc, '[A-Za-z]:\\*'):
        # Windows: the drive letter and path are found in netloc
        _path = url_tuple.netloc
    else:
        _path = url_tuple.path
    mimetype = mimetypes.guess_type(_path)[0]
    archive = find_mimetype_plugin(
        mimetype,
        url,
        mode,
        read_directory_only=False,
        opts=opts)
    logger.debug('{}: _mimetypes {}'.format(_name, mimetype))
    logger.debug('{}: archive {}'.format(_name, archive.name))
    return archive


def _common_prefix(level):
    """This unlike the os.path.commonprefix version
    always returns path prefixes as it compares
    path component wise

    An empty list of paths gives an empty prefix.

    https://stackoverflow.com/questions/21498939
    """

    ls = [p.split(os.sep) for p in level]
    if not ls:
        # min() below raises ValueError on an empty sequence
        return ''
    cp = []
    ml = min(len(p) for p in ls)

    for i in range(ml):
        s = set(p[i] for p in ls)
        if len(s) != 1:
            break
        cp.append(s.pop())

    return os.sep.join(cp)


def _simplify_locations(locations):
    """Simplify locations by joining file:/// locations to a common prefix.

    Locations with another scheme than 'file' are kept unchanged.

    Returns:
        dict with the simplified locations as keys (each value is True).
    """

    _name: str = '{}.{}'.format(__name__, _simplify_locations.__name__)
    logger.debug('{}: locations {}'.format(_name, locations))
    new_locations = {}
    paths = []
    for location in locations:
        if os.name == 'nt' and fnmatch.fnmatch(location, '[A-Za-z]:\\*'):
            # Windows: Parse without x:, then reattach drive letter
            url_tuple = urllib.parse.urlsplit(location[2:], scheme='file')
            _path = location[:2] + url_tuple.path
        else:
            url_tuple = urllib.parse.urlsplit(location, scheme='file')
            _path = url_tuple.path if len(url_tuple.path) > 0 else url_tuple.netloc
        if url_tuple.scheme == 'file':
            paths.append(_path)
        else:
            new_locations[location] = True
    logger.debug('{}: paths {}'.format(_name, paths))
    if len(paths) > 0:
        prefix = _common_prefix(paths)
        logger.debug('{}: prefix {}'.format(_name, prefix))
        prefix_url = urllib.parse.urlunsplit((
            'file',
            '',
            prefix,
            None,
            None))
        # urlunsplit prepends file:/// when a Windows drive is present. Simplify to file://
        if os.name == 'nt' and fnmatch.fnmatch(prefix_url, 'file:///[A-Za-z]:\\*'):
            prefix_url = 'file://' + prefix_url[8:]
        new_locations[prefix_url] = True
    logger.debug('{}: new_locations {}'.format(_name, new_locations))
    return new_locations


def _get_sources(
        urls: Union[List, Tuple, str],
        mode: str,
        opts: Union[dict, None] = None) -> List[Dict]:
    """Determine transport, archive and file from each url.

    Handle both single url, a url tuple, and a url list

    Args:
        urls: list, tuple or single string, e.g.:
            file://dicom
                transport: file, archive: fs, url: dicom
            file://dicom.zip?query
                transport: file, archive: zip, files: query
            file://dicom.tar.gz?query
                transport: file, archive: tgz, files: query
            http://server:port/dicom.zip
                transport: http, archive: zip
            dicom://server:port/AET
               transport: dicom, archive: fs
            xnat://server:port/project/subject/experiment/scan
                transport: xnat, archive: zip
            Items may also be pathlib paths.
        mode: 'r' or 'w' for Read or Write
            When mode = 'r', the urls must exist.
        opts: options passed on to the archive plugins (default: empty dict)
    Returns:
        sources: list of dict for each url
            - 'archive'  : archive plugin
            - 'files'    : list of file names or regexp. May be empty list.
    """

    _name: str = '{}.{}'.format(__name__, _get_sources.__name__)

    # Ensure the input is a list: my_urls
    if opts is None:
        opts = {}
    if isinstance(urls, list):
        source_urls = urls
    elif isinstance(urls, tuple):
        source_urls = list(urls)
    else:
        source_urls = [urls]
    my_urls = []
    for url in source_urls:
        if isinstance(url, pathlib.Path):
            my_urls.append(str(url.resolve()))
        elif isinstance(url, pathlib.PurePath):
            # Pure paths do not access the file system, and have no resolve()
            my_urls.append(str(url))
        else:
            my_urls.append(url)

    # Scan my_urls to determine the locations of the inputs
    locations = {}
    for url in my_urls:
        locations[_get_location_part(url)] = True
    locations = _simplify_locations(locations)

    # Set up sources for each location, and possibly add files
    sources = []
    for location in locations:
        logger.debug('{}: location {}'.format(_name, location))
        source_location = location
        source = {'files': []}
        try:
            source['archive'] = _get_archive(source_location, mode=mode, opts=opts)
        except (RootIsNotDirectory, ArchivePluginNotFound):
            # Retry with parent directory
            source_location, filename = os.path.split(source_location)
            logger.debug('{}: retry location {}'.format(_name, source_location))
            source['archive'] = _get_archive(source_location, mode=mode, opts=opts)
        for url in my_urls:
            location_part = _get_location_part(url)
            logger.debug('{}: compare _get_location_part {} location {}'.format(
                _name, location_part, source_location))
            query = _get_query_part(url)
            logger.debug('{}: query {}'.format(_name, query))
            if location_part.startswith(source_location):
                if source['archive'].use_query():
                    fname = query
                else:
                    if query:
                        fname = query
                    else:
                        # File name is what follows the source location and its separator
                        fname = location_part[len(source_location) + 1:]
                if len(fname) > 0:
                    source['files'].append(fname)
        sources.append(source)
    for source in sources:
        logger.debug('{}: sources {}'.format(_name, source))
    return sources
def str_to_dtype(s):
    """Convert dtype string to numpy dtype.

    Args:
        s: dtype specification understood by numpy.dtype(), e.g. 'float32'
            or 'uint16'.
    Returns:
        numpy.dtype instance.
    Raises:
        TypeError: When s does not describe a numpy data type.
    """

    # Local import: numpy is not among the module-level imports seen in this
    # file. numpy.dtype() parses the string, so no eval() of caller-supplied
    # text is needed.
    import numpy as np

    return np.dtype(s)