import nibabel as nib
import numpy as np
import pandas as pd
import pickle
import trimesh
import warnings
from nibabel.processing import resample_from_to
from skimage.measure import marching_cubes
from skimage.morphology import (opening, remove_small_holes,
remove_small_objects)
from tqdm import tqdm
from trimesh import smoothing
from .utils import convex_hull_objects, pad_dimensions
[docs]
class QLayers:
"""
This class takes in a mask of a kidney and calculates the depth of each
voxel from the surface of the kidney. The kidney is then divided into
layers of a specified thickness that can be used as regions of interest.
It can then be used to add maps of other quantitative parameters to the
kidney and generate dataframes of these parameters with depth/layer.
Parameters
----------
mask_img : nibabel.nifti1.Nifti1Image
Binary mask of kidney
thickness : float, optional
Default 1
Thickness of layers to use when quantising depth, in millimeters
fill_ml : float, optional
Default 10
Volume of holes in the mask to fill (usually cycst), in cubic
centimeters (ml)
pelvis_dist : float, optional
Default 0
Voxels within `pelvis_dist` of the renal pelvis are excluded from
the resulting depth/layer calculations.
If 0, no pelvis segmentation is performed
space : {"map", "layers"}, optional
Default "map"
If "map", the depth map is resampled to the space of the
depth/layers are resampled to the space of the quantitative map. If
"layers", the quantitative map is resampled to the space of the
layers/depth. "map" gives more accurate quantitative results as the
map is not resampled however "layers" allows for the output of a
wide dataframe where each row contains the depth, layer and all
quantitative measurements for a voxel.
"""
# TODO add verbose option
def __init__(
self, mask_img, thickness=1, fill_ml=10, pelvis_dist=0, space="map"
):
self.mask_img = mask_img
self.mask = mask_img.get_fdata() > 0.5
self.zoom = mask_img.header.get_zooms()
self.affine = mask_img.affine
self.thickness = thickness
self.fill_ml = fill_ml
self.pelvis_dist = pelvis_dist
if space not in ["map", "layers"]:
raise ValueError("space must be 'map' or 'layers'")
self.space = space
self.depth = self._calculate_depth()
self.layers = np.ceil(self.depth * (1 / self.thickness)) / (
1 / self.thickness
)
self.layers_list = np.unique(self.layers)
self.df_long = pd.DataFrame()
self.maps = []
self._tissue_data_ls = None
self._tissue_labels = None
self._tissue_img = None
if self.space == "layers":
self.df_wide = pd.DataFrame(columns=["depth", "layer"])
self.df_wide["depth"] = self.depth[self.mask]
self.df_wide["layer"] = self.layers[self.mask]
[docs]
def add_map(self, map_img, name, norm=False):
"""
Add a quantitative map to the object. The either map or
layers will be resampled to a common space depending on the
`space` parameter of the `QLayers` object.
Parameters
----------
map_img : nibabel.nifti1.Nifti1Image
Quantitative map to be added to the object
name : str
Name of the quantitative map to be added. This name will be used
as a column name in the output dataframe.
norm: bool, optional
Default False
If True, the map will be normalized to have a mean of 0 and a
standard deviation of 1 before being added to the object.
"""
self.maps.append(name)
if map_img.ndim == 2:
map_img = pad_dimensions(map_img)
if self.space == "layers":
# Resample map into space of layers. Doing cval as a big and
# unusual number as cval=np.nan doesn't work
map_img = resample_from_to(map_img, self.mask_img,
cval=2 ** 16 - 2)
map_data = map_img.get_fdata()
map_data[map_data == 2 ** 16 - 2] = np.nan
if norm:
map_data = self._normalise_data(map_data, self.mask)
self.df_wide[name] = map_data[self.mask]
if self._tissue_img is None:
sub_df = pd.DataFrame(
columns=["depth", "layer", "measurement", "value"]
)
else:
sub_df = pd.DataFrame(
columns=[
"depth",
"layer",
"tissue",
"measurement",
"value",
]
)
sub_df["tissue"] = self._tissue_data_ls[self.mask]
sub_df["depth"] = self.depth[self.mask]
sub_df["layer"] = self.layers[self.mask]
sub_df["measurement"] = name
sub_df["value"] = map_data[self.mask]
if self.df_long.empty:
self.df_long = sub_df
else:
self.df_long = pd.concat([self.df_long, sub_df])
if self.space == "map":
# Resample layers into space of map
layers_img = nib.Nifti1Image(self.layers, self.affine)
layers_img_rs = resample_from_to(layers_img, map_img, order=0)
layers_rs = layers_img_rs.get_fdata()
depth_img = nib.Nifti1Image(self.depth, self.affine)
depth_img_rs = resample_from_to(depth_img, map_img)
depth_rs = depth_img_rs.get_fdata()
mask_img_rs = resample_from_to(self.mask_img, map_img, order=0)
mask_rs = mask_img_rs.get_fdata() > 0.5
map_data = map_img.get_fdata()
if norm:
map_data = self._normalise_data(map_data, mask_rs)
if self._tissue_img is None:
sub_df = pd.DataFrame(
columns=["depth", "layer", "measurement", "value"]
)
else:
sub_df = pd.DataFrame(
columns=["depth", "layer", "tissue", "measurement",
"value"]
)
tissue_rs = resample_from_to(
self._tissue_img, map_img, order=0
).get_fdata()
sub_df["tissue"] = tissue_rs[mask_rs]
sub_df["depth"] = depth_rs[mask_rs]
sub_df["layer"] = layers_rs[mask_rs]
sub_df["measurement"] = name
sub_df["value"] = map_data[mask_rs]
self.df_long = pd.concat([self.df_long, sub_df])
[docs]
def add_tissue(self, tissue_img, tissue_labels=None):
"""
Add a tissue segmentation image to the object. This segmentation
should contain integer labels for each tissue type in the kidney
where 0 is background. The labels will be used in the output
dataframe. The tissue segmentation will be resampled to the space of
the layers if the `space` parameter of the `QLayers` object is set
to "layers" or the space of the quantitative maps added if the
`space` parameter is set to "map".
Parameters
----------
tissue_img : nibabel.nifti1.Nifti1Image
Tissue segmentation image to be added to the object
tissue_labels : list of str, optional
Default None
List of text labels for each tissue type in the segmentation
e.g. ["cortex", "medulla"] if 1 in `tissue_img` represents
renal cortex and 2 represents medulla. If None, the integer labels
from `tissue_img` will be used.
"""
if len(self.maps) > 0:
raise ValueError(
"Tissue labels must be added before any quantitative maps."
)
self._tissue_img = tissue_img
self._tissue_labels = tissue_labels
self._tissue_data_ls = resample_from_to(
tissue_img, self.mask_img, order=0, cval=2 ** 16 - 2
).get_fdata()
self._tissue_data_ls[self._tissue_data_ls == 2 ** 16 - 2] = np.nan
self._tissue_data_ls[self._tissue_data_ls == 0] = np.nan
if self._tissue_labels is not None:
if len(np.unique(np.nan_to_num(self._tissue_data_ls))) - 1 != len(
self._tissue_labels
):
raise ValueError(
"Number of tissue labels must equal number of unique "
"non-zero values in tissue image."
)
if self.space == "layers":
self.df_wide["tissue"] = self._tissue_data_ls[self.mask]
[docs]
def get_df(self, format="long"):
"""
Returns a dataframe of all the quantitative maps added to the object
with the depth/layer of each voxel.
Parameters
----------
format : {"wide", "long"}, optional
Default "long"
If "long", the dataframe is returned in a long
format where each row contains the depth, layer and a single
quantitative measurement for a voxel. If "wide", the dataframe is
returned in a wide format where each row contains the depth,
layer and all quantitative measurements for a voxel. This option
is only available if the `space` parameter of the `QLayers`
object is set to "layers".
Returns
-------
pandas.DataFrame
Dataframe of all quantitative maps added to the object with the
depth/layer of each voxel.
"""
if self._tissue_labels is not None:
if len(self._tissue_labels) > 1:
_tissue_labels_type = type(self._tissue_labels[0])
else:
_tissue_labels_type = type(self._tissue_labels)
if len(self.df_long) > 0:
self.df_long = self.df_long.dropna(subset=["tissue"])
self.df_long["tissue"] = self.df_long["tissue"].astype(
_tissue_labels_type
)
for ind, label in zip(
np.sort(self.df_long["tissue"].unique()),
self._tissue_labels
):
self.df_long.loc[
self.df_long["tissue"] == ind, "tissue"
] = label
if self.space == "layers":
self.df_wide = self.df_wide.dropna(subset=["tissue"])
self.df_wide["tissue"] = self.df_wide["tissue"].astype(
_tissue_labels_type
)
for ind, label in zip(
np.sort(self.df_wide["tissue"].unique()),
self._tissue_labels,
):
self.df_wide.loc[
self.df_wide["tissue"] == ind, "tissue"
] = label
if format == "wide":
if self.space == "map":
raise NotImplementedError(
"Data cannot be retrieved in wide "
"format when space is 'map'. This "
"is because each quantitative map "
"is in a different space so each "
"row of a wide dataframe would not "
"come from the same space in the "
"kidney."
)
else:
return self.df_wide.loc[self.df_wide["depth"] > 0]
elif format == "long":
if "tissue" in self.df_long.columns:
self.df_long = self.df_long[
["depth", "layer", "tissue", "measurement", "value"]
]
return self.df_long.loc[self.df_long["depth"] > 0]
else:
raise ValueError("format must be 'wide' or 'long'")
[docs]
def get_depth(self):
"""
Returns distance from each voxel to the surface of the kidney as a
numpy array.
Returns
-------
numpy.ndarray
Distance from each voxel to the surface of the kidney
"""
return self.depth
[docs]
def get_layers(self):
"""
Returns layer of each voxel in the kidney as a numpy array.
Returns
-------
numpy.ndarray
Layer of each voxel
"""
return self.layers
[docs]
def remove_all_maps(self):
"""
Removes all quantitative maps from the object and resets the
dataframe to only contain depth/layer information.
"""
self.maps = []
if self._tissue_img is None:
self.df_long = pd.DataFrame(
columns=["depth", "layer", "measurement", "value"]
)
else:
self.df_long = pd.DataFrame(
columns=["depth", "layer", "tissue", "measurement", "value"]
)
if self.space == "layers":
self.df_wide = pd.DataFrame(columns=["depth", "layer"])
self.df_wide["depth"] = self.depth[self.mask]
self.df_wide["layer"] = self.layers[self.mask]
if self._tissue_img is not None:
self.df_wide["tissue"] = self._tissue_data_ls[self.mask]
[docs]
def save_depth(self, fname):
"""
Saves the depth map as a nifti file.
Parameters
----------
fname : str
Filename to save the depth map to.
"""
depth_img = nib.Nifti1Image(self.depth, self.affine)
nib.save(depth_img, fname)
[docs]
def save_layers(self, fname):
"""
Saves the layers as a nifti file.
Parameters
----------
fname : str
Filename to save the layers to.
"""
layer_img = nib.Nifti1Image(self.layers, self.affine)
nib.save(layer_img, fname)
[docs]
def save_pelvis(self, fname):
"""
Saves the pelvis segmentation as a nifti file.
Parameters
----------
fname : str
Filename to save the pelvis segmentation to.
"""
if not hasattr(self, "pelvis"):
self._segment_pelvis()
pelvis_img = nib.Nifti1Image(self.pelvis.astype(np.int32), self.affine)
nib.save(pelvis_img, fname)
[docs]
def save_surface(self, fname):
"""
Saves the kidney surface as a stl file.
Parameters
----------
fname : str
Filename to save the kidney surface to.
"""
if not hasattr(self, "smooth_mesh"):
self._calculate_depth()
self.smooth_mesh.export(fname)
[docs]
def to_pickle(self, fname):
"""
Saves the QLayers object as a pickle file.
Warning: The ability to import pickle files between Python versions
is not guaranteed, as such this is not recommended for long term
storage of QLayers objects!
Parameters
----------
fname : str
Filename to save the QLayers to.
"""
with open(fname, "wb") as f:
pickle.dump(self, f)
def _calculate_depth(self):
"""
Calculates the distance from each voxel to the surface of the kidney.
Returns
-------
numpy.ndarray
Distance from each voxel to the surface of the kidney
"""
# Fill any holes in the mask with volume less than fill_ml
# (measured in millileters)
fill_vox = int(self.fill_ml / (np.prod(self.zoom) / 1000))
mask_filled = remove_small_holes(self.mask, fill_vox)
# Convert the voxel mask into a mesh using the marching cubes
# algorithm and trimesh
print("Making Mesh")
verts, faces, normals, _ = marching_cubes(
mask_filled.astype(np.uint8),
spacing=self.zoom,
level=0.5,
step_size=1.0,
)
mesh = trimesh.Trimesh(
vertices=verts, faces=faces, vertex_normals=normals
)
# Smooth the resulting mesh
print("Smoothing Mesh")
mesh = smoothing.filter_mut_dif_laplacian(mesh, lamb=1, iterations=50)
self.smooth_mesh = mesh
# Generate a pointcloud of query points
x, y, z = np.meshgrid(
(np.arange(self.mask.shape[0]) * self.zoom[0]),
(np.arange(self.mask.shape[1]) * self.zoom[1]),
(np.arange(self.mask.shape[2]) * self.zoom[2]),
indexing="ij",
)
x, y, z = x.reshape(-1), y.reshape(-1), z.reshape(-1)
points = np.array([x, y, z]).T
# Find the nearest surface to each point inside the kidney
points = points[self.mask.reshape(-1) > 0.5]
batch_size = 10000
batches = np.ceil(len(points) / batch_size).astype(int)
distances = np.zeros(len(points))
i = 0
# print("Calculating Distances")
with tqdm(total=batches, desc="Distance Calculation") as pbar:
while i < len(points):
(closest_points, dist, triangle_id) = mesh.nearest.on_surface(
points[i:i + batch_size]
)
distances[i:i + batch_size] = dist
i += batch_size
pbar.update()
# Write these distances to voxels in the shape of the original image
depth = np.zeros(self.mask.shape)
depth[self.mask > 0.5] = distances
if self.pelvis_dist != 0:
noise_ml = 2.5 # Start with pretty high noise exclusion
self._segment_pelvis(noise_ml=2.5)
# If no pelvis is found, reduce the noise exclusion until a pelvis
# is found or noise_ml is 0
while (np.sum(self.pelvis) == 0) and (noise_ml > 0):
noise_ml -= 0.5
warnings.warn(f"No pelvis found, reducing noise exclusion to {noise_ml} ml")
self._segment_pelvis(noise_ml=noise_ml)
if np.sum(self.pelvis) > 0:
verts_p, faces_p, normals_p, _ = marching_cubes(
self.pelvis.astype(np.uint8),
spacing=self.zoom,
level=0.5,
step_size=1.0,
)
mesh_p = trimesh.Trimesh(
vertices=verts_p, faces=faces_p, vertex_normals=normals_p
)
i = 0
distances_p = np.zeros(len(points))
with tqdm(total=batches,
desc="Pelvis Distance Calculation") as pbar:
while i < len(points):
(
closest_points_p,
dist_p,
triangle_id_p,
) = mesh_p.nearest.on_surface(points[i:i + batch_size])
distances_p[i:i + batch_size] = dist_p
i += batch_size
pbar.update()
depth_p = np.zeros(self.mask.shape)
depth_p[self.mask > 0.5] = distances_p
depth[depth_p < self.pelvis_dist] = 0
else:
warnings.warn("No pelvis found, returning depth without pelvis exclusion.")
return depth
def _segment_pelvis(self, noise_ml=2.5):
"""
Segments the renal pelvis from the kidney mask and saves it as an
attribute of the object.
Parameters
----------
noise_ml : float, optional
Default 2.5
Volume of noise in the mask to remove, in cubic centimeters (ml)
"""
fill_vox = int(self.fill_ml / (np.prod(self.zoom) / 1000))
mask_filled = remove_small_holes(self.mask, fill_vox)
mask_ch = convex_hull_objects(mask_filled)
hulls = (mask_ch ^ mask_filled) & mask_ch
hulls = opening(hulls)
noise_vox = int(noise_ml / (np.prod(self.zoom) / 1000))
self.pelvis = remove_small_objects(hulls, noise_vox)
@staticmethod
def _normalise_data(data, mask=None):
"""
Normalise data to have a mean of 0 and a standard deviation of 1.
Parameters
----------
data : numpy.ndarray
Data to be normalised
Returns
-------
numpy.ndarray
Normalised data
"""
if mask is None:
mask = np.ones(data.shape, dtype=bool)
return (data - np.nanmean(data[mask])) / np.nanstd(data[mask])
[docs]
def load_pickle(fname):
"""
Load a pickle file containing a QLayers object.
Parameters
----------
fname : str
Filename of the pickle file to load.
Returns
-------
QLayers
The QLayers object stored in the pickle file.
"""
with open(fname, "rb") as f:
qlayers = pickle.load(f)
return qlayers