"""
Operations of signal quantification.
"""
from __future__ import annotations
import typing as tp
import numpy as np
import pandas as pd
import parmap
import skimage.measure
from skimage.segmentation import clear_border
from imc.data_models import roi as _roi
from imc.types import DataFrame, Array, Path
from imc.utils import read_image_from_file, minmax_scale
[docs]def quantify_cell_intensity(
stack: tp.Union[Array, Path],
mask: tp.Union[Array, Path],
red_func: str = "mean",
border_objs: bool = False,
equalize: bool = True,
scale: bool = False,
channel_include: Array = None,
channel_exclude: Array = None,
) -> DataFrame:
"""
Measure the intensity of each channel in each cell
Parameters
----------
stack: tp.Union[Array, Path]
Image to quantify.
mask: tp.Union[Array, Path]
Mask to quantify.
red_func: str
Function to reduce pixels to object borders. Defaults to 'mean'.
border_objs: bool
Whether to quantify objects touching image border. Defaults to False.
channel_include: :class:`~np.ndarray`
Boolean array for channels to include.
channel_exclude: :class:`~np.ndarray`
Boolean array for channels to exclude.
"""
from skimage.exposure import equalize_hist as eq
if isinstance(stack, Path):
stack = read_image_from_file(stack)
if isinstance(mask, Path):
mask = read_image_from_file(mask)
if not border_objs:
mask = clear_border(mask)
if equalize:
# stack = np.asarray([eq(x) for x in stack])
_stack = list()
for x in stack:
p = np.percentile(x, 98)
x[x > p] = p
_stack.append(x)
stack = np.asarray(_stack)
if scale:
stack = np.asarray([minmax_scale(x) for x in stack])
cells = np.unique(mask)
# the minus one here is to skip the background "0" label which is also
# ignored by `skimage.measure.regionprops`.
n_cells = len(cells) - 1
n_channels = stack.shape[0]
if channel_include is None:
channel_include = np.asarray([True] * n_channels)
if channel_exclude is None:
channel_exclude = np.asarray([False] * n_channels)
res = np.zeros((n_cells, n_channels), dtype=int if red_func == "sum" else float)
for channel in np.arange(stack.shape[0])[channel_include & ~channel_exclude]:
res[:, channel] = [
getattr(x.intensity_image[x.image], red_func)()
for x in skimage.measure.regionprops(mask, stack[channel])
]
return pd.DataFrame(res, index=cells[1:]).rename_axis(index="obj_id")
def quantify_cell_morphology(
mask: tp.Union[Array, Path],
attributes: tp.Sequence[str] = [
"area",
"perimeter",
"major_axis_length",
# 'minor_axis_length', in some images I get ValueError
# just like https://github.com/scikit-image/scikit-image/issues/2625
# 'orientation',
# orientation should be random for non-optical imaging, so I'm not including it
"eccentricity",
"solidity",
"centroid",
],
border_objs: bool = False,
) -> DataFrame:
if isinstance(mask, Path):
mask = read_image_from_file(mask)
if not border_objs:
mask = clear_border(mask)
return (
pd.DataFrame(
skimage.measure.regionprops_table(mask, properties=attributes),
index=np.unique(mask)[1:],
)
.rename_axis(index="obj_id")
.rename(columns={"centroid-0": "X_centroid", "centroid-1": "Y_centroid"})
)
def _quantify_cell_intensity__roi(roi: _roi.ROI, **kwargs) -> DataFrame:
assignment = dict(roi=roi.name)
if roi.sample is not None:
assignment["sample"] = roi.sample.name
return roi.quantify_cell_intensity(**kwargs).assign(**assignment)
def _quantify_cell_morphology__roi(roi: _roi.ROI, **kwargs) -> DataFrame:
assignment = dict(roi=roi.name)
if roi.sample is not None:
assignment["sample"] = roi.sample.name
return roi.quantify_cell_morphology(**kwargs).assign(**assignment)
def _correlate_channels__roi(roi: _roi.ROI, labels: str = "channel_names") -> DataFrame:
xcorr = np.corrcoef(roi.stack.reshape((roi.channel_number, -1)))
np.fill_diagonal(xcorr, 0)
labs = getattr(roi, labels)
return pd.DataFrame(xcorr, index=labs, columns=labs)
# def _get_adjacency_graph__roi(roi: _roi.ROI, **kwargs) -> DataFrame:
# output_prefix = roi.sample.root_dir / "single_cell" / roi.name
# return get_adjacency_graph(roi.stack, roi.mask, roi.clusters, output_prefix, **kwargs)
[docs]def quantify_cell_intensity_rois(
rois: tp.Sequence[_roi.ROI],
**kwargs,
) -> DataFrame:
"""
Measure the intensity of each channel in each single cell.
"""
return pd.concat(
parmap.map(_quantify_cell_intensity__roi, rois, pm_pbar=True, **kwargs)
).rename_axis(index="obj_id")
[docs]def quantify_cell_morphology_rois(
rois: tp.Sequence[_roi.ROI],
**kwargs,
) -> DataFrame:
"""
Measure the shape parameters of each single cell.
"""
return pd.concat(
parmap.map(_quantify_cell_morphology__roi, rois, pm_pbar=True, **kwargs)
).rename_axis(index="obj_id")
[docs]def quantify_cells_rois(
rois: tp.Sequence[_roi.ROI],
layers: tp.Sequence[str],
intensity: bool = True,
intensity_kwargs: tp.Dict[str, tp.Any] = {},
morphology: bool = True,
morphology_kwargs: tp.Dict[str, tp.Any] = {},
) -> DataFrame:
"""
Measure the intensity of each channel in each single cell.
"""
quants = list()
if intensity:
quants.append(
quantify_cell_intensity_rois(rois=rois, layers=layers, **intensity_kwargs)
)
if morphology:
quants.append(
quantify_cell_morphology_rois(rois=rois, layers=layers, **morphology_kwargs)
)
return (
# todo: this will fail if there's different layers in intensity and morphology
pd.concat(
# ignore because a ROI is not obliged to have a Sample
[quants[0].drop(["sample", "roi"], axis=1, errors="ignore"), quants[1]],
axis=1,
)
if len(quants) > 1
else quants[0]
).rename_axis(index="obj_id")