"""
Functions for image annotations.
"""
import os, json, typing as tp
from collections import Counter
import numpy as np
import pandas as pd
import matplotlib
import matplotlib.pyplot as plt
import seaborn as sns
from tqdm import tqdm
import imc.data_models.roi as _roi
from imc.types import DataFrame, Array, Path
[docs]def label_domains(
rois: tp.Sequence[_roi.ROI],
output_dir: Path,
export: bool = True,
domains: tp.Sequence[str] = ["T", "S", "A", "L", "V", "E"],
**kwargs,
) -> None:
"""
Draw shapes outying topological domains in tissue.
This step is done manually using the `labelme` program.
$ labelme --autosave --labels metadata/labelme_labels.txt
"""
if export:
export_images_for_topological_labeling(rois, output_dir, **kwargs)
labels_f = (output_dir).mkdir() / "labelme_labels.txt"
with open(labels_f, "w") as handle:
handle.write("\n".join(domains))
os.system(f"labelme --autosave --labels {labels_f} {output_dir}")
[docs]def export_images_for_topological_labeling(
rois: tp.Sequence[_roi.ROI],
output_dir: Path,
channels: tp.Sequence[str] = ["mean"],
overwrite: bool = False,
) -> None:
"""
Export PNGs for labeling with `labelme`.
"""
for roi in tqdm(rois):
f = output_dir / roi.name + ".jpg"
if not overwrite and f.exists():
continue
array = roi._get_channels(channels, minmax=True, equalize=True)[1].squeeze()
if array.ndim > 2:
array = np.moveaxis(array, 0, -1)
matplotlib.image.imsave(f, array)
def collect_domains(
input_dir: Path, rois: tp.Sequence[_roi.ROI] = None, output_file: Path = None
) -> tp.Dict[str, tp.Dict]:
if rois is not None:
roi_names = [r.name for r in rois]
filenames = list(input_dir.glob("*.json"))
if rois is not None:
filenames = [f for f in filenames if f.stem in roi_names]
topo_annots = dict()
for filename in tqdm(filenames):
annot_f = filename.replace_(".jpg", ".json")
if not annot_f.exists():
continue
with open(annot_f, "r") as handle:
annot = json.load(handle)
if annot["shapes"]:
topo_annots[filename.stem] = annot["shapes"]
if output_file is not None:
with open(output_file, "w") as handle:
json.dump(topo_annots, handle, indent=4)
return topo_annots
[docs]def illustrate_domains(
topo_annots: tp.Dict[str, tp.Dict],
rois: tp.Sequence[_roi.ROI],
output_dir: Path,
channels: tp.Sequence[str],
domain_exclude: tp.Sequence[str] = None,
cleanup: bool = False,
cmap_str: str = "Set3",
) -> None:
"""
Illustrate annotated topological domains of each ROI.
"""
from imc.utils import polygon_to_mask
from imc.graphics import legend_without_duplicate_labels
from shapely.geometry import Polygon
if domain_exclude is None:
domain_exclude = []
labels = list(set(geom["label"] for n, j in topo_annots.items() for geom in j))
label_color = dict(zip(labels, sns.color_palette(cmap_str)))
label_order = dict(zip(labels, range(1, len(labels) + 1)))
cmap = plt.get_cmap(cmap_str)(range(len(labels) + 1))
cmap[0] = [0, 0, 0, 1]
for roi_name in tqdm(topo_annots):
shapes = topo_annots[roi_name]
roi = [r for r in rois if r.name == roi_name][0]
annot_mask = np.zeros(roi.shape[1:])
for shape in shapes:
if shape["label"] in domain_exclude:
continue
region = polygon_to_mask(shape["points"], roi.shape[1:][::-1])
annot_mask[region > 0] = label_order[shape["label"]]
ar = roi.shape[1] / roi.shape[2]
fig, axes = plt.subplots(
1, 2, figsize=(2 * 4, 4 * ar), gridspec_kw=dict(wspace=0, hspace=0)
)
axes[0].set(title=roi.name)
roi.plot_channels(channels, axes=[axes[0]], merged=True)
shape_types: Counter[str] = Counter()
for shape in shapes:
label: str = shape["label"]
if label in domain_exclude:
continue
shape_types[label] += 1
c = Polygon(shape["points"]).centroid
axes[1].text(
c.x,
c.y,
s=f"{label}{shape_types[label]}",
ha="center",
va="center",
)
axes[0].plot(
*np.asarray(shape["points"] + [shape["points"][0]]).T,
label=label,
color=cmap[label_order[label]],
)
m = annot_mask == 0
annot_mask += 1
annot_mask[m] = 0
axes[1].imshow(
annot_mask,
cmap=cmap_str,
vmin=1,
vmax=len(label_color) + 1,
interpolation="none",
)
axes[1].set(title="Manual annotations")
legend_without_duplicate_labels(axes[0], title="Domain:")
for ax in axes:
ax.axis("off")
fig.savefig(
output_dir / roi.name + ".annotations.pdf",
dpi=300,
bbox_inches="tight",
)
plt.close(fig)
cmd = f"""pdftk
{output_dir}/*.annotations.pdf
cat
output
{output_dir}/topological_domain_annotations.pdf"""
os.system(cmd.replace("\n", " "))
if cleanup:
files = output_dir.glob("*.annotations.pdf")
for file in files:
file.unlink()
[docs]def get_domains_per_cell(
topo_annots: tp.Dict[str, tp.Dict],
rois: tp.Sequence[_roi.ROI],
exclude_domains: tp.Sequence[str] = None,
remaining_domain: tp.Union[str, tp.Dict[str, str]] = "background",
resolution: str = "largest",
) -> DataFrame:
"""
Generate annotation of topological domain each cell is contained in
based on manual annotated masks.
Parameters
----------
topo_annots: dict
Dictionary of annotations for each ROI.
rois: list
List of ROI objects.
exclude_domains: list[str]
Domains to ignore
remaining_domain: str | dict[str, str]
Name of domain to fill in for cells that do not fall under any domain annotation.
If given a string, it will simply use that.
If given a dict, the filled domain will be the value of the key which exists in the image.
E.g. Annotating tumor/stroma domains. If an image has only domains of type 'Tumor',
given `remaining_domain` == {'Tumor': 'Stroma', 'Stroma': 'Tumor'}, the remaining cells
will be annotated with 'Stroma'. In an image annotated only with 'Stroma' domains,
remaining cells will be annotated with 'Tumor' domains.
resolution: str
If `remaining_domain` is a dict, there may be more than one domain present in the image.
A resolution method is thus needed to select which domain will be filled for the remaining cells.
The method 'largest' will choose as key of `remaining_domain` the largest annotated domain class.
The method 'unique' will be strict and only fill in if there is a unique domain.
"""
from imc.utils import polygon_to_mask
if exclude_domains is None:
exclude_domains = []
_full_assigns = list()
for roi_name, shapes in tqdm(topo_annots.items()):
roi = [r for r in rois if r.name == roi_name][0]
mask = roi.mask
cells = np.unique(mask)[1:]
td_count: tp.Counter[str] = Counter()
regions = list()
_assigns = list()
for shape in shapes:
label = shape["label"]
points = shape["points"]
if label in exclude_domains:
continue
td_count[label] += 1
points += [points[0]]
region = polygon_to_mask(points, roi.shape[1:][::-1])
regions.append(region)
assign = (
pd.Series(np.unique(mask[(mask > 0) & region]), name="obj_id")
.to_frame()
.assign(
roi=roi.name,
sample=roi.sample.name,
domain_id=f"{label}{td_count[label]}",
)
)
_assigns.append(assign)
## if remaining_domain explicitely annotated, skip
if isinstance(remaining_domain, str):
if remaining_domain in td_count:
print(
f"ROI '{roi.name}' has been manually annotated"
" with remaining domains."
)
_full_assigns += _assigns
continue
## add a domain for cells not annotated
remain = ~np.asarray(regions).sum(0).astype(bool)
existing = np.sort(pd.concat(_assigns)["obj_id"].unique())
remain = remain & (~np.isin(mask, existing))
if remain.sum() == 0:
_full_assigns += _assigns
continue
if isinstance(remaining_domain, str):
### if given a string just make that the domain for unnanotated cells
domain = remaining_domain
print(f"ROI '{roi.name}' will be annotated with '{domain}' by default.")
elif isinstance(remaining_domain, dict):
### if given a dict, dependent on the existing domains choose what to label the remaining
### useful for when labeling e.g. tumor/stroma and different images may be labeled with only one of them
existing_domains = pd.concat(_assigns)["domain_id"].value_counts()
existing_domains.index = existing_domains.index.str.replace(
r"\d+", "", regex=True
)
repl = set(v for k, v in remaining_domain.items() if k in existing_domains)
if resolution == "largest":
domain = remaining_domain[existing_domains.idxmax()]
elif resolution == "unique":
if len(repl) == 1:
domain = repl.pop()
else:
raise ValueError(
"More than one domain was detected and it is"
" unclear how to annotate the remaining cells "
f"with the mapping: {remaining_domain}"
)
assign = (
pd.Series(np.unique(mask[remain]), name="obj_id")
.drop(0, errors="ignore")
.to_frame()
.assign(
roi=roi.name,
sample=roi.sample.name,
domain_id=domain + "1",
)
)
_assigns.append(assign)
_full_assigns += _assigns
assigns = pd.concat(_full_assigns)
assigns["topological_domain"] = assigns["domain_id"].str.replace(
r"\d", "", regex=True
)
# reduce duplicated annotations but for cells annotated with background, make that the primary annotation
id_cols = ["sample", "roi", "obj_id"]
assigns = (
assigns.groupby(id_cols).apply(
lambda x: x
if (x.shape[0] == 1)
else x.loc[x["topological_domain"] == remaining_domain, :]
if (x["topological_domain"] == remaining_domain).any()
else x
)
# .drop(id_cols, axis=1)
.reset_index(level=-1, drop=True)
).set_index(id_cols)
# make sure there are no cells with more than one domain that is background
tpc = assigns.groupby(id_cols)["domain_id"].nunique()
cells = tpc.index
assert not assigns.loc[cells[tpc > 1]].isin([remaining_domain]).any().any()
assigns = (
assigns.reset_index()
.drop_duplicates(subset=id_cols)
.set_index(id_cols)
.sort_index()
)
# expand domains
for domain in assigns["topological_domain"].unique():
assigns[domain] = assigns["topological_domain"] == domain
return assigns
@tp.overload
def get_domain_areas(
topo_annots: tp.Dict[str, tp.Dict],
rois: tp.Sequence[_roi.ROI],
per_domain: tp.Literal[False],
) -> tp.Dict[Path, float]:
...
@tp.overload
def get_domain_areas(
topo_annots: tp.Dict[str, tp.Dict],
rois: tp.Sequence[_roi.ROI],
per_domain: tp.Literal[True],
) -> DataFrame:
...
[docs]def get_domain_areas(
topo_annots: tp.Dict[str, tp.Dict],
rois: tp.Sequence[_roi.ROI] = None,
per_domain: bool = False,
) -> tp.Union[tp.Dict[Path, float], DataFrame]:
"""
Get area of airways per image in microns.
"""
from shapely.geometry import Polygon
mpp = 1 # scale
if rois is not None:
roi_names = [r.name for r in rois]
topo_annots = {k: v for k, v in topo_annots.items() if k in roi_names}
_areas = list()
for roi_name, shapes in tqdm(topo_annots.items()):
count: tp.Counter[str] = Counter()
for shape in shapes:
label = shape["label"]
count[label] += 1
a = Polygon(shape["points"]).area
_areas.append([roi_name, label + str(count[label]), a * mpp])
areas = (
pd.DataFrame(_areas)
.rename(columns={0: "filename", 1: "domain_domain_obj", 2: "area"})
.set_index("filename")
)
if not per_domain:
areas = areas.groupby("filename")["area"].sum().to_dict()
return areas
def get_domain_masks(
topo_annots: tp.Dict,
rois: tp.Sequence[_roi.ROI],
exclude_domains: tp.Sequence[str] = None,
fill_remaining: str = None,
per_domain: bool = False,
) -> Array:
_x = list()
for roi in rois:
x = get_domain_mask(
topo_annots[roi.name],
roi,
exclude_domains=exclude_domains,
fill_remaining=fill_remaining,
per_domain=per_domain,
)
_x.append(x)
x = np.asarray(_x)
return x
def get_domain_mask(
topo_annot: tp.Dict,
roi: _roi.ROI,
exclude_domains: tp.Sequence[str] = None,
fill_remaining: str = None,
per_domain: bool = False,
) -> Array:
""" """
import tifffile
from imc.utils import polygon_to_mask
if exclude_domains is None:
exclude_domains = []
_, h, w = roi.shape
masks = list()
region_types = list()
region_names = list()
count: tp.Counter[str] = Counter()
for shape in topo_annot:
shape["points"] += [shape["points"][0]]
region = polygon_to_mask(shape["points"], (w, h))
label = shape["label"]
count[label] += 1
masks.append(region)
region_types.append(label)
region_names.append(label + str(count[label]))
for_mask = np.asarray(
[m for l, m in zip(region_types, masks) if l not in exclude_domains]
).sum(0)
if fill_remaining is not None:
masks += [for_mask == 0]
region_types += [fill_remaining]
for_mask[for_mask == 0] = -1
exc_mask = np.asarray(
[m for l, m in zip(region_types, masks) if l in exclude_domains]
).sum(0)
mask: Array = (
((for_mask != 0) & ~(exc_mask != 0))
if isinstance(exc_mask, np.ndarray)
else for_mask
).astype(bool)
if per_domain:
nmask = np.empty_like(mask, dtype="object")
for r, l in zip(masks, region_types):
if l not in exclude_domains:
nmask[mask & r] = l
mask = np.ma.masked_array(nmask, mask=nmask == None)
return mask