Branch data Line data Source code
1 : : // Copyright (c) The Bitcoin Core developers
2 : : // Distributed under the MIT software license, see the accompanying
3 : : // file COPYING or http://www.opensource.org/licenses/mit-license.php.
4 : :
5 : : #include <common/system.h>
6 : : #include <logging.h>
7 : : #include <random.h>
8 : : #include <test/util/common.h>
9 : : #include <util/string.h>
10 : : #include <util/threadpool.h>
11 : : #include <util/time.h>
12 : :
13 : : #include <boost/test/unit_test.hpp>
14 : :
15 : : #include <array>
16 : : #include <functional>
17 : : #include <latch>
18 : : #include <ranges>
19 : : #include <semaphore>
20 : :
21 : : // General test values
22 : : int NUM_WORKERS_DEFAULT = 0;
23 : : constexpr char POOL_NAME[] = "test";
24 : : constexpr auto WAIT_TIMEOUT = 120s;
25 : :
26 : : struct ThreadPoolFixture {
27 : 15 : ThreadPoolFixture() {
28 [ + - ]: 15 : NUM_WORKERS_DEFAULT = FastRandomContext().randrange(GetNumCores()) + 1;
29 : 15 : LogInfo("thread pool workers count: %d", NUM_WORKERS_DEFAULT);
30 : 15 : }
31 : : };
32 : :
33 : : // Test Cases Overview
34 : : // 0) Submit task to a non-started pool.
35 : : // 1) Submit tasks and verify completion.
36 : : // 2) Maintain all threads busy except one.
37 : : // 3) Wait for work to finish.
38 : : // 4) Wait for result object.
39 : : // 5) The task throws an exception, catch must be done in the consumer side.
40 : : // 6) Busy workers, help them by processing tasks externally.
41 : : // 7) Recursive submission of tasks.
42 : : // 8) Submit task when all threads are busy, stop pool and verify task gets executed.
43 : : // 9) Congestion test; create more workers than available cores.
44 : : // 10) Ensure Interrupt() prevents further submissions.
45 : : // 11) Start() must not cause a deadlock when called during Stop().
46 : : // 12) Ensure queued tasks complete after Interrupt().
47 : : // 13) Ensure the Stop() calling thread helps drain the queue.
48 : : // 14) Submit range of tasks in one lock acquisition.
49 : : BOOST_FIXTURE_TEST_SUITE(threadpool_tests, ThreadPoolFixture)
50 : :
51 : : #define WAIT_FOR(futures) \
52 : : do { \
53 : : for (const auto& f : futures) { \
54 : : BOOST_REQUIRE(f.wait_for(WAIT_TIMEOUT) == std::future_status::ready); \
55 : : } \
56 : : } while (0)
57 : :
58 : : // Helper to unwrap a valid pool submission
59 : : template <typename F>
60 : 411 : [[nodiscard]] auto Submit(ThreadPool& pool, F&& fn)
61 : : {
62 [ - + ]: 411 : return std::move(*Assert(pool.Submit(std::forward<F>(fn))));
63 : : }
64 : :
65 : : // Block a number of worker threads by submitting tasks that wait on `release_sem`.
66 : : // Returns the futures of the blocking tasks, ensuring all have started and are waiting.
67 : 8 : std::vector<std::future<void>> BlockWorkers(ThreadPool& threadPool, std::counting_semaphore<>& release_sem, size_t num_of_threads_to_block)
68 : : {
69 [ - + ]: 8 : assert(threadPool.WorkersCount() >= num_of_threads_to_block);
70 : 8 : std::latch ready{static_cast<std::ptrdiff_t>(num_of_threads_to_block)};
71 : 8 : std::vector<std::future<void>> blocking_tasks(num_of_threads_to_block);
72 [ + - + + ]: 92 : for (auto& f : blocking_tasks) f = Submit(threadPool, [&] {
73 [ + + ]: 84 : ready.count_down();
74 : 84 : release_sem.acquire();
75 [ - + ]: 84 : });
76 : 8 : ready.wait();
77 : 8 : return blocking_tasks;
78 : 0 : }
79 : :
80 : : // Test 0, submit task to a non-started, interrupted, or stopped pool
81 [ + - + - : 7 : BOOST_AUTO_TEST_CASE(submit_fails_with_correct_error)
+ - + - -
+ + - + -
+ - + - +
- - + + -
+ - + - +
- + - - +
+ - + - +
- + - + -
- + + - +
- + - + -
+ - - + +
- ]
82 : : {
83 [ + - ]: 1 : ThreadPool threadPool(POOL_NAME);
84 : 1 : const auto fn_empty = [&] {};
85 : :
86 : : // Never started: Inactive
87 : 1 : auto res = threadPool.Submit(fn_empty);
88 [ + - + - : 2 : BOOST_CHECK(!res);
+ - ]
89 [ + - + - ]: 1 : BOOST_CHECK_EQUAL(SubmitErrorString(res.error()), "No active workers");
90 : :
91 : : // Interrupted (workers still alive): Interrupted, and Start() must be rejected too
92 [ + - ]: 1 : std::counting_semaphore<> blocker(0);
93 [ + - ]: 1 : threadPool.Start(NUM_WORKERS_DEFAULT);
94 [ + - ]: 1 : const auto blocking_tasks = BlockWorkers(threadPool, blocker, NUM_WORKERS_DEFAULT);
95 [ + - ]: 1 : threadPool.Interrupt();
96 : 1 : res = threadPool.Submit(fn_empty);
97 [ + - + - : 2 : BOOST_CHECK(!res);
+ - ]
98 [ + - + - ]: 1 : BOOST_CHECK_EQUAL(SubmitErrorString(res.error()), "Interrupted");
99 [ + - - + : 2 : BOOST_CHECK_EXCEPTION(threadPool.Start(NUM_WORKERS_DEFAULT), std::runtime_error, HasReason("Thread pool has been interrupted or is stopping"));
- - - - -
+ + - + -
+ - ]
100 : 1 : blocker.release(NUM_WORKERS_DEFAULT);
101 [ + - + - : 4 : WAIT_FOR(blocking_tasks);
+ - + + ]
102 : :
103 : : // Interrupted then stopped: Inactive
104 [ + - ]: 1 : threadPool.Stop();
105 : 1 : res = threadPool.Submit(fn_empty);
106 [ + - + - : 2 : BOOST_CHECK(!res);
+ - ]
107 [ + - + - ]: 1 : BOOST_CHECK_EQUAL(SubmitErrorString(res.error()), "No active workers");
108 : :
109 : : // Started then stopped: Inactive
110 [ + - ]: 1 : threadPool.Start(NUM_WORKERS_DEFAULT);
111 [ + - ]: 1 : threadPool.Stop();
112 : 1 : res = threadPool.Submit(fn_empty);
113 [ + - + - : 2 : BOOST_CHECK(!res);
+ - ]
114 [ + - + - ]: 1 : BOOST_CHECK_EQUAL(SubmitErrorString(res.error()), "No active workers");
115 : :
116 : 1 : std::vector<std::function<void()>> tasks;
117 : 1 : const auto range_res{threadPool.Submit(std::move(tasks))};
118 [ + - + - : 2 : BOOST_CHECK(!range_res);
+ - ]
119 [ + - + - ]: 1 : BOOST_CHECK_EQUAL(SubmitErrorString(range_res.error()), "No active workers");
120 : 1 : }
121 : :
122 : : // Test 1, submit tasks and verify completion
123 [ + - + - : 7 : BOOST_AUTO_TEST_CASE(submit_tasks_complete_successfully)
+ - + - -
+ + - + -
+ - + - +
- - + + -
+ - + - +
- + - - +
+ - + - +
- + - + -
- + + - +
- + - + -
+ - - + +
- ]
124 : : {
125 : 1 : int num_tasks = 50;
126 : :
127 [ + - ]: 1 : ThreadPool threadPool(POOL_NAME);
128 [ + - ]: 1 : threadPool.Start(NUM_WORKERS_DEFAULT);
129 : 1 : std::atomic<int> counter = 0;
130 : :
131 : : // Store futures to ensure completion before checking counter.
132 : 1 : std::vector<std::future<void>> futures;
133 [ + - ]: 1 : futures.reserve(num_tasks);
134 [ + + ]: 51 : for (int i = 1; i <= num_tasks; i++) {
135 [ + - + - ]: 150 : futures.emplace_back(Submit(threadPool, [&counter, i]() {
136 : 50 : counter.fetch_add(i, std::memory_order_relaxed);
137 : : }));
138 : : }
139 : :
140 : : // Wait for all tasks to finish
141 [ + - + - : 51 : WAIT_FOR(futures);
+ - + + ]
142 : 1 : int expected_value = (num_tasks * (num_tasks + 1)) / 2; // Gauss sum.
143 [ + - + - ]: 1 : BOOST_CHECK_EQUAL(counter.load(), expected_value);
144 [ + - + - : 1 : BOOST_CHECK_EQUAL(threadPool.WorkQueueSize(), 0);
+ - ]
145 : 1 : }
146 : :
147 : : // Test 2, maintain all threads busy except one
148 [ + - + - : 7 : BOOST_AUTO_TEST_CASE(single_available_worker_executes_all_tasks)
+ - + - -
+ + - + -
+ - + - +
- - + + -
+ - + - +
- + - - +
+ - + - +
- + - + -
- + + - +
- + - + -
+ - - + +
- ]
149 : : {
150 [ + - ]: 1 : ThreadPool threadPool(POOL_NAME);
151 [ + - ]: 1 : threadPool.Start(NUM_WORKERS_DEFAULT);
152 [ + - ]: 1 : std::counting_semaphore<> blocker(0);
153 [ + - ]: 1 : const auto blocking_tasks = BlockWorkers(threadPool, blocker, NUM_WORKERS_DEFAULT - 1);
154 : :
155 : : // Now execute tasks on the single available worker
156 : : // and check that all the tasks are executed.
157 : 1 : int num_tasks = 15;
158 : 1 : int counter = 0;
159 : :
160 : : // Store futures to wait on
161 [ + - ]: 1 : std::vector<std::future<void>> futures(num_tasks);
162 [ + - - + : 31 : for (auto& f : futures) f = Submit(threadPool, [&counter]{ counter++; });
+ + ]
163 : :
164 [ + - + - : 16 : WAIT_FOR(futures);
+ - + + ]
165 [ + - + - ]: 1 : BOOST_CHECK_EQUAL(counter, num_tasks);
166 : :
167 : 1 : blocker.release(NUM_WORKERS_DEFAULT - 1);
168 [ + - + - : 16 : WAIT_FOR(blocking_tasks);
+ - + + ]
169 [ + - ]: 1 : threadPool.Stop();
170 [ + - + - : 1 : BOOST_CHECK_EQUAL(threadPool.WorkersCount(), 0);
+ - ]
171 : 1 : }
172 : :
173 : : // Test 3, wait for work to finish
174 [ + - + - : 7 : BOOST_AUTO_TEST_CASE(wait_for_task_to_finish)
+ - + - -
+ + - + -
+ - + - +
- - + + -
+ - + - +
- + - - +
+ - + - +
- + - + -
- + + - +
- + - + -
+ - - + +
- ]
175 : : {
176 [ + - ]: 1 : ThreadPool threadPool(POOL_NAME);
177 [ + - ]: 1 : threadPool.Start(NUM_WORKERS_DEFAULT);
178 : 1 : std::atomic<bool> flag = false;
179 : 2 : std::future<void> future = Submit(threadPool, [&flag]() {
180 : 1 : UninterruptibleSleep(200ms);
181 : 1 : flag.store(true, std::memory_order_release);
182 [ + - ]: 2 : });
183 [ + - + - : 2 : BOOST_CHECK(future.wait_for(WAIT_TIMEOUT) == std::future_status::ready);
+ - + - ]
184 [ + - + - : 2 : BOOST_CHECK(flag.load(std::memory_order_acquire));
+ - ]
185 : 1 : }
186 : :
187 : : // Test 4, obtain result object
188 [ + - + - : 7 : BOOST_AUTO_TEST_CASE(get_result_from_completed_task)
+ - + - -
+ + - + -
+ - + - +
- - + + -
+ - + - +
- + - - +
+ - + - +
- + - + -
- + + - +
- + - + -
+ - - + +
- ]
189 : : {
190 [ + - ]: 1 : ThreadPool threadPool(POOL_NAME);
191 [ + - ]: 1 : threadPool.Start(NUM_WORKERS_DEFAULT);
192 [ + - ]: 1 : std::future<bool> future_bool = Submit(threadPool, []() { return true; });
193 [ + - + - : 2 : BOOST_CHECK(future_bool.get());
+ - + - ]
194 : :
195 [ + - + - ]: 2 : std::future<std::string> future_str = Submit(threadPool, []() { return std::string("true"); });
196 [ + - ]: 1 : std::string result = future_str.get();
197 [ + - + - ]: 1 : BOOST_CHECK_EQUAL(result, "true");
198 [ - + - + ]: 1 : }
199 : :
200 : : // Test 5, throw exception and catch it on the consumer side
201 [ + - + - : 7 : BOOST_AUTO_TEST_CASE(task_exception_propagates_to_future)
+ - + - -
+ + - + -
+ - + - +
- - + + -
+ - + - +
- + - - +
+ - + - +
- + - + -
- + + - +
- + - + -
+ - - + +
- ]
202 : : {
203 [ + - ]: 1 : ThreadPool threadPool(POOL_NAME);
204 [ + - ]: 1 : threadPool.Start(NUM_WORKERS_DEFAULT);
205 : :
206 [ + - ]: 10 : const auto make_err{[&](size_t n) { return strprintf("error on thread #%s", n); }};
207 : :
208 : 1 : const int num_tasks = 5;
209 : 1 : std::vector<std::future<void>> futures;
210 [ + - ]: 1 : futures.reserve(num_tasks);
211 [ + + ]: 6 : for (int i = 0; i < num_tasks; i++) {
212 [ + - + - ]: 20 : futures.emplace_back(Submit(threadPool, [&make_err, i] { throw std::runtime_error(make_err(i)); }));
213 : : }
214 : :
215 [ + + ]: 6 : for (int i = 0; i < num_tasks; i++) {
216 [ + - - + : 10 : BOOST_CHECK_EXCEPTION(futures[i].get(), std::runtime_error, HasReason{make_err(i)});
- - - - -
+ + - + -
- + + - +
- ]
217 : : }
218 : 1 : }
219 : :
220 : : // Test 6, all workers are busy, help them by processing tasks from outside
221 [ + - + - : 7 : BOOST_AUTO_TEST_CASE(process_tasks_manually_when_workers_busy)
+ - + - -
+ + - + -
+ - + - +
- - + + -
+ - + - +
- + - - +
+ - + - +
- + - + -
- + + - +
- + - + -
+ - - + +
- ]
222 : : {
223 [ + - ]: 1 : ThreadPool threadPool(POOL_NAME);
224 [ + - ]: 1 : threadPool.Start(NUM_WORKERS_DEFAULT);
225 : :
226 [ + - ]: 1 : std::counting_semaphore<> blocker(0);
227 [ + - ]: 1 : const auto& blocking_tasks = BlockWorkers(threadPool, blocker, NUM_WORKERS_DEFAULT);
228 : :
229 : : // Now submit tasks and check that none of them are executed.
230 : 1 : int num_tasks = 20;
231 : 1 : std::atomic<int> counter = 0;
232 [ + + ]: 21 : for (int i = 0; i < num_tasks; i++) {
233 [ + - ]: 60 : (void)Submit(threadPool, [&counter]() {
234 : 20 : counter.fetch_add(1, std::memory_order_relaxed);
235 : : });
236 : : }
237 [ + - ]: 1 : UninterruptibleSleep(100ms);
238 [ + - + - : 1 : BOOST_CHECK_EQUAL(threadPool.WorkQueueSize(), num_tasks);
+ - ]
239 : :
240 : : // Now process manually
241 [ + + ]: 21 : for (int i = 0; i < num_tasks; i++) {
242 [ + - ]: 20 : threadPool.ProcessTask();
243 : : }
244 [ + - + - ]: 1 : BOOST_CHECK_EQUAL(counter.load(), num_tasks);
245 [ + - + - : 1 : BOOST_CHECK_EQUAL(threadPool.WorkQueueSize(), 0);
+ - ]
246 : 1 : blocker.release(NUM_WORKERS_DEFAULT);
247 [ + - ]: 1 : threadPool.Stop();
248 [ + - + - : 14 : WAIT_FOR(blocking_tasks);
+ - + + ]
249 : 1 : }
250 : :
251 : : // Test 7, submit tasks from other tasks
252 [ + - + - : 7 : BOOST_AUTO_TEST_CASE(recursive_task_submission)
+ - + - -
+ + - + -
+ - + - +
- - + + -
+ - + - +
- + - - +
+ - + - +
- + - + -
- + + - +
- + - + -
+ - - + +
- ]
253 : : {
254 [ + - ]: 1 : ThreadPool threadPool(POOL_NAME);
255 [ + - ]: 1 : threadPool.Start(NUM_WORKERS_DEFAULT);
256 : :
257 [ + - ]: 1 : std::promise<void> signal;
258 [ + - ]: 2 : (void)Submit(threadPool, [&]() {
259 [ + - ]: 1 : (void)Submit(threadPool, [&]() {
260 [ - - + - ]: 1 : signal.set_value();
261 : : });
262 : 1 : });
263 : :
264 [ + - + - ]: 2 : signal.get_future().wait();
265 [ + - ]: 1 : threadPool.Stop();
266 : 1 : }
267 : :
268 : : // Test 8, submit task when all threads are busy and then stop the pool
269 [ + - + - : 7 : BOOST_AUTO_TEST_CASE(task_submitted_while_busy_completes)
+ - + - -
+ + - + -
+ - + - +
- - + + -
+ - + - +
- + - - +
+ - + - +
- + - + -
- + + - +
- + - + -
+ - - + +
- ]
270 : : {
271 [ + - ]: 1 : ThreadPool threadPool(POOL_NAME);
272 [ + - ]: 1 : threadPool.Start(NUM_WORKERS_DEFAULT);
273 : :
274 [ + - ]: 1 : std::counting_semaphore<> blocker(0);
275 [ + - ]: 1 : const auto& blocking_tasks = BlockWorkers(threadPool, blocker, NUM_WORKERS_DEFAULT);
276 : :
277 : : // Submit an extra task that should execute once a worker is free
278 [ + - ]: 1 : std::future<bool> future = Submit(threadPool, []() { return true; });
279 : :
280 : : // At this point, all workers are blocked, and the extra task is queued
281 [ + - + - : 1 : BOOST_CHECK_EQUAL(threadPool.WorkQueueSize(), 1);
+ - ]
282 : :
283 : : // Wait a short moment before unblocking the threads to mimic a concurrent shutdown
284 : 2 : std::thread thread_unblocker([&blocker]() {
285 : 1 : UninterruptibleSleep(300ms);
286 : 1 : blocker.release(NUM_WORKERS_DEFAULT);
287 [ + - ]: 2 : });
288 : :
289 : : // Stop the pool while the workers are still blocked
290 [ + - ]: 1 : threadPool.Stop();
291 : :
292 : : // Expect the submitted task to complete
293 [ + - + - : 2 : BOOST_CHECK(future.get());
+ - + - ]
294 [ + - ]: 1 : thread_unblocker.join();
295 : :
296 : : // Obviously all the previously blocking tasks should be completed at this point too
297 [ + - + - : 12 : WAIT_FOR(blocking_tasks);
+ - + + ]
298 : :
299 : : // Pool should be stopped and no workers remaining
300 [ + - + - : 1 : BOOST_CHECK_EQUAL(threadPool.WorkersCount(), 0);
+ - ]
301 [ - + ]: 1 : }
302 : :
303 : : // Test 9, more workers than available cores (congestion test)
304 [ + - + - : 7 : BOOST_AUTO_TEST_CASE(congestion_more_workers_than_cores)
+ - + - -
+ + - + -
+ - + - +
- - + + -
+ - + - +
- + - - +
+ - + - +
- + - + -
- + + - +
- + - + -
+ - - + +
- ]
305 : : {
306 [ + - ]: 1 : ThreadPool threadPool(POOL_NAME);
307 [ + - - + : 1 : threadPool.Start(std::max(1, GetNumCores() * 2)); // Oversubscribe by 2×
+ - ]
308 : :
309 : 1 : int num_tasks = 200;
310 : 1 : std::atomic<int> counter{0};
311 : :
312 : 1 : std::vector<std::future<void>> futures;
313 [ + - ]: 1 : futures.reserve(num_tasks);
314 [ + + ]: 201 : for (int i = 0; i < num_tasks; i++) {
315 [ + - + - ]: 600 : futures.emplace_back(Submit(threadPool, [&counter] {
316 : 200 : counter.fetch_add(1, std::memory_order_relaxed);
317 : : }));
318 : : }
319 : :
320 [ + - + - : 201 : WAIT_FOR(futures);
+ - + + ]
321 [ + - + - ]: 1 : BOOST_CHECK_EQUAL(counter.load(), num_tasks);
322 : 1 : }
323 : :
324 : : // Test 10, Interrupt() prevents further submissions
325 [ + - + - : 7 : BOOST_AUTO_TEST_CASE(interrupt_blocks_new_submissions)
+ - + - -
+ + - + -
+ - + - +
- - + + -
+ - + - +
- + - - +
+ - + - +
- + - + -
- + + - +
- + - + -
+ - - + +
- ]
326 : : {
327 : : // 1) Interrupt from main thread
328 [ + - ]: 1 : ThreadPool threadPool(POOL_NAME);
329 [ + - ]: 1 : threadPool.Start(NUM_WORKERS_DEFAULT);
330 [ + - ]: 1 : threadPool.Interrupt();
331 : :
332 : 1 : auto res = threadPool.Submit([]{});
333 [ + - + - : 2 : BOOST_CHECK(!res);
+ - ]
334 [ + - + - ]: 1 : BOOST_CHECK_EQUAL(SubmitErrorString(res.error()), "Interrupted");
335 : :
336 : 1 : std::vector<std::function<void()>> tasks;
337 : 1 : const auto range_res{threadPool.Submit(std::move(tasks))};
338 [ + - + - : 2 : BOOST_CHECK(!range_res);
+ - ]
339 [ + - + - ]: 1 : BOOST_CHECK_EQUAL(SubmitErrorString(range_res.error()), "Interrupted");
340 : :
341 : : // Reset pool
342 [ + - ]: 1 : threadPool.Stop();
343 : :
344 : : // 2) Interrupt() from a worker thread
345 : : // One worker is blocked, another calls Interrupt(), and the remaining one waits for tasks.
346 [ + - ]: 1 : threadPool.Start(/*num_workers=*/3);
347 : 1 : std::atomic<int> counter{0};
348 [ + - ]: 1 : std::counting_semaphore<> blocker(0);
349 [ + - ]: 1 : const auto blocking_tasks = BlockWorkers(threadPool, blocker, 1);
350 [ + - ]: 2 : Submit(threadPool, [&threadPool, &counter]{
351 : 1 : threadPool.Interrupt();
352 : 1 : counter.fetch_add(1, std::memory_order_relaxed);
353 [ + - ]: 2 : }).get();
354 : 1 : blocker.release(1); // unblock worker
355 : :
356 [ + - + - ]: 1 : BOOST_CHECK_EQUAL(counter.load(), 1);
357 [ + - ]: 1 : threadPool.Stop();
358 [ + - + - : 2 : WAIT_FOR(blocking_tasks);
+ - + + ]
359 [ + - + - : 1 : BOOST_CHECK_EQUAL(threadPool.WorkersCount(), 0);
+ - ]
360 : 1 : }
361 : :
362 : : // Test 11, Start() must not cause a deadlock when called during Stop()
363 [ + - + - : 7 : BOOST_AUTO_TEST_CASE(start_mid_stop_does_not_deadlock)
+ - + - -
+ + - + -
+ - + - +
- - + + -
+ - + - +
- + - - +
+ - + - +
- + - + -
- + + - +
- + - + -
+ - - + +
- ]
364 : : {
365 [ + - ]: 1 : ThreadPool threadPool(POOL_NAME);
366 [ + - ]: 1 : threadPool.Start(NUM_WORKERS_DEFAULT);
367 : :
368 : : // Keep all workers busy so Stop() gets stuck waiting for them to finish during join()
369 [ + - ]: 1 : std::counting_semaphore<> workers_blocker(0);
370 [ + - ]: 1 : const auto blocking_tasks = BlockWorkers(threadPool, workers_blocker, NUM_WORKERS_DEFAULT);
371 : :
372 [ + - ]: 2 : std::thread stopper_thread([&threadPool] { threadPool.Stop(); });
373 : :
374 : : // Stop() takes ownership of the workers before joining them, so WorkersCount()
375 : : // hits 0 the moment Stop() is waiting for them to join. That is our signal
376 : : // to call Start() right into the middle of the joining phase.
377 [ + - + + ]: 17 : while (threadPool.WorkersCount() != 0) {
378 : 16 : std::this_thread::yield(); // let the OS breathe so it can switch context
379 : : }
380 : : // Now we know for sure the stopper thread is hanging while workers are still alive.
381 : : // Restart the pool and resume workers so the stopper thread can proceed.
382 : : // This will throw an exception only if the pool handles Start-Stop race properly,
383 : : // otherwise it will proceed and hang the stopper_thread.
384 : 1 : try {
385 [ - + ]: 1 : threadPool.Start(NUM_WORKERS_DEFAULT);
386 [ - + ]: 1 : } catch (std::exception& e) {
387 [ + - + - ]: 1 : BOOST_CHECK_EQUAL(e.what(), "Thread pool has been interrupted or is stopping");
388 : 1 : }
389 : 1 : workers_blocker.release(NUM_WORKERS_DEFAULT);
390 [ + - + - : 16 : WAIT_FOR(blocking_tasks);
+ - + + ]
391 : :
392 : : // If Stop() is stuck, joining the stopper thread will deadlock
393 [ + - ]: 1 : stopper_thread.join();
394 : 1 : }
395 : :
396 : : // Test 12, queued tasks complete after Interrupt()
397 [ + - + - : 7 : BOOST_AUTO_TEST_CASE(queued_tasks_complete_after_interrupt)
+ - + - -
+ + - + -
+ - + - +
- - + + -
+ - + - +
- + - - +
+ - + - +
- + - + -
- + + - +
- + - + -
+ - - + +
- ]
398 : : {
399 [ + - ]: 1 : ThreadPool threadPool(POOL_NAME);
400 [ + - ]: 1 : threadPool.Start(NUM_WORKERS_DEFAULT);
401 : :
402 [ + - ]: 1 : std::counting_semaphore<> blocker(0);
403 [ + - ]: 1 : const auto blocking_tasks = BlockWorkers(threadPool, blocker, NUM_WORKERS_DEFAULT);
404 : :
405 : : // Queue tasks while all workers are busy, then interrupt
406 : 1 : std::atomic<int> counter{0};
407 : 1 : const int num_tasks = 10;
408 : 1 : std::vector<std::future<void>> futures;
409 [ + - ]: 1 : futures.reserve(num_tasks);
410 [ + + ]: 11 : for (int i = 0; i < num_tasks; i++) {
411 [ + - + - ]: 30 : futures.emplace_back(Submit(threadPool, [&counter]{ counter.fetch_add(1, std::memory_order_relaxed); }));
412 : : }
413 [ + - ]: 1 : threadPool.Interrupt();
414 : :
415 : : // Queued tasks must still complete despite the interrupt
416 : 1 : blocker.release(NUM_WORKERS_DEFAULT);
417 [ + - + - : 11 : WAIT_FOR(futures);
+ - + + ]
418 [ + - + - ]: 1 : BOOST_CHECK_EQUAL(counter.load(), num_tasks);
419 : :
420 [ + - ]: 1 : threadPool.Stop();
421 [ + - + - : 14 : WAIT_FOR(blocking_tasks);
+ - + + ]
422 : 1 : }
423 : :
424 : : // Test 13, ensure the Stop() calling thread helps drain the queue
425 [ + - + - : 7 : BOOST_AUTO_TEST_CASE(stop_active_wait_drains_queue)
+ - + - -
+ + - + -
+ - + - +
- - + + -
+ - + - +
- + - - +
+ - + - +
- + - + -
- + + - +
- + - + -
+ - - + +
- ]
426 : : {
427 [ + - ]: 1 : ThreadPool threadPool(POOL_NAME);
428 [ + - ]: 1 : threadPool.Start(NUM_WORKERS_DEFAULT);
429 : :
430 [ + - ]: 1 : std::counting_semaphore<> blocker(0);
431 [ + - ]: 1 : const auto blocking_tasks = BlockWorkers(threadPool, blocker, NUM_WORKERS_DEFAULT);
432 : :
433 : 1 : auto main_thread_id = std::this_thread::get_id();
434 : 1 : std::atomic<int> main_thread_tasks{0};
435 : 1 : const size_t num_tasks = 20;
436 [ + + ]: 21 : for (size_t i = 0; i < num_tasks; i++) {
437 [ + - ]: 60 : (void)Submit(threadPool, [&main_thread_tasks, main_thread_id]() {
438 [ + - ]: 20 : if (std::this_thread::get_id() == main_thread_id)
439 : 20 : main_thread_tasks.fetch_add(1, std::memory_order_relaxed);
440 : 20 : });
441 : : }
442 [ + - + - : 1 : BOOST_CHECK_EQUAL(threadPool.WorkQueueSize(), num_tasks);
+ - ]
443 : :
444 : : // Delay release so Stop() drains all tasks from the calling thread
445 : 2 : std::thread unblocker([&blocker, &threadPool]() {
446 [ + + ]: 3 : while (threadPool.WorkQueueSize() > 0) {
447 : 2 : std::this_thread::yield();
448 : : }
449 : 1 : blocker.release(NUM_WORKERS_DEFAULT);
450 [ + - ]: 2 : });
451 : :
452 [ + - ]: 1 : threadPool.Stop();
453 [ + - ]: 1 : unblocker.join();
454 : :
455 : : // Check the main thread processed all tasks
456 [ + - + - ]: 1 : BOOST_CHECK_EQUAL(main_thread_tasks.load(), num_tasks);
457 [ + - + - : 14 : WAIT_FOR(blocking_tasks);
+ - + + ]
458 : 1 : }
459 : :
460 : : // Test 14, submit range of tasks in one lock acquisition
461 [ + - + - : 7 : BOOST_AUTO_TEST_CASE(submit_range_of_tasks_complete_successfully)
+ - + - -
+ + - + -
+ - + - +
- - + + -
+ - + - +
- + - - +
+ - + - +
- + - + -
- + + - +
- + - + -
+ - - + +
- ]
462 : : {
463 : 1 : constexpr int32_t num_tasks{50};
464 : :
465 [ + - ]: 1 : ThreadPool threadPool{POOL_NAME};
466 [ + - ]: 1 : threadPool.Start(NUM_WORKERS_DEFAULT);
467 : 1 : std::atomic_int32_t sum{0};
468 : 101 : const auto square{[&sum](int32_t i) {
469 : 100 : sum.fetch_add(i, std::memory_order_relaxed);
470 : 100 : return i * i;
471 : 1 : }};
472 : :
473 : 1 : std::array<std::function<int32_t()>, static_cast<size_t>(num_tasks)> array_tasks;
474 : 1 : std::vector<std::function<int32_t()>> vector_tasks;
475 [ + - ]: 1 : vector_tasks.reserve(static_cast<size_t>(num_tasks));
476 [ + + ]: 51 : for (const auto i : std::views::iota(int32_t{1}, num_tasks + 1)) {
477 [ - + ]: 100 : array_tasks.at(static_cast<size_t>(i - 1)) = [i, square] { return square(i); };
478 [ + - ]: 100 : vector_tasks.emplace_back([i, square] { return square(i); });
479 : : }
480 : :
481 [ - + ]: 1 : auto futures{std::move(*Assert(threadPool.Submit(std::move(array_tasks))))};
482 [ + - - + : 1 : BOOST_CHECK_EQUAL(futures.size(), static_cast<size_t>(num_tasks));
+ - ]
483 [ - + + - ]: 1 : std::ranges::move(*Assert(threadPool.Submit(std::move(vector_tasks))), std::back_inserter(futures));
484 [ + - - + : 1 : BOOST_CHECK_EQUAL(futures.size(), static_cast<size_t>(num_tasks * 2));
+ - ]
485 : :
486 : 1 : auto squares_sum{0};
487 [ + + ]: 101 : for (auto& future : futures) {
488 [ + - ]: 100 : squares_sum += future.get();
489 : : }
490 : :
491 : : // 2x Gauss sum.
492 : 1 : const auto expected_sum{2 * ((num_tasks * (num_tasks + 1)) / 2)};
493 : 1 : const auto expected_squares_sum{2 * ((num_tasks * (num_tasks + 1) * ((num_tasks * 2) + 1)) / 6)};
494 [ + - + - ]: 1 : BOOST_CHECK_EQUAL(sum, expected_sum);
495 [ + - + - ]: 1 : BOOST_CHECK_EQUAL(squares_sum, expected_squares_sum);
496 [ + - + - : 1 : BOOST_CHECK_EQUAL(threadPool.WorkQueueSize(), 0);
+ - ]
497 : 2 : }
498 : :
499 : : BOOST_AUTO_TEST_SUITE_END()
|