OpenMS
OpenSwathWorkflowScheduler Class Reference

Plans memory-bounded OpenSwathWorkflow work waves and exposes a small concurrency-limiter pair.

#include <OpenMS/ANALYSIS/OPENSWATH/OpenSwathWorkflowScheduler.h>

Classes

struct  ConcurrencyEstimate
 Derived scheduler limits computed by estimateConcurrency for one extraction/scoring run.
 
class  ConcurrencyLimiter
 Counting semaphore for paths that still stream one SWATH per worker.
 
struct  Options
 Memory and concurrency knobs used by wave planning and batch sizing.
 
class  ScopedSlot
 RAII slot guard for ConcurrencyLimiter: acquires on construction, releases on destruction.
 
struct  Wave
 A wave: a group of SWATH indices (into the original swath_maps vector) that may be resident at the same time. Populated by planWaves.
 

Static Public Member Functions

static UInt64 estimateAvailableMemoryForScoring (const Options &options)
 Estimate memory available for scoring after subtracting OS, I/O, and OSW-writer reserves.
 
static ConcurrencyEstimate estimateConcurrency (const std::vector< OpenSwath::SwathMap > &swath_maps, const Options &options)
 Derive per-run concurrency limits from a SWATH-map vector and an Options block.
 
static std::vector< Wave > planWaves (const std::vector< OpenSwath::SwathMap > &swath_maps, const Options &options, const ConcurrencyEstimate &estimate)
 Group non-MS1 SWATHs into waves that simultaneously honour the count and memory caps.
 
static Size chooseInnerBatchSize (Size total_compounds, Size active_swaths, Size scoring_threads, int user_inner_batch_size, const Options &options)
 Pick a scoring batch size that keeps the worker pool occupied.
 

Detailed Description

Plans memory-bounded OpenSwathWorkflow work waves and exposes a small concurrency-limiter pair.

The scheduler keeps the policy for memory-aware SWATH concurrency outside the extraction and scoring implementation. It estimates how many SWATH maps can be resident at the same time given the configured per-SWATH memory model, then groups the non-MS1 maps of an OpenSwath::SwathMap vector into waves that can be loaded as a unit before their inner scoring jobs are distributed across worker threads. The class is a namespace in disguise: all entry points are static, all nested types are aggregates, and the two nested classes (ConcurrencyLimiter / ScopedSlot) are a tiny standalone slot-limiting primitive for legacy streaming paths.
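The slot-limiting idea behind ConcurrencyLimiter / ScopedSlot can be illustrated with a minimal, standalone sketch. The names SlotLimiter, SlotGuard, acquire, release and freeSlots are hypothetical and are not taken from the OpenMS header; the sketch only shows the general pattern of a counting semaphore paired with an RAII guard, not the actual implementation.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>

// Hypothetical counting semaphore: at most `slots` holders at any time.
class SlotLimiter
{
public:
  explicit SlotLimiter(std::size_t slots) : free_(slots)
  {
    assert(slots > 0); // a limiter without slots would block forever
  }

  // Blocks until a slot is free, then takes it.
  void acquire()
  {
    std::unique_lock<std::mutex> lock(mutex_);
    cv_.wait(lock, [this] { return free_ > 0; });
    --free_;
  }

  // Returns a slot and wakes one waiting thread, if any.
  void release()
  {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      ++free_;
    }
    cv_.notify_one();
  }

  std::size_t freeSlots() const
  {
    std::lock_guard<std::mutex> lock(mutex_);
    return free_;
  }

private:
  mutable std::mutex mutex_;
  std::condition_variable cv_;
  std::size_t free_;
};

// Hypothetical RAII guard: acquires on construction, releases on destruction.
class SlotGuard
{
public:
  explicit SlotGuard(SlotLimiter& limiter) : limiter_(limiter) { limiter_.acquire(); }
  ~SlotGuard() { limiter_.release(); }
  SlotGuard(const SlotGuard&) = delete;
  SlotGuard& operator=(const SlotGuard&) = delete;

private:
  SlotLimiter& limiter_;
};
```

A worker would construct a SlotGuard at the top of the block that streams one SWATH; the slot is returned when the guard leaves scope, including on an exception.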


Class Documentation

◆ OpenMS::OpenSwathWorkflowScheduler::ConcurrencyEstimate

struct OpenMS::OpenSwathWorkflowScheduler::ConcurrencyEstimate

Derived scheduler limits computed by estimateConcurrency for one extraction/scoring run.

Class Members
Size avg_spectra_per_swath = 0 Mean spectra per non-MS1 SWATH (integer division across non_ms1_swath_count).
UInt64 estimated_bytes_per_swath = 0 Predicted memory contribution of one non-MS1 SWATH based on the Options model.
Size max_concurrent_swaths = 1 Final cap on simultaneously-resident non-MS1 SWATH maps. Always at least 1.
UInt64 memory_budget_bytes = 0 Memory budget returned by estimateAvailableMemoryForScoring; 0 if the budget could not be determined (treated as unbounded by planWaves).
Size non_ms1_swath_count = 0 Number of non-MS1 SWATH maps with a non-null data pointer found in the input vector.

◆ OpenMS::OpenSwathWorkflowScheduler::Options

struct OpenMS::OpenSwathWorkflowScheduler::Options

Memory and concurrency knobs used by wave planning and batch sizing.

Default values target a multi-gigabyte memory budget on a typical OpenSwath workflow. Memory quantities are byte counts; the remaining members are counts or fractions, as noted per member.

Class Members
Size avg_transitions_per_swath = 0 Average number of chromatograms extracted per non-MS1 SWATH. 0 disables transition-density accounting (the extra term is dropped from the per-SWATH byte estimate).
UInt64 bytes_per_chromatogram_point = 64ULL Estimated bytes retained per extracted chromatogram point. Default 64.
UInt64 bytes_per_spectrum = 600ULL * 1024ULL Estimated memory contribution of one cached spectrum. Default 600 KiB.
int max_concurrent_swaths = -1 User override for concurrent SWATHs. Values > 0 cap the auto-estimate; <= 0 enables memory-based planning constrained only by scoring_threads.
Size max_inner_batch_size = 10000 Largest scoring batch size that may be picked automatically by chooseInnerBatchSize. Default 10000.
double memory_usage_fraction = 0.90 Fraction of the remaining free memory the scheduler may reserve. Internally clamped to [0.05, 0.95] by estimateAvailableMemoryForScoring. Default 0.90.
Size min_inner_batch_size = 2000 Smallest scoring batch size that may be picked automatically by chooseInnerBatchSize. Default 2000.
UInt64 osw_buffer_bytes = 2ULL * 1024ULL * 1024ULL * 1024ULL OSW writer memory reserved outside the scheduler's budget. Default 2 GiB.
UInt64 per_swath_overhead_bytes = 100ULL * 1024ULL * 1024ULL Fixed estimated overhead per loaded SWATH. Default 100 MiB.
Size scoring_threads = 1 Number of scoring worker threads available. When max_concurrent_swaths is <= 0, the auto SWATH concurrency will not exceed this.
Size target_jobs_per_thread = 3 Target number of queued scoring jobs per worker thread for the chooseInnerBatchSize heuristic. Default 3.

◆ OpenMS::OpenSwathWorkflowScheduler::Wave

struct OpenMS::OpenSwathWorkflowScheduler::Wave

A wave: a group of SWATH indices (into the original swath_maps vector) that may be resident at the same time. Populated by planWaves.

Class Members
UInt64 estimated_bytes = 0 Sum of the per-SWATH byte estimates (derived from Options) for the indices in swath_indices.
vector< Size > swath_indices Indices into the original swath_maps vector. Always non-MS1.

Member Function Documentation

◆ chooseInnerBatchSize()

static Size chooseInnerBatchSize ( Size  total_compounds,
Size  active_swaths,
Size  scoring_threads,
int  user_inner_batch_size,
const Options &  options 
)
static

Pick a scoring batch size that keeps the worker pool occupied.

Behaviour:

  • total_compounds == 0 returns 0.
  • user_inner_batch_size > 0 short-circuits to min(user_inner_batch_size, total_compounds).
  • Otherwise the heuristic targets max(active_swaths, scoring_threads * target_jobs_per_thread) jobs in flight: batch_size = ceil(total_compounds / target_jobs), then clamped to [min_inner_batch_size, max_inner_batch_size] and finally capped at total_compounds.

scoring_threads and active_swaths are each treated as max(1, value) inside the heuristic so callers may pass 0 without surprises.
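The rules above can be sketched as a standalone function. This is an illustration written from the description only, not the OpenMS source: BatchOptions and sketchInnerBatchSize are hypothetical names, and the struct carries just the three Options members the heuristic reads, with the documented defaults.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>

// Hypothetical subset of Options; defaults copy the documented ones.
struct BatchOptions
{
  std::size_t min_inner_batch_size = 2000;
  std::size_t max_inner_batch_size = 10000;
  std::size_t target_jobs_per_thread = 3;
};

std::size_t sketchInnerBatchSize(std::size_t total_compounds,
                                 std::size_t active_swaths,
                                 std::size_t scoring_threads,
                                 int user_inner_batch_size,
                                 const BatchOptions& options)
{
  assert(options.min_inner_batch_size <= options.max_inner_batch_size);

  if (total_compounds == 0) return 0;

  // An explicit user value wins, but never exceeds the amount of work.
  if (user_inner_batch_size > 0)
  {
    return std::min(static_cast<std::size_t>(user_inner_batch_size), total_compounds);
  }

  // Zero inputs are treated as one.
  const std::size_t swaths = std::max<std::size_t>(1, active_swaths);
  const std::size_t threads = std::max<std::size_t>(1, scoring_threads);
  const std::size_t target_jobs = std::max(swaths, threads * options.target_jobs_per_thread);

  // Ceiling division, then clamp to [min, max], then cap at the total.
  std::size_t batch = (total_compounds + target_jobs - 1) / target_jobs;
  batch = std::min(std::max(batch, options.min_inner_batch_size), options.max_inner_batch_size);
  return std::min(batch, total_compounds);
}
```

With the defaults, 120000 compounds, 4 active SWATHs and 8 threads give 24 target jobs and a batch size of 5000; 1000 compounds with all other inputs at 0 are first raised to the 2000 minimum and then capped back to 1000.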

◆ estimateAvailableMemoryForScoring()

static UInt64 estimateAvailableMemoryForScoring ( const Options &  options )
static

Estimate memory available for scoring after subtracting OS, I/O, and OSW-writer reserves.

Queries the OS via SysInfo::getFreeSystemMemory, subtracts a hardcoded 2 GiB OS reserve plus a 500 MiB I/O-buffer reserve plus options.osw_buffer_bytes, and returns the remainder multiplied by options.memory_usage_fraction (clamped to [0.05, 0.95]).

Returns 0 when SysInfo::getFreeSystemMemory fails or when the reserves already exceed the reported free memory.
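The arithmetic can be sketched without the OS query. In this illustration the free-memory figure is passed in as a byte count (0 standing in for a failed query), which is an assumption: the unit and signature of SysInfo::getFreeSystemMemory are not stated here. The names sketchAvailableForScoring, kMiB and kGiB are hypothetical.

```cpp
#include <algorithm>
#include <cassert>
#include <cmath>
#include <cstdint>

constexpr std::uint64_t kMiB = 1024ULL * 1024ULL;
constexpr std::uint64_t kGiB = 1024ULL * kMiB;

// free_bytes: free system memory in bytes, 0 if the query failed (assumed convention).
std::uint64_t sketchAvailableForScoring(std::uint64_t free_bytes,
                                        std::uint64_t osw_buffer_bytes = 2 * kGiB,
                                        double memory_usage_fraction = 0.90)
{
  assert(!std::isnan(memory_usage_fraction));

  // Documented reserves: 2 GiB for the OS, 500 MiB for I/O buffers, plus the OSW writer.
  const std::uint64_t reserves = 2 * kGiB + 500 * kMiB + osw_buffer_bytes;
  if (free_bytes <= reserves) return 0;

  // The fraction is clamped to [0.05, 0.95] before it is applied.
  const double fraction = std::min(0.95, std::max(0.05, memory_usage_fraction));
  return static_cast<std::uint64_t>(static_cast<double>(free_bytes - reserves) * fraction);
}
```

With the default 2 GiB OSW buffer the reserves add up to 4 GiB + 500 MiB, so a machine reporting 4 GiB free yields a budget of 0, which planWaves treats as unbounded.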

◆ estimateConcurrency()

static ConcurrencyEstimate estimateConcurrency ( const std::vector< OpenSwath::SwathMap > &  swath_maps,
const Options &  options 
)
static

Derive per-run concurrency limits from a SWATH-map vector and an Options block.

Counts non-MS1 SWATH maps that carry a non-null data pointer, computes the mean spectra count, and derives the per-SWATH byte estimate as avg_spectra * bytes_per_spectrum + per_swath_overhead + (transition-density term). The transition-density term is included only when options.avg_transitions_per_swath and options.bytes_per_chromatogram_point are both non-zero.

The reported ConcurrencyEstimate::max_concurrent_swaths is the minimum of:

  • the number of non-MS1 SWATHs,
  • the memory budget divided by the per-SWATH byte estimate (skipped when either side is zero),
  • options.max_concurrent_swaths when > 0, or max(1, scoring_threads) otherwise.

The final value is clamped to [1, non_ms1_swath_count].

When the input vector contains no non-MS1 SWATHs, the returned estimate is default-constructed (all zeros except max_concurrent_swaths which stays at 1).
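A minimal sketch of this derivation follows. It is an assumption-laden illustration rather than the OpenMS code: MapInfo stands in for OpenSwath::SwathMap, ModelOptions for the relevant Options members, the memory budget is passed in directly, and the transition-density term is left out because its exact form is not given here.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical stand-in for OpenSwath::SwathMap, reduced to what the estimate reads.
struct MapInfo
{
  bool ms1;               // MS1 maps are never counted
  bool has_data;          // models a non-null data pointer
  std::size_t n_spectra;  // spectra held by this map
};

// Hypothetical subset of Options; defaults copy the documented ones.
struct ModelOptions
{
  std::uint64_t bytes_per_spectrum = 600ULL * 1024ULL;
  std::uint64_t per_swath_overhead_bytes = 100ULL * 1024ULL * 1024ULL;
  int max_concurrent_swaths = -1;
  std::size_t scoring_threads = 1;
};

// Same members and defaults as the documented ConcurrencyEstimate.
struct EstimateSketch
{
  std::size_t avg_spectra_per_swath = 0;
  std::uint64_t estimated_bytes_per_swath = 0;
  std::size_t max_concurrent_swaths = 1;
  std::uint64_t memory_budget_bytes = 0;
  std::size_t non_ms1_swath_count = 0;
};

EstimateSketch sketchEstimateConcurrency(const std::vector<MapInfo>& maps,
                                         const ModelOptions& options,
                                         std::uint64_t memory_budget_bytes)
{
  EstimateSketch est;
  std::uint64_t total_spectra = 0;
  for (const MapInfo& m : maps)
  {
    if (m.ms1 || !m.has_data) continue;
    ++est.non_ms1_swath_count;
    total_spectra += m.n_spectra;
  }
  if (est.non_ms1_swath_count == 0) return est; // stays default-constructed

  est.avg_spectra_per_swath = static_cast<std::size_t>(total_spectra / est.non_ms1_swath_count);
  // Transition-density term omitted (models avg_transitions_per_swath == 0).
  est.estimated_bytes_per_swath =
      est.avg_spectra_per_swath * options.bytes_per_spectrum + options.per_swath_overhead_bytes;
  est.memory_budget_bytes = memory_budget_bytes;

  std::uint64_t cap = est.non_ms1_swath_count;
  if (est.memory_budget_bytes != 0 && est.estimated_bytes_per_swath != 0)
  {
    cap = std::min<std::uint64_t>(cap, est.memory_budget_bytes / est.estimated_bytes_per_swath);
  }
  const std::uint64_t user_or_thread_cap = options.max_concurrent_swaths > 0
      ? static_cast<std::uint64_t>(options.max_concurrent_swaths)
      : std::max<std::uint64_t>(1, options.scoring_threads);
  cap = std::min(cap, user_or_thread_cap);

  // Final clamp to [1, non_ms1_swath_count].
  cap = std::max<std::uint64_t>(1, std::min<std::uint64_t>(cap, est.non_ms1_swath_count));
  est.max_concurrent_swaths = static_cast<std::size_t>(cap);
  assert(est.max_concurrent_swaths >= 1);
  return est;
}
```

For four non-MS1 maps of 1000 spectra each and the default model, one SWATH is estimated at 719257600 bytes, so a 2 GiB budget admits two concurrent SWATHs even when eight scoring threads are available.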

◆ planWaves()

static std::vector< Wave > planWaves ( const std::vector< OpenSwath::SwathMap > &  swath_maps,
const Options &  options,
const ConcurrencyEstimate &  estimate 
)
static

Group non-MS1 SWATHs into waves that simultaneously honour the count and memory caps.

Walks swath_maps in input order, skips MS1 maps, and assigns each remaining SWATH to the current wave. The wave is closed (and a new one started) when either of the following becomes true:

  • the wave already holds estimate.max_concurrent_swaths entries, or
  • adding this SWATH would push the running byte total past estimate.memory_budget_bytes (a budget of 0 is treated as unbounded).

Per-SWATH byte cost uses the actual spectra count when sptr is non-null; otherwise falls back to estimate.avg_spectra_per_swath. Returns waves in the same order their contents appear in the input vector.
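The grouping rule can be sketched as a single pass over the input. The code below is a hypothetical, simplified illustration: MapInfo stands in for OpenSwath::SwathMap, the per-SWATH cost omits the transition-density term, and the limits are passed as plain arguments instead of Options and ConcurrencyEstimate. It also assumes that a SWATH whose cost alone exceeds the budget still gets a wave of its own, which the description does not state.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical stand-in for OpenSwath::SwathMap.
struct MapInfo
{
  bool ms1;               // MS1 maps are skipped
  bool has_data;          // models a non-null sptr
  std::size_t n_spectra;  // used only when has_data is true
};

// Same members as the documented Wave.
struct WaveSketch
{
  std::uint64_t estimated_bytes = 0;
  std::vector<std::size_t> swath_indices;
};

std::vector<WaveSketch> sketchPlanWaves(const std::vector<MapInfo>& maps,
                                        std::size_t max_concurrent_swaths,
                                        std::uint64_t memory_budget_bytes,
                                        std::size_t avg_spectra_per_swath,
                                        std::uint64_t bytes_per_spectrum,
                                        std::uint64_t per_swath_overhead_bytes)
{
  assert(max_concurrent_swaths >= 1);

  std::vector<WaveSketch> waves;
  WaveSketch current;
  for (std::size_t i = 0; i < maps.size(); ++i)
  {
    if (maps[i].ms1) continue;

    // Actual spectra count when data is present, otherwise the run average.
    const std::size_t spectra = maps[i].has_data ? maps[i].n_spectra : avg_spectra_per_swath;
    const std::uint64_t cost = spectra * bytes_per_spectrum + per_swath_overhead_bytes;

    const bool count_full = current.swath_indices.size() >= max_concurrent_swaths;
    // A budget of 0 means unbounded; an empty wave always accepts one SWATH (assumption).
    const bool over_budget = memory_budget_bytes != 0 && !current.swath_indices.empty()
                             && current.estimated_bytes + cost > memory_budget_bytes;
    if (count_full || over_budget)
    {
      waves.push_back(current);
      current = WaveSketch();
    }
    current.swath_indices.push_back(i);
    current.estimated_bytes += cost;
  }
  if (!current.swath_indices.empty()) waves.push_back(current);
  return waves;
}
```

Because the pass never reorders its input, the indices within each wave and the waves themselves follow the order of swath_maps.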