LCOV - code coverage report
Current view: top level - src/util - threadpool.h (source / functions) Coverage Total Hit
Test: total_coverage.info Lines: 97.5 % 80 78
Test Date: 2026-02-25 05:45:00 Functions: 100.0 % 28 28
Branches: 57.9 % 107 62

             Branch data     Line data    Source code
       1                 :             : // Copyright (c) The Bitcoin Core developers
       2                 :             : // Distributed under the MIT software license, see the accompanying
       3                 :             : // file COPYING or https://www.opensource.org/licenses/mit-license.php.
       4                 :             : 
       5                 :             : #ifndef BITCOIN_UTIL_THREADPOOL_H
       6                 :             : #define BITCOIN_UTIL_THREADPOOL_H
       7                 :             : 
       8                 :             : #include <sync.h>
       9                 :             : #include <tinyformat.h>
      10                 :             : #include <util/expected.h>
      11                 :             : #include <util/check.h>
      12                 :             : #include <util/thread.h>
      13                 :             : 
      14                 :             : #include <algorithm>
      15                 :             : #include <condition_variable>
      16                 :             : #include <functional>
      17                 :             : #include <future>
      18                 :             : #include <queue>
      19                 :             : #include <thread>
      20                 :             : #include <type_traits>
      21                 :             : #include <utility>
      22                 :             : #include <vector>
      23                 :             : 
      24                 :             : /**
      25                 :             :  * @brief Fixed-size thread pool for running arbitrary tasks concurrently.
      26                 :             :  *
      27                 :             :  * The thread pool maintains a set of worker threads that consume and execute
      28                 :             :  * tasks submitted through Submit(). Once started, tasks can be queued and
      29                 :             :  * processed asynchronously until Stop() is called.
      30                 :             :  *
      31                 :             :  * ### Thread-safety and lifecycle
      32                 :             :  * - `Start()` and `Stop()` must be called from a controller (non-worker) thread.
      33                 :             :  *   Calling `Stop()` from a worker thread will deadlock, as it waits for all
      34                 :             :  *   workers to join, including the current one.
      35                 :             :  *
      36                 :             :  * - `Submit()` can be called from any thread, including workers. It safely
      37                 :             :  *   enqueues new work for execution as long as the pool has active workers.
      38                 :             :  *
      39                 :             :  * - `Interrupt()` stops new task submission and lets queued ones drain
      40                 :             :  *   in the background. Callers can continue other shutdown steps and call
      41                 :             :  *   Stop() at the end to ensure no remaining tasks are left to execute.
      42                 :             :  *
      43                 :             :  * - `Stop()` prevents further task submission and blocks until all the
      44                 :             :  *   queued ones are completed.
      45                 :             :  */
      46                 :             : class ThreadPool
      47                 :             : {
      48                 :             : private:
      49                 :             :     std::string m_name;
      50                 :             :     Mutex m_mutex;
      51                 :             :     std::queue<std::packaged_task<void()>> m_work_queue GUARDED_BY(m_mutex);
      52                 :             :     std::condition_variable m_cv;
      53                 :             :     // Note: m_interrupt must be guarded by m_mutex, and cannot be replaced by an unguarded atomic bool.
      54                 :             :     // This ensures threads blocked on m_cv reliably observe the change and proceed correctly without missing signals.
      55                 :             :     // Ref: https://en.cppreference.com/w/cpp/thread/condition_variable
      56                 :             :     bool m_interrupt GUARDED_BY(m_mutex){false};
      57                 :             :     std::vector<std::thread> m_workers GUARDED_BY(m_mutex);
      58                 :             : 
      59                 :        2384 :     void WorkerThread() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
      60                 :             :     {
      61                 :        2384 :         WAIT_LOCK(m_mutex, wait_lock);
      62                 :      409424 :         for (;;) {
      63         [ +  + ]:      205904 :             std::packaged_task<void()> task;
      64                 :      205904 :             {
      65                 :             :                 // Wait only if needed; avoid sleeping when a new task was submitted while we were processing another one.
      66   [ +  +  +  + ]:      205904 :                 if (!m_interrupt && m_work_queue.empty()) {
      67                 :             :                     // Block until the pool is interrupted or a task is available.
      68   [ +  +  +  +  :      611511 :                     m_cv.wait(wait_lock, [&]() EXCLUSIVE_LOCKS_REQUIRED(m_mutex) { return m_interrupt || !m_work_queue.empty(); });
                   +  - ]
      69                 :             :                 }
      70                 :             : 
      71                 :             :                 // If stopped and no work left, exit worker
      72   [ +  +  +  + ]:      205904 :                 if (m_interrupt && m_work_queue.empty()) {
      73                 :        4768 :                     return;
      74                 :             :                 }
      75                 :             : 
      76                 :      203520 :                 task = std::move(m_work_queue.front());
      77                 :      203520 :                 m_work_queue.pop();
      78                 :             :             }
      79                 :             : 
      80                 :      203520 :             {
      81                 :             :                 // Execute the task without the lock
      82         [ +  - ]:      203520 :                 REVERSE_LOCK(wait_lock, m_mutex);
      83         [ +  - ]:      203520 :                 task();
      84                 :      203520 :             }
      85         [ +  - ]:      205904 :         }
      86                 :        2384 :     }
      87                 :             : 
      88                 :             : public:
      89   [ -  +  +  - ]:        2768 :     explicit ThreadPool(const std::string& name) : m_name(name) {}
      90                 :             : 
      91                 :        1384 :     ~ThreadPool()
      92                 :             :     {
      93                 :        1384 :         Stop(); // In case it hasn't been stopped.
      94                 :        1384 :     }
      95                 :             : 
      96                 :             :     /**
      97                 :             :      * @brief Start worker threads.
      98                 :             :      *
      99                 :             :      * Creates and launches `num_workers` threads that begin executing tasks
     100                 :             :      * from the queue. If the pool is already started, throws.
     101                 :             :      *
     102                 :             :      * Must be called from a controller (non-worker) thread.
     103                 :             :      */
     104                 :        1140 :     void Start(int num_workers) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
     105                 :             :     {
     106         [ -  + ]:        1140 :         assert(num_workers > 0);
     107                 :        1140 :         LOCK(m_mutex);
     108   [ -  +  -  - ]:        1140 :         if (!m_workers.empty()) throw std::runtime_error("Thread pool already started");
     109                 :        1140 :         m_interrupt = false; // Reset
     110                 :             : 
     111                 :             :         // Create workers
     112         [ +  - ]:        1140 :         m_workers.reserve(num_workers);
     113         [ +  + ]:        3524 :         for (int i = 0; i < num_workers; i++) {
     114   [ +  -  +  - ]:        7152 :             m_workers.emplace_back(&util::TraceThread, strprintf("%s_pool_%d", m_name, i), [this] { WorkerThread(); });
     115                 :             :         }
     116                 :        1140 :     }
     117                 :             : 
     118                 :             :     /**
     119                 :             :      * @brief Stop all worker threads and wait for them to exit.
     120                 :             :      *
     121                 :             :      * Sets the interrupt flag, wakes all waiting workers, and joins them.
     122                 :             :      * Any remaining tasks in the queue will be processed before returning.
     123                 :             :      *
     124                 :             :      * Must be called from a controller (non-worker) thread.
     125                 :             :      */
     126                 :        2572 :     void Stop() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
     127                 :             :     {
     128                 :             :         // Notify workers and join them
     129                 :        2572 :         std::vector<std::thread> threads_to_join;
     130                 :        2572 :         {
     131         [ +  - ]:        2572 :             LOCK(m_mutex);
     132                 :             :             // Ensure Stop() is not called from a worker thread while workers are still registered,
     133                 :             :             // otherwise a self-join deadlock would occur.
     134                 :        2572 :             auto id = std::this_thread::get_id();
     135   [ -  +  +  + ]:        4956 :             for (const auto& worker : m_workers) assert(worker.get_id() != id);
     136                 :             :             // Early shutdown to return right away on any concurrent Submit() call
     137                 :        2572 :             m_interrupt = true;
     138         [ +  - ]:        2572 :             threads_to_join.swap(m_workers);
     139                 :        2572 :         }
     140                 :        2572 :         m_cv.notify_all();
     141   [ +  -  +  + ]:        4956 :         for (auto& worker : threads_to_join) worker.join();
     142                 :             :         // Since we currently wait for tasks completion, sanity-check empty queue
     143   [ +  -  +  - ]:        5144 :         WITH_LOCK(m_mutex, Assume(m_work_queue.empty()));
     144                 :             :         // Note: m_interrupt is left true until next Start()
     145                 :        2572 :     }
     146                 :             : 
     147                 :             :     enum class SubmitError {
     148                 :             :         Inactive,
     149                 :             :         Interrupted,
     150                 :             :     };
     151                 :             : 
     152                 :             :     /**
     153                 :             :      * @brief Enqueues a new task for asynchronous execution.
     154                 :             :      *
     155                 :             :      * @param  fn Callable to execute asynchronously.
     156                 :             :      * @return On success, a future containing fn's result.
     157                 :             :      *         On failure, an error indicating why the task was rejected:
     158                 :             :      *         - SubmitError::Inactive: Pool has no workers (never started or already stopped).
     159                 :             :      *         - SubmitError::Interrupted: Pool task acceptance has been interrupted.
     160                 :             :      *
     161                 :             :      * Thread-safe: Can be called from any thread, including within the provided 'fn' callable.
     162                 :             :      *
     163                 :             :      * @warning Ignoring the returned future requires guarding the task against
     164                 :             :      *          uncaught exceptions, as they would otherwise be silently discarded.
     165                 :             :      */
     166                 :             :     template <class F>
     167                 :      203542 :     [[nodiscard]] util::Expected<std::future<std::invoke_result_t<F>>, SubmitError> Submit(F&& fn) noexcept EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
     168                 :             :     {
     169                 :      203542 :         std::packaged_task<std::invoke_result_t<F>()> task{std::forward<F>(fn)};
     170                 :      203542 :         auto future{task.get_future()};
     171                 :             :         {
     172                 :      203542 :             LOCK(m_mutex);
     173         [ +  + ]:      203542 :             if (m_workers.empty()) return util::Unexpected{SubmitError::Inactive};
     174         [ +  + ]:      203541 :             if (m_interrupt) return util::Unexpected{SubmitError::Interrupted};
     175                 :             : 
     176         [ +  - ]:      203540 :             m_work_queue.emplace(std::move(task));
     177                 :           2 :         }
     178                 :      203540 :         m_cv.notify_one();
     179                 :      203540 :         return {std::move(future)};
     180                 :      203542 :     }
     181                 :             : 
     182                 :             :     /**
     183                 :             :      * @brief Execute a single queued task synchronously.
     184                 :             :      * Removes one task from the queue and executes it on the calling thread.
     185                 :             :      */
     186                 :          20 :     void ProcessTask() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
     187                 :             :     {
     188         [ +  - ]:          20 :         std::packaged_task<void()> task;
     189                 :          20 :         {
     190         [ +  - ]:          20 :             LOCK(m_mutex);
     191   [ -  +  -  - ]:          20 :             if (m_work_queue.empty()) return;
     192                 :             : 
     193                 :             :             // Pop the task
     194                 :          20 :             task = std::move(m_work_queue.front());
     195         [ +  - ]:          20 :             m_work_queue.pop();
     196                 :          20 :         }
     197         [ +  - ]:          20 :         task();
     198                 :          20 :     }
     199                 :             : 
     200                 :             :     /**
     201                 :             :      * @brief Stop accepting new tasks and begin asynchronous shutdown.
     202                 :             :      *
     203                 :             :      * Wakes all worker threads so they can drain the queue and exit.
     204                 :             :      * Unlike Stop(), this function does not wait for threads to finish.
     205                 :             :      */
     206                 :        1184 :     void Interrupt() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
     207                 :             :     {
     208         [ +  - ]:        2368 :         WITH_LOCK(m_mutex, m_interrupt = true);
     209                 :        1184 :         m_cv.notify_all();
     210                 :        1184 :     }
     211                 :             : 
     212                 :      203213 :     size_t WorkQueueSize() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
     213                 :             :     {
     214   [ +  -  +  +  :      406426 :         return WITH_LOCK(m_mutex, return m_work_queue.size());
          +  -  +  -  -  
           +  +  - ][ #  
             #  #  #  #  
                      # ]
     215                 :             :     }
     216                 :             : 
     217                 :           3 :     size_t WorkersCount() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
     218                 :             :     {
     219   [ +  -  +  -  :           6 :         return WITH_LOCK(m_mutex, return m_workers.size());
          +  -  -  +  +  
                      - ]
     220                 :             :     }
     221                 :             : };
     222                 :             : 
     223                 :           2 : constexpr std::string_view SubmitErrorString(const ThreadPool::SubmitError err) noexcept {
     224      [ +  +  - ]:           2 :     switch (err) {
     225                 :           1 :         case ThreadPool::SubmitError::Inactive:
     226                 :           1 :             return "No active workers";
     227                 :           1 :         case ThreadPool::SubmitError::Interrupted:
     228                 :           1 :             return "Interrupted";
     229                 :             :     }
     230                 :           0 :     Assume(false); // Unreachable
     231                 :           0 :     return "Unknown error";
     232                 :             : }
     233                 :             : 
     234                 :             : #endif // BITCOIN_UTIL_THREADPOOL_H
        

Generated by: LCOV version 2.0-1