"""Image cohort
The Cohort class is a collection of Patient objects.
Typical example usage:
cohort = Cohort('input')
"""
# Copyright (c) 2023-2024 Erling Andersen, Haukeland University Hospital, Bergen, Norway
from datetime import datetime, date, time
import argparse
from pathlib import Path
from typing import Union
from .series import Series
from .readdata import read as r_read
from .formats import UnknownInputError
def _get_attribute(_data, _attr):
# Get attribute from first instance in _data
if len(_data) < 1:
raise IndexError('No instance in present {}.'.format(type(_data)))
_instance = list(_data.values())[0]
if issubclass(type(_instance), Series):
return _instance.getDicomAttribute(_attr)
# Recurse to lower level, until finding a Series instance
return _get_attribute(_instance, _attr)
def _sort_in_series(
_data: Union[str, Path, dict, list],
_opts: dict) -> dict:
"""Sort series into a dict of Series
Called by Cohort for url data
Called by Patient, Study
Args:
_data: url (str) or dict of Series instances
_opts
Returns:
dict of Series instances, key is SeriesInstanceUID
"""
_series_dict = {}
if issubclass(type(_data), list):
if len(_data):
if issubclass(type(_data[0]), Series):
# _data is list of Series
for _series in _data:
_series_dict[_series.seriesInstanceUID] = _series
else:
raise ValueError(
'Unexpected series data type {}'.format(
type(_data[0])))
elif issubclass(type(_data), dict):
if len(_data):
if issubclass(type(list(_data.values())[0]), Series):
# _data is list of Series
for _seriesInstanceUID in _data:
_series_dict[_seriesInstanceUID] = _data[_seriesInstanceUID]
else:
raise ValueError(
'Unexpected series data type {}'.format(
type(list(_data.values())[0])))
elif issubclass(type(_data), Study):
raise ValueError('Why here')
# return {_data.seriesInstanceUID: _data}
elif issubclass(type(_data), str) or issubclass(type(_data), Path):
# _data is URL
# Read input, hdr is dict of attributes
_hdr, _si = r_read(_data, order='auto', opts=_opts)
if len(_hdr) < 1:
raise UnknownInputError('No input data found.')
for _uid in _hdr:
if _uid in _si:
_series = Series(_si[_uid], opts=_opts)
else:
_series = Series(None, opts=_opts)
_series.header = _hdr[_uid]
_series_dict[_uid] = _series
else:
raise ValueError('Unexpected series type {}'.format(type(_data)))
return _series_dict
def _sort_in_studies(_series_dict, _opts):
"""Sort series in studies
Called by Cohort for series dict
Args:
_series_dict: dict of Series instances
_opts:
Returns:
dict of Study instances, key is StudyInstanceUID
"""
_studies = {}
for _seriesInstanceUID in _series_dict:
_series = _series_dict[_seriesInstanceUID]
_studyInstanceUID = _series.studyInstanceUID
if _studyInstanceUID not in _studies:
_studies[_studyInstanceUID] = {}
_studies[_studyInstanceUID][_seriesInstanceUID] = _series
# Make found _studies into Study instances
_study_dict = {}
for _studyInstanceUID in _studies:
_study_dict[_studyInstanceUID] = Study(_studies[_studyInstanceUID], opts=_opts)
return _study_dict
# if issubclass(type(_data), dict):
# # _data is list of Study
# for _seriesInstanceUID in _data:
# _series = _data[_seriesInstanceUID]
# if _series.studyInstanceUID not in _studies:
# _studies[_series.studyInstanceUID] = {}
# _studies[_series.studyInstanceUID][_seriesInstanceUID] = _series
# return _studies
# raise ValueError('Why here?')
def _sort_in_patients(_study_dict, _opts):
"""Sort studies in patients
Called by Cohort for study dict
Args:
_study_dict: dict of Study instances
_opts:
Returns:
dict of Patient instances
"""
_patients = {}
for _studyInstanceUID in _study_dict:
_study = _study_dict[_studyInstanceUID]
_series = list(_study.values())[0]
_patientID = _series.patientID
if _patientID not in _patients:
_patients[_patientID] = {}
_study = Study(_study_dict[_studyInstanceUID])
_patients[_patientID][_studyInstanceUID] = _study
# Make found _patients into Patient instances
_patient_dict = {}
for _patientID in _patients:
# _patient_dict[_patientID] = _patients[_patientID]
_patient_dict[_patientID] = Patient(_patients[_patientID], opts=_opts)
return _patient_dict
# return _patients
# class IndexedDict(UserDict):
class IndexedDict(dict):
def __init__(self):
super(IndexedDict, self).__init__()
def _item_to_key_(self, item):
if isinstance(item, int):
return [list(self.keys())[item]]
elif isinstance(item, slice):
uids = []
start = 0 if item.start is None else item.start
stop = len(self) if item.stop is None else item.stop
step = 1 if item.step is None else item.step
for i in range(start, stop, step):
uids.append(list(self.keys())[i])
return uids
return [item]
def __getitem__(self, item):
keys = self._item_to_key_(item)
items = []
for key in keys:
items.append(
super(IndexedDict, self).__getitem__(key)
)
if len(items) == 1:
return items[0]
else:
return items
def __setitem__(self, item, val):
keys = self._item_to_key_(item)
if len(keys) != 1:
raise IndexError('Bad index: {}'.format(item))
super(IndexedDict, self).__setitem__(keys[0], val)
def __repr__(self, item=None):
if item is None:
dictrepr = super(IndexedDict, self).__repr__()
return '%s(%s)' % (type(self).__name__, dictrepr)
keys = self._item_to_key_(item)
s = ""
for key in keys:
dictrepr = super(IndexedDict, self).__repr__(key)
s += '%s(%s)' % (type(self).__name__, dictrepr)
return s
def __str__(self):
dictrepr = super(IndexedDict, self).__str__()
return '%s(%s)' % (type(self).__name__, dictrepr)
def update(self, *args, **kwargs):
for item, v in dict(*args, **kwargs).items():
keys = self._item_to_key_(item)
key = keys[0]
self[key] = v
class UnknownOptionType(Exception):
pass
class GeneralEquipment(object):
_equipment_attributes = [
'manufacturer', 'manufacturersModelName', 'stationName', 'deviceSerialNumber',
'softwareVersions'
]
def __init__(self, obj):
super(GeneralEquipment, self).__init__()
self.defined = False
for _attr in self._equipment_attributes:
_dicom_attribute = _attr[0].upper() + _attr[1:]
_value = obj.getDicomAttribute(_dicom_attribute)
if _value is not None:
setattr(self, _attr, _value)
self.defined = True
class ClinicalTrialSubject(object):
_clinical_trial_attributes = [
'sponsorName',
'protocolID',
'protocolName',
'siteID',
'siteName',
'subjectID',
'subjectReadingID',
'protocolEthicsCommitteeName',
'protocolEthicsCommitteeApprovalNumber'
]
def __init__(self, obj):
super(ClinicalTrialSubject, self).__init__()
for _attr in self._clinical_trial_attributes:
_dicom_attribute = 'ClinicalTrial' + _attr[0].upper() + _attr[1:]
setattr(self, _attr, _get_attribute(obj, _dicom_attribute))
[docs]
class Study(IndexedDict):
"""Study -- Read and sort images into a collection of Series objects.
Study(data, opts=None)
Examples:
>>> from imagedata import Study
>>> study = Study('directory/')
>>> for uid in study:
>>> series = study[uid]
Args:
data: URL to input data, or list of Series instances
opts: Dict of input options, mostly for format specific plugins
(argparse.Namespace or dict)
* 'strict_values': Whether study attributes should match in each series (bool)
Returns:
Study instance
"""
name = "Study"
description = "Image study"
authors = "Erling Andersen"
version = "1.0.0"
url = "www.helse-bergen.no"
_attributes = [
'studyDate', 'studyTime', 'studyDescription', 'studyID', 'studyInstanceUID',
'referringPhysiciansName'
]
def _verify_study_attributes(self, _series_dict: dict, _strict_values: bool):
"""Verify that study attributes match in each series"""
for _seriesUID in _series_dict:
_series = _series_dict[_seriesUID]
self[_seriesUID] = _series
for _attr in self._attributes:
_dicom_attribute = _attr[0].upper() + _attr[1:]
_value = self[_seriesUID].getDicomAttribute(_dicom_attribute)
if _value is not None and _attr == 'studyDate':
try:
_value = datetime.strptime(_value, "%Y%m%d")
except ValueError:
_value = None
elif _value is not None and _attr == 'studyTime':
try:
if '.' in _value:
_value = datetime.strptime(_value, "%H%M%S.%f").time()
else:
_value = datetime.strptime(_value, "%H%M%S").time()
except ValueError:
_value = datetime.time(_value)
# Update self property if None from series
if getattr(self, _attr, None) is None:
# _series = self[_seriesInstanceUID]
setattr(self, _attr, _value)
elif _value is None:
pass
elif _strict_values and \
getattr(self, _attr, None) != _value:
# Study attributes differ, should be considered an exception.
raise ValueError('Study attribute "{}" differ ("{}" vs. "{}")'.format(
_attr,
getattr(self, _attr, None),
_value
))
def _find_seriesInstanceUID(self, seriesInstanceUID: str) -> Series:
"""Find a series with a given seriesInstanceUID"""
for _seriesUID in self.keys():
_series = self[_seriesUID]
if _series.seriesInstanceUID == seriesInstanceUID:
return _series
return None
def _add_missing_geometry(self):
"""Add missing geometry information to series"""
for _seriesUID in self.keys():
_series = self[_seriesUID]
if not _series.header.geometryIsDefined:
template = self._find_seriesInstanceUID(_series.header.referencedSeriesUID)
if template is not None and template.header.geometryIsDefined is True \
and template.header.frameOfReferenceUID == _series.header.frameOfReferenceUID:
_series.header.add_geometry(template.header)
_series.header.geometryIsDefined = True
def __init__(self, data, opts: dict=None, **kwargs):
super(Study, self).__init__()
for _attr in self._attributes:
setattr(self, _attr, None)
if opts is None:
_in_opts = {}
elif issubclass(type(opts), dict):
_in_opts = opts
elif issubclass(type(opts), argparse.Namespace):
_in_opts = vars(opts)
else:
raise UnknownOptionType('Unknown opts type ({}): {}'.format(type(opts),
opts))
for key, value in kwargs.items():
_in_opts[key] = value
if 'input_options' in _in_opts:
for key, value in _in_opts['input_options'].items():
_in_opts[key] = value
_strict_values = True if 'strict_values' not in _in_opts \
else _in_opts['strict_values']
if issubclass(type(data), dict):
_series_dict = data
elif issubclass(type(data), Study):
raise ValueError("Why here?")
else:
# Assume data is URL
try:
_series_dict = _sort_in_series(data, _in_opts)
except Exception:
raise
self._verify_study_attributes(_series_dict, _strict_values)
self._add_missing_geometry()
for _seriesUID in _series_dict:
_equipment = GeneralEquipment(self[_seriesUID])
if _equipment.defined:
setattr(self, 'generalEquipment', _equipment)
break
def __str__(self):
_date = self.studyDate if self.studyDate is not None else date.min
_time = self.studyTime if self.studyTime is not None else time.min
_descr = self.studyDescription if self.studyDescription is not None else ''
return \
"Study: {} {}".format(datetime.combine(_date, _time), _descr)
[docs]
def write(self, url, opts=None, formats=None):
"""Write image data, calling appropriate format plugins
Args:
self: Study instance
url: output destination url
opts: Output options (argparse.Namespace or dict)
formats: list of output formats, overriding opts.output_format (list or str)
Raises:
imagedata.collections.UnknownOptionType: When opts cannot be made into a dict.
TypeError: List of output format is not list().
ValueError: Wrong number of destinations given,
or no way to write multidimensional image.
imagedata.formats.WriteNotImplemented: Cannot write this image format.
"""
_used_urls = []
for _seriesUID in self.keys():
_series = self[_seriesUID]
try:
series_str = '{}-{}'.format(_series.seriesNumber, _series.seriesDescription)
except (TypeError, ValueError):
series_str = _seriesUID
_url = "{}/{}".format(
url,
series_str
)
while _url in _used_urls:
_url = _url + "0" # Make unique output file
_used_urls.append(_url)
try:
_series.write(_url, opts=opts, formats=formats)
except Exception as e:
raise Exception(_url) from e
[docs]
class Patient(IndexedDict):
"""Patient -- Read and sort images into a collection of Study objects.
Patient(data, opts=None)
Examples:
>>> from imagedata import Patient
>>> patient = Patient('directory/')
>>> for uid in patient:
>>> study = patient[uid]
Args:
data: URL to input data, or list of Study instances
opts: Dict of input options, mostly for format specific plugins
(argparse.Namespace or dict)
* 'strict_values': Whether attributes should match in each study (bool)
Returns:
Patient instance
"""
name = "Patient"
description = "Image patient"
authors = "Erling Andersen"
version = "1.0.0"
url = "www.helse-bergen.no"
_attributes = ['patientName', 'patientID', 'patientBirthDate', 'patientSex',
'patientAge', 'patientSize', 'patientWeight', 'qualityControlSubject',
'patientIdentityRemoved', 'deidentificationMethod'
]
_strict_attributes = ['patientName', 'patientID', 'patientBirthDate', 'patientSex']
def __init__(self, data, opts=None, **kwargs):
super(Patient, self).__init__()
for _attr in self._attributes:
setattr(self, _attr, None)
if opts is None:
_in_opts = {}
elif issubclass(type(opts), dict):
_in_opts = opts
elif issubclass(type(opts), argparse.Namespace):
_in_opts = vars(opts)
else:
raise UnknownOptionType('Unknown opts type ({}): {}'.format(type(opts),
opts))
for key, value in kwargs.items():
_in_opts[key] = value
if 'input_options' in _in_opts:
for key, value in _in_opts['input_options'].items():
_in_opts[key] = value
_strict_values = True if 'strict_values' not in _in_opts \
else _in_opts['strict_values']
if issubclass(type(data), list) and len(data) and issubclass(type(data[0]), Study):
# Add each study to self patient
self._add_studies(data)
elif issubclass(type(data), dict) and issubclass(type(list(data.values())[0]), Study):
self._add_studies(data)
# _study_dict = data
# for _studyInstanceUID in data:
# self[_studyInstanceUID] = _study_dict[_studyInstanceUID]
else:
_series_dict = _sort_in_series(data, _in_opts)
_study_dict = _sort_in_studies(_series_dict, _in_opts)
# Add each study to self patient
for _studyInstanceUID in _study_dict:
# self[_studyInstanceUID] = Study(_study_dict[_studyInstanceUID])
self[_studyInstanceUID] = _study_dict[_studyInstanceUID]
for _studyInstanceUID in self.keys():
for _attr in self._attributes:
_dicom_attribute = _attr[0].upper() + _attr[1:]
_value = _get_attribute(self[_studyInstanceUID], _dicom_attribute)
if _value is not None and (_attr == 'patientSize' or _attr == 'patientWeight'):
try:
_value = float(_value)
except ValueError:
_value = None
# Update self property if None from study
if getattr(self, _attr, None) is None:
setattr(self, _attr, _value)
elif _strict_values and \
_attr in self._strict_attributes and \
getattr(self, _attr, None) != _value:
# Patient attributes differ, should be considered an exception.
raise ValueError('Patient attribute "{}" differ ("{}" vs. "{}")'.format(
_attr,
getattr(self, _attr, None),
_value
))
setattr(self, 'clinicalTrialSubject', ClinicalTrialSubject(self[_studyInstanceUID]))
def _add_studies(self, data):
"""Add each study to self patient
Args:
data: dict of Study instances
Returns:
self: added Study instances to self
"""
if issubclass(type(data), dict):
for _studyInstanceUID in data:
if issubclass(type(data[_studyInstanceUID]), Study):
self[_studyInstanceUID] = data[_studyInstanceUID]
else:
raise ValueError(
'Unexpected patient data type {}'.format(
type(data[_studyInstanceUID])
))
elif issubclass(type(data), list):
for _study in data:
if issubclass(type(_study), Study):
self[_study.studyInstanceUID] = _study
else:
raise ValueError(
'Unexpected study data type {}'.format(
type(_study)
))
else:
raise ValueError('Unexpected data type {} (expected: dict or list)'.format(type(data)))
def __str__(self):
try:
_id = self.patientID
except ValueError:
_id = ''
try:
_patientName = self.patientName
except ValueError:
_patientName = ''
return \
"Patient: [{}] {}".format(_id, _patientName)
[docs]
def write(self, url, opts=None, formats=None):
"""Write image data, calling appropriate format plugins
Args:
self: Patient instance
url: output destination url
opts: Output options (argparse.Namespace or dict)
formats: list of output formats, overriding opts.output_format (list or str)
Raises:
imagedata.collections.UnknownOptionType: When opts cannot be made into a dict.
TypeError: List of output format is not list().
ValueError: Wrong number of destinations given,
or no way to write multidimensional image.
imagedata.formats.WriteNotImplemented: Cannot write this image format.
"""
_used_urls = []
for _studyInstanceUID in self.keys():
_study = self[_studyInstanceUID]
try:
study_str = datetime.combine(
_study.studyDate, _study.studyTime
).strftime('%Y%m%d-%H%M%S')
except (TypeError, ValueError):
study_str = _studyInstanceUID
for _seriesInstanceUID in _study:
_series = _study[_seriesInstanceUID]
try:
series_str = '{}-{}'.format(_series.seriesNumber, _series.seriesDescription)
except (TypeError, ValueError):
series_str = _seriesInstanceUID
_url = "{}/{}/{}".format(
url,
study_str,
series_str
)
while _url in _used_urls:
_url = _url + "0" # Make unique output file
_used_urls.append(_url)
try:
_series.write(_url, opts=opts, formats=formats)
except Exception as e:
raise Exception(_url) from e
[docs]
class Cohort(IndexedDict):
"""Cohort -- Read and sort images into a collection of Patient objects.
Cohort(data, opts=None)
Examples:
>>> from imagedata import Cohort
>>> cohort = Cohort('directory/')
>>> for id in cohort:
>>> patient = cohort[id]
Args:
data: URL to input data, or list of Patient instances
opts: Dict of input options, mostly for format specific plugins
(argparse.Namespace or dict)
* 'strict_values': Whether attributes should match in each study (bool)
Returns:
Cohort instance
"""
name = "Cohort"
description = "Image cohort"
authors = "Erling Andersen"
version = "1.0.0"
url = "www.helse-bergen.no"
_attributes = []
def __init__(self, data, opts=None, **kwargs):
super(Cohort, self).__init__()
self.data = data
for _attr in self._attributes:
setattr(self, _attr, None)
if opts is None:
_in_opts = {}
elif issubclass(type(opts), dict):
_in_opts = opts
elif issubclass(type(opts), argparse.Namespace):
_in_opts = vars(opts)
else:
raise UnknownOptionType('Unknown opts type ({}): {}'.format(type(opts),
opts))
for key, value in kwargs.items():
_in_opts[key] = value
if 'input_options' in _in_opts:
for key, value in _in_opts['input_options'].items():
_in_opts[key] = value
_strict_values = True if 'strict_values' not in _in_opts \
else _in_opts['strict_values']
if issubclass(type(data), list) and len(data) and issubclass(type(data[0]), Patient):
# Add each patient to self cohort
self._add_patients(data, _in_opts)
elif issubclass(type(data), str) or issubclass(type(data), Path):
# Read input data from url
_series_dict = _sort_in_series(data, _in_opts)
_study_dict = _sort_in_studies(_series_dict, _in_opts)
_patient_dict = _sort_in_patients(_study_dict, _in_opts)
# Add each patient to self cohort
for _patientID in _patient_dict:
# self[_patientID] = Patient(_patient_dict[_patientID])
self[_patientID] = _patient_dict[_patientID]
else:
raise ValueError('Unexpected cohort data type {}'.format(type(data)))
for _patientID in self.keys():
for _attr in self._attributes:
_dicom_attribute = _attr[0].upper() + _attr[1:]
# Update self property if None from study
if getattr(self, _attr, None) is None:
setattr(self, _attr,
_get_attribute(self[_patientID], _dicom_attribute))
elif _strict_values and\
getattr(self, _attr, None) != \
_get_attribute(self[_patientID], _dicom_attribute):
# Patient attributes differ, should be considered an exception.
raise ValueError('Cohort attribute "{}" differ ("{}" vs. "{}")'.format(
_attr,
getattr(self, _attr, None),
_get_attribute(self[_patientID], _dicom_attribute)
))
def __str__(self):
return \
"Cohort: {}".format(self.data)
def _add_patients(self, data, opts=None):
"""Add each patient to self cohort
Args:
data: dict of Patient instances
opts:
Returns:
self: added Patient instances to self
"""
if issubclass(type(data), dict):
for _patientID in data:
if issubclass(type(data[_patientID], Patient)):
self[_patientID] = data[_patientID]
else:
raise ValueError(
'Unexpected patient data type {}'.format(
type(data[_patientID])
))
else:
raise ValueError('Unexpected data type {} (expected: dict)'.format(type(data)))
[docs]
def write(self, url, opts=None, formats=None):
"""Write image data, calling appropriate format plugins
Args:
self: Cohort instance
url: output destination url
opts: Output options (argparse.Namespace or dict)
formats: list of output formats, overriding opts.output_format (list or str)
Raises:
imagedata.collections.UnknownOptionType: When opts cannot be made into a dict.
TypeError: List of output format is not list().
ValueError: Wrong number of destinations given,
or no way to write multidimensional image.
imagedata.formats.WriteNotImplemented: Cannot write this image format.
"""
_used_urls = []
for _patientID in self.keys():
_patient = self[_patientID]
for _studyInstanceUID in _patient:
_study = _patient[_studyInstanceUID]
try:
study_str = datetime.combine(
_study.studyDate, _study.studyTime
).strftime('%Y%m%d-%H%M%S')
except (TypeError, ValueError):
study_str = _studyInstanceUID
for _seriesInstanceUID in _study:
_series = _study[_seriesInstanceUID]
try:
series_str = '{}-{}'.format(
_series.seriesNumber, _series.seriesDescription
)
except (TypeError, ValueError):
series_str = _seriesInstanceUID
_url = "{}/{}/{}/{}".format(
url,
_patientID,
study_str,
series_str
)
while _url in _used_urls:
_url = _url + "0" # Make unique output file
_used_urls.append(_url)
try:
_series.write(_url, opts=opts, formats=formats)
except Exception as e:
raise Exception(_url) from e