OpenMS
Plans memory-bounded OpenSwathWorkflow work waves and exposes a small concurrency-limiter pair.
#include <OpenMS/ANALYSIS/OPENSWATH/OpenSwathWorkflowScheduler.h>
Classes

| Kind | Name | Description |
|---|---|---|
| struct | ConcurrencyEstimate | Derived scheduler limits computed by estimateConcurrency for one extraction/scoring run. |
| class | ConcurrencyLimiter | Counting semaphore for paths that still stream one SWATH per worker. |
| struct | Options | Memory and concurrency knobs used by wave planning and batch sizing. |
| class | ScopedSlot | RAII slot guard for ConcurrencyLimiter: acquires on construction, releases on destruction. |
| struct | Wave | A wave: a group of SWATH indices (into the original swath_maps vector) that may be resident at the same time. Populated by planWaves. |
Static Public Member Functions

| Return type | Function | Description |
|---|---|---|
| static UInt64 | estimateAvailableMemoryForScoring (const Options &options) | Estimate memory available for scoring after subtracting OS, I/O, and OSW-writer reserves. |
| static ConcurrencyEstimate | estimateConcurrency (const std::vector< OpenSwath::SwathMap > &swath_maps, const Options &options) | Derive per-run concurrency limits from a SWATH-map vector and an Options block. |
| static std::vector< Wave > | planWaves (const std::vector< OpenSwath::SwathMap > &swath_maps, const Options &options, const ConcurrencyEstimate &estimate) | Group non-MS1 SWATHs into waves that simultaneously honour the count and memory caps. |
| static Size | chooseInnerBatchSize (Size total_compounds, Size active_swaths, Size scoring_threads, int user_inner_batch_size, const Options &options) | Pick a scoring batch size that keeps the worker pool occupied. |
The scheduler keeps the policy for memory-aware SWATH concurrency outside the extraction and scoring implementation. It estimates how many SWATH maps can be resident at the same time given the configured per-SWATH memory model, then groups the non-MS1 maps of an OpenSwath::SwathMap vector into waves that can be loaded as a unit before their inner scoring jobs are distributed across worker threads. The class is a namespace in disguise: all entry points are static, all nested types are aggregates, and the two nested classes (ConcurrencyLimiter / ScopedSlot) are a tiny standalone slot-limiting primitive for legacy streaming paths.
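The slot-limiting pair described above can be illustrated with a minimal sketch. The interface of ConcurrencyLimiter and ScopedSlot is not documented on this page, so the class names `SlotLimiter` and `SlotGuard` and the methods `acquire`, `release`, and `freeSlots` below are hypothetical stand-ins, built on a mutex and a condition variable rather than on the OpenMS implementation.

```cpp
#include <condition_variable>
#include <cstddef>
#include <mutex>

// Hypothetical counting semaphore; not the OpenMS ConcurrencyLimiter.
class SlotLimiter
{
public:
  explicit SlotLimiter(std::size_t slots) : free_(slots) {}

  // Blocks until a slot is free, then takes it.
  void acquire()
  {
    std::unique_lock<std::mutex> lock(mutex_);
    cv_.wait(lock, [this] { return free_ > 0; });
    --free_;
  }

  // Returns a slot and wakes one waiting thread.
  void release()
  {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      ++free_;
    }
    cv_.notify_one();
  }

  // Number of slots currently free (for inspection only).
  std::size_t freeSlots() const
  {
    std::lock_guard<std::mutex> lock(mutex_);
    return free_;
  }

private:
  mutable std::mutex mutex_;
  std::condition_variable cv_;
  std::size_t free_;
};

// Hypothetical RAII guard: acquires on construction, releases on destruction.
class SlotGuard
{
public:
  explicit SlotGuard(SlotLimiter& limiter) : limiter_(limiter) { limiter_.acquire(); }
  ~SlotGuard() { limiter_.release(); }
  SlotGuard(const SlotGuard&) = delete;
  SlotGuard& operator=(const SlotGuard&) = delete;

private:
  SlotLimiter& limiter_;
};
```

A worker that streams one SWATH would construct a `SlotGuard` at the top of its scope; the slot is returned when the scope exits, including on an exception.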
| struct OpenMS::OpenSwathWorkflowScheduler::ConcurrencyEstimate |
Derived scheduler limits computed by estimateConcurrency for one extraction/scoring run.
Class Members

| Type | Member | Description |
|---|---|---|
| Size | avg_spectra_per_swath = 0 | Mean spectra per non-MS1 SWATH (integer division across non_ms1_swath_count). |
| UInt64 | estimated_bytes_per_swath = 0 | Predicted memory contribution of one non-MS1 SWATH based on the Options model. |
| Size | max_concurrent_swaths = 1 | Final cap on simultaneously-resident non-MS1 SWATH maps. Always at least 1. |
| UInt64 | memory_budget_bytes = 0 | Memory budget returned by estimateAvailableMemoryForScoring; 0 if the budget could not be determined (treated as unbounded by planWaves). |
| Size | non_ms1_swath_count = 0 | Number of non-MS1 SWATH maps with a non-null data pointer found in the input vector. |
| struct OpenMS::OpenSwathWorkflowScheduler::Options |
Memory and concurrency knobs used by wave planning and batch sizing.
Default values target a multi-gigabyte memory budget on a typical OpenSwath workflow. All quantities are byte counts unless otherwise noted.
Class Members

| Type | Member | Description |
|---|---|---|
| Size | avg_transitions_per_swath = 0 | Average number of chromatograms extracted per non-MS1 SWATH. 0 disables transition-density accounting (the extra term is dropped from the per-SWATH byte estimate). |
| UInt64 | bytes_per_chromatogram_point = 64ULL | Estimated bytes retained per extracted chromatogram point. Default 64. |
| UInt64 | bytes_per_spectrum = 600ULL * 1024ULL | Estimated memory contribution of one cached spectrum. Default 600 KiB. |
| int | max_concurrent_swaths = -1 | User override for concurrent SWATHs. Values > 0 cap the auto-estimate; <= 0 enables memory-based planning constrained only by scoring_threads. |
| Size | max_inner_batch_size = 10000 | Largest scoring batch size that may be picked automatically by chooseInnerBatchSize. Default 10000. |
| double | memory_usage_fraction = 0.90 | Fraction of the remaining free memory the scheduler may reserve. Internally clamped to [0.05, 0.95] by estimateAvailableMemoryForScoring. Default 0.90. |
| Size | min_inner_batch_size = 2000 | Smallest scoring batch size that may be picked automatically by chooseInnerBatchSize. Default 2000. |
| UInt64 | osw_buffer_bytes = 2ULL * 1024ULL * 1024ULL * 1024ULL | OSW writer memory reserved outside the scheduler's budget. Default 2 GiB. |
| UInt64 | per_swath_overhead_bytes = 100ULL * 1024ULL * 1024ULL | Fixed estimated overhead per loaded SWATH. Default 100 MiB. |
| Size | scoring_threads = 1 | Number of scoring worker threads available. When max_concurrent_swaths is <= 0, the auto SWATH concurrency will not exceed this. |
| Size | target_jobs_per_thread = 3 | Target number of queued scoring jobs per worker thread for the chooseInnerBatchSize heuristic. Default 3. |
| struct OpenMS::OpenSwathWorkflowScheduler::Wave |
A wave: a group of SWATH indices (into the original swath_maps vector) that may be resident at the same time. Populated by planWaves.
Class Members

| Type | Member | Description |
|---|---|---|
| UInt64 | estimated_bytes = 0 | Sum of the per-SWATH byte estimates (derived from Options) for the indices in swath_indices. |
| vector< Size > | swath_indices | Indices into the original swath_maps vector. Always non-MS1. |
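| static Size OpenMS::OpenSwathWorkflowScheduler::chooseInnerBatchSize (Size total_compounds, Size active_swaths, Size scoring_threads, int user_inner_batch_size, const Options &options) |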
Pick a scoring batch size that keeps the worker pool occupied.
Behaviour:
- total_compounds == 0 returns 0.
- user_inner_batch_size > 0 short-circuits to min(user_inner_batch_size, total_compounds).
- Otherwise the heuristic aims for target_jobs = max(active_swaths, scoring_threads * target_jobs_per_thread) jobs in flight: batch_size = ceil(total_compounds / target_jobs), then clamped to [min_inner_batch_size, max_inner_batch_size] and finally capped at total_compounds.

scoring_threads and active_swaths are each treated as max(1, value) inside the heuristic, so callers may pass 0 without surprises.
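The rules above can be written out as a small standalone function. This is a sketch of the documented behaviour, not the OpenMS source: `BatchOptions` and `chooseInnerBatchSizeSketch` are hypothetical names, and `std::size_t` stands in for `Size`.

```cpp
#include <algorithm>
#include <cstddef>

// Hypothetical stand-in for the three Options fields the heuristic reads.
struct BatchOptions
{
  std::size_t min_inner_batch_size = 2000;
  std::size_t max_inner_batch_size = 10000;
  std::size_t target_jobs_per_thread = 3;
};

// Sketch of the documented rules, in the order the text lists them.
std::size_t chooseInnerBatchSizeSketch(std::size_t total_compounds,
                                       std::size_t active_swaths,
                                       std::size_t scoring_threads,
                                       int user_inner_batch_size,
                                       const BatchOptions& options)
{
  if (total_compounds == 0) return 0;

  // An explicit user value wins, but never exceeds the compound count.
  if (user_inner_batch_size > 0)
  {
    return std::min(static_cast<std::size_t>(user_inner_batch_size), total_compounds);
  }

  // Both counts are floored at 1, so target_jobs is always >= 1.
  const std::size_t swaths = std::max<std::size_t>(1, active_swaths);
  const std::size_t threads = std::max<std::size_t>(1, scoring_threads);
  const std::size_t target_jobs = std::max(swaths, threads * options.target_jobs_per_thread);

  // Integer form of ceil(total_compounds / target_jobs).
  std::size_t batch = (total_compounds + target_jobs - 1) / target_jobs;

  // Clamp to the configured range, then cap at the compound count.
  batch = std::max(batch, options.min_inner_batch_size);
  batch = std::min(batch, options.max_inner_batch_size);
  return std::min(batch, total_compounds);
}
```

With the default knobs, 100000 compounds, 4 active SWATHs, and 8 scoring threads give target_jobs = 24 and a batch size of 4167.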
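| static UInt64 OpenMS::OpenSwathWorkflowScheduler::estimateAvailableMemoryForScoring (const Options &options) |
Estimate memory available for scoring after subtracting OS, I/O, and OSW-writer reserves.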
Queries the OS via SysInfo::getFreeSystemMemory, subtracts a hardcoded 2 GiB OS reserve plus a 500 MiB I/O-buffer reserve plus options.osw_buffer_bytes, and returns the remainder multiplied by options.memory_usage_fraction (clamped to [0.05, 0.95]).
Returns 0 when SysInfo::getFreeSystemMemory fails or when the reserves already exceed the reported free memory.
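The arithmetic described above is easy to check by hand with a standalone sketch. The function name `availableForScoringSketch` is hypothetical, and the OS query is replaced by two parameters: `query_ok` models whether SysInfo::getFreeSystemMemory succeeded, and `free_bytes` is assumed to be its result already expressed in bytes.

```cpp
#include <algorithm>
#include <cstdint>

// Sketch of the documented budget computation; not the OpenMS source.
std::uint64_t availableForScoringSketch(bool query_ok,
                                        std::uint64_t free_bytes,
                                        std::uint64_t osw_buffer_bytes,
                                        double memory_usage_fraction)
{
  constexpr std::uint64_t os_reserve = 2ULL * 1024ULL * 1024ULL * 1024ULL; // 2 GiB
  constexpr std::uint64_t io_reserve = 500ULL * 1024ULL * 1024ULL;         // 500 MiB

  // A failed OS query yields a budget of 0.
  if (!query_ok) return 0;

  // Reserves that consume all reported free memory also yield 0.
  const std::uint64_t reserves = os_reserve + io_reserve + osw_buffer_bytes;
  if (reserves >= free_bytes) return 0;

  // The fraction is clamped to [0.05, 0.95] before it is applied.
  const double fraction = std::min(0.95, std::max(0.05, memory_usage_fraction));
  return static_cast<std::uint64_t>(static_cast<double>(free_bytes - reserves) * fraction);
}
```

For example, 16 GiB of free memory with the default 2 GiB OSW buffer leaves 12360613888 bytes after the reserves, and a fraction of 0.5 turns that into a budget of 6180306944 bytes.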
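| static ConcurrencyEstimate OpenMS::OpenSwathWorkflowScheduler::estimateConcurrency (const std::vector< OpenSwath::SwathMap > &swath_maps, const Options &options) |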
Derive per-run concurrency limits from a SWATH-map vector and an Options block.
Counts non-MS1 SWATH maps that carry a non-null data pointer, computes the mean spectra count, and derives the per-SWATH byte estimate as avg_spectra * bytes_per_spectrum + per_swath_overhead + (transition-density term). The transition-density term is included only when options.avg_transitions_per_swath and options.bytes_per_chromatogram_point are both non-zero.
The reported ConcurrencyEstimate::max_concurrent_swaths is the minimum of:
- the memory-based limit, that is, how many SWATHs of estimated_bytes_per_swath fit into memory_budget_bytes, and
- options.max_concurrent_swaths when > 0, or max(1, scoring_threads) otherwise.

The final value is clamped to [1, non_ms1_swath_count].

When the input vector contains no non-MS1 SWATHs, the returned estimate is default-constructed (all zeros except max_concurrent_swaths, which stays at 1).
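A simplified sketch of this derivation follows. It makes several assumptions that are not confirmed by this page: `SwathInfo`, `EstimateSketch`, and `estimateConcurrencySketch` are hypothetical names; the memory budget is passed in rather than queried; the memory-based cap is taken to be the budget divided by the per-SWATH estimate and is skipped when the budget is 0; and the transition-density term is left out, which corresponds to avg_transitions_per_swath = 0.

```cpp
#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical stand-in for the parts of OpenSwath::SwathMap used here.
struct SwathInfo
{
  bool ms1;            // true for the MS1 map
  bool has_data;       // models a non-null data pointer
  std::size_t spectra; // spectra count when has_data is true
};

// Mirrors the documented ConcurrencyEstimate fields and defaults.
struct EstimateSketch
{
  std::size_t non_ms1_swath_count = 0;
  std::size_t avg_spectra_per_swath = 0;
  std::uint64_t estimated_bytes_per_swath = 0;
  std::uint64_t memory_budget_bytes = 0;
  std::size_t max_concurrent_swaths = 1;
};

EstimateSketch estimateConcurrencySketch(const std::vector<SwathInfo>& swaths,
                                         std::uint64_t memory_budget_bytes,
                                         std::uint64_t bytes_per_spectrum,
                                         std::uint64_t per_swath_overhead_bytes,
                                         int user_max_concurrent_swaths,
                                         std::size_t scoring_threads)
{
  EstimateSketch est;
  std::uint64_t total_spectra = 0;
  for (const SwathInfo& s : swaths)
  {
    if (s.ms1 || !s.has_data) continue; // only non-MS1 maps with data count
    ++est.non_ms1_swath_count;
    total_spectra += s.spectra;
  }
  if (est.non_ms1_swath_count == 0) return est; // default-constructed estimate

  est.memory_budget_bytes = memory_budget_bytes;
  est.avg_spectra_per_swath = static_cast<std::size_t>(total_spectra / est.non_ms1_swath_count);
  // Transition-density term omitted (assumes avg_transitions_per_swath == 0).
  est.estimated_bytes_per_swath =
      est.avg_spectra_per_swath * bytes_per_spectrum + per_swath_overhead_bytes;

  // User override when > 0, otherwise the scoring thread count (at least 1).
  std::uint64_t cap = user_max_concurrent_swaths > 0
      ? static_cast<std::uint64_t>(user_max_concurrent_swaths)
      : std::max<std::uint64_t>(1, scoring_threads);

  // Assumption: memory cap = budget / per-SWATH bytes, skipped for budget 0.
  if (memory_budget_bytes > 0 && est.estimated_bytes_per_swath > 0)
  {
    cap = std::min(cap, memory_budget_bytes / est.estimated_bytes_per_swath);
  }

  // Final clamp to [1, non_ms1_swath_count].
  cap = std::max<std::uint64_t>(1, std::min<std::uint64_t>(cap, est.non_ms1_swath_count));
  est.max_concurrent_swaths = static_cast<std::size_t>(cap);
  return est;
}
```

Under these assumptions a tight budget can only lower the cap to 1, never to 0, because of the final clamp.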
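| static std::vector< Wave > OpenMS::OpenSwathWorkflowScheduler::planWaves (const std::vector< OpenSwath::SwathMap > &swath_maps, const Options &options, const ConcurrencyEstimate &estimate) |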
Group non-MS1 SWATHs into waves that simultaneously honour the count and memory caps.
Walks swath_maps in input order, skips MS1 maps, and assigns each remaining SWATH to the current wave. The wave is closed (and a new one started) when either of the following becomes true:
- the current wave already holds estimate.max_concurrent_swaths entries, or
- adding the SWATH would push the wave's estimated bytes past estimate.memory_budget_bytes (a budget of 0 is treated as unbounded).

Per-SWATH byte cost uses the actual spectra count when sptr is non-null; otherwise it falls back to estimate.avg_spectra_per_swath. Waves are returned in the same order in which their contents appear in the input vector.
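The grouping can be sketched as a single greedy pass. The names `SwathInfo`, `WaveSketch`, and `planWavesSketch` are hypothetical, the per-SWATH cost omits the transition-density term, and one behaviour is assumed rather than documented: a SWATH whose cost alone exceeds the budget still gets a wave of its own instead of being dropped.

```cpp
#include <cstddef>
#include <cstdint>
#include <utility>
#include <vector>

// Hypothetical stand-in for the parts of OpenSwath::SwathMap used here.
struct SwathInfo
{
  bool ms1;            // true for the MS1 map
  bool has_data;       // models a non-null sptr
  std::size_t spectra; // spectra count when has_data is true
};

// Mirrors the documented Wave fields.
struct WaveSketch
{
  std::vector<std::size_t> swath_indices;
  std::uint64_t estimated_bytes = 0;
};

std::vector<WaveSketch> planWavesSketch(const std::vector<SwathInfo>& swaths,
                                        std::size_t max_concurrent_swaths,
                                        std::uint64_t memory_budget_bytes,
                                        std::size_t avg_spectra_per_swath,
                                        std::uint64_t bytes_per_spectrum,
                                        std::uint64_t per_swath_overhead_bytes)
{
  std::vector<WaveSketch> waves;
  WaveSketch current;
  for (std::size_t i = 0; i < swaths.size(); ++i)
  {
    if (swaths[i].ms1) continue; // MS1 maps never enter a wave

    // Actual spectra count when data is present, otherwise the average.
    const std::size_t spectra = swaths[i].has_data ? swaths[i].spectra : avg_spectra_per_swath;
    const std::uint64_t cost = spectra * bytes_per_spectrum + per_swath_overhead_bytes;

    const bool count_full = current.swath_indices.size() >= max_concurrent_swaths;
    // A budget of 0 means unbounded, so the memory check is skipped.
    const bool over_budget = memory_budget_bytes > 0 &&
                             current.estimated_bytes + cost > memory_budget_bytes;

    // Close the wave only if it has content (assumption: an oversized
    // SWATH is still placed, alone, in a fresh wave).
    if (!current.swath_indices.empty() && (count_full || over_budget))
    {
      waves.push_back(std::move(current));
      current = WaveSketch();
    }
    current.swath_indices.push_back(i);
    current.estimated_bytes += cost;
  }
  if (!current.swath_indices.empty()) waves.push_back(std::move(current));
  return waves;
}
```

Because the pass never reorders its input, the indices inside each wave and the waves themselves stay in input order.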