Parallel Module

parallel.py

Batch processing for the exo2micro pipeline.

Provides serial and parallel execution of multiple sample+dye combinations. Each task is processed by creating a SampleDye instance and calling run().

Usage

from exo2micro.parallel import run_batch

results = run_batch(
    samples=['CD070', 'CD063'],
    dyes=['SybrGld', 'DAPI'],
    parallel=True,
    n_workers=4,
    output_dir='processed',
)

Strict vs lenient mode

By default (strict_dyes=True), run_batch() raises FileNotFoundError if any requested (sample, dye) pair has no raw files on disk. The error message lists every missing pair so typos surface immediately rather than after a long batch.

To skip missing pairs instead of raising (e.g. when you have heterogeneous samples where not every dye exists for every sample), pass strict_dyes=False. Skipped pairs are listed in the run log and omitted from the task list.

Notes

Parallel mode uses multiprocessing.Pool. On macOS the default start method is spawn, so each worker starts a fresh Python interpreter and imports this module to look up its target function by name. That is why process_one is defined at module level and importable from this module.
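The importability requirement comes from pickling: a pool sends the worker function to its child processes as a reference by name. The sketch below illustrates this with pickle alone, without starting a pool. module_level_worker and can_pickle are hypothetical names used only for illustration; they are not part of exo2micro.

```python
import pickle

def module_level_worker(args):
    """Defined at module top level, so pickle can reference it by name."""
    return args

def can_pickle(obj):
    """Return True if obj can be serialized the way a process pool needs."""
    try:
        pickle.dumps(obj)
        return True
    except (pickle.PicklingError, AttributeError):
        return False

# When run as a script, the named function pickles and the lambda does not.
print(can_pickle(module_level_worker))
print(can_pickle(lambda args: args))
```

A lambda or a function nested inside another function has no importable name, which is why it cannot be handed to a spawned worker.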

Memory and execution mode

The serial loop in run_serial() explicitly releases per-task memory (matplotlib figures and the SampleDye instance) between tasks. On low-RAM machines, prefer parallel=False over the combination parallel=True, n_workers=1: the serial loop’s cleanup is more aggressive than relying on a single worker process to recycle memory.

Rule of thumb for n_workers: n_workers * (peak RAM per sample) < available RAM. If images are large (>1 GB each), start with n_workers=4 and adjust based on observed memory usage.
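The rule of thumb can be turned into a quick calculation. This is a minimal sketch, assuming you have measured peak RAM per sample yourself; suggest_n_workers is a hypothetical helper, not part of exo2micro.

```python
def suggest_n_workers(available_bytes, peak_bytes_per_sample, max_workers=None):
    """Largest n with n * peak_bytes_per_sample < available_bytes (at least 1).

    Hypothetical helper for illustration; not part of exo2micro.
    """
    if peak_bytes_per_sample <= 0:
        raise ValueError("peak_bytes_per_sample must be positive")
    # Strict inequality: an exact fit leaves no headroom, so subtracting one
    # byte before dividing steps down by one worker in that case.
    n = (available_bytes - 1) // peak_bytes_per_sample
    if max_workers is not None:
        n = min(n, max_workers)
    # Never return fewer than one worker; the caller still has to decide
    # whether even a single task fits in memory.
    return max(1, n)

GB = 1024 ** 3
print(suggest_n_workers(16 * GB, 3 * GB))   # 5, since 15 GB < 16 GB
print(suggest_n_workers(12 * GB, 3 * GB))   # 3, since 12 GB is not < 12 GB
```

Treat the result as an upper bound and adjust it against observed memory usage, as the rule of thumb suggests.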

exo2micro.parallel.build_task_list(pairs, params=None, output_dir='processed', raw_dir='raw', from_stage=None, to_stage=None, force=False, checkpoint_format='tiff')[source]

Build the list of (sample, dye, params) tuples for batch processing.

This is a low-level helper. Callers are responsible for resolving which (sample, dye) pairs actually exist on disk — see exo2micro.utils.discover_tasks(). run_batch() does this automatically; only call build_task_list directly if you want full control over which pairs are queued.

Parameters:
  • pairs (list of (str, str) tuples) – Explicit list of (sample, dye) combinations to queue. No filesystem checks are performed here; missing files will surface as task-level errors at run time.

  • params (dict or None) – Pipeline parameters to apply to all tasks. If None, each SampleDye uses its built-in defaults.

  • output_dir (str) – Root output directory (default 'processed').

  • raw_dir (str) – Root raw image directory (default 'raw').

  • from_stage (int or None) – Force re-run from this stage onward. Passed through to SampleDye.run().

  • to_stage (int or None) – Stop after this stage. Passed through to SampleDye.run().

  • force (bool) – If True, re-run stages even if checkpoints exist.

  • checkpoint_format ({'tiff', 'fits', 'both'}) – Which file format(s) to write for each checkpoint (default 'tiff'). Passed through to SampleDye.

Returns:

Each tuple is (sample, dye, params_dict).

Return type:

list of tuple
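To make the shape of the returned task list concrete, here is a minimal stand-in. It assumes the run-level options are merged into each task's params dict; the real merge logic inside build_task_list() may differ. sketch_build_task_list and hypothetical_param are illustrative names only.

```python
def sketch_build_task_list(pairs, params=None, output_dir="processed",
                           raw_dir="raw", from_stage=None, to_stage=None,
                           force=False, checkpoint_format="tiff"):
    """Illustrative stand-in for build_task_list(); not the real implementation."""
    base = dict(params or {})
    # Assumption: run-level options travel inside the per-task params dict.
    base.update(output_dir=output_dir, raw_dir=raw_dir, from_stage=from_stage,
                to_stage=to_stage, force=force,
                checkpoint_format=checkpoint_format)
    # Copy the dict per task so one task cannot mutate another's parameters.
    return [(sample, dye, dict(base)) for sample, dye in pairs]

tasks = sketch_build_task_list([("CD070", "SybrGld"), ("CD063", "DAPI")],
                               params={"hypothetical_param": 1}, to_stage=2)
for sample, dye, task_params in tasks:
    print(sample, dye, sorted(task_params))
```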

exo2micro.parallel.print_summary(results)[source]

Print a summary table for all completed tasks.

Shows the Moffat-fit scale estimate for each run, and — when any result has a scale_percentile_value or manual_scale — additional columns for those alternative scales. Failed tasks are listed in a “Problems” section after the main table with the full error message, so users running large batches can see all failures consolidated in one place.

Parameters:

results (list of dict) – Results from run_batch(), run_serial(), or run_parallel().
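The layout described above, a main table followed by a consolidated “Problems” section, can be sketched as follows. This is not the real print_summary(): the column layout, the 'ok' status value, and the numbers in the sample data are invented for illustration. The only detail taken from this page is that failed tasks carry a status starting with 'error: '.

```python
def format_summary(results):
    """Return a plain-text summary; failed tasks are grouped under 'Problems'."""
    ok, failed = [], []
    for r in results:
        # Failed tasks carry a status that starts with 'error: '.
        is_error = str(r.get("status", "")).startswith("error: ")
        (failed if is_error else ok).append(r)

    lines = [f"{'sample':<10}{'dye':<10}{'scale_estimate':>16}"]
    for r in ok:
        scale = r.get("scale_estimate")
        scale_text = f"{scale:.4f}" if scale is not None else "n/a"
        lines.append(f"{r['sample']:<10}{r['dye']:<10}{scale_text:>16}")

    if failed:
        lines += ["", "Problems"]
        for r in failed:
            lines.append(f"{r['sample']} / {r['dye']}: {r['status']}")
    return "\n".join(lines)

# Made-up results, for illustration only.
results = [
    {"sample": "CD070", "dye": "DAPI", "status": "ok", "scale_estimate": 1.0312},
    {"sample": "CD063", "dye": "DAPI", "status": "error: raw file unreadable"},
]
print(format_summary(results))
```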

exo2micro.parallel.process_one(args)[source]

Process a single (sample, dye) combination.

This function must be importable by name for multiprocessing (required on macOS with the spawn start method).

Parameters:

args (tuple) – (sample, dye, params) where params is a dict of pipeline parameters. The keys output_dir, raw_dir, from_stage, to_stage, and force are consumed here; the rest are passed to SampleDye.set_params().

Returns:

Result dict with at least sample, dye, and status keys, plus scale_estimate on successful runs.

Return type:

dict
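The split between consumed keys and pass-through parameters can be sketched as below. split_params and hypothetical_param are illustrative names; only the list of consumed keys comes from the description above.

```python
# Keys that process_one() consumes itself, per the description above.
RUN_KEYS = ("output_dir", "raw_dir", "from_stage", "to_stage", "force")

def split_params(params):
    """Separate run-level options from the parameters meant for set_params()."""
    run_opts = {k: params[k] for k in RUN_KEYS if k in params}
    pipeline_params = {k: v for k, v in params.items() if k not in RUN_KEYS}
    return run_opts, pipeline_params

run_opts, pipeline_params = split_params(
    {"output_dir": "processed", "force": True, "hypothetical_param": 0.5}
)
print(run_opts)         # {'output_dir': 'processed', 'force': True}
print(pipeline_params)  # {'hypothetical_param': 0.5}
```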

exo2micro.parallel.run_batch(samples, dyes, parallel=False, n_workers=4, params=None, output_dir='processed', raw_dir='raw', from_stage=None, to_stage=None, force=False, checkpoint_format='tiff', strict_dyes=True, memory_debug=False, timeout_per_task=None, force_run=False)[source]

High-level batch processing entry point.

Resolves the requested samples × dyes against what’s actually on disk, then processes every present combination either serially or across a worker pool, and prints a summary table at the end.

Discovery and strict mode

Before queueing tasks, the requested cartesian product is filtered against the filesystem via exo2micro.utils.discover_tasks(). A pair is considered “present” when both a pre-stain and a post-stain file exist for that dye in the sample’s directory.

  • strict_dyes=True (default): if any requested pair is missing, raise FileNotFoundError with a single message listing every missing pair. Catches typos before a long batch starts.

  • strict_dyes=False: missing pairs are skipped silently (their reasons are printed in the batch summary, not raised).
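The two behaviours can be sketched without touching the filesystem. In this minimal example the present set stands in for the disk scan; the real discovery is done by exo2micro.utils.discover_tasks(), whose signature is not shown on this page. resolve_pairs and its message wording are hypothetical.

```python
from itertools import product

def resolve_pairs(samples, dyes, present, strict_dyes=True):
    """Filter samples x dyes against a set of pairs known to exist.

    `present` is a stand-in for the filesystem scan (an assumption made for
    this sketch); it is a set of (sample, dye) tuples.
    """
    requested = list(product(samples, dyes))
    missing = [pair for pair in requested if pair not in present]
    if missing and strict_dyes:
        # One error that lists every missing pair, not just the first.
        listing = ", ".join(f"({s}, {d})" for s, d in missing)
        raise FileNotFoundError(f"No raw files for: {listing}")
    for s, d in missing:
        print(f"skipping ({s}, {d}): no raw files")
    return [pair for pair in requested if pair in present]

present = {("CD070", "SybrGld"), ("CD070", "DAPI"), ("CD063", "DAPI")}
print(resolve_pairs(["CD070", "CD063"], ["SybrGld", "DAPI"], present,
                    strict_dyes=False))
```

Collecting every missing pair before raising is what lets a single typo in a dye name surface once, with all affected samples listed together.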

Execution modes

Three modes, controlled by the parallel argument:

  • parallel=False (default): serial in the current process. One task at a time, with explicit matplotlib figure cleanup and gc.collect() between tasks. Safe on low-RAM machines for moderate batch sizes.

  • parallel=True: process pool of n_workers. Multiple tasks run concurrently. Fastest when you have CPU and RAM to spare; can OOM on low-RAM machines.

  • parallel='subprocess': one subprocess per task. One fresh Python process per task, exited and reclaimed by the OS between tasks. Use when serial mode is OOMing on a low-RAM machine due to leaks (matplotlib figures, retained widget state, accumulated cv2/tifffile caches) that gc.collect() can’t reach. Adds ~1-2 sec per-task spawn overhead; negligible for large samples.
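Because parallel accepts both booleans and a string, the order of the checks matters when dispatching on it. A minimal sketch of one way to map the argument to a mode follows. choose_mode is a hypothetical helper, and raising ValueError for other values is an assumption, not documented behaviour of run_batch().

```python
def choose_mode(parallel):
    """Map the `parallel` argument to one of the three execution modes."""
    # Check the string first: any non-empty string is truthy, so a plain
    # `if parallel:` test would wrongly send 'subprocess' to the pool.
    if parallel == "subprocess":
        return "subprocess"
    if parallel is True:
        return "pool"
    if parallel is False:
        return "serial"
    raise ValueError(
        f"parallel must be False, True, or 'subprocess', got {parallel!r}"
    )

for value in (False, True, "subprocess"):
    print(repr(value), "->", choose_mode(value))
```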

Parameters:
  • samples (list of str) – Sample names.

  • dyes (list of str) – Dye/channel names.

  • parallel (bool or str) – False (default) for serial in-process. True for a process pool. 'subprocess' for one fresh subprocess per task. For small batches serial is usually faster than parallel due to spawn overhead. On low-RAM machines, prefer parallel=False over parallel=True, n_workers=1: the serial loop releases memory more aggressively between tasks. If serial is still OOMing, try parallel='subprocess'.

  • n_workers (int) – Number of workers when parallel=True (default 4). Ignored for the other two modes.

  • params (dict or None) – Pipeline parameters applied to every task.

  • output_dir (str) – Root output directory.

  • raw_dir (str) – Root raw image directory.

  • from_stage (int or None) – Force re-run from this stage onward.

  • to_stage (int or None) – Stop after this stage.

  • force (bool) – If True, re-run stages even if checkpoints exist.

  • checkpoint_format ({'tiff', 'fits', 'both'}) – Which file format(s) to write for each intermediate checkpoint (default 'tiff'). Using 'tiff' or 'fits' alone cuts disk usage roughly in half compared to 'both'.

  • strict_dyes (bool) – If True (default), raise FileNotFoundError when any requested (sample, dye) pair has no raw files on disk. Set to False to skip such pairs silently and run only the present ones.

  • memory_debug (bool) – If True, prints RSS before/after each task (with gc.collect() between) so leaks can be diagnosed. Requires psutil. Useful for triaging “kernel dies mid-batch” reports. Default False. Only effective in serial and subprocess modes; the parallel pool path ignores it (each worker is in its own process, so per-task RSS in the parent isn’t meaningful).

  • timeout_per_task (float or None) – Only used when parallel='subprocess'. Maximum seconds for any single task; None (default) means no timeout. Recommended for unattended overnight batches so a wedged task doesn’t block the whole run.

  • force_run (bool) – If True, downgrade any hard-fail pre-flight check (RAM or disk estimate exceeds available) to a warning. Default False. Useful when the estimate is known to be conservative or you’ve already cleared other processes; not recommended otherwise as an OOM kill mid-batch may corrupt checkpoint files.

Returns:

One result per resolvable sample+dye combination.

Return type:

list of dict

Raises:
  • FileNotFoundError – When strict_dyes=True and one or more requested (sample, dye) pairs cannot be resolved from raw files, OR when the raw directory itself has a fatal layout problem (missing, empty, no per-sample subfolders) regardless of strict_dyes.

  • MemoryError – When the RAM pre-flight estimate exceeds available memory and force_run=False.

  • OSError – When the disk pre-flight estimate exceeds free space at output_dir and force_run=False.
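The interaction between the pre-flight checks and force_run can be sketched as below. How run_batch() computes its RAM and disk estimates is not described here, so this example takes the numbers as inputs. In practice the available figures could come from calls such as shutil.disk_usage(path).free or psutil.virtual_memory().available. preflight and its message wording are hypothetical.

```python
import warnings

def preflight(required_ram, available_ram, required_disk, free_disk,
              force_run=False):
    """Raise (or warn, if force_run) when an estimate exceeds what is available."""
    checks = [
        (required_ram, available_ram, MemoryError, "RAM"),
        (required_disk, free_disk, OSError, "disk"),
    ]
    for required, available, exc_type, label in checks:
        if required <= available:
            continue
        message = (f"{label} estimate {required} bytes exceeds "
                   f"{available} bytes available")
        if force_run:
            warnings.warn(message)   # hard failure downgraded to a warning
        else:
            raise exc_type(message)

GB = 1024 ** 3
try:
    preflight(32 * GB, 16 * GB, 10 * GB, 500 * GB)
except MemoryError as err:
    print("blocked:", err)
```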

exo2micro.parallel.run_parallel(tasks, n_workers=4)[source]

Run all tasks in parallel across n_workers processes.

Each worker saves figures to disk; this function blocks until all workers are done.

Parameters:
  • tasks (list of tuple) – Each tuple is (sample, dye, params_dict), as built by build_task_list().

  • n_workers (int) – Number of parallel worker processes (default 4).

Returns:

Results for each task, in the same order as tasks.

Return type:

list of dict

exo2micro.parallel.run_serial(tasks, tracker=None)[source]

Run all tasks sequentially in the current process.

After each task, explicitly closes all matplotlib figures and runs a garbage-collection pass. This is more aggressive than relying on Python’s reference-counted cleanup at scope exit and matters on low-RAM machines processing large images: per-task numpy arrays can otherwise linger long enough to overlap the next task’s allocations.

Parameters:
  • tasks (list of tuple) – Each tuple is (sample, dye, params_dict), as built by build_task_list().

  • tracker (MemoryTracker or None) – Optional memory tracker. When provided, snapshots RSS before and after each task (with a gc.collect() pass between them) so leaks can be diagnosed. None (default) means no tracking.

Returns:

Results for each task.

Return type:

list of dict
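The cleanup pattern described above can be sketched as a plain loop. This is not the real run_serial(): the process callable, the 'ok' status value, and the choice to record exceptions instead of re-raising are assumptions made for illustration.

```python
import gc
import sys

def run_serial_sketch(tasks, process):
    """Run tasks one at a time, releasing per-task memory between them."""
    results = []
    for task in tasks:
        try:
            result = process(task)
        except Exception as exc:
            # Assumption: failures are recorded rather than aborting the batch.
            result = {"sample": task[0], "dye": task[1],
                      "status": f"error: {exc}"}
        results.append(result)

        # Close figures only if pyplot was actually imported by a task.
        pyplot = sys.modules.get("matplotlib.pyplot")
        if pyplot is not None:
            pyplot.close("all")
        # Collect reference cycles now instead of waiting for the next
        # automatic collection, so large arrays do not overlap the next task.
        gc.collect()
    return results

def fake_process(task):
    """Stand-in for the real per-task work."""
    sample, dye, params = task
    if dye == "missing":
        raise RuntimeError("no such dye")
    return {"sample": sample, "dye": dye, "status": "ok"}

out = run_serial_sketch(
    [("CD070", "DAPI", {}), ("CD070", "missing", {})], fake_process
)
print([r["status"] for r in out])   # ['ok', 'error: no such dye']
```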

exo2micro.parallel.run_subprocess(tasks, memory_debug=False, timeout_per_task=None)[source]

Run all tasks, each in its own fresh Python subprocess.

Use this on low-RAM machines where serial mode runs out of memory partway through a batch. Each task gets a clean process, so any leaked matplotlib figures / numpy arrays / cv2 caches are reclaimed by the OS when the process exits between tasks.

Parameters:
  • tasks (list of tuple) – Each tuple is (sample, dye, params_dict), as built by build_task_list().

  • memory_debug (bool) – If True, prints RSS in the parent process before and after each task. Useful for confirming the OS is actually reclaiming memory between tasks. Requires psutil. Default False.

  • timeout_per_task (float or None) – Maximum seconds for any single task. None (default) means no timeout. Recommended to set this for unattended overnight batches so a wedged task doesn’t block the whole run.

Returns:

Results for each task. Failed tasks have status starting with 'error: ' rather than raising.

Return type:

list of dict

Notes

Each task incurs a few seconds of overhead for subprocess launch and imports. On a batch of large samples (alignment runtime measured in minutes) this is invisible. On a batch of small/fast tasks this overhead may dominate; use plain run_serial() in that case.
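The one-process-per-task pattern with a timeout can be sketched with the standard library alone. This is an illustration, not the real run_subprocess(): the child here runs an inline snippet and reports its result as JSON on stdout, which is an assumed protocol, and the 'ok' status value is also assumed.

```python
import json
import subprocess
import sys

def run_in_subprocess(code, timeout=None):
    """Run a Python snippet in a fresh interpreter and return a result dict."""
    try:
        proc = subprocess.run(
            [sys.executable, "-c", code],
            capture_output=True, text=True, timeout=timeout,
        )
    except subprocess.TimeoutExpired:
        # subprocess.run() kills the child before raising, so a wedged task
        # cannot block the rest of the batch.
        return {"status": f"error: timed out after {timeout} s"}
    if proc.returncode != 0:
        return {"status": f"error: exit code {proc.returncode}"}
    # Assumed protocol: the child prints one JSON object on stdout.
    return json.loads(proc.stdout)

print(run_in_subprocess('import json; print(json.dumps({"status": "ok"}))'))
print(run_in_subprocess("import time; time.sleep(5)", timeout=0.5))
```

Failures come back as a status string instead of an exception, matching the 'error: ' convention described above, so one bad task does not abort the loop that launches the next one.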