|              Branch data     Line data    Source code 
       1                 :             : // Copyright (c) 2012-2022 The Bitcoin Core developers
       2                 :             : // Distributed under the MIT software license, see the accompanying
       3                 :             : // file COPYING or http://www.opensource.org/licenses/mit-license.php.
       4                 :             : 
       5                 :             : #include <random.h>
       6                 :             : #include <scheduler.h>
       7                 :             : #include <util/time.h>
       8                 :             : 
       9                 :             : #include <boost/test/unit_test.hpp>
      10                 :             : 
      11                 :             : #include <functional>
      12                 :             : #include <mutex>
      13                 :             : #include <thread>
      14                 :             : #include <vector>
      15                 :             : 
      16                 :             : BOOST_AUTO_TEST_SUITE(scheduler_tests)
      17                 :             : 
      18                 :         400 : static void microTask(CScheduler& s, std::mutex& mutex, int& counter, int delta, std::chrono::steady_clock::time_point rescheduleTime)
      19                 :             : {
      20                 :         400 :     {
      21                 :         400 :         std::lock_guard<std::mutex> lock(mutex);
      22                 :         400 :         counter += delta;
      23                 :         400 :     }
      24                 :         400 :     auto noTime = std::chrono::steady_clock::time_point::min();
      25         [ +  + ]:         400 :     if (rescheduleTime != noTime) {
      26                 :         200 :         CScheduler::Function f = std::bind(µTask, std::ref(s), std::ref(mutex), std::ref(counter), -delta + 1, noTime);
      27   [ +  -  +  - ]:         400 :         s.schedule(f, rescheduleTime);
      28                 :         200 :     }
      29                 :         400 : }
      30                 :             : 
      31   [ +  -  +  -  :           7 : BOOST_AUTO_TEST_CASE(manythreads)
          +  -  +  -  -  
          +  +  -  +  -  
          +  -  +  -  +  
          -  -  +  +  -  
          +  -  +  -  +  
          -  +  -  -  +  
          +  -  +  -  +  
          -  +  -  +  -  
          -  +  +  -  +  
          -  +  -  +  -  
          +  -  -  +  +  
                      - ]
      32                 :             : {
      33                 :             :     // Stress test: hundreds of microsecond-scheduled tasks,
      34                 :             :     // serviced by 10 threads.
      35                 :             :     //
      36                 :             :     // So... ten shared counters, which if all the tasks execute
      37                 :             :     // properly will sum to the number of tasks done.
      38                 :             :     // Each task adds or subtracts a random amount from one of the
      39                 :             :     // counters, and then schedules another task 0-1000
      40                 :             :     // microseconds in the future to subtract or add from
      41                 :             :     // the counter -random_amount+1, so in the end the shared
      42                 :             :     // counters should sum to the number of initial tasks performed.
      43                 :           1 :     CScheduler microTasks;
      44                 :             : 
      45                 :           1 :     std::mutex counterMutex[10];
      46                 :           1 :     int counter[10] = { 0 };
      47                 :           1 :     FastRandomContext rng{/*fDeterministic=*/true};
      48                 :         401 :     auto zeroToNine = [](FastRandomContext& rc) -> int { return rc.randrange(10); }; // [0, 9]
      49                 :         401 :     auto randomMsec = [](FastRandomContext& rc) -> int { return -11 + (int)rc.randrange(1012); }; // [-11, 1000]
      50                 :         401 :     auto randomDelta = [](FastRandomContext& rc) -> int { return -1000 + (int)rc.randrange(2001); }; // [-1000, 1000]
      51                 :             : 
      52                 :           1 :     auto start = std::chrono::steady_clock::now();
      53                 :           1 :     auto now = start;
      54                 :           1 :     std::chrono::steady_clock::time_point first, last;
      55         [ +  - ]:           1 :     size_t nTasks = microTasks.getQueueInfo(first, last);
      56   [ +  -  +  - ]:           2 :     BOOST_CHECK(nTasks == 0);
      57                 :             : 
      58         [ +  + ]:         101 :     for (int i = 0; i < 100; ++i) {
      59                 :         100 :         auto t = now + std::chrono::microseconds(randomMsec(rng));
      60                 :         200 :         auto tReschedule = now + std::chrono::microseconds(500 + randomMsec(rng));
      61                 :         100 :         int whichCounter = zeroToNine(rng);
      62         [ +  - ]:         200 :         CScheduler::Function f = std::bind(µTask, std::ref(microTasks),
      63                 :         100 :                                              std::ref(counterMutex[whichCounter]), std::ref(counter[whichCounter]),
      64         [ +  - ]:         100 :                                              randomDelta(rng), tReschedule);
      65   [ +  -  +  - ]:         200 :         microTasks.schedule(f, t);
      66                 :         100 :     }
      67         [ +  - ]:           1 :     nTasks = microTasks.getQueueInfo(first, last);
      68   [ +  -  +  -  :           2 :     BOOST_CHECK(nTasks == 100);
                   +  - ]
      69   [ +  -  +  -  :           2 :     BOOST_CHECK(first < last);
                   +  - ]
      70   [ +  -  +  -  :           2 :     BOOST_CHECK(last > now);
                   +  - ]
      71                 :             : 
      72                 :             :     // As soon as these are created they will start running and servicing the queue
      73                 :           1 :     std::vector<std::thread> microThreads;
      74         [ +  - ]:           1 :     microThreads.reserve(10);
      75         [ +  + ]:           6 :     for (int i = 0; i < 5; i++)
      76         [ +  - ]:           5 :         microThreads.emplace_back(std::bind(&CScheduler::serviceQueue, µTasks));
      77                 :             : 
      78         [ +  - ]:           1 :     UninterruptibleSleep(std::chrono::microseconds{600});
      79                 :           1 :     now = std::chrono::steady_clock::now();
      80                 :             : 
      81                 :             :     // More threads and more tasks:
      82         [ +  + ]:           6 :     for (int i = 0; i < 5; i++)
      83         [ +  - ]:           5 :         microThreads.emplace_back(std::bind(&CScheduler::serviceQueue, µTasks));
      84         [ +  + ]:         101 :     for (int i = 0; i < 100; i++) {
      85                 :         100 :         auto t = now + std::chrono::microseconds(randomMsec(rng));
      86                 :         200 :         auto tReschedule = now + std::chrono::microseconds(500 + randomMsec(rng));
      87                 :         100 :         int whichCounter = zeroToNine(rng);
      88         [ +  - ]:         200 :         CScheduler::Function f = std::bind(µTask, std::ref(microTasks),
      89                 :         100 :                                              std::ref(counterMutex[whichCounter]), std::ref(counter[whichCounter]),
      90         [ +  - ]:         100 :                                              randomDelta(rng), tReschedule);
      91   [ +  -  +  - ]:         200 :         microTasks.schedule(f, t);
      92                 :         100 :     }
      93                 :             : 
      94                 :             :     // Drain the task queue then exit threads
      95         [ +  - ]:           1 :     microTasks.StopWhenDrained();
      96                 :             :     // wait until all the threads are done
      97         [ +  + ]:          11 :     for (auto& thread: microThreads) {
      98   [ +  -  +  - ]:          10 :         if (thread.joinable()) thread.join();
      99                 :             :     }
     100                 :             : 
     101                 :           1 :     int counterSum = 0;
     102         [ +  + ]:          11 :     for (int i = 0; i < 10; i++) {
     103   [ +  -  +  - ]:          20 :         BOOST_CHECK(counter[i] != 0);
     104                 :          10 :         counterSum += counter[i];
     105                 :             :     }
     106   [ +  -  +  - ]:           1 :     BOOST_CHECK_EQUAL(counterSum, 200);
     107                 :           1 : }
     108                 :             : 
     109   [ +  -  +  -  :           7 : BOOST_AUTO_TEST_CASE(wait_until_past)
          +  -  +  -  -  
          +  +  -  +  -  
          +  -  +  -  +  
          -  -  +  +  -  
          +  -  +  -  +  
          -  +  -  -  +  
          +  -  +  -  +  
          -  +  -  +  -  
          -  +  +  -  +  
          -  +  -  +  -  
          +  -  -  +  +  
                      - ]
     110                 :             : {
     111                 :           1 :     std::condition_variable condvar;
     112                 :           1 :     Mutex mtx;
     113         [ +  - ]:           1 :     WAIT_LOCK(mtx, lock);
     114                 :             : 
     115                 :           7 :     const auto no_wait = [&](const std::chrono::seconds& d) {
     116                 :           6 :         return condvar.wait_until(lock, std::chrono::steady_clock::now() - d);
     117                 :           1 :     };
     118                 :             : 
     119   [ +  -  +  -  :           2 :     BOOST_CHECK(std::cv_status::timeout == no_wait(std::chrono::seconds{1}));
             +  -  +  - ]
     120   [ +  -  +  -  :           2 :     BOOST_CHECK(std::cv_status::timeout == no_wait(std::chrono::minutes{1}));
             +  -  +  - ]
     121   [ +  -  +  -  :           2 :     BOOST_CHECK(std::cv_status::timeout == no_wait(std::chrono::hours{1}));
             +  -  +  - ]
     122   [ +  -  +  -  :           2 :     BOOST_CHECK(std::cv_status::timeout == no_wait(std::chrono::hours{10}));
             +  -  +  - ]
     123   [ +  -  +  -  :           2 :     BOOST_CHECK(std::cv_status::timeout == no_wait(std::chrono::hours{100}));
             +  -  +  - ]
     124   [ +  -  +  -  :           2 :     BOOST_CHECK(std::cv_status::timeout == no_wait(std::chrono::hours{1000}));
             +  -  +  - ]
     125                 :           1 : }
     126                 :             : 
     127   [ +  -  +  -  :           7 : BOOST_AUTO_TEST_CASE(singlethreadedscheduler_ordered)
          +  -  +  -  -  
          +  +  -  +  -  
          +  -  +  -  +  
          -  -  +  +  -  
          +  -  +  -  +  
          -  +  -  -  +  
          +  -  +  -  +  
          -  +  -  +  -  
          -  +  +  -  +  
          -  +  -  +  -  
          +  -  -  +  +  
                      - ]
     128                 :             : {
     129                 :           1 :     CScheduler scheduler;
     130                 :             : 
     131                 :             :     // each queue should be well ordered with respect to itself but not other queues
     132         [ +  - ]:           1 :     SerialTaskRunner queue1(scheduler);
     133                 :           1 :     SerialTaskRunner queue2(scheduler);
     134                 :             : 
     135                 :             :     // create more threads than queues
     136                 :             :     // if the queues only permit execution of one task at once then
     137                 :             :     // the extra threads should effectively be doing nothing
     138                 :             :     // if they don't we'll get out of order behaviour
     139                 :           1 :     std::vector<std::thread> threads;
     140         [ +  - ]:           1 :     threads.reserve(5);
     141         [ +  + ]:           6 :     for (int i = 0; i < 5; ++i) {
     142         [ +  - ]:          10 :         threads.emplace_back([&] { scheduler.serviceQueue(); });
     143                 :             :     }
     144                 :             : 
     145                 :             :     // these are not atomic, if SerialTaskRunner prevents
     146                 :             :     // parallel execution at the queue level no synchronization should be required here
     147                 :           1 :     int counter1 = 0;
     148                 :           1 :     int counter2 = 0;
     149                 :             : 
     150                 :             :     // just simply count up on each queue - if execution is properly ordered then
     151                 :             :     // the callbacks should run in exactly the order in which they were enqueued
     152         [ +  + ]:         101 :     for (int i = 0; i < 100; ++i) {
     153         [ +  - ]:         200 :         queue1.insert([i, &counter1]() {
     154                 :         100 :             bool expectation = i == counter1++;
     155         [ -  + ]:         100 :             assert(expectation);
     156                 :         100 :         });
     157                 :             : 
     158         [ +  - ]:         300 :         queue2.insert([i, &counter2]() {
     159                 :         100 :             bool expectation = i == counter2++;
     160         [ -  + ]:         100 :             assert(expectation);
     161                 :         100 :         });
     162                 :             :     }
     163                 :             : 
     164                 :             :     // finish up
     165         [ +  - ]:           1 :     scheduler.StopWhenDrained();
     166         [ +  + ]:           6 :     for (auto& thread: threads) {
     167   [ +  -  +  - ]:           5 :         if (thread.joinable()) thread.join();
     168                 :             :     }
     169                 :             : 
     170   [ +  -  +  - ]:           1 :     BOOST_CHECK_EQUAL(counter1, 100);
     171   [ +  -  +  - ]:           1 :     BOOST_CHECK_EQUAL(counter2, 100);
     172                 :           1 : }
     173                 :             : 
     174   [ +  -  +  -  :           7 : BOOST_AUTO_TEST_CASE(mockforward)
          +  -  +  -  -  
          +  +  -  +  -  
          +  -  +  -  +  
          -  -  +  +  -  
          +  -  +  -  +  
          -  +  -  -  +  
          +  -  +  -  +  
          -  +  -  +  -  
          -  +  +  -  +  
          -  +  -  +  -  
          +  -  -  +  +  
                      - ]
     175                 :             : {
     176                 :           1 :     CScheduler scheduler;
     177                 :             : 
     178                 :           1 :     int counter{0};
     179         [ +  - ]:           3 :     CScheduler::Function dummy = [&counter]{counter++;};
     180                 :             : 
     181                 :             :     // schedule jobs for 2, 5 & 8 minutes into the future
     182                 :             : 
     183   [ +  -  +  - ]:           1 :     scheduler.scheduleFromNow(dummy, std::chrono::minutes{2});
     184   [ +  -  +  - ]:           1 :     scheduler.scheduleFromNow(dummy, std::chrono::minutes{5});
     185   [ +  -  +  - ]:           1 :     scheduler.scheduleFromNow(dummy, std::chrono::minutes{8});
     186                 :             : 
     187                 :             :     // check taskQueue
     188                 :           1 :     std::chrono::steady_clock::time_point first, last;
     189         [ +  - ]:           1 :     size_t num_tasks = scheduler.getQueueInfo(first, last);
     190   [ +  -  +  - ]:           1 :     BOOST_CHECK_EQUAL(num_tasks, 3ul);
     191                 :             : 
     192         [ +  - ]:           2 :     std::thread scheduler_thread([&]() { scheduler.serviceQueue(); });
     193                 :             : 
     194                 :             :     // bump the scheduler forward 5 minutes
     195         [ +  - ]:           1 :     scheduler.MockForward(std::chrono::minutes{5});
     196                 :             : 
     197                 :             :     // ensure scheduler has chance to process all tasks queued for before 1 ms from now.
     198         [ +  - ]:           2 :     scheduler.scheduleFromNow([&scheduler] { scheduler.stop(); }, std::chrono::milliseconds{1});
     199         [ +  - ]:           1 :     scheduler_thread.join();
     200                 :             : 
     201                 :             :     // check that the queue only has one job remaining
     202         [ +  - ]:           1 :     num_tasks = scheduler.getQueueInfo(first, last);
     203   [ +  -  +  - ]:           1 :     BOOST_CHECK_EQUAL(num_tasks, 1ul);
     204                 :             : 
     205                 :             :     // check that the dummy function actually ran
     206   [ +  -  +  - ]:           1 :     BOOST_CHECK_EQUAL(counter, 2);
     207                 :             : 
     208                 :             :     // check that the time of the remaining job has been updated
     209                 :           1 :     auto now = std::chrono::steady_clock::now();
     210         [ +  - ]:           1 :     int delta = std::chrono::duration_cast<std::chrono::seconds>(first - now).count();
     211                 :             :     // should be between 2 & 3 minutes from now
     212   [ +  -  +  - ]:           2 :     BOOST_CHECK(delta > 2*60 && delta < 3*60);
     213                 :           1 : }
     214                 :             : 
     215                 :             : BOOST_AUTO_TEST_SUITE_END()
         |