Branch data Line data Source code
1 : : // Copyright (c) 2015-present The Bitcoin Core developers
2 : : // Distributed under the MIT software license, see the accompanying
3 : : // file COPYING or http://www.opensource.org/licenses/mit-license.php.
4 : :
5 : : #ifndef BITCOIN_SCHEDULER_H
6 : : #define BITCOIN_SCHEDULER_H
7 : :
8 : : #include <attributes.h>
9 : : #include <sync.h>
10 : : #include <util/task_runner.h>
11 : :
12 : : #include <chrono>
13 : : #include <condition_variable>
14 : : #include <cstddef>
15 : : #include <functional>
16 : : #include <list>
17 : : #include <map>
18 : : #include <thread>
19 : : #include <utility>
20 : :
21 : : /**
22 : : * Simple class for background tasks that should be run
23 : : * periodically or once "after a while"
24 : : *
25 : : * Usage:
26 : : *
27 : : * CScheduler* s = new CScheduler();
28 : : * s->scheduleFromNow(doSomething, std::chrono::milliseconds{11}); // Assuming a: void doSomething() { }
29 : : * s->scheduleFromNow([=] { this->func(argument); }, std::chrono::milliseconds{3});
30 : : * std::thread* t = new std::thread([&] { s->serviceQueue(); });
31 : : *
32 : : * ... then at program shutdown, make sure to call stop() to clean up the thread(s) running serviceQueue:
33 : : * s->stop();
34 : : * t->join();
35 : : * delete t;
36 : : * delete s; // Must be done after thread is interrupted/joined.
37 : : */
38 : : class CScheduler
39 : : {
40 : : public:
41 : : CScheduler();
42 : : ~CScheduler();
43 : :
44 : : std::thread m_service_thread;
45 : :
46 : : typedef std::function<void()> Function;
47 : :
48 : : /** Call func at/after time t */
49 : : void schedule(Function f, std::chrono::steady_clock::time_point t) EXCLUSIVE_LOCKS_REQUIRED(!newTaskMutex);
50 : :
51 : : /** Call f once after the delta has passed */
52 : 204 : void scheduleFromNow(Function f, std::chrono::milliseconds delta) EXCLUSIVE_LOCKS_REQUIRED(!newTaskMutex)
53 : : {
54 [ + - ]: 204 : schedule(std::move(f), std::chrono::steady_clock::now() + delta);
55 : 204 : }
56 : :
57 : : /**
58 : : * Repeat f until the scheduler is stopped. First run is after delta has passed once.
59 : : *
60 : : * The timing is not exact: Every time f is finished, it is rescheduled to run again after delta. If you need more
61 : : * accurate scheduling, don't use this method.
62 : : */
63 : : void scheduleEvery(Function f, std::chrono::milliseconds delta) EXCLUSIVE_LOCKS_REQUIRED(!newTaskMutex);
64 : :
65 : : /**
66 : : * Mock the scheduler to fast forward in time.
67 : : * Iterates through items on taskQueue and reschedules them
68 : : * to be delta_seconds sooner.
69 : : */
70 : : void MockForward(std::chrono::seconds delta_seconds) EXCLUSIVE_LOCKS_REQUIRED(!newTaskMutex);
71 : :
72 : : /**
73 : : * Services the queue 'forever'. Should be run in a thread.
74 : : */
75 : : void serviceQueue() EXCLUSIVE_LOCKS_REQUIRED(!newTaskMutex);
76 : :
77 : : /** Tell any threads running serviceQueue to stop as soon as the current task is done */
78 : 206 : void stop() EXCLUSIVE_LOCKS_REQUIRED(!newTaskMutex)
79 : : {
80 [ + - ]: 412 : WITH_LOCK(newTaskMutex, stopRequested = true);
81 : 206 : newTaskScheduled.notify_all();
82 [ + + ]: 206 : if (m_service_thread.joinable()) m_service_thread.join();
83 : 206 : }
84 : : /** Tell any threads running serviceQueue to stop when there is no work left to be done */
85 : 2 : void StopWhenDrained() EXCLUSIVE_LOCKS_REQUIRED(!newTaskMutex)
86 : : {
87 [ + - ]: 4 : WITH_LOCK(newTaskMutex, stopWhenEmpty = true);
88 : 2 : newTaskScheduled.notify_all();
89 [ - + ]: 2 : if (m_service_thread.joinable()) m_service_thread.join();
90 : 2 : }
91 : :
92 : : /**
93 : : * Returns number of tasks waiting to be serviced,
94 : : * and first and last task times
95 : : */
96 : : size_t getQueueInfo(std::chrono::steady_clock::time_point& first,
97 : : std::chrono::steady_clock::time_point& last) const
98 : : EXCLUSIVE_LOCKS_REQUIRED(!newTaskMutex);
99 : :
100 : : /** Returns true if there are threads actively running in serviceQueue() */
101 : : bool AreThreadsServicingQueue() const EXCLUSIVE_LOCKS_REQUIRED(!newTaskMutex);
102 : :
103 : : private:
104 : : mutable Mutex newTaskMutex;
105 : : std::condition_variable newTaskScheduled;
106 : : std::multimap<std::chrono::steady_clock::time_point, Function> taskQueue GUARDED_BY(newTaskMutex);
107 : : int nThreadsServicingQueue GUARDED_BY(newTaskMutex){0};
108 : : bool stopRequested GUARDED_BY(newTaskMutex){false};
109 : : bool stopWhenEmpty GUARDED_BY(newTaskMutex){false};
110 [ + + + + : 234373 : bool shouldStop() const EXCLUSIVE_LOCKS_REQUIRED(newTaskMutex) { return stopRequested || (stopWhenEmpty && taskQueue.empty()); }
+ - + + +
+ + - + +
+ + + + +
+ + + +
+ ]
111 : : };
112 : :
113 : : /**
114 : : * Class used by CScheduler clients which may schedule multiple jobs
115 : : * which are required to be run serially. Jobs may not be run on the
116 : : * same thread, but no two jobs will be executed
117 : : * at the same time and memory will be release-acquire consistent
118 : : * (the scheduler will internally do an acquire before invoking a callback
119 : : * as well as a release at the end). In practice this means that a callback
120 : : * B() will be able to observe all of the effects of callback A() which executed
121 : : * before it.
122 : : */
123 : 1 : class SerialTaskRunner : public util::TaskRunnerInterface
124 : : {
125 : : private:
126 : : CScheduler& m_scheduler;
127 : :
128 : : Mutex m_callbacks_mutex;
129 : :
130 : : // We are not allowed to assume the scheduler only runs in one thread,
131 : : // but must ensure all callbacks happen in-order, so we end up creating
132 : : // our own queue here :(
133 : : std::list<std::function<void()>> m_callbacks_pending GUARDED_BY(m_callbacks_mutex);
134 : : bool m_are_callbacks_running GUARDED_BY(m_callbacks_mutex) = false;
135 : :
136 : : void MaybeScheduleProcessQueue() EXCLUSIVE_LOCKS_REQUIRED(!m_callbacks_mutex);
137 : : void ProcessQueue() EXCLUSIVE_LOCKS_REQUIRED(!m_callbacks_mutex);
138 : :
139 : : public:
140 [ + - ]: 187 : explicit SerialTaskRunner(CScheduler& scheduler LIFETIMEBOUND) : m_scheduler{scheduler} {}
141 : :
142 : : /**
143 : : * Add a callback to be executed. Callbacks are executed serially
144 : : * and memory is release-acquire consistent between callback executions.
145 : : * Practically, this means that callbacks can behave as if they are executed
146 : : * in order by a single thread.
147 : : */
148 : : void insert(std::function<void()> func) override EXCLUSIVE_LOCKS_REQUIRED(!m_callbacks_mutex);
149 : :
150 : : /**
151 : : * Processes all remaining queue members on the calling thread, blocking until queue is empty
152 : : * Must be called after the CScheduler has no remaining processing threads!
153 : : */
154 : : void flush() override EXCLUSIVE_LOCKS_REQUIRED(!m_callbacks_mutex);
155 : :
156 : : size_t size() override EXCLUSIVE_LOCKS_REQUIRED(!m_callbacks_mutex);
157 : : };
158 : :
159 : : #endif // BITCOIN_SCHEDULER_H
|