Branch data Line data Source code
1 : : // Copyright (c) 2012-2022 The Bitcoin Core developers
2 : : // Distributed under the MIT software license, see the accompanying
3 : : // file COPYING or http://www.opensource.org/licenses/mit-license.php.
4 : :
5 : : #include <random.h>
6 : : #include <scheduler.h>
7 : : #include <util/time.h>
8 : :
9 : : #include <boost/test/unit_test.hpp>
10 : :
11 : : #include <functional>
12 : : #include <mutex>
13 : : #include <thread>
14 : : #include <vector>
15 : :
16 : : BOOST_AUTO_TEST_SUITE(scheduler_tests)
17 : :
18 : 400 : static void microTask(CScheduler& s, std::mutex& mutex, int& counter, int delta, std::chrono::steady_clock::time_point rescheduleTime)
19 : : {
20 : 400 : {
21 : 400 : std::lock_guard<std::mutex> lock(mutex);
22 : 400 : counter += delta;
23 : 400 : }
24 : 400 : auto noTime = std::chrono::steady_clock::time_point::min();
25 [ + + ]: 400 : if (rescheduleTime != noTime) {
26 : 200 : CScheduler::Function f = std::bind(µTask, std::ref(s), std::ref(mutex), std::ref(counter), -delta + 1, noTime);
27 [ + - + - ]: 400 : s.schedule(f, rescheduleTime);
28 : 200 : }
29 : 400 : }
30 : :
31 [ + - + - : 7 : BOOST_AUTO_TEST_CASE(manythreads)
+ - + - +
- + - + -
+ - + - +
- + - + -
+ - + - +
- + - + -
+ - + - +
- + - + -
+ - + - +
- ]
32 : : {
33 : : // Stress test: hundreds of microsecond-scheduled tasks,
34 : : // serviced by 10 threads.
35 : : //
36 : : // So... ten shared counters, which if all the tasks execute
37 : : // properly will sum to the number of tasks done.
38 : : // Each task adds or subtracts a random amount from one of the
39 : : // counters, and then schedules another task 0-1000
40 : : // microseconds in the future to subtract or add from
41 : : // the counter -random_amount+1, so in the end the shared
42 : : // counters should sum to the number of initial tasks performed.
43 : 1 : CScheduler microTasks;
44 : :
45 : 1 : std::mutex counterMutex[10];
46 : 1 : int counter[10] = { 0 };
47 : 1 : FastRandomContext rng{/*fDeterministic=*/true};
48 : 401 : auto zeroToNine = [](FastRandomContext& rc) -> int { return rc.randrange(10); }; // [0, 9]
49 : 401 : auto randomMsec = [](FastRandomContext& rc) -> int { return -11 + (int)rc.randrange(1012); }; // [-11, 1000]
50 : 401 : auto randomDelta = [](FastRandomContext& rc) -> int { return -1000 + (int)rc.randrange(2001); }; // [-1000, 1000]
51 : :
52 : 1 : auto start = std::chrono::steady_clock::now();
53 : 1 : auto now = start;
54 : 1 : std::chrono::steady_clock::time_point first, last;
55 [ + - ]: 1 : size_t nTasks = microTasks.getQueueInfo(first, last);
56 [ + - + - ]: 2 : BOOST_CHECK(nTasks == 0);
57 : :
58 [ + + ]: 101 : for (int i = 0; i < 100; ++i) {
59 : 100 : auto t = now + std::chrono::microseconds(randomMsec(rng));
60 : 200 : auto tReschedule = now + std::chrono::microseconds(500 + randomMsec(rng));
61 : 100 : int whichCounter = zeroToNine(rng);
62 [ + - ]: 200 : CScheduler::Function f = std::bind(µTask, std::ref(microTasks),
63 : 100 : std::ref(counterMutex[whichCounter]), std::ref(counter[whichCounter]),
64 [ + - ]: 100 : randomDelta(rng), tReschedule);
65 [ + - + - ]: 200 : microTasks.schedule(f, t);
66 : 100 : }
67 [ + - ]: 1 : nTasks = microTasks.getQueueInfo(first, last);
68 [ + - + - : 2 : BOOST_CHECK(nTasks == 100);
+ - ]
69 [ + - + - : 2 : BOOST_CHECK(first < last);
+ - ]
70 [ + - + - : 2 : BOOST_CHECK(last > now);
+ - ]
71 : :
72 : : // As soon as these are created they will start running and servicing the queue
73 : 1 : std::vector<std::thread> microThreads;
74 [ + - ]: 1 : microThreads.reserve(10);
75 [ + + ]: 6 : for (int i = 0; i < 5; i++)
76 [ + - ]: 5 : microThreads.emplace_back(std::bind(&CScheduler::serviceQueue, µTasks));
77 : :
78 [ + - ]: 1 : UninterruptibleSleep(std::chrono::microseconds{600});
79 : 1 : now = std::chrono::steady_clock::now();
80 : :
81 : : // More threads and more tasks:
82 [ + + ]: 6 : for (int i = 0; i < 5; i++)
83 [ + - ]: 5 : microThreads.emplace_back(std::bind(&CScheduler::serviceQueue, µTasks));
84 [ + + ]: 101 : for (int i = 0; i < 100; i++) {
85 : 100 : auto t = now + std::chrono::microseconds(randomMsec(rng));
86 : 200 : auto tReschedule = now + std::chrono::microseconds(500 + randomMsec(rng));
87 : 100 : int whichCounter = zeroToNine(rng);
88 [ + - ]: 200 : CScheduler::Function f = std::bind(µTask, std::ref(microTasks),
89 : 100 : std::ref(counterMutex[whichCounter]), std::ref(counter[whichCounter]),
90 [ + - ]: 100 : randomDelta(rng), tReschedule);
91 [ + - + - ]: 200 : microTasks.schedule(f, t);
92 : 100 : }
93 : :
94 : : // Drain the task queue then exit threads
95 [ + - ]: 1 : microTasks.StopWhenDrained();
96 : : // wait until all the threads are done
97 [ + + ]: 11 : for (auto& thread: microThreads) {
98 [ + - + - ]: 10 : if (thread.joinable()) thread.join();
99 : : }
100 : :
101 : 1 : int counterSum = 0;
102 [ + + ]: 11 : for (int i = 0; i < 10; i++) {
103 [ + - + - ]: 20 : BOOST_CHECK(counter[i] != 0);
104 : 10 : counterSum += counter[i];
105 : : }
106 [ + - + - ]: 1 : BOOST_CHECK_EQUAL(counterSum, 200);
107 : 1 : }
108 : :
109 [ + - + - : 7 : BOOST_AUTO_TEST_CASE(wait_until_past)
+ - + - +
- + - + -
+ - + - +
- + - + -
+ - + - +
- + - + -
+ - + - +
- + - + -
+ - + - +
- ]
110 : : {
111 : 1 : std::condition_variable condvar;
112 : 1 : Mutex mtx;
113 [ + - ]: 1 : WAIT_LOCK(mtx, lock);
114 : :
115 : 7 : const auto no_wait = [&](const std::chrono::seconds& d) {
116 : 6 : return condvar.wait_until(lock, std::chrono::steady_clock::now() - d);
117 : 1 : };
118 : :
119 [ + - + - : 2 : BOOST_CHECK(std::cv_status::timeout == no_wait(std::chrono::seconds{1}));
+ - + - ]
120 [ + - + - : 2 : BOOST_CHECK(std::cv_status::timeout == no_wait(std::chrono::minutes{1}));
+ - + - ]
121 [ + - + - : 2 : BOOST_CHECK(std::cv_status::timeout == no_wait(std::chrono::hours{1}));
+ - + - ]
122 [ + - + - : 2 : BOOST_CHECK(std::cv_status::timeout == no_wait(std::chrono::hours{10}));
+ - + - ]
123 [ + - + - : 2 : BOOST_CHECK(std::cv_status::timeout == no_wait(std::chrono::hours{100}));
+ - + - ]
124 [ + - + - : 2 : BOOST_CHECK(std::cv_status::timeout == no_wait(std::chrono::hours{1000}));
+ - + - ]
125 : 1 : }
126 : :
127 [ + - + - : 7 : BOOST_AUTO_TEST_CASE(singlethreadedscheduler_ordered)
+ - + - +
- + - + -
+ - + - +
- + - + -
+ - + - +
- + - + -
+ - + - +
- + - + -
+ - + - +
- ]
128 : : {
129 : 1 : CScheduler scheduler;
130 : :
131 : : // each queue should be well ordered with respect to itself but not other queues
132 [ + - ]: 1 : SerialTaskRunner queue1(scheduler);
133 : 1 : SerialTaskRunner queue2(scheduler);
134 : :
135 : : // create more threads than queues
136 : : // if the queues only permit execution of one task at once then
137 : : // the extra threads should effectively be doing nothing
138 : : // if they don't we'll get out of order behaviour
139 : 1 : std::vector<std::thread> threads;
140 [ + - ]: 1 : threads.reserve(5);
141 [ + + ]: 6 : for (int i = 0; i < 5; ++i) {
142 [ + - ]: 10 : threads.emplace_back([&] { scheduler.serviceQueue(); });
143 : : }
144 : :
145 : : // these are not atomic, if SerialTaskRunner prevents
146 : : // parallel execution at the queue level no synchronization should be required here
147 : 1 : int counter1 = 0;
148 : 1 : int counter2 = 0;
149 : :
150 : : // just simply count up on each queue - if execution is properly ordered then
151 : : // the callbacks should run in exactly the order in which they were enqueued
152 [ + + ]: 101 : for (int i = 0; i < 100; ++i) {
153 [ + - ]: 200 : queue1.insert([i, &counter1]() {
154 : 100 : bool expectation = i == counter1++;
155 [ - + ]: 100 : assert(expectation);
156 : 100 : });
157 : :
158 [ + - ]: 300 : queue2.insert([i, &counter2]() {
159 : 100 : bool expectation = i == counter2++;
160 [ - + ]: 100 : assert(expectation);
161 : 100 : });
162 : : }
163 : :
164 : : // finish up
165 [ + - ]: 1 : scheduler.StopWhenDrained();
166 [ + + ]: 6 : for (auto& thread: threads) {
167 [ + - + - ]: 5 : if (thread.joinable()) thread.join();
168 : : }
169 : :
170 [ + - + - ]: 1 : BOOST_CHECK_EQUAL(counter1, 100);
171 [ + - + - ]: 1 : BOOST_CHECK_EQUAL(counter2, 100);
172 : 1 : }
173 : :
174 [ + - + - : 7 : BOOST_AUTO_TEST_CASE(mockforward)
+ - + - +
- + - + -
+ - + - +
- + - + -
+ - + - +
- + - + -
+ - + - +
- + - + -
+ - + - +
- ]
175 : : {
176 : 1 : CScheduler scheduler;
177 : :
178 : 1 : int counter{0};
179 : 3 : CScheduler::Function dummy = [&counter]{counter++;};
180 : :
181 : : // schedule jobs for 2, 5 & 8 minutes into the future
182 : :
183 [ + - + - ]: 1 : scheduler.scheduleFromNow(dummy, std::chrono::minutes{2});
184 [ + - + - ]: 1 : scheduler.scheduleFromNow(dummy, std::chrono::minutes{5});
185 [ + - + - ]: 1 : scheduler.scheduleFromNow(dummy, std::chrono::minutes{8});
186 : :
187 : : // check taskQueue
188 : 1 : std::chrono::steady_clock::time_point first, last;
189 [ + - ]: 1 : size_t num_tasks = scheduler.getQueueInfo(first, last);
190 [ + - + - ]: 1 : BOOST_CHECK_EQUAL(num_tasks, 3ul);
191 : :
192 [ + - ]: 2 : std::thread scheduler_thread([&]() { scheduler.serviceQueue(); });
193 : :
194 : : // bump the scheduler forward 5 minutes
195 [ + - ]: 1 : scheduler.MockForward(std::chrono::minutes{5});
196 : :
197 : : // ensure scheduler has chance to process all tasks queued for before 1 ms from now.
198 [ + - ]: 2 : scheduler.scheduleFromNow([&scheduler] { scheduler.stop(); }, std::chrono::milliseconds{1});
199 [ + - ]: 1 : scheduler_thread.join();
200 : :
201 : : // check that the queue only has one job remaining
202 [ + - ]: 1 : num_tasks = scheduler.getQueueInfo(first, last);
203 [ + - + - ]: 1 : BOOST_CHECK_EQUAL(num_tasks, 1ul);
204 : :
205 : : // check that the dummy function actually ran
206 [ + - + - ]: 1 : BOOST_CHECK_EQUAL(counter, 2);
207 : :
208 : : // check that the time of the remaining job has been updated
209 : 1 : auto now = std::chrono::steady_clock::now();
210 [ + - ]: 1 : int delta = std::chrono::duration_cast<std::chrono::seconds>(first - now).count();
211 : : // should be between 2 & 3 minutes from now
212 [ + - + - ]: 2 : BOOST_CHECK(delta > 2*60 && delta < 3*60);
213 : 1 : }
214 : :
215 : : BOOST_AUTO_TEST_SUITE_END()
|