Source code for tests.test_transport_dicomtransport

import unittest
import os.path
import tempfile
import numpy as np
import argparse
from pydicom import dcmread
from pydicom.dataset import Dataset, FileMetaDataset
from pynetdicom import (
    AE, evt,
    debug_logger,
    QueryRetrievePresentationContexts,
    StoragePresentationContexts,
    PYNETDICOM_IMPLEMENTATION_UID,
    PYNETDICOM_IMPLEMENTATION_VERSION
)
from pynetdicom.sop_class import (
    StudyRootQueryRetrieveInformationModelGet,
    StudyRootQueryRetrieveInformationModelFind,
    PatientRootQueryRetrieveInformationModelFind
)

import imagedata.cmdline as cmdline
import imagedata.formats as formats
import imagedata.transports as transports
from imagedata.series import Series
from imagedata.transports.dicomtransport import storage_presentation_contexts


# Storage roots shared between the test cases and the static SCP handlers.
# Assigned by setupDicomSCP() to either a one-element list holding a
# tempfile.TemporaryDirectory, or to the caller-supplied `root` list of
# directory paths.  Remains None until a setupDicomSCP() call has run.
scpdir = None


# debug_logger()


class TestDicomTransport(unittest.TestCase):
    """Write series through the ``dicom://`` transport to a local storage SCP.

    Every test gets its own pynetdicom server on localhost:11112 (AE title
    ``Temp``) whose C-STORE handler saves incoming datasets into the
    directory published through the module-level ``scpdir`` list.
    """

    def setUp(self):
        """Parse default options, locate the DICOM plugin and start the SCP."""
        parser = argparse.ArgumentParser()
        cmdline.add_argparse_options(parser)

        self.opts = parser.parse_args([])
        if not self.opts.output_format:
            self.opts.output_format = ['dicom']

        self.opts_calling_aet = parser.parse_args(['--calling_aet', 'Temp'])
        if not self.opts_calling_aet.output_format:
            self.opts_calling_aet.output_format = ['dicom']

        self.dicom_plugin = None
        for pname, ptype, pclass in formats.get_plugins_list():
            if ptype == 'dicom':
                self.dicom_plugin = pclass
        self.assertIsNotNone(self.dicom_plugin)

        self.setupDicomSCP()

    def setupDicomSCP(self, root=None, port=11112):
        """Start a non-blocking storage SCP and publish its storage roots.

        Parameters
        ----------
        root : list, optional
            Storage roots to publish in ``scpdir``.  When omitted, a fresh
            temporary directory is created and used instead.
        port : int
            TCP port on localhost the server listens on.
        """
        global scpdir

        # Publish the storage root before the server is able to deliver a
        # C-STORE request to handle_store(), which reads scpdir.
        scpdir = [tempfile.TemporaryDirectory()] if root is None else root

        ae = AE()
        ae.ae_title = 'Temp'
        ae.supported_contexts = StoragePresentationContexts
        handlers = [(evt.EVT_C_STORE, self.handle_store)]

        # Returns a ThreadedAssociationServer instance
        self.server = ae.start_server(('localhost', port), block=False, evt_handlers=handlers)

    def tearDown(self):
        """Stop the SCP and remove any temporary storage directories."""
        try:
            self.server.shutdown()
        finally:
            # Clean up even when the shutdown raises; plain path entries
            # (caller-supplied roots) are left untouched.
            for _dir in scpdir:
                if not isinstance(_dir, str):
                    _dir.cleanup()

    # Implement a handler for evt.EVT_C_STORE
    @staticmethod
    def handle_store(event):
        """Store the dataset carried by a C-STORE request.

        Parameters
        ----------
        event : pynetdicom.events.Event
            The C-STORE event; ``event.dataset`` is the received dataset and
            ``event.context`` the presentation context it arrived on.

        Returns
        -------
        status : int or pydicom.dataset.Dataset
            The status returned to the peer AE in the C-STORE response. Must be
            a valid C-STORE status value for the applicable Service Class as
            either an ``int`` or a ``Dataset`` object containing (at a
            minimum) a (0000,0900) *Status* element.
        """
        ds = event.dataset
        context = event.context

        # Add the DICOM File Meta Information
        meta = FileMetaDataset()
        meta.MediaStorageSOPClassUID = ds.SOPClassUID
        meta.MediaStorageSOPInstanceUID = ds.SOPInstanceUID
        meta.ImplementationClassUID = PYNETDICOM_IMPLEMENTATION_UID
        meta.ImplementationVersionName = PYNETDICOM_IMPLEMENTATION_VERSION
        meta.TransferSyntaxUID = context.transfer_syntax

        # Add the file meta to the dataset
        ds.file_meta = meta

        # The first storage root is either a TemporaryDirectory or, when the
        # SCP was set up with an explicit root, a plain directory path.
        target = scpdir[0]
        if isinstance(target, tempfile.TemporaryDirectory):
            target = target.name

        # Save the dataset using the SOP Instance UID as the filename
        ds.save_as(os.path.join(target, ds.SOPInstanceUID))

        # Return a 'Success' status
        return 0x0000

    # @unittest.skip("skipping test_transport_single_image")
    def test_transport_single_image(self):
        """A single image sent to the SCP reads back identical from disk."""
        si1 = Series(
            os.path.join('data', 'dicom', 'time', 'time00', 'Image_00020.dcm')
        )
        self.assertEqual((192, 152), si1.shape)

        si1.write('dicom://localhost:11112/Temp', formats=['dicom'],
                  opts={'calling_aet': 'MyAET'})

        # Read back whatever handle_store() wrote into the storage directory
        si2 = Series(scpdir[0].name)
        self.assertEqual(si1.dtype, si2.dtype)
        self.assertEqual(si1.shape, si2.shape)
        np.testing.assert_array_equal(si1, si2)

    # @unittest.skip("skipping test_transport_single_image_calling_aet")
    def test_transport_single_image_calling_aet(self):
        """Sending with an explicit calling AE title completes without error."""
        si1 = Series(
            os.path.join('data', 'dicom', 'time', 'time00', 'Image_00020.dcm')
        )
        si1.write(
            'dicom://localhost:11112/Temp',
            formats=['dicom'],
            opts={'calling_aet': 'MyAET'}
        )
