Source code for imagedata.formats.dicomplugin

"""Read/Write DICOM files
"""

# Copyright (c) 2013-2026 Erling Andersen, Haukeland University Hospital, Bergen, Norway

import os
import sys
import logging
import traceback
import mimetypes
import math
from collections import namedtuple, Counter
from functools import cmp_to_key
from datetime import date, datetime, timedelta, timezone
import numpy as np
import pydicom
import pydicom.valuerep
import pydicom.config
import pydicom.errors
import pydicom.uid
from pydicom.datadict import dictionary_VR, tag_for_keyword
from pydicom.dataset import Dataset, FileDataset, FileMetaDataset

from ..formats import (CannotSort, EmptyImageError, NotImageError, UnknownTag,
                       INPUT_ORDER_NONE, INPUT_ORDER_TIME, INPUT_ORDER_B,
                       INPUT_ORDER_FA, INPUT_ORDER_TE, INPUT_ORDER_BVECTOR,
                       INPUT_ORDER_DTI, INPUT_ORDER_TRIGGERTIME,
                       SORT_ON_SLICE
                       )
from ..series import Series
from ..axis import Axis, VariableAxis, UniformLengthAxis
from .abstractplugin import AbstractPlugin
from ..archives.abstractarchive import AbstractArchive, Member
from ..header import Header
from ..apps.diffusion import set_ds_b_value, set_ds_b_vector
from .dicomlib.instance import Instance
from .dicomlib.sorting import (combine_data_and_header, compare_tuples, determine_sorting,
                               scan_tags, verify_consistent_slices)
from .dicomlib.datatypes import (SeriesUID, SourceList, ObjectList, DatasetList, DatasetDict,
                                 SortedDatasetList, SortedData, SortedDataDict, SortedHeaderDict,
                                 PixelDict)

logger = logging.getLogger(__name__)
pydicom.config.settings.reading_validation_mode = pydicom.config.IGNORE
# pydicom.config.settings.writing_validation_mode = pydicom.config.IGNORE
pydicom.config.settings.writing_validation_mode = pydicom.config.WARN
# pydicom.config.settings.writing_validation_mode = pydicom.config.RAISE

mimetypes.add_type('application/dicom', '.ima')


"""
Processes
=========

object_list: ObjectList = self._get_dicom_files(sources)

dataset_dict: DatasetDict
dataset_dict = self._catalog_on_instance_uid(object_list)
    self._extract_member(dataset_dict, f)

imaging_dataset_dict: DatasetDict
imaging_dataset_dict = self._select_imaging_datasets(dataset_dict)
non_imaging_dataset_dict: DatasetDict
non_imaging_dataset_dict = self._select_non_imaging_datasets(dataset_dict)

if imaging_dataset_dict:
    sorted_data_dict: SortedDataDict
    sorted_data_dict = self._sort_datasets(imaging_dataset_dict)

    sorted_header_dict: SortedHeaderDict = SortedHeaderDict()

    pixel_dict: PixelDict = PixelDict()
    pixel_dict = self._construct_pixel_arrays(sorted_data_dict)
        si = self._construct_pixel_array(dataset_dict, header, header.shape)

if non_imaging_dataset_dict:
    non_image_header_dict: SortedHeaderDict
    non_image_header_dict = self._get_non_image_headers(non_imaging_dataset_dict)
        self._extract_non_image_dicom_attributes(series_dataset, hdr)
    
    non_image_pixel_dict = self._construct_pixel_arrays(non_imaging_dataset_dict)
        si = self._construct_pixel_array(dataset_dict, header, header.shape)
                                                        
"""
image_uids = [pydicom.uid.MRImageStorage,
              pydicom.uid.CTImageStorage,
              pydicom.uid.DICOSCTImageStorage,
              pydicom.uid.RTImageStorage,
              pydicom.uid.UltrasoundImageStorage,
              pydicom.uid.UltrasoundMultiFrameImageStorage,
              pydicom.uid.ComputedRadiographyImageStorage,
              pydicom.uid.XRayAngiographicImageStorage,
              pydicom.uid.XRay3DAngiographicImageStorage,
              pydicom.uid.XRay3DCraniofacialImageStorage,
              pydicom.uid.XRayRadiofluoroscopicImageStorage,
              pydicom.uid.SecondaryCaptureImageStorage,
              pydicom.uid.PositronEmissionTomographyImageStorage,
              pydicom.uid.BreastTomosynthesisImageStorage,
              pydicom.uid.NuclearMedicineImageStorage,
              pydicom.uid.ParametricMapStorage,
              pydicom.uid.EddyCurrentImageStorage,
              pydicom.uid.EddyCurrentMultiFrameImageStorage,
              pydicom.uid.VLEndoscopicImageStorage,
              pydicom.uid.VideoEndoscopicImageStorage,
              pydicom.uid.VLMicroscopicImageStorage,
              pydicom.uid.VideoMicroscopicImageStorage,
              pydicom.uid.VLPhotographicImageStorage,
              pydicom.uid.VideoPhotographicImageStorage
              ]

# sr_uids = [pydicom.uid.BasicTextSRStorage,
#            pydicom.uid.EnhancedSRStorage,
#            pydicom.uid.ComprehensiveSRStorage,]


