LCOV - code coverage report
Current view: top level - src - scheduler.cpp (source / functions) Coverage Total Hit
Test: fuzz_coverage.info Lines: 83.2 % 113 94
Test Date: 2024-11-04 04:15:01 Functions: 70.6 % 17 12
Branches: 46.2 % 106 49

             Branch data     Line data    Source code
       1                 :             : // Copyright (c) 2015-2022 The Bitcoin Core developers
       2                 :             : // Distributed under the MIT software license, see the accompanying
       3                 :             : // file COPYING or http://www.opensource.org/licenses/mit-license.php.
       4                 :             : 
       5                 :             : #include <scheduler.h>
       6                 :             : 
       7                 :             : #include <sync.h>
       8                 :             : #include <util/time.h>
       9                 :             : 
      10                 :             : #include <cassert>
      11                 :             : #include <functional>
      12                 :             : #include <utility>
      13                 :             : 
      14                 :        1172 : CScheduler::CScheduler() = default;
      15                 :             : 
      16                 :        1172 : CScheduler::~CScheduler()
      17                 :             : {
      18         [ -  + ]:        1172 :     assert(nThreadsServicingQueue == 0);
      19   [ -  +  -  - ]:        1172 :     if (stopWhenEmpty) assert(taskQueue.empty());
      20                 :        1172 : }
      21                 :             : 
      22                 :             : 
      23                 :        1172 : void CScheduler::serviceQueue()
      24                 :             : {
      25                 :        1172 :     WAIT_LOCK(newTaskMutex, lock);
      26                 :        1172 :     ++nThreadsServicingQueue;
      27                 :             : 
      28                 :             :     // newTaskMutex is locked throughout this loop EXCEPT
      29                 :             :     // when the thread is waiting or when the user's function
      30                 :             :     // is called.
      31         [ +  + ]:      931738 :     while (!shouldStop()) {
      32                 :             :         try {
      33   [ +  +  +  + ]:     2657631 :             while (!shouldStop() && taskQueue.empty()) {
      34                 :             :                 // Wait until there is something to do.
      35         [ +  - ]:      398810 :                 newTaskScheduled.wait(lock);
      36                 :             :             }
      37                 :             : 
      38                 :             :             // Wait until either there is a new task, or until
      39                 :             :             // the time of the first item on the queue:
      40                 :             : 
      41   [ +  +  +  - ]:     1860011 :             while (!shouldStop() && !taskQueue.empty()) {
      42         [ +  - ]:      929445 :                 std::chrono::steady_clock::time_point timeToWaitFor = taskQueue.begin()->first;
      43   [ -  +  +  - ]:      929445 :                 if (newTaskScheduled.wait_until(lock, timeToWaitFor) == std::cv_status::timeout) {
      44                 :             :                     break; // Exit loop after timeout, it means we reached the time of the event
      45                 :             :                 }
      46                 :             :             }
      47                 :             : 
      48                 :             :             // If there are multiple threads, the queue can empty while we're waiting (another
      49                 :             :             // thread may service the task we were waiting on).
      50   [ +  +  +  - ]:     1860004 :             if (shouldStop() || taskQueue.empty())
      51                 :        1128 :                 continue;
      52                 :             : 
      53         [ +  - ]:      929438 :             Function f = taskQueue.begin()->second;
      54                 :      929438 :             taskQueue.erase(taskQueue.begin());
      55                 :             : 
      56                 :      929438 :             {
      57                 :             :                 // Unlock before calling f, so it can reschedule itself or another task
      58                 :             :                 // without deadlocking:
      59         [ +  - ]:      929438 :                 REVERSE_LOCK(lock);
      60         [ +  - ]:      929438 :                 f();
      61                 :      929438 :             }
      62                 :      929438 :         } catch (...) {
      63                 :           0 :             --nThreadsServicingQueue;
      64                 :           0 :             throw;
      65                 :           0 :         }
      66                 :             :     }
      67                 :        1172 :     --nThreadsServicingQueue;
      68         [ +  - ]:        1172 :     newTaskScheduled.notify_one();
      69                 :        1172 : }
      70                 :             : 
      71                 :      929445 : void CScheduler::schedule(CScheduler::Function f, std::chrono::steady_clock::time_point t)
      72                 :             : {
      73                 :      929445 :     {
      74                 :      929445 :         LOCK(newTaskMutex);
      75   [ +  -  +  -  :     1858890 :         taskQueue.insert(std::make_pair(t, f));
                   +  - ]
      76                 :      929445 :     }
      77                 :      929445 :     newTaskScheduled.notify_one();
      78                 :      929445 : }
      79                 :             : 
      80                 :           4 : void CScheduler::MockForward(std::chrono::seconds delta_seconds)
      81                 :             : {
      82   [ +  -  -  + ]:           4 :     assert(delta_seconds > 0s && delta_seconds <= 1h);
      83                 :             : 
      84                 :           4 :     {
      85                 :           4 :         LOCK(newTaskMutex);
      86                 :             : 
      87                 :             :         // use temp_queue to maintain updated schedule
      88                 :           4 :         std::multimap<std::chrono::steady_clock::time_point, Function> temp_queue;
      89                 :             : 
      90         [ -  + ]:           4 :         for (const auto& element : taskQueue) {
      91         [ #  # ]:           0 :             temp_queue.emplace_hint(temp_queue.cend(), element.first - delta_seconds, element.second);
      92                 :             :         }
      93                 :             : 
      94                 :             :         // point taskQueue to temp_queue
      95                 :           4 :         taskQueue = std::move(temp_queue);
      96         [ +  - ]:           4 :     }
      97                 :             : 
      98                 :             :     // notify that the taskQueue needs to be processed
      99                 :           4 :     newTaskScheduled.notify_one();
     100                 :           4 : }
     101                 :             : 
     102                 :           0 : static void Repeat(CScheduler& s, CScheduler::Function f, std::chrono::milliseconds delta)
     103                 :             : {
     104                 :           0 :     f();
     105   [ #  #  #  #  :           0 :     s.scheduleFromNow([=, &s] { Repeat(s, f, delta); }, delta);
             #  #  #  # ]
     106                 :           0 : }
     107                 :             : 
     108                 :           0 : void CScheduler::scheduleEvery(CScheduler::Function f, std::chrono::milliseconds delta)
     109                 :             : {
     110   [ #  #  #  #  :           0 :     scheduleFromNow([this, f, delta] { Repeat(*this, f, delta); }, delta);
             #  #  #  # ]
     111                 :           0 : }
     112                 :             : 
     113                 :           0 : size_t CScheduler::getQueueInfo(std::chrono::steady_clock::time_point& first,
     114                 :             :                                 std::chrono::steady_clock::time_point& last) const
     115                 :             : {
     116                 :           0 :     LOCK(newTaskMutex);
     117         [ #  # ]:           0 :     size_t result = taskQueue.size();
     118         [ #  # ]:           0 :     if (!taskQueue.empty()) {
     119                 :           0 :         first = taskQueue.begin()->first;
     120                 :           0 :         last = taskQueue.rbegin()->first;
     121                 :             :     }
     122         [ #  # ]:           0 :     return result;
     123                 :           0 : }
     124                 :             : 
     125                 :        1172 : bool CScheduler::AreThreadsServicingQueue() const
     126                 :             : {
     127                 :        1172 :     LOCK(newTaskMutex);
     128         [ +  - ]:        1172 :     return nThreadsServicingQueue;
     129                 :        1172 : }
     130                 :             : 
     131                 :             : 
     132                 :     1450650 : void SerialTaskRunner::MaybeScheduleProcessQueue()
     133                 :             : {
     134                 :     1450650 :     {
     135                 :     1450650 :         LOCK(m_callbacks_mutex);
     136                 :             :         // Try to avoid scheduling too many copies here, but if we
     137                 :             :         // accidentally have two ProcessQueue's scheduled at once its
     138                 :             :         // not a big deal.
     139         [ +  + ]:     1450650 :         if (m_are_callbacks_running) return;
     140         [ +  + ]:     1342595 :         if (m_callbacks_pending.empty()) return;
     141                 :      521205 :     }
     142         [ +  - ]:     2788328 :     m_scheduler.schedule([this] { this->ProcessQueue(); }, std::chrono::steady_clock::now());
     143                 :             : }
     144                 :             : 
     145                 :      930610 : void SerialTaskRunner::ProcessQueue()
     146                 :             : {
     147         [ +  - ]:      930610 :     std::function<void()> callback;
     148                 :      930610 :     {
     149         [ +  - ]:      930610 :         LOCK(m_callbacks_mutex);
     150         [ +  - ]:      930610 :         if (m_are_callbacks_running) return;
     151         [ +  + ]:      930610 :         if (m_callbacks_pending.empty()) return;
     152                 :      725325 :         m_are_callbacks_running = true;
     153                 :             : 
     154                 :      725325 :         callback = std::move(m_callbacks_pending.front());
     155         [ +  - ]:      725325 :         m_callbacks_pending.pop_front();
     156                 :      205285 :     }
     157                 :             : 
     158                 :             :     // RAII the setting of fCallbacksRunning and calling MaybeScheduleProcessQueue
     159                 :             :     // to ensure both happen safely even if callback() throws.
     160                 :      725325 :     struct RAIICallbacksRunning {
     161                 :             :         SerialTaskRunner* instance;
     162                 :      725325 :         explicit RAIICallbacksRunning(SerialTaskRunner* _instance) : instance(_instance) {}
     163                 :      725325 :         ~RAIICallbacksRunning()
     164                 :             :         {
     165                 :      725325 :             {
     166                 :      725325 :                 LOCK(instance->m_callbacks_mutex);
     167         [ +  - ]:      725325 :                 instance->m_are_callbacks_running = false;
     168                 :      725325 :             }
     169                 :      725325 :             instance->MaybeScheduleProcessQueue();
     170                 :      725325 :         }
     171                 :      725325 :     } raiicallbacksrunning(this);
     172                 :             : 
     173         [ +  - ]:      725325 :     callback();
     174                 :      930610 : }
     175                 :             : 
     176                 :      725325 : void SerialTaskRunner::insert(std::function<void()> func)
     177                 :             : {
     178                 :      725325 :     {
     179                 :      725325 :         LOCK(m_callbacks_mutex);
     180         [ +  - ]:      725325 :         m_callbacks_pending.emplace_back(std::move(func));
     181                 :      725325 :     }
     182                 :      725325 :     MaybeScheduleProcessQueue();
     183                 :      725325 : }
     184                 :             : 
     185                 :        1172 : void SerialTaskRunner::flush()
     186                 :             : {
     187         [ +  - ]:        1172 :     assert(!m_scheduler.AreThreadsServicingQueue());
     188                 :             :     bool should_continue = true;
     189         [ +  + ]:        2344 :     while (should_continue) {
     190                 :        1172 :         ProcessQueue();
     191                 :        1172 :         LOCK(m_callbacks_mutex);
     192         [ +  - ]:        1172 :         should_continue = !m_callbacks_pending.empty();
     193                 :        1172 :     }
     194                 :        1172 : }
     195                 :             : 
     196                 :      104191 : size_t SerialTaskRunner::size()
     197                 :             : {
     198                 :      104191 :     LOCK(m_callbacks_mutex);
     199         [ +  - ]:      104191 :     return m_callbacks_pending.size();
     200                 :             : }
        

Generated by: LCOV version 2.0-1