Branch data Line data Source code
1 : : // Copyright (c) 2015-2022 The Bitcoin Core developers
2 : : // Distributed under the MIT software license, see the accompanying
3 : : // file COPYING or http://www.opensource.org/licenses/mit-license.php.
4 : :
5 : : #ifndef BITCOIN_SCHEDULER_H
6 : : #define BITCOIN_SCHEDULER_H
7 : :
8 : : #include <attributes.h>
9 : : #include <sync.h>
10 : : #include <threadsafety.h>
11 : : #include <util/task_runner.h>
12 : :
13 : : #include <chrono>
14 : : #include <condition_variable>
15 : : #include <cstddef>
16 : : #include <functional>
17 : : #include <list>
18 : : #include <map>
19 : : #include <thread>
20 : : #include <utility>
21 : :
22 : : /**
23 : : * Simple class for background tasks that should be run
24 : : * periodically or once "after a while"
25 : : *
26 : : * Usage:
27 : : *
28 : : * CScheduler* s = new CScheduler();
29 : : * s->scheduleFromNow(doSomething, std::chrono::milliseconds{11}); // Assuming a: void doSomething() { }
30 : : * s->scheduleFromNow([=] { this->func(argument); }, std::chrono::milliseconds{3});
31 : : * std::thread* t = new std::thread([&] { s->serviceQueue(); });
32 : : *
33 : : * ... then at program shutdown, make sure to call stop() to clean up the thread(s) running serviceQueue:
34 : : * s->stop();
35 : : * t->join();
36 : : * delete t;
37 : : * delete s; // Must be done after thread is interrupted/joined.
38 : : */
39 : : class CScheduler
40 : : {
41 : : public:
42 : : CScheduler();
43 : : ~CScheduler();
44 : :
45 : : std::thread m_service_thread;
46 : :
47 : : typedef std::function<void()> Function;
48 : :
49 : : /** Call func at/after time t */
50 : : void schedule(Function f, std::chrono::steady_clock::time_point t) EXCLUSIVE_LOCKS_REQUIRED(!newTaskMutex);
51 : :
52 : : /** Call f once after the delta has passed */
53 : 0 : void scheduleFromNow(Function f, std::chrono::milliseconds delta) EXCLUSIVE_LOCKS_REQUIRED(!newTaskMutex)
54 : : {
55 [ # # ]: 0 : schedule(std::move(f), std::chrono::steady_clock::now() + delta);
56 : 0 : }
57 : :
58 : : /**
59 : : * Repeat f until the scheduler is stopped. First run is after delta has passed once.
60 : : *
61 : : * The timing is not exact: Every time f is finished, it is rescheduled to run again after delta. If you need more
62 : : * accurate scheduling, don't use this method.
63 : : */
64 : : void scheduleEvery(Function f, std::chrono::milliseconds delta) EXCLUSIVE_LOCKS_REQUIRED(!newTaskMutex);
65 : :
66 : : /**
67 : : * Mock the scheduler to fast forward in time.
68 : : * Iterates through items on taskQueue and reschedules them
69 : : * to be delta_seconds sooner.
70 : : */
71 : : void MockForward(std::chrono::seconds delta_seconds) EXCLUSIVE_LOCKS_REQUIRED(!newTaskMutex);
72 : :
73 : : /**
74 : : * Services the queue 'forever'. Should be run in a thread.
75 : : */
76 : : void serviceQueue() EXCLUSIVE_LOCKS_REQUIRED(!newTaskMutex);
77 : :
78 : : /** Tell any threads running serviceQueue to stop as soon as the current task is done */
79 : 1172 : void stop() EXCLUSIVE_LOCKS_REQUIRED(!newTaskMutex)
80 : : {
81 [ + - ]: 2344 : WITH_LOCK(newTaskMutex, stopRequested = true);
82 : 1172 : newTaskScheduled.notify_all();
83 [ + - ]: 1172 : if (m_service_thread.joinable()) m_service_thread.join();
84 : 1172 : }
85 : : /** Tell any threads running serviceQueue to stop when there is no work left to be done */
86 : : void StopWhenDrained() EXCLUSIVE_LOCKS_REQUIRED(!newTaskMutex)
87 : : {
88 : : WITH_LOCK(newTaskMutex, stopWhenEmpty = true);
89 : : newTaskScheduled.notify_all();
90 : : if (m_service_thread.joinable()) m_service_thread.join();
91 : : }
92 : :
93 : : /**
94 : : * Returns number of tasks waiting to be serviced,
95 : : * and first and last task times
96 : : */
97 : : size_t getQueueInfo(std::chrono::steady_clock::time_point& first,
98 : : std::chrono::steady_clock::time_point& last) const
99 : : EXCLUSIVE_LOCKS_REQUIRED(!newTaskMutex);
100 : :
101 : : /** Returns true if there are threads actively running in serviceQueue() */
102 : : bool AreThreadsServicingQueue() const EXCLUSIVE_LOCKS_REQUIRED(!newTaskMutex);
103 : :
104 : : private:
105 : : mutable Mutex newTaskMutex;
106 : : std::condition_variable newTaskScheduled;
107 : : std::multimap<std::chrono::steady_clock::time_point, Function> taskQueue GUARDED_BY(newTaskMutex);
108 : : int nThreadsServicingQueue GUARDED_BY(newTaskMutex){0};
109 : : bool stopRequested GUARDED_BY(newTaskMutex){false};
110 : : bool stopWhenEmpty GUARDED_BY(newTaskMutex){false};
111 [ + + - + : 4121125 : bool shouldStop() const EXCLUSIVE_LOCKS_REQUIRED(newTaskMutex) { return stopRequested || (stopWhenEmpty && taskQueue.empty()); }
- - + + -
+ - - + +
- + - - +
+ - + -
- ]
112 : : };
113 : :
114 : : /**
115 : : * Class used by CScheduler clients which may schedule multiple jobs
116 : : * which are required to be run serially. Jobs may not be run on the
117 : : * same thread, but no two jobs will be executed
118 : : * at the same time and memory will be release-acquire consistent
119 : : * (the scheduler will internally do an acquire before invoking a callback
120 : : * as well as a release at the end). In practice this means that a callback
121 : : * B() will be able to observe all of the effects of callback A() which executed
122 : : * before it.
123 : : */
124 : : class SerialTaskRunner : public util::TaskRunnerInterface
125 : : {
126 : : private:
127 : : CScheduler& m_scheduler;
128 : :
129 : : Mutex m_callbacks_mutex;
130 : :
131 : : // We are not allowed to assume the scheduler only runs in one thread,
132 : : // but must ensure all callbacks happen in-order, so we end up creating
133 : : // our own queue here :(
134 : : std::list<std::function<void()>> m_callbacks_pending GUARDED_BY(m_callbacks_mutex);
135 : : bool m_are_callbacks_running GUARDED_BY(m_callbacks_mutex) = false;
136 : :
137 : : void MaybeScheduleProcessQueue() EXCLUSIVE_LOCKS_REQUIRED(!m_callbacks_mutex);
138 : : void ProcessQueue() EXCLUSIVE_LOCKS_REQUIRED(!m_callbacks_mutex);
139 : :
140 : : public:
141 : 1172 : explicit SerialTaskRunner(CScheduler& scheduler LIFETIMEBOUND) : m_scheduler{scheduler} {}
142 : :
143 : : /**
144 : : * Add a callback to be executed. Callbacks are executed serially
145 : : * and memory is release-acquire consistent between callback executions.
146 : : * Practically, this means that callbacks can behave as if they are executed
147 : : * in order by a single thread.
148 : : */
149 : : void insert(std::function<void()> func) override EXCLUSIVE_LOCKS_REQUIRED(!m_callbacks_mutex);
150 : :
151 : : /**
152 : : * Processes all remaining queue members on the calling thread, blocking until queue is empty
153 : : * Must be called after the CScheduler has no remaining processing threads!
154 : : */
155 : : void flush() override EXCLUSIVE_LOCKS_REQUIRED(!m_callbacks_mutex);
156 : :
157 : : size_t size() override EXCLUSIVE_LOCKS_REQUIRED(!m_callbacks_mutex);
158 : : };
159 : :
160 : : #endif // BITCOIN_SCHEDULER_H
|