Branch data Line data Source code
1 : : // Copyright (c) The Bitcoin Core developers
2 : : // Distributed under the MIT software license, see the accompanying
3 : : // file COPYING or https://www.opensource.org/licenses/mit-license.php.
4 : :
5 : : #ifndef BITCOIN_UTIL_THREADPOOL_H
6 : : #define BITCOIN_UTIL_THREADPOOL_H
7 : :
8 : : #include <sync.h>
9 : : #include <tinyformat.h>
10 : : #include <util/check.h>
11 : : #include <util/expected.h>
12 : : #include <util/thread.h>
13 : :
14 : : #include <algorithm>
15 : : #include <condition_variable>
16 : : #include <functional>
17 : : #include <future>
18 : : #include <queue>
19 : : #include <ranges>
20 : : #include <thread>
21 : : #include <type_traits>
22 : : #include <utility>
23 : : #include <vector>
24 : :
25 : : /**
26 : : * @brief Fixed-size thread pool for running arbitrary tasks concurrently.
27 : : *
28 : : * The thread pool maintains a set of worker threads that consume and execute
29 : : * tasks submitted through Submit(). Once started, tasks can be queued and
30 : : * processed asynchronously until Stop() is called.
31 : : *
32 : : * ### Thread-safety and lifecycle
33 : : * - `Start()` and `Stop()` must be called from a controller (non-worker) thread.
34 : : * Calling `Stop()` from a worker thread will deadlock, as it waits for all
35 : : * workers to join, including the current one.
36 : : *
37 : : * - `Submit()` can be called from any thread, including workers. It safely
38 : : * enqueues new work for execution as long as the pool has active workers.
39 : : *
40 : : * - `Interrupt()` stops new task submission and lets queued ones drain
41 : : * in the background. Callers can continue other shutdown steps and call
42 : : * Stop() at the end to ensure no remaining tasks are left to execute.
43 : : *
44 : : * - `Stop()` prevents further task submission and blocks until all the
45 : : * queued ones are completed.
46 : : */
47 : : class ThreadPool
48 : : {
49 : : private:
50 : : std::string m_name;
51 : : Mutex m_mutex;
52 : : std::queue<std::packaged_task<void()>> m_work_queue GUARDED_BY(m_mutex);
53 : : std::condition_variable m_cv;
54 : : // Note: m_interrupt must be guarded by m_mutex, and cannot be replaced by an unguarded atomic bool.
55 : : // This ensures threads blocked on m_cv reliably observe the change and proceed correctly without missing signals.
56 : : // Ref: https://en.cppreference.com/w/cpp/thread/condition_variable
57 : : bool m_interrupt GUARDED_BY(m_mutex){false};
58 : : std::vector<std::thread> m_workers GUARDED_BY(m_mutex);
59 : :
60 : 149 : void WorkerThread() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
61 : : {
62 : 149 : WAIT_LOCK(m_mutex, wait_lock);
63 : 1033 : for (;;) {
64 [ + + ]: 591 : std::packaged_task<void()> task;
65 : 591 : {
66 : : // Wait only if needed; avoid sleeping when a new task was submitted while we were processing another one.
67 [ + + + + ]: 591 : if (!m_interrupt && m_work_queue.empty()) {
68 : : // Block until the pool is interrupted or a task is available.
69 [ + + + + : 565 : m_cv.wait(wait_lock, [&]() EXCLUSIVE_LOCKS_REQUIRED(m_mutex) { return m_interrupt || !m_work_queue.empty(); });
+ - ]
70 : : }
71 : :
72 : : // If stopped and no work left, exit worker
73 [ + + + + ]: 591 : if (m_interrupt && m_work_queue.empty()) {
74 : 298 : return;
75 : : }
76 : :
77 : 442 : task = std::move(m_work_queue.front());
78 : 442 : m_work_queue.pop();
79 : : }
80 : :
81 : 442 : {
82 : : // Execute the task without the lock
83 [ + - ]: 442 : REVERSE_LOCK(wait_lock, m_mutex);
84 [ + - ]: 442 : task();
85 : 442 : }
86 [ + - ]: 591 : }
87 : 149 : }
88 : :
89 : : public:
90 [ - + + - ]: 340 : explicit ThreadPool(const std::string& name) : m_name(name) {}
91 : :
92 : 170 : ~ThreadPool()
93 : : {
94 : 170 : Stop(); // In case it hasn't been stopped.
95 : 170 : }
96 : :
97 : : /**
98 : : * @brief Start worker threads.
99 : : *
100 : : * Creates and launches `num_workers` threads that begin executing tasks
101 : : * from the queue. If the pool is already started, throws.
102 : : *
103 : : * Must be called from a controller (non-worker) thread.
104 : : */
105 : 19 : void Start(int num_workers) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
106 : : {
107 [ - + ]: 19 : assert(num_workers > 0);
108 : 19 : LOCK(m_mutex);
109 [ + + + - ]: 19 : if (m_interrupt) throw std::runtime_error("Thread pool has been interrupted or is stopping");
110 [ - + - - ]: 17 : if (!m_workers.empty()) throw std::runtime_error("Thread pool already started");
111 : :
112 : : // Create workers
113 [ + - ]: 17 : m_workers.reserve(num_workers);
114 [ + + ]: 166 : for (int i = 0; i < num_workers; i++) {
115 [ + - + - ]: 447 : m_workers.emplace_back(&util::TraceThread, strprintf("%s_pool_%d", m_name, i), [this] { WorkerThread(); });
116 : : }
117 : 17 : }
118 : :
119 : : /**
120 : : * @brief Stop all worker threads and wait for them to exit.
121 : : *
122 : : * Sets the interrupt flag, wakes all waiting workers, and joins them.
123 : : * Any remaining tasks in the queue will be processed before returning.
124 : : *
125 : : * Must be called from a controller (non-worker) thread.
126 : : * Concurrent calls to Start() will be rejected while Stop() is in progress.
127 : : */
128 : 182 : void Stop() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
129 : : {
130 : : // Notify workers and join them
131 : 182 : std::vector<std::thread> threads_to_join;
132 : 182 : {
133 [ + - ]: 182 : LOCK(m_mutex);
134 : : // Ensure Stop() is not called from a worker thread while workers are still registered,
135 : : // otherwise a self-join deadlock would occur.
136 : 182 : auto id = std::this_thread::get_id();
137 [ - + + + ]: 331 : for (const auto& worker : m_workers) assert(worker.get_id() != id);
138 : : // Early shutdown to return right away on any concurrent Submit() call
139 : 182 : m_interrupt = true;
140 [ + - ]: 182 : threads_to_join.swap(m_workers);
141 : 182 : }
142 : 182 : m_cv.notify_all();
143 : : // Help draining queue
144 [ + - + + ]: 385 : while (ProcessTask()) {}
145 : : // Free resources
146 [ + - + + ]: 331 : for (auto& worker : threads_to_join) worker.join();
147 : :
148 : : // Since we currently wait for tasks completion, sanity-check empty queue
149 [ + - ]: 182 : LOCK(m_mutex);
150 [ + - ]: 182 : Assume(m_work_queue.empty());
151 : : // Re-allow Start() now that all workers have exited
152 [ + - ]: 182 : m_interrupt = false;
153 : 182 : }
154 : :
155 : : enum class SubmitError {
156 : : Inactive,
157 : : Interrupted,
158 : : };
159 : :
160 : : template <class F>
161 : : using Future = std::future<std::invoke_result_t<F>>;
162 : :
163 : : template <class R>
164 : : using RangeFuture = Future<std::ranges::range_reference_t<R>>;
165 : :
166 : : template <class F>
167 : : using PackagedTask = std::packaged_task<std::invoke_result_t<F>()>;
168 : :
169 : : /**
170 : : * @brief Enqueues a new task for asynchronous execution.
171 : : *
172 : : * @param fn Callable to execute asynchronously.
173 : : * @return On success, a future containing fn's result.
174 : : * On failure, an error indicating why the task was rejected:
175 : : * - SubmitError::Inactive: Pool has no workers (never started or already stopped).
176 : : * - SubmitError::Interrupted: Pool task acceptance has been interrupted.
177 : : *
178 : : * Thread-safe: Can be called from any thread, including within the provided 'fn' callable.
179 : : *
180 : : * @warning Ignoring the returned future requires guarding the task against
181 : : * uncaught exceptions, as they would otherwise be silently discarded.
182 : : */
183 : : template <class F>
184 : 388 : [[nodiscard]] util::Expected<Future<F>, SubmitError> Submit(F&& fn) noexcept EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
185 : : {
186 : 388 : PackagedTask<F> task{std::forward<F>(fn)};
187 : 388 : auto future{task.get_future()};
188 : : {
189 : 388 : LOCK(m_mutex);
190 [ + + ]: 388 : if (m_workers.empty()) return util::Unexpected{SubmitError::Inactive};
191 [ + + ]: 385 : if (m_interrupt) return util::Unexpected{SubmitError::Interrupted};
192 : :
193 [ + - ]: 383 : m_work_queue.emplace(std::move(task));
194 : : }
195 : 383 : m_cv.notify_one();
196 : 383 : return {std::move(future)};
197 : 388 : }
198 : :
199 : : /**
200 : : * @brief Enqueues a range of tasks for asynchronous execution.
201 : : *
202 : : * @param fns Callables to execute asynchronously.
203 : : * @return On success, a vector of futures containing each element of fns's result in order.
204 : : * On failure, an error indicating why the range was rejected:
205 : : * - SubmitError::Inactive: Pool has no workers (never started or already stopped).
206 : : * - SubmitError::Interrupted: Pool task acceptance has been interrupted.
207 : : *
208 : : * This is more efficient when submitting many tasks at once, since
209 : : * the queue lock is only taken once internally and all worker threads are
210 : : * notified. For single tasks, Submit() is preferred since only one worker
211 : : * thread is notified.
212 : : *
213 : : * Thread-safe: Can be called from any thread, including within submitted callables.
214 : : *
215 : : * @warning Ignoring the returned futures requires guarding tasks against
216 : : * uncaught exceptions, as they would otherwise be silently discarded.
217 : : */
218 : : template <std::ranges::sized_range R>
219 : : requires(!std::is_lvalue_reference_v<R>)
220 : 4 : [[nodiscard]] util::Expected<std::vector<RangeFuture<R>>, SubmitError> Submit(R&& fns) noexcept EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
221 : : {
222 [ - + ]: 4 : std::vector<RangeFuture<R>> futures;
223 : 4 : futures.reserve(std::ranges::size(fns));
224 : :
225 : : {
226 : 4 : LOCK(m_mutex);
227 [ + + ]: 4 : if (m_workers.empty()) return util::Unexpected{SubmitError::Inactive};
228 [ + + ]: 3 : if (m_interrupt) return util::Unexpected{SubmitError::Interrupted};
229 [ + + ]: 102 : for (auto&& fn : fns) {
230 : 100 : PackagedTask<std::ranges::range_reference_t<R>> task{std::move(fn)};
231 [ - + ]: 100 : futures.emplace_back(task.get_future());
232 : 100 : m_work_queue.emplace(std::move(task));
233 : : }
234 : 2 : }
235 : 2 : m_cv.notify_all();
236 : 2 : return {std::move(futures)};
237 : 4 : }
238 : :
239 : : /**
240 : : * @brief Execute a single queued task synchronously.
241 : : * Removes one task from the queue and executes it on the calling thread.
242 : : */
243 : 223 : bool ProcessTask() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
244 : : {
245 [ + - ]: 223 : std::packaged_task<void()> task;
246 : 223 : {
247 [ + - ]: 223 : LOCK(m_mutex);
248 [ + + + - ]: 223 : if (m_work_queue.empty()) return false;
249 : :
250 : : // Pop the task
251 : 41 : task = std::move(m_work_queue.front());
252 [ + - ]: 41 : m_work_queue.pop();
253 : 182 : }
254 [ + - ]: 41 : task();
255 : : return true;
256 : 223 : }
257 : :
258 : : /**
259 : : * @brief Stop accepting new tasks and begin asynchronous shutdown.
260 : : *
261 : : * Wakes all worker threads so they can drain the queue and exit.
262 : : * Unlike Stop(), this function does not wait for threads to finish.
263 : : *
264 : : * Note: The next step in the pool lifecycle is calling Stop(), which
265 : : * releases any dangling resources and resets the pool state
266 : : * for shutdown or restart.
267 : : */
268 : 5 : void Interrupt() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
269 : : {
270 [ + - ]: 10 : WITH_LOCK(m_mutex, m_interrupt = true);
271 : 5 : m_cv.notify_all();
272 : 5 : }
273 : :
274 : 16 : size_t WorkQueueSize() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
275 : : {
276 [ # # # # : 32 : return WITH_LOCK(m_mutex, return m_work_queue.size());
# # ][ + -
+ - + - +
- + - + -
- + + - ]
277 : : }
278 : :
279 : 54 : size_t WorkersCount() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
280 : : {
281 [ + - + - : 108 : return WITH_LOCK(m_mutex, return m_workers.size());
+ - + - -
+ + - ]
282 : : }
283 : : };
284 : :
285 : 7 : constexpr std::string_view SubmitErrorString(const ThreadPool::SubmitError err) noexcept {
286 [ + + - ]: 7 : switch (err) {
287 : 4 : case ThreadPool::SubmitError::Inactive:
288 : 4 : return "No active workers";
289 : 3 : case ThreadPool::SubmitError::Interrupted:
290 : 3 : return "Interrupted";
291 : : }
292 : 0 : Assume(false); // Unreachable
293 : 0 : return "Unknown error";
294 : : }
295 : :
296 : : #endif // BITCOIN_UTIL_THREADPOOL_H
|