LCOV - code coverage report
Current view: top level - src/util - threadpool.h (source / functions) Coverage Total Hit
Test: test_bitcoin_coverage.info Lines: 100.0 % 72 72
Test Date: 2026-02-19 05:39:36 Functions: 96.3 % 27 26
Branches: 56.2 % 112 63

             Branch data     Line data    Source code
       1                 :             : // Copyright (c) The Bitcoin Core developers
       2                 :             : // Distributed under the MIT software license, see the accompanying
       3                 :             : // file COPYING or https://www.opensource.org/licenses/mit-license.php.
       4                 :             : 
       5                 :             : #ifndef BITCOIN_UTIL_THREADPOOL_H
       6                 :             : #define BITCOIN_UTIL_THREADPOOL_H
       7                 :             : 
       8                 :             : #include <sync.h>
       9                 :             : #include <tinyformat.h>
      10                 :             : #include <util/check.h>
      11                 :             : #include <util/thread.h>
      12                 :             : 
      13                 :             : #include <algorithm>
      14                 :             : #include <condition_variable>
      15                 :             : #include <functional>
      16                 :             : #include <future>
      17                 :             : #include <queue>
      18                 :             : #include <stdexcept>
      19                 :             : #include <thread>
      20                 :             : #include <utility>
      21                 :             : #include <vector>
      22                 :             : 
      23                 :             : /**
      24                 :             :  * @brief Fixed-size thread pool for running arbitrary tasks concurrently.
      25                 :             :  *
      26                 :             :  * The thread pool maintains a set of worker threads that consume and execute
      27                 :             :  * tasks submitted through Submit(). Once started, tasks can be queued and
      28                 :             :  * processed asynchronously until Stop() is called.
      29                 :             :  *
      30                 :             :  * ### Thread-safety and lifecycle
      31                 :             :  * - `Start()` and `Stop()` must be called from a controller (non-worker) thread.
      32                 :             :  *   Calling `Stop()` from a worker thread will deadlock, as it waits for all
      33                 :             :  *   workers to join, including the current one.
      34                 :             :  *
      35                 :             :  * - `Submit()` can be called from any thread, including workers. It safely
      36                 :             :  *   enqueues new work for execution as long as the pool has active workers.
      37                 :             :  *
      38                 :             :  * - `Interrupt()` stops new task submission and lets queued ones drain
      39                 :             :  *   in the background. Callers can continue other shutdown steps and call
      40                 :             :  *   Stop() at the end to ensure no remaining tasks are left to execute.
      41                 :             :  *
      42                 :             :  * - `Stop()` prevents further task submission and blocks until all the
      43                 :             :  *   queued ones are completed.
      44                 :             :  */
      45                 :             : class ThreadPool
      46                 :             : {
      47                 :             : private:
      48                 :             :     std::string m_name;
      49                 :             :     Mutex m_mutex;
      50                 :             :     std::queue<std::packaged_task<void()>> m_work_queue GUARDED_BY(m_mutex);
      51                 :             :     std::condition_variable m_cv;
      52                 :             :     // Note: m_interrupt must be guarded by m_mutex, and cannot be replaced by an unguarded atomic bool.
      53                 :             :     // This ensures threads blocked on m_cv reliably observe the change and proceed correctly without missing signals.
      54                 :             :     // Ref: https://en.cppreference.com/w/cpp/thread/condition_variable
      55                 :             :     bool m_interrupt GUARDED_BY(m_mutex){false};
      56                 :             :     std::vector<std::thread> m_workers GUARDED_BY(m_mutex);
      57                 :             : 
      58                 :         106 :     void WorkerThread() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
      59                 :             :     {
      60                 :         106 :         WAIT_LOCK(m_mutex, wait_lock);
      61                 :         688 :         for (;;) {
      62         [ +  + ]:         397 :             std::packaged_task<void()> task;
      63                 :         397 :             {
      64                 :             :                 // Wait only if needed; avoid sleeping when a new task was submitted while we were processing another one.
      65   [ +  +  +  + ]:         397 :                 if (!m_interrupt && m_work_queue.empty()) {
      66                 :             :                     // Block until the pool is interrupted or a task is available.
      67   [ +  +  +  +  :         612 :                     m_cv.wait(wait_lock, [&]() EXCLUSIVE_LOCKS_REQUIRED(m_mutex) { return m_interrupt || !m_work_queue.empty(); });
                   +  - ]
      68                 :             :                 }
      69                 :             : 
      70                 :             :                 // If stopped and no work left, exit worker
      71   [ +  +  +  + ]:         397 :                 if (m_interrupt && m_work_queue.empty()) {
      72                 :         212 :                     return;
      73                 :             :                 }
      74                 :             : 
      75                 :         291 :                 task = std::move(m_work_queue.front());
      76                 :         291 :                 m_work_queue.pop();
      77                 :             :             }
      78                 :             : 
      79                 :         291 :             {
      80                 :             :                 // Execute the task without the lock
      81         [ +  - ]:         291 :                 REVERSE_LOCK(wait_lock, m_mutex);
      82         [ +  - ]:         291 :                 task();
      83                 :         291 :             }
      84         [ +  - ]:         397 :         }
      85                 :         106 :     }
      86                 :             : 
      87                 :             : public:
      88   [ -  +  +  - ]:         318 :     explicit ThreadPool(const std::string& name) : m_name(name) {}
      89                 :             : 
      90                 :         159 :     ~ThreadPool()
      91                 :             :     {
      92                 :         159 :         Stop(); // In case it hasn't been stopped.
      93                 :         159 :     }
      94                 :             : 
      95                 :             :     /**
      96                 :             :      * @brief Start worker threads.
      97                 :             :      *
      98                 :             :      * Creates and launches `num_workers` threads that begin executing tasks
      99                 :             :      * from the queue. If the pool is already started, throws.
     100                 :             :      *
     101                 :             :      * Must be called from a controller (non-worker) thread.
     102                 :             :      */
     103                 :          11 :     void Start(int num_workers) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
     104                 :             :     {
     105         [ -  + ]:          11 :         assert(num_workers > 0);
     106                 :          11 :         LOCK(m_mutex);
     107   [ -  +  -  - ]:          11 :         if (!m_workers.empty()) throw std::runtime_error("Thread pool already started");
     108                 :          11 :         m_interrupt = false; // Reset
     109                 :             : 
     110                 :             :         // Create workers
     111         [ +  - ]:          11 :         m_workers.reserve(num_workers);
     112         [ +  + ]:         117 :         for (int i = 0; i < num_workers; i++) {
     113   [ +  -  +  - ]:         318 :             m_workers.emplace_back(&util::TraceThread, strprintf("%s_pool_%d", m_name, i), [this] { WorkerThread(); });
     114                 :             :         }
     115                 :          11 :     }
     116                 :             : 
     117                 :             :     /**
     118                 :             :      * @brief Stop all worker threads and wait for them to exit.
     119                 :             :      *
     120                 :             :      * Sets the interrupt flag, wakes all waiting workers, and joins them.
     121                 :             :      * Any remaining tasks in the queue will be processed before returning.
     122                 :             :      *
     123                 :             :      * Must be called from a controller (non-worker) thread.
     124                 :             :      */
     125                 :         166 :     void Stop() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
     126                 :             :     {
     127                 :             :         // Notify workers and join them
     128                 :         166 :         std::vector<std::thread> threads_to_join;
     129                 :         166 :         {
     130         [ +  - ]:         166 :             LOCK(m_mutex);
     131                 :             :             // Ensure Stop() is not called from a worker thread while workers are still registered,
     132                 :             :             // otherwise a self-join deadlock would occur.
     133                 :         166 :             auto id = std::this_thread::get_id();
     134   [ -  +  +  + ]:         272 :             for (const auto& worker : m_workers) assert(worker.get_id() != id);
     135                 :             :             // Early shutdown to return right away on any concurrent Submit() call
     136                 :         166 :             m_interrupt = true;
     137         [ +  - ]:         166 :             threads_to_join.swap(m_workers);
     138                 :         166 :         }
     139                 :         166 :         m_cv.notify_all();
     140   [ +  -  +  + ]:         272 :         for (auto& worker : threads_to_join) worker.join();
     141                 :             :         // Since we currently wait for tasks completion, sanity-check empty queue
     142   [ +  -  +  - ]:         332 :         WITH_LOCK(m_mutex, Assume(m_work_queue.empty()));
     143                 :             :         // Note: m_interrupt is left true until next Start()
     144                 :         166 :     }
     145                 :             : 
     146                 :             :     /**
     147                 :             :      * @brief Enqueues a new task for asynchronous execution.
     148                 :             :      *
     149                 :             :      * Returns a `std::future` that provides the task's result or propagates
     150                 :             :      * any exception it throws.
     151                 :             :      * Note: Ignoring the returned future requires guarding the task against
     152                 :             :      * uncaught exceptions, as they would otherwise be silently discarded.
     153                 :             :      */
     154                 :             :     template <class F> [[nodiscard]] EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
     155                 :         313 :     auto Submit(F&& fn)
     156                 :             :     {
     157                 :         313 :         std::packaged_task task{std::forward<F>(fn)};
     158         [ +  - ]:         313 :         auto future{task.get_future()};
     159                 :             :         {
     160         [ +  - ]:         313 :             LOCK(m_mutex);
     161   [ +  +  +  + ]:         313 :             if (m_interrupt || m_workers.empty()) {
     162         [ +  - ]:           2 :                 throw std::runtime_error("No active workers; cannot accept new tasks");
     163                 :             :             }
     164   [ +  -  +  - ]:         311 :             m_work_queue.emplace(std::move(task));
     165                 :           2 :         }
     166                 :         311 :         m_cv.notify_one();
     167                 :         311 :         return future;
     168                 :         313 :     }
     169                 :             : 
     170                 :             :     /**
     171                 :             :      * @brief Execute a single queued task synchronously.
     172                 :             :      * Removes one task from the queue and executes it on the calling thread.
     173                 :             :      */
     174                 :          20 :     void ProcessTask() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
     175                 :             :     {
     176         [ +  - ]:          20 :         std::packaged_task<void()> task;
     177                 :          20 :         {
     178         [ +  - ]:          20 :             LOCK(m_mutex);
     179   [ -  +  -  - ]:          20 :             if (m_work_queue.empty()) return;
     180                 :             : 
     181                 :             :             // Pop the task
     182                 :          20 :             task = std::move(m_work_queue.front());
     183         [ +  - ]:          20 :             m_work_queue.pop();
     184                 :          20 :         }
     185         [ +  - ]:          20 :         task();
     186                 :          20 :     }
     187                 :             : 
     188                 :             :     /**
     189                 :             :      * @brief Stop accepting new tasks and begin asynchronous shutdown.
     190                 :             :      *
     191                 :             :      * Wakes all worker threads so they can drain the queue and exit.
     192                 :             :      * Unlike Stop(), this function does not wait for threads to finish.
     193                 :             :      */
     194                 :           3 :     void Interrupt() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
     195                 :             :     {
     196         [ +  - ]:           6 :         WITH_LOCK(m_mutex, m_interrupt = true);
     197                 :           3 :         m_cv.notify_all();
     198                 :           3 :     }
     199                 :             : 
     200                 :           4 :     size_t WorkQueueSize() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
     201                 :             :     {
     202   [ +  -  +  -  :           8 :         return WITH_LOCK(m_mutex, return m_work_queue.size());
          +  -  +  -  -  
           +  +  - ][ #  
             #  #  #  #  
                      # ]
     203                 :             :     }
     204                 :             : 
     205                 :           3 :     size_t WorkersCount() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
     206                 :             :     {
     207   [ +  -  +  -  :           6 :         return WITH_LOCK(m_mutex, return m_workers.size());
          +  -  -  +  +  
                      - ]
     208                 :             :     }
     209                 :             : };
     210                 :             : 
     211                 :             : #endif // BITCOIN_UTIL_THREADPOOL_H
        

Generated by: LCOV version 2.0-1