# Source code for imagedata.transports.dicomtransport

"""Transfer DICOM images to and from DICOM Storage SCP
"""

# Copyright (c) 2019-2024 Erling Andersen, Haukeland University Hospital, Bergen, Norway

import logging
import platform
import urllib
import urllib.parse
from typing import Optional

import pydicom
import pynetdicom

from .abstracttransport import AbstractTransport
from . import FunctionNotSupported

logger = logging.getLogger(__name__)

# Storage SOP classes requested, with the SCP role, when images are retrieved
# with C-GET (see DicomTransport._cget_series).
_sop = pynetdicom.sop_class
presentation_contexts = [
    _sop.CTImageStorage,
    _sop.MRImageStorage,
    _sop.ComputedRadiographyImageStorage,
    _sop.DigitalXRayImagePresentationStorage,
    _sop.DigitalXRayImageProcessingStorage,
    _sop.DigitalMammographyXRayImagePresentationStorage,
    _sop.DigitalMammographyXRayImageProcessingStorage,
    _sop.UltrasoundMultiframeImageStorage,
    _sop.UltrasoundImageStorage,
    _sop.SecondaryCaptureImageStorage,
    _sop.XRayAngiographicImageStorage,
    _sop.XRayRadiofluoroscopicImageStorage,
    _sop.NuclearMedicineImageStorage,
    _sop.ParametricMapStorage,
]


# Future enhancements:
# EnhancedCTImageStorage,LegacyConvertedEnhancedCTImageStorage,
# EnhancedMRImageStorage, EnhancedMRColorImageStorage, LegacyConvertedEnhancedMRImageStorage,
# EnhancedUSVolumeStorage,
# MultiFrameSingleBitSecondaryCaptureImageStorage,
# MultiFrameGrayscaleByteSecondaryCaptureImageStorage,
# MultiFrameGrayscaleWordSecondaryCaptureImageStorage,
# MultiFrameTrueColorSecondaryCaptureImageStorage,
# StandaloneOverlayStorage, StandaloneCurveStorage,
# EnhancedXAImageStorage, EnhancedXRFImageStorage, XRayAngiographicBiPlaneImageStorage,
# XRay3DAngiographicImageStorage, XRay3DCraniofacialImageStorage, BreastTomosynthesisImageStorage,
# SpatialRegistrationStorage, SpatialFiducialsStorage, DeformableSpatialRegistrationStorage,
# SegmentationStorage, SurfaceSegmentationStorage, TractographyResultsStorage,
# PositronEmissionTomographyImageStorage, LegacyConvertedEnhancedPETImageStorage,
# StandalonePETCurveStorage, EnhancedPETImageStorage,
# RTImageStorage,


class AssociationNotEstablished(Exception):
    """Raised when the DICOM association could not be established."""
class AssociationFailed(Exception):
    """Raised when a request on an open DICOM association fails."""
