Source code for exo2micro.utils

"""
utils.py
========
Shared utility functions used across the exo2micro sub-modules.

Includes:
  - Image I/O helpers (TIFF / FITS reading and writing with metadata)
  - Preprocessing (median subtraction, normalisation, padding, trimming)
  - Masking helpers (tissue masks, joint masks, fill holes)
  - Gaussian smoothing with NaN conservation
  - Intensity equalisation for image pairs
  - Display helpers (robust vmax, RGB overlays)
"""

import gc
import os
import sys
import glob
from typing import Optional
import numpy as np
import cv2
from PIL import Image
from scipy import ndimage
from scipy.ndimage import binary_fill_holes, binary_dilation, generate_binary_structure

# Allow very large TIFF files without PIL's decompression-bomb guard.
Image.MAX_IMAGE_PIXELS = None


# ==============================================================================
# FILE I/O
# ==============================================================================

def _is_tiff(filename):
    """Return True if ``filename`` ends with ``.tif`` or ``.tiff`` (case-insensitive).

    Used by :func:`classify_raw_files`, :func:`survey_raw_channels`, and
    :func:`diagnose_raw_layout` so all three apply the same rule. Add new
    extensions or tweak the rule here in one place.
    """
    lower = filename.lower()
    return lower.endswith('.tif') or lower.endswith('.tiff')


