#! /usr/bin/env python
"""
Convenience utilities for the package.
"""
import re
import json
import typing as tp
import numpy as np
import pandas as pd
import scipy
import scipy.ndimage as ndi
import matplotlib
import matplotlib.pyplot as plt
import h5py
from anndata import AnnData
import skimage
from skimage.exposure import equalize_hist as eq
import skimage.io
import skimage.measure
from skimage.transform import resize
import tifffile
from tqdm import tqdm
from imctools.io.mcd.mcdparser import McdParser
from imc.types import DataFrame, Series, Array, Path, GenericType
matplotlib.rcParams["svg.fonttype"] = "none"
FIG_KWS = dict(bbox_inches="tight", dpi=300)
MAX_BETWEEN_CELL_DIST = 4
DEFAULT_COMMUNITY_RESOLUTION = 0.005
DEFAULT_SUPERCOMMUNITY_RESOLUTION = 0.5
# DEFAULT_SUPER_COMMUNITY_NUMBER = 12
[docs]def filter_kwargs_by_callable(
kwargs: tp.Dict[str, tp.Any], callabl: tp.Callable, exclude: tp.List[str] = None
) -> tp.Dict[str, tp.Any]:
"""Filter a dictionary keeping only the keys which are part of a function signature."""
from inspect import signature
args = signature(callabl).parameters.keys()
return {k: v for k, v in kwargs.items() if (k in args) and k not in (exclude or [])}
def build_channel_name(
labels: tp.Dict[int, tp.Tuple[str, tp.Optional[str]]],
metals: tp.Dict[int, tp.Tuple[str, tp.Optional[str]]],
) -> Series:
return (
cleanup_channel_names(pd.Series(labels)) + "(" + pd.Series(metals) + ")"
).rename("channel")
[docs]def cleanup_channel_names(series: tp.Union[Series, tp.List]) -> Series:
"""Standardize channel naming using a set of defined rules."""
to_replace = [
("-", ""),
("_", ""),
(" ", ""),
("/", ""),
("INFgamma", "IFNgamma"),
("pHistone", "pH"),
("cmycp67", "cMYCp67"),
]
_series = pd.Series(series) if not isinstance(series, pd.Series) else series
for k, v in to_replace:
_series = _series.str.replace(k, v)
_series = _series.replace("", np.nan).fillna("<EMPTY>")
_series[_series.str.contains(r"\(").values] = (
_series.str.extract(r"(.*)\(", expand=False).dropna().values
)
return _series
def parse_acquisition_metadata(
acquisition_csv: Path,
acquisition_id: tp.Union[str, int] = None,
reference: tp.Union[Path, DataFrame] = None,
filter_full: bool = False,
) -> DataFrame:
acquired = pd.read_csv(acquisition_csv)
acquired = acquired.loc[
~acquired["ChannelLabel"].isin(["X", "Y", "Z"]), :
].drop_duplicates()
if acquisition_id is not None:
acquired = acquired.loc[
acquired["AcquisitionID"].isin([acquisition_id, str(acquisition_id)])
]
acquired = acquired[["ChannelLabel", "ChannelName"]]
else:
acquired = acquired[["AcquisitionID", "ChannelLabel", "ChannelName"]]
# remove parenthesis from metal column
acquired["ChannelName"] = (
acquired["ChannelName"].str.replace("(", "").str.replace(")", "")
)
# clean up the channel name
acquired["ChannelLabel"] = cleanup_channel_names(acquired["ChannelLabel"])
acquired.index = acquired["ChannelLabel"] + "(" + acquired["ChannelName"] + ")"
if reference is None:
return acquired
# Check matches, report missing
_reference = (
pd.read_csv(reference, index_col=0)
if not isinstance(reference, pd.DataFrame)
else reference
)
__c = acquired.index.isin(_reference.index)
if not __c.all():
miss = "\n - ".join(acquired.loc[~__c, "ChannelLabel"])
raise ValueError(
f"Given reference panel '{acquisition_csv}'"
f" is missing the following channels: \n - {miss}"
)
# align and sort by acquisition
joint_panel = acquired.join(reference)
# make sure order of ilastik channels is same as the original panel
# this important in order for the channels to always be the same
# and the ilastik models to be reusable
assert all(
_reference.query("ilastik == True").index
== joint_panel.query("ilastik == True").index
)
if filter_full:
joint_panel = joint_panel.loc[joint_panel["full"].isin([1, "1", True, "TRUE"])]
return joint_panel
def metal_order_to_channel_labels(
metal_csv: Path, channel_metadata: Path, roi_number: tp.Union[str, int]
):
order = (
pd.read_csv(metal_csv, header=None, squeeze=True)
.to_frame(name="ChannelName")
.set_index("ChannelName")
)
# read reference
ref = parse_acquisition_metadata(channel_metadata)
ref = ref.loc[ref["AcquisitionID"].isin([roi_number, str(roi_number)])]
return (
order.join(ref.reset_index().set_index("ChannelName"))["index"]
.reset_index(drop=True)
.rename("channel")
)
def align_channels_by_name(res: DataFrame, channel_axis=0) -> DataFrame:
if channel_axis not in [0, 1]:
raise ValueError("Axis must be one of 0 or 1.")
if res.isnull().any().any():
print("Matrix contains NaN values, likely various pannels.")
if channel_axis == 0:
miss = res.index[res.isnull().any(axis=1)]
else:
miss = res.columns[res.isnull().any(axis=0)]
# if there's an even number of channels with NaNs
if len(miss) % 2 == 0:
ex = miss.str.extract(r"^(.*)\(")[0]
# if all channel *names* come in pairs
if (ex.value_counts() == 2).all():
print("Found matching channel names in different metals, will align.")
# try to match channel swaps
for ch in ex.unique():
original = miss[miss.str.startswith(ch)]
chs = "-".join(original.str.extract(r"^.*\((.*)\)")[0].tolist())
new_ch_name = ch + "(" + chs + ")"
# add joined values
if channel_axis == 0:
res.loc[new_ch_name] = (
res.loc[original].T.stack().reset_index(level=1, drop=True)
)
else:
res.loc[:, new_ch_name] = res.loc[:, original].T.stack().values
# drop original rows
res = res.drop(original, axis=channel_axis)
else:
print(
"Not all channels are in pairs - cannot find out how to relate them."
)
else:
print(
"Found an odd number of channels missing - cannot find out how to relate them."
)
return res
def is_datetime(x: Series) -> bool:
if "datetime" in x.dtype.name:
return True
return False
def is_numeric(x: tp.Union[Series, tp.Any]) -> bool:
if not isinstance(x, pd.Series):
x = pd.Series(x)
if (
x.dtype.name
in [
"float",
"float32",
"float64",
"int",
"int8",
"int16",
"int32",
"int64",
"Int64",
]
or is_datetime(x)
):
return True
if x.dtype.name in ["object", "string", "boolean", "bool"]:
return False
if x.dtype.name == "category":
if len(set(type(i) for i in x)) != 1:
raise ValueError("Series contains mixed types. Cannot transfer to color!")
return is_numeric(x.iloc[0])
raise ValueError(f"Cannot transfer data type '{x.dtype}' to color!")
[docs]def sorted_nicely(iterable: tp.Sequence[GenericType]) -> tp.Sequence[GenericType]:
"""
Sort an iterable in the way that humans expect.
Parameters
----------
l : iterable
tp.Sequence to be sorted
Returns
-------
iterable
Sorted iterable
"""
def convert(text):
return int(text) if text.isdigit() else text
def alphanum_key(key):
return [convert(c) for c in re.split("([0-9]+)", key)]
if isinstance(iterable[0], (int, np.int32, np.int64)):
return np.sort(iterable)
return sorted(iterable, key=alphanum_key)
[docs]def read_image_from_file(file: Path, equalize: bool = False) -> Array:
"""
Read images from a tiff or hdf5 file into a numpy array.
Channels, if existing will be in first array dimension.
If `equalize` is :obj:`True`, convert to float type bounded at [0, 1].
"""
if not file.exists():
raise FileNotFoundError(f"Could not find file: '{file}")
if file.as_posix().endswith((".ome.tiff", ".tiff", ".ome.tif", ".tif")):
arr = tifffile.imread(file)
elif file.endswith(".h5"):
with h5py.File(file, "r") as __f:
arr = np.asarray(__f[list(__f.keys())[0]])
if len(arr.shape) == 3:
if min(arr.shape) == arr.shape[-1]:
arr = np.moveaxis(arr, -1, 0)
if equalize:
arr = eq(arr)
return arr
def write_image_to_file(
arr: Array,
channel_labels: tp.Sequence,
output_prefix: Path,
file_format: str = "png",
) -> None:
if len(arr.shape) != 3:
skimage.io.imsave(output_prefix + "." + "channel_mean" + "." + file_format, arr)
else:
__s = np.multiply(eq(arr.mean(axis=0)), 255).astype(np.uint8)
skimage.io.imsave(output_prefix + "." + "channel_mean" + "." + file_format, __s)
for channel, label in tqdm(enumerate(channel_labels), total=arr.shape[0]):
skimage.io.imsave(
output_prefix + "." + label + "." + file_format,
np.multiply(arr[channel], 256).astype(np.uint8),
)
[docs]def download_file(url: str, output_file: tp.Union[Path, str], chunk_size=1024) -> None:
"""
Download a file and write to disk in chunks (not in memory).
Parameters
----------
url : :obj:`str`
URL to download from.
output_file : :obj:`str`
Path to file as output.
chunk_size : :obj:`int`
Size in bytes of chunk to write to disk at a time.
"""
import shutil
from urllib import request
from contextlib import closing
import requests
if url.startswith("ftp://"):
with closing(request.urlopen(url)) as r:
with open(output_file, "wb") as f:
shutil.copyfileobj(r, f)
else:
response = requests.get(url, stream=True)
with open(output_file, "wb") as outfile:
outfile.writelines(response.iter_content(chunk_size=chunk_size))
[docs]def run_shell_command(cmd: str, dry_run: bool = False, quiet: bool = False) -> int:
"""
Run a system command.
Will detect whether a separate shell is required.
"""
import sys
import subprocess
import textwrap
# in case the command has unix pipes or bash builtins,
# the subprocess call must have its own shell
# this should only occur if cellprofiler is being run uncontainerized
# and needs a command to be called prior such as conda activate, etc
symbol = any(x in cmd for x in ["&", "&&", "|"])
source = cmd.startswith("source")
shell = bool(symbol or source)
if not quiet:
print(
"Running command:\n",
" in shell" if shell else "",
textwrap.dedent(cmd) + "\n",
)
if not dry_run:
if shell:
if not quiet:
print("Running command in shell.")
code = subprocess.call(cmd, shell=shell)
else:
# Allow spaces in file names
c = re.findall(r"\S+", cmd.replace(r"\ ", "__space__").replace("\\\n", ""))
c = [x.replace("__space__", " ") for x in c]
code = subprocess.call(c, shell=shell)
if code != 0:
print(
"Process for command below failed with error:\n'%s'\nTerminating pipeline.\n",
textwrap.dedent(cmd),
)
sys.exit(code)
if not shell:
pass
# usage = resource.getrusage(resource.RUSAGE_SELF)
# print(
# "Maximum used memory so far: {:.2f}Gb".format(
# usage.ru_maxrss / 1e6
# )
# )
return code
[docs]def downcast_int(arr: Array, kind: str = "u") -> Array:
"""
Downcast numpy array of integers dependent
on largest number in array compatible with smaller bit depth.
"""
assert kind in ["u", "i"]
if kind == "u":
assert arr.min() >= 0
m = arr.max()
for i in [8, 16, 32, 64]:
if m <= (2 ** i - 1):
return arr.astype(f"{kind}int{i}")
return arr
[docs]def minmax_scale(x, by_channel=True):
"""
Scale array to 0-1 range.
x: np.ndarray
Array to scale
by_channel: bool
Whether to perform scaling by the smallest dimension (channel).
Defaults to `True`.
"""
if by_channel and (x.ndim == 3):
i = np.argmin(x.shape)
if i != 0:
x = np.moveaxis(x, i, 0)
x = np.asarray([minmax_scale(y) for y in x])
return np.moveaxis(x, 0, i)
with np.errstate(divide="ignore", invalid="ignore"):
return (x - x.min()) / (x.max() - x.min())
[docs]def estimate_noise(i):
"""https://stackoverflow.com/a/25436112/1469535"""
import math
from scipy.signal import convolve2d
h, w = i.shape
m = [[1, -2, 1], [-2, 4, -2], [1, -2, 1]]
sigma = np.sum(np.sum(np.absolute(convolve2d(i, m))))
sigma = sigma * math.sqrt(0.5 * math.pi) / (6 * (w - 2) * (h - 2))
return sigma
[docs]def fractal_dimension(Z, threshold=0.9):
"""https://gist.github.com/viveksck/1110dfca01e4ec2c608515f0d5a5b1d1"""
# Only for 2d image
assert len(Z.shape) == 2
# From https://github.com/rougier/numpy-100 (#87)
def boxcount(Z, k):
S = np.add.reduceat(
np.add.reduceat(Z, np.arange(0, Z.shape[0], k), axis=0),
np.arange(0, Z.shape[1], k),
axis=1,
)
# We count non-empty (0) and non-full boxes (k*k)
return len(np.where((S > 0) & (S < k * k))[0])
# Transform Z into a binary array
Z = Z < threshold
# Minimal dimension of image
p = min(Z.shape)
# Greatest power of 2 less than or equal to p
n = 2 ** np.floor(np.log(p) / np.log(2))
# Extract the exponent
n = int(np.log(n) / np.log(2))
# Build successive box sizes (from 2**n down to 2**1)
sizes = 2 ** np.arange(n, 1, -1)
# Actual box counting with decreasing size
counts = []
for size in sizes:
counts.append(boxcount(Z, size))
# Fit the successive log(sizes) with log (counts)
coeffs = np.polyfit(np.log(sizes), np.log(counts), 1)
return -coeffs[0]
[docs]def lacunarity(image, box_size=30):
"""
From here: https://satsense.readthedocs.io/en/latest/_modules/satsense/features/lacunarity.html
Calculate the lacunarity value over an image.
The calculation is performed following these papers:
Kit, Oleksandr, and Matthias Luedeke. "Automated detection of slum area
change in Hyderabad, India using multitemporal satellite imagery."
ISPRS journal of photogrammetry and remote sensing 83 (2013): 130-137.
Kit, Oleksandr, Matthias Luedeke, and Diana Reckien. "Texture-based
identification of urban slums in Hyderabad, India using remote sensing
data." Applied Geography 32.2 (2012): 660-667.
"""
kernel = np.ones((box_size, box_size))
accumulator = scipy.signal.convolve2d(image, kernel, mode="valid")
mean_sqrd = np.mean(accumulator) ** 2
if mean_sqrd == 0:
return 0.0
return np.var(accumulator) / mean_sqrd + 1
[docs]def get_canny_edge_image(image: Array, mask: tp.Optional[Array], radius=30, sigma=0.5):
"""Compute Canny edge image."""
from skimage.filters.rank import equalize
from skimage.morphology import disk
from skimage.feature import canny
inverse_mask = ~mask
result = equalize(image, selem=disk(radius), mask=inverse_mask)
result = canny(result, sigma=sigma, mask=inverse_mask)
return np.ma.array(result, mask=mask)
def mcd_to_dir(
mcd_file: Path,
pannel_csv: Path = None,
ilastik_output: bool = True,
ilastik_channels: tp.Sequence[str] = None,
ilastik_compartment: str = None,
output_dir: Path = None,
output_format: str = "tiff",
overwrite: bool = False,
compression_level: int = 3,
sample_name: str = None,
partition_panels: bool = False,
filter_full: bool = True,
min_area: int = 10000,
export_stacks: bool = True,
export_panoramas: bool = True,
keep_original_roi_names: bool = False,
allow_empty_rois: bool = True,
only_crops: bool = False,
n_crops: int = 5,
crop_width: int = 500,
crop_height: int = 500,
) -> None:
""""""
# TODO: add optional rotation of images if y > x
def get_dataframe_from_channels(mcd):
return pd.DataFrame(
[mcd.get_acquisition_channels(x) for x in session.acquisition_ids],
index=session.acquisition_ids,
)
def all_channels_equal(mcd):
chs = get_dataframe_from_channels(mcd)
return all(
[(chs[c].value_counts() == mcd.n_acquisitions).all() for c in chs.columns]
)
def get_panel_partitions(mcd):
chs = get_dataframe_from_channels(mcd)
partitions = {k: set(k) for k in chs.drop_duplicates().index}
for p in partitions:
for _, row in chs.iterrows():
print(p, row.name)
if (row == chs.loc[list(partitions[p])[0]]).all():
partitions[p] = partitions[p].union(set([row.name]))
return partitions.values()
def clip_hot_pixels(img, hp_filter_shape=(3, 3), hp_threshold=0.0001):
"""
From https://github.com/BodenmillerGroup/ImcPluginsCP/blob/master/plugins/smoothmultichannel.py#L416
"""
if hp_filter_shape[0] % 2 != 1 or hp_filter_shape[1] % 2 != 1:
raise ValueError("Invalid hot pixel filter shape: %s" % str(hp_filter_shape))
hp_filter_footprint = np.ones(hp_filter_shape)
hp_filter_footprint[int(hp_filter_shape[0] / 2), int(hp_filter_shape[1] / 2)] = 0
max_img = ndi.maximum_filter(img, footprint=hp_filter_footprint, mode="reflect")
hp_mask = img - max_img > hp_threshold
img = img.copy()
img[hp_mask] = max_img[hp_mask]
return img
if partition_panels:
raise NotImplementedError("Partitioning sample per panel is not implemented yet.")
if pannel_csv is None and ilastik_channels is None and ilastik_compartment is None:
raise ValueError(
"One of `pannel_csv`, `ilastik_channels` or `ilastik_compartment` must be given!"
)
#
if (
ilastik_compartment is None
and pannel_csv is not None
and ilastik_channels is None
):
panel = pd.read_csv(pannel_csv, index_col=0)
ilastik_channels = panel.query("ilastik == 1").index.tolist()
H5_YXC_AXISTAG = json.dumps(
{
"axes": [
{
"key": "y",
"typeFlags": 2,
"resolution": 0,
"description": "",
},
{
"key": "x",
"typeFlags": 2,
"resolution": 0,
"description": "",
},
{
"key": "c",
"typeFlags": 1,
"resolution": 0,
"description": "",
},
]
}
)
if output_dir is None:
output_dir = mcd_file.parent / "imc_dir"
output_dir.mkdir(exist_ok=True, parents=True)
# Export panoramas
if not only_crops and export_panoramas:
get_panorama_images(
mcd_file,
output_file_prefix=output_dir / "Panorama",
overwrite=overwrite,
)
# Parse MCD
mcd = McdParser(mcd_file)
session = mcd.session
if sample_name is None:
sample_name = session.name.replace(" ", "_")
for i, ac_id in enumerate(session.acquisition_ids):
print(ac_id, end="\t")
try:
ac = mcd.get_acquisition_data(ac_id)
except Exception as e: # imctools.io.abstractparserbase.AcquisitionError
if allow_empty_rois:
print(e)
continue
raise e
if ac.image_data is None:
continue
if np.multiply(*ac.image_data.shape[1:]) < min_area:
print(
f"\nROI {ac_id} has less than the minimum area: {min_area}. Skipping.\n"
)
continue
# Get output prefix
if keep_original_roi_names:
prefix = Path(session.name.replace(" ", "_") + "_ac")
else:
prefix = Path(sample_name + "-" + str(i + 1).zfill(2))
# Skip if not overwrite
file_ending = "tiff"
if (prefix + "_full." + file_ending).exists() and not overwrite:
print("TIFF images exist and overwrite is set to `False`. Continuing.")
continue
# Filter channels
channel_labels = build_channel_name(ac.channel_labels, ac.channel_names)
if filter_full:
# remove background and empty channels
# TODO: find way to do this more systematically
channel_labels = channel_labels[
~(
channel_labels.str.contains(r"^\d")
| channel_labels.str.contains("<EMPTY>")
| channel_labels.str.contains("_EMPTY_")
)
].reset_index(drop=True)
p = output_dir / "tiffs" / prefix + "_full."
if (not only_crops) and export_stacks:
if (not (p + file_ending).exists()) or overwrite:
# Filter hot pixels
ac._image_data = np.asarray([clip_hot_pixels(x) for x in ac.image_data])
# Save full image
(output_dir / "tiffs").mkdir()
if output_format == "tiff":
ac.save_tiff(
p + file_ending,
names=channel_labels.str.extract(r"\((.*)\)")[0],
compression=compression_level,
)
elif output_format == "ome-tiff":
write_ometiff(
arr=ac._image_data,
labels=channel_labels.tolist(),
output_path=p + file_ending,
compression_level=compression_level,
description="; ".join(
[f"{k}={v}" for k, v in ac.acquisition.metadata.items()]
),
)
# Save channel labels for the stack
if not only_crops and ((overwrite) or not (p + "csv").exists()) and export_stacks:
channel_labels.to_csv(p + "csv")
if not ilastik_output:
continue
# Prepare ilastik data
ilastik_input = output_dir / "tiffs" / prefix + "_ilastik_s2.h5"
if (not ilastik_input.exists()) or overwrite:
if ilastik_compartment is None:
# Get index of ilastik channels
to_exp = channel_labels[channel_labels.isin(ilastik_channels)]
to_exp_ind = [
ac.channel_masses.index(y)
for y in to_exp.str.extract(r".*\(..(\d+)\)")[0]
]
assert to_exp_ind == to_exp.index.tolist()
full = ac.image_data[to_exp_ind]
nchannels = len(to_exp)
else:
# Or nuclear/cytoplasmic
from imc.segmentation import prepare_stack
full = prepare_stack(ac.image_data, channel_labels, ilastik_compartment)
if len(full.shape) == 2:
full = full[np.newaxis, ...]
nchannels = 2 if ilastik_compartment == "both" else 1
# Make input for ilastik training
stack_to_ilastik_h5(full, ilastik_input)
# # random crops
# # # make sure height/width are smaller or equal to acquisition dimensions
if n_crops > 0:
(output_dir / "ilastik").mkdir()
s = tuple(x * 2 for x in full.shape[1:])
if (full.shape[1] < crop_width) or (full.shape[0] < crop_height):
msg = (
"Image is smaller than the requested crop size for ilastik training."
)
print(msg)
continue
for _ in range(n_crops):
x = np.random.choice(range(s[0] - crop_width))
y = np.random.choice(range(s[1] - crop_height))
crop = full[x : (x + crop_width), y : (y + crop_height), :]
assert crop.shape == (crop_width, crop_height, nchannels)
with h5py.File(
output_dir / "ilastik" / prefix
+ f"_ilastik_x{x}_y{y}_w{crop_width}_h{crop_height}.h5",
mode="w",
) as handle:
d = handle.create_dataset("stacked_channels", data=crop)
d.attrs["axistags"] = H5_YXC_AXISTAG
print("") # add a newline to the tabs
mcd.close()
# all_channels_equal(mcd)
# partitions = get_panel_partitions(mcd)
# for partition_id, partition in enumerate(partitions, start=1):
# for ac_id in partition:
# ac = mcd.get_imc_acquisition(ac_id)
# ac.save_image(pjoin(output_dir, f"partition_{partition_id}", ""))
[docs]def write_ometiff(
arr: Array,
labels: tp.Sequence[str],
output_path: tp.Union[Path, str],
compression_level: int = 3,
description: str = None,
**tiff_kwargs,
) -> None:
"""
Write DataArray to a multi-page OME-TIFF file.
Parameters
----------
arr: np.ndarray
Array of dimensions CYX.
output_path: str | pathlib.Path
File to write TIFF file to.
**kwargs:
Additional arguments to tifffile.imwrite.
"""
# TODO: Add to OME XML: ROI number
output_path = Path(output_path)
image_name = output_path.stem.replace("_full", "")
labels = pd.Series(labels).str.replace("<", "_").str.replace(">", "_").tolist()
# Generate standard OME-XML
channels_xml = "".join(
[
f"""
<Channel
ID="Channel:0:{i}"
Name="{channel}"
SamplesPerPixel="1"/>"""
for i, channel in enumerate(labels)
]
)
xml = f"""<?xml version="1.0" encoding="UTF-8"?>
<OME
xmlns="http://www.openmicroscopy.org/Schemas/OME/2016-06"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.openmicroscopy.org/Schemas/OME/2016-06 http://www.openmicroscopy.org/Schemas/OME/2016-06/ome.xsd">
<Image
Name="{image_name}"
ID="Image:0"
Description="{description or ''}">
<Pixels
Type="float"
BigEndian="false"
DimensionOrder="XYZCT"
ID="Pixels:0"
Interleaved="false"
SizeX="{arr.shape[2]}"
SizeY="{arr.shape[1]}"
SizeZ="1"
SizeC="{arr.shape[0]}"
SizeT="1"
PhysicalSizeX="1.0"
PhysicalSizeXUnit="µm"
PhysicalSizeY="1.0"
PhysicalSizeYUnit="µm">
{channels_xml}
<TiffData/>
</Pixels>
</Image>
</OME>
"""
output_path.parent.mkdir()
# Notes:
## resolution: 1 um/px = 25400 px/inch;
## even though DimensionOrder is XYZCT, it is read as CYX.
tifffile.imwrite(
output_path,
data=arr,
description=xml,
contiguous=True,
photometric="minisblack",
resolution=(25400, 25400, "inch"),
metadata={"Channel": {"Name": labels}},
compress=compression_level,
ome=True,
**tiff_kwargs,
)
# To validate the XML:
# # Write to disk:
# with open(output_path.replace_(".tiff", ".xml"), "w") as handle:
# handle.write(xml)
# # Validate:
# $> xmlvalid file.xml
def stack_to_ilastik_h5(stack: Array, output_file: Path = None) -> Array:
H5_YXC_AXISTAG = json.dumps(
{
"axes": [
{
"key": "y",
"typeFlags": 2,
"resolution": 0,
"description": "",
},
{
"key": "x",
"typeFlags": 2,
"resolution": 0,
"description": "",
},
{
"key": "c",
"typeFlags": 1,
"resolution": 0,
"description": "",
},
]
}
)
# Make input for ilastik training
# # zoom 2x
s = tuple(x * 2 for x in stack.shape[1:])
full = np.moveaxis(np.asarray([resize(x, s) for x in stack]), 0, -1)
if output_file is None:
return full
# # Save input for ilastik prediction
with h5py.File(output_file, mode="w") as handle:
d = handle.create_dataset("stacked_channels", data=full)
d.attrs["axistags"] = H5_YXC_AXISTAG
return full
[docs]def stack_to_probabilities(
stack: Array,
channel_labels: Series,
nuclear_channels: tp.Sequence[str] = None,
cytoplasm_channels: tp.Sequence[str] = None,
log: bool = True,
# scale: bool = True,
) -> Array:
"""
Very simple way to go from a channel stack to nuclei, cytoplasm and background probabilities.
"""
from skimage.exposure import equalize_hist as eq
# nuclear_channels = ["DNA", "Histone", "pCREB", "cKIT", "pSTAT3"]
nuclear_channels = nuclear_channels or ["DNA", "Histone"]
_nuclear_channels = channel_labels[
channel_labels.str.contains("|".join(nuclear_channels))
]
if cytoplasm_channels is None:
_cytoplasm_channels = channel_labels[~channel_labels.isin(_nuclear_channels)]
else:
_cytoplasm_channels = channel_labels[
channel_labels.str.contains("|".join(cytoplasm_channels))
]
if log:
stack = np.log1p(stack)
# if scale:
# stack = saturize(stack)
# # mean of nuclear signal
ns = stack[_nuclear_channels.index].mean(0)
# # mean of cytoplasmatic signal
cs = stack[_cytoplasm_channels.index].mean(0)
# # normalize
ns = minmax_scale(ns)
cs = minmax_scale(eq(cs, 256 * 4))
# # convert into probabilities
pn = ns
pb = 1 - minmax_scale(pn + cs)
# pb = (pb - pb.min()) / pb.max()
# pc = minmax_scale(cs - (pn + pb))
pc = 1 - (pn + pb)
rgb = np.asarray([pn, pc, pb])
# pnf = ndi.gaussian_filter(pn, 1)
# pcf = ndi.gaussian_filter(pc, 1)
# pbf = ndi.gaussian_filter(pb, 1)
# rgb = np.asarray([pnf, pcf, pbf])
# return saturize(rgb)
return np.clip(rgb, 0, 1)
# # pp = ndi.zoom(roi.probabilities, (1, 0.5, 0.5))
# # pp = pp / pp.max()
# p = get_input_filename(roi.stack, roi.channel_labels)
# fig, axes = plt.subplots(1, 4, sharex=True, sharey=True)
# # axes[0].imshow(np.moveaxis(pp / pp.max(), 0, -1))
# axes[1].imshow(np.moveaxis(p, 0, -1))
# from skimage.exposure import equalize_hist as eq
# axes[2].imshow(minmax_scale(eq(roi._get_channel("mean")[1])))
# axes[3].imshow(roi.mask)
def save_probabilities(probs: Array, output_tiff: Path):
import tifffile
tifffile.imsave(output_tiff, np.moveaxis((probs * 2 ** 16).astype("uint16"), 0, -1))
[docs]def txt_to_tiff(
txt_file: Path, tiff_file: Path, write_channel_labels: bool = True
) -> None:
"""
Convert a Fluidigm TXT file to a TIFF file.
Parameters
----------
txt_file :
Input text file from Fluidigm.
tiff_file :
Path to output file.
write_channel_labels :
Whether to write a file with labels for the channel names.
"""
df = pd.read_table(txt_file)
df = df.drop(
["Start_push", "End_push", "Pushes_duration", "Z"], axis=1, errors="ignore"
)
df = df.pivot_table(index="X", columns="Y")[df.columns.drop(["X", "Y"])]
chs = df.columns.get_level_values(0).unique()
stack = np.asarray([df[c].values for c in chs])
tifffile.imwrite(tiff_file, stack)
if write_channel_labels:
pd.DataFrame({"channel_label": chs}).to_csv(tiff_file.replace_(".tiff", ".csv"))
[docs]def plot_panoramas_rois(
yaml_spec: Path,
output_prefix: Path,
panorama_image_prefix: tp.Optional[Path] = None,
save_roi_arrays: bool = False,
overwrite: bool = False,
) -> None:
"""
Plot the location of panoramas and ROIs of a IMC sample.
yaml_spec: tp.Union[str, pathlib.Path]
Path to YAML file containing the spec of the acquired sample.
output_prefix: tp.Union[str, pathlib.Path]
Prefix path to output the joint image and arrays if `save_roi_arrays` is `True`.
panorama_image_prefix: tp.Union[str, pathlib.Path]
Prefix of images of panoramas captured by the Hyperion instrument.
save_roi_arrays: bool
Whether to output arrays containing the images captured by the Hyperion instrument
in the locations of the ROIs.
"""
import PIL
import yaml
import imageio
from matplotlib.patches import Rectangle
import seaborn as sns
def get_pano_coords(pan):
x1, y1 = float(pan["SlideX1PosUm"]), float(pan["SlideY1PosUm"])
x3, y3 = float(pan["SlideX3PosUm"]), float(pan["SlideY3PosUm"])
return tuple(map(int, [x1, y1, x3 - x1, y3 - y1]))
def get_roi_coords(roi):
# start positions are in a different unit for some reason
x1, x2 = float(acq["ROIStartXPosUm"]) / 1000, float(acq["ROIEndXPosUm"])
y1, y2 = float(acq["ROIStartYPosUm"]) / 1000, float(acq["ROIEndYPosUm"])
x = min(x1, x2)
y = min(y1, y2)
width = max(x2, x1) - x
height = max(y1, y2) - y
return tuple(map(int, [x, y, width, height]))
if save_roi_arrays:
assert (
panorama_image_prefix is not None
), "If `save_arrays`, provide a `panorama_image_prefix`."
output_file = output_prefix + "joint_slide_panorama_ROIs.png"
if output_file.exists() and (not overwrite):
return
PIL.Image.MAX_IMAGE_PIXELS = None # to be able to read PNG file of any size
spec = yaml.safe_load(yaml_spec.open())
w, h = int(spec["Slide"][0]["WidthUm"]), int(spec["Slide"][0]["HeightUm"])
fkws = dict(bbox_inches="tight")
aspect_ratio = w / h
fig, ax = plt.subplots(figsize=(4 * aspect_ratio, 4))
colors = sns.color_palette("tab20c")
pano_pos = dict()
pano_imgs = dict()
for i, pano in enumerate(spec["Panorama"]):
# # Last panorama is not a real one (ROIs)
if pano["Description"] == "ROIs":
continue
x, y, width, height = get_pano_coords(pano)
pano_pos[pano["ID"]] = (x, y, width, height)
# Try to read panorama image
try:
# In case acquisition was made
if pano["Description"].endswith(".jpg"):
f = panorama_image_prefix + f"{i + 1}.png"
else:
i2 = int(pano["Description"].split("_")[1])
f = panorama_image_prefix + f"{i2}.png"
pano_img = imageio.imread(f)[..., :3]
pano_imgs[pano["ID"]] = pano_img
# print(f"Read image file for panorama '{i + 1}'")
ax.imshow(pano_img, extent=(x, x + width, y, y + height))
except (FileNotFoundError, IndexError, ValueError, TypeError):
# IndexError in case description can't be split in two
# ValueError in case i2 can't be made an int
# TypeError in case 'panorama_image_prefix' is None
print(f"Could not find image file for panorama '{i + 1}'")
# Plot rectangles
rect = Rectangle((x, y), width, height, facecolor="none", edgecolor=colors[i])
ax.add_patch(rect)
ax.text(
(x + width / 2),
(y + height),
s=f"Panorama '{pano['ID']}'",
ha="center",
va="bottom",
color=colors[i],
)
ax.scatter((x + width / 2), (y + height), marker="^", color=colors[i])
slide_area = (
float(spec["Panorama"][0]["SlideX2PosUm"])
- float(spec["Panorama"][0]["SlideX1PosUm"])
) * (
float(spec["Panorama"][0]["SlideY2PosUm"])
- float(spec["Panorama"][0]["SlideY3PosUm"])
)
for j, acq in enumerate(spec["Acquisition"]):
x, y, width, height = get_roi_coords(acq)
# print(acq["ID"], x, y, width, height)
## if too large it's likely not a real ROI
large = (width * height / slide_area) > 0.2
if large and x == 0 and y == 0:
print(f"ROI {acq['ID']} is empty. Skipping.")
continue
# Plot rectangle around ROI
rect = Rectangle(
(x, y), width, height, facecolor="none", edgecolor="black", linestyle="--"
)
ax.add_patch(rect)
ax.text(
x + width / 2, y - height, s=f"ROI '{acq['ID']}'", ha="center", color="black"
)
if not save_roi_arrays:
continue
# Export ROI in panorama image
## Find panorama containing ROI
dists = pd.Series(dtype=float)
for i, pos in pano_pos.items():
d1 = pos[0] - x
d2 = (x + width) - (pos[0] + pos[2])
d3 = pos[1] - y
d4 = (y + height) - (pos[0] + pos[3])
dists[i] = abs(d1) + abs(d2) + abs(d3) + abs(d4)
pano_img = pano_imgs[dists.idxmin()]
px, py, pw, ph = pano_pos[dists.idxmin()]
ry = abs(y - py - pano_img.shape[0])
## Plot ROI within panorama
fig2, ax2 = plt.subplots(figsize=(4, 4))
ax2.imshow(pano_img)
rect = Rectangle(
(x - px, ry),
width,
-height,
facecolor="none",
edgecolor=colors[j],
linestyle="--",
)
ax2.add_patch(rect)
fig2.savefig(
output_prefix + f"panorama_ROI_{j + 1}.png",
dpi=1600,
**fkws,
)
roi_img = pano_img[ry - height : ry, x - px : x - px + width, ...]
## Plot ROI on its own
fig3, ax3 = plt.subplots(figsize=(4, 4))
ax3.imshow(roi_img)
fig3.savefig(output_prefix + f"ROI_{j + 1}.png", dpi=600, **fkws)
# Save as array
np.save(output_prefix + f"ROI_{j + 1}", roi_img, allow_pickle=False)
ax.axis("off")
fig.savefig(output_file, dpi=300, **fkws)
def get_mean_expression_per_cluster(a: AnnData) -> DataFrame:
means = dict()
for cluster in a.obs["cluster"].unique():
means[cluster] = a[a.obs["cluster"] == cluster, :].X.mean(0)
mean_expr = pd.DataFrame(means, index=a.var.index)
mean_expr.columns.name = "cluster"
return mean_expr
@tp.overload
def z_score(x: Array, axis: tp.Union[tp.Literal[0], tp.Literal[1]]) -> Array:
...
@tp.overload
def z_score(x: DataFrame, axis: tp.Union[tp.Literal[0], tp.Literal[1]]) -> DataFrame:
...
[docs]def z_score(
x: tp.Union[Array, DataFrame], axis: tp.Union[tp.Literal[0], tp.Literal[1]] = 0
) -> tp.Union[Array, DataFrame]:
"""
Standardize and center an array or dataframe.
Parameters
----------
x :
A numpy array or pandas DataFrame.
axis :
Axis across which to compute - 0 == rows, 1 == columns.
This effectively calculates a column-wise (0) or row-wise (1) Z-score.
"""
return (x - x.mean(axis=axis)) / x.std(axis=axis)
def double_z_score(x, red_func: str = "mean"):
# doubly Z-scored matrix
mmz0 = (x - x.mean()) / x.std()
_tmp = x.T
mmz1 = (_tmp - _tmp.mean()) / _tmp.std()
group = pd.concat([mmz0, mmz1.T]).groupby(level=0)
return getattr(group, red_func)()
def filter_hot_pixels(img, n_bins=1000):
from sklearn.linear_model import LinearRegression
import statsmodels.api as sm
from jax import grad, vmap
m = img.max()
x = np.linspace(0, m, n_bins, dtype=float)
y = np.asarray([(img > i).sum() for i in x]).astype(float)
filter_ = y > 3
x2 = x[filter_]
y2 = y[filter_]
mod = sm.GLM(y2, np.vstack([x2, np.ones(x2.shape)]).T, family=sm.families.Poisson())
res = mod.fit()
f1 = lambda x: np.e ** (res.params[0] * x + res.params[1])
g1 = vmap(grad(f1))
mod = LinearRegression()
mod.fit(x2.reshape((-1, 1)), np.log2(y2))
f2 = lambda x: 2 ** (mod.coef_[0] * x + mod.intercept_)
g2 = vmap(grad(f2))
f3 = scipy.interpolate.interp1d(x2, y2, fill_value="extrapolate")
# g = vmap(grad(f3))
fig, axes = plt.subplots(1, 2)
axes[0].plot(x, y)
axes[0].plot(x2, y2)
axes[0].plot(x2, f1(x2))
axes[1].plot(x2, -g1(x2))
axes[0].plot(x2, f2(x2))
axes[1].plot(x2, -g2(x2))
axes[0].plot(x2, f3(x2))
# axes[1].plot(x2, -g(x2))
axes[0].set_yscale("log")
root = scipy.optimize.root(f3, m).x
axes[0].axvline(root, linestyle="--", color="grey")
img > root
# def segment(
# arr: Array,
# output_dir: str = None
# ) -> Array:
# import h5py
# import skimage.filters
# from skimage.filters import threshold_local
# nuclei, cyto, backgd = arr
# # make cyto
# nuc_cyto = eq(nuclei + cyto)
# # smooth nuclei
# # # typical diameter
# 5, 30
# # # discard objects outside diameter range
# False
# # # discard touching border
# True
# # # local thresholding
# from centrosome.threshold import get_threshold
# nuc = eq(nuclei)
# # # #
# size_range = (5, 30)
# # # # threshold smoothing scale 0
# # # # threshold correction factor 1.2
# # # # threshold bounds 0.0, 1.0
# lt, gt = get_threshold(
# "Otsu", "Adaptive", nuc,
# threshold_range_min=0, threshold_range_max=1.0,
# threshold_correction_factor=1.2, adaptive_window_size=50)
# binary_image = (nuc >= lt) & (nuc >= gt)
# # # # measure variance and entropy in foreground vs background
# # fill holes inside foreground
# def size_fn(size, is_foreground):
# return size < size_range[1] * size_range[1]
# binary_image = centrosome.cpmorphology.fill_labeled_holes(
# binary_image, size_fn=size_fn
# )
# # label
# labeled_image, object_count = scipy.ndimage.label(
# binary_image, np.ones((3, 3), bool)
# )
# # alternatively with CellProfiler
# from cellprofiler.workspace import Workspace
# from cellprofiler.image import ImageSet, ImageSettp.List
# from cellprofiler.modules.identifyprimaryobjects import IdentifyPrimaryObjects
# w = Workspace(
# pipeline="",
# module=IdentifyPrimaryObjects,
# image_set=ImageSet(0, 0, 0),
# object_set=[],
# measurements=None,
# image_set_list=ImageSettp.List())
# mod = IdentifyPrimaryObjects()
# mod.create_settings()
# mod.x_name.value = "filepath"
@tp.overload
def get_panorama_images(
mcd_file: Path, output_file_prefix: Path, overwrite: bool
) -> None:
...
@tp.overload
def get_panorama_images(
mcd_file: Path, output_file_prefix: None, overwrite: bool
) -> tp.List[Array]:
...
def get_panorama_images(
mcd_file: Path, output_file_prefix: Path = None, overwrite: bool = False
) -> tp.Optional[tp.List[Array]]:
import imageio
byteoffset = 161
mcd = McdParser(mcd_file)
imgs = list()
for slide in mcd.session.metadata["Panorama"]:
start, end = (
int(slide["ImageStartOffset"]),
int(slide["ImageEndOffset"]),
)
img = mcd._get_buffer(start + byteoffset, end + byteoffset)
if len(img) == 0: # empty image
continue
if output_file_prefix is not None:
output_file = output_file_prefix + f"_{slide['ID']}.png"
if overwrite or (not output_file.exists()):
with open(output_file, "wb") as f:
f.write(img)
else:
try:
imgs.append(imageio.imread(img))
except ValueError:
continue
mcd.close()
if output_file_prefix is None:
return imgs
else:
return None
# def draw_roi():
# import matplotlib.patches as patches
# fig, ax = plt.subplots(1, 1)
# metas = dict()
# for roi in session.acquisition_ids:
# ac_meta = mcd.meta.get_acquisition_meta(roi)
# metas[roi] = ac_meta
# sx, sy = float(ac_meta['ROIStartXPosUm']), float(ac_meta['ROIStartYPosUm'])
# ex, ey = float(ac_meta['ROIEndXPosUm']), float(ac_meta['ROIEndYPosUm'])
# width = abs(ex - sx)
# height = abs(ey - sy)
# # ax.imshow(imageio.imread(img))
# ax.add_patch(patches.Rectangle((sx, sy), width, height, color="black"))
# plt.plot((sx, sy), (ex, ey))
def get_distance_to_lumen_border(roi):
p = roi.probabilities
back = p[-1] / 65535
cells = p[:-1].sum(0) / 65535
chs = roi.channel_labels
nuclear_channels = ["DNA", "Histone"]
cyto_channels = chs[~chs.str.contains("|".join(nuclear_channels), case=False)]
roi._get_channels(cyto_channels.index.tolist())[1]
[docs]def polygon_to_mask(
polygon_vertices: tp.Sequence[tp.Sequence[float]],
shape: tp.Tuple[int, int],
including_edges: bool = True,
) -> Array:
"""
Convert a set of vertices to a binary array.
Adapted and extended from: https://stackoverflow.com/a/36759414/1469535.
"""
from shapely.geometry import Polygon, MultiPolygon
from shapely.geometry.collection import GeometryCollection
if including_edges:
# This makes sure edge pixels are also positive
grid = Polygon([(0, 0), (shape[0], 0), (shape[0], shape[1]), (0, shape[1])])
poly = Polygon(polygon_vertices)
if not poly.is_valid:
poly = poly.buffer(0)
inter = grid.intersection(poly)
if isinstance(inter, (MultiPolygon, GeometryCollection)):
return np.asarray([polygon_to_mask(x, shape) for x in inter.geoms]).sum(0) > 0
inter_verts = np.asarray(inter.exterior.coords.xy).T.tolist()
else:
inter_verts = polygon_vertices
x, y = np.meshgrid(np.arange(shape[0]), np.arange(shape[1]))
x, y = x.flatten(), y.flatten()
points = np.vstack((x, y)).T
path = matplotlib.path.Path(inter_verts)
grid = path.contains_points(points, radius=-1)
return grid.reshape((shape[1], shape[0]))
def mask_to_labelme(
labeled_image: Array,
filename: Path,
overwrite: bool = False,
simplify: bool = True,
simplification_threshold: float = 5.0,
) -> None:
import io
import base64
import imageio
from imantics import Mask
from shapely.geometry import Polygon
output_file = filename.replace_(".tif", ".json")
if overwrite or output_file.exists():
return
polygons = Mask(labeled_image).polygons()
shapes = list()
for point in polygons.points:
if not simplify:
poly = np.asarray(point).tolist()
else:
poly = np.asarray(
Polygon(point).simplify(simplification_threshold).exterior.coords.xy
).T.tolist()
shape = {
"label": "A",
"points": poly,
"group_id": None,
"shape_type": "polygon",
"flags": {},
}
shapes.append(shape)
f = io.BytesIO()
imageio.imwrite(f, tifffile.imread(filename), format="PNG")
f.seek(0)
encoded = base64.encodebytes(f.read())
payload = {
"version": "4.5.6",
"flags": {},
"shapes": shapes,
"imagePath": filename.name,
"imageData": encoded.decode("ascii"),
"imageHeight": labeled_image.shape[0],
"imageWidth": labeled_image.shape[1],
}
with open(output_file.as_posix(), "w") as fp:
json.dump(payload, fp, indent=2)