"""Abstract class for image formats.
Defines generic functions.
"""
# vim: tabstop=4 expandtab shiftwidth=4 softtabstop=4
# Copyright (c) 2017-2024 Erling Andersen, Haukeland University Hospital, Bergen, Norway
from abc import ABCMeta, abstractmethod # , abstractproperty
import logging
import numpy as np
from . import NotImageError, shape_to_str, INPUT_ORDER_TIME, SORT_ON_TAG
from ..header import Header
from ..archives.abstractarchive import AbstractArchive
logger = logging.getLogger(__name__)
[docs]
class NoOtherInstance(Exception):
pass
[docs]
class AbstractPlugin(object, metaclass=ABCMeta):
"""Abstract base class definition for imagedata format plugins.
Plugins must be a subclass of AbstractPlugin and
must define the atttributes set in __init__() and
the following methods:
read() method
write_3d_numpy() method
write_4d_numpy() method
Attributes:
input_order
tags
transformationMatrix
"""
plugin_type = 'format'
def __init__(self, name, description, authors, version, url):
object.__init__(self)
self.__name = name
self.__description = description
self.__authors = authors
self.__version = version
self.__url = url
self.input_order = None
self.tags = None
self.transformationMatrix = None
@property
def name(self):
"""Plugin name
Single word string describing the image format.
Typical names: dicom, nifti, itk.
"""
return self.__name
@property
def description(self):
"""Plugin description
Single line string describing the image format.
"""
return self.__description
@property
def authors(self):
"""Plugin authors
Multi-line string naming the author(s) of the plugin.
"""
return self.__authors
@property
def version(self):
"""Plugin version
String giving the plugin version.
Version scheme: 1.0.0
"""
return self.__version
@property
def url(self):
"""Plugin URL
URL string to the site of the plugin or the author(s).
"""
return self.__url
[docs]
def read(self, sources, pre_hdr, input_order, opts):
"""Read image data
Generic version for images which will be sorted on their filenames.
Args:
sources: list of sources to image data
pre_hdr: DICOM template
input_order: sort order
opts: Input options (dict)
Returns:
Tuple of
- hdr: Header dict
- si[tag,slice,rows,columns]: numpy array
"""
hdr = Header()
hdr.input_format = self.name
hdr.input_order = input_order
# image_list: list of tuples (hdr,si)
logger.debug("AbstractPlugin.read: sources {}".format(sources))
image_list = list()
for source in sources:
logger.debug("AbstractPlugin.read: source: {} {}".format(type(source), source))
archive: AbstractArchive = source['archive']
scan_files = source['files']
if scan_files is None or len(scan_files) == 0:
if archive.base is not None:
scan_files = [archive.base]
else:
scan_files = ['*']
elif archive.base is not None:
raise ValueError('When is archive.base with source[files]')
# if scan_files is None or len(scan_files) == 0:
# scan_files = archive.getnames()
# logger.debug("AbstractPlugin.read: scan_files {}".format(scan_files))
for file_handle in archive.getmembers(scan_files):
logger.debug("AbstractPlugin.read: file_handle {}".format(file_handle.filename))
if self._need_local_file():
logger.debug("AbstractPlugin.read: need local file {}".format(
file_handle.filename))
f = archive.to_localfile(file_handle)
logger.debug("AbstractPlugin.read: local file {}".format(f))
info, si = self._read_image(f, opts, hdr)
else:
f = archive.open(file_handle, mode='rb')
logger.debug("AbstractPlugin.read: file {}".format(f))
try:
info, si = self._read_image(f, opts, hdr)
except NotImageError:
raise
finally:
f.close()
# info is None when no image was read
if info is not None:
image_list.append((info, si))
if len(image_list) < 1:
raise ValueError('No image data read')
info, si = image_list[0]
self._reduce_shape(si)
logger.debug('AbstractPlugin.read: reduced si {}'.format(si.shape))
shape = (len(image_list),) + si.shape
dtype = si.dtype
logger.debug('AbstractPlugin.read: shape {}'.format(shape))
si = np.zeros(shape, dtype)
i = 0
for info, img in image_list:
if 'input_sort' in opts and opts['input_sort'] == SORT_ON_TAG:
si[:,i] = img
else:
si[i] = img
i += 1
logger.debug('AbstractPlugin.read: si {}'.format(si.shape))
# Simplify shape
self._reduce_shape(si)
logger.debug('AbstractPlugin.read: reduced si {}'.format(si.shape))
_shape = si.shape
logger.debug('AbstractPlugin.read: _shape {}'.format(_shape))
_ndim = len(_shape)
nz = 1
if _ndim > 2:
nz = _shape[-3]
logger.debug('AbstractPlugin.read: slices {}'.format(nz))
logger.debug('AbstractPlugin.read: calling _set_tags')
self._set_tags(image_list, hdr, si)
# logger.debug('AbstractPlugin.read: return _set_tags: {}'.format(hdr))
logger.info("Data shape read: {}".format(shape_to_str(si.shape)))
# Add any DICOM template
if pre_hdr is not None:
hdr.update(pre_hdr)
logger.debug('AbstractPlugin.read: hdr {}'.format(hdr))
return hdr, si
def _need_local_file(self):
"""Do the plugin need access to local files?
Returns:
Boolean
- True: The plugin need access to local filenames
- False: The plugin can access files given by an open file handle
"""
return False
@abstractmethod
def _read_image(self, f, opts, hdr):
"""Read image data from given file handle
Args:
self: format plugin instance
f: file handle or filename (depending on self._need_local_file)
opts: Input options (dict)
hdr: Header dict
Returns:
Tuple of
hdr: Header dict
Return values:
- info: Internal data for the plugin
None if the given file should not be included (e.g. raw file)
si: numpy array (multi-dimensional)
"""
pass
@abstractmethod
def _set_tags(self, image_list, hdr, si):
"""Set header tags.
Args:
self: format plugin instance
image_list: list with (info,img) tuples
hdr: Header dict
si: numpy array (multi-dimensional)
Returns:
hdr: Header dict
"""
pass
[docs]
@abstractmethod
def write_3d_numpy(self, si, destination, opts):
"""Write 3D Series image
Args:
self: format plugin instance
si[slice,rows,columns]: Series array
destination: dict of archive and filenames
opts: Output options (dict)
"""
pass
[docs]
@abstractmethod
def write_4d_numpy(self, si, destination, opts):
"""Write 4D Series image
Args:
self: format plugin instance
si[tag,slice,rows,columns]: Series array
destination: dict of archive and filenames
opts: Output options (dict)
"""
pass
[docs]
def getTimeline(self):
"""Get timeline
Returns:
Timeline in seconds, as numpy array of floats
Delta time is given as seconds. First image is t=0.
Length of array is number of tags.
Raises:
ValueError: tags for dataset is not time tags
"""
if self.input_order == INPUT_ORDER_TIME:
timeline = [0.0]
for t in range(1, len(self.tags[0])):
timeline.append(self.tags[0][t] - self.tags[0][0])
return np.array(timeline)
else:
raise ValueError("No timeline tags are available. Input order: {}".format(
self.input_order))
'''
def getQform(self):
"""Get Nifti version of the transformation matrix, aka qform
Input:
- self.spacing
- self.imagePositions
- self.orientation
Returns:
- transformation matrix as numpy array
"""
def normalize_column(x,row):
val = np.vdot(x, x)
if val > 0:
x = x / math.sqrt(val)
else:
shape = x.shape
x = np.array([0., 0., 0.])
x[row]=1
x.shape = shape
return x
#debug = None
debug = True
ds,dr,dc = self.spacing
z,y,x = self.imagePositions[0]
slices = len(self.imagePositions)
T0 = self.imagePositions[0]
Tn = self.imagePositions[slices-1]
orient = self.orientation
#print("ds,dr,dc={},{},{}".format(ds,dr,dc))
#print("z ,y ,x ={},{},{}".format(z,y,x))
q = np.eye(4)
# Set column 3 and row 3 to zeros, except [3,3]
colr=np.array([[orient[3]], [orient[4]], [orient[5]]])
colc=np.array([[orient[0]], [orient[1]], [orient[2]]])
colr = normalize_column(colr,0)
colc = normalize_column(colc,1)
k=np.cross(colr, colc, axis=0)
q[:3, :3] = np.hstack((colr, colc, k))
if debug:
logger.debug("q")
logger.debug( q)
if debug:
logger.debug("determinant(q) {}".format(np.linalg.det(q)))
if np.linalg.det(q) < 0:
q[:3,2] = -q[:3,2]
# Scale matrix
diagVox = np.eye(3)
diagVox[0,0] = dc
diagVox[1,1] = dr
diagVox[2,2] = ds
if debug:
logger.debug("diagVox")
logger.debug( diagVox)
logger.debug("q without scaling {}".format(q.dtype))
logger.debug( q)
q[:3,:3] = np.dot(q[:3,:3],diagVox)
if debug:
logger.debug("q with scaling {}".format(q.dtype))
logger.debug( q)
# Add translations
q[0,3] = x; q[1,3] = y; q[2,3] = z # pos x,y,z
if debug:
logger.debug("q with translations")
logger.debug( q)
# q now equals dicom_to_patient in spm_dicom_convert
return q
'''
'''
def setQform(self, A):
"""Set transformationMatrix from Nifti affine A."""
#print("setQform: input\n{}".format(A))
M=np.eye(4)
M[:3,0]=A[2::-1,2]
M[:3,1]=A[2::-1,0]
M[:3,2]=A[2::-1,1]
M[:3,3]=A[2::-1,3]
#print("setQform: output\n{}".format(M))
self.transformationMatrix=M
return
'''
[docs]
def getPositionForVoxel(self, r, transformation=None):
"""Get patient position for center of given voxel r
Use Patient Position and Image Orientation to calculate world
coordinates for given voxel
Args:
r: (z,y,x) of voxel in voxel coordinates as numpy.array
transformation: transformation matrix when different from self.transformationMatrix
Returns:
(z,y,x) of voxel in world coordinates (mm) as numpy.array
"""
if transformation is None:
transformation = self.transformationMatrix
# q = self.getTransformationMatrix()
# V = np.array([[r[2]], [r[1]], [r[0]], [1]]) # V is [x,y,z,1]
newpos = np.dot(transformation, np.hstack((r, [1])))
# return np.array([newpos[2,0],newpos[1,0],newpos[0,0]]) # z,y,x
return newpos[:3]
[docs]
def getVoxelForPosition(self, p, transformation=None):
""" Get voxel for given patient position p
Use Patient Position and Image Orientation to calculate world
coordinates for given voxel
Args:
p: (z,y,x) of voxel in world coordinates (mm) as numpy.array
transformation: transformation matrix when different from self.transformationMatrix
Returns:
(z,y,x) of voxel in voxel coordinates as numpy.array
"""
if transformation is None:
transformation = self.transformationMatrix
# q = self.getTransformationMatrix()
# V = np.array([[p[2]], [p[1]], [p[0]], [1]]) # V is [x,y,z,1]
qinv = np.linalg.inv(transformation)
r = np.dot(qinv, np.hstack((p, [1])))
# z,y,x
# return np.array([int(r[2,0]+0.5),int(r[1,0]+0.5),int(r[0,0]+0.5)], dtype=int)
# return int(r+0.5)[:3]
return (r + 0.5).astype(int)[:3]
[docs]
@staticmethod
def replace_geometry_attributes(im, gim):
"""Replace geometry attributes in image with values from gim
"""
im.SliceLocation = gim.SliceLocation
im.ImagePositionPatient = gim.ImagePositionPatient
im.ImageOrientationPatient = gim.ImageOrientationPatient
im.FrameOfReferenceUID = gim.FrameOfReferenceUID
im.PositionReferenceIndicator = gim.PositionReferenceIndicator
im.SliceThickness = gim.SliceThickness
try:
im.SpacingBetweenSlices = gim.SpacingBetweenSlices
except AttributeError:
pass
im.AcquisitionMatrix = gim.AcquisitionMatrix
im.PixelSpacing = gim.PixelSpacing
@staticmethod
def _reduce_shape(si, axes=None):
"""Reduce shape when leading shape(s) are 1.
Will not reduce to less than 2-dimensional image.
Also reduce axes when reducing shape.
Args:
self: format plugin instance
si[...]: Series array
Raises:
ValueError: tags for dataset is not time tags
"""
mindim = 2
while si.ndim > mindim:
if si.shape[0] == 1:
si.shape = si.shape[1:]
if axes is not None:
del axes[0]
else:
break
def _reorder_to_dicom(self, data, flip=False, flipud=False):
"""Reorder data to internal DICOM format.
Swap axes, except for rows and columns.
5D:
data order: data[rows,columns,slices,tags,d5]
DICOM order: si [d5,tags,slices,rows,columns]
4D:
data order: data[rows,columns,slices,tags]
DICOM order: si [tags,slices,rows,columns]
3D:
data order: data[rows,columns,slices]
DICOM order: si [slices,rows,columns]
2D:
data order: data[rows,columns]
DICOM order: si [rows,columns]
flip: Whether rows and columns are swapped.
flipud: Whether matrix is transposed
"""
logger.debug('AbstractPlugin._reorder_to_dicom: shape in {}'.format(
data.shape))
if data.ndim == 5:
rows, columns, slices, tags, d5 = data.shape
if flipud:
rows, columns = columns, rows
si = np.zeros((d5, tags, slices, rows, columns), data.dtype)
for d in range(d5):
for tag in range(tags):
for slice in range(slices):
si[d, tag, slice, :, :] = self._reorder_slice(data[:, :, slice, tag, d],
flip=flip, flipud=flipud)
# if flip:
# si[d,tag,slice,:,:] = \
# (data[:,:,slice,tag,d]).T
# #np.fliplr(data[:,:,slice,tag,d]).T
# else:
# si[d,tag,slice,:,:] = data[:,:,slice,tag,d]
elif data.ndim == 4:
rows, columns, slices, tags = data.shape
if flipud:
rows, columns = columns, rows
si = np.zeros((tags, slices, rows, columns), data.dtype)
for tag in range(tags):
for slice in range(slices):
si[tag, slice, :, :] = self._reorder_slice(data[:, :, slice, tag],
flip=flip, flipud=flipud)
# if flip:
# si[tag,slice,:,:] = (data[:,:,slice,tag]).T
# #si[tag,slice,:,:] = np.fliplr(data[:,:,slice,tag]).T
# else:
# si[tag,slice,:,:] = data[:,:,slice,tag]
elif data.ndim == 3:
rows, columns, slices = data.shape
if flipud:
rows, columns = columns, rows
si = np.zeros((slices, rows, columns), data.dtype)
for slice in range(slices):
si[slice, :, :] = self._reorder_slice(data[:, :, slice], flip=flip, flipud=flipud)
# if flip:
# si[slice,:,:] = (data[:,:,slice]).T
# #si[slice,:,:] = np.fliplr(data[:,:,slice]).T
# else:
# si[slice,:,:] = data[:,:,slice]
elif data.ndim == 2:
rows, columns = data.shape
if flipud:
rows, columns = columns, rows
si = np.zeros((rows, columns), data.dtype)
si[:] = self._reorder_slice(data[:], flip=flip, flipud=flipud)
# if flip:
# si[:] = (data[:]).T
# #si[:] = np.fliplr(data[:]).T
# else:
# si[:] = data[:]
else:
raise ValueError('Dimension %d is not implemented' % data.ndim)
logger.debug('AbstractPlugin._reorder_to_dicom: shape out {}'.format(
si.shape))
return si
def _reorder_from_dicom(self, data, flip=False, flipud=False):
"""Reorder data from internal DICOM format.
Swap axes, except for rows and columns.
5D:
DICOM order: data[d5,tags,slices,rows,columns]
return order: si [rows,columns,slices,tags,d5]
4D:
DICOM order: data[tags,slices,rows,columns]
return order: si [rows,columns,slices,tags]
3D:
DICOM order: data[slices,rows,columns]
return order: si [rows,columns,slices]
2D:
DICOM order: data[rows,columns]
return order: si [rows,columns]
flip: Whether rows and columns are swapped.
flipud: Whether matrix is transposed
"""
logger.debug('AbstractPlugin._reorder_from_dicom: shape in {}'.format(data.shape))
if data.ndim == 5:
d5, tags, slices, rows, columns = data.shape
if flipud:
rows, columns = columns, rows
si = np.zeros((rows, columns, slices, tags, d5), data.dtype)
for d in range(d5):
for tag in range(tags):
for slice in range(slices):
si[:, :, slice, tag, d] = self._reorder_slice(data[d, tag, slice, :, :],
flip=flip, flipud=flipud)
# if flip:
# si[:,:,slice,tag,d] = \
# np.fliplr(data[d,tag,slice,:,:]).T
# else:
# si[:,:,slice,tag,d] = data[d,tag,slice,:,:]
elif data.ndim == 4:
tags, slices, rows, columns = data.shape
if flipud:
rows, columns = columns, rows
si = np.zeros((rows, columns, slices, tags), data.dtype)
for tag in range(tags):
for slice in range(slices):
si[:, :, slice, tag] = self._reorder_slice(data[tag, slice, :, :],
flip=flip, flipud=flipud)
# if flip:
# si[:,:,slice,tag] = np.fliplr(data[tag,slice,:,:]).T
# else:
# si[:,:,slice,tag] = data[tag,slice,:,:]
elif data.ndim == 3:
slices, rows, columns = data.shape
if flipud:
rows, columns = columns, rows
si = np.zeros((rows, columns, slices), data.dtype)
for slice in range(slices):
si[:, :, slice] = self._reorder_slice(data[slice, :, :], flip=flip, flipud=flipud)
# if flip:
# si[:,:,slice] = np.fliplr(data[slice,:,:]).T
# else:
# si[:,:,slice] = data[slice,:,:]
elif data.ndim == 2:
rows, columns = data.shape
if flipud:
rows, columns = columns, rows
si = np.zeros((rows, columns), data.dtype)
si[:, :] = self._reorder_slice(data[:, :], flip=flip, flipud=flipud)
# if flip:
# si[:] = np.fliplr(data[:]).T
# else:
# si[:] = data[:]
else:
raise ValueError('Dimension %d is not implemented' % data.ndim)
logger.debug('AbstractPlugin._reorder_from_dicom: shape out {}'.format(
si.shape))
return si
@staticmethod
def _reorder_slice(data: np.ndarray, flip: bool, flipud: bool) -> np.ndarray:
if flip and flipud:
return np.fliplr(data).T
elif flip:
return np.fliplr(data)
elif flipud:
return data.T
else:
return data