Branch data Line data Source code
1 : : // Copyright (c) The Bitcoin Core developers
2 : : // Distributed under the MIT software license, see the accompanying
3 : : // file COPYING or https://www.opensource.org/licenses/mit-license.php.
4 : :
5 : : #ifndef BITCOIN_UTIL_THREADPOOL_H
6 : : #define BITCOIN_UTIL_THREADPOOL_H
7 : :
8 : : #include <sync.h>
9 : : #include <tinyformat.h>
10 : : #include <util/check.h>
11 : : #include <util/thread.h>
12 : :
13 : : #include <algorithm>
14 : : #include <condition_variable>
15 : : #include <functional>
16 : : #include <future>
17 : : #include <queue>
18 : : #include <stdexcept>
19 : : #include <thread>
20 : : #include <utility>
21 : : #include <vector>
22 : :
23 : : /**
24 : : * @brief Fixed-size thread pool for running arbitrary tasks concurrently.
25 : : *
26 : : * The thread pool maintains a set of worker threads that consume and execute
27 : : * tasks submitted through Submit(). Once started, tasks can be queued and
28 : : * processed asynchronously until Stop() is called.
29 : : *
30 : : * ### Thread-safety and lifecycle
31 : : * - `Start()` and `Stop()` must be called from a controller (non-worker) thread.
32 : : * Calling `Stop()` from a worker thread will deadlock, as it waits for all
33 : : * workers to join, including the current one.
34 : : *
35 : : * - `Submit()` can be called from any thread, including workers. It safely
36 : : * enqueues new work for execution as long as the pool has active workers.
37 : : *
38 : : * - `Interrupt()` stops new task submission and lets queued ones drain
39 : : * in the background. Callers can continue other shutdown steps and call
40 : : * Stop() at the end to ensure no remaining tasks are left to execute.
41 : : *
42 : : * - `Stop()` prevents further task submission and blocks until all the
43 : : * queued ones are completed.
44 : : */
45 : : class ThreadPool
46 : : {
47 : : private:
48 : : std::string m_name;
49 : : Mutex m_mutex;
50 : : std::queue<std::packaged_task<void()>> m_work_queue GUARDED_BY(m_mutex);
51 : : std::condition_variable m_cv;
52 : : // Note: m_interrupt must be guarded by m_mutex, and cannot be replaced by an unguarded atomic bool.
53 : : // This ensures threads blocked on m_cv reliably observe the change and proceed correctly without missing signals.
54 : : // Ref: https://en.cppreference.com/w/cpp/thread/condition_variable
55 : : bool m_interrupt GUARDED_BY(m_mutex){false};
56 : : std::vector<std::thread> m_workers GUARDED_BY(m_mutex);
57 : :
58 : 106 : void WorkerThread() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
59 : : {
60 : 106 : WAIT_LOCK(m_mutex, wait_lock);
61 : 688 : for (;;) {
62 [ + + ]: 397 : std::packaged_task<void()> task;
63 : 397 : {
64 : : // Wait only if needed; avoid sleeping when a new task was submitted while we were processing another one.
65 [ + + + + ]: 397 : if (!m_interrupt && m_work_queue.empty()) {
66 : : // Block until the pool is interrupted or a task is available.
67 [ + + + + : 612 : m_cv.wait(wait_lock, [&]() EXCLUSIVE_LOCKS_REQUIRED(m_mutex) { return m_interrupt || !m_work_queue.empty(); });
+ - ]
68 : : }
69 : :
70 : : // If stopped and no work left, exit worker
71 [ + + + + ]: 397 : if (m_interrupt && m_work_queue.empty()) {
72 : 212 : return;
73 : : }
74 : :
75 : 291 : task = std::move(m_work_queue.front());
76 : 291 : m_work_queue.pop();
77 : : }
78 : :
79 : 291 : {
80 : : // Execute the task without the lock
81 [ + - ]: 291 : REVERSE_LOCK(wait_lock, m_mutex);
82 [ + - ]: 291 : task();
83 : 291 : }
84 [ + - ]: 397 : }
85 : 106 : }
86 : :
87 : : public:
88 [ - + + - ]: 318 : explicit ThreadPool(const std::string& name) : m_name(name) {}
89 : :
90 : 159 : ~ThreadPool()
91 : : {
92 : 159 : Stop(); // In case it hasn't been stopped.
93 : 159 : }
94 : :
95 : : /**
96 : : * @brief Start worker threads.
97 : : *
98 : : * Creates and launches `num_workers` threads that begin executing tasks
99 : : * from the queue. If the pool is already started, throws.
100 : : *
101 : : * Must be called from a controller (non-worker) thread.
102 : : */
103 : 11 : void Start(int num_workers) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
104 : : {
105 [ - + ]: 11 : assert(num_workers > 0);
106 : 11 : LOCK(m_mutex);
107 [ - + - - ]: 11 : if (!m_workers.empty()) throw std::runtime_error("Thread pool already started");
108 : 11 : m_interrupt = false; // Reset
109 : :
110 : : // Create workers
111 [ + - ]: 11 : m_workers.reserve(num_workers);
112 [ + + ]: 117 : for (int i = 0; i < num_workers; i++) {
113 [ + - + - ]: 318 : m_workers.emplace_back(&util::TraceThread, strprintf("%s_pool_%d", m_name, i), [this] { WorkerThread(); });
114 : : }
115 : 11 : }
116 : :
117 : : /**
118 : : * @brief Stop all worker threads and wait for them to exit.
119 : : *
120 : : * Sets the interrupt flag, wakes all waiting workers, and joins them.
121 : : * Any remaining tasks in the queue will be processed before returning.
122 : : *
123 : : * Must be called from a controller (non-worker) thread.
124 : : */
125 : 166 : void Stop() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
126 : : {
127 : : // Notify workers and join them
128 : 166 : std::vector<std::thread> threads_to_join;
129 : 166 : {
130 [ + - ]: 166 : LOCK(m_mutex);
131 : : // Ensure Stop() is not called from a worker thread while workers are still registered,
132 : : // otherwise a self-join deadlock would occur.
133 : 166 : auto id = std::this_thread::get_id();
134 [ - + + + ]: 272 : for (const auto& worker : m_workers) assert(worker.get_id() != id);
135 : : // Early shutdown to return right away on any concurrent Submit() call
136 : 166 : m_interrupt = true;
137 [ + - ]: 166 : threads_to_join.swap(m_workers);
138 : 166 : }
139 : 166 : m_cv.notify_all();
140 [ + - + + ]: 272 : for (auto& worker : threads_to_join) worker.join();
141 : : // Since we currently wait for tasks completion, sanity-check empty queue
142 [ + - + - ]: 332 : WITH_LOCK(m_mutex, Assume(m_work_queue.empty()));
143 : : // Note: m_interrupt is left true until next Start()
144 : 166 : }
145 : :
146 : : /**
147 : : * @brief Enqueues a new task for asynchronous execution.
148 : : *
149 : : * Returns a `std::future` that provides the task's result or propagates
150 : : * any exception it throws.
151 : : * Note: Ignoring the returned future requires guarding the task against
152 : : * uncaught exceptions, as they would otherwise be silently discarded.
153 : : */
154 : : template <class F> [[nodiscard]] EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
155 : 313 : auto Submit(F&& fn)
156 : : {
157 : 313 : std::packaged_task task{std::forward<F>(fn)};
158 [ + - ]: 313 : auto future{task.get_future()};
159 : : {
160 [ + - ]: 313 : LOCK(m_mutex);
161 [ + + + + ]: 313 : if (m_interrupt || m_workers.empty()) {
162 [ + - ]: 2 : throw std::runtime_error("No active workers; cannot accept new tasks");
163 : : }
164 [ + - + - ]: 311 : m_work_queue.emplace(std::move(task));
165 : 2 : }
166 : 311 : m_cv.notify_one();
167 : 311 : return future;
168 : 313 : }
169 : :
170 : : /**
171 : : * @brief Execute a single queued task synchronously.
172 : : * Removes one task from the queue and executes it on the calling thread.
173 : : */
174 : 20 : void ProcessTask() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
175 : : {
176 [ + - ]: 20 : std::packaged_task<void()> task;
177 : 20 : {
178 [ + - ]: 20 : LOCK(m_mutex);
179 [ - + - - ]: 20 : if (m_work_queue.empty()) return;
180 : :
181 : : // Pop the task
182 : 20 : task = std::move(m_work_queue.front());
183 [ + - ]: 20 : m_work_queue.pop();
184 : 20 : }
185 [ + - ]: 20 : task();
186 : 20 : }
187 : :
188 : : /**
189 : : * @brief Stop accepting new tasks and begin asynchronous shutdown.
190 : : *
191 : : * Wakes all worker threads so they can drain the queue and exit.
192 : : * Unlike Stop(), this function does not wait for threads to finish.
193 : : */
194 : 3 : void Interrupt() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
195 : : {
196 [ + - ]: 6 : WITH_LOCK(m_mutex, m_interrupt = true);
197 : 3 : m_cv.notify_all();
198 : 3 : }
199 : :
200 : 4 : size_t WorkQueueSize() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
201 : : {
202 [ + - + - : 8 : return WITH_LOCK(m_mutex, return m_work_queue.size());
+ - + - -
+ + - ][ #
# # # #
# ]
203 : : }
204 : :
205 : 3 : size_t WorkersCount() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
206 : : {
207 [ + - + - : 6 : return WITH_LOCK(m_mutex, return m_workers.size());
+ - - + +
- ]
208 : : }
209 : : };
210 : :
211 : : #endif // BITCOIN_UTIL_THREADPOOL_H
|