class TestDicomTransportWithDB(unittest.TestCase):
    """Query and retrieve through the ``dicom://`` transport.

    Each test starts its own pynetdicom server (AE title ``Temp``) that
    answers C-FIND and C-GET requests from an in-memory table of the DICOM
    files found in the configured storage roots.
    """

    # NOTE: the handlers below read the module-level global ``instances``
    # populated by setupDicomSCP(), not this class attribute.
    instances = {}

    def setUp(self):
        """Parse default options and locate the DICOM plugin.

        The SCP is not started here; each test calls setupDicomSCP() with
        the storage roots it needs.
        """
        # Lets tearDown() cope with a test failing before the SCP is started.
        self.server = None

        parser = argparse.ArgumentParser()
        cmdline.add_argparse_options(parser)

        self.opts = parser.parse_args([])
        if not self.opts.output_format:
            self.opts.output_format = ['dicom']

        self.opts_calling_aet = parser.parse_args(['--calling_aet', 'Temp'])
        if not self.opts_calling_aet.output_format:
            self.opts_calling_aet.output_format = ['dicom']

        self.dicom_plugin = None
        for pname, ptype, pclass in formats.get_plugins_list():
            if ptype == 'dicom':
                self.dicom_plugin = pclass
        self.assertIsNotNone(self.dicom_plugin)

    def setupDicomSCP(self, root=None, port=11112):
        """Index the stored SOP instances and start a query/retrieve SCP.

        Parameters
        ----------
        root : list, optional
            Directories whose files are read with ``dcmread`` and served.
            When omitted, a fresh (empty) temporary directory is used.
        port : int
            TCP port on localhost the server listens on.
        """
        global scpdir
        global instances

        scpdir = [tempfile.TemporaryDirectory()] if root is None else root

        # Import stored SOP Instances before the server can receive a
        # request, so the handlers never see a missing or partial table.
        instances = {}
        for entry in scpdir:
            # os.listdir() needs a path, not the TemporaryDirectory object.
            fdir = entry.name if isinstance(entry, tempfile.TemporaryDirectory) else entry
            for fpath in os.listdir(fdir):
                instance = dcmread(os.path.join(fdir, fpath))
                instances[instance.SOPInstanceUID] = instance

        ae = AE()
        ae.ae_title = 'Temp'
        ae.supported_contexts = StoragePresentationContexts
        for context in ae.supported_contexts:
            context.scp_role = True
            context.scu_role = False
        ae.add_supported_context(StudyRootQueryRetrieveInformationModelGet)
        ae.add_supported_context(StudyRootQueryRetrieveInformationModelFind)
        ae.add_supported_context(PatientRootQueryRetrieveInformationModelFind)
        handlers = [(evt.EVT_C_GET, self.handle_get),
                    (evt.EVT_C_FIND, self.handle_find)]

        # Returns a ThreadedAssociationServer instance
        self.server = ae.start_server(('localhost', port), block=False, evt_handlers=handlers)

    def tearDown(self):
        """Stop the SCP (if one was started) and remove temporary directories."""
        server = getattr(self, 'server', None)
        try:
            if server is not None:
                server.shutdown()
        finally:
            # scpdir is still None when no setupDicomSCP() has ever run.
            for _dir in scpdir or ():
                if not isinstance(_dir, str):
                    _dir.cleanup()

    @staticmethod
    def handle_find(event):
        """Answer a C-FIND request from the in-memory instance table.

        Instances are first filtered on every non-wildcard PatientID,
        StudyInstanceUID, SeriesInstanceUID and SOPInstanceUID in the
        request identifier, then reduced to one representative per entity
        of the requested query/retrieve level.

        Parameters
        ----------
        event : pynetdicom.events.Event
            The C-FIND event; ``event.identifier`` is the request identifier.

        Yields
        ------
        (status, identifier)
            ``(0xC000, None)`` when QueryRetrieveLevel is missing,
            ``(0xFE00, None)`` once a C-CANCEL has been received, otherwise
            ``(0xFF00, identifier)`` for each match.
        """
        ds = event.identifier

        if 'QueryRetrieveLevel' not in ds:
            # Failure
            yield 0xC000, None
            return

        # Values that request an attribute without constraining it.
        wildcards = ('*', '', '?', None)

        matching = list(instances.values())
        for keyword in ('PatientID', 'StudyInstanceUID',
                        'SeriesInstanceUID', 'SOPInstanceUID'):
            if keyword in ds and getattr(ds, keyword) not in wildcards:
                wanted = getattr(ds, keyword)
                matching = [
                    inst for inst in matching
                    if getattr(inst, keyword) == wanted
                ]

        # Keep the first instance of each entity at the requested level.
        # An unrecognised level leaves the filtered list as it is.
        level_keys = {
            'PATIENT': 'PatientID',
            'STUDY': 'StudyInstanceUID',
            'SERIES': 'SeriesInstanceUID',
            'IMAGE': 'SOPInstanceUID',
        }
        level_key = level_keys.get(ds.QueryRetrieveLevel)
        if level_key is not None:
            seen = set()
            unique = []
            for inst in matching:
                value = getattr(inst, level_key)
                if value not in seen:
                    seen.add(value)
                    unique.append(inst)
            matching = unique

        for instance in matching:
            # Check if C-CANCEL has been received
            if event.is_cancelled:
                yield (0xFE00, None)
                return

            identifier = Dataset()
            # Copy requested elements: only wildcard-valued request elements
            # that the instance actually holds are returned.
            for element in ds:
                if element.value in wildcards and element.tag in instance:
                    identifier[element.tag] = instance[element.tag]
            identifier.QueryRetrieveLevel = ds.QueryRetrieveLevel

            # Pending
            yield (0xFF00, identifier)

    @staticmethod
    def handle_get(event):
        """Answer a C-GET request from the in-memory instance table.

        Only SERIES level requests carrying a SeriesInstanceUID select
        instances; any other request reports zero sub-operations.

        Parameters
        ----------
        event : pynetdicom.events.Event
            The C-GET event; ``event.identifier`` is the request identifier.

        Yields
        ------
        int or (status, dataset)
            ``(0xC000, None)`` when QueryRetrieveLevel is missing.  Otherwise
            first the number of C-STORE sub-operations, then
            ``(0xFF00, instance)`` per matching instance, or
            ``(0xFE00, None)`` once a C-CANCEL has been received.
        """
        ds = event.identifier

        if 'QueryRetrieveLevel' not in ds:
            # Failure
            yield 0xC000, None
            return

        # Default to "nothing to send" so unsupported requests still get a
        # well-formed reply.
        matching = []
        if ds.QueryRetrieveLevel == 'SERIES' and 'SeriesInstanceUID' in ds:
            matching = [
                inst for inst in instances.values()
                if inst.SeriesInstanceUID == ds.SeriesInstanceUID
            ]

        # Yield the total number of C-STORE sub-operations required
        yield len(matching)

        # Yield the matching instances
        for instance in matching:
            # Check if C-CANCEL has been received
            if event.is_cancelled:
                yield (0xFE00, None)
                return
            # Pending
            yield (0xFF00, instance)

    def _assert_walk(self, transport, pattern, expect, file_info=False):
        """Compare each ``transport.walk(pattern)`` step with ``expect``.

        A step is flattened to ``[root, *dirs, len(files), *dir infos]``,
        followed by the info of every file when ``file_info`` is true.
        """
        # NOTE(review): as before, a walk yielding fewer steps than
        # `expect` is not reported as a failure - consider asserting the count.
        for i, (root, dirs, files) in enumerate(transport.walk(pattern)):
            response = [root]
            response.extend(dirs)
            response.append(len(files))
            for name in dirs:
                info = transport.info('{}/{}'.format(root, name))
                response.append(info.strip())
            if file_info:
                for filename in files:
                    info = transport.info('{}/{}'.format(root, filename))
                    response.append(info.strip())
            self.assertEqual(expect[i], response)

    # @unittest.skip("skipping test_transport_walk")
    def test_transport_walk(self):
        """Walk the SCP at patient, study and series depth."""
        si0 = Series(
            os.path.join('data', 'dicom', 'time', 'time00'),
            'none', input_format='dicom')
        patID = si0.patientID
        stuInsUID = si0.studyInstanceUID
        serInsUID = si0.seriesInstanceUID
        # Result unused.  NOTE(review): presumably kept to verify that the
        # dwi fixture loads - confirm or remove.
        Series(
            os.path.join('data', 'dicom', 'dwi'),
            'b', input_format='dicom')
        si2 = Series(
            os.path.join('data', 'dicom', 'cor_oblique'),
            'none', input_format='dicom')

        self.setupDicomSCP(root=[os.path.join('data', 'dicom', 'time', 'time00'),
                                 os.path.join('data', 'dicom', 'dwi'),
                                 os.path.join('data', 'dicom', 'cor_oblique')],
                           port=11112)

        # Expected tree for the study holding the time00 and dwi series
        patient_dir = '/Temp/19.02.07-14:04:17-STD-1.3.12.2.1107.5.2.43.66035'
        study_uid = '1.3.12.2.1107.5.2.43.66035.30000019020708423676500000016'
        dynamic_uid = '2.16.578.1.37.1.1.2.8323329.32554.1549625089'
        diffusion_uid = '1.3.12.2.1107.5.2.43.66035.2019020714224132517098797.0.0.0'
        study_dir = patient_dir + '/' + study_uid
        study_step = [
            study_dir,
            dynamic_uid,
            diffusion_uid,
            0,
            '#14: 0 MR fl3d_dynamic',
            '#7: 0 MR ep2d_diff_b50_400_800_tra_p2_TRACEW_DFC_MIX'
        ]
        series_steps = [
            [study_dir + '/' + dynamic_uid, 3],
            [study_dir + '/' + diffusion_uid, 90],
        ]

        # Expected tree for the cor_oblique study
        oblique_dir = '/Temp/372372478'
        oblique_study_uid = '1.3.12.2.1107.5.2.43.66035.30000025050805093675600000001'
        oblique_series_uid = '2.16.578.1.37.1.1.2.5056857de2.3033286.1747206092.145'
        oblique_study_dir = oblique_dir + '/' + oblique_study_uid

        transport = transports.Transport(
            'dicom://localhost:11112/Temp',
            opts={'calling_aet': 'MyAET'})
        try:
            # Everything below one patient
            self._assert_walk(
                transport,
                '{}/*'.format(patID),
                [[patient_dir, study_uid, 0, '2019.02.07 140516.555000'],
                 study_step] + series_steps)

            # Patient given with a trailing slash only
            self._assert_walk(
                transport,
                '{}/'.format(si2.patientID),
                [[oblique_dir, oblique_study_uid, 0,
                  '2025.05.08 070936.739000 imagedata'],
                 [oblique_study_dir, oblique_series_uid, 0,
                  '#7: 0 MR t2_tse_c_t_30'],
                 [oblique_study_dir + '/' + oblique_series_uid, 40]])

            # Everything below one study
            self._assert_walk(
                transport,
                '{}/{}/*'.format(patID, stuInsUID),
                [study_step] + series_steps)

            # One series, including the info of each of its files
            self._assert_walk(
                transport,
                '{}/{}/{}/*'.format(patID, stuInsUID, serInsUID),
                [[study_dir + '/' + dynamic_uid, 3, '', '', '']],
                file_info=True)
        finally:
            # Release the association even when an assertion fails
            transport.close()

    # @unittest.skip("skipping test_transport_cget_series")
    def test_transport_cget_series(self):
        """Retrieve one series by UIDs, then by accession and series number."""
        si0 = Series(
            os.path.join('data', 'dicom', 'time', 'time00'),
            'none', input_format='dicom')
        patID = si0.patientID
        stuInsUID = si0.studyInstanceUID
        serInsUID = si0.seriesInstanceUID
        accno = si0.accessionNumber
        serNum = si0.seriesNumber

        self.setupDicomSCP(root=[os.path.join('data', 'dicom', 'time', 'time00')],
                           port=11112)

        si1 = Series(
            'dicom://localhost:11112/Temp/{}/{}/{}'.format(patID, stuInsUID, serInsUID),
            opts={'calling_aet': 'MyAET'},
            input_format='dicom')
        self.assertEqual((3, 192, 152), si1.shape)
        np.testing.assert_array_almost_equal((3, 2.0833, 2.0833), si1.spacing, decimal=4)
        self.assertEqual(14, si1.seriesNumber)

        si2 = Series(
            'dicom://localhost:11112/Temp/{}/{}/{}'.format(patID, accno, serNum),
            opts={'calling_aet': 'MyAET'},
            input_format='dicom')
        # Check the second retrieval itself, not si1 again
        self.assertEqual((3, 192, 152), si2.shape)
        np.testing.assert_array_almost_equal((3, 2.0833, 2.0833), si2.spacing, decimal=4)