[docs] class DoNotIncludeFile(Exception): pass
[docs] class NoDICOMAttributes(Exception): pass
[docs] class ValueErrorWrapperPrecisionError(Exception): pass
[docs] class DICOMPlugin(AbstractPlugin): """Read/write DICOM files. Attributes: input_order instanceNumber today now serInsUid input_options output_sort output_dir seriesTime """ name = "dicom" description = "Read and write DICOM files." authors = "Erling Andersen" version = "2.1.0" url = "www.helse-bergen.no" extensions = [".dcm", ".ima"] root = "2.16.578.1.37.1.1.4" smallint = ('bool8', 'byte', 'ubyte', 'ushort', 'uint16', 'int8', 'uint8', 'int16') keep_uid = False slice_tolerance = 1e-4 dir_cosine_tolerance = 0.0 def __init__(self): super(DICOMPlugin, self).__init__(self.name, self.description, self.authors, self.version, self.url) self.input_order = None self.DicomHeaderDict = None self.dicomTemplate = None self.instanceNumber = 0 self.today = date.today().strftime("%Y%m%d") self.now = datetime.now().strftime("%H%M%S.%f") self.serInsUid = None self.input_options = {} self.output_sort = None self.output_dir = None self.seriesTime = None
[docs] def read(self, sources: SourceList, pre_hdr: Header, input_order: str, opts: dict) -> ( tuple[SortedHeaderDict, PixelDict]): """Read image data Args: self: DICOMPlugin instance sources: list of sources to image data pre_hdr: Pre-filled header dict. Can be None input_order: sort order opts: input options (dict) Returns: Tuple of - hdr: Header - input_format - input_order - slices - sliceLocations - dicomTemplate - keep_uid - tags - seriesNumber - seriesDescription - imageType - spacing - orientation - imagePositions - si[tag,slice,rows,columns]: multi-dimensional numpy array """ _name: str = '{}.{}'.format(__name__, self.read.__name__) self.input_order = input_order self.input_options = { INPUT_ORDER_NONE: 'get_no_value', INPUT_ORDER_TIME: 'get_acquisition_time', INPUT_ORDER_TRIGGERTIME: 'get_trigger_time', INPUT_ORDER_B: 'get_b_value', INPUT_ORDER_BVECTOR: 'get_b_vector', INPUT_ORDER_DTI: 'get_dti', INPUT_ORDER_TE: 'get_echo_time', INPUT_ORDER_FA: 'get_flip_angle', 'auto_sort': ['time', 'triggertime', 'b', 'fa', 'te'] } for key, value in opts.items(): # Copy opts to self.input_options self.input_options[key] = value skip_pixels = 'headers_only' in opts and opts['headers_only'] if 'slice_tolerance' in self.input_options: self.slice_tolerance = float(self.input_options['slice_tolerance']) if 'dir_cosine_tolerance' in self.input_options: self.dir_cosine_tolerance = float(self.input_options['dir_cosine_tolerance']) # Read DICOM headers logger.debug('{}: sources {}'.format(_name, sources)) # pydicom.config.debug(True) object_list: ObjectList = self._get_dicom_files(sources) dataset_dict: DatasetDict dataset_dict = self._catalog_on_instance_uid(object_list, opts, skip_pixels) imaging_dataset_dict: DatasetDict imaging_dataset_dict = self._select_imaging_datasets(dataset_dict, opts) non_imaging_dataset_dict: DatasetDict non_imaging_dataset_dict = self._select_non_imaging_datasets(dataset_dict, opts) logger.debug('{}: imaging_datasets {}'.format(_name, len(imaging_dataset_dict))) logger.debug('{}: non_imaging_datasets {}'.format(_name, len(non_imaging_dataset_dict))) sorted_header_dict: SortedHeaderDict = SortedHeaderDict() pixel_dict: PixelDict = PixelDict() if imaging_dataset_dict: sorted_data_dict: SortedDataDict sorting: dict[str] sorted_data_dict, sorting = self._sort_datasets(imaging_dataset_dict, input_order, opts) if not skip_pixels: logger.debug('{}: going to _construct_pixel_arrays'.format(_name)) pixel_dict = self._construct_pixel_arrays(sorted_data_dict, opts, skip_pixels) if 'correct_acq' in opts and opts['correct_acq']: for seriesUID in sorted_data_dict: pixel_dict[seriesUID] = self._correct_acqtimes_for_dynamic_series( sorted_data_dict[seriesUID], pixel_dict[seriesUID] ) for seriesUID in sorted_data_dict: _, header = sorted_data_dict[seriesUID] sorted_header_dict[seriesUID] = header if non_imaging_dataset_dict: logger.debug('{}: going to _get_non_image_headers {}'.format(_name, sources)) non_image_header_dict: SortedHeaderDict non_image_header_dict = self._get_non_image_headers(non_imaging_dataset_dict, opts) non_image_pixel_dict = {} if not skip_pixels: logger.debug('{}: going to _construct_pixel_arrays'.format(_name)) sorted_data_dict: SortedDataDict sorted_data_dict = combine_data_and_header(non_imaging_dataset_dict, non_image_header_dict, opts) try: non_image_pixel_dict = self._construct_pixel_arrays(sorted_data_dict, opts, skip_pixels) except EmptyImageError: pass for seriesUID in non_image_header_dict: if seriesUID in sorted_header_dict: sorted_header_dict[seriesUID].datasets = non_imaging_dataset_dict[seriesUID] else: sorted_header_dict[seriesUID] = non_image_header_dict[seriesUID] sorted_header_dict[seriesUID].datasets = non_imaging_dataset_dict[seriesUID] if seriesUID in pixel_dict: raise IndexError('Duplicate pixel data') elif seriesUID in non_image_pixel_dict: pixel_dict[seriesUID] = non_image_pixel_dict[seriesUID] logger.debug('{}: ending'.format(_name)) return sorted_header_dict, pixel_dict
def _get_dicom_files(self, sources: SourceList ) -> ObjectList: """Get DICOM objects. Args: self: DICOMPlugin instance sources: list of sources to image data Returns: List of tuples of - archive - member """ _name: str = '{}.{}'.format(__name__, self._get_dicom_files.__name__) logger.debug("{}: sources: {} {}".format( _name, type(sources), sources)) object_list: ObjectList = ObjectList() for source in sources: archive = source['archive'] scan_files = source['files'] logger.debug("{}: archive: {}".format(_name, archive)) if scan_files is None or len(scan_files) == 0: if archive.base is not None: scan_files = [archive.base] else: scan_files = ['*'] elif archive.base is not None: raise ValueError('When is archive.base with source[files]') logger.debug("{}: source: {} {}".format(_name, type(source), source)) logger.debug("{}: scan_files: {}".format(_name, scan_files)) for path in archive.getnames(scan_files): logger.debug("{}: member: {}".format(_name, path)) if os.path.basename(path) == "DICOMDIR": continue member = archive.getmembers([path, ]) if len(member) != 1: raise IndexError('Should not be multiple files for a filename') member = member[0] object_list.append((archive, member)) return object_list def _catalog_on_instance_uid(self, object_list: ObjectList, opts: dict = None, skip_pixels: bool = False) \ -> DatasetDict: """Sort files on Series Instance UID Args: self: DICOMPlugin instance object_list: List of (archive, member) tuples opts: input options (dict) skip_pixels: Do not read pixel data (default: False) Returns: Dict of List of Instance """ _name: str = '{}.{}'.format(__name__, self._catalog_on_instance_uid.__name__) logger.debug('{}:'.format(_name)) dataset_dict: DatasetDict = DatasetDict() last_message = '' for archive, member in object_list: try: with archive.open(member, mode='rb') as f: logger.debug('{}: process_member {}'.format(_name, member)) self._extract_member(dataset_dict, f, opts, skip_pixels=skip_pixels) except DoNotIncludeFile as e: last_message = '{}'.format(e) except Exception as e: logger.debug('{}: Exception {}'.format(_name, e)) last_message = '{}'.format(e) if len(object_list) > 0 and len(dataset_dict) < 1: raise NotImageError(last_message) return dataset_dict def _select_imaging_datasets(self, dataset_dict: DatasetDict, opts: dict = None ) \ -> DatasetDict: """Select imaging datasets only Args: self: DICOMPlugin instance dataset_dict: Dict of List of Dataset (DatasetDict) opts: input options (dict) Returns: Dict of List of Instance """ _name: str = '{}.{}'.format(__name__, self._select_imaging_datasets.__name__) # Select datasets on SOPClassUID selected_dataset_dict: DatasetDict = DatasetDict() for seriesUID in dataset_dict: dataset_list = dataset_dict[seriesUID] dataset: Dataset dataset = dataset_list[0] if dataset.SOPClassUID in image_uids: # Keep imaging datasets selected_dataset_dict[seriesUID] = dataset_list logger.debug('{}: end with {}'.format(_name, selected_dataset_dict.keys())) return selected_dataset_dict def _select_non_imaging_datasets(self, dataset_dict: DatasetDict, opts: dict = {} ) \ -> DatasetDict: """Select non-imaging datasets only Args: self: DICOMPlugin instance dataset_dict: Dict of List of Dataset (DatasetDict) opts: input options (dict) Returns: Dict of List of non-imaging Instance """ _name: str = '{}.{}'.format(__name__, self._select_non_imaging_datasets.__name__) # Select datasets on SOPClassUID selected_dataset_dict: DatasetDict = DatasetDict() for seriesUID in dataset_dict: dataset_list = dataset_dict[seriesUID] dataset: Dataset dataset = dataset_list[0] if dataset.SOPClassUID not in image_uids: # Keep non-imaging datasets selected_dataset_dict[seriesUID] = dataset_list logger.debug('{}: end with {}'.format(_name, selected_dataset_dict.keys())) return selected_dataset_dict def _extract_member(self, image_list: DatasetDict, member: Dataset | Member | str, opts: dict = None, skip_pixels: bool = False): im: Dataset if issubclass(type(member), Dataset): im = member else: # Read the DICOM object try: im = pydicom.filereader.dcmread(member, stop_before_pixels=skip_pixels) except pydicom.errors.InvalidDicomError as e: raise DoNotIncludeFile('Invalid Dicom Error: {}'.format(e)) # Verify that the DICOM object has pixel data if not skip_pixels: try: _ = len(im.pixel_array) except AttributeError: pass # raise DoNotIncludeFile('No pixel data in DICOM object') if 'input_serinsuid' in opts and opts['input_serinsuid'] is not None: if im.SeriesInstanceUID != opts['input_serinsuid']: raise DoNotIncludeFile('Series Instance UID not selected') if 'input_echo' in opts and opts['input_echo'] is not None: if int(im.EchoNumbers) != int(opts['input_echo']): raise DoNotIncludeFile('Echo Number not selected') if 'input_acquisition' in opts and opts['input_acquisition'] is not None: if int(im.AcquisitionNumber) != int(opts['input_acquisition']): raise DoNotIncludeFile('Acquisition Number not selected') # Catalog images with ref as key acquisition_number = echo_number = None series_instance_uid = im.SeriesInstanceUID if 'ignore_series_uid' in opts and opts['ignore_series_uid']: series_instance_uid = None if 'split_acquisitions' in opts and opts['split_acquisitions']: acquisition_number = im.AcquisitionNumber if 'split_echo_numbers' in opts and opts['split_echo_numbers']: echo_number = im.EchoNumbers ref = SeriesUID(im.PatientID, im.StudyInstanceUID, series_instance_uid, acquisition_number, echo_number) image_list[ref].append(Instance(im)) def _sort_datasets(self, image_dict: DatasetDict, input_order: str, opts: dict = None ) -> (SortedDataDict, dict[str]): """Sort datasets on slice and tags. Args: image_dict: Input datasets (DatasetDict) input_order: Input order (str) opts: Input options (dict) Returns: tuple of SortedDataDict and dict Process: _sort_datasets() takes an unordered DatasetDict, sorts on slice and tags, creates a Header instance, and returns a SortedDataDict of SortedData. For each dataset: sorted_dataset_list = self._sort_dataset_geometry(dataset_list) -> spacing, tranformationMatrix, imagePositions _sort_dataset_geometry() takes an unordered DatasetDict, sorts by slice location, and adds spacing, transformationMatrix and imagePositions. slice_count = verify_consistent_slices(sorted_dataset_list) <- from SortedDatasetList : series[sloc] # Determine (automatic) sorting sorting[seriesUID] = determine_sorting(sorted_dataset_list) Catalog tag values tag_values, shape = scan_tags(seriesUID) Sort the tags and index the images shape = () axes = () tag_db = {} im.slice_index im.tag_index calculate_shape calculate_shape_with_duplicates -> tags # Sort the dataset on selected key for each sloc sorted_dataset_list[sloc].sort(key=partial(get_tag_value, ...)) _extract_all_tags() takes existing SortedDatasetList and fills in additional information. Extract additional DICOM tags. Find maximum shape in slices. Place each image on the proper tag. """ def _cmp_tags(im1: Instance, im2: Instance) -> int: tags1 = im1.get_tag_tuple(None, sorting[seriesUID], self.input_options) tags2 = im2.get_tag_tuple(None, sorting[seriesUID], self.input_options) return compare_tuples(tags1, tags2) _name: str = '{}.{}'.format(__name__, self._sort_datasets.__name__) skip_broken_series = 'skip_broken_series' in opts and opts['skip_broken_series'] # Sort datasets on sloc sorted_data_dict: SortedDataDict = SortedDataDict() sorting: dict[SeriesUID, str] sorting = {} for seriesUID in image_dict: sorting[seriesUID] = 'none' dataset_list: DatasetList dataset_list = image_dict[seriesUID] message: str = f'{dataset_list}' message2: str = '' sorted_dataset_list: SortedDatasetList = None try: sorted_dataset_list = self._sort_dataset_geometry(dataset_list, message, opts) except CannotSort as e: message2 = '{}'.format(e) logger.debug('{}: _sort_dataset_geometry CannotSort: {}'.format(_name, e)) if skip_broken_series: continue if sorted_dataset_list is None: raise CannotSort('Cannot sort: {}'.format(message2)) _: Counter = verify_consistent_slices(sorted_dataset_list, message, opts) header = sorted_dataset_list.get_headers() header.sliceLocations = np.array(sorted(sorted_dataset_list.keys())) # Determine (automatic) sorting try: sorting[seriesUID] = determine_sorting( sorted_dataset_list, input_order, self.input_options ) except CannotSort: if skip_broken_series: logger.debug(f'{_name}: skip_broken_series continue {seriesUID}') continue # Next series else: logger.debug(f'{_name}: skip_broken_series raise') raise header.input_order = sorting[seriesUID] # Sort the dataset on selected key for each sloc for sloc in sorted(sorted_dataset_list.keys()): try: sorted_dataset_list[sloc].sort(key=cmp_to_key(_cmp_tags)) except (ValueError, TypeError): pass # Catalog actual tag values keeping other tags fixed # Important when sorting acquisition time which differ from slice to slice axes, tags = scan_tags(sorted_dataset_list, sorting[seriesUID], self.input_options) sorted_dataset_list.axes = self._get_axes(sorted_dataset_list) header.axes = self._join_axes(sorted_dataset_list, axes) header.tags = tags header.SOPInstanceUIDs = self._get_sop_ins_uids(sorted_dataset_list) header.dicomTemplate = sorted_dataset_list[next(iter(sorted_dataset_list))][0] header.geometryIsDefined = True # Catalog images with seriesUID and sloc as keys sorted_data_dict[seriesUID] = SortedData((sorted_dataset_list, header)) logger.debug('{}: end with {}'.format(_name, sorted_data_dict.keys())) return sorted_data_dict, sorting def _get_non_image_headers(self, dataset_dict: DatasetDict, opts: dict = None ) -> SortedHeaderDict: """Get DICOM headers for non-image datasets""" _name: str = '{}.{}'.format(__name__, self._get_non_image_headers.__name__) skip_broken_series = False if 'skip_broken_series' in opts: skip_broken_series = opts['skip_broken_series'] sorted_header_dict: SortedHeaderDict = SortedHeaderDict() for seriesUID in dataset_dict: series_dataset: DatasetList = dataset_dict[seriesUID] hdr = Header() hdr.input_format = 'dicom' hdr.input_order = 'none' if len(series_dataset) == 0: raise ValueError("No DICOM images found.") try: self._extract_non_image_dicom_attributes(series_dataset, hdr, opts=opts) hdr.set_default_values(hdr.axes) sorted_header_dict[seriesUID] = hdr except CannotSort: if skip_broken_series: logger.debug( '{}: skip_broken_series continue {}'.format( _name, seriesUID )) continue # Next series else: logger.debug('{}: skip_broken_series raise'.format(_name)) raise logger.debug('{}: end with {}'.format(_name, sorted_header_dict.keys() )) return sorted_header_dict def _construct_pixel_arrays(self, sorted_data_dict: SortedDataDict, opts: dict = None, skip_pixels: bool = False ) -> PixelDict: _name: str = '{}.{}'.format(__name__, self._construct_pixel_arrays.__name__) skip_broken_series = 'skip_broken_series' in opts and opts['skip_broken_series'] pixel_dict: PixelDict = PixelDict() for seriesUID in sorted_data_dict: sorted_dataset_list: SortedDatasetList header: Header sorted_dataset_list, header = sorted_data_dict[seriesUID] setattr(header, 'keep_uid', True) si = None if not skip_pixels: # Extract pixel data try: si = self._construct_pixel_array( sorted_dataset_list, header, header.shape, opts=opts ) except TypeError: pass except Exception: if skip_broken_series: logger.debug( '{}: skip_broken_series continue {}'.format( _name, seriesUID )) continue else: logger.debug('{}: skip_broken_series raise'.format(_name)) raise if si is not None: pixel_dict[seriesUID] = si return pixel_dict def _construct_pixel_array(self, image_dict: SortedDatasetList, hdr: Header, shape: tuple, opts: dict = None ) -> np.ndarray: def _copy_pixels(_si, _hdr, _image_dict): _name: str = '{}.{}'.format(__name__, _copy_pixels.__name__) faulty = 0 for _slice, _sloc in enumerate(sorted(_image_dict)): _done = {} for im in _image_dict[_sloc]: tag = im.tags idx = im.tag_index if idx in _done and not accept_duplicate_tag: raise CannotSort("Overwriting data at index {}, tag {}\n".format(idx, tag) + "Maybe try accept_duplicate_tag=True?") _done[idx] = True idx += (_slice,) # Simplify index when image is 3D, remove tag index logger.debug("{}: si.ndim {}, idx {}".format(_name, _si.ndim, idx)) if _si.ndim == 3: idx = idx[len(tag):] try: logger.debug("{}: get idx {} shape {}".format(_name, idx, _si[idx].shape)) if _si.ndim > 2: _si[idx] = im.get_pixels_with_shape(_si[idx].shape) else: _si[...] = im.get_pixels_with_shape(_si.shape) except Exception as e: logger.warning("{}: Cannot read pixel data: {}".format(_name, e)) raise del im faulty += 1 def _copy_pixels_from_frames(_si, _hdr, _image_dict): _name: str = '{}.{}'.format(__name__, _copy_pixels_from_frames.__name__) assert len(_image_dict) == _si.shape[0], "Do not know how to unpack frames and slices" if _si.ndim > 3: for i, im in enumerate(_image_dict): try: logger.debug("{}: get shape {}".format(_name, _si.shape)) _si[i] = im.get_pixels_with_shape(_si.shape[1:]) except Exception as e: logger.warning("{}: Cannot read pixel data: {}".format(_name, e)) raise del im else: try: im = image_dict[next(iter(image_dict))][0] except TypeError: im = image_dict[0] try: logger.debug("{}: get shape {}".format(_name, _si.shape)) _si[...] = im.get_pixels_with_shape(_si.shape) except Exception as e: logger.warning("{}: Cannot read pixel data: {}".format(_name, e)) raise del im _name: str = '{}.{}'.format(__name__, self._construct_pixel_array.__name__) opts = {} if opts is None else opts accept_duplicate_tag = 'accept_duplicate_tag' in opts and opts['accept_duplicate_tag'] # Look-up first image to determine pixel type try: im: Instance = image_dict[next(iter(image_dict))][0] except TypeError: im: Instance = image_dict[0] if 'BitsAllocated' not in im: raise EmptyImageError("No pixel data in instance.") hdr.photometricInterpretation = 'MONOCHROME2' if 'PhotometricInterpretation' in im: hdr.photometricInterpretation = im.PhotometricInterpretation matrix_dtype = np.uint16 if 'PixelRepresentation' in im: if im.PixelRepresentation == 1: matrix_dtype = np.int16 if 'RescaleSlope' in im and 'RescaleIntercept' in im and \ (abs(im.RescaleSlope - 1) > 1e-4 or abs(im.RescaleIntercept) > 1e-4): matrix_dtype = float elif im.BitsAllocated == 8: if hdr.color: matrix_dtype = np.dtype([('R', 'u1'), ('G', 'u1'), ('B', 'u1')]) else: matrix_dtype = np.uint8 elif im.BitsAllocated == 1: matrix_dtype = np.bool_ logger.debug('{}: matrix_dtype {}'.format(_name, matrix_dtype)) # Load DICOM image data logger.debug('{}: shape {}'.format(_name, shape)) si = np.zeros(shape, matrix_dtype) if 'NumberOfFrames' in im and im.NumberOfFrames > 1: _copy_pixels_from_frames(si, hdr, image_dict) else: _copy_pixels(si, hdr, image_dict) # Simplify shape self._reduce_shape(si, hdr.axes) logger.debug('{}: si {}'.format(_name, si.shape)) return si def _extract_non_image_dicom_attributes(self, series: DatasetList, hdr: Header, opts: dict = None ) -> None: """Extract DICOM attributes Args: self: DICOMPlugin instance series: DatasetList hdr: existing header (Header) opts: Returns: hdr: header - seriesNumber - seriesDescription - imageType - modality, laterality, protocolName, bodyPartExamined - seriesDate, seriesTime """ dataset = series[0] dataset.copy_attributes_to_header(hdr) if 'Rows' not in dataset: return # Construct axes. Required to determine matrix shape if len(series) > 1: descriptions = [] for im in series: ss = im.SegmentSequence # ars = ss[0].AnatomicRegionSequence # code_value = ars[0].CodeValue # spccs = ss[0].SegmentedPropertyCategoryCodeSequence sptcs = ss[0].SegmentedPropertyTypeCodeSequence code_value = sptcs[0].CodeValue code_meaning = sptcs[0].CodeMeaning descriptions.append('{}:{}'.format(code_value, code_meaning)) hdr.axes = namedtuple('Axes', [ 'text', 'slice', 'row', 'column' ])(VariableAxis('text', descriptions), UniformLengthAxis('slice', 0, dataset.NumberOfFrames, 1), UniformLengthAxis('row', 1, dataset.Rows, 1), # dataset.PixelSpacing[0] UniformLengthAxis('column', 2, dataset.Columns, 1)) # dataset.PixelSpacing[1] hdr.input_order = 'text' elif 'NumberOfFrames' in dataset: hdr.axes = namedtuple('Axes', [ 'slice', 'row', 'column' ])(UniformLengthAxis('slice', 0, dataset.NumberOfFrames, 1), UniformLengthAxis('row', 1, dataset.Rows, 1), # dataset.PixelSpacing[0] UniformLengthAxis('column', 2, dataset.Columns, 1)) # dataset.PixelSpacing[1] else: hdr.axes = namedtuple('Axes', [ 'row', 'column' ])(UniformLengthAxis('row', 0, dataset.Rows, 1), # dataset.PixelSpacing[0] UniformLengthAxis('column', 1, dataset.Columns, 1)) # dataset.PixelSpacing[1] def _sort_dataset_geometry(self, dataset_list: DatasetList, message: str, opts: dict = None) -> SortedDatasetList: # _name: str = '{}.{}'.format(__name__, self._sort_dataset_geometry.__name__) def _get_spacing(dataset_list: DatasetList) -> np.ndarray: _name: str = '{}.{}'.format(__name__, _get_spacing.__name__) # Spacing dr = dc = 1.0 try: pixel_spacing = dataset_list.getDicomAttribute(tag_for_keyword("PixelSpacing")) if pixel_spacing is not None: # Notice that DICOM row spacing comes first, column spacing second! dr = float(pixel_spacing[0]) dc = float(pixel_spacing[1]) except (AttributeError, KeyError) as e: logger.debug('{}: {}'.format(_name, e)) pass try: slice_spacing = float(dataset_list.getDicomAttribute(tag_for_keyword("SpacingBetweenSlices"))) except KeyError: try: slice_spacing = float(dataset_list.getDicomAttribute(tag_for_keyword("SliceThickness"))) except KeyError: slice_spacing = 1.0 return np.array([slice_spacing, dr, dc]) def _verify_no_gantry_tilt(dataset_list: DatasetList) -> None: try: gantry = dataset_list.getDicomAttributeValues(tag_for_keyword("GantryDetectorTilt")) if gantry is None: return gantry = np.unique(gantry) if len(gantry) > 1: raise CannotSort('{}: More than one Gantry/Detector Tilt'.format(message)) elif len(gantry) == 1: if gantry[0] != 0.0: raise CannotSort('{}: Gantry/Detector Tilt is not zero'.format(message)) except Exception: raise def _get_orientation(dataset_list: DatasetList) -> list[np.ndarray]: # iops = self.getDicomAttribute(dictionary, tag_for_keyword("ImageOrientationPatient")) orients = [] for s in range(len(dataset_list)): # orient = dataset_list.get_image_orientation_patient() orients.append(dataset_list.get_image_orientation_patient()) if self.dir_cosine_tolerance == 0.0: if len(orients) != 1: found = None for it in orients: if found is None: found = it elif (it != found).all(): raise CannotSort('{}: More than one IOP. Try changing dir_cosine_tolerance'.format(message)) if found is None: raise CannotSort('{}: No IOP.'.format(message)) return orients[0] def _verify_single_frame_of_reference(dataset_list: DatasetList): frames = dataset_list.getDicomAttributeValues(tag_for_keyword("FrameOfReferenceUID")) frames = sorted(set(frames)) if len(frames) > 1: logger.warning('{}: Multiple values of FrameOfReferenceUID'.format(message)) def _calculate_distances(dataset_list: DatasetList, orient: np.ndarray, spacing: np.ndarray, opts: dict = None)\ -> list[np.ndarray, np.ndarray]: # _name: str = '{}.{}'.format(__name__, _calculate_distances.__name__) sort_on_slice_location = 'sort_on_slice_location' in opts and opts['sort_on_slice_location'] # Calculate slice normal from IOP, will be the same for all slices colr = np.array(orient[:3]).reshape(3, 1) colc = np.array(orient[3:]).reshape(3, 1) colr = colr / np.linalg.norm(colr) colc = colc / np.linalg.norm(colc) normal = np.cross(colc, colr, axis=0).reshape(3) # For each slice, calculate distance along the slice normal using IPP distances = [] ipps = [] for _slice in range(len(dataset_list)): ipp = self.getOriginForSlice(dataset_list, _slice) if ipp is None: ipp = np.array([0.0, 0.0, 0.0]) if self.dir_cosine_tolerance != 0.0: orient2 = orient[_slice] colr2 = np.array(orient2[:3]).reshape(3, 1) colc2 = np.array(orient2[3:]).reshape(3, 1) colr2 = colr2 / np.linalg.norm(colr2) colc2 = colc2 / np.linalg.norm(colc2) normal2 = np.cross(colr2, colc2, axis=0) cd = sum(normal[:] * normal2[:])[0] if np.fabs(1 - cd) > self.dir_cosine_tolerance: raise CannotSort('{}: Problem with dir_cosine_tolerance'.format(message)) dist = np.dot(normal, ipp) distances.append(dist) ipps.append(ipp) # Determine sorting of the slices based on distance distances = np.array(distances) distance_idx = np.argsort(distances) unique_distances = np.unique(distances) # Construct transformationMatrix slices = len(unique_distances) T0 = ipps[distance_idx[0]] Tn = ipps[distance_idx[-1]] k = ((Tn - T0) / (slices - 1)) if slices > 1 else np.array([0, 0, 1]) transform = np.eye(4) transform[:3, :4] = np.hstack([ k.reshape(3, 1), colc.reshape(3, 1) * spacing[1], colr.reshape(3, 1) * spacing[2], T0.reshape(3, 1)]) if sort_on_slice_location: # If we do not trust sorting on ipp, repeat with slice locations distances = [] for _slice in range(len(dataset_list)): try: dist = float(self.getDicomAttribute(dataset_list, tag_for_keyword("SliceLocation"), _slice)) except TypeError: raise CannotSort('{}: Missing SliceLocation'.format(message)) distances.append(dist) distances = np.array(distances) distance_idx = np.argsort(distances) unique_distances = np.unique(distances) if len(unique_distances) != slices: raise CannotSort('{}: Problem with sorting, {} unique distances do not match {} slices'.format( message, len(unique_distances), slices )) # Sort imagePositions imagePositions = {} for i in range(slices): pos = np.where(distances == unique_distances[i])[0][0] imagePositions[i] = ipps[pos] return distances, distance_idx, transform, imagePositions def _verify_spacing(distances: np.ndarray): # Verify spacing spacings = [] spacing_is_good = True has_warned = False d = np.unique(distances) prev = d[0] if len(d) > 1: current = d[1] slice_spacing = current - prev for it in range(1, len(d)): current = d[it] spacings.append(abs(current - prev)) if abs(current - prev) - slice_spacing > self.slice_tolerance: if not has_warned: logger.warning('{}: Slice spacing differs too much, {} vs {}. Decrease slice_tolerance.'.format( message, abs(current - prev), slice_spacing )) has_warned = True spacing_is_good = False prev = current if not spacing_is_good: raise CannotSort( '{}: Slice spacing varies:\n Distances: {}\n Spacing: {}'.format( message, distances, spacings )) spacing = _get_spacing(dataset_list) _verify_no_gantry_tilt(dataset_list) orient = _get_orientation(dataset_list) _verify_single_frame_of_reference(dataset_list) distances, distance_idx, transform, ipps = _calculate_distances(dataset_list, orient, spacing, opts) _verify_spacing(distances) # Sort dataset on distances sorted_dataset: SortedDatasetList = SortedDatasetList() sorted_dataset.spacing = spacing sorted_dataset.transformationMatrix = transform sorted_dataset.imagePositions = ipps for idx in distance_idx: distance = distances[idx] # Catalog images with distance (sloc) as key sorted_dataset[distance].append(dataset_list[idx]) return sorted_dataset def _get_axes(self, sorted_dataset: SortedDatasetList) -> tuple[Axis]: frames = None rows = columns = 0 for _slice, sloc in enumerate(sorted_dataset): im: Instance for im in sorted_dataset[sloc]: rows = max(rows, im.Rows) columns = max(columns, im.Columns) if 'NumberOfFrames' in im: frames = im.NumberOfFrames if frames is not None and frames > 1: slices = frames else: slices = len(sorted_dataset) ipp = sorted_dataset.imagePositions[0] slice_axis = UniformLengthAxis('slice', ipp[0], slices, sorted_dataset.spacing[0]) row_axis = UniformLengthAxis('row', ipp[1], rows, sorted_dataset.spacing[1]) column_axis = UniformLengthAxis('column', ipp[2], columns, sorted_dataset.spacing[2]) return slice_axis, row_axis, column_axis def _join_axes(self, sorted_dataset: SortedDatasetList, axes: tuple[Axis]) -> namedtuple: """Join given sorted dataset with given axes. """ geometry_axes = sorted_dataset.axes if len(geometry_axes[0]) <= 1 and not len(axes): geometry_axes = geometry_axes[1:] tag_axes = [] axis_names = [] if len(axes): for axis in axes: if len(axis) > 1: tag_axes.append(axis) axis_names.append(axis.name) for axis in geometry_axes: tag_axes.append(axis) axis_names.append(axis.name) Axes = namedtuple('Axes', axis_names) axes = Axes(*tag_axes) return axes def _get_sop_ins_uids(self, sorted_dataset: SortedDatasetList) -> dict: SOPInstanceUIDs = {} for _slice, sloc in enumerate(sorted_dataset): im: Instance for im in sorted_dataset[sloc]: _idx = im.tag_index SOPInstanceUIDs[_idx + (_slice,)] = im.SOPInstanceUID return SOPInstanceUIDs
[docs] def getOriginForSlice(self, dictionary, slice): """Get origin of given slice. Args: self: DICOMPlugin instance dictionary: image dictionary slice: slice number (int) Returns: z,y,x: coordinate for origin of given slice (np.array) """ try: origin = self.getDicomAttribute(dictionary, tag_for_keyword("ImagePositionPatient"), slice) if origin is not None: x = float(origin[0]) y = float(origin[1]) z = float(origin[2]) return np.array([z, y, x]) except TypeError: pass if issubclass(type(slice), Dataset): d = slice s = 0 else: d = dictionary s = slice while not issubclass(type(d), Dataset): if issubclass(type(d), dict): d = d[next(iter(d))] elif issubclass(type(d), (tuple, list)): for d in d: if issubclass(type(d), Dataset): break try: origin = self.getDicomAttribute(d, tag_for_keyword("ImagePositionPatient"), s) except TypeError: origin = self.getDicomAttribute(slice, tag_for_keyword("ImagePositionPatient"), 0) if origin is not None: x = float(origin[0]) y = float(origin[1]) z = float(origin[2]) return np.array([z, y, x]) return None
[docs] def setDicomAttribute(self, dictionary, tag, value): """Set a given DICOM attribute to the provided value. Ignore if no real dicom header exists. Args: self: DICOMPlugin instance dictionary: image dictionary tag: DICOM tag of addressed attribute. value: Set attribute to this value. """ if dictionary is not None: for _slice in dictionary: for tg, im in dictionary[_slice]: if tag not in im: VR = pydicom.datadict.dictionary_VR(tag) im.add_new(tag, VR, value) else: im[tag].value = value
[docs] def getDicomAttribute(self, dictionary, tag, slice=0): """Get DICOM attribute from first image for given slice. Args: self: DICOMPlugin instance dictionary: image dictionary tag: DICOM tag of requested attribute. slice: which slice to access. Default: slice=0 """ assert dictionary is not None, "dicomplugin.getDicomAttribute: dictionary is None" try: _, im = dictionary[slice][next(iter(dictionary[slice]))] except TypeError: try: _, im = dictionary[slice][0] except (KeyError, TypeError): im = dictionary[slice] except KeyError: try: im = dictionary[slice] except KeyError: im = dictionary if tag in im: return im[tag].value else: return None
[docs] def removePrivateTags(self, dictionary): """Remove private DICOM attributes. Ignore if no real dicom header exists. Args: self: DICOMPlugin instance dictionary: image dictionary """ if dictionary is not None: for _slice in dictionary: for tg, im in dictionary[_slice]: im.remove_private_tags()
def _read_image(self, f, opts, hdr): """Read image data from given file handle Args: self: format plugin instance f: file handle or filename (depending on self._need_local_file) opts: Input options (dict) hdr: Header Returns: Tuple of - hdr: Header Return values: - info: Internal data for the plugin None if the given file should not be included (e.g. raw file) - si: numpy array (multi-dimensional) """ pass def _set_tags(self, image_list, hdr, si): """Set header tags. Args: self: format plugin instance image_list: list with (info,img) tuples hdr: Header si: numpy array (multi-dimensional) Returns: hdr: Header """ pass def _correct_acqtimes_for_dynamic_series(self, hdr: Header, si: np.ndarray): # si[t,slice,rows,columns] _name: str = '{}.{}'.format(__name__, self._correct_acqtimes_for_dynamic_series.__name__) # Extract acqtime for each image slices = len(hdr.sliceLocations) timesteps = self._count_timesteps(hdr) logger.info( "{}: Slices: {}, apparent time steps: {}, actual time steps: {}".format( _name, slices, len(hdr.tags), timesteps)) new_shape = (timesteps, slices, si.shape[2], si.shape[3]) newsi = np.zeros(new_shape, dtype=si.dtype) acq = np.zeros([slices, timesteps]) for _slice in self.DicomHeaderDict: t = 0 for tg, im in self.DicomHeaderDict[_slice]: acq[_slice, t] = tg t += 1 # Correct acqtimes by setting acqtime for each slice of a volume to # the smallest time for t in range(acq.shape[1]): min_acq = np.min(acq[:, t]) for _slice in range(acq.shape[0]): acq[_slice, t] = min_acq # Set new acqtime for each image for _slice in self.DicomHeaderDict: t = 0 for tg, im in self.DicomHeaderDict[_slice]: im.AcquisitionTime = "%f" % acq[_slice, t] newsi[t, _slice, :, :] = si[t, _slice, :, :] t += 1 # Update taglist in hdr hdr.tags = {} for _slice in self.DicomHeaderDict: hdr.tags[_slice] = np.empty((acq.shape[1],)) for t in range(acq.shape[1]): hdr.tags[_slice][t] = acq[0, t] return newsi @staticmethod def _count_timesteps(hdr): slices = len(hdr.sliceLocations) timesteps = np.zeros([slices], dtype=int) for _slice in hdr.DicomHeaderDict: timesteps[_slice] = len(hdr.DicomHeaderDict[_slice]) if timesteps.min() != timesteps.max(): raise ValueError("Number of time steps ranges from %d to %d." % ( timesteps.min(), timesteps.max())) return timesteps.max()
[docs] def write_3d_numpy(self, si: Series, destination, opts): """Write 3D Series image as DICOM files Args: self: DICOMPlugin instance si: Series array (3D or 4D) destination: dict of archive and filenames opts: Output options (dict) """ _name: str = '{}.{}'.format(__name__, self.write_3d_numpy.__name__) logger.debug('{}: destination {}'.format(_name, destination)) archive = destination['archive'] archive.set_member_naming_scheme( fallback='Image_{:05d}.dcm', level=max(0, si.ndim - 2), default_extension='.dcm', extensions=self.extensions ) self.keep_uid = False if 'keep_uid' not in opts else opts['keep_uid'] self.instanceNumber = 0 logger.debug('{}: orig shape {}, slices {} len {}'.format( _name, si.shape, si.slices, si.ndim)) assert si.ndim in [0, 2, 3], \ "write_3d_series: input dimension %d is not 2D/3D." % si.ndim if si.ndim > 0: self._calculate_rescale(si) logger.info("{}: Smallest/largest pixel value in series: {}/{}".format( _name, self.smallestPixelValueInSeries, self.largestPixelValueInSeries)) if 'window' in opts and opts['window'] == 'original': raise ValueError('No longer supported: opts["window"] is set') self.center = si.windowCenter self.width = si.windowWidth self.today = date.today().strftime("%Y%m%d") self.now = datetime.now().strftime("%H%M%S.%f") # Set series instance UID when writing self.serInsUid = si.header.seriesInstanceUID if self.keep_uid else si.header.new_uid() logger.debug("{}: {}".format(_name, self.serInsUid)) for key, value in opts.items(): # Copy opts to self.input_options self.input_options[key] = value if pydicom.uid.UID(si.SOPClassUID).keyword == 'EnhancedMRImageStorage' or \ pydicom.uid.UID(si.SOPClassUID).keyword == 'EnhancedCTImageStorage': # Write Enhanced CT/MR self.write_enhanced(si, destination) else: # Either legacy CT/MR, or another modality if si.ndim < 3: logger.debug('{}: write 2D ({})'.format(_name, si.ndim)) if self.keep_uid: sop_ins_uid = si.SOPInstanceUIDs[(0, 0)] else: sop_ins_uid = si.header.new_uid() self.write_slice('none', None, si, destination, 0, sop_ins_uid=sop_ins_uid) else: logger.debug('{}: write 3D slices {}'.format(_name, si.slices)) for _slice in range(si.slices): if self.keep_uid: sop_ins_uid = si.SOPInstanceUIDs[(0, _slice)] else: sop_ins_uid = si.header.new_uid() try: self.write_slice('none', (_slice,), si[_slice], destination, _slice, sop_ins_uid=sop_ins_uid) except Exception: traceback.print_exc(file=sys.stdout) raise
[docs] def write_4d_numpy(self, si: Series, destination, opts): """Write 4D Series image as DICOM files si.series_number is inserted into each dicom object si.series_description is inserted into each dicom object si.image_type: Dicom image type attribute opts['output_sort']: Which tag will sort the output images (slice or tag) opts['output_dir']: Store all images in a single or multiple directories Args: self: DICOMPlugin instance si: Series array si[tag,slice,rows,columns] destination: dict of archive and filenames opts: Output options (dict) """ _name: str = '{}.{}'.format(__name__, self.write_4d_numpy.__name__) logger.debug('{}: destination {}'.format(_name, destination)) archive = destination['archive'] self.keep_uid = False if 'keep_uid' not in opts else opts['keep_uid'] # Defaults self.output_sort = SORT_ON_SLICE if 'output_sort' in opts: self.output_sort = opts['output_sort'] self.output_dir = 'single' if 'output_dir' in opts: self.output_dir = opts['output_dir'] self.instanceNumber = 0 logger.debug('{}: orig shape {}, len {}'.format(_name, si.shape, si.ndim)) assert si.ndim >= 4, "write_4d_series: input dimension %d is less than 4D." % si.ndim tags = si.tags[0].ndim steps = si.shape[:tags] self._calculate_rescale(si) logger.info("{}: Smallest/largest pixel value in series: {}/{}".format( _name, self.smallestPixelValueInSeries, self.largestPixelValueInSeries)) self.today = date.today().strftime("%Y%m%d") self.now = datetime.now().strftime("%H%M%S.%f") # Not used # self.seriesTime = obj.getDicomAttribute(tag_for_keyword("AcquisitionTime")) # Set series instance UID when writing if not self.keep_uid: si.header.seriesInstanceUID = si.header.new_uid() self.serInsUid = si.header.seriesInstanceUID self.input_options = opts if pydicom.uid.UID(si.SOPClassUID).keyword == 'EnhancedMRImageStorage' or \ pydicom.uid.UID(si.SOPClassUID).keyword == 'EnhancedCTImageStorage': # Write Enhanced CT/MR self.write_enhanced(si, destination) return # Either legacy CT/MR, or another modality if self.output_sort == SORT_ON_SLICE: if self.output_dir == 'single': # Filenames: Image_00000.dcm, sort slice fastest archive.set_member_naming_scheme( fallback='Image_{:05d}.dcm', level=1, default_extension='.dcm', extensions=self.extensions ) else: # self.output_dir == 'multi' # Filenames: Tag0/../TagN/Image_00000.dcm, sort slice fastest dirn = [] for i, order in enumerate(si.input_order.split(',')): digits = len("{}".format(steps[i])) dirn.append( "{0}{{{1}:0{2}}}".format( order, i, digits) ) archive.set_member_naming_scheme( fallback=os.path.join( *dirn, 'Image_{' + '{}'.format(len(dirn)) + ':05d}.dcm'), level=max(0, si.ndim - 2), default_extension='.dcm', extensions=self.extensions ) ifile = 0 for tag in np.ndindex(steps): for _slice in range(si.slices): if si.tags[_slice][tag] is None: continue _tag = tag + (_slice,) if self.keep_uid: sop_ins_uid = si.SOPInstanceUIDs[tag + (_slice,)] else: sop_ins_uid = si.header.new_uid() if self.output_dir == 'multi' and _slice == 0: # Restart file number in each subdirectory ifile = 0 if self.output_dir == 'multi': _file_tag = _tag else: _file_tag = (ifile,) try: _t = si.header.tags[_slice][tag] if _t is None: continue self.write_slice(si.input_order, _file_tag, si[_tag], destination, ifile, tag_value=si.header.tags[_slice][tag], sop_ins_uid=sop_ins_uid) except Exception: traceback.print_exc(file=sys.stdout) raise ifile += 1 else: # self.output_sort == SORT_ON_TAG: if self.output_dir == 'single': # Filenames: Image_00000.dcm, sort tags fastest archive.set_member_naming_scheme( fallback='Image_{:05d}.dcm', level=1, default_extension='.dcm', extensions=self.extensions ) else: # self.output_dir == 'multi' # Filenames: slice/tag0/../tagN/Image_00000.dcm, sort tags fastest digits = len("{}".format(si.slices)) dirn = ["slice{{0:0{0}}}".format(digits)] for i, order in enumerate(si.input_order.split(',')[:-1]): digits = len("{}".format(steps[i])) dirn.append( "{0}{{{1}:0{2}}}".format( order, i + 1, digits ) ) order = si.input_order.split(',')[-1] digits = len("{}".format(steps[-1])) archive.set_member_naming_scheme( fallback=os.path.join( *dirn, order + '{' + '{}'.format(len(dirn)) + ':0{}'.format(digits) + '}.dcm'), level=max(0, si.ndim - 2), default_extension='.dcm', extensions=self.extensions ) ifile = 0 for _slice in range(si.slices): for tag in np.ndindex(steps): _tag = (_slice,) + tag if si.tags[_slice][tag] is None: continue if self.keep_uid: sop_ins_uid = si.SOPInstanceUIDs[tag + (_slice,)] else: sop_ins_uid = si.header.new_uid() if self.output_dir == 'multi' and tag == 0: # Restart file number in each subdirectory ifile = 0 if self.output_dir == 'multi': _file_tag = _tag else: _file_tag = (ifile,) try: _t = si.header.tags[_slice][tag] if _t is None: continue self.write_slice(si.input_order, _file_tag, si[tag + (_slice,)], destination, ifile, tag_value=si.header.tags[_slice][tag], sop_ins_uid=sop_ins_uid) except Exception: traceback.print_exc(file=sys.stdout) raise ifile += 1
[docs] def write_enhanced(self, si, archive, filename_template, opts): """Write enhanced CT/MR object to DICOM file Args: self: DICOMPlugin instance si: Series instance, including these attributes: archive: archive object filename_template: file name template, possible without '.dcm' extension opts: Output options (dict) Raises: """ _name: str = '{}.{}'.format(__name__, self.write_enhanced.__name__) filename = 'dummy' logger.debug("{}: {} {}".format(_name, filename, self.serInsUid)) try: tg, member_name, im = si.DicomHeaderDict[0][0] except (KeyError, IndexError): raise IndexError("Cannot address dicom_template.DicomHeaderDict[0][0]") except ValueError: raise NoDICOMAttributes("Cannot write DICOM object when no DICOM attributes exist.") logger.debug("{}: member_name {}".format(_name, member_name)) self.keep_uid = False if 'keep_uid' not in opts else opts['keep_uid'] if not self.keep_uid: si.header.seriesInstanceUID = si.header.new_uid() self.serInsUid = si.header.seriesInstanceUID ds = self.construct_enhanced_dicom(filename_template, im, si) # Add header information try: ds.SliceLocation = si.sliceLocations[0] except (AttributeError, ValueError): # Dont know the SliceLocation, attempt to calculate from image geometry try: ds.SliceLocation = im.calculate_slice_location() except ValueError: # Dont know the SliceLocation, so will set this to be the slice index ds.SliceLocation = slice try: dz, dy, dx = si.spacing except ValueError: dz, dy, dx = 1, 1, 1 ds.PixelSpacing = [str(dy), str(dx)] ds.SliceThickness = str(dz) try: ipp = si.imagePositions if len(ipp) > 0: ipp = ipp[0] else: ipp = np.array([0, 0, 0]) except ValueError: ipp = np.array([0, 0, 0]) if ipp.shape == (3, 1): ipp.shape = (3,) z, y, x = ipp[:] ds.ImagePositionPatient = [str(x), str(y), str(z)] # Reverse orientation vectors from zyx to xyz try: ds.ImageOrientationPatient = [ si.orientation[2], si.orientation[1], si.orientation[0], si.orientation[5], si.orientation[4], si.orientation[3]] except ValueError: ds.ImageOrientationPatient = [0, 0, 1, 0, 1, 0] try: ds.SeriesNumber = si.seriesNumber except ValueError: ds.SeriesNumber = 1 try: ds.SeriesDescription = si.seriesDescription except ValueError: ds.SeriesDescription = '' try: ds.ImageType = "\\".join(si.imageType) except ValueError: ds.ImageType = 'DERIVED\\SECONDARY' try: ds.FrameOfReferenceUID = si.frameOfReferenceUID except ValueError: pass ds.SmallestPixelValueInSeries = np.uint16(self.smallestPixelValueInSeries) ds.LargestPixelValueInSeries = np.uint16(self.largestPixelValueInSeries) ds[0x0028, 0x0108].VR = 'US' ds[0x0028, 0x0109].VR = 'US' ds.WindowCenter = self.center ds.WindowWidth = self.width if si.dtype in self.smallint or np.issubdtype(si.dtype, np.bool_): ds.SmallestImagePixelValue = np.uint16(si.min().astype('uint16')) ds.LargestImagePixelValue = np.uint16(si.max().astype('uint16')) if 'RescaleSlope' in ds: del ds.RescaleSlope if 'RescaleIntercept' in ds: del ds.RescaleIntercept else: ds.SmallestImagePixelValue = np.uint16((si.min().item() - self.b) / self.a) ds.LargestImagePixelValue = np.uint16((si.max().item() - self.b) / self.a) try: ds.RescaleSlope = "%f" % self.a except OverflowError: ds.RescaleSlope = "%d" % int(self.a) ds.RescaleIntercept = "%f" % self.b ds[0x0028, 0x0106].VR = 'US' ds[0x0028, 0x0107].VR = 'US' # General Image Module Attributes ds.InstanceNumber = 1 ds.ContentDate = self.today ds.ContentTime = self.now # ds.AcquisitionTime = self.add_time(self.seriesTime, timeline[tag]) ds.Rows = si.rows ds.Columns = si.columns self._insert_pixel_data(ds, si) # logger.debug("write_enhanced: filename {}".format(filename)) # Set tag # si will always have only the present tag ds.set_dicom_tag(self.input_options, si.input_order, si.tags[0]) if len(os.path.splitext(filename)[1]) > 0: fn = filename else: fn = filename + '.dcm' logger.debug("{}: filename {}".format(_name, fn)) # if archive.transport.name == 'dicom': # # Store dicom set ds directly # archive.transport.store(ds) # else: # # Store dicom set ds as file # with archive.open(fn, 'wb') as f: # ds.save_as(f) raise ValueError("write_enhanced: to be implemented")
[docs] def write_slice(self, input_order, tag, si, destination, ifile, tag_value=None, sop_ins_uid=None): """Write single slice to DICOM file Args: self: DICOMPlugin instance input_order: input order tag: tag index si: Series instance, including these attributes: - slices - sliceLocations - dicomTemplate - dicomToDo - seriesNumber - seriesDescription - imageType - frame - spacing - orientation - imagePositions - photometricInterpretation destination: destination object ifile: instance number in series tag_value: set tag value sop_ins_uid: set SOP Instance UID """ _name: str = '{}.{}'.format(__name__, self.write_slice.__name__) archive: AbstractArchive = destination['archive'] query = None if destination['files'] and len(destination['files']): query = destination['files'][0] filename = archive.construct_filename( tag=tag, query=query ) logger.debug("{}: {} {}".format(_name, filename, self.serInsUid)) try: ds = self.construct_dicom(filename, si.dicomTemplate, si, sop_ins_uid=sop_ins_uid) except ValueError: ds = self.construct_basic_dicom(si, sop_ins_uid=sop_ins_uid) ds.SeriesInstanceUID = si.header.seriesInstanceUID # Add header information try: ds.SliceLocation = pydicom.valuerep.format_number_as_ds(float(si.sliceLocations[0])) except (AttributeError, ValueError): # Do not know the SliceLocation, so will set this to be the slice index if tag is None: ds.SliceLocation = 0 else: ds.SliceLocation = tag[-1] try: dz, dy, dx = si.spacing except ValueError: dz, dy, dx = 1, 1, 1 ds.PixelSpacing = [pydicom.valuerep.format_number_as_ds(float(dy)), pydicom.valuerep.format_number_as_ds(float(dx))] ds.SliceThickness = pydicom.valuerep.format_number_as_ds(float(dz)) try: ipp = si.imagePositions if len(ipp) > 0: ipp = ipp[0] else: ipp = np.array([0, 0, 0]) except ValueError: ipp = np.array([0, 0, 0]) if ipp.shape == (3, 1): ipp.shape = (3,) z, y, x = ipp[:] ds.ImagePositionPatient = [pydicom.valuerep.format_number_as_ds(float(x)), pydicom.valuerep.format_number_as_ds(float(y)), pydicom.valuerep.format_number_as_ds(float(z))] # Reverse orientation vectors from zyx to xyz try: ds.ImageOrientationPatient = [ pydicom.valuerep.format_number_as_ds(float(si.orientation[2])), pydicom.valuerep.format_number_as_ds(float(si.orientation[1])), pydicom.valuerep.format_number_as_ds(float(si.orientation[0])), pydicom.valuerep.format_number_as_ds(float(si.orientation[5])), pydicom.valuerep.format_number_as_ds(float(si.orientation[4])), pydicom.valuerep.format_number_as_ds(float(si.orientation[3]))] except ValueError: ds.ImageOrientationPatient = [0, 0, 1, 0, 0, 1] try: ds.SeriesNumber = si.seriesNumber except ValueError: ds.SeriesNumber = 1 try: ds.SeriesDescription = si.seriesDescription except ValueError: ds.SeriesDescription = '' try: ds.ImageType = "\\".join(si.imageType) except ValueError: ds.ImageType = 'DERIVED\\SECONDARY' try: ds.FrameOfReferenceUID = si.frameOfReferenceUID except ValueError: pass # Add DICOM To Do items to present slice for _attr, _value, _slice, _tag in si.header.dicomToDo: _this_slice = True if _slice is None else _slice == tag[-1] _this_tag = True if _tag is None else _tag == tag if _this_slice and _this_tag: # Set Dicom Attribute if _attr not in ds: VR = pydicom.datadict.dictionary_VR(_attr) ds.add_new(_attr, VR, _value) elif issubclass(type(_value), str): if ds[_attr].VR == 'DA': ds[_attr].value = pydicom.valuerep.DA(_value) elif ds[_attr].VR == 'DT': ds[_attr].value = pydicom.valuerep.DT(_value) else: ds[_attr].value = _value else: ds[_attr].value = _value try: self._set_pixel_rescale(ds, si) except ValueError: pass # General Image Module Attributes ds.InstanceNumber = ifile + 1 ds.ContentDate = self.today ds.ContentTime = self.now # ds.AcquisitionTime = self.add_time(self.seriesTime, timeline[tag]) if si.ndim > 0: ds.Rows = si.rows ds.Columns = si.columns self._insert_pixel_data(ds, si) # Set tag # si will always have only the present tag self.set_dicom_tag(ds, self.input_options, input_order, tag_value) # TODO # This one? ds.set_dicom_tag(self.input_options, input_order, tag_value) logger.debug("{}: filename {}".format(_name, filename)) if archive.transport.name == 'dicom': # Store dicom set ds directly archive.transport.store(ds) else: # Store dicom set ds as file with archive.open(filename, 'wb') as f: ds.save_as(f)
def construct_basic_dicom(self, template: Series = None, filename: str = 'NA', sop_ins_uid: str = None ) -> FileDataset: if sop_ins_uid is None: raise ValueError('SOPInstanceUID is undefined.') # Populate required values for file meta information file_meta = FileMetaDataset() sop_class_uid = getattr(template, 'SOPClassUID', None) if sop_class_uid is None: sop_class_uid = pydicom.uid.UID('1.2.840.10008.5.1.4.1.1.7') file_meta.MediaStorageSOPClassUID = sop_class_uid if sop_ins_uid is not None: file_meta.MediaStorageSOPInstanceUID = sop_ins_uid else: file_meta.MediaStorageSOPInstanceUID = template.header.new_uid() file_meta.ImplementationClassUID = pydicom.uid.UID("%s.1" % self.root) file_meta.TransferSyntaxUID = pydicom.uid.ImplicitVRLittleEndian # Create the FileDataset instance # (initially no data elements, but file_meta supplied) ds = FileDataset( filename, {}, file_meta=file_meta, preamble=b"\0" * 128) ds.SOPClassUID = sop_class_uid ds.SOPInstanceUID = sop_ins_uid ds.PatientName = 'NA' ds.PatientID = 'NA' ds.PatientBirthDate = '00000000' ds.PatientSex = 'O' ds.StudyDate = self.today ds.StudyTime = '000000' try: ds.StudyInstanceUID = template.header.studyInstanceUID ds.SeriesInstanceUID = template.header.seriesInstanceUID except Exception as e: print(e) ds.StudyID = '0' ds.ReferringPhysicianName = 'NA' ds.AccessionNumber = 'NA' ds.Modality = 'SC' return ds def construct_dicom(self, filename: str, template: Series, si: Series, sop_ins_uid=None) -> FileDataset: self.instanceNumber += 1 if sop_ins_uid is None: sop_ins_uid = si.header.new_uid() # Populate required values for file meta information file_meta = FileMetaDataset() file_meta.MediaStorageSOPClassUID = si.SOPClassUID file_meta.MediaStorageSOPInstanceUID = sop_ins_uid file_meta.ImplementationClassUID = pydicom.uid.UID("%s.1" % self.root) file_meta.TransferSyntaxUID = pydicom.uid.ImplicitVRLittleEndian # Create the FileDataset instance # (initially no data elements, but file_meta supplied) ds = FileDataset( filename, {}, file_meta=file_meta, preamble=b"\0" * 128) # Add the data elements # -- not trying to set all required here. Check DICOM standard # copy_general_dicom_attributes(template, ds) for element in template.iterall(): if element.tag == 0x7fe00010: continue # Do not copy pixel data, will be added later ds.add(element) ds.StudyInstanceUID = si.header.studyInstanceUID ds.StudyID = si.header.studyID ds.SeriesInstanceUID = self.serInsUid ds.SOPClassUID = si.SOPClassUID ds.SOPInstanceUID = sop_ins_uid ds.AccessionNumber = si.header.accessionNumber ds.PatientName = si.header.patientName ds.PatientID = si.header.patientID ds.PatientBirthDate = si.header.patientBirthDate return ds def set_dicom_tag(self, ds: Dataset, input_options: dict, input_order: str, values) -> None: if input_order is None or values is None: return try: _ = len(values) except TypeError: values = [values] for order, value in zip(input_order.split(sep=','), values): if order == INPUT_ORDER_NONE: pass elif order == INPUT_ORDER_TIME: # AcquisitionTime time_tag = self.choose_tag("time", "AcquisitionTime") if time_tag not in ds: vr = dictionary_VR(time_tag) if vr == 'TM': ds.add_new(time_tag, vr, datetime.fromtimestamp( float(0.0), timezone.utc ).strftime("%H%M%S.%f") ) else: ds.add_new(time_tag, vr, 0.0) if ds.data_element(time_tag).VR == 'TM': time_str = datetime.fromtimestamp(float(value), timezone.utc).strftime("%H%M%S.%f") ds.data_element(time_tag).value = time_str else: ds.data_element(time_tag).value = float(value) elif order == INPUT_ORDER_DTI: set_ds_b_value(ds, value[0]) set_ds_b_vector(ds, value[1]) elif order == INPUT_ORDER_B: set_ds_b_value(ds, value) elif order == INPUT_ORDER_BVECTOR: set_ds_b_vector(ds, value) elif order == INPUT_ORDER_FA: fa_tag = self.choose_tag('fa', 'FlipAngle') if fa_tag not in ds: vr = dictionary_VR(fa_tag) ds.add_new(fa_tag, vr, float(value)) else: ds.data_element(fa_tag).value = float(value) elif order == INPUT_ORDER_TE: te_tag = self.choose_tag('te', 'EchoTime') if te_tag not in ds: vr = dictionary_VR(te_tag) ds.add_new(te_tag, vr, float(value)) else: ds.data_element(te_tag).value = float(value) else: # User-defined tag if order in input_options: _tag = input_options[order] if _tag not in ds: vr = dictionary_VR(_tag) ds.add_new(_tag, vr, float(value)) else: ds.data_element(_tag).value = float(value) else: raise (UnknownTag(f'Unknown input_order {order}.')) def choose_tag(self, tag, default): # Example: _tag = choose_tag('b', 'csa_header') if tag in self.input_options: return self.input_options[tag] else: return default @staticmethod def _copy_dicom_group(groupno, ds_in, ds_out): sub_dataset = ds_in.group_dataset(groupno) for data_element in sub_dataset: if data_element.VR != "SQ": ds_out[data_element.tag] = ds_in[data_element.tag] def _insert_pixel_data(self, ds, arr): """Insert pixel data into dicom object If float array, scale to uint16 """ ds.SamplesPerPixel = 1 ds.PixelRepresentation = 1 if np.issubdtype(arr.dtype, np.signedinteger) else 0 try: ds.PhotometricInterpretation = arr.photometricInterpretation if arr.photometricInterpretation == 'RGB': ds.SamplesPerPixel = 3 ds.PlanarConfiguration = 0 except ValueError: ds.PhotometricInterpretation = 'MONOCHROME2' if np.issubdtype(arr.dtype, np.bool_): # No scaling. Pack bits in 16-bit words ds.PixelData = arr.astype('uint16').tobytes() ds[0x7fe0, 0x0010].VR = 'OW' ds.BitsAllocated = 16 ds.BitsStored = 16 ds.HighBit = 15 elif arr.dtype in self.smallint: # No scaling of pixel values ds.PixelData = arr.tobytes() if arr.itemsize == 1: ds[0x7fe0, 0x0010].VR = 'OB' ds.BitsAllocated = 8 ds.BitsStored = 8 ds.HighBit = 7 elif arr.itemsize == 2: ds[0x7fe0, 0x0010].VR = 'OW' ds.BitsAllocated = 16 ds.BitsStored = 16 ds.HighBit = 15 else: raise TypeError('Cannot store {} itemsize {} without scaling'.format( arr.dtype, arr.itemsize)) elif arr.dtype == np.dtype([('R', 'u1'), ('G', 'u1'), ('B', 'u1')]): # RGB image ds.PixelData = arr.tobytes() ds[0x7fe0, 0x0010].VR = 'OB' ds.BitsAllocated = 8 ds.BitsStored = 8 ds.HighBit = 7 else: # Other high precision data type, like float: # rescale to uint16 rescaled = (np.asarray(arr) - self.b) / self.a ds.PixelData = rescaled.astype('uint16').tobytes() ds[0x7fe0, 0x0010].VR = 'OW' ds.BitsAllocated = 16 ds.BitsStored = 16 ds.HighBit = 15 def _calculate_rescale(self, arr): """Calculate rescale parameters for series. y = ax + b x in 0:65535 correspond to y in ymin:ymax 2^16 = 65536 possible steps in 16 bits dicom Returns: self.a: Rescale slope self.b: Rescale intercept self.center: Window center self.width: Window width self.smallestPixelValueInSeries: arr.min() self.largestPixelValueInSeries: arr.max() self.range_VR: The VR to use for DICOM elements (SS or US) """ _name: str = '{}.{}'.format(__name__, self._calculate_rescale.__name__) self.range_VR = 'SS' if np.issubdtype(arr.dtype, np.signedinteger) else 'US' self.range_VR = 'US' if arr.color else self.range_VR _range = 65536. if self.range_VR == 'US' else 32768. # Window center/width try: ymin = np.min(arr).item() ymax = np.max(arr).item() except AttributeError: ymin = np.min(arr) ymax = np.max(arr) if issubclass(type(ymin), tuple): ymin = 0 ymax = 255 self.center = 127 self.width = 256 else: self.center = (ymax + ymin) / 2 self.width = max(1, ymax - ymin) # y = ax + b, if arr.dtype in self.smallint or \ np.issubdtype(arr.dtype, np.bool_) or \ arr.dtype == np.dtype([('R', 'u1'), ('G', 'u1'), ('B', 'u1')]): # No need to rescale self.a = None self.b = None else: # Other high precision data type, like float # Must rescale data self.b = ymin if math.fabs(ymax - ymin) > 1e-6: self.a = (ymax - ymin) / (_range - 1) else: self.a = 1.0 logger.debug("{}: Rescale slope {}, rescale intercept {}".format( _name, self.a, self.b )) self.smallestPixelValueInSeries = ymin self.largestPixelValueInSeries = ymax def _set_pixel_rescale(self, ds, arr): """Set pixel rescale elements: - RescaleSlope - RescaleIntercept - WindowCenter - WindowWidth - SmallestPixelValueInSeries - LargestPixelValueInSeries Args: self.a: Rescale slope self.b: Rescale intercept self.center: Window center self.width: Window width self.smallestPixelValueInSeries: arr.min() self.largestPixelValueInSeries: arr.max() self.range_VR: The VR to use for DICOM elements (SS or US) ds: DICOM dataset arr: pixel series """ ds.WindowCenter = pydicom.valuerep.format_number_as_ds(float(self.center)) ds.WindowWidth = pydicom.valuerep.format_number_as_ds(float(self.width)) # Remove existing elements for element in ['SmallestImagePixelValue', 'LargestImagePixelValue', 'SmallestPixelValueInSeries', 'LargestPixelValueInSeries', 'RescaleSlope', 'RescaleIntercept']: if element in ds: del ds[element] if self.a is None: # No rescale slope _min = 0 if arr.color else arr.min() _max = 255 if arr.color else arr.max() _series_min = 0 if arr.color else self.smallestPixelValueInSeries _series_max = 255 if arr.color else self.largestPixelValueInSeries if isinstance(_min, np.bool_): _min, _max = 0, 1 _series_min, _series_max = 0, 1 else: try: ds.RescaleSlope = pydicom.valuerep.format_number_as_ds(self.a) except OverflowError: ds.RescaleSlope = "%d" % int(self.a) ds.RescaleIntercept = pydicom.valuerep.format_number_as_ds(float(self.b)) _min = np.array((arr.min() - self.b) / self.a).astype('uint16') _max = np.array((arr.max() - self.b) / self.a).astype('uint16') _series_min = np.array( (self.smallestPixelValueInSeries - self.b) / self.a).astype('uint16') _series_max = np.array( (self.largestPixelValueInSeries - self.b) / self.a).astype('uint16') ds.add_new(tag_for_keyword('SmallestImagePixelValue'), self.range_VR, _min) ds.add_new(tag_for_keyword('LargestImagePixelValue'), self.range_VR, _max) ds.add_new(tag_for_keyword('SmallestPixelValueInSeries'), self.range_VR, _series_min) ds.add_new(tag_for_keyword('LargestPixelValueInSeries'), self.range_VR, _series_max) @staticmethod def _add_time(now, add): """Add time to present time now Args: now: string hhmmss.ms add: float [s] Returns: newtime: string hhmmss.ms """ tnow = datetime.strptime(now, "%H%M%S.%f") s = int(add) ms = (add - s) * 1000. tadd = timedelta(seconds=s, milliseconds=ms) tnew = tnow + tadd return tnew.strftime("%H%M%S.%f")