LCOV - code coverage report
Current view: top level - src/util - threadpool.h (source / functions) Coverage Total Hit
Test: test_bitcoin_coverage.info Lines: 97.9 % 96 94
Test Date: 2026-03-13 04:56:35 Functions: 96.9 % 32 31
Branches: 60.9 % 133 81

             Branch data     Line data    Source code
       1                 :             : // Copyright (c) The Bitcoin Core developers
       2                 :             : // Distributed under the MIT software license, see the accompanying
       3                 :             : // file COPYING or https://www.opensource.org/licenses/mit-license.php.
       4                 :             : 
       5                 :             : #ifndef BITCOIN_UTIL_THREADPOOL_H
       6                 :             : #define BITCOIN_UTIL_THREADPOOL_H
       7                 :             : 
       8                 :             : #include <sync.h>
       9                 :             : #include <tinyformat.h>
      10                 :             : #include <util/check.h>
      11                 :             : #include <util/expected.h>
      12                 :             : #include <util/thread.h>
      13                 :             : 
      14                 :             : #include <algorithm>
      15                 :             : #include <condition_variable>
      16                 :             : #include <functional>
      17                 :             : #include <future>
      18                 :             : #include <queue>
      19                 :             : #include <ranges>
      20                 :             : #include <thread>
      21                 :             : #include <type_traits>
      22                 :             : #include <utility>
      23                 :             : #include <vector>
      24                 :             : 
      25                 :             : /**
      26                 :             :  * @brief Fixed-size thread pool for running arbitrary tasks concurrently.
      27                 :             :  *
      28                 :             :  * The thread pool maintains a set of worker threads that consume and execute
      29                 :             :  * tasks submitted through Submit(). Once started, tasks can be queued and
      30                 :             :  * processed asynchronously until Stop() is called.
      31                 :             :  *
      32                 :             :  * ### Thread-safety and lifecycle
      33                 :             :  * - `Start()` and `Stop()` must be called from a controller (non-worker) thread.
      34                 :             :  *   Calling `Stop()` from a worker thread will deadlock, as it waits for all
      35                 :             :  *   workers to join, including the current one.
      36                 :             :  *
      37                 :             :  * - `Submit()` can be called from any thread, including workers. It safely
      38                 :             :  *   enqueues new work for execution as long as the pool has active workers.
      39                 :             :  *
      40                 :             :  * - `Interrupt()` stops new task submission and lets queued ones drain
      41                 :             :  *   in the background. Callers can continue other shutdown steps and call
      42                 :             :  *   Stop() at the end to ensure no remaining tasks are left to execute.
      43                 :             :  *
      44                 :             :  * - `Stop()` prevents further task submission and blocks until all the
      45                 :             :  *   queued ones are completed.
      46                 :             :  */
      47                 :             : class ThreadPool
      48                 :             : {
      49                 :             : private:
      50                 :             :     std::string m_name;
      51                 :             :     Mutex m_mutex;
      52                 :             :     std::queue<std::packaged_task<void()>> m_work_queue GUARDED_BY(m_mutex);
      53                 :             :     std::condition_variable m_cv;
      54                 :             :     // Note: m_interrupt must be guarded by m_mutex, and cannot be replaced by an unguarded atomic bool.
      55                 :             :     // This ensures threads blocked on m_cv reliably observe the change and proceed correctly without missing signals.
      56                 :             :     // Ref: https://en.cppreference.com/w/cpp/thread/condition_variable
      57                 :             :     bool m_interrupt GUARDED_BY(m_mutex){false};
      58                 :             :     std::vector<std::thread> m_workers GUARDED_BY(m_mutex);
      59                 :             : 
      60                 :         149 :     void WorkerThread() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
      61                 :             :     {
      62                 :         149 :         WAIT_LOCK(m_mutex, wait_lock);
      63                 :        1033 :         for (;;) {
      64         [ +  + ]:         591 :             std::packaged_task<void()> task;
      65                 :         591 :             {
      66                 :             :                 // Wait only if needed; avoid sleeping when a new task was submitted while we were processing another one.
      67   [ +  +  +  + ]:         591 :                 if (!m_interrupt && m_work_queue.empty()) {
      68                 :             :                     // Block until the pool is interrupted or a task is available.
      69   [ +  +  +  +  :         565 :                     m_cv.wait(wait_lock, [&]() EXCLUSIVE_LOCKS_REQUIRED(m_mutex) { return m_interrupt || !m_work_queue.empty(); });
                   +  - ]
      70                 :             :                 }
      71                 :             : 
      72                 :             :                 // If stopped and no work left, exit worker
      73   [ +  +  +  + ]:         591 :                 if (m_interrupt && m_work_queue.empty()) {
      74                 :         298 :                     return;
      75                 :             :                 }
      76                 :             : 
      77                 :         442 :                 task = std::move(m_work_queue.front());
      78                 :         442 :                 m_work_queue.pop();
      79                 :             :             }
      80                 :             : 
      81                 :         442 :             {
      82                 :             :                 // Execute the task without the lock
      83         [ +  - ]:         442 :                 REVERSE_LOCK(wait_lock, m_mutex);
      84         [ +  - ]:         442 :                 task();
      85                 :         442 :             }
      86         [ +  - ]:         591 :         }
      87                 :         149 :     }
      88                 :             : 
      89                 :             : public:
      90   [ -  +  +  - ]:         340 :     explicit ThreadPool(const std::string& name) : m_name(name) {}
      91                 :             : 
      92                 :         170 :     ~ThreadPool()
      93                 :             :     {
      94                 :         170 :         Stop(); // In case it hasn't been stopped.
      95                 :         170 :     }
      96                 :             : 
      97                 :             :     /**
      98                 :             :      * @brief Start worker threads.
      99                 :             :      *
     100                 :             :      * Creates and launches `num_workers` threads that begin executing tasks
     101                 :             :      * from the queue. If the pool is already started, throws.
     102                 :             :      *
     103                 :             :      * Must be called from a controller (non-worker) thread.
     104                 :             :      */
     105                 :          19 :     void Start(int num_workers) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
     106                 :             :     {
     107         [ -  + ]:          19 :         assert(num_workers > 0);
     108                 :          19 :         LOCK(m_mutex);
     109   [ +  +  +  - ]:          19 :         if (m_interrupt) throw std::runtime_error("Thread pool has been interrupted or is stopping");
     110   [ -  +  -  - ]:          17 :         if (!m_workers.empty()) throw std::runtime_error("Thread pool already started");
     111                 :             : 
     112                 :             :         // Create workers
     113         [ +  - ]:          17 :         m_workers.reserve(num_workers);
     114         [ +  + ]:         166 :         for (int i = 0; i < num_workers; i++) {
     115   [ +  -  +  - ]:         447 :             m_workers.emplace_back(&util::TraceThread, strprintf("%s_pool_%d", m_name, i), [this] { WorkerThread(); });
     116                 :             :         }
     117                 :          17 :     }
     118                 :             : 
     119                 :             :     /**
     120                 :             :      * @brief Stop all worker threads and wait for them to exit.
     121                 :             :      *
     122                 :             :      * Sets the interrupt flag, wakes all waiting workers, and joins them.
     123                 :             :      * Any remaining tasks in the queue will be processed before returning.
     124                 :             :      *
     125                 :             :      * Must be called from a controller (non-worker) thread.
     126                 :             :      * Concurrent calls to Start() will be rejected while Stop() is in progress.
     127                 :             :      */
     128                 :         182 :     void Stop() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
     129                 :             :     {
     130                 :             :         // Notify workers and join them
     131                 :         182 :         std::vector<std::thread> threads_to_join;
     132                 :         182 :         {
     133         [ +  - ]:         182 :             LOCK(m_mutex);
     134                 :             :             // Ensure Stop() is not called from a worker thread while workers are still registered,
     135                 :             :             // otherwise a self-join deadlock would occur.
     136                 :         182 :             auto id = std::this_thread::get_id();
     137   [ -  +  +  + ]:         331 :             for (const auto& worker : m_workers) assert(worker.get_id() != id);
     138                 :             :             // Early shutdown to return right away on any concurrent Submit() call
     139                 :         182 :             m_interrupt = true;
     140         [ +  - ]:         182 :             threads_to_join.swap(m_workers);
     141                 :         182 :         }
     142                 :         182 :         m_cv.notify_all();
     143                 :             :         // Help draining queue
     144   [ +  -  +  + ]:         385 :         while (ProcessTask()) {}
     145                 :             :         // Free resources
     146   [ +  -  +  + ]:         331 :         for (auto& worker : threads_to_join) worker.join();
     147                 :             : 
     148                 :             :         // Since we currently wait for tasks completion, sanity-check empty queue
     149         [ +  - ]:         182 :         LOCK(m_mutex);
     150         [ +  - ]:         182 :         Assume(m_work_queue.empty());
     151                 :             :         // Re-allow Start() now that all workers have exited
     152         [ +  - ]:         182 :         m_interrupt = false;
     153                 :         182 :     }
     154                 :             : 
     155                 :             :     enum class SubmitError {
     156                 :             :         Inactive,
     157                 :             :         Interrupted,
     158                 :             :     };
     159                 :             : 
     160                 :             :     template <class F>
     161                 :             :     using Future = std::future<std::invoke_result_t<F>>;
     162                 :             : 
     163                 :             :     template <class R>
     164                 :             :     using RangeFuture = Future<std::ranges::range_reference_t<R>>;
     165                 :             : 
     166                 :             :     template <class F>
     167                 :             :     using PackagedTask = std::packaged_task<std::invoke_result_t<F>()>;
     168                 :             : 
     169                 :             :     /**
     170                 :             :      * @brief Enqueues a new task for asynchronous execution.
     171                 :             :      *
     172                 :             :      * @param  fn Callable to execute asynchronously.
     173                 :             :      * @return On success, a future containing fn's result.
     174                 :             :      *         On failure, an error indicating why the task was rejected:
     175                 :             :      *         - SubmitError::Inactive: Pool has no workers (never started or already stopped).
     176                 :             :      *         - SubmitError::Interrupted: Pool task acceptance has been interrupted.
     177                 :             :      *
     178                 :             :      * Thread-safe: Can be called from any thread, including within the provided 'fn' callable.
     179                 :             :      *
     180                 :             :      * @warning Ignoring the returned future requires guarding the task against
     181                 :             :      *          uncaught exceptions, as they would otherwise be silently discarded.
     182                 :             :      */
     183                 :             :     template <class F>
     184                 :         388 :     [[nodiscard]] util::Expected<Future<F>, SubmitError> Submit(F&& fn) noexcept EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
     185                 :             :     {
     186                 :         388 :         PackagedTask<F> task{std::forward<F>(fn)};
     187                 :         388 :         auto future{task.get_future()};
     188                 :             :         {
     189                 :         388 :             LOCK(m_mutex);
     190         [ +  + ]:         388 :             if (m_workers.empty()) return util::Unexpected{SubmitError::Inactive};
     191         [ +  + ]:         385 :             if (m_interrupt) return util::Unexpected{SubmitError::Interrupted};
     192                 :             : 
     193         [ +  - ]:         383 :             m_work_queue.emplace(std::move(task));
     194                 :             :         }
     195                 :         383 :         m_cv.notify_one();
     196                 :         383 :         return {std::move(future)};
     197                 :         388 :     }
     198                 :             : 
     199                 :             :     /**
     200                 :             :      * @brief Enqueues a range of tasks for asynchronous execution.
     201                 :             :      *
     202                 :             :      * @param  fns Callables to execute asynchronously.
     203                 :             :      * @return On success, a vector of futures containing each element of fns's result in order.
     204                 :             :      *         On failure, an error indicating why the range was rejected:
     205                 :             :      *         - SubmitError::Inactive: Pool has no workers (never started or already stopped).
     206                 :             :      *         - SubmitError::Interrupted: Pool task acceptance has been interrupted.
     207                 :             :      *
     208                 :             :      * This is more efficient when submitting many tasks at once, since
     209                 :             :      * the queue lock is only taken once internally and all worker threads are
     210                 :             :      * notified. For single tasks, Submit() is preferred since only one worker
     211                 :             :      * thread is notified.
     212                 :             :      *
     213                 :             :      * Thread-safe: Can be called from any thread, including within submitted callables.
     214                 :             :      *
     215                 :             :      * @warning Ignoring the returned futures requires guarding tasks against
     216                 :             :      *          uncaught exceptions, as they would otherwise be silently discarded.
     217                 :             :      */
     218                 :             :     template <std::ranges::sized_range R>
     219                 :             :         requires(!std::is_lvalue_reference_v<R>)
     220                 :           4 :     [[nodiscard]] util::Expected<std::vector<RangeFuture<R>>, SubmitError> Submit(R&& fns) noexcept EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
     221                 :             :     {
     222         [ -  + ]:           4 :         std::vector<RangeFuture<R>> futures;
     223                 :           4 :         futures.reserve(std::ranges::size(fns));
     224                 :             : 
     225                 :             :         {
     226                 :           4 :             LOCK(m_mutex);
     227         [ +  + ]:           4 :             if (m_workers.empty()) return util::Unexpected{SubmitError::Inactive};
     228         [ +  + ]:           3 :             if (m_interrupt) return util::Unexpected{SubmitError::Interrupted};
     229         [ +  + ]:         102 :             for (auto&& fn : fns) {
     230                 :         100 :                 PackagedTask<std::ranges::range_reference_t<R>> task{std::move(fn)};
     231         [ -  + ]:         100 :                 futures.emplace_back(task.get_future());
     232                 :         100 :                 m_work_queue.emplace(std::move(task));
     233                 :             :             }
     234                 :           2 :         }
     235                 :           2 :         m_cv.notify_all();
     236                 :           2 :         return {std::move(futures)};
     237                 :           4 :     }
     238                 :             : 
     239                 :             :     /**
     240                 :             :      * @brief Execute a single queued task synchronously.
     241                 :             :      * Removes one task from the queue and executes it on the calling thread.
     242                 :             :      */
     243                 :         223 :     bool ProcessTask() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
     244                 :             :     {
     245         [ +  - ]:         223 :         std::packaged_task<void()> task;
     246                 :         223 :         {
     247         [ +  - ]:         223 :             LOCK(m_mutex);
     248   [ +  +  +  - ]:         223 :             if (m_work_queue.empty()) return false;
     249                 :             : 
     250                 :             :             // Pop the task
     251                 :          41 :             task = std::move(m_work_queue.front());
     252         [ +  - ]:          41 :             m_work_queue.pop();
     253                 :         182 :         }
     254         [ +  - ]:          41 :         task();
     255                 :             :         return true;
     256                 :         223 :     }
     257                 :             : 
     258                 :             :     /**
     259                 :             :      * @brief Stop accepting new tasks and begin asynchronous shutdown.
     260                 :             :      *
     261                 :             :      * Wakes all worker threads so they can drain the queue and exit.
     262                 :             :      * Unlike Stop(), this function does not wait for threads to finish.
     263                 :             :      *
     264                 :             :      * Note: The next step in the pool lifecycle is calling Stop(), which
     265                 :             :      *       releases any dangling resources and resets the pool state
     266                 :             :      *       for shutdown or restart.
     267                 :             :      */
     268                 :           5 :     void Interrupt() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
     269                 :             :     {
     270         [ +  - ]:          10 :         WITH_LOCK(m_mutex, m_interrupt = true);
     271                 :           5 :         m_cv.notify_all();
     272                 :           5 :     }
     273                 :             : 
     274                 :          16 :     size_t WorkQueueSize() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
     275                 :             :     {
     276   [ #  #  #  #  :          32 :         return WITH_LOCK(m_mutex, return m_work_queue.size());
           #  # ][ +  -  
          +  -  +  -  +  
          -  +  -  +  -  
             -  +  +  - ]
     277                 :             :     }
     278                 :             : 
     279                 :          54 :     size_t WorkersCount() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
     280                 :             :     {
     281   [ +  -  +  -  :         108 :         return WITH_LOCK(m_mutex, return m_workers.size());
          +  -  +  -  -  
                +  +  - ]
     282                 :             :     }
     283                 :             : };
     284                 :             : 
     285                 :           7 : constexpr std::string_view SubmitErrorString(const ThreadPool::SubmitError err) noexcept {
     286      [ +  +  - ]:           7 :     switch (err) {
     287                 :           4 :         case ThreadPool::SubmitError::Inactive:
     288                 :           4 :             return "No active workers";
     289                 :           3 :         case ThreadPool::SubmitError::Interrupted:
     290                 :           3 :             return "Interrupted";
     291                 :             :     }
     292                 :           0 :     Assume(false); // Unreachable
     293                 :           0 :     return "Unknown error";
     294                 :             : }
     295                 :             : 
     296                 :             : #endif // BITCOIN_UTIL_THREADPOOL_H
        

Generated by: LCOV version 2.0-1