Branch data Line data Source code
1 : : // Copyright (c) The Bitcoin Core developers
2 : : // Distributed under the MIT software license, see the accompanying
3 : : // file COPYING or https://www.opensource.org/licenses/mit-license.php.
4 : :
5 : : #ifndef BITCOIN_UTIL_THREADPOOL_H
6 : : #define BITCOIN_UTIL_THREADPOOL_H
7 : :
8 : : #include <sync.h>
9 : : #include <tinyformat.h>
10 : : #include <util/expected.h>
11 : : #include <util/check.h>
12 : : #include <util/thread.h>
13 : :
14 : : #include <algorithm>
15 : : #include <condition_variable>
16 : : #include <functional>
17 : : #include <future>
18 : : #include <queue>
19 : : #include <thread>
20 : : #include <type_traits>
21 : : #include <utility>
22 : : #include <vector>
23 : :
24 : : /**
25 : : * @brief Fixed-size thread pool for running arbitrary tasks concurrently.
26 : : *
27 : : * The thread pool maintains a set of worker threads that consume and execute
28 : : * tasks submitted through Submit(). Once started, tasks can be queued and
29 : : * processed asynchronously until Stop() is called.
30 : : *
31 : : * ### Thread-safety and lifecycle
32 : : * - `Start()` and `Stop()` must be called from a controller (non-worker) thread.
33 : : * Calling `Stop()` from a worker thread will deadlock, as it waits for all
34 : : * workers to join, including the current one.
35 : : *
36 : : * - `Submit()` can be called from any thread, including workers. It safely
37 : : * enqueues new work for execution as long as the pool has active workers.
38 : : *
39 : : * - `Interrupt()` stops new task submission and lets queued ones drain
40 : : * in the background. Callers can continue other shutdown steps and call
41 : : * Stop() at the end to ensure no remaining tasks are left to execute.
42 : : *
43 : : * - `Stop()` prevents further task submission and blocks until all the
44 : : * queued ones are completed.
45 : : */
46 : : class ThreadPool
47 : : {
48 : : private:
49 : : std::string m_name;
50 : : Mutex m_mutex;
51 : : std::queue<std::packaged_task<void()>> m_work_queue GUARDED_BY(m_mutex);
52 : : std::condition_variable m_cv;
53 : : // Note: m_interrupt must be guarded by m_mutex, and cannot be replaced by an unguarded atomic bool.
54 : : // This ensures threads blocked on m_cv reliably observe the change and proceed correctly without missing signals.
55 : : // Ref: https://en.cppreference.com/w/cpp/thread/condition_variable
56 : : bool m_interrupt GUARDED_BY(m_mutex){false};
57 : : std::vector<std::thread> m_workers GUARDED_BY(m_mutex);
58 : :
59 : 3 : void WorkerThread() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
60 : : {
61 : 3 : WAIT_LOCK(m_mutex, wait_lock);
62 : 38761 : for (;;) {
63 [ + - ]: 19382 : std::packaged_task<void()> task;
64 : 19382 : {
65 : : // Wait only if needed; avoid sleeping when a new task was submitted while we were processing another one.
66 [ + - + + ]: 19382 : if (!m_interrupt && m_work_queue.empty()) {
67 : : // Block until the pool is interrupted or a task is available.
68 [ + + + + : 16833 : m_cv.wait(wait_lock, [&]() EXCLUSIVE_LOCKS_REQUIRED(m_mutex) { return m_interrupt || !m_work_queue.empty(); });
+ - ]
69 : : }
70 : :
71 : : // If stopped and no work left, exit worker
72 [ + + - + ]: 19382 : if (m_interrupt && m_work_queue.empty()) {
73 : 6 : return;
74 : : }
75 : :
76 : 19379 : task = std::move(m_work_queue.front());
77 : 19379 : m_work_queue.pop();
78 : : }
79 : :
80 : 19379 : {
81 : : // Execute the task without the lock
82 [ + - ]: 19379 : REVERSE_LOCK(wait_lock, m_mutex);
83 [ + - ]: 19379 : task();
84 : 19379 : }
85 [ + - ]: 19382 : }
86 : 3 : }
87 : :
88 : : public:
89 [ - + + - ]: 912 : explicit ThreadPool(const std::string& name) : m_name(name) {}
90 : :
91 : 456 : ~ThreadPool()
92 : : {
93 : 456 : Stop(); // In case it hasn't been stopped.
94 : 456 : }
95 : :
96 : : /**
97 : : * @brief Start worker threads.
98 : : *
99 : : * Creates and launches `num_workers` threads that begin executing tasks
100 : : * from the queue. If the pool is already started, throws.
101 : : *
102 : : * Must be called from a controller (non-worker) thread.
103 : : */
104 : 1 : void Start(int num_workers) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
105 : : {
106 [ - + ]: 1 : assert(num_workers > 0);
107 : 1 : LOCK(m_mutex);
108 [ - + - - ]: 1 : if (m_interrupt) throw std::runtime_error("Thread pool has been interrupted or is stopping");
109 [ - + - - ]: 1 : if (!m_workers.empty()) throw std::runtime_error("Thread pool already started");
110 : :
111 : : // Create workers
112 [ + - ]: 1 : m_workers.reserve(num_workers);
113 [ + + ]: 4 : for (int i = 0; i < num_workers; i++) {
114 [ + - + - ]: 9 : m_workers.emplace_back(&util::TraceThread, strprintf("%s_pool_%d", m_name, i), [this] { WorkerThread(); });
115 : : }
116 : 1 : }
117 : :
118 : : /**
119 : : * @brief Stop all worker threads and wait for them to exit.
120 : : *
121 : : * Sets the interrupt flag, wakes all waiting workers, and joins them.
122 : : * Any remaining tasks in the queue will be processed before returning.
123 : : *
124 : : * Must be called from a controller (non-worker) thread.
125 : : * Concurrent calls to Start() will be rejected while Stop() is in progress.
126 : : */
127 : 456 : void Stop() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
128 : : {
129 : : // Notify workers and join them
130 : 456 : std::vector<std::thread> threads_to_join;
131 : 456 : {
132 [ + - ]: 456 : LOCK(m_mutex);
133 : : // Ensure Stop() is not called from a worker thread while workers are still registered,
134 : : // otherwise a self-join deadlock would occur.
135 : 456 : auto id = std::this_thread::get_id();
136 [ - + + + ]: 459 : for (const auto& worker : m_workers) assert(worker.get_id() != id);
137 : : // Early shutdown to return right away on any concurrent Submit() call
138 : 456 : m_interrupt = true;
139 [ + - ]: 456 : threads_to_join.swap(m_workers);
140 : 456 : }
141 : 456 : m_cv.notify_all();
142 : : // Help draining queue
143 [ + - - + ]: 912 : while (ProcessTask()) {}
144 : : // Free resources
145 [ + - + + ]: 459 : for (auto& worker : threads_to_join) worker.join();
146 : :
147 : : // Since we currently wait for tasks completion, sanity-check empty queue
148 [ + - ]: 456 : LOCK(m_mutex);
149 [ - + ]: 456 : Assume(m_work_queue.empty());
150 : : // Re-allow Start() now that all workers have exited
151 [ + - ]: 456 : m_interrupt = false;
152 : 456 : }
153 : :
154 : : enum class SubmitError {
155 : : Inactive,
156 : : Interrupted,
157 : : };
158 : :
159 : : /**
160 : : * @brief Enqueues a new task for asynchronous execution.
161 : : *
162 : : * @param fn Callable to execute asynchronously.
163 : : * @return On success, a future containing fn's result.
164 : : * On failure, an error indicating why the task was rejected:
165 : : * - SubmitError::Inactive: Pool has no workers (never started or already stopped).
166 : : * - SubmitError::Interrupted: Pool task acceptance has been interrupted.
167 : : *
168 : : * Thread-safe: Can be called from any thread, including within the provided 'fn' callable.
169 : : *
170 : : * @warning Ignoring the returned future requires guarding the task against
171 : : * uncaught exceptions, as they would otherwise be silently discarded.
172 : : */
173 : : template <class F>
174 : 19379 : [[nodiscard]] util::Expected<std::future<std::invoke_result_t<F>>, SubmitError> Submit(F&& fn) noexcept EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
175 : : {
176 : 19379 : std::packaged_task<std::invoke_result_t<F>()> task{std::forward<F>(fn)};
177 : 19379 : auto future{task.get_future()};
178 : : {
179 : 19379 : LOCK(m_mutex);
180 [ - + ]: 19379 : if (m_workers.empty()) return util::Unexpected{SubmitError::Inactive};
181 [ - + ]: 19379 : if (m_interrupt) return util::Unexpected{SubmitError::Interrupted};
182 : :
183 [ + - ]: 19379 : m_work_queue.emplace(std::move(task));
184 : 0 : }
185 : 19379 : m_cv.notify_one();
186 : 19379 : return {std::move(future)};
187 : 19379 : }
188 : :
189 : : /**
190 : : * @brief Execute a single queued task synchronously.
191 : : * Removes one task from the queue and executes it on the calling thread.
192 : : */
193 : 456 : bool ProcessTask() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
194 : : {
195 [ + - ]: 456 : std::packaged_task<void()> task;
196 : 456 : {
197 [ + - ]: 456 : LOCK(m_mutex);
198 [ + - + - ]: 456 : if (m_work_queue.empty()) return false;
199 : :
200 : : // Pop the task
201 : 0 : task = std::move(m_work_queue.front());
202 [ # # ]: 0 : m_work_queue.pop();
203 : 456 : }
204 [ # # ]: 0 : task();
205 : : return true;
206 : 456 : }
207 : :
208 : : /**
209 : : * @brief Stop accepting new tasks and begin asynchronous shutdown.
210 : : *
211 : : * Wakes all worker threads so they can drain the queue and exit.
212 : : * Unlike Stop(), this function does not wait for threads to finish.
213 : : *
214 : : * Note: The next step in the pool lifecycle is calling Stop(), which
215 : : * releases any dangling resources and resets the pool state
216 : : * for shutdown or restart.
217 : : */
218 : 0 : void Interrupt() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
219 : : {
220 [ # # ]: 0 : WITH_LOCK(m_mutex, m_interrupt = true);
221 : 0 : m_cv.notify_all();
222 : 0 : }
223 : :
224 : 78 : size_t WorkQueueSize() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
225 : : {
226 [ + - - + : 156 : return WITH_LOCK(m_mutex, return m_work_queue.size());
+ - ]
227 : : }
228 : :
229 : 78 : size_t WorkersCount() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
230 : : {
231 [ + - - + : 156 : return WITH_LOCK(m_mutex, return m_workers.size());
+ - ]
232 : : }
233 : : };
234 : :
235 : 0 : constexpr std::string_view SubmitErrorString(const ThreadPool::SubmitError err) noexcept {
236 [ # # # ]: 0 : switch (err) {
237 : 0 : case ThreadPool::SubmitError::Inactive:
238 : 0 : return "No active workers";
239 : 0 : case ThreadPool::SubmitError::Interrupted:
240 : 0 : return "Interrupted";
241 : : }
242 : 0 : Assume(false); // Unreachable
243 : 0 : return "Unknown error";
244 : : }
245 : :
246 : : #endif // BITCOIN_UTIL_THREADPOOL_H
|