LCOV - code coverage report
Current view: top level - src/test - threadpool_tests.cpp (source / functions) Coverage Total Hit
Test: total_coverage.info Lines: 99.6 % 284 283
Test Date: 2026-03-16 05:20:51 Functions: 100.0 % 55 55
Branches: 51.4 % 1518 781

             Branch data     Line data    Source code
       1                 :             : // Copyright (c) The Bitcoin Core developers
       2                 :             : // Distributed under the MIT software license, see the accompanying
       3                 :             : // file COPYING or http://www.opensource.org/licenses/mit-license.php.
       4                 :             : 
       5                 :             : #include <common/system.h>
       6                 :             : #include <logging.h>
       7                 :             : #include <random.h>
       8                 :             : #include <test/util/common.h>
       9                 :             : #include <util/string.h>
      10                 :             : #include <util/threadpool.h>
      11                 :             : #include <util/time.h>
      12                 :             : 
      13                 :             : #include <boost/test/unit_test.hpp>
      14                 :             : 
      15                 :             : #include <array>
      16                 :             : #include <functional>
      17                 :             : #include <latch>
      18                 :             : #include <ranges>
      19                 :             : #include <semaphore>
      20                 :             : 
      21                 :             : // General test values
      22                 :             : int NUM_WORKERS_DEFAULT = 0;
      23                 :             : constexpr char POOL_NAME[] = "test";
      24                 :             : constexpr auto WAIT_TIMEOUT = 120s;
      25                 :             : 
      26                 :             : struct ThreadPoolFixture {
      27                 :          15 :     ThreadPoolFixture() {
      28         [ +  - ]:          15 :         NUM_WORKERS_DEFAULT = FastRandomContext().randrange(GetNumCores()) + 1;
      29                 :          15 :         LogInfo("thread pool workers count: %d", NUM_WORKERS_DEFAULT);
      30                 :          15 :     }
      31                 :             : };
      32                 :             : 
      33                 :             : // Test Cases Overview
      34                 :             : // 0) Submit task to a non-started pool.
      35                 :             : // 1) Submit tasks and verify completion.
      36                 :             : // 2) Maintain all threads busy except one.
      37                 :             : // 3) Wait for work to finish.
      38                 :             : // 4) Wait for result object.
      39                 :             : // 5) The task throws an exception, catch must be done in the consumer side.
      40                 :             : // 6) Busy workers, help them by processing tasks externally.
      41                 :             : // 7) Recursive submission of tasks.
      42                 :             : // 8) Submit task when all threads are busy, stop pool and verify task gets executed.
      43                 :             : // 9) Congestion test; create more workers than available cores.
      44                 :             : // 10) Ensure Interrupt() prevents further submissions.
      45                 :             : // 11) Start() must not cause a deadlock when called during Stop().
      46                 :             : // 12) Ensure queued tasks complete after Interrupt().
      47                 :             : // 13) Ensure the Stop() calling thread helps drain the queue.
      48                 :             : // 14) Submit range of tasks in one lock acquisition.
      49                 :             : BOOST_FIXTURE_TEST_SUITE(threadpool_tests, ThreadPoolFixture)
      50                 :             : 
      51                 :             : #define WAIT_FOR(futures)                                                         \
      52                 :             :     do {                                                                          \
      53                 :             :         for (const auto& f : futures) {                                           \
      54                 :             :             BOOST_REQUIRE(f.wait_for(WAIT_TIMEOUT) == std::future_status::ready); \
      55                 :             :         }                                                                         \
      56                 :             :     } while (0)
      57                 :             : 
      58                 :             : // Helper to unwrap a valid pool submission
      59                 :             : template <typename F>
      60                 :         411 : [[nodiscard]] auto Submit(ThreadPool& pool, F&& fn)
      61                 :             : {
      62         [ -  + ]:         411 :     return std::move(*Assert(pool.Submit(std::forward<F>(fn))));
      63                 :             : }
      64                 :             : 
      65                 :             : // Block a number of worker threads by submitting tasks that wait on `release_sem`.
      66                 :             : // Returns the futures of the blocking tasks, ensuring all have started and are waiting.
      67                 :           8 : std::vector<std::future<void>> BlockWorkers(ThreadPool& threadPool, std::counting_semaphore<>& release_sem, size_t num_of_threads_to_block)
      68                 :             : {
      69         [ -  + ]:           8 :     assert(threadPool.WorkersCount() >= num_of_threads_to_block);
      70                 :           8 :     std::latch ready{static_cast<std::ptrdiff_t>(num_of_threads_to_block)};
      71                 :           8 :     std::vector<std::future<void>> blocking_tasks(num_of_threads_to_block);
      72   [ +  -  +  + ]:          92 :     for (auto& f : blocking_tasks) f = Submit(threadPool, [&] {
      73         [ +  + ]:          84 :         ready.count_down();
      74                 :          84 :         release_sem.acquire();
      75         [ -  + ]:          84 :     });
      76                 :           8 :     ready.wait();
      77                 :           8 :     return blocking_tasks;
      78                 :           0 : }
      79                 :             : 
      80                 :             : // Test 0, submit task to a non-started, interrupted, or stopped pool
      81   [ +  -  +  -  :           7 : BOOST_AUTO_TEST_CASE(submit_fails_with_correct_error)
          +  -  +  -  -  
          +  +  -  +  -  
          +  -  +  -  +  
          -  -  +  +  -  
          +  -  +  -  +  
          -  +  -  -  +  
          +  -  +  -  +  
          -  +  -  +  -  
          -  +  +  -  +  
          -  +  -  +  -  
          +  -  -  +  +  
                      - ]
      82                 :             : {
      83         [ +  - ]:           1 :     ThreadPool threadPool(POOL_NAME);
      84                 :           1 :     const auto fn_empty = [&] {};
      85                 :             : 
      86                 :             :     // Never started: Inactive
      87                 :           1 :     auto res = threadPool.Submit(fn_empty);
      88   [ +  -  +  -  :           2 :     BOOST_CHECK(!res);
                   +  - ]
      89   [ +  -  +  - ]:           1 :     BOOST_CHECK_EQUAL(SubmitErrorString(res.error()), "No active workers");
      90                 :             : 
      91                 :             :     // Interrupted (workers still alive): Interrupted, and Start() must be rejected too
      92         [ +  - ]:           1 :     std::counting_semaphore<> blocker(0);
      93         [ +  - ]:           1 :     threadPool.Start(NUM_WORKERS_DEFAULT);
      94         [ +  - ]:           1 :     const auto blocking_tasks = BlockWorkers(threadPool, blocker, NUM_WORKERS_DEFAULT);
      95         [ +  - ]:           1 :     threadPool.Interrupt();
      96                 :           1 :     res = threadPool.Submit(fn_empty);
      97   [ +  -  +  -  :           2 :     BOOST_CHECK(!res);
                   +  - ]
      98   [ +  -  +  - ]:           1 :     BOOST_CHECK_EQUAL(SubmitErrorString(res.error()), "Interrupted");
      99   [ +  -  -  +  :           2 :     BOOST_CHECK_EXCEPTION(threadPool.Start(NUM_WORKERS_DEFAULT), std::runtime_error, HasReason("Thread pool has been interrupted or is stopping"));
          -  -  -  -  -  
          +  +  -  +  -  
                   +  - ]
     100                 :           1 :     blocker.release(NUM_WORKERS_DEFAULT);
     101   [ +  -  +  -  :           4 :     WAIT_FOR(blocking_tasks);
             +  -  +  + ]
     102                 :             : 
     103                 :             :     // Interrupted then stopped: Inactive
     104         [ +  - ]:           1 :     threadPool.Stop();
     105                 :           1 :     res = threadPool.Submit(fn_empty);
     106   [ +  -  +  -  :           2 :     BOOST_CHECK(!res);
                   +  - ]
     107   [ +  -  +  - ]:           1 :     BOOST_CHECK_EQUAL(SubmitErrorString(res.error()), "No active workers");
     108                 :             : 
     109                 :             :     // Started then stopped: Inactive
     110         [ +  - ]:           1 :     threadPool.Start(NUM_WORKERS_DEFAULT);
     111         [ +  - ]:           1 :     threadPool.Stop();
     112                 :           1 :     res = threadPool.Submit(fn_empty);
     113   [ +  -  +  -  :           2 :     BOOST_CHECK(!res);
                   +  - ]
     114   [ +  -  +  - ]:           1 :     BOOST_CHECK_EQUAL(SubmitErrorString(res.error()), "No active workers");
     115                 :             : 
     116                 :           1 :     std::vector<std::function<void()>> tasks;
     117                 :           1 :     const auto range_res{threadPool.Submit(std::move(tasks))};
     118   [ +  -  +  -  :           2 :     BOOST_CHECK(!range_res);
                   +  - ]
     119   [ +  -  +  - ]:           1 :     BOOST_CHECK_EQUAL(SubmitErrorString(range_res.error()), "No active workers");
     120                 :           1 : }
     121                 :             : 
     122                 :             : // Test 1, submit tasks and verify completion
     123   [ +  -  +  -  :           7 : BOOST_AUTO_TEST_CASE(submit_tasks_complete_successfully)
          +  -  +  -  -  
          +  +  -  +  -  
          +  -  +  -  +  
          -  -  +  +  -  
          +  -  +  -  +  
          -  +  -  -  +  
          +  -  +  -  +  
          -  +  -  +  -  
          -  +  +  -  +  
          -  +  -  +  -  
          +  -  -  +  +  
                      - ]
     124                 :             : {
     125                 :           1 :     int num_tasks = 50;
     126                 :             : 
     127         [ +  - ]:           1 :     ThreadPool threadPool(POOL_NAME);
     128         [ +  - ]:           1 :     threadPool.Start(NUM_WORKERS_DEFAULT);
     129                 :           1 :     std::atomic<int> counter = 0;
     130                 :             : 
     131                 :             :     // Store futures to ensure completion before checking counter.
     132                 :           1 :     std::vector<std::future<void>> futures;
     133         [ +  - ]:           1 :     futures.reserve(num_tasks);
     134         [ +  + ]:          51 :     for (int i = 1; i <= num_tasks; i++) {
     135   [ +  -  +  - ]:         150 :         futures.emplace_back(Submit(threadPool, [&counter, i]() {
     136                 :          50 :             counter.fetch_add(i, std::memory_order_relaxed);
     137                 :             :         }));
     138                 :             :     }
     139                 :             : 
     140                 :             :     // Wait for all tasks to finish
     141   [ +  -  +  -  :          51 :     WAIT_FOR(futures);
             +  -  +  + ]
     142                 :           1 :     int expected_value = (num_tasks * (num_tasks + 1)) / 2; // Gauss sum.
     143   [ +  -  +  - ]:           1 :     BOOST_CHECK_EQUAL(counter.load(), expected_value);
     144   [ +  -  +  -  :           1 :     BOOST_CHECK_EQUAL(threadPool.WorkQueueSize(), 0);
                   +  - ]
     145                 :           1 : }
     146                 :             : 
     147                 :             : // Test 2, maintain all threads busy except one
     148   [ +  -  +  -  :           7 : BOOST_AUTO_TEST_CASE(single_available_worker_executes_all_tasks)
          +  -  +  -  -  
          +  +  -  +  -  
          +  -  +  -  +  
          -  -  +  +  -  
          +  -  +  -  +  
          -  +  -  -  +  
          +  -  +  -  +  
          -  +  -  +  -  
          -  +  +  -  +  
          -  +  -  +  -  
          +  -  -  +  +  
                      - ]
     149                 :             : {
     150         [ +  - ]:           1 :     ThreadPool threadPool(POOL_NAME);
     151         [ +  - ]:           1 :     threadPool.Start(NUM_WORKERS_DEFAULT);
     152         [ +  - ]:           1 :     std::counting_semaphore<> blocker(0);
     153         [ +  - ]:           1 :     const auto blocking_tasks = BlockWorkers(threadPool, blocker, NUM_WORKERS_DEFAULT - 1);
     154                 :             : 
     155                 :             :     // Now execute tasks on the single available worker
     156                 :             :     // and check that all the tasks are executed.
     157                 :           1 :     int num_tasks = 15;
     158                 :           1 :     int counter = 0;
     159                 :             : 
     160                 :             :     // Store futures to wait on
     161         [ +  - ]:           1 :     std::vector<std::future<void>> futures(num_tasks);
     162   [ +  -  -  +  :          31 :     for (auto& f : futures) f = Submit(threadPool, [&counter]{ counter++; });
                   +  + ]
     163                 :             : 
     164   [ +  -  +  -  :          16 :     WAIT_FOR(futures);
             +  -  +  + ]
     165   [ +  -  +  - ]:           1 :     BOOST_CHECK_EQUAL(counter, num_tasks);
     166                 :             : 
     167                 :           1 :     blocker.release(NUM_WORKERS_DEFAULT - 1);
     168   [ +  -  +  -  :          16 :     WAIT_FOR(blocking_tasks);
             +  -  +  + ]
     169         [ +  - ]:           1 :     threadPool.Stop();
     170   [ +  -  +  -  :           1 :     BOOST_CHECK_EQUAL(threadPool.WorkersCount(), 0);
                   +  - ]
     171                 :           1 : }
     172                 :             : 
     173                 :             : // Test 3, wait for work to finish
     174   [ +  -  +  -  :           7 : BOOST_AUTO_TEST_CASE(wait_for_task_to_finish)
          +  -  +  -  -  
          +  +  -  +  -  
          +  -  +  -  +  
          -  -  +  +  -  
          +  -  +  -  +  
          -  +  -  -  +  
          +  -  +  -  +  
          -  +  -  +  -  
          -  +  +  -  +  
          -  +  -  +  -  
          +  -  -  +  +  
                      - ]
     175                 :             : {
     176         [ +  - ]:           1 :     ThreadPool threadPool(POOL_NAME);
     177         [ +  - ]:           1 :     threadPool.Start(NUM_WORKERS_DEFAULT);
     178                 :           1 :     std::atomic<bool> flag = false;
     179                 :           2 :     std::future<void> future = Submit(threadPool, [&flag]() {
     180                 :           1 :         UninterruptibleSleep(200ms);
     181                 :           1 :         flag.store(true, std::memory_order_release);
     182         [ +  - ]:           2 :     });
     183   [ +  -  +  -  :           2 :     BOOST_CHECK(future.wait_for(WAIT_TIMEOUT) == std::future_status::ready);
             +  -  +  - ]
     184   [ +  -  +  -  :           2 :     BOOST_CHECK(flag.load(std::memory_order_acquire));
                   +  - ]
     185                 :           1 : }
     186                 :             : 
     187                 :             : // Test 4, obtain result object
     188   [ +  -  +  -  :           7 : BOOST_AUTO_TEST_CASE(get_result_from_completed_task)
          +  -  +  -  -  
          +  +  -  +  -  
          +  -  +  -  +  
          -  -  +  +  -  
          +  -  +  -  +  
          -  +  -  -  +  
          +  -  +  -  +  
          -  +  -  +  -  
          -  +  +  -  +  
          -  +  -  +  -  
          +  -  -  +  +  
                      - ]
     189                 :             : {
     190         [ +  - ]:           1 :     ThreadPool threadPool(POOL_NAME);
     191         [ +  - ]:           1 :     threadPool.Start(NUM_WORKERS_DEFAULT);
     192         [ +  - ]:           1 :     std::future<bool> future_bool = Submit(threadPool, []() { return true; });
     193   [ +  -  +  -  :           2 :     BOOST_CHECK(future_bool.get());
             +  -  +  - ]
     194                 :             : 
     195   [ +  -  +  - ]:           2 :     std::future<std::string> future_str = Submit(threadPool, []() { return std::string("true"); });
     196         [ +  - ]:           1 :     std::string result = future_str.get();
     197   [ +  -  +  - ]:           1 :     BOOST_CHECK_EQUAL(result, "true");
     198   [ -  +  -  + ]:           1 : }
     199                 :             : 
     200                 :             : // Test 5, throw exception and catch it on the consumer side
     201   [ +  -  +  -  :           7 : BOOST_AUTO_TEST_CASE(task_exception_propagates_to_future)
          +  -  +  -  -  
          +  +  -  +  -  
          +  -  +  -  +  
          -  -  +  +  -  
          +  -  +  -  +  
          -  +  -  -  +  
          +  -  +  -  +  
          -  +  -  +  -  
          -  +  +  -  +  
          -  +  -  +  -  
          +  -  -  +  +  
                      - ]
     202                 :             : {
     203         [ +  - ]:           1 :     ThreadPool threadPool(POOL_NAME);
     204         [ +  - ]:           1 :     threadPool.Start(NUM_WORKERS_DEFAULT);
     205                 :             : 
     206         [ +  - ]:          10 :     const auto make_err{[&](size_t n) { return strprintf("error on thread #%s", n); }};
     207                 :             : 
     208                 :           1 :     const int num_tasks = 5;
     209                 :           1 :     std::vector<std::future<void>> futures;
     210         [ +  - ]:           1 :     futures.reserve(num_tasks);
     211         [ +  + ]:           6 :     for (int i = 0; i < num_tasks; i++) {
     212   [ +  -  +  - ]:          20 :         futures.emplace_back(Submit(threadPool, [&make_err, i] { throw std::runtime_error(make_err(i)); }));
     213                 :             :     }
     214                 :             : 
     215         [ +  + ]:           6 :     for (int i = 0; i < num_tasks; i++) {
     216   [ +  -  -  +  :          10 :         BOOST_CHECK_EXCEPTION(futures[i].get(), std::runtime_error, HasReason{make_err(i)});
          -  -  -  -  -  
          +  +  -  +  -  
          -  +  +  -  +  
                      - ]
     217                 :             :     }
     218                 :           1 : }
     219                 :             : 
     220                 :             : // Test 6, all workers are busy, help them by processing tasks from outside
     221   [ +  -  +  -  :           7 : BOOST_AUTO_TEST_CASE(process_tasks_manually_when_workers_busy)
          +  -  +  -  -  
          +  +  -  +  -  
          +  -  +  -  +  
          -  -  +  +  -  
          +  -  +  -  +  
          -  +  -  -  +  
          +  -  +  -  +  
          -  +  -  +  -  
          -  +  +  -  +  
          -  +  -  +  -  
          +  -  -  +  +  
                      - ]
     222                 :             : {
     223         [ +  - ]:           1 :     ThreadPool threadPool(POOL_NAME);
     224         [ +  - ]:           1 :     threadPool.Start(NUM_WORKERS_DEFAULT);
     225                 :             : 
     226         [ +  - ]:           1 :     std::counting_semaphore<> blocker(0);
     227         [ +  - ]:           1 :     const auto& blocking_tasks = BlockWorkers(threadPool, blocker, NUM_WORKERS_DEFAULT);
     228                 :             : 
     229                 :             :     // Now submit tasks and check that none of them are executed.
     230                 :           1 :     int num_tasks = 20;
     231                 :           1 :     std::atomic<int> counter = 0;
     232         [ +  + ]:          21 :     for (int i = 0; i < num_tasks; i++) {
     233         [ +  - ]:          60 :         (void)Submit(threadPool, [&counter]() {
     234                 :          20 :             counter.fetch_add(1, std::memory_order_relaxed);
     235                 :             :         });
     236                 :             :     }
     237         [ +  - ]:           1 :     UninterruptibleSleep(100ms);
     238   [ +  -  +  -  :           1 :     BOOST_CHECK_EQUAL(threadPool.WorkQueueSize(), num_tasks);
                   +  - ]
     239                 :             : 
     240                 :             :     // Now process manually
     241         [ +  + ]:          21 :     for (int i = 0; i < num_tasks; i++) {
     242         [ +  - ]:          20 :         threadPool.ProcessTask();
     243                 :             :     }
     244   [ +  -  +  - ]:           1 :     BOOST_CHECK_EQUAL(counter.load(), num_tasks);
     245   [ +  -  +  -  :           1 :     BOOST_CHECK_EQUAL(threadPool.WorkQueueSize(), 0);
                   +  - ]
     246                 :           1 :     blocker.release(NUM_WORKERS_DEFAULT);
     247         [ +  - ]:           1 :     threadPool.Stop();
     248   [ +  -  +  -  :          14 :     WAIT_FOR(blocking_tasks);
             +  -  +  + ]
     249                 :           1 : }
     250                 :             : 
     251                 :             : // Test 7, submit tasks from other tasks
     252   [ +  -  +  -  :           7 : BOOST_AUTO_TEST_CASE(recursive_task_submission)
          +  -  +  -  -  
          +  +  -  +  -  
          +  -  +  -  +  
          -  -  +  +  -  
          +  -  +  -  +  
          -  +  -  -  +  
          +  -  +  -  +  
          -  +  -  +  -  
          -  +  +  -  +  
          -  +  -  +  -  
          +  -  -  +  +  
                      - ]
     253                 :             : {
     254         [ +  - ]:           1 :     ThreadPool threadPool(POOL_NAME);
     255         [ +  - ]:           1 :     threadPool.Start(NUM_WORKERS_DEFAULT);
     256                 :             : 
     257         [ +  - ]:           1 :     std::promise<void> signal;
     258         [ +  - ]:           2 :     (void)Submit(threadPool, [&]() {
     259         [ +  - ]:           1 :         (void)Submit(threadPool, [&]() {
     260   [ -  -  +  - ]:           1 :             signal.set_value();
     261                 :             :         });
     262                 :           1 :     });
     263                 :             : 
     264   [ +  -  +  - ]:           2 :     signal.get_future().wait();
     265         [ +  - ]:           1 :     threadPool.Stop();
     266                 :           1 : }
     267                 :             : 
     268                 :             : // Test 8, submit task when all threads are busy and then stop the pool
     269   [ +  -  +  -  :           7 : BOOST_AUTO_TEST_CASE(task_submitted_while_busy_completes)
          +  -  +  -  -  
          +  +  -  +  -  
          +  -  +  -  +  
          -  -  +  +  -  
          +  -  +  -  +  
          -  +  -  -  +  
          +  -  +  -  +  
          -  +  -  +  -  
          -  +  +  -  +  
          -  +  -  +  -  
          +  -  -  +  +  
                      - ]
     270                 :             : {
     271         [ +  - ]:           1 :     ThreadPool threadPool(POOL_NAME);
     272         [ +  - ]:           1 :     threadPool.Start(NUM_WORKERS_DEFAULT);
     273                 :             : 
     274         [ +  - ]:           1 :     std::counting_semaphore<> blocker(0);
     275         [ +  - ]:           1 :     const auto& blocking_tasks = BlockWorkers(threadPool, blocker, NUM_WORKERS_DEFAULT);
     276                 :             : 
     277                 :             :     // Submit an extra task that should execute once a worker is free
     278         [ +  - ]:           1 :     std::future<bool> future = Submit(threadPool, []() { return true; });
     279                 :             : 
     280                 :             :     // At this point, all workers are blocked, and the extra task is queued
     281   [ +  -  +  -  :           1 :     BOOST_CHECK_EQUAL(threadPool.WorkQueueSize(), 1);
                   +  - ]
     282                 :             : 
     283                 :             :     // Wait a short moment before unblocking the threads to mimic a concurrent shutdown
     284                 :           2 :     std::thread thread_unblocker([&blocker]() {
     285                 :           1 :         UninterruptibleSleep(300ms);
     286                 :           1 :         blocker.release(NUM_WORKERS_DEFAULT);
     287         [ +  - ]:           2 :     });
     288                 :             : 
     289                 :             :     // Stop the pool while the workers are still blocked
     290         [ +  - ]:           1 :     threadPool.Stop();
     291                 :             : 
     292                 :             :     // Expect the submitted task to complete
     293   [ +  -  +  -  :           2 :     BOOST_CHECK(future.get());
             +  -  +  - ]
     294         [ +  - ]:           1 :     thread_unblocker.join();
     295                 :             : 
     296                 :             :     // Obviously all the previously blocking tasks should be completed at this point too
     297   [ +  -  +  -  :          12 :     WAIT_FOR(blocking_tasks);
             +  -  +  + ]
     298                 :             : 
     299                 :             :     // Pool should be stopped and no workers remaining
     300   [ +  -  +  -  :           1 :     BOOST_CHECK_EQUAL(threadPool.WorkersCount(), 0);
                   +  - ]
     301         [ -  + ]:           1 : }
     302                 :             : 
     303                 :             : // Test 9, more workers than available cores (congestion test)
     304   [ +  -  +  -  :           7 : BOOST_AUTO_TEST_CASE(congestion_more_workers_than_cores)
          +  -  +  -  -  
          +  +  -  +  -  
          +  -  +  -  +  
          -  -  +  +  -  
          +  -  +  -  +  
          -  +  -  -  +  
          +  -  +  -  +  
          -  +  -  +  -  
          -  +  +  -  +  
          -  +  -  +  -  
          +  -  -  +  +  
                      - ]
     305                 :             : {
     306         [ +  - ]:           1 :     ThreadPool threadPool(POOL_NAME);
     307   [ +  -  -  +  :           1 :     threadPool.Start(std::max(1, GetNumCores() * 2)); // Oversubscribe by 2×
                   +  - ]
     308                 :             : 
     309                 :           1 :     int num_tasks = 200;
     310                 :           1 :     std::atomic<int> counter{0};
     311                 :             : 
     312                 :           1 :     std::vector<std::future<void>> futures;
     313         [ +  - ]:           1 :     futures.reserve(num_tasks);
     314         [ +  + ]:         201 :     for (int i = 0; i < num_tasks; i++) {
     315   [ +  -  +  - ]:         600 :         futures.emplace_back(Submit(threadPool, [&counter] {
     316                 :         200 :             counter.fetch_add(1, std::memory_order_relaxed);
     317                 :             :         }));
     318                 :             :     }
     319                 :             : 
     320   [ +  -  +  -  :         201 :     WAIT_FOR(futures);
             +  -  +  + ]
     321   [ +  -  +  - ]:           1 :     BOOST_CHECK_EQUAL(counter.load(), num_tasks);
     322                 :           1 : }
     323                 :             : 
     324                 :             : // Test 10, Interrupt() prevents further submissions
     325   [ +  -  +  -  :           7 : BOOST_AUTO_TEST_CASE(interrupt_blocks_new_submissions)
          +  -  +  -  -  
          +  +  -  +  -  
          +  -  +  -  +  
          -  -  +  +  -  
          +  -  +  -  +  
          -  +  -  -  +  
          +  -  +  -  +  
          -  +  -  +  -  
          -  +  +  -  +  
          -  +  -  +  -  
          +  -  -  +  +  
                      - ]
     326                 :             : {
     327                 :             :     // 1) Interrupt from main thread
     328         [ +  - ]:           1 :     ThreadPool threadPool(POOL_NAME);
     329         [ +  - ]:           1 :     threadPool.Start(NUM_WORKERS_DEFAULT);
     330         [ +  - ]:           1 :     threadPool.Interrupt();
     331                 :             : 
     332                 :           1 :     auto res = threadPool.Submit([]{});
     333   [ +  -  +  -  :           2 :     BOOST_CHECK(!res);
                   +  - ]
     334   [ +  -  +  - ]:           1 :     BOOST_CHECK_EQUAL(SubmitErrorString(res.error()), "Interrupted");
     335                 :             : 
     336                 :           1 :     std::vector<std::function<void()>> tasks;
     337                 :           1 :     const auto range_res{threadPool.Submit(std::move(tasks))};
     338   [ +  -  +  -  :           2 :     BOOST_CHECK(!range_res);
                   +  - ]
     339   [ +  -  +  - ]:           1 :     BOOST_CHECK_EQUAL(SubmitErrorString(range_res.error()), "Interrupted");
     340                 :             : 
     341                 :             :     // Reset pool
     342         [ +  - ]:           1 :     threadPool.Stop();
     343                 :             : 
     344                 :             :     // 2) Interrupt() from a worker thread
     345                 :             :     // One worker is blocked, another calls Interrupt(), and the remaining one waits for tasks.
     346         [ +  - ]:           1 :     threadPool.Start(/*num_workers=*/3);
     347                 :           1 :     std::atomic<int> counter{0};
     348         [ +  - ]:           1 :     std::counting_semaphore<> blocker(0);
     349         [ +  - ]:           1 :     const auto blocking_tasks = BlockWorkers(threadPool, blocker, 1);
     350         [ +  - ]:           2 :     Submit(threadPool, [&threadPool, &counter]{
     351                 :           1 :         threadPool.Interrupt();
     352                 :           1 :         counter.fetch_add(1, std::memory_order_relaxed);
     353         [ +  - ]:           2 :     }).get();
     354                 :           1 :     blocker.release(1); // unblock worker
     355                 :             : 
     356   [ +  -  +  - ]:           1 :     BOOST_CHECK_EQUAL(counter.load(), 1);
     357         [ +  - ]:           1 :     threadPool.Stop();
     358   [ +  -  +  -  :           2 :     WAIT_FOR(blocking_tasks);
             +  -  +  + ]
     359   [ +  -  +  -  :           1 :     BOOST_CHECK_EQUAL(threadPool.WorkersCount(), 0);
                   +  - ]
     360                 :           1 : }
     361                 :             : 
     362                 :             : // Test 11, Start() must not cause a deadlock when called during Stop()
     363   [ +  -  +  -  :           7 : BOOST_AUTO_TEST_CASE(start_mid_stop_does_not_deadlock)
          +  -  +  -  -  
          +  +  -  +  -  
          +  -  +  -  +  
          -  -  +  +  -  
          +  -  +  -  +  
          -  +  -  -  +  
          +  -  +  -  +  
          -  +  -  +  -  
          -  +  +  -  +  
          -  +  -  +  -  
          +  -  -  +  +  
                      - ]
     364                 :             : {
     365         [ +  - ]:           1 :     ThreadPool threadPool(POOL_NAME);
     366         [ +  - ]:           1 :     threadPool.Start(NUM_WORKERS_DEFAULT);
     367                 :             : 
     368                 :             :     // Keep all workers busy so Stop() gets stuck waiting for them to finish during join()
     369         [ +  - ]:           1 :     std::counting_semaphore<> workers_blocker(0);
     370         [ +  - ]:           1 :     const auto blocking_tasks = BlockWorkers(threadPool, workers_blocker, NUM_WORKERS_DEFAULT);
     371                 :             : 
     372         [ +  - ]:           2 :     std::thread stopper_thread([&threadPool] { threadPool.Stop(); });
     373                 :             : 
     374                 :             :     // Stop() takes ownership of the workers before joining them, so WorkersCount()
     375                 :             :     // hits 0 the moment Stop() is waiting for them to join. That is our signal
     376                 :             :     // to call Start() right into the middle of the joining phase.
     377   [ +  -  +  + ]:          17 :     while (threadPool.WorkersCount() != 0) {
     378                 :          16 :         std::this_thread::yield(); // let the OS breathe so it can switch context
     379                 :             :     }
     380                 :             :     // Now we know for sure the stopper thread is hanging while workers are still alive.
     381                 :             :     // Restart the pool and resume workers so the stopper thread can proceed.
     382                 :             :     // This will throw an exception only if the pool handles Start-Stop race properly,
     383                 :             :     // otherwise it will proceed and hang the stopper_thread.
     384                 :           1 :     try {
     385         [ -  + ]:           1 :         threadPool.Start(NUM_WORKERS_DEFAULT);
     386         [ -  + ]:           1 :     } catch (std::exception& e) {
     387   [ +  -  +  - ]:           1 :         BOOST_CHECK_EQUAL(e.what(), "Thread pool has been interrupted or is stopping");
     388                 :           1 :     }
     389                 :           1 :     workers_blocker.release(NUM_WORKERS_DEFAULT);
     390   [ +  -  +  -  :          16 :     WAIT_FOR(blocking_tasks);
             +  -  +  + ]
     391                 :             : 
     392                 :             :     // If Stop() is stuck, joining the stopper thread will deadlock
     393         [ +  - ]:           1 :     stopper_thread.join();
     394                 :           1 : }
     395                 :             : 
     396                 :             : // Test 12, queued tasks complete after Interrupt()
     397   [ +  -  +  -  :           7 : BOOST_AUTO_TEST_CASE(queued_tasks_complete_after_interrupt)
          +  -  +  -  -  
          +  +  -  +  -  
          +  -  +  -  +  
          -  -  +  +  -  
          +  -  +  -  +  
          -  +  -  -  +  
          +  -  +  -  +  
          -  +  -  +  -  
          -  +  +  -  +  
          -  +  -  +  -  
          +  -  -  +  +  
                      - ]
     398                 :             : {
     399         [ +  - ]:           1 :     ThreadPool threadPool(POOL_NAME);
     400         [ +  - ]:           1 :     threadPool.Start(NUM_WORKERS_DEFAULT);
     401                 :             : 
     402         [ +  - ]:           1 :     std::counting_semaphore<> blocker(0);
     403         [ +  - ]:           1 :     const auto blocking_tasks = BlockWorkers(threadPool, blocker, NUM_WORKERS_DEFAULT);
     404                 :             : 
     405                 :             :     // Queue tasks while all workers are busy, then interrupt
     406                 :           1 :     std::atomic<int> counter{0};
     407                 :           1 :     const int num_tasks = 10;
     408                 :           1 :     std::vector<std::future<void>> futures;
     409         [ +  - ]:           1 :     futures.reserve(num_tasks);
     410         [ +  + ]:          11 :     for (int i = 0; i < num_tasks; i++) {
     411   [ +  -  +  - ]:          30 :         futures.emplace_back(Submit(threadPool, [&counter]{ counter.fetch_add(1, std::memory_order_relaxed); }));
     412                 :             :     }
     413         [ +  - ]:           1 :     threadPool.Interrupt();
     414                 :             : 
     415                 :             :     // Queued tasks must still complete despite the interrupt
     416                 :           1 :     blocker.release(NUM_WORKERS_DEFAULT);
     417   [ +  -  +  -  :          11 :     WAIT_FOR(futures);
             +  -  +  + ]
     418   [ +  -  +  - ]:           1 :     BOOST_CHECK_EQUAL(counter.load(), num_tasks);
     419                 :             : 
     420         [ +  - ]:           1 :     threadPool.Stop();
     421   [ +  -  +  -  :          14 :     WAIT_FOR(blocking_tasks);
             +  -  +  + ]
     422                 :           1 : }
     423                 :             : 
     424                 :             : // Test 13, ensure the Stop() calling thread helps drain the queue
     425   [ +  -  +  -  :           7 : BOOST_AUTO_TEST_CASE(stop_active_wait_drains_queue)
          +  -  +  -  -  
          +  +  -  +  -  
          +  -  +  -  +  
          -  -  +  +  -  
          +  -  +  -  +  
          -  +  -  -  +  
          +  -  +  -  +  
          -  +  -  +  -  
          -  +  +  -  +  
          -  +  -  +  -  
          +  -  -  +  +  
                      - ]
     426                 :             : {
     427         [ +  - ]:           1 :     ThreadPool threadPool(POOL_NAME);
     428         [ +  - ]:           1 :     threadPool.Start(NUM_WORKERS_DEFAULT);
     429                 :             : 
     430         [ +  - ]:           1 :     std::counting_semaphore<> blocker(0);
     431         [ +  - ]:           1 :     const auto blocking_tasks = BlockWorkers(threadPool, blocker, NUM_WORKERS_DEFAULT);
     432                 :             : 
     433                 :           1 :     auto main_thread_id = std::this_thread::get_id();
     434                 :           1 :     std::atomic<int> main_thread_tasks{0};
     435                 :           1 :     const size_t num_tasks = 20;
     436         [ +  + ]:          21 :     for (size_t i = 0; i < num_tasks; i++) {
     437         [ +  - ]:          60 :         (void)Submit(threadPool, [&main_thread_tasks, main_thread_id]() {
     438         [ +  - ]:          20 :             if (std::this_thread::get_id() == main_thread_id)
     439                 :          20 :                 main_thread_tasks.fetch_add(1, std::memory_order_relaxed);
     440                 :          20 :         });
     441                 :             :     }
     442   [ +  -  +  -  :           1 :     BOOST_CHECK_EQUAL(threadPool.WorkQueueSize(), num_tasks);
                   +  - ]
     443                 :             : 
     444                 :             :     // Delay release so Stop() drains all tasks from the calling thread
     445                 :           2 :     std::thread unblocker([&blocker, &threadPool]() {
     446         [ +  + ]:           3 :         while (threadPool.WorkQueueSize() > 0) {
     447                 :           2 :             std::this_thread::yield();
     448                 :             :         }
     449                 :           1 :         blocker.release(NUM_WORKERS_DEFAULT);
     450         [ +  - ]:           2 :     });
     451                 :             : 
     452         [ +  - ]:           1 :     threadPool.Stop();
     453         [ +  - ]:           1 :     unblocker.join();
     454                 :             : 
     455                 :             :     // Check the main thread processed all tasks
     456   [ +  -  +  - ]:           1 :     BOOST_CHECK_EQUAL(main_thread_tasks.load(), num_tasks);
     457   [ +  -  +  -  :          14 :     WAIT_FOR(blocking_tasks);
             +  -  +  + ]
     458                 :           1 : }
     459                 :             : 
     460                 :             : // Test 14, submit range of tasks in one lock acquisition
     461   [ +  -  +  -  :           7 : BOOST_AUTO_TEST_CASE(submit_range_of_tasks_complete_successfully)
          +  -  +  -  -  
          +  +  -  +  -  
          +  -  +  -  +  
          -  -  +  +  -  
          +  -  +  -  +  
          -  +  -  -  +  
          +  -  +  -  +  
          -  +  -  +  -  
          -  +  +  -  +  
          -  +  -  +  -  
          +  -  -  +  +  
                      - ]
     462                 :             : {
     463                 :           1 :     constexpr int32_t num_tasks{50};
     464                 :             : 
     465         [ +  - ]:           1 :     ThreadPool threadPool{POOL_NAME};
     466         [ +  - ]:           1 :     threadPool.Start(NUM_WORKERS_DEFAULT);
     467                 :           1 :     std::atomic_int32_t sum{0};
     468                 :         101 :     const auto square{[&sum](int32_t i) {
     469                 :         100 :         sum.fetch_add(i, std::memory_order_relaxed);
     470                 :         100 :         return i * i;
     471                 :           1 :     }};
     472                 :             : 
     473                 :           1 :     std::array<std::function<int32_t()>, static_cast<size_t>(num_tasks)> array_tasks;
     474                 :           1 :     std::vector<std::function<int32_t()>> vector_tasks;
     475         [ +  - ]:           1 :     vector_tasks.reserve(static_cast<size_t>(num_tasks));
     476         [ +  + ]:          51 :     for (const auto i : std::views::iota(int32_t{1}, num_tasks + 1)) {
     477         [ -  + ]:         100 :         array_tasks.at(static_cast<size_t>(i - 1)) = [i, square] { return square(i); };
     478         [ +  - ]:         100 :         vector_tasks.emplace_back([i, square] { return square(i); });
     479                 :             :     }
     480                 :             : 
     481         [ -  + ]:           1 :     auto futures{std::move(*Assert(threadPool.Submit(std::move(array_tasks))))};
     482   [ +  -  -  +  :           1 :     BOOST_CHECK_EQUAL(futures.size(), static_cast<size_t>(num_tasks));
                   +  - ]
     483   [ -  +  +  - ]:           1 :     std::ranges::move(*Assert(threadPool.Submit(std::move(vector_tasks))), std::back_inserter(futures));
     484   [ +  -  -  +  :           1 :     BOOST_CHECK_EQUAL(futures.size(), static_cast<size_t>(num_tasks * 2));
                   +  - ]
     485                 :             : 
     486                 :           1 :     auto squares_sum{0};
     487         [ +  + ]:         101 :     for (auto& future : futures) {
     488         [ +  - ]:         100 :         squares_sum += future.get();
     489                 :             :     }
     490                 :             : 
     491                 :             :     // 2x Gauss sum.
     492                 :           1 :     const auto expected_sum{2 * ((num_tasks * (num_tasks + 1)) / 2)};
     493                 :           1 :     const auto expected_squares_sum{2 * ((num_tasks * (num_tasks + 1) * ((num_tasks * 2) + 1)) / 6)};
     494   [ +  -  +  - ]:           1 :     BOOST_CHECK_EQUAL(sum, expected_sum);
     495   [ +  -  +  - ]:           1 :     BOOST_CHECK_EQUAL(squares_sum, expected_squares_sum);
     496   [ +  -  +  -  :           1 :     BOOST_CHECK_EQUAL(threadPool.WorkQueueSize(), 0);
                   +  - ]
     497                 :           2 : }
     498                 :             : 
     499                 :             : BOOST_AUTO_TEST_SUITE_END()
        

Generated by: LCOV version 2.0-1