"""Read/Write DICOM files
"""
# Copyright (c) 2013-2026 Erling Andersen, Haukeland University Hospital, Bergen, Norway
import os
import sys
import logging
import traceback
import mimetypes
import math
from collections import namedtuple, Counter
from functools import cmp_to_key
from datetime import date, datetime, timedelta, timezone
import numpy as np
import pydicom
import pydicom.valuerep
import pydicom.config
import pydicom.errors
import pydicom.uid
from pydicom.datadict import dictionary_VR, tag_for_keyword
from pydicom.dataset import Dataset, FileDataset, FileMetaDataset
from ..formats import (CannotSort, EmptyImageError, NotImageError, UnknownTag,
INPUT_ORDER_NONE, INPUT_ORDER_TIME, INPUT_ORDER_B,
INPUT_ORDER_FA, INPUT_ORDER_TE, INPUT_ORDER_BVECTOR,
INPUT_ORDER_DTI, INPUT_ORDER_TRIGGERTIME,
SORT_ON_SLICE
)
from ..series import Series
from ..axis import Axis, VariableAxis, UniformLengthAxis
from .abstractplugin import AbstractPlugin
from ..archives.abstractarchive import AbstractArchive, Member
from ..header import Header
from ..apps.diffusion import set_ds_b_value, set_ds_b_vector
from .dicomlib.instance import Instance
from .dicomlib.sorting import (combine_data_and_header, compare_tuples, determine_sorting,
scan_tags, verify_consistent_slices)
from .dicomlib.datatypes import (SeriesUID, SourceList, ObjectList, DatasetList, DatasetDict,
SortedDatasetList, SortedData, SortedDataDict, SortedHeaderDict,
PixelDict)
logger = logging.getLogger(__name__)
pydicom.config.settings.reading_validation_mode = pydicom.config.IGNORE
# pydicom.config.settings.writing_validation_mode = pydicom.config.IGNORE
pydicom.config.settings.writing_validation_mode = pydicom.config.WARN
# pydicom.config.settings.writing_validation_mode = pydicom.config.RAISE
mimetypes.add_type('application/dicom', '.ima')
"""
Processes
=========
object_list: ObjectList = self._get_dicom_files(sources)
dataset_dict: DatasetDict
dataset_dict = self._catalog_on_instance_uid(object_list)
self._extract_member(dataset_dict, f)
imaging_dataset_dict: DatasetDict
imaging_dataset_dict = self._select_imaging_datasets(dataset_dict)
non_imaging_dataset_dict: DatasetDict
non_imaging_dataset_dict = self._select_non_imaging_datasets(dataset_dict)
if imaging_dataset_dict:
sorted_data_dict: SortedDataDict
sorted_data_dict = self._sort_datasets(imaging_dataset_dict)
sorted_header_dict: SortedHeaderDict = SortedHeaderDict()
pixel_dict: PixelDict = PixelDict()
pixel_dict = self._construct_pixel_arrays(sorted_data_dict)
si = self._construct_pixel_array(dataset_dict, header, header.shape)
if non_imaging_dataset_dict:
non_image_header_dict: SortedHeaderDict
non_image_header_dict = self._get_non_image_headers(non_imaging_dataset_dict)
self._extract_non_image_dicom_attributes(series_dataset, hdr)
non_image_pixel_dict = self._construct_pixel_arrays(non_imaging_dataset_dict)
si = self._construct_pixel_array(dataset_dict, header, header.shape)
"""
image_uids = [pydicom.uid.MRImageStorage,
pydicom.uid.CTImageStorage,
pydicom.uid.DICOSCTImageStorage,
pydicom.uid.RTImageStorage,
pydicom.uid.UltrasoundImageStorage,
pydicom.uid.UltrasoundMultiFrameImageStorage,
pydicom.uid.ComputedRadiographyImageStorage,
pydicom.uid.XRayAngiographicImageStorage,
pydicom.uid.XRay3DAngiographicImageStorage,
pydicom.uid.XRay3DCraniofacialImageStorage,
pydicom.uid.XRayRadiofluoroscopicImageStorage,
pydicom.uid.SecondaryCaptureImageStorage,
pydicom.uid.PositronEmissionTomographyImageStorage,
pydicom.uid.BreastTomosynthesisImageStorage,
pydicom.uid.NuclearMedicineImageStorage,
pydicom.uid.ParametricMapStorage,
pydicom.uid.EddyCurrentImageStorage,
pydicom.uid.EddyCurrentMultiFrameImageStorage,
pydicom.uid.VLEndoscopicImageStorage,
pydicom.uid.VideoEndoscopicImageStorage,
pydicom.uid.VLMicroscopicImageStorage,
pydicom.uid.VideoMicroscopicImageStorage,
pydicom.uid.VLPhotographicImageStorage,
pydicom.uid.VideoPhotographicImageStorage
]
# sr_uids = [pydicom.uid.BasicTextSRStorage,
# pydicom.uid.EnhancedSRStorage,
# pydicom.uid.ComprehensiveSRStorage,]
[docs]
class DoNotIncludeFile(Exception):
pass
[docs]
class NoDICOMAttributes(Exception):
pass
[docs]
class ValueErrorWrapperPrecisionError(Exception):
pass
[docs]
class DICOMPlugin(AbstractPlugin):
"""Read/write DICOM files.
Attributes:
input_order
instanceNumber
today
now
serInsUid
input_options
output_sort
output_dir
seriesTime
"""
name = "dicom"
description = "Read and write DICOM files."
authors = "Erling Andersen"
version = "2.1.0"
url = "www.helse-bergen.no"
extensions = [".dcm", ".ima"]
root = "2.16.578.1.37.1.1.4"
smallint = ('bool8', 'byte', 'ubyte', 'ushort', 'uint16', 'int8', 'uint8', 'int16')
keep_uid = False
slice_tolerance = 1e-4
dir_cosine_tolerance = 0.0
def __init__(self):
super(DICOMPlugin, self).__init__(self.name, self.description,
self.authors, self.version, self.url)
self.input_order = None
self.DicomHeaderDict = None
self.dicomTemplate = None
self.instanceNumber = 0
self.today = date.today().strftime("%Y%m%d")
self.now = datetime.now().strftime("%H%M%S.%f")
self.serInsUid = None
self.input_options = {}
self.output_sort = None
self.output_dir = None
self.seriesTime = None
[docs]
def read(self, sources: SourceList, pre_hdr: Header, input_order: str, opts: dict) -> (
tuple[SortedHeaderDict, PixelDict]):
"""Read image data
Args:
self: DICOMPlugin instance
sources: list of sources to image data
pre_hdr: Pre-filled header dict. Can be None
input_order: sort order
opts: input options (dict)
Returns:
Tuple of
- hdr: Header
- input_format
- input_order
- slices
- sliceLocations
- dicomTemplate
- keep_uid
- tags
- seriesNumber
- seriesDescription
- imageType
- spacing
- orientation
- imagePositions
- si[tag,slice,rows,columns]: multi-dimensional numpy array
"""
_name: str = '{}.{}'.format(__name__, self.read.__name__)
self.input_order = input_order
self.input_options = {
INPUT_ORDER_NONE: 'get_no_value',
INPUT_ORDER_TIME: 'get_acquisition_time',
INPUT_ORDER_TRIGGERTIME: 'get_trigger_time',
INPUT_ORDER_B: 'get_b_value',
INPUT_ORDER_BVECTOR: 'get_b_vector',
INPUT_ORDER_DTI: 'get_dti',
INPUT_ORDER_TE: 'get_echo_time',
INPUT_ORDER_FA: 'get_flip_angle',
'auto_sort': ['time', 'triggertime', 'b', 'fa', 'te']
}
for key, value in opts.items(): # Copy opts to self.input_options
self.input_options[key] = value
skip_pixels = 'headers_only' in opts and opts['headers_only']
if 'slice_tolerance' in self.input_options:
self.slice_tolerance = float(self.input_options['slice_tolerance'])
if 'dir_cosine_tolerance' in self.input_options:
self.dir_cosine_tolerance = float(self.input_options['dir_cosine_tolerance'])
# Read DICOM headers
logger.debug('{}: sources {}'.format(_name, sources))
# pydicom.config.debug(True)
object_list: ObjectList = self._get_dicom_files(sources)
dataset_dict: DatasetDict
dataset_dict = self._catalog_on_instance_uid(object_list, opts, skip_pixels)
imaging_dataset_dict: DatasetDict
imaging_dataset_dict = self._select_imaging_datasets(dataset_dict, opts)
non_imaging_dataset_dict: DatasetDict
non_imaging_dataset_dict = self._select_non_imaging_datasets(dataset_dict, opts)
logger.debug('{}: imaging_datasets {}'.format(_name, len(imaging_dataset_dict)))
logger.debug('{}: non_imaging_datasets {}'.format(_name, len(non_imaging_dataset_dict)))
sorted_header_dict: SortedHeaderDict = SortedHeaderDict()
pixel_dict: PixelDict = PixelDict()
if imaging_dataset_dict:
sorted_data_dict: SortedDataDict
sorting: dict[str]
sorted_data_dict, sorting = self._sort_datasets(imaging_dataset_dict, input_order, opts)
if not skip_pixels:
logger.debug('{}: going to _construct_pixel_arrays'.format(_name))
pixel_dict = self._construct_pixel_arrays(sorted_data_dict, opts, skip_pixels)
if 'correct_acq' in opts and opts['correct_acq']:
for seriesUID in sorted_data_dict:
pixel_dict[seriesUID] = self._correct_acqtimes_for_dynamic_series(
sorted_data_dict[seriesUID], pixel_dict[seriesUID]
)
for seriesUID in sorted_data_dict:
_, header = sorted_data_dict[seriesUID]
sorted_header_dict[seriesUID] = header
if non_imaging_dataset_dict:
logger.debug('{}: going to _get_non_image_headers {}'.format(_name, sources))
non_image_header_dict: SortedHeaderDict
non_image_header_dict = self._get_non_image_headers(non_imaging_dataset_dict, opts)
non_image_pixel_dict = {}
if not skip_pixels:
logger.debug('{}: going to _construct_pixel_arrays'.format(_name))
sorted_data_dict: SortedDataDict
sorted_data_dict = combine_data_and_header(non_imaging_dataset_dict, non_image_header_dict, opts)
try:
non_image_pixel_dict = self._construct_pixel_arrays(sorted_data_dict,
opts, skip_pixels)
except EmptyImageError:
pass
for seriesUID in non_image_header_dict:
if seriesUID in sorted_header_dict:
sorted_header_dict[seriesUID].datasets = non_imaging_dataset_dict[seriesUID]
else:
sorted_header_dict[seriesUID] = non_image_header_dict[seriesUID]
sorted_header_dict[seriesUID].datasets = non_imaging_dataset_dict[seriesUID]
if seriesUID in pixel_dict:
raise IndexError('Duplicate pixel data')
elif seriesUID in non_image_pixel_dict:
pixel_dict[seriesUID] = non_image_pixel_dict[seriesUID]
logger.debug('{}: ending'.format(_name))
return sorted_header_dict, pixel_dict
def _get_dicom_files(self,
sources: SourceList
) -> ObjectList:
"""Get DICOM objects.
Args:
self: DICOMPlugin instance
sources: list of sources to image data
Returns:
List of tuples of
- archive
- member
"""
_name: str = '{}.{}'.format(__name__, self._get_dicom_files.__name__)
logger.debug("{}: sources: {} {}".format(
_name, type(sources), sources))
object_list: ObjectList = ObjectList()
for source in sources:
archive = source['archive']
scan_files = source['files']
logger.debug("{}: archive: {}".format(_name, archive))
if scan_files is None or len(scan_files) == 0:
if archive.base is not None:
scan_files = [archive.base]
else:
scan_files = ['*']
elif archive.base is not None:
raise ValueError('When is archive.base with source[files]')
logger.debug("{}: source: {} {}".format(_name, type(source), source))
logger.debug("{}: scan_files: {}".format(_name, scan_files))
for path in archive.getnames(scan_files):
logger.debug("{}: member: {}".format(_name, path))
if os.path.basename(path) == "DICOMDIR":
continue
member = archive.getmembers([path, ])
if len(member) != 1:
raise IndexError('Should not be multiple files for a filename')
member = member[0]
object_list.append((archive, member))
return object_list
def _catalog_on_instance_uid(self,
object_list: ObjectList,
opts: dict = None,
skip_pixels: bool = False) \
-> DatasetDict:
"""Sort files on Series Instance UID
Args:
self: DICOMPlugin instance
object_list: List of (archive, member) tuples
opts: input options (dict)
skip_pixels: Do not read pixel data (default: False)
Returns:
Dict of List of Instance
"""
_name: str = '{}.{}'.format(__name__, self._catalog_on_instance_uid.__name__)
logger.debug('{}:'.format(_name))
dataset_dict: DatasetDict = DatasetDict()
last_message = ''
for archive, member in object_list:
try:
with archive.open(member, mode='rb') as f:
logger.debug('{}: process_member {}'.format(_name, member))
self._extract_member(dataset_dict, f, opts, skip_pixels=skip_pixels)
except DoNotIncludeFile as e:
last_message = '{}'.format(e)
except Exception as e:
logger.debug('{}: Exception {}'.format(_name, e))
last_message = '{}'.format(e)
if len(object_list) > 0 and len(dataset_dict) < 1:
raise NotImageError(last_message)
return dataset_dict
def _select_imaging_datasets(self,
dataset_dict: DatasetDict,
opts: dict = None
) \
-> DatasetDict:
"""Select imaging datasets only
Args:
self: DICOMPlugin instance
dataset_dict: Dict of List of Dataset (DatasetDict)
opts: input options (dict)
Returns:
Dict of List of Instance
"""
_name: str = '{}.{}'.format(__name__, self._select_imaging_datasets.__name__)
# Select datasets on SOPClassUID
selected_dataset_dict: DatasetDict = DatasetDict()
for seriesUID in dataset_dict:
dataset_list = dataset_dict[seriesUID]
dataset: Dataset
dataset = dataset_list[0]
if dataset.SOPClassUID in image_uids:
# Keep imaging datasets
selected_dataset_dict[seriesUID] = dataset_list
logger.debug('{}: end with {}'.format(_name, selected_dataset_dict.keys()))
return selected_dataset_dict
def _select_non_imaging_datasets(self,
dataset_dict: DatasetDict,
opts: dict = {}
) \
-> DatasetDict:
"""Select non-imaging datasets only
Args:
self: DICOMPlugin instance
dataset_dict: Dict of List of Dataset (DatasetDict)
opts: input options (dict)
Returns:
Dict of List of non-imaging Instance
"""
_name: str = '{}.{}'.format(__name__, self._select_non_imaging_datasets.__name__)
# Select datasets on SOPClassUID
selected_dataset_dict: DatasetDict = DatasetDict()
for seriesUID in dataset_dict:
dataset_list = dataset_dict[seriesUID]
dataset: Dataset
dataset = dataset_list[0]
if dataset.SOPClassUID not in image_uids:
# Keep non-imaging datasets
selected_dataset_dict[seriesUID] = dataset_list
logger.debug('{}: end with {}'.format(_name, selected_dataset_dict.keys()))
return selected_dataset_dict
def _extract_member(self,
image_list: DatasetDict,
member: Dataset | Member | str,
opts: dict = None,
skip_pixels: bool = False):
im: Dataset
if issubclass(type(member), Dataset):
im = member
else:
# Read the DICOM object
try:
im = pydicom.filereader.dcmread(member, stop_before_pixels=skip_pixels)
except pydicom.errors.InvalidDicomError as e:
raise DoNotIncludeFile('Invalid Dicom Error: {}'.format(e))
# Verify that the DICOM object has pixel data
if not skip_pixels:
try:
_ = len(im.pixel_array)
except AttributeError:
pass
# raise DoNotIncludeFile('No pixel data in DICOM object')
if 'input_serinsuid' in opts and opts['input_serinsuid'] is not None:
if im.SeriesInstanceUID != opts['input_serinsuid']:
raise DoNotIncludeFile('Series Instance UID not selected')
if 'input_echo' in opts and opts['input_echo'] is not None:
if int(im.EchoNumbers) != int(opts['input_echo']):
raise DoNotIncludeFile('Echo Number not selected')
if 'input_acquisition' in opts and opts['input_acquisition'] is not None:
if int(im.AcquisitionNumber) != int(opts['input_acquisition']):
raise DoNotIncludeFile('Acquisition Number not selected')
# Catalog images with ref as key
acquisition_number = echo_number = None
series_instance_uid = im.SeriesInstanceUID
if 'ignore_series_uid' in opts and opts['ignore_series_uid']:
series_instance_uid = None
if 'split_acquisitions' in opts and opts['split_acquisitions']:
acquisition_number = im.AcquisitionNumber
if 'split_echo_numbers' in opts and opts['split_echo_numbers']:
echo_number = im.EchoNumbers
ref = SeriesUID(im.PatientID, im.StudyInstanceUID, series_instance_uid,
acquisition_number, echo_number)
image_list[ref].append(Instance(im))
def _sort_datasets(self,
image_dict: DatasetDict,
input_order: str,
opts: dict = None
) -> (SortedDataDict, dict[str]):
"""Sort datasets on slice and tags.
Args:
image_dict: Input datasets (DatasetDict)
input_order: Input order (str)
opts: Input options (dict)
Returns:
tuple of SortedDataDict and dict
Process:
_sort_datasets() takes an unordered DatasetDict, sorts on
slice and tags, creates a Header instance, and returns
a SortedDataDict of SortedData.
For each dataset:
sorted_dataset_list = self._sort_dataset_geometry(dataset_list)
-> spacing, tranformationMatrix, imagePositions
_sort_dataset_geometry() takes an unordered DatasetDict, sorts by slice
location, and adds spacing, transformationMatrix and imagePositions.
slice_count = verify_consistent_slices(sorted_dataset_list)
<- from SortedDatasetList : series[sloc]
# Determine (automatic) sorting
sorting[seriesUID] = determine_sorting(sorted_dataset_list)
Catalog tag values
tag_values, shape = scan_tags(seriesUID)
Sort the tags and index the images
shape = ()
axes = ()
tag_db = {}
im.slice_index
im.tag_index
calculate_shape
calculate_shape_with_duplicates
-> tags
# Sort the dataset on selected key for each sloc
sorted_dataset_list[sloc].sort(key=partial(get_tag_value, ...))
_extract_all_tags() takes existing SortedDatasetList and fills
in additional information.
Extract additional DICOM tags.
Find maximum shape in slices.
Place each image on the proper tag.
"""
def _cmp_tags(im1: Instance, im2: Instance) -> int:
tags1 = im1.get_tag_tuple(None, sorting[seriesUID], self.input_options)
tags2 = im2.get_tag_tuple(None, sorting[seriesUID], self.input_options)
return compare_tuples(tags1, tags2)
_name: str = '{}.{}'.format(__name__, self._sort_datasets.__name__)
skip_broken_series = 'skip_broken_series' in opts and opts['skip_broken_series']
# Sort datasets on sloc
sorted_data_dict: SortedDataDict = SortedDataDict()
sorting: dict[SeriesUID, str]
sorting = {}
for seriesUID in image_dict:
sorting[seriesUID] = 'none'
dataset_list: DatasetList
dataset_list = image_dict[seriesUID]
message: str = f'{dataset_list}'
message2: str = ''
sorted_dataset_list: SortedDatasetList = None
try:
sorted_dataset_list = self._sort_dataset_geometry(dataset_list, message, opts)
except CannotSort as e:
message2 = '{}'.format(e)
logger.debug('{}: _sort_dataset_geometry CannotSort: {}'.format(_name, e))
if skip_broken_series:
continue
if sorted_dataset_list is None:
raise CannotSort('Cannot sort: {}'.format(message2))
_: Counter = verify_consistent_slices(sorted_dataset_list, message, opts)
header = sorted_dataset_list.get_headers()
header.sliceLocations = np.array(sorted(sorted_dataset_list.keys()))
# Determine (automatic) sorting
try:
sorting[seriesUID] = determine_sorting(
sorted_dataset_list, input_order, self.input_options
)
except CannotSort:
if skip_broken_series:
logger.debug(f'{_name}: skip_broken_series continue {seriesUID}')
continue # Next series
else:
logger.debug(f'{_name}: skip_broken_series raise')
raise
header.input_order = sorting[seriesUID]
# Sort the dataset on selected key for each sloc
for sloc in sorted(sorted_dataset_list.keys()):
try:
sorted_dataset_list[sloc].sort(key=cmp_to_key(_cmp_tags))
except (ValueError, TypeError):
pass
# Catalog actual tag values keeping other tags fixed
# Important when sorting acquisition time which differ from slice to slice
axes, tags = scan_tags(sorted_dataset_list, sorting[seriesUID], self.input_options)
sorted_dataset_list.axes = self._get_axes(sorted_dataset_list)
header.axes = self._join_axes(sorted_dataset_list, axes)
header.tags = tags
header.SOPInstanceUIDs = self._get_sop_ins_uids(sorted_dataset_list)
header.dicomTemplate = sorted_dataset_list[next(iter(sorted_dataset_list))][0]
header.geometryIsDefined = True
# Catalog images with seriesUID and sloc as keys
sorted_data_dict[seriesUID] = SortedData((sorted_dataset_list, header))
logger.debug('{}: end with {}'.format(_name, sorted_data_dict.keys()))
return sorted_data_dict, sorting
def _get_non_image_headers(self,
dataset_dict: DatasetDict,
opts: dict = None
) -> SortedHeaderDict:
"""Get DICOM headers for non-image datasets"""
_name: str = '{}.{}'.format(__name__, self._get_non_image_headers.__name__)
skip_broken_series = False
if 'skip_broken_series' in opts:
skip_broken_series = opts['skip_broken_series']
sorted_header_dict: SortedHeaderDict = SortedHeaderDict()
for seriesUID in dataset_dict:
series_dataset: DatasetList = dataset_dict[seriesUID]
hdr = Header()
hdr.input_format = 'dicom'
hdr.input_order = 'none'
if len(series_dataset) == 0:
raise ValueError("No DICOM images found.")
try:
self._extract_non_image_dicom_attributes(series_dataset, hdr, opts=opts)
hdr.set_default_values(hdr.axes)
sorted_header_dict[seriesUID] = hdr
except CannotSort:
if skip_broken_series:
logger.debug(
'{}: skip_broken_series continue {}'.format(
_name, seriesUID
))
continue # Next series
else:
logger.debug('{}: skip_broken_series raise'.format(_name))
raise
logger.debug('{}: end with {}'.format(_name,
sorted_header_dict.keys()
))
return sorted_header_dict
def _construct_pixel_arrays(self,
sorted_data_dict: SortedDataDict,
opts: dict = None,
skip_pixels: bool = False
) -> PixelDict:
_name: str = '{}.{}'.format(__name__, self._construct_pixel_arrays.__name__)
skip_broken_series = 'skip_broken_series' in opts and opts['skip_broken_series']
pixel_dict: PixelDict = PixelDict()
for seriesUID in sorted_data_dict:
sorted_dataset_list: SortedDatasetList
header: Header
sorted_dataset_list, header = sorted_data_dict[seriesUID]
setattr(header, 'keep_uid', True)
si = None
if not skip_pixels:
# Extract pixel data
try:
si = self._construct_pixel_array(
sorted_dataset_list, header, header.shape, opts=opts
)
except TypeError:
pass
except Exception:
if skip_broken_series:
logger.debug(
'{}: skip_broken_series continue {}'.format(
_name, seriesUID
))
continue
else:
logger.debug('{}: skip_broken_series raise'.format(_name))
raise
if si is not None:
pixel_dict[seriesUID] = si
return pixel_dict
def _construct_pixel_array(self,
image_dict: SortedDatasetList,
hdr: Header,
shape: tuple,
opts: dict = None
) -> np.ndarray:
def _copy_pixels(_si, _hdr, _image_dict):
_name: str = '{}.{}'.format(__name__, _copy_pixels.__name__)
faulty = 0
for _slice, _sloc in enumerate(sorted(_image_dict)):
_done = {}
for im in _image_dict[_sloc]:
tag = im.tags
idx = im.tag_index
if idx in _done and not accept_duplicate_tag:
raise CannotSort("Overwriting data at index {}, tag {}\n".format(idx, tag) +
"Maybe try accept_duplicate_tag=True?")
_done[idx] = True
idx += (_slice,)
# Simplify index when image is 3D, remove tag index
logger.debug("{}: si.ndim {}, idx {}".format(_name, _si.ndim, idx))
if _si.ndim == 3:
idx = idx[len(tag):]
try:
logger.debug("{}: get idx {} shape {}".format(_name, idx, _si[idx].shape))
if _si.ndim > 2:
_si[idx] = im.get_pixels_with_shape(_si[idx].shape)
else:
_si[...] = im.get_pixels_with_shape(_si.shape)
except Exception as e:
logger.warning("{}: Cannot read pixel data: {}".format(_name, e))
raise
del im
faulty += 1
def _copy_pixels_from_frames(_si, _hdr, _image_dict):
_name: str = '{}.{}'.format(__name__, _copy_pixels_from_frames.__name__)
assert len(_image_dict) == _si.shape[0], "Do not know how to unpack frames and slices"
if _si.ndim > 3:
for i, im in enumerate(_image_dict):
try:
logger.debug("{}: get shape {}".format(_name, _si.shape))
_si[i] = im.get_pixels_with_shape(_si.shape[1:])
except Exception as e:
logger.warning("{}: Cannot read pixel data: {}".format(_name, e))
raise
del im
else:
try:
im = image_dict[next(iter(image_dict))][0]
except TypeError:
im = image_dict[0]
try:
logger.debug("{}: get shape {}".format(_name, _si.shape))
_si[...] = im.get_pixels_with_shape(_si.shape)
except Exception as e:
logger.warning("{}: Cannot read pixel data: {}".format(_name, e))
raise
del im
_name: str = '{}.{}'.format(__name__, self._construct_pixel_array.__name__)
opts = {} if opts is None else opts
accept_duplicate_tag = 'accept_duplicate_tag' in opts and opts['accept_duplicate_tag']
# Look-up first image to determine pixel type
try:
im: Instance = image_dict[next(iter(image_dict))][0]
except TypeError:
im: Instance = image_dict[0]
if 'BitsAllocated' not in im:
raise EmptyImageError("No pixel data in instance.")
hdr.photometricInterpretation = 'MONOCHROME2'
if 'PhotometricInterpretation' in im:
hdr.photometricInterpretation = im.PhotometricInterpretation
matrix_dtype = np.uint16
if 'PixelRepresentation' in im:
if im.PixelRepresentation == 1:
matrix_dtype = np.int16
if 'RescaleSlope' in im and 'RescaleIntercept' in im and \
(abs(im.RescaleSlope - 1) > 1e-4 or abs(im.RescaleIntercept) > 1e-4):
matrix_dtype = float
elif im.BitsAllocated == 8:
if hdr.color:
matrix_dtype = np.dtype([('R', 'u1'), ('G', 'u1'), ('B', 'u1')])
else:
matrix_dtype = np.uint8
elif im.BitsAllocated == 1:
matrix_dtype = np.bool_
logger.debug('{}: matrix_dtype {}'.format(_name, matrix_dtype))
# Load DICOM image data
logger.debug('{}: shape {}'.format(_name, shape))
si = np.zeros(shape, matrix_dtype)
if 'NumberOfFrames' in im and im.NumberOfFrames > 1:
_copy_pixels_from_frames(si, hdr, image_dict)
else:
_copy_pixels(si, hdr, image_dict)
# Simplify shape
self._reduce_shape(si, hdr.axes)
logger.debug('{}: si {}'.format(_name, si.shape))
return si
def _extract_non_image_dicom_attributes(self,
series: DatasetList,
hdr: Header,
opts: dict = None
) -> None:
"""Extract DICOM attributes
Args:
self: DICOMPlugin instance
series: DatasetList
hdr: existing header (Header)
opts:
Returns:
hdr: header
- seriesNumber
- seriesDescription
- imageType
- modality, laterality, protocolName, bodyPartExamined
- seriesDate, seriesTime
"""
dataset = series[0]
dataset.copy_attributes_to_header(hdr)
if 'Rows' not in dataset:
return
# Construct axes. Required to determine matrix shape
if len(series) > 1:
descriptions = []
for im in series:
ss = im.SegmentSequence
# ars = ss[0].AnatomicRegionSequence
# code_value = ars[0].CodeValue
# spccs = ss[0].SegmentedPropertyCategoryCodeSequence
sptcs = ss[0].SegmentedPropertyTypeCodeSequence
code_value = sptcs[0].CodeValue
code_meaning = sptcs[0].CodeMeaning
descriptions.append('{}:{}'.format(code_value, code_meaning))
hdr.axes = namedtuple('Axes', [
'text', 'slice', 'row', 'column'
])(VariableAxis('text', descriptions),
UniformLengthAxis('slice', 0, dataset.NumberOfFrames, 1),
UniformLengthAxis('row', 1, dataset.Rows, 1), # dataset.PixelSpacing[0]
UniformLengthAxis('column', 2, dataset.Columns, 1)) # dataset.PixelSpacing[1]
hdr.input_order = 'text'
elif 'NumberOfFrames' in dataset:
hdr.axes = namedtuple('Axes', [
'slice', 'row', 'column'
])(UniformLengthAxis('slice', 0, dataset.NumberOfFrames, 1),
UniformLengthAxis('row', 1, dataset.Rows, 1), # dataset.PixelSpacing[0]
UniformLengthAxis('column', 2, dataset.Columns, 1)) # dataset.PixelSpacing[1]
else:
hdr.axes = namedtuple('Axes', [
'row', 'column'
])(UniformLengthAxis('row', 0, dataset.Rows, 1), # dataset.PixelSpacing[0]
UniformLengthAxis('column', 1, dataset.Columns, 1)) # dataset.PixelSpacing[1]
def _sort_dataset_geometry(self, dataset_list: DatasetList, message: str, opts: dict = None) -> SortedDatasetList:
# _name: str = '{}.{}'.format(__name__, self._sort_dataset_geometry.__name__)
def _get_spacing(dataset_list: DatasetList) -> np.ndarray:
_name: str = '{}.{}'.format(__name__, _get_spacing.__name__)
# Spacing
dr = dc = 1.0
try:
pixel_spacing = dataset_list.getDicomAttribute(tag_for_keyword("PixelSpacing"))
if pixel_spacing is not None:
# Notice that DICOM row spacing comes first, column spacing second!
dr = float(pixel_spacing[0])
dc = float(pixel_spacing[1])
except (AttributeError, KeyError) as e:
logger.debug('{}: {}'.format(_name, e))
pass
try:
slice_spacing = float(dataset_list.getDicomAttribute(tag_for_keyword("SpacingBetweenSlices")))
except KeyError:
try:
slice_spacing = float(dataset_list.getDicomAttribute(tag_for_keyword("SliceThickness")))
except KeyError:
slice_spacing = 1.0
return np.array([slice_spacing, dr, dc])
def _verify_no_gantry_tilt(dataset_list: DatasetList) -> None:
try:
gantry = dataset_list.getDicomAttributeValues(tag_for_keyword("GantryDetectorTilt"))
if gantry is None:
return
gantry = np.unique(gantry)
if len(gantry) > 1:
raise CannotSort('{}: More than one Gantry/Detector Tilt'.format(message))
elif len(gantry) == 1:
if gantry[0] != 0.0:
raise CannotSort('{}: Gantry/Detector Tilt is not zero'.format(message))
except Exception:
raise
def _get_orientation(dataset_list: DatasetList) -> list[np.ndarray]:
# iops = self.getDicomAttribute(dictionary, tag_for_keyword("ImageOrientationPatient"))
orients = []
for s in range(len(dataset_list)):
# orient = dataset_list.get_image_orientation_patient()
orients.append(dataset_list.get_image_orientation_patient())
if self.dir_cosine_tolerance == 0.0:
if len(orients) != 1:
found = None
for it in orients:
if found is None:
found = it
elif (it != found).all():
raise CannotSort('{}: More than one IOP. Try changing dir_cosine_tolerance'.format(message))
if found is None:
raise CannotSort('{}: No IOP.'.format(message))
return orients[0]
def _verify_single_frame_of_reference(dataset_list: DatasetList):
frames = dataset_list.getDicomAttributeValues(tag_for_keyword("FrameOfReferenceUID"))
frames = sorted(set(frames))
if len(frames) > 1:
logger.warning('{}: Multiple values of FrameOfReferenceUID'.format(message))
def _calculate_distances(dataset_list: DatasetList, orient: np.ndarray, spacing: np.ndarray,
opts: dict = None)\
-> list[np.ndarray, np.ndarray]:
# _name: str = '{}.{}'.format(__name__, _calculate_distances.__name__)
sort_on_slice_location = 'sort_on_slice_location' in opts and opts['sort_on_slice_location']
# Calculate slice normal from IOP, will be the same for all slices
colr = np.array(orient[:3]).reshape(3, 1)
colc = np.array(orient[3:]).reshape(3, 1)
colr = colr / np.linalg.norm(colr)
colc = colc / np.linalg.norm(colc)
normal = np.cross(colc, colr, axis=0).reshape(3)
# For each slice, calculate distance along the slice normal using IPP
distances = []
ipps = []
for _slice in range(len(dataset_list)):
ipp = self.getOriginForSlice(dataset_list, _slice)
if ipp is None:
ipp = np.array([0.0, 0.0, 0.0])
if self.dir_cosine_tolerance != 0.0:
orient2 = orient[_slice]
colr2 = np.array(orient2[:3]).reshape(3, 1)
colc2 = np.array(orient2[3:]).reshape(3, 1)
colr2 = colr2 / np.linalg.norm(colr2)
colc2 = colc2 / np.linalg.norm(colc2)
normal2 = np.cross(colr2, colc2, axis=0)
cd = sum(normal[:] * normal2[:])[0]
if np.fabs(1 - cd) > self.dir_cosine_tolerance:
raise CannotSort('{}: Problem with dir_cosine_tolerance'.format(message))
dist = np.dot(normal, ipp)
distances.append(dist)
ipps.append(ipp)
# Determine sorting of the slices based on distance
distances = np.array(distances)
distance_idx = np.argsort(distances)
unique_distances = np.unique(distances)
# Construct transformationMatrix
slices = len(unique_distances)
T0 = ipps[distance_idx[0]]
Tn = ipps[distance_idx[-1]]
k = ((Tn - T0) / (slices - 1)) if slices > 1 else np.array([0, 0, 1])
transform = np.eye(4)
transform[:3, :4] = np.hstack([
k.reshape(3, 1),
colc.reshape(3, 1) * spacing[1],
colr.reshape(3, 1) * spacing[2],
T0.reshape(3, 1)])
if sort_on_slice_location:
# If we do not trust sorting on ipp, repeat with slice locations
distances = []
for _slice in range(len(dataset_list)):
try:
dist = float(self.getDicomAttribute(dataset_list, tag_for_keyword("SliceLocation"), _slice))
except TypeError:
raise CannotSort('{}: Missing SliceLocation'.format(message))
distances.append(dist)
distances = np.array(distances)
distance_idx = np.argsort(distances)
unique_distances = np.unique(distances)
if len(unique_distances) != slices:
raise CannotSort('{}: Problem with sorting, {} unique distances do not match {} slices'.format(
message, len(unique_distances), slices
))
# Sort imagePositions
imagePositions = {}
for i in range(slices):
pos = np.where(distances == unique_distances[i])[0][0]
imagePositions[i] = ipps[pos]
return distances, distance_idx, transform, imagePositions
def _verify_spacing(distances: np.ndarray):
# Verify spacing
spacings = []
spacing_is_good = True
has_warned = False
d = np.unique(distances)
prev = d[0]
if len(d) > 1:
current = d[1]
slice_spacing = current - prev
for it in range(1, len(d)):
current = d[it]
spacings.append(abs(current - prev))
if abs(current - prev) - slice_spacing > self.slice_tolerance:
if not has_warned:
logger.warning('{}: Slice spacing differs too much, {} vs {}. Decrease slice_tolerance.'.format(
message,
abs(current - prev), slice_spacing
))
has_warned = True
spacing_is_good = False
prev = current
if not spacing_is_good:
raise CannotSort(
'{}: Slice spacing varies:\n Distances: {}\n Spacing: {}'.format(
message, distances, spacings
))
spacing = _get_spacing(dataset_list)
_verify_no_gantry_tilt(dataset_list)
orient = _get_orientation(dataset_list)
_verify_single_frame_of_reference(dataset_list)
distances, distance_idx, transform, ipps = _calculate_distances(dataset_list, orient, spacing, opts)
_verify_spacing(distances)
# Sort dataset on distances
sorted_dataset: SortedDatasetList = SortedDatasetList()
sorted_dataset.spacing = spacing
sorted_dataset.transformationMatrix = transform
sorted_dataset.imagePositions = ipps
for idx in distance_idx:
distance = distances[idx]
# Catalog images with distance (sloc) as key
sorted_dataset[distance].append(dataset_list[idx])
return sorted_dataset
def _get_axes(self, sorted_dataset: SortedDatasetList) -> tuple[Axis]:
frames = None
rows = columns = 0
for _slice, sloc in enumerate(sorted_dataset):
im: Instance
for im in sorted_dataset[sloc]:
rows = max(rows, im.Rows)
columns = max(columns, im.Columns)
if 'NumberOfFrames' in im:
frames = im.NumberOfFrames
if frames is not None and frames > 1:
slices = frames
else:
slices = len(sorted_dataset)
ipp = sorted_dataset.imagePositions[0]
slice_axis = UniformLengthAxis('slice', ipp[0], slices, sorted_dataset.spacing[0])
row_axis = UniformLengthAxis('row', ipp[1], rows, sorted_dataset.spacing[1])
column_axis = UniformLengthAxis('column', ipp[2], columns, sorted_dataset.spacing[2])
return slice_axis, row_axis, column_axis
def _join_axes(self, sorted_dataset: SortedDatasetList, axes: tuple[Axis]) -> namedtuple:
"""Join given sorted dataset with given axes.
"""
geometry_axes = sorted_dataset.axes
if len(geometry_axes[0]) <= 1 and not len(axes):
geometry_axes = geometry_axes[1:]
tag_axes = []
axis_names = []
if len(axes):
for axis in axes:
if len(axis) > 1:
tag_axes.append(axis)
axis_names.append(axis.name)
for axis in geometry_axes:
tag_axes.append(axis)
axis_names.append(axis.name)
Axes = namedtuple('Axes', axis_names)
axes = Axes(*tag_axes)
return axes
def _get_sop_ins_uids(self, sorted_dataset: SortedDatasetList) -> dict:
SOPInstanceUIDs = {}
for _slice, sloc in enumerate(sorted_dataset):
im: Instance
for im in sorted_dataset[sloc]:
_idx = im.tag_index
SOPInstanceUIDs[_idx + (_slice,)] = im.SOPInstanceUID
return SOPInstanceUIDs
[docs]
def getOriginForSlice(self, dictionary, slice):
"""Get origin of given slice.
Args:
self: DICOMPlugin instance
dictionary: image dictionary
slice: slice number (int)
Returns:
z,y,x: coordinate for origin of given slice (np.array)
"""
try:
origin = self.getDicomAttribute(dictionary, tag_for_keyword("ImagePositionPatient"), slice)
if origin is not None:
x = float(origin[0])
y = float(origin[1])
z = float(origin[2])
return np.array([z, y, x])
except TypeError:
pass
if issubclass(type(slice), Dataset):
d = slice
s = 0
else:
d = dictionary
s = slice
while not issubclass(type(d), Dataset):
if issubclass(type(d), dict):
d = d[next(iter(d))]
elif issubclass(type(d), (tuple, list)):
for d in d:
if issubclass(type(d), Dataset):
break
try:
origin = self.getDicomAttribute(d, tag_for_keyword("ImagePositionPatient"), s)
except TypeError:
origin = self.getDicomAttribute(slice, tag_for_keyword("ImagePositionPatient"), 0)
if origin is not None:
x = float(origin[0])
y = float(origin[1])
z = float(origin[2])
return np.array([z, y, x])
return None
[docs]
def setDicomAttribute(self, dictionary, tag, value):
"""Set a given DICOM attribute to the provided value.
Ignore if no real dicom header exists.
Args:
self: DICOMPlugin instance
dictionary: image dictionary
tag: DICOM tag of addressed attribute.
value: Set attribute to this value.
"""
if dictionary is not None:
for _slice in dictionary:
for tg, im in dictionary[_slice]:
if tag not in im:
VR = pydicom.datadict.dictionary_VR(tag)
im.add_new(tag, VR, value)
else:
im[tag].value = value
[docs]
def getDicomAttribute(self, dictionary, tag, slice=0):
"""Get DICOM attribute from first image for given slice.
Args:
self: DICOMPlugin instance
dictionary: image dictionary
tag: DICOM tag of requested attribute.
slice: which slice to access. Default: slice=0
"""
assert dictionary is not None, "dicomplugin.getDicomAttribute: dictionary is None"
try:
_, im = dictionary[slice][next(iter(dictionary[slice]))]
except TypeError:
try:
_, im = dictionary[slice][0]
except (KeyError, TypeError):
im = dictionary[slice]
except KeyError:
try:
im = dictionary[slice]
except KeyError:
im = dictionary
if tag in im:
return im[tag].value
else:
return None
def _read_image(self, f, opts, hdr):
"""Read image data from given file handle
Args:
self: format plugin instance
f: file handle or filename (depending on self._need_local_file)
opts: Input options (dict)
hdr: Header
Returns:
Tuple of
- hdr: Header
Return values:
- info: Internal data for the plugin
None if the given file should not be included (e.g. raw file)
- si: numpy array (multi-dimensional)
"""
pass
def _set_tags(self, image_list, hdr, si):
"""Set header tags.
Args:
self: format plugin instance
image_list: list with (info,img) tuples
hdr: Header
si: numpy array (multi-dimensional)
Returns:
hdr: Header
"""
pass
def _correct_acqtimes_for_dynamic_series(self, hdr: Header, si: np.ndarray):
# si[t,slice,rows,columns]
_name: str = '{}.{}'.format(__name__, self._correct_acqtimes_for_dynamic_series.__name__)
# Extract acqtime for each image
slices = len(hdr.sliceLocations)
timesteps = self._count_timesteps(hdr)
logger.info(
"{}: Slices: {}, apparent time steps: {}, actual time steps: {}".format(
_name, slices, len(hdr.tags), timesteps))
new_shape = (timesteps, slices, si.shape[2], si.shape[3])
newsi = np.zeros(new_shape, dtype=si.dtype)
acq = np.zeros([slices, timesteps])
for _slice in self.DicomHeaderDict:
t = 0
for tg, im in self.DicomHeaderDict[_slice]:
acq[_slice, t] = tg
t += 1
# Correct acqtimes by setting acqtime for each slice of a volume to
# the smallest time
for t in range(acq.shape[1]):
min_acq = np.min(acq[:, t])
for _slice in range(acq.shape[0]):
acq[_slice, t] = min_acq
# Set new acqtime for each image
for _slice in self.DicomHeaderDict:
t = 0
for tg, im in self.DicomHeaderDict[_slice]:
im.AcquisitionTime = "%f" % acq[_slice, t]
newsi[t, _slice, :, :] = si[t, _slice, :, :]
t += 1
# Update taglist in hdr
hdr.tags = {}
for _slice in self.DicomHeaderDict:
hdr.tags[_slice] = np.empty((acq.shape[1],))
for t in range(acq.shape[1]):
hdr.tags[_slice][t] = acq[0, t]
return newsi
@staticmethod
def _count_timesteps(hdr):
slices = len(hdr.sliceLocations)
timesteps = np.zeros([slices], dtype=int)
for _slice in hdr.DicomHeaderDict:
timesteps[_slice] = len(hdr.DicomHeaderDict[_slice])
if timesteps.min() != timesteps.max():
raise ValueError("Number of time steps ranges from %d to %d." % (
timesteps.min(), timesteps.max()))
return timesteps.max()
[docs]
def write_3d_numpy(self, si: Series, destination, opts):
"""Write 3D Series image as DICOM files
Args:
self: DICOMPlugin instance
si: Series array (3D or 4D)
destination: dict of archive and filenames
opts: Output options (dict)
"""
_name: str = '{}.{}'.format(__name__, self.write_3d_numpy.__name__)
logger.debug('{}: destination {}'.format(_name, destination))
archive = destination['archive']
archive.set_member_naming_scheme(
fallback='Image_{:05d}.dcm',
level=max(0, si.ndim - 2),
default_extension='.dcm',
extensions=self.extensions
)
self.keep_uid = False if 'keep_uid' not in opts else opts['keep_uid']
self.instanceNumber = 0
logger.debug('{}: orig shape {}, slices {} len {}'.format(
_name, si.shape, si.slices, si.ndim))
assert si.ndim in [0, 2, 3], \
"write_3d_series: input dimension %d is not 2D/3D." % si.ndim
if si.ndim > 0:
self._calculate_rescale(si)
logger.info("{}: Smallest/largest pixel value in series: {}/{}".format(
_name, self.smallestPixelValueInSeries, self.largestPixelValueInSeries))
if 'window' in opts and opts['window'] == 'original':
raise ValueError('No longer supported: opts["window"] is set')
self.center = si.windowCenter
self.width = si.windowWidth
self.today = date.today().strftime("%Y%m%d")
self.now = datetime.now().strftime("%H%M%S.%f")
# Set series instance UID when writing
self.serInsUid = si.header.seriesInstanceUID if self.keep_uid else si.header.new_uid()
logger.debug("{}: {}".format(_name, self.serInsUid))
for key, value in opts.items(): # Copy opts to self.input_options
self.input_options[key] = value
if pydicom.uid.UID(si.SOPClassUID).keyword == 'EnhancedMRImageStorage' or \
pydicom.uid.UID(si.SOPClassUID).keyword == 'EnhancedCTImageStorage':
# Write Enhanced CT/MR
self.write_enhanced(si, destination)
else:
# Either legacy CT/MR, or another modality
if si.ndim < 3:
logger.debug('{}: write 2D ({})'.format(_name, si.ndim))
if self.keep_uid:
sop_ins_uid = si.SOPInstanceUIDs[(0, 0)]
else:
sop_ins_uid = si.header.new_uid()
self.write_slice('none', None, si, destination, 0,
sop_ins_uid=sop_ins_uid)
else:
logger.debug('{}: write 3D slices {}'.format(_name, si.slices))
for _slice in range(si.slices):
if self.keep_uid:
sop_ins_uid = si.SOPInstanceUIDs[(0, _slice)]
else:
sop_ins_uid = si.header.new_uid()
try:
self.write_slice('none', (_slice,), si[_slice], destination, _slice,
sop_ins_uid=sop_ins_uid)
except Exception:
traceback.print_exc(file=sys.stdout)
raise
[docs]
def write_4d_numpy(self, si: Series, destination, opts):
"""Write 4D Series image as DICOM files
si.series_number is inserted into each dicom object
si.series_description is inserted into each dicom object
si.image_type: Dicom image type attribute
opts['output_sort']: Which tag will sort the output images (slice or tag)
opts['output_dir']: Store all images in a single or multiple directories
Args:
self: DICOMPlugin instance
si: Series array si[tag,slice,rows,columns]
destination: dict of archive and filenames
opts: Output options (dict)
"""
_name: str = '{}.{}'.format(__name__, self.write_4d_numpy.__name__)
logger.debug('{}: destination {}'.format(_name, destination))
archive = destination['archive']
self.keep_uid = False if 'keep_uid' not in opts else opts['keep_uid']
# Defaults
self.output_sort = SORT_ON_SLICE
if 'output_sort' in opts:
self.output_sort = opts['output_sort']
self.output_dir = 'single'
if 'output_dir' in opts:
self.output_dir = opts['output_dir']
self.instanceNumber = 0
logger.debug('{}: orig shape {}, len {}'.format(_name, si.shape, si.ndim))
assert si.ndim >= 4, "write_4d_series: input dimension %d is less than 4D." % si.ndim
tags = si.tags[0].ndim
steps = si.shape[:tags]
self._calculate_rescale(si)
logger.info("{}: Smallest/largest pixel value in series: {}/{}".format(
_name, self.smallestPixelValueInSeries, self.largestPixelValueInSeries))
self.today = date.today().strftime("%Y%m%d")
self.now = datetime.now().strftime("%H%M%S.%f")
# Not used # self.seriesTime = obj.getDicomAttribute(tag_for_keyword("AcquisitionTime"))
# Set series instance UID when writing
if not self.keep_uid:
si.header.seriesInstanceUID = si.header.new_uid()
self.serInsUid = si.header.seriesInstanceUID
self.input_options = opts
if pydicom.uid.UID(si.SOPClassUID).keyword == 'EnhancedMRImageStorage' or \
pydicom.uid.UID(si.SOPClassUID).keyword == 'EnhancedCTImageStorage':
# Write Enhanced CT/MR
self.write_enhanced(si, destination)
return
# Either legacy CT/MR, or another modality
if self.output_sort == SORT_ON_SLICE:
if self.output_dir == 'single':
# Filenames: Image_00000.dcm, sort slice fastest
archive.set_member_naming_scheme(
fallback='Image_{:05d}.dcm',
level=1,
default_extension='.dcm',
extensions=self.extensions
)
else: # self.output_dir == 'multi'
# Filenames: Tag0/../TagN/Image_00000.dcm, sort slice fastest
dirn = []
for i, order in enumerate(si.input_order.split(',')):
digits = len("{}".format(steps[i]))
dirn.append(
"{0}{{{1}:0{2}}}".format(
order,
i,
digits)
)
archive.set_member_naming_scheme(
fallback=os.path.join(
*dirn,
'Image_{' + '{}'.format(len(dirn)) + ':05d}.dcm'),
level=max(0, si.ndim - 2),
default_extension='.dcm',
extensions=self.extensions
)
ifile = 0
for tag in np.ndindex(steps):
for _slice in range(si.slices):
if si.tags[_slice][tag] is None:
continue
_tag = tag + (_slice,)
if self.keep_uid:
sop_ins_uid = si.SOPInstanceUIDs[tag + (_slice,)]
else:
sop_ins_uid = si.header.new_uid()
if self.output_dir == 'multi' and _slice == 0:
# Restart file number in each subdirectory
ifile = 0
if self.output_dir == 'multi':
_file_tag = _tag
else:
_file_tag = (ifile,)
try:
_t = si.header.tags[_slice][tag]
if _t is None:
continue
self.write_slice(si.input_order, _file_tag, si[_tag],
destination, ifile,
tag_value=si.header.tags[_slice][tag],
sop_ins_uid=sop_ins_uid)
except Exception:
traceback.print_exc(file=sys.stdout)
raise
ifile += 1
else: # self.output_sort == SORT_ON_TAG:
if self.output_dir == 'single':
# Filenames: Image_00000.dcm, sort tags fastest
archive.set_member_naming_scheme(
fallback='Image_{:05d}.dcm',
level=1,
default_extension='.dcm',
extensions=self.extensions
)
else: # self.output_dir == 'multi'
# Filenames: slice/tag0/../tagN/Image_00000.dcm, sort tags fastest
digits = len("{}".format(si.slices))
dirn = ["slice{{0:0{0}}}".format(digits)]
for i, order in enumerate(si.input_order.split(',')[:-1]):
digits = len("{}".format(steps[i]))
dirn.append(
"{0}{{{1}:0{2}}}".format(
order,
i + 1,
digits
)
)
order = si.input_order.split(',')[-1]
digits = len("{}".format(steps[-1]))
archive.set_member_naming_scheme(
fallback=os.path.join(
*dirn,
order + '{' + '{}'.format(len(dirn)) + ':0{}'.format(digits) + '}.dcm'),
level=max(0, si.ndim - 2),
default_extension='.dcm',
extensions=self.extensions
)
ifile = 0
for _slice in range(si.slices):
for tag in np.ndindex(steps):
_tag = (_slice,) + tag
if si.tags[_slice][tag] is None:
continue
if self.keep_uid:
sop_ins_uid = si.SOPInstanceUIDs[tag + (_slice,)]
else:
sop_ins_uid = si.header.new_uid()
if self.output_dir == 'multi' and tag == 0:
# Restart file number in each subdirectory
ifile = 0
if self.output_dir == 'multi':
_file_tag = _tag
else:
_file_tag = (ifile,)
try:
_t = si.header.tags[_slice][tag]
if _t is None:
continue
self.write_slice(si.input_order, _file_tag, si[tag + (_slice,)],
destination, ifile,
tag_value=si.header.tags[_slice][tag],
sop_ins_uid=sop_ins_uid)
except Exception:
traceback.print_exc(file=sys.stdout)
raise
ifile += 1
[docs]
def write_enhanced(self, si, archive, filename_template, opts):
"""Write enhanced CT/MR object to DICOM file
Args:
self: DICOMPlugin instance
si: Series instance, including these attributes:
archive: archive object
filename_template: file name template, possible without '.dcm' extension
opts: Output options (dict)
Raises:
"""
_name: str = '{}.{}'.format(__name__, self.write_enhanced.__name__)
filename = 'dummy'
logger.debug("{}: {} {}".format(_name, filename, self.serInsUid))
try:
tg, member_name, im = si.DicomHeaderDict[0][0]
except (KeyError, IndexError):
raise IndexError("Cannot address dicom_template.DicomHeaderDict[0][0]")
except ValueError:
raise NoDICOMAttributes("Cannot write DICOM object when no DICOM attributes exist.")
logger.debug("{}: member_name {}".format(_name, member_name))
self.keep_uid = False if 'keep_uid' not in opts else opts['keep_uid']
if not self.keep_uid:
si.header.seriesInstanceUID = si.header.new_uid()
self.serInsUid = si.header.seriesInstanceUID
ds = self.construct_enhanced_dicom(filename_template, im, si)
# Add header information
try:
ds.SliceLocation = si.sliceLocations[0]
except (AttributeError, ValueError):
# Dont know the SliceLocation, attempt to calculate from image geometry
try:
ds.SliceLocation = im.calculate_slice_location()
except ValueError:
# Dont know the SliceLocation, so will set this to be the slice index
ds.SliceLocation = slice
try:
dz, dy, dx = si.spacing
except ValueError:
dz, dy, dx = 1, 1, 1
ds.PixelSpacing = [str(dy), str(dx)]
ds.SliceThickness = str(dz)
try:
ipp = si.imagePositions
if len(ipp) > 0:
ipp = ipp[0]
else:
ipp = np.array([0, 0, 0])
except ValueError:
ipp = np.array([0, 0, 0])
if ipp.shape == (3, 1):
ipp.shape = (3,)
z, y, x = ipp[:]
ds.ImagePositionPatient = [str(x), str(y), str(z)]
# Reverse orientation vectors from zyx to xyz
try:
ds.ImageOrientationPatient = [
si.orientation[2], si.orientation[1], si.orientation[0],
si.orientation[5], si.orientation[4], si.orientation[3]]
except ValueError:
ds.ImageOrientationPatient = [0, 0, 1, 0, 1, 0]
try:
ds.SeriesNumber = si.seriesNumber
except ValueError:
ds.SeriesNumber = 1
try:
ds.SeriesDescription = si.seriesDescription
except ValueError:
ds.SeriesDescription = ''
try:
ds.ImageType = "\\".join(si.imageType)
except ValueError:
ds.ImageType = 'DERIVED\\SECONDARY'
try:
ds.FrameOfReferenceUID = si.frameOfReferenceUID
except ValueError:
pass
ds.SmallestPixelValueInSeries = np.uint16(self.smallestPixelValueInSeries)
ds.LargestPixelValueInSeries = np.uint16(self.largestPixelValueInSeries)
ds[0x0028, 0x0108].VR = 'US'
ds[0x0028, 0x0109].VR = 'US'
ds.WindowCenter = self.center
ds.WindowWidth = self.width
if si.dtype in self.smallint or np.issubdtype(si.dtype, np.bool_):
ds.SmallestImagePixelValue = np.uint16(si.min().astype('uint16'))
ds.LargestImagePixelValue = np.uint16(si.max().astype('uint16'))
if 'RescaleSlope' in ds:
del ds.RescaleSlope
if 'RescaleIntercept' in ds:
del ds.RescaleIntercept
else:
ds.SmallestImagePixelValue = np.uint16((si.min().item() - self.b) / self.a)
ds.LargestImagePixelValue = np.uint16((si.max().item() - self.b) / self.a)
try:
ds.RescaleSlope = "%f" % self.a
except OverflowError:
ds.RescaleSlope = "%d" % int(self.a)
ds.RescaleIntercept = "%f" % self.b
ds[0x0028, 0x0106].VR = 'US'
ds[0x0028, 0x0107].VR = 'US'
# General Image Module Attributes
ds.InstanceNumber = 1
ds.ContentDate = self.today
ds.ContentTime = self.now
# ds.AcquisitionTime = self.add_time(self.seriesTime, timeline[tag])
ds.Rows = si.rows
ds.Columns = si.columns
self._insert_pixel_data(ds, si)
# logger.debug("write_enhanced: filename {}".format(filename))
# Set tag
# si will always have only the present tag
ds.set_dicom_tag(self.input_options, si.input_order, si.tags[0])
if len(os.path.splitext(filename)[1]) > 0:
fn = filename
else:
fn = filename + '.dcm'
logger.debug("{}: filename {}".format(_name, fn))
# if archive.transport.name == 'dicom':
# # Store dicom set ds directly
# archive.transport.store(ds)
# else:
# # Store dicom set ds as file
# with archive.open(fn, 'wb') as f:
# ds.save_as(f)
raise ValueError("write_enhanced: to be implemented")
[docs]
def write_slice(self, input_order, tag, si, destination, ifile,
tag_value=None,
sop_ins_uid=None):
"""Write single slice to DICOM file
Args:
self: DICOMPlugin instance
input_order: input order
tag: tag index
si: Series instance, including these attributes:
- slices
- sliceLocations
- dicomTemplate
- dicomToDo
- seriesNumber
- seriesDescription
- imageType
- frame
- spacing
- orientation
- imagePositions
- photometricInterpretation
destination: destination object
ifile: instance number in series
tag_value: set tag value
sop_ins_uid: set SOP Instance UID
"""
_name: str = '{}.{}'.format(__name__, self.write_slice.__name__)
archive: AbstractArchive = destination['archive']
query = None
if destination['files'] and len(destination['files']):
query = destination['files'][0]
filename = archive.construct_filename(
tag=tag,
query=query
)
logger.debug("{}: {} {}".format(_name, filename, self.serInsUid))
try:
ds = self.construct_dicom(filename, si.dicomTemplate, si, sop_ins_uid=sop_ins_uid)
except ValueError:
ds = self.construct_basic_dicom(si, sop_ins_uid=sop_ins_uid)
ds.SeriesInstanceUID = si.header.seriesInstanceUID
# Add header information
try:
ds.SliceLocation = pydicom.valuerep.format_number_as_ds(float(si.sliceLocations[0]))
except (AttributeError, ValueError):
# Do not know the SliceLocation, so will set this to be the slice index
if tag is None:
ds.SliceLocation = 0
else:
ds.SliceLocation = tag[-1]
try:
dz, dy, dx = si.spacing
except ValueError:
dz, dy, dx = 1, 1, 1
ds.PixelSpacing = [pydicom.valuerep.format_number_as_ds(float(dy)),
pydicom.valuerep.format_number_as_ds(float(dx))]
ds.SliceThickness = pydicom.valuerep.format_number_as_ds(float(dz))
try:
ipp = si.imagePositions
if len(ipp) > 0:
ipp = ipp[0]
else:
ipp = np.array([0, 0, 0])
except ValueError:
ipp = np.array([0, 0, 0])
if ipp.shape == (3, 1):
ipp.shape = (3,)
z, y, x = ipp[:]
ds.ImagePositionPatient = [pydicom.valuerep.format_number_as_ds(float(x)),
pydicom.valuerep.format_number_as_ds(float(y)),
pydicom.valuerep.format_number_as_ds(float(z))]
# Reverse orientation vectors from zyx to xyz
try:
ds.ImageOrientationPatient = [
pydicom.valuerep.format_number_as_ds(float(si.orientation[2])),
pydicom.valuerep.format_number_as_ds(float(si.orientation[1])),
pydicom.valuerep.format_number_as_ds(float(si.orientation[0])),
pydicom.valuerep.format_number_as_ds(float(si.orientation[5])),
pydicom.valuerep.format_number_as_ds(float(si.orientation[4])),
pydicom.valuerep.format_number_as_ds(float(si.orientation[3]))]
except ValueError:
ds.ImageOrientationPatient = [0, 0, 1, 0, 0, 1]
try:
ds.SeriesNumber = si.seriesNumber
except ValueError:
ds.SeriesNumber = 1
try:
ds.SeriesDescription = si.seriesDescription
except ValueError:
ds.SeriesDescription = ''
try:
ds.ImageType = "\\".join(si.imageType)
except ValueError:
ds.ImageType = 'DERIVED\\SECONDARY'
try:
ds.FrameOfReferenceUID = si.frameOfReferenceUID
except ValueError:
pass
# Add DICOM To Do items to present slice
for _attr, _value, _slice, _tag in si.header.dicomToDo:
_this_slice = True if _slice is None else _slice == tag[-1]
_this_tag = True if _tag is None else _tag == tag
if _this_slice and _this_tag:
# Set Dicom Attribute
if _attr not in ds:
VR = pydicom.datadict.dictionary_VR(_attr)
ds.add_new(_attr, VR, _value)
elif issubclass(type(_value), str):
if ds[_attr].VR == 'DA':
ds[_attr].value = pydicom.valuerep.DA(_value)
elif ds[_attr].VR == 'DT':
ds[_attr].value = pydicom.valuerep.DT(_value)
else:
ds[_attr].value = _value
else:
ds[_attr].value = _value
try:
self._set_pixel_rescale(ds, si)
except ValueError:
pass
# General Image Module Attributes
ds.InstanceNumber = ifile + 1
ds.ContentDate = self.today
ds.ContentTime = self.now
# ds.AcquisitionTime = self.add_time(self.seriesTime, timeline[tag])
if si.ndim > 0:
ds.Rows = si.rows
ds.Columns = si.columns
self._insert_pixel_data(ds, si)
# Set tag
# si will always have only the present tag
self.set_dicom_tag(ds, self.input_options, input_order, tag_value)
# TODO # This one? ds.set_dicom_tag(self.input_options, input_order, tag_value)
logger.debug("{}: filename {}".format(_name, filename))
if archive.transport.name == 'dicom':
# Store dicom set ds directly
archive.transport.store(ds)
else:
# Store dicom set ds as file
with archive.open(filename, 'wb') as f:
ds.save_as(f)
[docs]
def construct_basic_dicom(self,
template: Series = None,
filename: str = 'NA',
sop_ins_uid: str = None
) -> FileDataset:
if sop_ins_uid is None:
raise ValueError('SOPInstanceUID is undefined.')
# Populate required values for file meta information
file_meta = FileMetaDataset()
sop_class_uid = getattr(template, 'SOPClassUID', None)
if sop_class_uid is None:
sop_class_uid = pydicom.uid.UID('1.2.840.10008.5.1.4.1.1.7')
file_meta.MediaStorageSOPClassUID = sop_class_uid
if sop_ins_uid is not None:
file_meta.MediaStorageSOPInstanceUID = sop_ins_uid
else:
file_meta.MediaStorageSOPInstanceUID = template.header.new_uid()
file_meta.ImplementationClassUID = pydicom.uid.UID("%s.1" % self.root)
file_meta.TransferSyntaxUID = pydicom.uid.ImplicitVRLittleEndian
# Create the FileDataset instance
# (initially no data elements, but file_meta supplied)
ds = FileDataset(
filename,
{},
file_meta=file_meta,
preamble=b"\0" * 128)
ds.SOPClassUID = sop_class_uid
ds.SOPInstanceUID = sop_ins_uid
ds.PatientName = 'NA'
ds.PatientID = 'NA'
ds.PatientBirthDate = '00000000'
ds.PatientSex = 'O'
ds.StudyDate = self.today
ds.StudyTime = '000000'
try:
ds.StudyInstanceUID = template.header.studyInstanceUID
ds.SeriesInstanceUID = template.header.seriesInstanceUID
except Exception as e:
print(e)
ds.StudyID = '0'
ds.ReferringPhysicianName = 'NA'
ds.AccessionNumber = 'NA'
ds.Modality = 'SC'
return ds
[docs]
def construct_dicom(self,
filename: str,
template: Series,
si: Series,
sop_ins_uid=None) -> FileDataset:
self.instanceNumber += 1
if sop_ins_uid is None:
sop_ins_uid = si.header.new_uid()
# Populate required values for file meta information
file_meta = FileMetaDataset()
file_meta.MediaStorageSOPClassUID = si.SOPClassUID
file_meta.MediaStorageSOPInstanceUID = sop_ins_uid
file_meta.ImplementationClassUID = pydicom.uid.UID("%s.1" % self.root)
file_meta.TransferSyntaxUID = pydicom.uid.ImplicitVRLittleEndian
# Create the FileDataset instance
# (initially no data elements, but file_meta supplied)
ds = FileDataset(
filename,
{},
file_meta=file_meta,
preamble=b"\0" * 128)
# Add the data elements
# -- not trying to set all required here. Check DICOM standard
# copy_general_dicom_attributes(template, ds)
for element in template.iterall():
if element.tag == 0x7fe00010:
continue # Do not copy pixel data, will be added later
ds.add(element)
ds.StudyInstanceUID = si.header.studyInstanceUID
ds.StudyID = si.header.studyID
ds.SeriesInstanceUID = self.serInsUid
ds.SOPClassUID = si.SOPClassUID
ds.SOPInstanceUID = sop_ins_uid
ds.AccessionNumber = si.header.accessionNumber
ds.PatientName = si.header.patientName
ds.PatientID = si.header.patientID
ds.PatientBirthDate = si.header.patientBirthDate
return ds
[docs]
def set_dicom_tag(self, ds: Dataset, input_options: dict, input_order: str, values) -> None:
if input_order is None or values is None:
return
try:
_ = len(values)
except TypeError:
values = [values]
for order, value in zip(input_order.split(sep=','), values):
if order == INPUT_ORDER_NONE:
pass
elif order == INPUT_ORDER_TIME:
# AcquisitionTime
time_tag = self.choose_tag("time", "AcquisitionTime")
if time_tag not in ds:
vr = dictionary_VR(time_tag)
if vr == 'TM':
ds.add_new(time_tag, vr, datetime.fromtimestamp(
float(0.0), timezone.utc
).strftime("%H%M%S.%f")
)
else:
ds.add_new(time_tag, vr, 0.0)
if ds.data_element(time_tag).VR == 'TM':
time_str = datetime.fromtimestamp(float(value), timezone.utc).strftime("%H%M%S.%f")
ds.data_element(time_tag).value = time_str
else:
ds.data_element(time_tag).value = float(value)
elif order == INPUT_ORDER_DTI:
set_ds_b_value(ds, value[0])
set_ds_b_vector(ds, value[1])
elif order == INPUT_ORDER_B:
set_ds_b_value(ds, value)
elif order == INPUT_ORDER_BVECTOR:
set_ds_b_vector(ds, value)
elif order == INPUT_ORDER_FA:
fa_tag = self.choose_tag('fa', 'FlipAngle')
if fa_tag not in ds:
vr = dictionary_VR(fa_tag)
ds.add_new(fa_tag, vr, float(value))
else:
ds.data_element(fa_tag).value = float(value)
elif order == INPUT_ORDER_TE:
te_tag = self.choose_tag('te', 'EchoTime')
if te_tag not in ds:
vr = dictionary_VR(te_tag)
ds.add_new(te_tag, vr, float(value))
else:
ds.data_element(te_tag).value = float(value)
else:
# User-defined tag
if order in input_options:
_tag = input_options[order]
if _tag not in ds:
vr = dictionary_VR(_tag)
ds.add_new(_tag, vr, float(value))
else:
ds.data_element(_tag).value = float(value)
else:
raise (UnknownTag(f'Unknown input_order {order}.'))
[docs]
def choose_tag(self, tag, default):
# Example: _tag = choose_tag('b', 'csa_header')
if tag in self.input_options:
return self.input_options[tag]
else:
return default
@staticmethod
def _copy_dicom_group(groupno, ds_in, ds_out):
sub_dataset = ds_in.group_dataset(groupno)
for data_element in sub_dataset:
if data_element.VR != "SQ":
ds_out[data_element.tag] = ds_in[data_element.tag]
def _insert_pixel_data(self, ds, arr):
"""Insert pixel data into dicom object
If float array, scale to uint16
"""
ds.SamplesPerPixel = 1
ds.PixelRepresentation = 1 if np.issubdtype(arr.dtype, np.signedinteger) else 0
try:
ds.PhotometricInterpretation = arr.photometricInterpretation
if arr.photometricInterpretation == 'RGB':
ds.SamplesPerPixel = 3
ds.PlanarConfiguration = 0
except ValueError:
ds.PhotometricInterpretation = 'MONOCHROME2'
if np.issubdtype(arr.dtype, np.bool_):
# No scaling. Pack bits in 16-bit words
ds.PixelData = arr.astype('uint16').tobytes()
ds[0x7fe0, 0x0010].VR = 'OW'
ds.BitsAllocated = 16
ds.BitsStored = 16
ds.HighBit = 15
elif arr.dtype in self.smallint:
# No scaling of pixel values
ds.PixelData = arr.tobytes()
if arr.itemsize == 1:
ds[0x7fe0, 0x0010].VR = 'OB'
ds.BitsAllocated = 8
ds.BitsStored = 8
ds.HighBit = 7
elif arr.itemsize == 2:
ds[0x7fe0, 0x0010].VR = 'OW'
ds.BitsAllocated = 16
ds.BitsStored = 16
ds.HighBit = 15
else:
raise TypeError('Cannot store {} itemsize {} without scaling'.format(
arr.dtype, arr.itemsize))
elif arr.dtype == np.dtype([('R', 'u1'), ('G', 'u1'), ('B', 'u1')]):
# RGB image
ds.PixelData = arr.tobytes()
ds[0x7fe0, 0x0010].VR = 'OB'
ds.BitsAllocated = 8
ds.BitsStored = 8
ds.HighBit = 7
else:
# Other high precision data type, like float:
# rescale to uint16
rescaled = (np.asarray(arr) - self.b) / self.a
ds.PixelData = rescaled.astype('uint16').tobytes()
ds[0x7fe0, 0x0010].VR = 'OW'
ds.BitsAllocated = 16
ds.BitsStored = 16
ds.HighBit = 15
def _calculate_rescale(self, arr):
"""Calculate rescale parameters for series.
y = ax + b
x in 0:65535 correspond to y in ymin:ymax
2^16 = 65536 possible steps in 16 bits dicom
Returns:
self.a: Rescale slope
self.b: Rescale intercept
self.center: Window center
self.width: Window width
self.smallestPixelValueInSeries: arr.min()
self.largestPixelValueInSeries: arr.max()
self.range_VR: The VR to use for DICOM elements (SS or US)
"""
_name: str = '{}.{}'.format(__name__, self._calculate_rescale.__name__)
self.range_VR = 'SS' if np.issubdtype(arr.dtype, np.signedinteger) else 'US'
self.range_VR = 'US' if arr.color else self.range_VR
_range = 65536. if self.range_VR == 'US' else 32768.
# Window center/width
try:
ymin = np.min(arr).item()
ymax = np.max(arr).item()
except AttributeError:
ymin = np.min(arr)
ymax = np.max(arr)
if issubclass(type(ymin), tuple):
ymin = 0
ymax = 255
self.center = 127
self.width = 256
else:
self.center = (ymax + ymin) / 2
self.width = max(1, ymax - ymin)
# y = ax + b,
if arr.dtype in self.smallint or \
np.issubdtype(arr.dtype, np.bool_) or \
arr.dtype == np.dtype([('R', 'u1'), ('G', 'u1'), ('B', 'u1')]):
# No need to rescale
self.a = None
self.b = None
else:
# Other high precision data type, like float
# Must rescale data
self.b = ymin
if math.fabs(ymax - ymin) > 1e-6:
self.a = (ymax - ymin) / (_range - 1)
else:
self.a = 1.0
logger.debug("{}: Rescale slope {}, rescale intercept {}".format(
_name, self.a, self.b
))
self.smallestPixelValueInSeries = ymin
self.largestPixelValueInSeries = ymax
def _set_pixel_rescale(self, ds, arr):
"""Set pixel rescale elements:
- RescaleSlope
- RescaleIntercept
- WindowCenter
- WindowWidth
- SmallestPixelValueInSeries
- LargestPixelValueInSeries
Args:
self.a: Rescale slope
self.b: Rescale intercept
self.center: Window center
self.width: Window width
self.smallestPixelValueInSeries: arr.min()
self.largestPixelValueInSeries: arr.max()
self.range_VR: The VR to use for DICOM elements (SS or US)
ds: DICOM dataset
arr: pixel series
"""
ds.WindowCenter = pydicom.valuerep.format_number_as_ds(float(self.center))
ds.WindowWidth = pydicom.valuerep.format_number_as_ds(float(self.width))
# Remove existing elements
for element in ['SmallestImagePixelValue', 'LargestImagePixelValue',
'SmallestPixelValueInSeries', 'LargestPixelValueInSeries',
'RescaleSlope', 'RescaleIntercept']:
if element in ds:
del ds[element]
if self.a is None:
# No rescale slope
_min = 0 if arr.color else arr.min()
_max = 255 if arr.color else arr.max()
_series_min = 0 if arr.color else self.smallestPixelValueInSeries
_series_max = 255 if arr.color else self.largestPixelValueInSeries
if isinstance(_min, np.bool_):
_min, _max = 0, 1
_series_min, _series_max = 0, 1
else:
try:
ds.RescaleSlope = pydicom.valuerep.format_number_as_ds(self.a)
except OverflowError:
ds.RescaleSlope = "%d" % int(self.a)
ds.RescaleIntercept = pydicom.valuerep.format_number_as_ds(float(self.b))
_min = np.array((arr.min() - self.b) / self.a).astype('uint16')
_max = np.array((arr.max() - self.b) / self.a).astype('uint16')
_series_min = np.array(
(self.smallestPixelValueInSeries - self.b) / self.a).astype('uint16')
_series_max = np.array(
(self.largestPixelValueInSeries - self.b) / self.a).astype('uint16')
ds.add_new(tag_for_keyword('SmallestImagePixelValue'), self.range_VR, _min)
ds.add_new(tag_for_keyword('LargestImagePixelValue'), self.range_VR, _max)
ds.add_new(tag_for_keyword('SmallestPixelValueInSeries'), self.range_VR, _series_min)
ds.add_new(tag_for_keyword('LargestPixelValueInSeries'), self.range_VR, _series_max)
@staticmethod
def _add_time(now, add):
"""Add time to present time now
Args:
now: string hhmmss.ms
add: float [s]
Returns:
newtime: string hhmmss.ms
"""
tnow = datetime.strptime(now, "%H%M%S.%f")
s = int(add)
ms = (add - s) * 1000.
tadd = timedelta(seconds=s, milliseconds=ms)
tnew = tnow + tadd
return tnew.strftime("%H%M%S.%f")