OpenMS
Loading...
Searching...
No Matches
OpenSwathWorkflowScheduler.h
Go to the documentation of this file.
1// Copyright (c) 2002-present, OpenMS Inc. -- EKU Tuebingen, ETH Zurich, and FU Berlin
2// SPDX-License-Identifier: BSD-3-Clause
3//
4// --------------------------------------------------------------------------
5// $Maintainer: Justin Sing $
6// $Authors: Justin Sing $
7// --------------------------------------------------------------------------
8
9#pragma once
10
13
14#include <condition_variable>
15#include <mutex>
16#include <vector>
17
18namespace OpenMS
19{
34 class OPENMS_DLLAPI OpenSwathWorkflowScheduler
35 {
36 public:
43 struct OPENMS_DLLAPI Options
44 {
46 UInt64 osw_buffer_bytes = 2ULL * 1024ULL * 1024ULL * 1024ULL;
48 double memory_usage_fraction = 0.90;
50 UInt64 bytes_per_spectrum = 600ULL * 1024ULL;
52 UInt64 per_swath_overhead_bytes = 100ULL * 1024ULL * 1024ULL;
54 Size avg_transitions_per_swath = 0;
56 UInt64 bytes_per_chromatogram_point = 64ULL;
58 int max_concurrent_swaths = -1;
60 Size scoring_threads = 1;
62 Size min_inner_batch_size = 2000;
64 Size max_inner_batch_size = 10000;
66 Size target_jobs_per_thread = 3;
67 };
68
70 struct OPENMS_DLLAPI ConcurrencyEstimate
71 {
72 Size non_ms1_swath_count = 0;
73 Size avg_spectra_per_swath = 0;
74 Size max_concurrent_swaths = 1;
75 UInt64 memory_budget_bytes = 0;
76 UInt64 estimated_bytes_per_swath = 0;
77 };
78
80 struct OPENMS_DLLAPI Wave
81 {
82 std::vector<Size> swath_indices;
83 UInt64 estimated_bytes = 0;
84 };
85
98
118 const std::vector<OpenSwath::SwathMap>& swath_maps,
119 const Options& options);
120
135 static std::vector<Wave> planWaves(
136 const std::vector<OpenSwath::SwathMap>& swath_maps,
137 const Options& options,
138 const ConcurrencyEstimate& estimate);
139
155 Size total_compounds,
156 Size active_swaths,
157 Size scoring_threads,
158 int user_inner_batch_size,
159 const Options& options);
160
173 class OPENMS_DLLAPI ConcurrencyLimiter
174 {
175 public:
177 explicit ConcurrencyLimiter(Size max_concurrent_swaths);
182
183 private:
185 Size active_swaths_ = 0;
186 std::mutex mutex_;
187 std::condition_variable cv_;
188 };
189
198 class OPENMS_DLLAPI ScopedSlot
199 {
200 public:
202 explicit ScopedSlot(ConcurrencyLimiter* limiter);
205
206 ScopedSlot(const ScopedSlot&) = delete;
207 ScopedSlot& operator=(const ScopedSlot&) = delete;
208
209 private:
211 };
212 };
213}
Counting semaphore for paths that still stream one SWATH per worker.
Definition OpenSwathWorkflowScheduler.h:174
Size max_concurrent_swaths_
Definition OpenSwathWorkflowScheduler.h:184
ConcurrencyLimiter(Size max_concurrent_swaths)
Construct with the upper bound on concurrent slots; values < 1 are clamped to 1.
std::mutex mutex_
Definition OpenSwathWorkflowScheduler.h:186
std::condition_variable cv_
Definition OpenSwathWorkflowScheduler.h:187
void releaseSlot()
Release a previously reserved slot and notify one waiter. No-op (safe) if the active count is already...
void acquireSlot()
Block until a slot is free and reserve it. Wakes via the condition variable.
RAII slot guard for ConcurrencyLimiter — acquires on construction, releases on destruction.
Definition OpenSwathWorkflowScheduler.h:199
ScopedSlot(ConcurrencyLimiter *limiter)
Acquire a slot from limiter (blocking). If limiter is nullptr, the constructor is a no-op.
ConcurrencyLimiter * limiter_
Definition OpenSwathWorkflowScheduler.h:210
~ScopedSlot()
Release the slot back to the limiter (no-op if constructed with nullptr).
ScopedSlot & operator=(const ScopedSlot &)=delete
Plans memory-bounded OpenSwathWorkflow work waves and exposes a small concurrency-limiter pair.
Definition OpenSwathWorkflowScheduler.h:35
static std::vector< Wave > planWaves(const std::vector< OpenSwath::SwathMap > &swath_maps, const Options &options, const ConcurrencyEstimate &estimate)
Group non-MS1 SWATHs into waves that simultaneously honour the count and memory caps.
std::vector< Size > swath_indices
Indices into the original swath_maps vector. Always non-MS1.
Definition OpenSwathWorkflowScheduler.h:82
static UInt64 estimateAvailableMemoryForScoring(const Options &options)
Estimate memory available for scoring after subtracting OS, I/O, and OSW-writer reserves.
static Size chooseInnerBatchSize(Size total_compounds, Size active_swaths, Size scoring_threads, int user_inner_batch_size, const Options &options)
Pick a scoring batch size that keeps the worker pool occupied.
static ConcurrencyEstimate estimateConcurrency(const std::vector< OpenSwath::SwathMap > &swath_maps, const Options &options)
Derive per-run concurrency limits from a SWATH-map vector and an Options block.
Derived scheduler limits computed by estimateConcurrency for one extraction/scoring run.
Definition OpenSwathWorkflowScheduler.h:71
Memory and concurrency knobs used by wave planning and batch sizing.
Definition OpenSwathWorkflowScheduler.h:44
A wave: a group of SWATH indices (into the original swath_maps vector) that may be resident at the sa...
Definition OpenSwathWorkflowScheduler.h:81
uint64_t UInt64
Unsigned integer type (64bit)
Definition Types.h:47
size_t Size
Size type e.g. used as variable which can hold result of size()
Definition Types.h:97
Main OpenMS namespace.
Definition openswathalgo/include/OpenMS/OPENSWATHALGO/DATAACCESS/ISpectrumAccess.h:19