Parallel Module
parallel.py
Batch processing for the exo2micro pipeline.
Provides serial and parallel execution of multiple sample+dye combinations.
Each task is processed by creating a SampleDye instance and
calling run().
Usage
from exo2micro.parallel import run_batch
results = run_batch(
samples=['CD070', 'CD063'],
dyes=['SybrGld', 'DAPI'],
parallel=True,
n_workers=4,
output_dir='processed',
)
Strict vs lenient mode
By default (strict_dyes=True), run_batch() raises
FileNotFoundError if any requested (sample, dye) pair has
no raw files on disk. The error message lists every missing pair so
typos surface immediately rather than after a long batch.
To skip missing pairs without raising (e.g. when you have heterogeneous
samples where not every dye exists for every sample), pass
strict_dyes=False. Skipped pairs are listed in the run log and
omitted from the task list.
Notes
Parallel mode uses multiprocessing.Pool. On macOS, Python's default
start method is spawn, so each worker starts a fresh Python interpreter
and must import the function it runs by name. For that reason
process_one is defined at module level in this module.
Memory and execution mode
The serial loop in run_serial() explicitly releases per-task
memory (matplotlib figures and the SampleDye instance) between
tasks. On low-RAM machines, prefer parallel=False over
parallel=True, n_workers=1: the serial loop’s cleanup is more
aggressive than relying on a single pool worker to free memory
between tasks.
Rule of thumb for n_workers:
n_workers * (peak RAM per sample) < available RAM.
If images are large (>1 GB each), start with n_workers=4 and
adjust based on observed memory usage.
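The rule of thumb above can be turned into a starting value for n_workers. This is a minimal sketch, assuming you already have an estimate of peak RAM per sample (for example, from a memory_debug=True run); suggest_n_workers is a hypothetical helper, not part of exo2micro.

```python
import math
from typing import Optional


def suggest_n_workers(available_gb: float, peak_gb_per_sample: float,
                      cap: Optional[int] = None) -> int:
    """Hypothetical helper: largest n with n * peak_gb_per_sample < available_gb.

    Returns 0 when not even one task fits, in which case serial mode
    (or parallel='subprocess') is the safer choice.
    """
    if peak_gb_per_sample <= 0:
        raise ValueError("peak_gb_per_sample must be positive")
    n = math.floor(available_gb / peak_gb_per_sample)
    # Enforce the strict inequality from the rule of thumb.
    if n * peak_gb_per_sample >= available_gb:
        n -= 1
    n = max(n, 0)
    if cap is not None:
        n = min(n, cap)
    return n
```

For example, 16 GB available and about 4 GB peak per sample gives 3 workers, leaving headroom rather than filling RAM exactly. To read available memory at run time, psutil.virtual_memory().available (in bytes) is one option if psutil is installed.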
- exo2micro.parallel.build_task_list(pairs, params=None, output_dir='processed', raw_dir='raw', from_stage=None, to_stage=None, force=False, checkpoint_format='tiff')
Build the list of (sample, dye, params) tuples for batch processing.
This is a low-level helper. Callers are responsible for resolving which (sample, dye) pairs actually exist on disk; see exo2micro.utils.discover_tasks(). run_batch() does this automatically, so only call build_task_list directly if you want full control over which pairs are queued.
- Parameters:
pairs (list of (str, str) tuples) – Explicit list of (sample, dye) combinations to queue. No filesystem checks are performed here; missing files will surface as task-level errors at run time.
params (dict or None) – Pipeline parameters to apply to all tasks. If None, each SampleDye uses its built-in defaults.
output_dir (str) – Root output directory (default 'processed').
raw_dir (str) – Root raw image directory (default 'raw').
from_stage (int or None) – Force re-run from this stage onward. Passed through to SampleDye.run().
to_stage (int or None) – Stop after this stage. Passed through to SampleDye.run().
force (bool) – If True, re-run stages even if checkpoints exist.
checkpoint_format ({'tiff', 'fits', 'both'}) – Which file format(s) to write for each checkpoint (default 'tiff'). Passed through to SampleDye.
- Returns:
The task list. Each tuple is (sample, dye, params_dict).
- Return type:
list of tuple
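To illustrate the shape of the task list, here is a hedged sketch. It assumes, without confirmation from this page, that the run-control arguments are merged into each task's params dict, since process_one() documents consuming output_dir, raw_dir, from_stage, to_stage, and force from that dict; whether checkpoint_format travels the same way is a further assumption. build_task_list_sketch is hypothetical.

```python
import copy
from typing import Optional


def build_task_list_sketch(pairs, params: Optional[dict] = None,
                           output_dir: str = "processed", raw_dir: str = "raw",
                           from_stage: Optional[int] = None,
                           to_stage: Optional[int] = None,
                           force: bool = False,
                           checkpoint_format: str = "tiff"):
    """Hypothetical reconstruction: one (sample, dye, params_dict) per pair."""
    if checkpoint_format not in {"tiff", "fits", "both"}:
        raise ValueError(f"unknown checkpoint_format: {checkpoint_format!r}")
    tasks = []
    for sample, dye in pairs:
        # Deep-copy so tasks never share (and accidentally mutate) one dict.
        task_params = copy.deepcopy(params) if params else {}
        task_params.update(
            output_dir=output_dir,
            raw_dir=raw_dir,
            from_stage=from_stage,
            to_stage=to_stage,
            force=force,
            checkpoint_format=checkpoint_format,  # assumption: carried in the dict
        )
        tasks.append((sample, dye, task_params))
    return tasks
```

As in the real helper, no filesystem checks happen here: a pair with no raw files still produces a task.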
- exo2micro.parallel.print_summary(results)
Print a summary table for all completed tasks.
Shows the Moffat-fit scale estimate for each run and, when any result has a scale_percentile_value or manual_scale, additional columns for those alternative scales. Failed tasks are listed in a “Problems” section after the main table with the full error message, so users running large batches can see all failures consolidated in one place.
- Parameters:
results (list of dict) – Results from run_batch(), run_serial(), or run_parallel().
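A rough sketch of the layout described above: a main table with one row per task, then a Problems section that repeats every failure with its full message. It assumes failed results carry a status beginning with 'error: ' (as documented for run_subprocess()) and treats any other status as success, since the success value is not documented here. format_summary_sketch is hypothetical, returns a string instead of printing, and omits the optional scale_percentile_value/manual_scale columns for brevity.

```python
def format_summary_sketch(results):
    """Hypothetical: render result dicts as a text table plus a Problems section."""
    lines = [f"{'sample':<10} {'dye':<10} {'status':<8} {'scale':>10}"]
    problems = []
    for r in results:
        status = str(r.get("status", ""))
        failed = status.startswith("error")
        scale = r.get("scale_estimate")
        scale_txt = f"{scale:.4g}" if isinstance(scale, (int, float)) else "-"
        lines.append(f"{r['sample']:<10} {r['dye']:<10} "
                     f"{'FAILED' if failed else 'ok':<8} {scale_txt:>10}")
        if failed:
            # Keep the full message so every failure is visible in one place.
            problems.append(f"  {r['sample']}/{r['dye']}: {status}")
    if problems:
        lines.append("")
        lines.append("Problems:")
        lines.extend(problems)
    return "\n".join(lines)
```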
- exo2micro.parallel.process_one(args)
Process a single (sample, dye) combination.
This function must be importable by name for multiprocessing (required on macOS with the spawn start method).
- Parameters:
args (tuple) – (sample, dye, params) where params is a dict of pipeline parameters. The keys output_dir, raw_dir, from_stage, to_stage, and force are consumed here; the rest are passed to SampleDye.set_params().
- Returns:
Result dict with at least sample, dye, and status keys, plus scale_estimate on successful runs.
- Return type:
dict
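The key-splitting contract described above can be sketched as follows. RUN_KEYS mirrors the five keys this page says process_one() consumes. split_task_params and process_one_sketch are hypothetical, and run_fn is a stand-in for building a SampleDye, calling set_params() and run(); the "ok" success status and the catch-and-report behaviour are assumptions modelled on the 'error: ' convention documented for run_subprocess().

```python
RUN_KEYS = ("output_dir", "raw_dir", "from_stage", "to_stage", "force")


def split_task_params(params):
    """Hypothetical: separate run-control keys from pipeline parameters."""
    run_kwargs = {k: params[k] for k in RUN_KEYS if k in params}
    pipeline_params = {k: v for k, v in params.items() if k not in RUN_KEYS}
    return run_kwargs, pipeline_params


def process_one_sketch(args, run_fn):
    """Hypothetical wrapper that reports failures instead of raising.

    Defined at module top level so that, under spawn, a worker could
    import it by name.
    """
    sample, dye, params = args
    run_kwargs, pipeline_params = split_task_params(params)
    try:
        scale = run_fn(sample, dye, run_kwargs, pipeline_params)
    except Exception as exc:  # one bad task should not abort the batch
        return {"sample": sample, "dye": dye, "status": f"error: {exc}"}
    return {"sample": sample, "dye": dye, "status": "ok", "scale_estimate": scale}
```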
- exo2micro.parallel.run_batch(samples, dyes, parallel=False, n_workers=4, params=None, output_dir='processed', raw_dir='raw', from_stage=None, to_stage=None, force=False, checkpoint_format='tiff', strict_dyes=True, memory_debug=False, timeout_per_task=None, force_run=False)
High-level batch processing entry point.
Resolves the requested samples × dyes against what’s actually on disk, then processes every present combination (serially, across a worker pool, or one subprocess per task) and prints a summary table at the end.
Discovery and strict mode
Before queueing tasks, the requested cartesian product is filtered against the filesystem via exo2micro.utils.discover_tasks(). A pair is considered “present” when both a pre-stain and a post-stain file exist for that dye in the sample’s directory.
- strict_dyes=True (default): if any requested pair is missing, raise FileNotFoundError with a single message listing every missing pair. This catches typos before a long batch starts.
- strict_dyes=False: missing pairs are skipped without raising; their reasons are printed in the batch summary.
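The strict/lenient filtering can be sketched like this. It assumes, for illustration only, that discovery yields a set of present (sample, dye) tuples; the real return shape of exo2micro.utils.discover_tasks() is not shown on this page. resolve_pairs is hypothetical, and its print stands in for the batch-summary reporting.

```python
from itertools import product


def resolve_pairs(samples, dyes, present, strict=True):
    """Hypothetical: filter samples x dyes against the pairs found on disk.

    `present` is a set of (sample, dye) tuples that have both a pre-stain
    and a post-stain file (assumed shape of the discovery result).
    """
    requested = list(product(samples, dyes))
    missing = [pair for pair in requested if pair not in present]
    if missing and strict:
        # Report every missing pair at once so all typos surface together.
        listing = ", ".join(f"{s}/{d}" for s, d in missing)
        raise FileNotFoundError(f"no raw files for: {listing}")
    for s, d in missing:
        print(f"skipping {s}/{d}: no raw files")
    return [pair for pair in requested if pair in present]
```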
Execution modes
Three modes, controlled by the parallel argument:
- parallel=False (default): serial in the current process. One task at a time, with explicit matplotlib figure cleanup and gc.collect() between tasks. Safe on low-RAM machines for moderate batch sizes.
- parallel=True: process pool of n_workers. Multiple tasks run concurrently. Fastest when you have CPU and RAM to spare; can OOM on low-RAM machines.
- parallel='subprocess': one fresh Python process per task, exited and reclaimed by the OS between tasks. Use when serial mode is OOMing on a low-RAM machine due to leaks (matplotlib figures, retained widget state, accumulated cv2/tifffile caches) that gc.collect() can’t reach. Adds roughly 1-2 s of spawn overhead per task; negligible for large samples.
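Because parallel accepts either a bool or the string 'subprocess', a dispatcher has to be careful not to confuse the two. A minimal sketch, assuming that any other value should be rejected (this page does not say how run_batch() handles invalid values); select_mode and its return labels are hypothetical.

```python
def select_mode(parallel):
    """Hypothetical: map the `parallel` argument onto an execution mode name."""
    # Test the string first, then use identity checks so that 1 or 0
    # are not silently accepted as True or False.
    if isinstance(parallel, str) and parallel == "subprocess":
        return "subprocess"
    if parallel is True:
        return "pool"
    if parallel is False:
        return "serial"
    raise ValueError(f"parallel must be False, True or 'subprocess', got {parallel!r}")
```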
- param samples:
Sample names.
- type samples:
list of str
- param dyes:
Dye/channel names.
- type dyes:
list of str
- param parallel:
False (default) for serial in-process, True for a process pool, 'subprocess' for one fresh subprocess per task. For small batches, serial is usually faster than parallel because of spawn overhead. On low-RAM machines, prefer parallel=False over parallel=True, n_workers=1, because the serial loop releases memory more aggressively between tasks. If serial is still OOMing, try parallel='subprocess'.
- type parallel:
bool or str
- param n_workers:
Number of workers when parallel=True (default 4). Ignored for the other two modes.
- type n_workers:
int
- param params:
Pipeline parameters applied to every task.
- type params:
dict or None
- param output_dir:
Root output directory.
- type output_dir:
str
- param raw_dir:
Root raw image directory.
- type raw_dir:
str
- param from_stage:
Force re-run from this stage onward.
- type from_stage:
int or None
- param to_stage:
Stop after this stage.
- type to_stage:
int or None
- param force:
If True, re-run stages even if checkpoints exist.
- type force:
bool
- param checkpoint_format:
Which file format(s) to write for each intermediate checkpoint (default 'tiff'). Using 'tiff' or 'fits' alone cuts disk usage roughly in half compared to 'both'.
- type checkpoint_format:
{‘tiff’, ‘fits’, ‘both’}
- param strict_dyes:
If True (default), raise FileNotFoundError when any requested (sample, dye) pair has no raw files on disk. Set to False to skip such pairs without raising and run only the present ones.
- type strict_dyes:
bool
- param memory_debug:
If True, prints RSS before/after each task (with gc.collect() between) so leaks can be diagnosed. Requires psutil. Useful for triaging “kernel dies mid-batch” reports. Default False. Only effective in serial and subprocess modes; the parallel pool path ignores it (each worker is in its own process, so per-task RSS in the parent isn’t meaningful).
- type memory_debug:
bool
- param timeout_per_task:
Only used when parallel='subprocess'. Maximum seconds for any single task; None (default) means no timeout. Recommended for unattended overnight batches so a wedged task doesn’t block the whole run.
- type timeout_per_task:
float or None
- param force_run:
If True, downgrade any hard-fail pre-flight check (RAM or disk estimate exceeds what is available) to a warning. Default False. Useful when the estimate is known to be conservative or you’ve already closed other memory-hungry processes; not recommended otherwise, as an OOM kill mid-batch may corrupt checkpoint files.
- type force_run:
bool
- returns:
One result per resolvable sample+dye combination.
- rtype:
list of dict
- raises FileNotFoundError:
When strict_dyes=True and one or more requested (sample, dye) pairs cannot be resolved from raw files, OR when the raw directory itself has a fatal layout problem (missing, empty, no per-sample subfolders) regardless of strict_dyes.
- raises MemoryError:
When the RAM pre-flight estimate exceeds available memory and force_run=False.
- raises OSError:
When the disk pre-flight estimate exceeds free space at output_dir and force_run=False.
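The pre-flight behaviour documented above (hard failure by default, warning under force_run=True) can be sketched as follows. How exo2micro actually estimates RAM and disk needs is not described on this page, so the estimates are plain inputs here; preflight is hypothetical, while shutil.disk_usage() is standard library. Available RAM could come from psutil.virtual_memory().available if psutil is installed.

```python
import shutil
import warnings


def preflight(ram_needed, ram_available, disk_needed, output_dir=".",
              force_run=False):
    """Hypothetical pre-flight check mirroring the documented raises.

    All sizes are in bytes; free disk space is read with shutil.disk_usage().
    """
    problems = []
    if ram_needed > ram_available:
        problems.append((MemoryError,
                         f"estimated RAM {ram_needed} B exceeds available {ram_available} B"))
    disk_free = shutil.disk_usage(output_dir).free
    if disk_needed > disk_free:
        problems.append((OSError,
                         f"estimated disk {disk_needed} B exceeds free {disk_free} B at {output_dir}"))
    for exc_type, message in problems:
        if not force_run:
            raise exc_type(message)
        # force_run=True downgrades the hard failure to a warning.
        warnings.warn(message, RuntimeWarning)
```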
- exo2micro.parallel.run_parallel(tasks, n_workers=4)
Run all tasks in parallel across n_workers processes.
Each worker saves figures to disk; this function blocks until all workers are done.
- exo2micro.parallel.run_serial(tasks, tracker=None)
Run all tasks sequentially in the current process.
After each task, explicitly closes all matplotlib figures and runs a garbage-collection pass. This is more aggressive than relying on Python’s reference-counted cleanup at scope exit and matters on low-RAM machines processing large images: per-task numpy arrays can otherwise linger long enough to overlap the next task’s allocations.
- Parameters:
tasks (list of tuple) – Each tuple is (sample, dye, params_dict), as built by build_task_list().
tracker (MemoryTracker or None) – Optional memory tracker. When provided, snapshots RSS before and after each task (with a gc.collect() pass between them) so leaks can be diagnosed. None (default) means no tracking.
- Returns:
Results for each task.
- Return type:
list of dict
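The cleanup pattern described for run_serial() looks roughly like this. It is a sketch under stated assumptions: run_task stands in for process_one() and is expected to return a result dict, matplotlib is treated as optional, and run_serial_sketch is hypothetical rather than the library's implementation.

```python
import gc

try:
    import matplotlib.pyplot as plt
except ImportError:  # matplotlib is optional for this sketch
    plt = None


def run_serial_sketch(tasks, run_task):
    """Hypothetical serial loop: run each task, then force cleanup."""
    results = []
    for task in tasks:
        try:
            results.append(run_task(task))
        finally:
            # Close every open figure and collect garbage before the next
            # task allocates its own large arrays.
            if plt is not None:
                plt.close("all")
            gc.collect()
    return results
```

Note that if run_task raises, the cleanup still runs but the loop stops; a wrapper that reports errors as result dicts keeps the batch going.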
- exo2micro.parallel.run_subprocess(tasks, memory_debug=False, timeout_per_task=None)
Run all tasks, each in its own fresh Python subprocess.
Use this on low-RAM machines where serial mode runs OOM partway through a batch. Each task gets a clean process, so any leaked matplotlib figures / numpy arrays / cv2 caches are reclaimed by the OS when the process exits between tasks.
- Parameters:
tasks (list of tuple) – Each tuple is (sample, dye, params_dict), as built by build_task_list().
memory_debug (bool) – If True, prints RSS in the parent process before and after each task. Useful for confirming the OS is actually reclaiming memory between tasks. Requires psutil. Default False.
timeout_per_task (float or None) – Maximum seconds for any single task. None (default) means no timeout. Recommended for unattended overnight batches so a wedged task doesn’t block the whole run.
- Returns:
Results for each task. A failed task is reported with a status starting with 'error: ' instead of raising an exception.
- Return type:
list of dict
Notes
A few seconds of overhead per task for subprocess launch and import. On a batch of large samples (alignment runtime measured in minutes) this is invisible. On a batch of small, fast tasks this overhead may dominate; use plain run_serial() in that case.
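The one-process-per-task idea with a timeout can be sketched with the standard subprocess module. This page does not say how exo2micro serializes a task for the child or returns its result, so this sketch assumes the child receives inline Python code and prints one JSON result dict on its last stdout line; run_in_subprocess is hypothetical.

```python
import json
import subprocess
import sys


def run_in_subprocess(code, timeout=None):
    """Hypothetical: run one task's code in a fresh Python interpreter.

    Everything the child allocates is returned to the OS when it exits.
    """
    try:
        proc = subprocess.run([sys.executable, "-c", code],
                              capture_output=True, text=True, timeout=timeout)
    except subprocess.TimeoutExpired:
        # subprocess.run() kills and reaps the child before re-raising.
        return {"status": f"error: timed out after {timeout} s"}
    if proc.returncode != 0:
        tail = proc.stderr.strip().splitlines()[-1:] or ["no stderr"]
        return {"status": f"error: exit code {proc.returncode}: {tail[0]}"}
    lines = proc.stdout.strip().splitlines()
    if not lines:
        return {"status": "error: child produced no result"}
    return json.loads(lines[-1])
```

Errors come back as result dicts rather than exceptions, matching the 'error: ' status convention documented for run_subprocess().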