[docs] def survey_raw_channels(raw_dir='raw', crop_size=1000): """ Survey all raw TIFF files to report which RGB channels carry signal. Reads a small centre crop from each file to avoid loading full images into memory. Parameters ---------- raw_dir : str Root directory containing sample subdirectories (default 'raw'). crop_size : int Side length of the centre crop to inspect (default 1000). Returns ------- results : list of dict One entry per file with keys: 'path', 'size', 'mode', 'channels'. 'channels' is a dict mapping channel name ('R', 'G', 'B' or 'gray') to {'max': int, 'mean': float, 'nonzero': int}. Notes ----- If ``raw_dir`` is missing, empty, or has TIFFs in the wrong place (e.g. directly in ``raw_dir`` rather than in per-sample subdirectories), this function prints a human-readable layout diagnosis via :func:`diagnose_raw_layout` and returns an empty list. """ # Catch the common layout problems up front and explain them in # plain English. If the layout is fine, this is silent. layout = diagnose_raw_layout(raw_dir) if not layout['ok']: print(layout['message']) return [] # Walk the tree case-insensitively for both .tif and .tiff. The # previous glob ``**/*.tif`` was both case-sensitive on Linux/Mac # and missed .tiff files entirely. files = [] for root, _dirs, names in os.walk(raw_dir): for name in names: if _is_tiff(name): files.append(os.path.join(root, name)) files = sorted(files) if not files: # diagnose_raw_layout already covers the no-files case, but # keep a fallback for the (rare) case where the layout looked # ok but the recursive walk found nothing. print(f" No .tif/.tiff files found under {raw_dir}") return [] results = [] half = crop_size // 2 for f in files: try: im = Image.open(f) except Exception as e: print(f" !! Could not open {f}: {e}") continue w, h = im.size cx, cy = w // 2, h // 2 r0 = max(cy - half, 0) c0 = max(cx - half, 0) r1 = min(cy + half, h) c1 = min(cx + half, w) crop = np.array(im.crop((c0, r0, c1, r1))) entry = { 'path': f, 'size': (w, h), 'mode': im.mode, 'channels': {}, } if crop.ndim == 2: entry['channels']['gray'] = { 'max': int(crop.max()), 'mean': float(crop.mean()), 'nonzero': int(np.count_nonzero(crop)), } else: names = ['R', 'G', 'B'] + [f'ch{i}' for i in range(3, crop.shape[2])] for ch in range(crop.shape[2]): d = crop[:, :, ch] entry['channels'][names[ch]] = { 'max': int(d.max()), 'mean': float(d.mean()), 'nonzero': int(np.count_nonzero(d)), } # Print summary active = [f"{name}(max={info['max']})" for name, info in entry['channels'].items() if info['max'] > 0] print(f" {f}{', '.join(active) if active else 'all zero'}") results.append(entry) return results
def _extract_signal_channel(path): """ Load a TIFF and extract the fluorescence signal as a 2-D uint8 array. For RGB images, auto-detects which channel(s) carry signal by comparing per-channel means on a centre crop. - If exactly one channel has signal: extracts that channel. - If multiple channels have signal: sums them (clipped to uint8) and prints an informative message. - If the image is already grayscale: returns it directly. Parameters ---------- path : str Path to the TIFF file. Returns ------- image : ndarray (2-D, uint8) Extracted fluorescence signal. """ im = Image.open(path) if im.mode in ('L', 'I', 'F'): # Already single-channel return np.array(im) arr = np.array(im) if arr.ndim == 2: return arr n_channels = arr.shape[2] channel_names = ['R', 'G', 'B'] + [f'ch{i}' for i in range(3, n_channels)] # Find channels with meaningful signal (mean > 0.5 in the channel) means = [float(arr[:, :, ch].mean()) for ch in range(n_channels)] active = [(ch, channel_names[ch], means[ch]) for ch in range(n_channels) if means[ch] > 0.5] if len(active) == 0: # Fallback: take the channel with the highest mean best = int(np.argmax(means)) print(f" channel auto-detect: no strong signal, " f"using {channel_names[best]} ({path})") return arr[:, :, best] if len(active) == 1: ch, name, mn = active[0] print(f" channel auto-detect: {name} (mean={mn:.1f}) ({os.path.basename(path)})") return arr[:, :, ch] # Multiple channels have signal — sum them names = [name for _, name, _ in active] print(f" channel auto-detect: signal in {'+'.join(names)}, " f"summing ({os.path.basename(path)})") combined = np.zeros(arr.shape[:2], dtype=np.float64) for ch, _, _ in active: combined += arr[:, :, ch].astype(np.float64) return np.clip(combined, 0, 255).astype(np.uint8)
[docs] def classify_raw_files(sample_dir): """ Classify TIFF files in a sample directory by stain type and dye. Filename rules -------------- A valid raw image filename must: 1. End with ``.tif`` or ``.tiff`` (case-insensitive). 2. Contain ``pre`` or ``post`` (case-insensitive) somewhere in the basename, marking it as a pre-stain or post-stain image. 3. End with ``_<DyeName>.tif`` (or ``.tiff``), where ``<DyeName>`` is the dye identifier and contains **no underscores**. The dye name is the substring between the last underscore and the extension. Examples of valid filenames:: Sample001_PreStain_SybrGld.tif -> pre, dye=SybrGld Sample001_PostStain_SybrGld.tiff -> post, dye=SybrGld my_2024_pre_run3_DAPI.tif -> pre, dye=DAPI whatever_post_Cy5.tiff -> post, dye=Cy5 Examples of *invalid* filenames (will be flagged in ``warnings``): - ``Sample001_PreStain_SybrGld_microbe.tif`` -- dye name contains an underscore. Would be parsed as dye ``microbe``. Rename ``SybrGld_microbe`` to ``SybrGldmicrobe`` or similar. - ``Sample001_pre_post_SybrGld.tif`` -- contains both ``pre`` and ``post``. Ambiguous, skipped. - ``Sample001_SybrGld.tif`` -- contains neither ``pre`` nor ``post``. Cannot be classified, skipped. This function is **non-raising**: it returns whatever it could parse plus a list of human-readable warnings about anything it couldn't. Callers that need to fail hard on missing or duplicate pairs (e.g. :func:`load_image_pair`) should check the returned structures themselves. Parameters ---------- sample_dir : str Path to a single sample's directory. Returns ------- pairs : dict Maps each detected dye name to a dict of candidate file paths:: { 'SybrGld': {'pre': ['/.../...PreStain_SybrGld.tif'], 'post': ['/.../...PostStain_SybrGld.tif']}, 'DAPI': {'pre': ['/.../...Pre_DAPI.tiff'], 'post': ['/.../...Post_DAPI.tiff']}, } Each list may have 0, 1, or many entries. Callers decide what to do about duplicates and missing sides. warnings : list of str Human-readable problem descriptions for individual files that couldn't be classified. One entry per problematic file. """ if not os.path.isdir(sample_dir): return {}, [f"directory not found: {sample_dir}"] candidates = [] for entry in os.listdir(sample_dir): if _is_tiff(entry): candidates.append(os.path.join(sample_dir, entry)) pairs = {} warnings = [] for path in sorted(candidates): basename = os.path.basename(path) # Strip extension (case-insensitive .tif or .tiff) if basename.lower().endswith('.tiff'): stem = basename[:-5] elif basename.lower().endswith('.tif'): stem = basename[:-4] else: continue lower_stem = stem.lower() has_pre = 'pre' in lower_stem has_post = 'post' in lower_stem if has_pre and has_post: warnings.append( f"AMBIGUOUS: {basename} contains both 'pre' and 'post' " f"in the filename -- cannot determine stain type. " f"Rename so only one of these substrings appears.") continue if not has_pre and not has_post: warnings.append( f"NO STAIN MARKER: {basename} contains neither 'pre' nor " f"'post' in the filename. Add 'Pre' or 'Post' to the " f"filename so the loader can classify it.") continue kind = 'pre' if has_pre else 'post' if '_' not in stem: warnings.append( f"NO UNDERSCORE: {basename} has no underscore before the " f"extension. Filenames must end with '_<DyeName>.tif' or " f"'_<DyeName>.tiff', where the dye name contains no " f"underscores.") continue dye = stem.rsplit('_', 1)[1] if not dye: warnings.append( f"EMPTY DYE: {basename} has nothing between the last " f"underscore and the extension. The filename must end " f"with '_<DyeName>.tif' or '_<DyeName>.tiff', where the " f"dye name (e.g. SybrGld, DAPI, Cy5) contains no " f"underscores.") continue pairs.setdefault(dye, {'pre': [], 'post': []})[kind].append(path) return pairs, warnings
[docs] def diagnose_raw_layout(raw_dir='raw'): """ Diagnose the layout of a raw image directory and return a structured report. Catches the common "I don't see any images" failure modes before the pipeline gets a chance to fail confusingly downstream: 1. ``raw_dir`` doesn't exist at all. 2. ``raw_dir`` exists but is empty. 3. ``raw_dir`` contains TIFF files directly (no per-sample folders). This is the most common mistake — users dump all their files in one place instead of separating by sample. 4. ``raw_dir`` contains subdirectories but none of them contain any TIFF files. When the layout looks correct, returns ``ok=True`` and a short informational summary. When something is wrong, returns ``ok=False`` and a multi-line ``message`` explaining what's wrong and how the directory should be structured. Parameters ---------- raw_dir : str Path to the raw image directory (default ``'raw'``). Returns ------- report : dict Keys: - ``ok`` (bool): True if the layout looks usable. - ``message`` (str): Human-readable multi-line message. Empty string when ``ok=True`` and there's nothing to report. - ``raw_dir`` (str): The directory that was inspected. - ``exists`` (bool): Whether ``raw_dir`` itself exists. - ``subdirs`` (list of str): Subdirectory names found (sorted). - ``loose_tiffs`` (list of str): TIFF filenames found directly in ``raw_dir`` (sorted). Non-empty implies the layout is wrong even if ``subdirs`` is also non-empty. - ``empty_subdirs`` (list of str): Subdirectory names that contain no TIFF files (sorted). Informational only. """ report = { 'ok': False, 'message': '', 'raw_dir': raw_dir, 'exists': False, 'subdirs': [], 'loose_tiffs': [], 'empty_subdirs': [], } # The "canonical layout" string used in several error messages. # Reuses the rules from classify_raw_files; if either changes, # update both together. layout_hint = ( "Expected layout:\n" " {raw_dir}/\n" " Sample001/\n" " Sample001_PreStain_SybrGld.tif\n" " Sample001_PostStain_SybrGld.tif\n" " Sample002/\n" " Sample002_PreStain_DAPI.tif\n" " Sample002_PostStain_DAPI.tif\n\n" "Filename rules:\n" " 1. Each sample must live in its OWN subdirectory under {raw_dir}.\n" " 2. Files end with .tif or .tiff (case-insensitive).\n" " 3. Filename contains 'pre' or 'post' (case-insensitive)\n" " somewhere in the basename.\n" " 4. Filename ends with '_<DyeName>.tif' (or .tiff). The dye\n" " name MUST NOT contain underscores. Use 'SybrGld', 'DAPI',\n" " 'Cy5', etc. — not 'SybrGld_microbe'.\n" " 5. Each sample directory contains exactly one pre-stain and\n" " one post-stain file per dye." ).format(raw_dir=raw_dir) # Case 1: raw_dir doesn't exist if not os.path.exists(raw_dir): report['message'] = ( f"Raw image directory not found: '{raw_dir}'\n" f" -> exo2micro looked for this directory in your current " f"working directory and didn't find it.\n" f" -> Either create it and put your sample folders inside, " f"or point the GUI / SampleDye / run_batch at the directory " f"where your raw images actually live (raw_dir='...').\n\n" f"{layout_hint}" ) return report if not os.path.isdir(raw_dir): report['message'] = ( f"'{raw_dir}' exists but is not a directory.\n" f" -> exo2micro needs raw_dir to be a folder containing " f"per-sample subfolders.\n\n" f"{layout_hint}" ) return report report['exists'] = True # Inventory the immediate children of raw_dir entries = [e for e in os.listdir(raw_dir) if not e.startswith('.')] loose_tiffs = sorted([e for e in entries if _is_tiff(e)]) subdirs = sorted([ e for e in entries if os.path.isdir(os.path.join(raw_dir, e)) ]) report['subdirs'] = subdirs report['loose_tiffs'] = loose_tiffs # Case 2: empty raw_dir if not entries: report['message'] = ( f"Raw image directory is empty: '{raw_dir}'\n" f" -> exo2micro needs this directory to contain one folder " f"per sample, with paired pre-stain and post-stain TIFFs " f"inside each folder.\n\n" f"{layout_hint}" ) return report # Case 3: TIFFs are directly in raw_dir (no per-sample folders) if loose_tiffs and not subdirs: # Show up to 5 example filenames so the user can recognize their # files and not be confused about which directory is meant. examples = loose_tiffs[:5] more = (f"\n ... and {len(loose_tiffs) - 5} more" if len(loose_tiffs) > 5 else "") report['message'] = ( f"Found {len(loose_tiffs)} TIFF file(s) directly inside " f"'{raw_dir}', but no per-sample subfolders.\n" f" -> exo2micro requires each sample to live in its own " f"subdirectory under '{raw_dir}'. The current layout puts " f"all images in one folder, which exo2micro doesn't know " f"how to interpret.\n" f" -> Found these files:\n" f" " + "\n ".join(examples) + more + "\n" f" -> Fix: create a folder per sample (e.g. '{raw_dir}/Sample001/'),\n" f" and move each sample's pre/post files into its folder.\n\n" f"{layout_hint}" ) return report # Mixed case: loose TIFFs AND subdirectories. Probably a mistake. if loose_tiffs and subdirs: examples = loose_tiffs[:5] more = (f" (and {len(loose_tiffs) - 5} more)" if len(loose_tiffs) > 5 else "") report['message'] = ( f"Found {len(loose_tiffs)} TIFF file(s) directly inside " f"'{raw_dir}' (alongside {len(subdirs)} subdirectory(ies)).\n" f" -> exo2micro only reads images from per-sample " f"subdirectories. The loose files at the top level will be " f"ignored:\n" f" " + ", ".join(examples) + more + "\n" f" -> If those files belong to one of the existing samples, " f"move them into the appropriate subfolder. If they're a new " f"sample, create a folder for them.\n\n" f"{layout_hint}" ) # This is recoverable — the existing subdirs may still process — # so we report it as a warning, not a fatal error. report['ok'] = True # Check whether the subdirs themselves have any usable images # before claiming the layout is OK. # (fall through to the subdir-check block below) # Case 4: subdirectories exist but none contain TIFFs empty_subdirs = [] nonempty_subdirs = [] for sub in subdirs: sub_path = os.path.join(raw_dir, sub) try: sub_entries = os.listdir(sub_path) except OSError: empty_subdirs.append(sub) continue has_tiff = any(_is_tiff(e) for e in sub_entries) if has_tiff: nonempty_subdirs.append(sub) else: empty_subdirs.append(sub) report['empty_subdirs'] = empty_subdirs if subdirs and not nonempty_subdirs: report['message'] = ( f"Found {len(subdirs)} sample subdirectory(ies) under " f"'{raw_dir}', but none of them contain any TIFF files.\n" f" -> exo2micro found these folders:\n" f" " + ", ".join(subdirs[:10]) + ("..." if len(subdirs) > 10 else "") + "\n" f" -> Each one should contain at least one pre-stain TIFF " f"and one post-stain TIFF.\n\n" f"{layout_hint}" ) report['ok'] = False return report # All good (possibly with the mixed-loose-and-subdirs warning above) if not report['message']: # Pure-success path: build an informational summary, leave ok=True. summary_parts = [ f"raw_dir='{raw_dir}': {len(nonempty_subdirs)} " f"sample folder(s) with TIFFs" ] if empty_subdirs: summary_parts.append( f", {len(empty_subdirs)} empty folder(s) ignored") report['message'] = ''.join(summary_parts) report['ok'] = True return report
[docs] def discover_tasks(samples, dyes, raw_dir='raw'): """ Resolve a (samples, dyes) request into the actual list of tasks present on disk. Given a list of sample names and a list of dye names the user wants to process, walk each sample directory and return: - ``present``: list of ``(sample, dye)`` tuples that have both a pre-stain and a post-stain file. These are runnable. - ``skipped``: list of ``(sample, dye, reason)`` tuples that were requested but can't run, with a short human-readable reason. - ``warnings``: list of ``(sample, warning_str)`` tuples for filename problems encountered along the way (ambiguous, no underscore, etc. — same wording as :func:`classify_raw_files` returns). This is the canonical "what tasks should we actually run?" helper. Both the batch processor (:func:`exo2micro.parallel.build_task_list`) and the GUI use it so they share one source of truth. Parameters ---------- samples : list of str Sample names requested by the user. dyes : list of str Dye names requested by the user. raw_dir : str Root directory containing per-sample subdirectories (default ``'raw'``). Returns ------- result : dict Keys: ``present`` (list of (sample, dye)), ``skipped`` (list of (sample, dye, reason)), ``warnings`` (list of (sample, warning_str)), and ``layout_ok`` (bool — False if :func:`diagnose_raw_layout` flagged a fatal layout problem; in that case ``present`` and ``skipped`` will both be empty and a single layout warning is added to ``warnings``). """ result = { 'present': [], 'skipped': [], 'warnings': [], 'layout_ok': True, } # Top-level layout check first. If raw_dir is missing or shaped # wrong, there's no point even trying to resolve individual pairs. layout = diagnose_raw_layout(raw_dir) if not layout['ok']: result['layout_ok'] = False result['warnings'].append(('(layout)', layout['message'])) return result for sample in samples: sample_dir = os.path.join(raw_dir, sample) if not os.path.isdir(sample_dir): for dye in dyes: result['skipped'].append( (sample, dye, f"sample directory '{sample_dir}' does not exist")) continue pairs, file_warnings = classify_raw_files(sample_dir) for w in file_warnings: result['warnings'].append((sample, w)) for dye in dyes: if dye not in pairs: available = sorted(pairs.keys()) if available: reason = (f"no '{dye}' files in this sample " f"(dyes present: {', '.join(available)})") else: reason = (f"no validly-named raw files in " f"'{sample_dir}'") result['skipped'].append((sample, dye, reason)) continue pre_n = len(pairs[dye]['pre']) post_n = len(pairs[dye]['post']) if pre_n == 0 or post_n == 0: missing_side = 'pre-stain' if pre_n == 0 else 'post-stain' reason = (f"incomplete pair — {missing_side} file is " f"missing (found {pre_n} pre, {post_n} post)") result['skipped'].append((sample, dye, reason)) continue if pre_n > 1 or post_n > 1: reason = (f"duplicate files — found {pre_n} pre and " f"{post_n} post for this dye, expected 1+1") result['skipped'].append((sample, dye, reason)) continue result['present'].append((sample, dye)) return result
[docs] def estimate_pipeline_output_size(sample_dye_pairs, raw_dir='raw', pad=2000, save_all_intermediates=False, n_scale_methods=1, checkpoint_format='tiff'): """ Estimate the on-disk footprint of a pipeline run. Returns a best-effort estimate of how much disk space the pipeline will consume if run on the given (sample, dye) combinations with the given parameters. Used by the GUI to pre-warn users when the estimate would exceed available disk space. The estimate is based on the raw TIFF dimensions: exo2micro pads each raw image by ``pad`` pixels on every side, converts to float32 (4 bytes per pixel), and saves intermediates at each pipeline stage. Approximate breakdown per (sample, dye): - Stage 1: padded post + padded pre (2 files, float32) - Stage 2: ICP-aligned pre (1 file); +coarse-aligned pre if ``save_all_intermediates=True`` - Stage 3: interior-aligned pre (1 file) - Stage 4: difference image (``n_scale_methods`` files, one per active scale method: Moffat-only = 1, Moffat+percentile = 2, Moffat+manual = 2, all three = 3) Each intermediate can be written as TIFF, FITS, or both depending on ``checkpoint_format``. TIFF-only and FITS-only runs use roughly half the disk space of ``'both'``. Diagnostic PNG plots add a small fixed overhead (~10 MB per (sample, dye) regardless). Parameters ---------- sample_dye_pairs : list of tuple List of ``(sample, dye)`` combinations to estimate. raw_dir : str Root raw image directory (default ``'raw'``). pad : int Padding value (default ``2000``). save_all_intermediates : bool If True, adds the stage-2 coarse intermediate to the estimate. n_scale_methods : int How many difference images stage 4 will produce (1-3). checkpoint_format : {'tiff', 'fits', 'both'} Which file format(s) each checkpoint gets written as. TIFF and FITS are roughly the same size on disk; ``'both'`` doubles the per-file footprint. Returns ------- estimate : dict ``{'bytes_per_task': [list], 'total_bytes': int, 'n_tasks': int, 'n_resolvable': int, 'warnings': [list]}`` """ PNG_OVERHEAD_BYTES = 10 * 1024 * 1024 # ~10 MB of diagnostic PNGs per task n_stage4_tiffs = max(1, n_scale_methods) # Total full-res image files produced per task (in one format): # stage 1: post + pre = 2 # stage 2: icp_aligned_pre = 1 (+ 1 if intermediates) # stage 3: interior_aligned_pre = 1 # stage 4: difference (per scale method) = n_stage4_tiffs n_fullres_single_format = 2 + 1 + 1 + n_stage4_tiffs if save_all_intermediates: n_fullres_single_format += 1 # If the user is saving both formats, each of those files is # written twice (TIFF and FITS copies). Otherwise it's written # once in the chosen format. format_multiplier = 2 if checkpoint_format == 'both' else 1 n_fullres = n_fullres_single_format * format_multiplier per_task = [] warnings = [] for sample, dye in sample_dye_pairs: sample_dir = os.path.join(raw_dir, sample) pairs, _ = classify_raw_files(sample_dir) if dye not in pairs or not pairs[dye]['pre'] or not pairs[dye]['post']: warnings.append( f"{sample} / {dye}: cannot resolve raw files for estimate") per_task.append(0) continue # Read raw file size of the post-stain file and infer full # dimensions. We could open it with tifffile to get shape, # but that's slow on networked drives; the raw file size is # close enough since uncompressed TIFFs dominate here. raw_path = pairs[dye]['post'][0] try: raw_bytes = os.path.getsize(raw_path) except OSError as e: warnings.append(f"{sample} / {dye}: cannot stat {raw_path}: {e}") per_task.append(0) continue # Raw TIFF is typically uint8 × 3 channels = 3 bytes/px. # Full-res float32 single channel = 4 bytes/px. # So per-pixel ratio of output to raw input = 4/3. # Then multiply by the padding inflation: # input area = H * W # padded area ~ (H + 2*pad) * (W + 2*pad) # For H, W in the ~25000 px range and pad=2000, this is # roughly 1.18×. We use a fixed 1.2× inflation since we # don't know exact dims without opening the file. PADDING_INFLATION = 1.2 CHANNEL_RATIO = 4.0 / 3.0 output_bytes_per_image = int( raw_bytes * CHANNEL_RATIO * PADDING_INFLATION) task_bytes = n_fullres * output_bytes_per_image + PNG_OVERHEAD_BYTES per_task.append(task_bytes) total = sum(per_task) n_resolvable = sum(1 for b in per_task if b > 0) return { 'bytes_per_task': per_task, 'total_bytes': total, 'n_tasks': len(sample_dye_pairs), 'n_resolvable': n_resolvable, 'warnings': warnings, }
[docs] def get_free_disk_space(path): """Return free disk space at ``path`` in bytes.""" try: stat = os.statvfs(path) return stat.f_bavail * stat.f_frsize except (OSError, AttributeError): # Fallback for Windows or if path doesn't exist try: import shutil return shutil.disk_usage(path).free except Exception: return None
[docs] def format_bytes(n): """Format a byte count as a human-readable string.""" if n is None: return '?' for unit in ('B', 'KB', 'MB', 'GB', 'TB'): if n < 1024: return f'{n:.1f} {unit}' if unit != 'B' else f'{int(n)} {unit}' n /= 1024 return f'{n:.1f} PB'
# ────────────────────────────────────────────────────────────────────────── # Persistent run log # ────────────────────────────────────────────────────────────────────────── # # The GUI's ipywidgets Output widget is ephemeral — its content lives # only in the Python kernel and disappears when the kernel restarts or # the notebook is closed. For users who want to reopen a notebook later # and see what a previous run produced, we also append every line to a # persistent log file in the output directory. RUN_LOG_FILENAME = '.exo2micro_run_log.txt'
[docs] def get_run_log_path(output_dir): """Return the path to the persistent run log file. The log lives at ``{output_dir}/.exo2micro_run_log.txt``. The leading dot keeps it out of casual file listings since it's mostly for recovery/debugging, not regular browsing. """ return os.path.join(output_dir, RUN_LOG_FILENAME)
[docs] def append_to_run_log(output_dir, message): """Append a line to the persistent run log. Failures (e.g. permission errors, missing directory) are silently ignored — the log is a best-effort persistence aid, not a critical path. """ try: os.makedirs(output_dir, exist_ok=True) path = get_run_log_path(output_dir) with open(path, 'a', encoding='utf-8') as f: f.write(message) if not message.endswith('\n'): f.write('\n') except Exception: pass
[docs] def read_run_log_tail(output_dir, max_lines=500): """Read the tail of the persistent run log. Parameters ---------- output_dir : str Directory containing ``.exo2micro_run_log.txt``. max_lines : int Maximum number of lines to return (most recent). Reading the whole file into memory is fine for typical log sizes (~megabytes), but we cap it defensively. Returns ------- text : str or None The last ``max_lines`` lines of the file, joined into a single string, or None if the file doesn't exist. """ path = get_run_log_path(output_dir) if not os.path.exists(path): return None try: with open(path, 'r', encoding='utf-8', errors='replace') as f: lines = f.readlines() if len(lines) > max_lines: lines = lines[-max_lines:] return ''.join(lines) except Exception as e: return f'(error reading log: {e})'
[docs] def clear_run_log(output_dir): """Delete the persistent run log file, if it exists.""" path = get_run_log_path(output_dir) try: if os.path.exists(path): os.remove(path) except Exception: pass
[docs] class TeeStdout: """File-like object that writes to both stdout and a log file. Used as a context manager during pipeline runs to capture every line of pipeline output (including text from inside library functions that use ``print()`` directly) into the persistent run log, without disturbing the normal stdout flow that the GUI's :class:`widgets.Output` context manager captures. Usage:: with TeeStdout(log_path): with widget_output: run.run() # all prints go to widget AND log file Failures writing to the file are silently swallowed; the underlying stdout writes always succeed. """ def __init__(self, log_path): self.log_path = log_path self._original_stdout = None self._fh = None def __enter__(self): self._original_stdout = sys.stdout try: os.makedirs(os.path.dirname(self.log_path) or '.', exist_ok=True) self._fh = open(self.log_path, 'a', encoding='utf-8', buffering=1) # line-buffered except Exception: self._fh = None sys.stdout = self return self def __exit__(self, *args): sys.stdout = self._original_stdout if self._fh is not None: try: self._fh.close() except Exception: pass self._fh = None
[docs] def write(self, data): # Always write to the original stdout first so widget capture # works exactly as before. try: self._original_stdout.write(data) except Exception: pass # Best-effort write to the log file. if self._fh is not None: try: self._fh.write(data) except Exception: pass
[docs] def flush(self): try: self._original_stdout.flush() except Exception: pass if self._fh is not None: try: self._fh.flush() except Exception: pass
[docs] def load_image_pair(sample, dye, raw_dir='raw'): """ Load a pre-stain and post-stain image pair for a given sample and dye. Automatically detects which RGB channel carries the fluorescence signal and extracts it at full 8-bit precision, rather than using :meth:`PIL.Image.convert` which loses ~41% of the dynamic range. Filename convention ------------------- Each sample directory must contain exactly one pre-stain file and exactly one post-stain file per dye, named so that: 1. The filename ends with ``.tif`` or ``.tiff`` (case-insensitive). 2. The basename contains ``pre`` or ``post`` (case-insensitive) to mark the stain type. 3. The basename ends with ``_<dye>.<ext>``, where ``<dye>`` matches the ``dye`` argument and contains no underscores. See :func:`classify_raw_files` for full details and examples. Behaviour on problems --------------------- This function is strict: it raises rather than returning placeholder values when anything goes wrong. The exception message is multi-line and tells the user exactly what to fix. - **Missing sample directory** -> :class:`FileNotFoundError` - **No file matches the requested dye** -> :class:`FileNotFoundError` - **Only one side of the pair found** -> :class:`FileNotFoundError` - **Multiple pre-stain or post-stain files for the same dye** -> :class:`ValueError` When other dyes in the same directory are misnamed (ambiguous, no underscore, etc.), warnings about them are printed but do not block loading the requested dye. Parameters ---------- sample : str Sample name, e.g. ``'CD070'``. Must match the name of a subdirectory under ``raw_dir``. dye : str Dye name, e.g. ``'SybrGld'`` or ``'DAPI'``. Must match the substring after the last underscore in the raw filenames. raw_dir : str Base directory containing sample subdirectories (default ``'raw'``). Returns ------- post_im : ndarray Post-stain image as a 2-D numpy array. pre_im : ndarray Pre-stain image as a 2-D numpy array. post_path : str Path to the post-stain file. pre_path : str Path to the pre-stain file. Raises ------ FileNotFoundError If the sample directory is missing, the requested dye has no matching files, or only one side of the pair exists. ValueError If the requested dye matches more than one pre-stain or post-stain file in the directory. """ sample_dir = os.path.join(raw_dir, sample) if not os.path.isdir(sample_dir): raise FileNotFoundError( f"Sample directory not found: {sample_dir}\n" f" -> expected '{sample}' to be a subdirectory of '{raw_dir}'") pairs, warnings = classify_raw_files(sample_dir) # Surface any per-file warnings (these affect other files in the # same directory but don't necessarily block the requested dye). if warnings: print(f" ({len(warnings)} filename problem(s) in {sample_dir}:)") for w in warnings: print(f" !! {w}") if dye not in pairs: available = sorted(pairs.keys()) msg = (f"No raw files matching dye '{dye}' in {sample_dir}\n" f" -> looked for files containing 'pre' or 'post' and " f"ending with '_{dye}.tif' or '_{dye}.tiff'") if available: msg += f"\n -> dyes detected in this directory: {available}" else: msg += f"\n -> no validly-named raw files in this directory" if '_' in dye: # Most common typo: user passed 'SybrGld_microbe' as the dye # name when the actual file is named with just 'SybrGld' as # the dye and 'microbe' is a leftover descriptor. Dye names # must not contain underscores. msg += (f"\n -> note: dye names must not contain underscores. " f"The dye is the substring AFTER the last underscore " f"in the filename. If your files are named like " f"'..._{dye}.tif', the dye here would be " f"'{dye.rsplit('_', 1)[1]}'. Either rename the files " f"or request the dye as '{dye.rsplit('_', 1)[1]}'.") raise FileNotFoundError(msg) pre_files = pairs[dye]['pre'] post_files = pairs[dye]['post'] if not pre_files or not post_files: msg = f"Incomplete pair for {sample} / {dye} in {sample_dir}\n" if not pre_files: msg += (f" -> no file containing 'pre' ends with " f"'_{dye}.tif' or '_{dye}.tiff'\n") else: msg += f" -> pre-stain found: {os.path.basename(pre_files[0])}\n" if not post_files: msg += (f" -> no file containing 'post' ends with " f"'_{dye}.tif' or '_{dye}.tiff'\n") else: msg += f" -> post-stain found: {os.path.basename(post_files[0])}\n" msg += (f" -> rename or add the missing file so the directory " f"has exactly one pre-stain and one post-stain image " f"for this dye") raise FileNotFoundError(msg) if len(pre_files) > 1 or len(post_files) > 1: msg = (f"Duplicate pair for {sample} / {dye} in {sample_dir}\n" f" -> expected exactly one pre-stain and one post-stain " f"file for this dye, but found:\n") if len(pre_files) > 1: msg += f" -> pre-stain candidates ({len(pre_files)}):\n" for p in pre_files: msg += f" {os.path.basename(p)}\n" if len(post_files) > 1: msg += f" -> post-stain candidates ({len(post_files)}):\n" for p in post_files: msg += f" {os.path.basename(p)}\n" msg += (f" -> rename or remove the extras so each sample " f"directory contains exactly one pre-stain and one " f"post-stain image per dye") raise ValueError(msg) pre_path = pre_files[0] post_path = post_files[0] post_im = _extract_signal_channel(post_path) pre_im = _extract_signal_channel(pre_path) print(f" Loaded post-stain : {post_path} shape={post_im.shape} " f"range=[{post_im.min()}, {post_im.max()}]") print(f" Loaded pre-stain : {pre_path} shape={pre_im.shape} " f"range=[{pre_im.min()}, {pre_im.max()}]") return post_im, pre_im, post_path, pre_path
[docs] def save_checkpoint(image, filepath, sample='', dye='', stage='', params=None, extra_headers=None): """ Save an intermediate image as both TIFF and FITS, with metadata. The TIFF is saved in the 'tiff/' subdirectory and the FITS in the 'fits/' subdirectory of the same parent. Parameters ---------- image : ndarray 2D image array to save. filepath : str Base filepath WITHOUT extension, e.g. 'processed/CD070/SybrGld_microbe/01_padded_post'. The function appends .tiff and .fits and places them in the appropriate subdirectories. sample : str Sample name for FITS header. dye : str Dye name for FITS header. stage : str Pipeline stage name for FITS header. params : dict or None Non-default parameters to record in FITS header. extra_headers : dict or None Additional FITS header keywords (e.g., warp matrix elements). """ from astropy.io import fits from datetime import datetime # Determine base directory and filename parent = os.path.dirname(filepath) basename = os.path.basename(filepath) # Construct tiff and fits directories # filepath is expected to be like: .../sample/dye/images/01_padded_post_suffix # We need to route to .../sample/dye/tiff/ and .../sample/dye/fits/ # Go up from images/ to the sample/dye level sample_dye_dir = os.path.dirname(parent) if os.path.basename(parent) == 'images' else parent tiff_dir = os.path.join(sample_dye_dir, 'tiff') fits_dir = os.path.join(sample_dye_dir, 'fits') os.makedirs(tiff_dir, exist_ok=True) os.makedirs(fits_dir, exist_ok=True) tiff_path = os.path.join(tiff_dir, basename + '.tiff') fits_path = os.path.join(fits_dir, basename + '.fits') # Save TIFF — preserve full precision if image.dtype == np.float32 or image.dtype == np.float64: # Use PIL for float images via intermediate conversion # For full float precision, save as 32-bit TIFF import tifffile tifffile.imwrite(tiff_path, image.astype(np.float32)) else: tifffile_save(image, tiff_path) # Save FITS with metadata hdu = fits.PrimaryHDU(image.astype(np.float32)) header = hdu.header header['SAMPLE'] = (sample, 'Sample name') header['DYE'] = (dye, 'Dye/channel name') header['STAGE'] = (stage, 'Pipeline stage') header['CREATED'] = (datetime.now().isoformat(), 'File creation timestamp') if params: for key, value in params.items(): # FITS keywords are max 8 chars; use abbreviation if available from .defaults import ABBREVIATIONS fits_key = ABBREVIATIONS.get(key, key[:8]).upper() if value is None: header[fits_key] = ('None', key) elif isinstance(value, bool): header[fits_key] = (value, key) else: header[fits_key] = (value, key) if extra_headers: for key, value in extra_headers.items(): header[key[:8].upper()] = value hdu.writeto(fits_path, overwrite=True) print(f" Saved checkpoint: {tiff_path}") print(f" Saved checkpoint: {fits_path}")
[docs] def tifffile_save(image, path): """Save image as TIFF using tifffile for full-precision support.""" import tifffile tifffile.imwrite(path, image.astype(np.float32))
[docs] def load_checkpoint(filepath): """ Load a checkpoint image from TIFF. Parameters ---------- filepath : str Base filepath WITHOUT extension (same as passed to save_checkpoint). Returns ------- image : ndarray or None The loaded image, or None if not found. """ parent = os.path.dirname(filepath) basename = os.path.basename(filepath) sample_dye_dir = os.path.dirname(parent) if os.path.basename(parent) == 'images' else parent tiff_dir = os.path.join(sample_dye_dir, 'tiff') tiff_path = os.path.join(tiff_dir, basename + '.tiff') if os.path.exists(tiff_path): import tifffile image = tifffile.imread(tiff_path) print(f" Loaded checkpoint: {tiff_path} shape={image.shape}") return image return None
[docs] def checkpoint_exists(filepath): """ Check whether a checkpoint file exists. Parameters ---------- filepath : str Base filepath WITHOUT extension. Returns ------- bool """ parent = os.path.dirname(filepath) basename = os.path.basename(filepath) sample_dye_dir = os.path.dirname(parent) if os.path.basename(parent) == 'images' else parent tiff_dir = os.path.join(sample_dye_dir, 'tiff') tiff_path = os.path.join(tiff_dir, basename + '.tiff') return os.path.exists(tiff_path)
[docs] def tiff_to_fits(tiff_file, return_data=False): """ Convert a three-channel RGB TIFF file to a FITS file. Each colour channel is stored as a named image extension (RED1, GREEN2, BLUE3). Parameters ---------- tiff_file : str Path to the source TIFF file. return_data : bool If True, also return the raw TIFF array (default False). Returns ------- fits_filename : str Path to the generated FITS file. tiff_data : ndarray Raw uint8 array of shape (H, W, 3); only if return_data=True. """ from astropy.io import fits tiff_image = Image.open(tiff_file) tiff_data = np.array(tiff_image) red_channel = tiff_data[:, :, 0] green_channel = tiff_data[:, :, 1] blue_channel = tiff_data[:, :, 2] hdu_list = fits.HDUList([ fits.PrimaryHDU(), fits.ImageHDU(red_channel, name='RED1'), fits.ImageHDU(green_channel, name='GREEN2'), fits.ImageHDU(blue_channel, name='BLUE3'), ]) fits_filename = os.path.splitext(tiff_file)[0] + ".fits" hdu_list.writeto(fits_filename, overwrite=True) print(f"tiff_to_fits: wrote {fits_filename}") if return_data: return fits_filename, tiff_data return fits_filename
# ============================================================================== # IMAGE PREPROCESSING # ==============================================================================
[docs] def subtract_median(image, region=(0, 5000, 0, 5000)): """ Subtract the median background level estimated from a rectangular region. Parameters ---------- image : ndarray 2D image array. region : tuple of 4 ints (row_min, row_max, col_min, col_max) region for background estimation. Returns ------- ndarray Background-subtracted image. """ r0, r1, c0, c1 = region # Clamp region to image bounds r1 = min(r1, image.shape[0]) c1 = min(c1, image.shape[1]) bgnd_level = np.nanmedian(image[r0:r1, c0:c1]) print(f"subtract_median: background level = {bgnd_level:.4f}") return image - bgnd_level
[docs] def normalize_image(image, norm_percentile=None): """ Normalize an image to its maximum or to a specified percentile value. Parameters ---------- image : ndarray 2D image array. norm_percentile : float or None If None, normalize by the image maximum. Otherwise normalize by this percentile value. Returns ------- ndarray Normalized image. """ if norm_percentile is None: return image / np.nanmax(image) return image / np.nanpercentile(image, norm_percentile)
[docs] def pad_images(post_im, pre_im, pad=50): """ Pad two images with zeros onto a common canvas plus a border. The extra border gives the registration algorithm room to shift the pre-stain image without it falling off the canvas edge. Parameters ---------- post_im : ndarray Post-stain image (2D). pre_im : ndarray Pre-stain image (2D). pad : int Number of zero-padding pixels on each side (default 50). Returns ------- post_im_pad : ndarray Zero-padded post-stain image. pre_im_pad : ndarray Zero-padded pre-stain image on the same canvas. """ max_rows = max(post_im.shape[0], pre_im.shape[0]) max_cols = max(post_im.shape[1], pre_im.shape[1]) canvas_shape = (max_rows + pad * 2, max_cols + pad * 2) post_im_pad = np.zeros(canvas_shape, dtype=post_im.dtype) pre_im_pad = np.zeros(canvas_shape, dtype=pre_im.dtype) post_im_pad[pad:post_im.shape[0] + pad, pad:post_im.shape[1] + pad] = post_im pre_im_pad[pad:pre_im.shape[0] + pad, pad:pre_im.shape[1] + pad] = pre_im return post_im_pad, pre_im_pad
[docs] def trim_to_signal(post_im, pre_im, threshold=0): """ Trim both images to the bounding box of their combined nonzero signal. Discards large empty margins before padding and registration. This is critical when images have significant zero-padded borders, because those empty regions confuse phase correlation and ECC. Parameters ---------- post_im : ndarray Post-stain image (2D). pre_im : ndarray Pre-stain image (2D). threshold : float Pixel values <= this are treated as empty background (default 0). Returns ------- post_trimmed : ndarray pre_trimmed : ndarray bbox : tuple (row_min, row_max, col_min, col_max) bounding box applied. """ combined_rows = np.any(post_im > threshold, axis=1) | \ np.any(pre_im > threshold, axis=1) combined_cols = np.any(post_im > threshold, axis=0) | \ np.any(pre_im > threshold, axis=0) if not np.any(combined_rows) or not np.any(combined_cols): print("trim_to_signal: no signal found — returning originals") return post_im, pre_im, (0, post_im.shape[0], 0, post_im.shape[1]) row_min = np.argmax(combined_rows) row_max = len(combined_rows) - np.argmax(combined_rows[::-1]) col_min = np.argmax(combined_cols) col_max = len(combined_cols) - np.argmax(combined_cols[::-1]) bbox = (row_min, row_max, col_min, col_max) print(f"trim_to_signal: rows {row_min}:{row_max}, cols {col_min}:{col_max}") return (post_im[row_min:row_max, col_min:col_max], pre_im[row_min:row_max, col_min:col_max], bbox)
# ============================================================================== # GAUSSIAN SMOOTHING # ==============================================================================
[docs] def filter_nan_gaussian_conserving(arr, sigma): """ Apply a Gaussian smooth to an array that may contain NaNs, conserving total intensity. NaN positions remain NaN in the output. Parameters ---------- arr : ndarray Input 2D array, may contain NaNs. sigma : float Gaussian smoothing sigma in pixels. Returns ------- ndarray Smoothed array with NaNs preserved. """ nan_msk = np.isnan(arr) loss = np.zeros(arr.shape) loss[nan_msk] = 1 loss = ndimage.gaussian_filter(loss, sigma=sigma, mode='constant', cval=1) gauss = arr.astype(float) gauss[nan_msk] = 0 gauss = ndimage.gaussian_filter(gauss, sigma=sigma, mode='constant', cval=0) gauss[nan_msk] = np.nan gauss += loss * arr return gauss
# ============================================================================== # INTENSITY EQUALISATION # ==============================================================================
[docs] def equalize_pair(post, pre): """ Intensity-equalize a pair of images for registration. Histogram-matches the pre-stain image's intensity distribution to the post-stain's, then jointly normalises both to [0, 1] using the shared 99th percentile of post-stain nonzero pixels. Parameters ---------- post : ndarray Post-stain image as float32 (2D). pre : ndarray Pre-stain image as float32 (2D). Returns ------- post_eq : ndarray Post-stain normalised to [0, 1]. pre_eq : ndarray Pre-stain histogram-matched and normalised. """ post_px = post[post > 0].ravel() pre_px = pre[pre > 0].ravel() if len(post_px) == 0 or len(pre_px) == 0: post_eq = post / (post.max() + 1e-8) pre_eq = pre / (pre.max() + 1e-8) return post_eq, pre_eq n_bins = 256 post_hist, post_edges = np.histogram(post_px, bins=n_bins) pre_hist, pre_edges = np.histogram(pre_px, bins=n_bins) post_cdf = np.cumsum(post_hist).astype(np.float64) pre_cdf = np.cumsum(pre_hist).astype(np.float64) post_cdf /= post_cdf[-1] pre_cdf /= pre_cdf[-1] post_bin_centres = (post_edges[:-1] + post_edges[1:]) / 2.0 pre_bin_idx = np.searchsorted(pre_edges[1:], pre.ravel(), side='left') pre_bin_idx = np.clip(pre_bin_idx, 0, n_bins - 1) pre_cdf_vals = pre_cdf[pre_bin_idx] post_matched_idx = np.searchsorted(post_cdf, pre_cdf_vals, side='left') post_matched_idx = np.clip(post_matched_idx, 0, n_bins - 1) pre_matched = post_bin_centres[post_matched_idx].reshape(pre.shape).astype(np.float32) pre_matched[pre == 0] = 0.0 post_99 = np.percentile(post_px, 99) if post_99 < 1e-8: post_99 = 1.0 post_eq = np.clip(post / post_99, 0.0, 1.0) pre_eq = np.clip(pre_matched / post_99, 0.0, 1.0) return post_eq, pre_eq
# ============================================================================== # MASKING # ==============================================================================
[docs] def build_tissue_mask(post_im, pre_im, signal_threshold=0, dilation_iters=50): """ Build a joint tissue mask from post-stain and pre-stain images. Each image is independently thresholded, dilated, and hole-filled, then the two masks are intersected. Parameters ---------- post_im : ndarray Post-stain image (2D). pre_im : ndarray Pre-stain aligned image (2D). signal_threshold : float Pixels <= this are excluded (default 0). dilation_iters : int Morphological dilation iterations (default 50). Returns ------- joint_mask : ndarray of bool Intersection of post and pre tissue masks. """ struct = generate_binary_structure(2, 1) post_mask = binary_fill_holes(binary_dilation( post_im > signal_threshold, structure=struct, iterations=dilation_iters)) pre_mask = binary_fill_holes(binary_dilation( pre_im > signal_threshold, structure=struct, iterations=dilation_iters)) joint_mask = post_mask & pre_mask joint_fraction = np.mean(joint_mask) print(f" Joint mask coverage: {joint_fraction:.3f} " f"({'OK' if joint_fraction > 0.05 else '!! LOW'})") return joint_mask
[docs] def build_clean_tissue_mask(post, pre): """ Build a clean tissue mask using binary_fill_holes only (no dilation). This is the mask used for residual histograms and as the base for the signal-only fitting mask. Parameters ---------- post : ndarray Post-stain image (float). pre : ndarray Pre-stain aligned image (float). Returns ------- tissue_mask : ndarray of bool """ return binary_fill_holes(post > 0) & binary_fill_holes(pre > 0)
# ============================================================================== # DISPLAY HELPERS # ==============================================================================
[docs] def robust_vmax(im, n_mad=5): """ Compute a display vmax robust to bright outliers. Uses median + n_mad * MAD over nonzero pixels. Parameters ---------- im : ndarray 2D image array. n_mad : float Number of median absolute deviations above the median (default 5). Returns ------- float Robust display maximum. """ px = im[im > 0] if len(px) == 0: return 1.0 med = np.median(px) mad = np.median(np.abs(px - med)) return med + n_mad * mad
[docs] def make_rgb_overlay(post, pre, post_edges=None, pre_edges=None): """ Build a 3-channel RGB overlay for alignment assessment. Post-stain in Red, pre-stain in Green. Overlap appears yellow. Optional boundary edges drawn in cyan (post) and magenta (pre). Parameters ---------- post : ndarray Post-stain image (float32, 2D). pre : ndarray Pre-stain image (float32, 2D). post_edges : ndarray or None Post-stain boundary ring. pre_edges : ndarray or None Pre-stain boundary ring. Returns ------- rgb : ndarray uint8 array of shape (H, W, 3). """ def _to_uint8(im): px = im[im > 0] if len(px) == 0: return np.zeros_like(im, dtype=np.uint8) vmax = np.percentile(px, 99) if vmax < 1e-8: return np.zeros_like(im, dtype=np.uint8) return np.clip(im / vmax * 255, 0, 255).astype(np.uint8) r = _to_uint8(post) g = _to_uint8(pre) b = np.zeros_like(r) rgb = np.stack([r, g, b], axis=-1) if post_edges is not None: rgb[post_edges > 0] = [0, 255, 255] # cyan if pre_edges is not None: rgb[pre_edges > 0] = [255, 0, 255] # magenta return rgb
[docs] def estimate_gauss_sigma(im, down_scale, sparse_threshold=0.1, sparse_sigma=5, dense_sigma=0): """ Estimate an appropriate Gaussian pre-smoothing sigma for ECC registration based on image density. Parameters ---------- im : ndarray Full-resolution image (2D). down_scale : float Downsample factor that will be applied before ECC. sparse_threshold : float Nonzero pixel fraction below which the image is sparse (default 0.1). sparse_sigma : float Sigma for sparse images at downsampled resolution (default 5). dense_sigma : float Sigma for dense images; 0 disables smoothing (default 0). Returns ------- float Recommended gauss_sigma value. """ nonzero_fraction = np.mean(im > 0) if nonzero_fraction < sparse_threshold: sigma = sparse_sigma print(f"estimate_gauss_sigma: sparse (nonzero={nonzero_fraction:.3f}) " f"— using sigma={sigma}") else: sigma = dense_sigma print(f"estimate_gauss_sigma: dense (nonzero={nonzero_fraction:.3f}) " f"— using sigma={sigma}") return sigma
# ============================================================================== # MEMORY DIAGNOSTICS # ============================================================================== def _get_psutil(): """Import psutil lazily so it's an optional dependency.""" try: import psutil return psutil except ImportError: return None
[docs] class MemoryTracker: """ Track resident-set-size (RSS) across pipeline tasks. Use this to confirm whether memory is actually being released between tasks in a batch. If RSS climbs monotonically across tasks, there is a leak somewhere (matplotlib figures, Jupyter ``Out[]`` history, retained widget state, unfreed numpy temporaries). If RSS returns to baseline after the explicit ``gc.collect()`` in each task footer, then per-task peak is just exceeding available RAM and the answer is reducing ``pad``, using a smaller working resolution, or switching to subprocess-per-task mode (see :func:`exo2micro.parallel.run_batch_subprocess`). When ``enabled=False`` (the default) all methods are cheap no-ops, so leaving the calls in production code costs essentially nothing. Requires ``psutil`` to actually do anything. If psutil is missing the tracker prints a one-time warning and no-ops. Example ------- >>> from exo2micro.utils import MemoryTracker >>> tracker = MemoryTracker(enabled=True) >>> tracker.snapshot('start') >>> for sample, dye in tasks: ... tracker.snapshot(f'before {sample}/{dye}') ... SampleDye(sample, dye).run() ... tracker.collect_and_snapshot(f'after gc {sample}/{dye}') >>> tracker.summary() """ def __init__(self, enabled=False): self.enabled = enabled self._psutil = _get_psutil() if enabled else None self._process = (self._psutil.Process(os.getpid()) if self._psutil else None) self._history = [] # list of (label, rss_gb) if enabled and self._psutil is None: print("[MemoryTracker] psutil not installed; tracking disabled. " "`pip install psutil` to enable.") def _rss_gb(self): if self._process is None: return None return self._process.memory_info().rss / 1e9
[docs] def snapshot(self, label): """Record current RSS with a label and print it.""" if not self.enabled or self._process is None: return rss = self._rss_gb() self._history.append((label, rss)) print(f"[mem] {rss:6.2f} GB {label}")
[docs] def collect_and_snapshot(self, label): """Run ``gc.collect()`` twice then snapshot. Use between tasks.""" if not self.enabled or self._process is None: return # Two passes — the first frees cycles, the second sweeps anything # that finalizers freed during the first. gc.collect() gc.collect() self.snapshot(label)
[docs] def summary(self): """Print a summary table at end of batch.""" if not self.enabled or not self._history: return baseline = self._history[0][1] peak = max(rss for _, rss in self._history) final = self._history[-1][1] print("\n[mem] === memory summary ===") print(f"[mem] baseline: {baseline:6.2f} GB") print(f"[mem] peak: {peak:6.2f} GB (+{peak - baseline:.2f} GB)") print(f"[mem] final: {final:6.2f} GB (+{final - baseline:.2f} GB)") if final - baseline > 0.5: print("[mem] WARNING: final RSS is >0.5 GB above baseline. " "This suggests a leak rather than just high peak usage. " "Consider switching to subprocess-per-task mode " "(exo2micro.parallel.run_batch_subprocess).") print("[mem] ======================\n")
# ============================================================================== # PRE-FLIGHT RESOURCE ESTIMATION (RAM + DISK) # ============================================================================== # # The goal of these helpers is to catch insufficient-RAM and insufficient-disk # situations BEFORE a long batch starts, rather than letting the kernel get # OOM-killed mid-run (which leaves no useful diagnostic) or running out of # disk halfway through writing a 4 GB float32 TIFF (which leaves a corrupt # checkpoint file). # # The disk side reuses estimate_pipeline_output_size + get_free_disk_space # above. The RAM side is below. # # RAM is estimated as: # peak_per_task ≈ padded_pixel_count * 4 bytes * PEAK_FACTOR # # PEAK_FACTOR = 6 is an empirical estimate of how many full-resolution # float32 image copies live in memory simultaneously at the worst point of # a single-task run (stage 2 or stage 3, where padded post + padded pre + # downsampled working copies + warp output buffer + SIFT internals all # coexist). Adjust at the top of this section if real measurements show # differently. # ============================================================================== PEAK_FACTOR_PER_TASK = 4.0 WARN_FRACTION = 0.80 # warn if estimate exceeds this fraction of available HARD_FAIL_FRACTION = 1.0 # raise if estimate exceeds this fraction
[docs] def estimate_pipeline_memory(sample_dye_pairs, raw_dir='raw', pad=2000, n_workers=1): """ Estimate peak RAM required to process a list of (sample, dye) pairs. Reads only TIFF headers (no pixel data) to get raw image dimensions, inflates by the padding, multiplies by float32 bytes/pixel and by :data:`PEAK_FACTOR_PER_TASK` to account for the several full-resolution arrays that coexist at peak. For parallel runs, multiplies by ``n_workers``. The estimate is the **worst-case peak across tasks**, not the sum, because tasks run sequentially in serial mode (only one task in memory at a time) and concurrently in parallel mode (n_workers tasks at a time, but each could be the worst one). Parameters ---------- sample_dye_pairs : list of tuple ``(sample, dye)`` combinations. raw_dir : str Root raw image directory. pad : int Padding parameter (default 2000). Larger padding inflates the memory estimate substantially. n_workers : int Number of concurrent worker processes. ``1`` for serial. Returns ------- estimate : dict Keys: ``peak_bytes`` (int, worst-case single-task peak), ``concurrent_peak_bytes`` (int, that times n_workers), ``per_task_bytes`` (list of int, one per pair), ``warnings`` (list of str), and ``n_resolvable`` (int). """ # Defer the tifffile import to keep utils.py importable for callers # that don't need this estimate (e.g. on systems without tifffile). try: import tifffile except ImportError: return { 'peak_bytes': None, 'concurrent_peak_bytes': None, 'per_task_bytes': [], 'warnings': ['tifffile not available; cannot estimate RAM'], 'n_resolvable': 0, } per_task = [] warnings = [] for sample, dye in sample_dye_pairs: sample_dir = os.path.join(raw_dir, sample) pairs, _ = classify_raw_files(sample_dir) if dye not in pairs or not pairs[dye]['pre'] or not pairs[dye]['post']: warnings.append( f"{sample} / {dye}: cannot resolve raw files for estimate") per_task.append(0) continue # Read just the TIFF header — no pixel data. Use the post-stain # file as the size reference; pre-stain dimensions are # essentially identical in practice. raw_path = pairs[dye]['post'][0] try: with tifffile.TiffFile(raw_path) as tf: shape = tf.pages[0].shape except Exception as e: warnings.append( f"{sample} / {dye}: cannot read TIFF header {raw_path}: {e}") per_task.append(0) continue # shape is (H, W) for grayscale or (H, W, C) for RGB. We work # in 2-D extracted-channel space, so use H * W regardless. if len(shape) >= 2: h, w = shape[0], shape[1] else: warnings.append( f"{sample} / {dye}: unexpected TIFF shape {shape}") per_task.append(0) continue padded_h = h + 2 * pad padded_w = w + 2 * pad bytes_per_image = padded_h * padded_w * 4 # float32 task_peak = int(bytes_per_image * PEAK_FACTOR_PER_TASK) per_task.append(task_peak) if per_task and any(b > 0 for b in per_task): peak = max(b for b in per_task if b > 0) else: peak = 0 concurrent_peak = peak * max(1, n_workers) return { 'peak_bytes': peak, 'concurrent_peak_bytes': concurrent_peak, 'per_task_bytes': per_task, 'warnings': warnings, 'n_resolvable': sum(1 for b in per_task if b > 0), }
[docs] def get_available_memory(): """Return available RAM in bytes, or None if psutil unavailable.""" psutil = _get_psutil() if psutil is None: return None return psutil.virtual_memory().available
[docs] def preflight_check(sample_dye_pairs, output_dir='processed', raw_dir='raw', pad=2000, n_workers=1, checkpoint_format='tiff', n_scale_methods=1, save_all_intermediates=False, force_run=False): """ Combined RAM + disk pre-flight check before a batch or single run. Estimates both peak RAM (across all concurrent tasks) and total disk output. Compares each against the available headroom on the system and either warns or raises :class:`MemoryError` / :class:`OSError` depending on severity. Severity bands (each resource checked independently): - estimate ≤ 80% of available — silent. - 80%-100% — print a warning, proceed. - > 100% — raise ``MemoryError`` (RAM) or ``OSError`` (disk). Callers can pass ``force_run=True`` to downgrade the hard fail to a warning. Useful when the estimate is known to be conservative or when the user has already cleared other processes. Parameters ---------- sample_dye_pairs : list of tuple Tasks to check. output_dir : str Where checkpoints will be written. Free disk space is measured at this path. raw_dir : str Source raw images, needed to read TIFF dimensions. pad : int Padding parameter. n_workers : int Concurrent workers. ``1`` for serial. checkpoint_format, n_scale_methods, save_all_intermediates : Passed through to :func:`estimate_pipeline_output_size`. force_run : bool If True, hard-fail conditions are downgraded to warnings. Raises ------ MemoryError When the RAM estimate exceeds 100% of available and ``force_run=False``. OSError When the disk estimate exceeds 100% of free space and ``force_run=False``. """ print("\n=== Pre-flight resource check ===") # ── RAM ───────────────────────────────────────────────────────── ram_est = estimate_pipeline_memory( sample_dye_pairs, raw_dir=raw_dir, pad=pad, n_workers=n_workers) avail_ram = get_available_memory() if ram_est['concurrent_peak_bytes'] is None: print(" RAM: estimate unavailable (tifffile not installed).") elif avail_ram is None: print(f" RAM: estimated peak " f"{format_bytes(ram_est['concurrent_peak_bytes'])}; " "available unknown (psutil not installed).") else: peak = ram_est['concurrent_peak_bytes'] fraction = peak / avail_ram if avail_ram > 0 else float('inf') line = (f" RAM: estimated peak {format_bytes(peak)} " f"vs {format_bytes(avail_ram)} available " f"({fraction:.0%})") if fraction > HARD_FAIL_FRACTION: print(f" {line} ❌ EXCEEDS AVAILABLE") msg = ( f"Estimated peak RAM ({format_bytes(peak)}) exceeds " f"available RAM ({format_bytes(avail_ram)}).\n" f" - {len(sample_dye_pairs)} task(s), {n_workers} concurrent " f"worker(s), pad={pad}.\n" f" - Per-task peak estimate uses a {PEAK_FACTOR_PER_TASK}x " "factor over single-image float32 size.\n" f"Options:\n" f" 1. Reduce n_workers (currently {n_workers}).\n" f" 2. Reduce pad (currently {pad}; try 1000 or 500).\n" f" 3. Close other applications to free RAM.\n" f" 4. Use parallel='subprocess' mode if leaks are " "suspected.\n" f" 5. Override this check with force_run=True (NOT " "recommended; will likely OOM-kill the kernel).") if force_run: print(f" (force_run=True; proceeding anyway)") else: raise MemoryError(msg) elif fraction > WARN_FRACTION: print(f" {line} ⚠️ HIGH") print(f" Close other applications if possible. " "Consider reducing n_workers or pad.") else: print(f" {line} ✓") for w in ram_est.get('warnings', []): print(f" [ram warning] {w}") # ── Disk ──────────────────────────────────────────────────────── disk_est = estimate_pipeline_output_size( sample_dye_pairs, raw_dir=raw_dir, pad=pad, save_all_intermediates=save_all_intermediates, n_scale_methods=n_scale_methods, checkpoint_format=checkpoint_format) # Probe the output dir's filesystem; if the dir doesn't exist yet, # walk up to the nearest existing parent (output dirs are # auto-created by the pipeline but might not exist at check time). probe = output_dir while probe and not os.path.exists(probe): parent = os.path.dirname(probe) if parent == probe: break probe = parent avail_disk = get_free_disk_space(probe) if probe else None total_out = disk_est['total_bytes'] if avail_disk is None: print(f" Disk: estimated total {format_bytes(total_out)}; " "free space unknown.") else: fraction = total_out / avail_disk if avail_disk > 0 else float('inf') line = (f" Disk: estimated total {format_bytes(total_out)} " f"vs {format_bytes(avail_disk)} free " f"({fraction:.0%})") if fraction > HARD_FAIL_FRACTION: print(f" {line} ❌ EXCEEDS FREE SPACE") msg = ( f"Estimated output size ({format_bytes(total_out)}) " f"exceeds free disk space ({format_bytes(avail_disk)}) " f"at {probe}.\n" f" - {disk_est['n_resolvable']} task(s).\n" f" - checkpoint_format='{checkpoint_format}' " f"(switch to 'tiff' or 'fits' alone to roughly halve).\n" f"Options:\n" f" 1. Free up disk space at {probe}.\n" f" 2. Switch checkpoint_format to 'tiff' or 'fits' " "(not 'both').\n" f" 3. Run a smaller subset of samples/dyes.\n" f" 4. Override this check with force_run=True.") if force_run: print(f" (force_run=True; proceeding anyway)") else: raise OSError(msg) elif fraction > WARN_FRACTION: print(f" {line} ⚠️ HIGH") print(" Consider freeing disk or switching checkpoint_format " "to one format only.") else: print(f" {line} ✓") for w in disk_est.get('warnings', []): print(f" [disk warning] {w}") print("=================================\n")
# ============================================================================== # MEMORY WATCHDOG # ============================================================================== # # Polls available RAM from a background thread. When available RAM falls # below a threshold, sets a flag. The pipeline polls this flag at stage # boundaries via check_or_raise(); if the flag is set, the run aborts # with a clean MemoryError rather than getting SIGKILLed by the OS later. # # This won't catch every OOM — a single huge allocation can spike between # poll intervals and trip the OOM killer immediately — but it should catch # the slow swap-filling pattern that's the most common cause of "kernel # dies mid-batch" reports. # ==============================================================================
[docs] class MemoryWatchdog: """ Background thread that polls available RAM and signals when it falls below a threshold. Usage ----- >>> from exo2micro.utils import MemoryWatchdog >>> wd = MemoryWatchdog(min_available_gb=0.5) >>> wd.start() >>> try: ... for sample, dye in tasks: ... wd.check_or_raise(f"before {sample}/{dye}") ... SampleDye(sample, dye).run() ... finally: ... wd.stop() The watchdog itself doesn't interrupt anything — it just sets a flag. The caller must poll via :meth:`check_or_raise` at safe abort points (typically stage boundaries) to actually halt. Parameters ---------- min_available_gb : float Threshold in gibibytes. When ``psutil.virtual_memory().available`` drops below this, the watchdog trips. Default 0.5 GB. poll_interval_sec : float How often to poll RAM. Default 5.0 seconds. Polling too fast wastes CPU; too slow misses fast-growing leaks. verbose : bool If True, prints a warning line each time RAM drops below the threshold. Default False (silent until tripped). """ def __init__(self, min_available_gb=0.5, poll_interval_sec=5.0, verbose=False): self.min_available_bytes = int(min_available_gb * (1024 ** 3)) self.poll_interval_sec = poll_interval_sec self.verbose = verbose self._psutil = _get_psutil() self._thread = None self._stop_event = None self._tripped = False self._tripped_at_bytes = None if self._psutil is None: print("[MemoryWatchdog] psutil not installed; watchdog disabled. " "`pip install psutil` to enable.")
[docs] def start(self): """Start the background polling thread.""" if self._psutil is None: return # silently no-op import threading self._stop_event = threading.Event() self._thread = threading.Thread( target=self._run, daemon=True, name='exo2micro-memory-watchdog') self._thread.start()
def _run(self): while not self._stop_event.is_set(): avail = self._psutil.virtual_memory().available if avail < self.min_available_bytes: if not self._tripped: self._tripped = True self._tripped_at_bytes = avail if self.verbose: print(f"[MemoryWatchdog] tripped: " f"{avail / 1e9:.2f} GB available " f"(threshold " f"{self.min_available_bytes / 1e9:.2f} GB)") self._stop_event.wait(self.poll_interval_sec)
[docs] def stop(self): """Stop the background polling thread.""" if self._stop_event is not None: self._stop_event.set() if self._thread is not None: self._thread.join(timeout=2.0) self._thread = None self._stop_event = None
[docs] def is_tripped(self): """Return True if the watchdog has crossed its threshold.""" return self._tripped
[docs] def check_or_raise(self, label=''): """ Raise :class:`MemoryError` if the watchdog is tripped. Call at safe abort points (e.g. between pipeline stages). ``label`` is included in the error message to identify where the abort happened. """ if self._tripped: tripped_gb = (self._tripped_at_bytes or 0) / 1e9 threshold_gb = self.min_available_bytes / 1e9 raise MemoryError( f"MemoryWatchdog triggered abort at '{label}': " f"only {tripped_gb:.2f} GB RAM available " f"(threshold {threshold_gb:.2f} GB). " f"This is a clean abort to avoid the OS OOM-killing the " f"process. To recover: reduce n_workers, reduce pad, or " f"close other applications.")
[docs] def reset(self): """Clear the tripped flag (e.g. after handling a low-RAM event).""" self._tripped = False self._tripped_at_bytes = None