Source code for imagedata.apps.diffusion

"""Extract diffusion MRI parameters.
"""
import io

# Copyright (c) 2013-2026 Erling Andersen, Haukeland University Hospital, Bergen, Norway

import numpy as np
import struct
import pandas as pd
from pydicom import Dataset
from typing import Sequence, SupportsFloat
from ..series import Series
from ..formats import NotImageError
from ..archives.abstractarchive import Member

Number = type[SupportsFloat]


[docs] def get_g_vectors(img): """Get diffusion gradient vectors Extracting diffusion gradient vectors has been tested on MRI data from some major vendors. Args: img (imagedata.Series): Series object. Returns: pd.DataFrame: Diffusion gradient vectors, columns b, z, y, x. Raises: IndexError: when no gradient vector is present in dataset. Examples: >>> from imagedata import Series >>> from imagedata.apps.diffusion import get_g_vectors >>> im = Series('data', 'b', opts={'accept_duplicate_tag': 'True'}) >>> g = get_g_vectors(im) >>> print(g) b z y x 0 0 NaN NaN NaN 1 500 -0.706399 0.000000 0.707814 2 500 -0.706399 0.000000 -0.707814 3 500 -0.706399 -0.707814 0.000000 4 500 0.707814 -0.706399 0.000000 5 500 0.000000 -0.707107 0.707107 6 500 0.000000 -0.707107 -0.707107 7 1000 -0.706752 0.000000 0.707461 8 1000 -0.706752 0.000000 -0.707461 9 1000 -0.706753 -0.707460 0.000000 10 1000 0.707460 -0.706754 0.000000 11 1000 0.001414 -0.707106 0.707106 12 1000 0.001414 -0.707106 -0.707106 13 2500 -0.706824 0.000000 0.707390 """ def get_DICOM_g_vector(ds): # Attempt to address standard DICOM attributes return ds['DiffusionGradientOrientation'].value def get_Siemens_g_vector(ds): block = ds.private_block(0x0019, 'SIEMENS MR HEADER') return block[0x0e].value def get_GEMS_g_vector(ds): block = ds.private_block(0x0019, 'GEMS_ACQU_01') return [block[0xbb].value, block[0xbc].value, block[0xbd].value] _v = [] # Extract pydicom dataset for first slice and each tag _dwi_weighted = False for tag in range(img.shape[0]): _b = get_b_value(img, tag) _dwi_weighted = _dwi_weighted or not np.isnan(_b) _G = [np.nan, np.nan, np.nan] _ds: Dataset = img.dicomTemplate for _method in [get_DICOM_g_vector, get_Siemens_g_vector, get_GEMS_g_vector]: try: _G = _method(_ds) break except KeyError: pass _v.append({'b': _b, 'z': _G[2], 'y': _G[1], 'x': _G[0]}) if _dwi_weighted: return pd.DataFrame(_v) else: raise IndexError('No b values found in dataset')
[docs] def get_b_value(img: Series) -> float: """Get diffusion b value Extracting diffusion b value has been tested on MRI data from some major vendors. Args: img (imagedata.Series): Series object. Returns: float: b value. Returns NaN when no b value is present in dataset. """ # Extract pydicom dataset for given slice and tag _ds: Dataset = img.dicomTemplate return get_ds_b_value(_ds)
[docs] def get_ds_b_vectors(ds: Dataset) -> np.ndarray: """Get diffusion b vector from Dataset Getting diffusion b vector has been tested on MRI data from some major vendors. Args: ds: Input dataset Returns: b vector """ def get_DICOM_b_vector(_ds): # Attempt to address standard DICOM attribute return np.array(_ds['DiffusionGradientOrientation'].value) def get_Siemens_b_vector(_ds): block = _ds.private_block(0x0019, 'SIEMENS MR HEADER') diffusionDirectionality = block[0x0d].value if diffusionDirectionality == 'DIRECTIONAL': try: return np.array(block[0x0e].value) except KeyError: pass return np.array([]) def get_GEMS_b_vector(_ds): # Verify this is a GEMS dataset block = ds.private_block(0x0019, 'GEMS_ACQU_01') return np.array([block[0xbb].value, block[0xbc].value, block[0xbd].value]) errmsg = "" for _method in [get_DICOM_b_vector, get_Siemens_b_vector, get_GEMS_b_vector]: try: return _method(ds) except (KeyError, IndexError) as e: errmsg = '{}'.format(e) pass except NotImplementedError: pass raise IndexError('Cannot get b vector: {}'.format(errmsg))
[docs] def get_ds_b_value(ds: Dataset) -> float: """Get diffusion b value from Dataset Setting diffusion b value has been tested on MRI data from some major vendors. Args: ds: Input dataset Returns: b value """ def get_DICOM_b_value(_ds): # Attempt to address standard DICOM attribute return _ds['DiffusionBValue'].value def get_Siemens_b_value(_ds): block = _ds.private_block(0x0019, 'SIEMENS MR HEADER') _value = block[0x0c].value if _value is None: return None return int(_value) def get_GEMS_b_value(_ds): try: return _ds[0x0043, 0x39].value[0] except KeyError: return _ds[0x0043, 0x1039].value[0] errmsg = "" for _method in [get_DICOM_b_value, get_Siemens_b_value, get_GEMS_b_value]: try: return float(_method(ds)) except (KeyError, IndexError) as e: errmsg = '{}'.format(e) pass raise IndexError('Cannot get b value: {}'.format(errmsg))
[docs] def set_ds_b_value(ds: Dataset, value: Number): """Set diffusion b value Setting diffusion b value has been tested on MRI data from some major vendors. Args: ds (pydicom.Dataset): Dataset value: b value """ def set_DICOM_b_value(_ds, _value): # Attempt to address standard DICOM attribute _ds.DiffusionBValue = _value def set_Siemens_b_value(_ds, _value): block = _ds.private_block(0x0019, 'SIEMENS MR HEADER') block[0x0c].value = round(_value) pass def set_GEMS_b_value(_ds, _value): # ds[0x0043, 0x1039] = multival[_value, 0, 0, 0] block = _ds.private_block(0x0043, 'GEMS_PARM_01') _list = block[0x39].value _list[0] = _value block[0x39].value = _list if 'DiffusionBValue' in _ds: _ds.DiffusionBValue = _value for _method in [set_Siemens_b_value, set_GEMS_b_value, set_DICOM_b_value]: try: _method(ds, value) return except (KeyError, IndexError): pass raise IndexError('Cannot set b value')
[docs] def set_ds_b_vector(ds: Dataset, value: Sequence[Number]): """Set diffusion b vector Setting diffusion b vector has been tested on MRI data from some major vendors. Args: ds (pydicom.Dataset): Dataset value: b vector """ def set_DICOM_b_vector(_ds, _value): # Attempt to address standard DICOM attribute _ds.DiffusionGradientOrientation = _value.tolist() def set_Siemens_b_vector(_ds, _value): block = _ds.private_block(0x0019, 'SIEMENS MR HEADER') block[0x0d].value = 'DIRECTIONAL' block[0x0e].value = _value.tolist() if 'DiffusionGradientOrientation' in _ds: _ds.DiffusionGradientOrientation = _value.tolist() def set_GEMS_b_vector(_ds, _value): block = ds.private_block(0x0019, 'GEMS_ACQU_01') block[0xbb].value = _value[0] block[0xbc].value = _value[1] block[0xbd].value = _value[2] if 'DiffusionGradientOrientation' in _ds: _ds.DiffusionGradientOrientation = _value.tolist() # _name: str = '{}.{}'.format(__name__, set_ds_b_vector.__name__) for _method in [set_Siemens_b_vector, set_GEMS_b_vector, set_DICOM_b_vector]: try: _method(ds, value) return except (KeyError, IndexError) as e: errmsg = '{}'.format(e) pass except NotImplementedError: pass raise IndexError('Cannot set b vector: {}'.format(errmsg))
def _get_CSA1_header(data): values = {} try: (n_tags, unused2) = struct.unpack('<ii', data[:8]) except struct.error as e: raise NotImageError('{}'.format(e)) except Exception as e: # logging.debug('{}: exception\n{}'.format(_name, e)) raise NotImageError('{}'.format(e)) pos = 8 for t in range(n_tags): try: (name, vm, vr, syngodt, nitems, xx ) = struct.unpack('<64si4s3i', data[pos:pos + 84]) pos += 84 i = name.find(b'\0') name = name[:i] name = name.decode("utf-8") values[name] = [] for _item in range(nitems): (item_len, xx1, xx2, xx3 ) = struct.unpack('<4i', data[pos:pos + 16]) pos += 16 if item_len > 0: value = data[pos:pos + item_len] value = value.decode("utf-8").split('\0')[0].strip() pos += (item_len // 4) * 4 if item_len % 4 > 0: pos += 4 values[name].append(value) except struct.error as e: raise NotImageError('{}'.format(e)) return values def _get_CSA2_header(data): values = {} try: (hdr_id, unused1, n_tags, unused2) = struct.unpack('<4siii', data[:16]) except struct.error as e: raise NotImageError('{}'.format(e)) except Exception as e: # logging.debug('{}: exception\n{}'.format(_name, e)) raise NotImageError('{}'.format(e)) pos = 16 for t in range(n_tags): try: (name, vm, vr, syngodt, nitems, xx ) = struct.unpack('<64si4s3i', data[pos:pos + 84]) pos += 84 i = name.find(b'\0') name = name[:i] name = name.decode("utf-8") values[name] = [] for _item in range(nitems): (item_len, xx1, xx2, xx3 ) = struct.unpack('<4i', data[pos:pos + 16]) pos += 16 if item_len > 0: value = data[pos:pos + item_len] value = value.decode("utf-8").split('\0')[0].strip() pos += (item_len // 4) * 4 if item_len % 4 > 0: pos += 4 values[name].append(value) except struct.error as e: raise NotImageError('{}'.format(e)) return values def read_b_value_file(f: io.StringIO | str) -> np.ndarray: _bval = [] try: for _ in f.readline().split(): _bval.append(float(_)) except AttributeError: with open(f, 'r') as f: return read_b_value_file(f) return _bval def read_b_vector_file(f: io.StringIO) -> list[np.ndarray]: try: _line0 = f.readline().split() _line1 = f.readline().split() _line2 = f.readline().split() except AttributeError: with open(f, 'r') as f: return read_b_vector_file(f) _bvec = [] for _x, _y, _z in zip(_line0, _line1, _line2): _bvec.append(np.array((float(_x), -float(_y), float(_z)))) # Invert y direction return _bvec def write_b_value_file(f: io.StringIO | Member | str, bvalues: Series | Sequence[float | int]): _bvalues = bvalues if issubclass(type(bvalues), Series): try: _bvalues = bvalues.axes.b.values except AttributeError: _bvalues = [] for _ in bvalues.axes.dti.values: _bvalues.append(_[0]) try: # Assume io.StringIO f.write(' '.join([str(v) for v in _bvalues]) + '\n') except AttributeError: try: # Assume str with open(f, 'w') as _f: write_b_value_file(_f, _bvalues) except TypeError: # Assume Member with open(f.local_file, 'w') as _f: write_b_value_file(_f, _bvalues) def write_b_vector_file(f: io.StringIO | Member | str, bvectors: Series | list[np.ndarray]): _bvectors = bvectors if issubclass(type(bvectors), Series): try: _bvectors = bvectors.axes.bvector.values except AttributeError: _bvectors = [] for _ in bvectors.axes.dti.values: _bvectors.append(_[1]) _x, _y, _z = [], [], [] for _values in _bvectors: try: _x.append(_values[0]) _y.append(-_values[1]) # Invert y axis _z.append(_values[2]) except IndexError: if _values.size == 0: _x.append(0) _y.append(0) _z.append(0) else: raise try: # Assume io.StringIO f.write(' '.join([str(v) for v in _x]) + '\n') f.write(' '.join([str(v) for v in _y]) + '\n') f.write(' '.join([str(v) for v in _z]) + '\n') except AttributeError: try: # Assume str with open(f, 'w') as _f: write_b_vector_file(_f, _bvectors) except TypeError: # Assume Member with open(f.local_file, 'w') as _f: write_b_vector_file(_f, _bvectors)