class DicomTransport(AbstractTransport):
    """Transport plugin exchanging DICOM images with a remote DICOM node.

    Objects are addressed with paths of the form
    ``/AET/patient/study/series/instance``. The archive is browsed with
    C-FIND (``walk``), images are fetched with C-GET (``open``) and
    datasets are sent with C-STORE (``store``).
    """

    name = "dicom"
    description = "Receive/Send DICOM images to DICOM Storage SCP."
    authors = "Erling Andersen"
    version = "2.0.0"
    url = "www.helse-bergen.no"
    schemes = ["dicom"]
    # Identifiers returned by C-FIND, keyed by the matched UID / patient ID.
    # NOTE(review): this is a class-level dict, so it is shared by every
    # DicomTransport instance - confirm that sharing is intended.
    __catalog = {}

    def __init__(self,
                 netloc: Optional[str] = None,
                 root: Optional[str] = None,
                 mode: Optional[str] = 'r',
                 read_directory_only: Optional[bool] = False,
                 opts: Optional[dict] = None):
        """Open an association with the remote DICOM node.

        Args:
            netloc: Remote node given as ``host:port``.
            root: Root path, ``/AET[/...]``; the first term is the called
                AE title.
            mode: Access mode; a mode starting with ``'w'`` disables
                searching in ``walk``.
            read_directory_only: Stored on the instance, not used here.
            opts: Options; ``opts['calling_aet']`` overrides the calling
                AE title, which otherwise is the short host name.

        Raises:
            ValueError: When the AE title or network location is missing,
                or the root has too many terms.
            AssociationNotEstablished: When the association is rejected,
                aborted or never connected.
        """
        super(DicomTransport, self).__init__(
            self.name, self.description, self.authors,
            self.version, self.url, self.schemes)
        if opts is None:
            opts = {}
        self.read_directory_only = read_directory_only
        logger.debug("DicomTransport __init__ root: {} ({})".format(root, mode))

        # The called AE title is the first path term after the leading '/'
        if not root:
            raise ValueError('AE Title not given')
        root_split = root.split('/')
        if len(root_split) < 2 or not root_split[1]:
            raise ValueError('AE Title not given')
        aet = root_split[1]

        self.__root = root
        self.__mode = mode
        self.__aet = aet
        self.__local = False
        self.__files = {}  # datasets received by C-GET, keyed by SOPInstanceUID

        n_terms = len(root_split)
        if n_terms == 2:
            self.patID, self.study_or_accession, self.series = None, None, None
        elif n_terms in (3, 4):
            # NOTE(review): both lengths store root_split[2] as the series
            # and ignore root_split[3]; kept as in the original - confirm.
            self.patID, self.study_or_accession, self.series = None, None, root_split[2]
        elif n_terms == 5:
            self.patID, self.study_or_accession, self.series = root_split[2:]
        else:
            raise ValueError('Wrong URL {}'.format(root))

        # Calling AE title: explicit option, else short host name. An empty
        # host name falls back to 'IMAGEDATA'.
        calling_aet = opts.get('calling_aet')
        if calling_aet is None:
            calling_aet = platform.node().split('.')[0] or 'IMAGEDATA'
        self.__local_aet = calling_aet
        logger.debug("DicomTransport __init__ calling AET: {}".format(self.__local_aet))

        # Request the Query/Retrieve contexts plus MR and CT image storage
        self.__ae = pynetdicom.AE(ae_title=self.__local_aet)
        self.__ae.requested_contexts = \
            pynetdicom.presentation.QueryRetrievePresentationContexts
        for context in (pynetdicom.sop_class.MRImageStorage,
                        pynetdicom.sop_class.CTImageStorage):
            self.__ae.add_requested_context(context)

        if not netloc:
            raise ValueError('Network location (host:port) not given')
        self.__host, port = netloc.split(':')
        self.__port = int(port)
        self.__assoc = self.__ae.associate(self.__host, self.__port, ae_title=self.__aet)
        if not self.__assoc.is_established:
            raise AssociationNotEstablished(
                'Association rejected, aborted or never connected')

    def close(self):
        """Close the DICOM association transport
        """
        self.__assoc.release()

    def walk(self, top):
        """Generate the names in the DICOM archive by walking the tree.

        Args:
            top (str): Starting point for the walk. A path not starting
                with '/' is taken relative to the transport root.

        Yields:
            Tuples of (root, dirs, files). Nothing is yielded when the
            transport was opened for writing.

        Raises:
            ValueError: When no patient ID is given, or the patient ID
                matches no patient or several patients.
        """
        if self.__mode and self.__mode[0] == 'w':
            # Do not search the DICOM archive on write
            return
        if not top.startswith('/'):
            # Add the transport root to a relative tree top
            top = self.__root + '/' + top
        terms = urllib.parse.urlsplit(top).path.split('/')
        # terms[0] is empty and terms[1] is the AE title
        patient_search = terms[2] if len(terms) >= 3 else None
        study_search = terms[3] if len(terms) >= 4 else None
        series_search = terms[4] if len(terms) >= 5 else None
        instance_search = terms[5] if len(terms) >= 6 else None

        # Walk the patient list, should be one only.
        # Do not allow searching multiple patients - could result in too many
        # matches. An empty term would query every patient, so it counts as
        # missing.
        if not patient_search:
            raise ValueError('Patient ID must be provided: {}'.format(top))
        patients = self._cfind_patient(patient_search)
        if len(patients) < 1:
            raise ValueError('Patient ID {} not found'.format(patient_search))
        elif len(patients) > 1:
            raise ValueError('Patient ID {} match multiple patients'.format(patient_search))
        patient_id = patients[0]
        if study_search is None:
            yield '/{}'.format(self.__aet), [patient_id], []

        # Walk the study list
        studies = self._cfind_studies(patient_id, study_search)
        if series_search is None:
            yield '/{}/{}'.format(self.__aet, patient_id), studies, []
        for study_instance_uid in studies:
            # Walk the series list
            series = self._cfind_series(study_instance_uid, series_search)
            if instance_search is None:
                yield '/{}/{}/{}'.format(self.__aet, patient_id, study_instance_uid), series, []
            for series_instance_uid in series:
                # Walk the instance list
                instances = self._cfind_instances(
                    study_instance_uid, series_instance_uid, instance_search)
                yield '/{}/{}/{}/{}'.format(
                    self.__aet, patient_id, study_instance_uid, series_instance_uid
                ), [], instances

    def isfile(self, path):
        """Return True if path is an existing regular file.

        Raises:
            FunctionNotSupported: Always; not supported by this transport.
        """
        raise FunctionNotSupported('Accessing the DICOM server is not supported.')

    def exists(self, path):
        """Determine whether the named path exists.

        Returns:
            Always False; the archive is not queried.
        """
        return False

    def open(self, path, mode='r'):
        """Fetch one instance from the archive.

        Args:
            path (str): Instance path ``/AET/patient/study/series/instance``.
            mode (str): Only modes starting with ``'r'`` fetch anything.

        Returns:
            The dataset received for the instance; None when mode does not
            start with ``'r'``.

        Raises:
            FileNotFoundError: When the instance is not among the objects
                received for its series.
        """
        if mode[0] == 'r':
            study_instance_uid, series_instance_uid, sop_instance_uid = path.split('/')[3:6]
            if sop_instance_uid not in self.__files:
                # Fetch the whole series; received datasets end up in
                # self.__files. The directory argument is not used by
                # _cget_series.
                self._cget_series('/tmp', study_instance_uid, series_instance_uid)
            if sop_instance_uid not in self.__files:
                raise FileNotFoundError('File not found: {}'.format(path))
            return self.__files[sop_instance_uid]

    def store(self, ds):
        """Store DICOM dataset using DICOM Storage SCU protocol.

        Args:
            ds: Dataset to send.

        Raises:
            AssociationFailed: When no valid status is returned.
        """
        logger.debug('DicomTransport.store: send dataset')
        status = self.__assoc.send_c_store(ds)
        if status:
            logger.debug('DicomTransport.store: C-STORE request status: '
                         '0x{0:04x}'.format(status.Status))
        else:
            # A falsy status has no usable Status element to report
            raise AssociationFailed(
                'C-STORE request failed: connection timed out, '
                'was aborted or received invalid response')

    def info(self, path) -> str:
        """Return info describing the object

        Args:
            path (str): object path

        Returns:
            description (str): Preferably a one-line string describing the
                object. Empty when the object is not in the catalog built by
                earlier searches.

        Raises:
            ValueError: When the path has too few terms.
        """
        if not path.startswith('/'):
            # Add the transport root to a relative path
            path = self.__root + '/' + path
        terms = urllib.parse.urlsplit(path).path.split('/')
        n_terms = len(terms)
        if n_terms < 2:
            raise ValueError('Too few terms in directory tree {}'.format(path))
        if n_terms == 2:
            # Describe AET
            return terms[1]
        if n_terms == 3:
            # Describe patient
            return terms[2]
        if n_terms > 6:
            return ''

        # The last term is the study, series or instance UID
        uid = terms[-1]
        if uid not in self.__catalog:
            return ''
        entry = self.__catalog[uid]

        if n_terms == 4:
            # Describe study
            study_date = getattr(entry, 'StudyDate', '') or ''
            if len(study_date) == 8:
                study_date = '{}.{}.{}'.format(
                    study_date[:4], study_date[4:6], study_date[6:8])
            study_time = getattr(entry, 'StudyTime', '') or ''
            if len(study_time) == 6:
                study_time = '{}:{}:{}'.format(
                    study_time[:2], study_time[2:4], study_time[4:6])
            return '{} {} {} {}'.format(
                study_date, study_time,
                getattr(entry, 'AccessionNumber', ''),
                getattr(entry, 'StudyDescription', ''))

        if n_terms == 5:
            # Describe series
            try:
                series_number = int(entry.SeriesNumber)
            except (AttributeError, TypeError, ValueError):
                # Missing or empty series number
                series_number = 0
            return '#{}: {} {} {}'.format(
                series_number,
                getattr(entry, 'NumberOfSeriesRelatedInstances', ''),
                getattr(entry, 'Modality', ''),
                getattr(entry, 'SeriesDescription', ''))

        # n_terms == 6: describe instance
        try:
            return '{} {}x{}x{}'.format(
                entry.InstanceNumber, entry.NumberOfFrames,
                entry.Rows, entry.Columns)
        except AttributeError:
            return ''

    # Handler for evt.EVT_C_STORE
    def _handle_store(self, event):
        """Handle a C-STORE request event.

        Keeps the received dataset in self.__files, keyed by its
        SOPInstanceUID, and returns status 0x0000.
        """
        ds = event.dataset
        ds.file_meta = event.file_meta
        self.__files[ds.SOPInstanceUID] = ds
        # Return a 'Success' status
        return 0x0000

    def _cget_series(self, destdir, study_instance_uid, series_instance_UID):
        """Retrieve all instances of one series with C-GET.

        Received datasets are collected by _handle_store.

        Args:
            destdir: Not used.
            study_instance_uid: Study Instance UID of the series.
            series_instance_UID: Series Instance UID to retrieve.

        Raises:
            ConnectionError: When the association cannot be established or
                an invalid response is received.
        """
        handlers = [(pynetdicom.evt.EVT_C_STORE, self._handle_store)]
        model = pynetdicom.sop_class.StudyRootQueryRetrieveInformationModelGet

        # A separate Application Entity is used for the retrieval
        ae = pynetdicom.AE(ae_title=self.__local_aet)
        ae.add_requested_context(model)
        roles = []
        for storage_class in presentation_contexts:
            ae.add_requested_context(storage_class)
            # SCP/SCU role selection item: accept this storage class as SCP
            roles.append(pynetdicom.build_role(storage_class, scp_role=True))

        # Identifier (query) dataset. A unique key is needed for each level
        # above the Query/Retrieve level, hence the StudyInstanceUID.
        ds = pydicom.dataset.Dataset()
        ds.QueryRetrieveLevel = 'SERIES'
        ds.StudyInstanceUID = study_instance_uid
        ds.SeriesInstanceUID = series_instance_UID

        assoc = ae.associate(self.__host, self.__port, ae_title=self.__aet,
                             ext_neg=roles, evt_handlers=handlers)
        if not assoc.is_established:
            raise ConnectionError('Association rejected, aborted or never connected')
        try:
            for status, _identifier in assoc.send_c_get(ds, model):
                if not status:
                    raise ConnectionError(
                        'Connection timed out, was aborted or received invalid response')
        finally:
            # Release the association also when a response was invalid
            assoc.release()

    def _cfind_any_key(self, build_query, keywords, search, model, uid_keyword):
        """Run one C-FIND per candidate key and merge the matches.

        Args:
            build_query: Callable returning a fresh identifier dataset.
            keywords: DICOM keywords tried in turn as the matching key.
            search: Value to match; None matches everything.
            model: Query/Retrieve information model SOP class.
            uid_keyword: Keyword of the attribute collected from responses.

        Returns:
            Matched values in order of first appearance, no duplicates.
        """
        matches = []
        for keyword in keywords:
            tag = pydicom.datadict.tag_for_keyword(keyword)
            vr = pydicom.datadict.dictionary_VR(tag)
            value = search
            if vr == 'IS':
                # Integer keys can only be matched by integer search terms
                try:
                    value = int(search)
                except (ValueError, TypeError):
                    continue
            ds = build_query()
            ds[tag] = pydicom.dataset.DataElement(tag, vr, value)
            for uid in self._cfind(ds, model, uid_keyword):
                if uid not in matches:
                    matches.append(uid)
            if search is None:
                # Without a search value every key gives the same query
                break
        return matches

    def _cfind_patient(self, patient_id):
        """Return the patient IDs matching patient_id."""
        ds = pydicom.dataset.Dataset()
        ds.QueryRetrieveLevel = 'PATIENT'
        ds.PatientID = patient_id
        ds.PatientName = ''
        ds.PatientBirthDate = ''
        ds.PatientSex = ''
        return self._cfind(
            ds, pynetdicom.sop_class.PatientRootQueryRetrieveInformationModelFind,
            'PatientID')

    def _cfind_studies(self, patient_id, search):
        """Return Study Instance UIDs of the patient's studies where the
        UID, accession number or study description matches search."""

        def build_query():
            ds = pydicom.dataset.Dataset()
            ds.QueryRetrieveLevel = 'STUDY'
            ds.PatientID = patient_id
            ds.StudyInstanceUID = ''
            ds.StudyDate = ''
            ds.StudyTime = ''
            ds.AccessionNumber = ''
            ds.StudyDescription = ''
            ds.NumberOfStudyRelatedSeries = ''
            ds.NumberOfStudyRelatedInstances = ''
            return ds

        return self._cfind_any_key(
            build_query,
            ('StudyInstanceUID', 'AccessionNumber', 'StudyDescription'),
            search,
            pynetdicom.sop_class.PatientRootQueryRetrieveInformationModelFind,
            'StudyInstanceUID')

    def _cfind_series(self, study_instance_uid, search):
        """Return Series Instance UIDs of the study's series where the UID
        or series number matches search."""

        def build_query():
            ds = pydicom.dataset.Dataset()
            ds.QueryRetrieveLevel = 'SERIES'
            ds.StudyInstanceUID = study_instance_uid
            ds.SeriesInstanceUID = ''
            ds.SeriesNumber = ''
            ds.SeriesDescription = ''
            ds.Modality = ''
            ds.BodyPartExamined = ''
            ds.NumberOfSeriesRelatedInstances = ''
            return ds

        return self._cfind_any_key(
            build_query,
            ('SeriesInstanceUID', 'SeriesNumber'),
            search,
            pynetdicom.sop_class.StudyRootQueryRetrieveInformationModelFind,
            'SeriesInstanceUID')

    def _cfind_instances(self, study_instance_uid, series_instance_uid, search):
        """Return SOP Instance UIDs of the series' instances where the UID
        or instance number matches search."""

        def build_query():
            ds = pydicom.dataset.Dataset()
            ds.QueryRetrieveLevel = 'IMAGE'
            ds.StudyInstanceUID = study_instance_uid
            ds.SeriesInstanceUID = series_instance_uid
            ds.SOPInstanceUID = ''
            ds.InstanceNumber = ''
            ds.NumberOfFrames = ''
            ds.Rows = ''
            ds.Columns = ''
            return ds

        return self._cfind_any_key(
            build_query,
            ('SOPInstanceUID', 'InstanceNumber'),
            search,
            pynetdicom.sop_class.StudyRootQueryRetrieveInformationModelFind,
            'SOPInstanceUID')

    def _cfind(self, ds, model, tag):
        """Send a C-FIND request on the open association.

        Each returned identifier is also stored in the catalog under the
        value of its ``tag`` attribute.

        Args:
            ds: Identifier (query) dataset.
            model: Query/Retrieve information model SOP class.
            tag: Keyword of the attribute collected from each response.

        Returns:
            List of the ``tag`` values of all returned identifiers.

        Raises:
            ConnectionError: When the association is not established or an
                invalid response is received.
        """
        if not self.__assoc.is_established:
            raise ConnectionError('Association rejected, aborted or never connected')
        instances = []
        for status, identifier in self.__assoc.send_c_find(ds, model):
            if not status:
                raise ConnectionError(
                    'Connection timed out, was aborted or received invalid response')
            # Responses without an identifier carry no match
            if identifier is not None:
                uid = identifier[tag].value
                instances.append(uid)
                self.__catalog[uid] = identifier
        return instances