Branch data Line data Source code
1 : : // Copyright (c) The Bitcoin Core developers
2 : : // Distributed under the MIT software license, see the accompanying
3 : : // file COPYING or https://www.opensource.org/licenses/mit-license.php.
4 : :
5 : : #ifndef BITCOIN_UTIL_THREADPOOL_H
6 : : #define BITCOIN_UTIL_THREADPOOL_H
7 : :
8 : : #include <sync.h>
9 : : #include <tinyformat.h>
10 : : #include <util/expected.h>
11 : : #include <util/check.h>
12 : : #include <util/thread.h>
13 : :
14 : : #include <algorithm>
15 : : #include <condition_variable>
16 : : #include <functional>
17 : : #include <future>
18 : : #include <queue>
19 : : #include <thread>
20 : : #include <type_traits>
21 : : #include <utility>
22 : : #include <vector>
23 : :
24 : : /**
25 : : * @brief Fixed-size thread pool for running arbitrary tasks concurrently.
26 : : *
27 : : * The thread pool maintains a set of worker threads that consume and execute
28 : : * tasks submitted through Submit(). Once started, tasks can be queued and
29 : : * processed asynchronously until Stop() is called.
30 : : *
31 : : * ### Thread-safety and lifecycle
32 : : * - `Start()` and `Stop()` must be called from a controller (non-worker) thread.
33 : : * Calling `Stop()` from a worker thread will deadlock, as it waits for all
34 : : * workers to join, including the current one.
35 : : *
36 : : * - `Submit()` can be called from any thread, including workers. It safely
37 : : * enqueues new work for execution as long as the pool has active workers.
38 : : *
39 : : * - `Interrupt()` stops new task submission and lets queued ones drain
40 : : * in the background. Callers can continue other shutdown steps and call
41 : : * Stop() at the end to ensure no remaining tasks are left to execute.
42 : : *
43 : : * - `Stop()` prevents further task submission and blocks until all the
44 : : * queued ones are completed.
45 : : */
46 : : class ThreadPool
47 : : {
48 : : private:
49 : : std::string m_name;
50 : : Mutex m_mutex;
51 : : std::queue<std::packaged_task<void()>> m_work_queue GUARDED_BY(m_mutex);
52 : : std::condition_variable m_cv;
53 : : // Note: m_interrupt must be guarded by m_mutex, and cannot be replaced by an unguarded atomic bool.
54 : : // This ensures threads blocked on m_cv reliably observe the change and proceed correctly without missing signals.
55 : : // Ref: https://en.cppreference.com/w/cpp/thread/condition_variable
56 : : bool m_interrupt GUARDED_BY(m_mutex){false};
57 : : std::vector<std::thread> m_workers GUARDED_BY(m_mutex);
58 : :
59 : 2384 : void WorkerThread() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
60 : : {
61 : 2384 : WAIT_LOCK(m_mutex, wait_lock);
62 : 409424 : for (;;) {
63 [ + + ]: 205904 : std::packaged_task<void()> task;
64 : 205904 : {
65 : : // Wait only if needed; avoid sleeping when a new task was submitted while we were processing another one.
66 [ + + + + ]: 205904 : if (!m_interrupt && m_work_queue.empty()) {
67 : : // Block until the pool is interrupted or a task is available.
68 [ + + + + : 611511 : m_cv.wait(wait_lock, [&]() EXCLUSIVE_LOCKS_REQUIRED(m_mutex) { return m_interrupt || !m_work_queue.empty(); });
+ - ]
69 : : }
70 : :
71 : : // If stopped and no work left, exit worker
72 [ + + + + ]: 205904 : if (m_interrupt && m_work_queue.empty()) {
73 : 4768 : return;
74 : : }
75 : :
76 : 203520 : task = std::move(m_work_queue.front());
77 : 203520 : m_work_queue.pop();
78 : : }
79 : :
80 : 203520 : {
81 : : // Execute the task without the lock
82 [ + - ]: 203520 : REVERSE_LOCK(wait_lock, m_mutex);
83 [ + - ]: 203520 : task();
84 : 203520 : }
85 [ + - ]: 205904 : }
86 : 2384 : }
87 : :
88 : : public:
89 [ - + + - ]: 2768 : explicit ThreadPool(const std::string& name) : m_name(name) {}
90 : :
91 : 1384 : ~ThreadPool()
92 : : {
93 : 1384 : Stop(); // In case it hasn't been stopped.
94 : 1384 : }
95 : :
96 : : /**
97 : : * @brief Start worker threads.
98 : : *
99 : : * Creates and launches `num_workers` threads that begin executing tasks
100 : : * from the queue. If the pool is already started, throws.
101 : : *
102 : : * Must be called from a controller (non-worker) thread.
103 : : */
104 : 1140 : void Start(int num_workers) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
105 : : {
106 [ - + ]: 1140 : assert(num_workers > 0);
107 : 1140 : LOCK(m_mutex);
108 [ - + - - ]: 1140 : if (!m_workers.empty()) throw std::runtime_error("Thread pool already started");
109 : 1140 : m_interrupt = false; // Reset
110 : :
111 : : // Create workers
112 [ + - ]: 1140 : m_workers.reserve(num_workers);
113 [ + + ]: 3524 : for (int i = 0; i < num_workers; i++) {
114 [ + - + - ]: 7152 : m_workers.emplace_back(&util::TraceThread, strprintf("%s_pool_%d", m_name, i), [this] { WorkerThread(); });
115 : : }
116 : 1140 : }
117 : :
118 : : /**
119 : : * @brief Stop all worker threads and wait for them to exit.
120 : : *
121 : : * Sets the interrupt flag, wakes all waiting workers, and joins them.
122 : : * Any remaining tasks in the queue will be processed before returning.
123 : : *
124 : : * Must be called from a controller (non-worker) thread.
125 : : */
126 : 2572 : void Stop() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
127 : : {
128 : : // Notify workers and join them
129 : 2572 : std::vector<std::thread> threads_to_join;
130 : 2572 : {
131 [ + - ]: 2572 : LOCK(m_mutex);
132 : : // Ensure Stop() is not called from a worker thread while workers are still registered,
133 : : // otherwise a self-join deadlock would occur.
134 : 2572 : auto id = std::this_thread::get_id();
135 [ - + + + ]: 4956 : for (const auto& worker : m_workers) assert(worker.get_id() != id);
136 : : // Early shutdown to return right away on any concurrent Submit() call
137 : 2572 : m_interrupt = true;
138 [ + - ]: 2572 : threads_to_join.swap(m_workers);
139 : 2572 : }
140 : 2572 : m_cv.notify_all();
141 [ + - + + ]: 4956 : for (auto& worker : threads_to_join) worker.join();
142 : : // Since we currently wait for tasks completion, sanity-check empty queue
143 [ + - + - ]: 5144 : WITH_LOCK(m_mutex, Assume(m_work_queue.empty()));
144 : : // Note: m_interrupt is left true until next Start()
145 : 2572 : }
146 : :
147 : : enum class SubmitError {
148 : : Inactive,
149 : : Interrupted,
150 : : };
151 : :
152 : : /**
153 : : * @brief Enqueues a new task for asynchronous execution.
154 : : *
155 : : * @param fn Callable to execute asynchronously.
156 : : * @return On success, a future containing fn's result.
157 : : * On failure, an error indicating why the task was rejected:
158 : : * - SubmitError::Inactive: Pool has no workers (never started or already stopped).
159 : : * - SubmitError::Interrupted: Pool task acceptance has been interrupted.
160 : : *
161 : : * Thread-safe: Can be called from any thread, including within the provided 'fn' callable.
162 : : *
163 : : * @warning Ignoring the returned future requires guarding the task against
164 : : * uncaught exceptions, as they would otherwise be silently discarded.
165 : : */
166 : : template <class F>
167 : 203542 : [[nodiscard]] util::Expected<std::future<std::invoke_result_t<F>>, SubmitError> Submit(F&& fn) noexcept EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
168 : : {
169 : 203542 : std::packaged_task<std::invoke_result_t<F>()> task{std::forward<F>(fn)};
170 : 203542 : auto future{task.get_future()};
171 : : {
172 : 203542 : LOCK(m_mutex);
173 [ + + ]: 203542 : if (m_workers.empty()) return util::Unexpected{SubmitError::Inactive};
174 [ + + ]: 203541 : if (m_interrupt) return util::Unexpected{SubmitError::Interrupted};
175 : :
176 [ + - ]: 203540 : m_work_queue.emplace(std::move(task));
177 : 2 : }
178 : 203540 : m_cv.notify_one();
179 : 203540 : return {std::move(future)};
180 : 203542 : }
181 : :
182 : : /**
183 : : * @brief Execute a single queued task synchronously.
184 : : * Removes one task from the queue and executes it on the calling thread.
185 : : */
186 : 20 : void ProcessTask() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
187 : : {
188 [ + - ]: 20 : std::packaged_task<void()> task;
189 : 20 : {
190 [ + - ]: 20 : LOCK(m_mutex);
191 [ - + - - ]: 20 : if (m_work_queue.empty()) return;
192 : :
193 : : // Pop the task
194 : 20 : task = std::move(m_work_queue.front());
195 [ + - ]: 20 : m_work_queue.pop();
196 : 20 : }
197 [ + - ]: 20 : task();
198 : 20 : }
199 : :
200 : : /**
201 : : * @brief Stop accepting new tasks and begin asynchronous shutdown.
202 : : *
203 : : * Wakes all worker threads so they can drain the queue and exit.
204 : : * Unlike Stop(), this function does not wait for threads to finish.
205 : : */
206 : 1184 : void Interrupt() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
207 : : {
208 [ + - ]: 2368 : WITH_LOCK(m_mutex, m_interrupt = true);
209 : 1184 : m_cv.notify_all();
210 : 1184 : }
211 : :
212 : 203213 : size_t WorkQueueSize() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
213 : : {
214 [ + - + + : 406426 : return WITH_LOCK(m_mutex, return m_work_queue.size());
+ - + - -
+ + - ][ #
# # # #
# ]
215 : : }
216 : :
217 : 3 : size_t WorkersCount() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
218 : : {
219 [ + - + - : 6 : return WITH_LOCK(m_mutex, return m_workers.size());
+ - - + +
- ]
220 : : }
221 : : };
222 : :
223 : 2 : constexpr std::string_view SubmitErrorString(const ThreadPool::SubmitError err) noexcept {
224 [ + + - ]: 2 : switch (err) {
225 : 1 : case ThreadPool::SubmitError::Inactive:
226 : 1 : return "No active workers";
227 : 1 : case ThreadPool::SubmitError::Interrupted:
228 : 1 : return "Interrupted";
229 : : }
230 : 0 : Assume(false); // Unreachable
231 : 0 : return "Unknown error";
232 : : }
233 : :
234 : : #endif // BITCOIN_UTIL_THREADPOOL_H
|