LCOV - code coverage report
Current view: top level - src - checkqueue.h (source / functions) Coverage Total Hit
Test: fuzz_coverage.info Lines: 100.0 % 83 83
Test Date: 2025-01-22 04:09:46 Functions: 93.8 % 16 15
Branches: 74.0 % 96 71

             Branch data     Line data    Source code
       1                 :             : // Copyright (c) 2012-2022 The Bitcoin Core developers
       2                 :             : // Distributed under the MIT software license, see the accompanying
       3                 :             : // file COPYING or http://www.opensource.org/licenses/mit-license.php.
       4                 :             : 
       5                 :             : #ifndef BITCOIN_CHECKQUEUE_H
       6                 :             : #define BITCOIN_CHECKQUEUE_H
       7                 :             : 
       8                 :             : #include <logging.h>
       9                 :             : #include <sync.h>
      10                 :             : #include <tinyformat.h>
      11                 :             : #include <util/threadnames.h>
      12                 :             : 
      13                 :             : #include <algorithm>
      14                 :             : #include <iterator>
      15                 :             : #include <optional>
      16                 :             : #include <vector>
      17                 :             : 
      18                 :             : /**
      19                 :             :  * Queue for verifications that have to be performed.
      20                 :             :   * The verifications are represented by a type T, which must provide an
      21                 :             :   * operator(), returning an std::optional<R>.
      22                 :             :   *
      23                 :             :   * The overall result of the computation is std::nullopt if all invocations
      24                 :             :   * return std::nullopt, or one of the other results otherwise.
      25                 :             :   *
      26                 :             :   * One thread (the master) is assumed to push batches of verifications
      27                 :             :   * onto the queue, where they are processed by N-1 worker threads. When
      28                 :             :   * the master is done adding work, it temporarily joins the worker pool
      29                 :             :   * as an N'th worker, until all jobs are done.
      30                 :             :   *
      31                 :             :   */
      32                 :             : template <typename T, typename R = std::remove_cvref_t<decltype(std::declval<T>()().value())>>
      33                 :             : class CCheckQueue
      34                 :             : {
      35                 :             : private:
      36                 :             :     //! Mutex to protect the inner state
      37                 :             :     Mutex m_mutex;
      38                 :             : 
      39                 :             :     //! Worker threads block on this when out of work
      40                 :             :     std::condition_variable m_worker_cv;
      41                 :             : 
      42                 :             :     //! Master thread blocks on this when out of work
      43                 :             :     std::condition_variable m_master_cv;
      44                 :             : 
      45                 :             :     //! The queue of elements to be processed.
      46                 :             :     //! As the order of booleans doesn't matter, it is used as a LIFO (stack)
      47                 :             :     std::vector<T> queue GUARDED_BY(m_mutex);
      48                 :             : 
      49                 :             :     //! The number of workers (including the master) that are idle.
      50                 :             :     int nIdle GUARDED_BY(m_mutex){0};
      51                 :             : 
      52                 :             :     //! The total number of workers (including the master).
      53                 :             :     int nTotal GUARDED_BY(m_mutex){0};
      54                 :             : 
      55                 :             :     //! The temporary evaluation result.
      56                 :             :     std::optional<R> m_result GUARDED_BY(m_mutex);
      57                 :             : 
      58                 :             :     /**
      59                 :             :      * Number of verifications that haven't completed yet.
      60                 :             :      * This includes elements that are no longer queued, but still in the
      61                 :             :      * worker's own batches.
      62                 :             :      */
      63                 :             :     unsigned int nTodo GUARDED_BY(m_mutex){0};
      64                 :             : 
      65                 :             :     //! The maximum number of elements to be processed in one batch
      66                 :             :     const unsigned int nBatchSize;
      67                 :             : 
      68                 :             :     std::vector<std::thread> m_worker_threads;
      69                 :             :     bool m_request_stop GUARDED_BY(m_mutex){false};
      70                 :             : 
      71                 :             :     /** Internal function that does bulk of the verification work. If fMaster, return the final result. */
      72                 :      270765 :     std::optional<R> Loop(bool fMaster) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
      73                 :             :     {
      74         [ +  + ]:      270765 :         std::condition_variable& cond = fMaster ? m_master_cv : m_worker_cv;
      75                 :      270765 :         std::vector<T> vChecks;
      76         [ +  - ]:      270765 :         vChecks.reserve(nBatchSize);
      77                 :      270765 :         unsigned int nNow = 0;
      78                 :      270765 :         std::optional<R> local_result;
      79                 :             :         bool do_work;
      80                 :         261 :         do {
      81                 :             :             {
      82         [ +  - ]:      275319 :                 WAIT_LOCK(m_mutex, lock);
      83                 :             :                 // first do the clean-up of the previous loop run (allowing us to do it in the same critsect)
      84         [ +  + ]:      275319 :                 if (nNow) {
      85   [ +  +  +  - ]:        4554 :                     if (local_result.has_value() && !m_result.has_value()) {
      86                 :          97 :                         std::swap(local_result, m_result);
      87                 :             :                     }
      88                 :        4554 :                     nTodo -= nNow;
      89   [ +  +  +  + ]:        4554 :                     if (nTodo == 0 && !fMaster) {
      90                 :             :                         // We processed the last element; inform the master it can exit and return the result
      91                 :          40 :                         m_master_cv.notify_one();
      92                 :             :                     }
      93                 :             :                 } else {
      94                 :             :                     // first iteration
      95                 :      270765 :                     nTotal++;
      96                 :             :                 }
      97                 :             :                 // logically, the do loop starts here
      98   [ +  +  +  + ]:      279681 :                 while (queue.empty() && !m_request_stop) {
      99   [ +  +  +  + ]:      271045 :                     if (fMaster && nTodo == 0) {
     100                 :      266683 :                         nTotal--;
     101                 :      266683 :                         std::optional<R> to_return = std::move(m_result);
     102                 :             :                         // reset the status for new work later
     103         [ +  + ]:      266683 :                         m_result = std::nullopt;
     104                 :             :                         // return the current status
     105                 :         157 :                         return to_return;
     106                 :             :                     }
     107                 :        4362 :                     nIdle++;
     108         [ +  - ]:        4362 :                     cond.wait(lock); // wait
     109                 :        4362 :                     nIdle--;
     110                 :             :                 }
     111         [ +  + ]:        8636 :                 if (m_request_stop) {
     112                 :             :                     // return value does not matter, because m_request_stop is only set in the destructor.
     113                 :        4082 :                     return std::nullopt;
     114                 :             :                 }
     115                 :             : 
     116                 :             :                 // Decide how many work units to process now.
     117                 :             :                 // * Do not try to do everything at once, but aim for increasingly smaller batches so
     118                 :             :                 //   all workers finish approximately simultaneously.
     119                 :             :                 // * Try to account for idle jobs which will instantly start helping.
     120                 :             :                 // * Don't do batches smaller than 1 (duh), or larger than nBatchSize.
     121   [ +  +  +  + ]:        5230 :                 nNow = std::max(1U, std::min(nBatchSize, (unsigned int)queue.size() / (nTotal + nIdle + 1)));
     122         [ +  - ]:        4554 :                 auto start_it = queue.end() - nNow;
     123         [ +  - ]:        4554 :                 vChecks.assign(std::make_move_iterator(start_it), std::make_move_iterator(queue.end()));
     124                 :        4554 :                 queue.erase(start_it, queue.end());
     125                 :             :                 // Check whether we need to do work at all
     126         [ +  - ]:        4554 :                 do_work = !m_result.has_value();
     127                 :      270765 :             }
     128                 :             :             // execute work
     129         [ +  + ]:        4554 :             if (do_work) {
     130         [ +  + ]:        3326 :                 for (T& check : vChecks) {
     131         [ +  + ]:        2734 :                     local_result = check();
           [ +  -  +  - ]
     132         [ +  + ]:        2473 :                     if (local_result.has_value()) break;
     133                 :             :                 }
     134                 :             :             }
     135         [ +  - ]:        4711 :             vChecks.clear();
     136                 :             :         } while (true);
     137                 :      270765 :     }
     138                 :             : 
     139                 :             : public:
     140                 :             :     //! Mutex to ensure only one concurrent CCheckQueueControl
     141                 :             :     Mutex m_control_mutex;
     142                 :             : 
     143                 :             :     //! Create a new check queue
     144                 :        2219 :     explicit CCheckQueue(unsigned int batch_size, int worker_threads_num)
     145         [ +  - ]:        2219 :         : nBatchSize(batch_size)
     146                 :             :     {
     147         [ +  - ]:        2219 :         LogInfo("Script verification uses %d additional threads", worker_threads_num);
     148         [ +  - ]:        2219 :         m_worker_threads.reserve(worker_threads_num);
     149         [ +  + ]:        6301 :         for (int n = 0; n < worker_threads_num; ++n) {
     150         [ +  - ]:        4082 :             m_worker_threads.emplace_back([this, n]() {
     151         [ +  - ]:        4082 :                 util::ThreadRename(strprintf("scriptch.%i", n));
     152                 :        4082 :                 Loop(false /* worker thread */);
     153                 :             :             });
     154                 :             :         }
     155                 :        2219 :     }
     156                 :             : 
     157                 :             :     // Since this class manages its own resources, which is a thread
     158                 :             :     // pool `m_worker_threads`, copy and move operations are not appropriate.
     159                 :             :     CCheckQueue(const CCheckQueue&) = delete;
     160                 :             :     CCheckQueue& operator=(const CCheckQueue&) = delete;
     161                 :             :     CCheckQueue(CCheckQueue&&) = delete;
     162                 :             :     CCheckQueue& operator=(CCheckQueue&&) = delete;
     163                 :             : 
     164                 :             :     //! Join the execution until completion. If at least one evaluation wasn't successful, return
     165                 :             :     //! its error.
     166                 :      266683 :     std::optional<R> Complete() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
     167                 :             :     {
     168         [ +  - ]:      266594 :         return Loop(true /* master thread */);
     169                 :             :     }
     170                 :             : 
     171                 :             :     //! Add a batch of checks to the queue
     172                 :       22351 :     void Add(std::vector<T>&& vChecks) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
     173                 :             :     {
     174         [ +  + ]:       22351 :         if (vChecks.empty()) {
     175                 :             :             return;
     176                 :             :         }
     177                 :             : 
     178                 :             :         {
     179                 :         361 :             LOCK(m_mutex);
     180         [ +  - ]:         361 :             queue.insert(queue.end(), std::make_move_iterator(vChecks.begin()), std::make_move_iterator(vChecks.end()));
     181         [ +  - ]:         361 :             nTodo += vChecks.size();
     182         [ +  + ]:         361 :         }
     183                 :             : 
     184         [ +  + ]:         361 :         if (vChecks.size() == 1) {
     185                 :         244 :             m_worker_cv.notify_one();
     186                 :             :         } else {
     187                 :         117 :             m_worker_cv.notify_all();
     188                 :             :         }
     189                 :             :     }
     190                 :             : 
     191                 :        2219 :     ~CCheckQueue()
     192                 :             :     {
     193         [ +  - ]:        4438 :         WITH_LOCK(m_mutex, m_request_stop = true);
     194                 :        2219 :         m_worker_cv.notify_all();
     195         [ +  + ]:        6301 :         for (std::thread& t : m_worker_threads) {
     196                 :        4082 :             t.join();
     197                 :             :         }
     198                 :        2219 :     }
     199                 :             : 
     200                 :      268567 :     bool HasThreads() const { return !m_worker_threads.empty(); }
     201                 :             : };
     202                 :             : 
     203                 :             : /**
     204                 :             :  * RAII-style controller object for a CCheckQueue that guarantees the passed
     205                 :             :  * queue is finished before continuing.
     206                 :             :  */
     207                 :             : template <typename T, typename R = std::remove_cvref_t<decltype(std::declval<T>()().value())>>
     208                 :             : class CCheckQueueControl
     209                 :             : {
     210                 :             : private:
     211                 :             :     CCheckQueue<T, R> * const pqueue;
     212                 :             :     bool fDone;
     213                 :             : 
     214                 :             : public:
     215                 :             :     CCheckQueueControl() = delete;
     216                 :             :     CCheckQueueControl(const CCheckQueueControl&) = delete;
     217                 :             :     CCheckQueueControl& operator=(const CCheckQueueControl&) = delete;
     218                 :      266615 :     explicit CCheckQueueControl(CCheckQueue<T> * const pqueueIn) : pqueue(pqueueIn), fDone(false)
     219                 :             :     {
     220                 :             :         // passed queue is supposed to be unused, or nullptr
     221         [ +  - ]:      266526 :         if (pqueue != nullptr) {
     222         [ +  - ]:      266615 :             ENTER_CRITICAL_SECTION(pqueue->m_control_mutex);
     223                 :             :         }
     224                 :             :     }
     225                 :             : 
     226                 :      266615 :     std::optional<R> Complete()
     227                 :             :     {
     228         [ -  + ]:      266615 :         if (pqueue == nullptr) return std::nullopt;
     229                 :      266615 :         auto ret = pqueue->Complete();
     230                 :      266615 :         fDone = true;
     231                 :      266615 :         return ret;
     232                 :      266526 :     }
     233                 :             : 
     234                 :       22281 :     void Add(std::vector<T>&& vChecks)
     235                 :             :     {
     236         [ +  - ]:       22281 :         if (pqueue != nullptr) {
     237         [ +  - ]:       22281 :             pqueue->Add(std::move(vChecks));
     238                 :             :         }
     239                 :             :     }
     240                 :             : 
     241                 :      266615 :     ~CCheckQueueControl()
     242                 :             :     {
     243         [ +  + ]:      266615 :         if (!fDone)
     244                 :          71 :             Complete();
     245         [ +  - ]:      266615 :         if (pqueue != nullptr) {
     246                 :      266615 :             LEAVE_CRITICAL_SECTION(pqueue->m_control_mutex);
     247                 :             :         }
     248                 :      266615 :     }
     249                 :             : };
     250                 :             : 
     251                 :             : #endif // BITCOIN_CHECKQUEUE_H
        

Generated by: LCOV version 2.0-1