Branch data Line data Source code
1 : : // Copyright (c) 2012-2022 The Bitcoin Core developers
2 : : // Distributed under the MIT software license, see the accompanying
3 : : // file COPYING or http://www.opensource.org/licenses/mit-license.php.
4 : :
5 : : #ifndef BITCOIN_CHECKQUEUE_H
6 : : #define BITCOIN_CHECKQUEUE_H
7 : :
8 : : #include <logging.h>
9 : : #include <sync.h>
10 : : #include <tinyformat.h>
11 : : #include <util/threadnames.h>
12 : :
13 : : #include <algorithm>
14 : : #include <iterator>
15 : : #include <optional>
16 : : #include <vector>
17 : :
18 : : /**
19 : : * Queue for verifications that have to be performed.
20 : : * The verifications are represented by a type T, which must provide an
21 : : * operator(), returning an std::optional<R>.
22 : : *
23 : : * The overall result of the computation is std::nullopt if all invocations
24 : : * return std::nullopt, or one of the other results otherwise.
25 : : *
26 : : * One thread (the master) is assumed to push batches of verifications
27 : : * onto the queue, where they are processed by N-1 worker threads. When
28 : : * the master is done adding work, it temporarily joins the worker pool
29 : : * as an N'th worker, until all jobs are done.
30 : : *
31 : : */
32 : : template <typename T, typename R = std::remove_cvref_t<decltype(std::declval<T>()().value())>>
33 : : class CCheckQueue
34 : : {
35 : : private:
36 : : //! Mutex to protect the inner state
37 : : Mutex m_mutex;
38 : :
39 : : //! Worker threads block on this when out of work
40 : : std::condition_variable m_worker_cv;
41 : :
42 : : //! Master thread blocks on this when out of work
43 : : std::condition_variable m_master_cv;
44 : :
45 : : //! The queue of elements to be processed.
46 : : //! As the order of booleans doesn't matter, it is used as a LIFO (stack)
47 : : std::vector<T> queue GUARDED_BY(m_mutex);
48 : :
49 : : //! The number of workers (including the master) that are idle.
50 : : int nIdle GUARDED_BY(m_mutex){0};
51 : :
52 : : //! The total number of workers (including the master).
53 : : int nTotal GUARDED_BY(m_mutex){0};
54 : :
55 : : //! The temporary evaluation result.
56 : : std::optional<R> m_result GUARDED_BY(m_mutex);
57 : :
58 : : /**
59 : : * Number of verifications that haven't completed yet.
60 : : * This includes elements that are no longer queued, but still in the
61 : : * worker's own batches.
62 : : */
63 : : unsigned int nTodo GUARDED_BY(m_mutex){0};
64 : :
65 : : //! The maximum number of elements to be processed in one batch
66 : : const unsigned int nBatchSize;
67 : :
68 : : std::vector<std::thread> m_worker_threads;
69 : : bool m_request_stop GUARDED_BY(m_mutex){false};
70 : :
71 : : /** Internal function that does bulk of the verification work. If fMaster, return the final result. */
72 : 191091 : std::optional<R> Loop(bool fMaster) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
73 : : {
74 [ + + ]: 191091 : std::condition_variable& cond = fMaster ? m_master_cv : m_worker_cv;
75 : 191091 : std::vector<T> vChecks;
76 [ + - ]: 191091 : vChecks.reserve(nBatchSize);
77 : 191091 : unsigned int nNow = 0;
78 : 191091 : std::optional<R> local_result;
79 : : bool do_work;
80 : 251925 : do {
81 : : {
82 [ + - ]: 1109722 : WAIT_LOCK(m_mutex, lock);
83 : : // first do the clean-up of the previous loop run (allowing us to do it in the same critsect)
84 [ + + ]: 1109722 : if (nNow) {
85 [ + + + - ]: 918631 : if (local_result.has_value() && !m_result.has_value()) {
86 : 3610 : std::swap(local_result, m_result);
87 : : }
88 : 918631 : nTodo -= nNow;
89 [ + + + + ]: 918631 : if (nTodo == 0 && !fMaster) {
90 : : // We processed the last element; inform the master it can exit and return the result
91 : 54888 : m_master_cv.notify_one();
92 : : }
93 : : } else {
94 : : // first iteration
95 : 191091 : nTotal++;
96 : : }
97 : : // logically, the do loop starts here
98 [ + + + + ]: 1323388 : while (queue.empty() && !m_request_stop) {
99 [ + + + + ]: 390319 : if (fMaster && nTodo == 0) {
100 : 176653 : nTotal--;
101 : 176653 : std::optional<R> to_return = std::move(m_result);
102 : : // reset the status for new work later
103 [ + + ]: 176653 : m_result = std::nullopt;
104 : : // return the current status
105 : 2245 : return to_return;
106 : : }
107 : 213666 : nIdle++;
108 [ + - ]: 213666 : cond.wait(lock); // wait
109 : 213666 : nIdle--;
110 : : }
111 [ + + ]: 933069 : if (m_request_stop) {
112 : : // return value does not matter, because m_request_stop is only set in the destructor.
113 : 14438 : return std::nullopt;
114 : : }
115 : :
116 : : // Decide how many work units to process now.
117 : : // * Do not try to do everything at once, but aim for increasingly smaller batches so
118 : : // all workers finish approximately simultaneously.
119 : : // * Try to account for idle jobs which will instantly start helping.
120 : : // * Don't do batches smaller than 1 (duh), or larger than nBatchSize.
121 [ + + + + ]: 1764342 : nNow = std::max(1U, std::min(nBatchSize, (unsigned int)queue.size() / (nTotal + nIdle + 1)));
122 [ + - ]: 918631 : auto start_it = queue.end() - nNow;
123 [ + - ]: 918631 : vChecks.assign(std::make_move_iterator(start_it), std::make_move_iterator(queue.end()));
124 : 918631 : queue.erase(start_it, queue.end());
125 : : // Check whether we need to do work at all
126 [ + - ]: 918631 : do_work = !m_result.has_value();
127 : 190084 : }
128 : : // execute work
129 [ + + ]: 918631 : if (do_work) {
130 [ + + + + ]: 13340389 : for (T& check : vChecks) {
[ + + # # ]
131 [ + + + + ]: 12486773 : local_result = check();
[ + - + + ]
132 [ + + ]: 12437064 : if (local_result.has_value()) break;
133 : : }
134 : : }
135 [ + - ]: 919899 : vChecks.clear();
136 : 224511 : } while (true);
137 : 191091 : }
138 : :
139 : : public:
140 : : //! Mutex to ensure only one concurrent CCheckQueueControl
141 : : Mutex m_control_mutex;
142 : :
143 : : //! Create a new check queue
144 : 1125 : explicit CCheckQueue(unsigned int batch_size, int worker_threads_num)
145 [ + - ]: 1125 : : nBatchSize(batch_size)
146 : : {
147 [ + - ]: 1125 : LogInfo("Script verification uses %d additional threads", worker_threads_num);
148 [ + - ]: 1125 : m_worker_threads.reserve(worker_threads_num);
149 [ + + ]: 15563 : for (int n = 0; n < worker_threads_num; ++n) {
150 [ + - ]: 14468 : m_worker_threads.emplace_back([this, n]() {
151 [ + - + - : 14438 : util::ThreadRename(strprintf("scriptch.%i", n));
+ - + - +
- + - ][ +
- # # # #
# # # # #
# ]
152 : 14438 : Loop(false /* worker thread */);
153 : : });
154 : : }
155 : 1125 : }
156 : :
157 : : // Since this class manages its own resources, which is a thread
158 : : // pool `m_worker_threads`, copy and move operations are not appropriate.
159 : : CCheckQueue(const CCheckQueue&) = delete;
160 : : CCheckQueue& operator=(const CCheckQueue&) = delete;
161 : : CCheckQueue(CCheckQueue&&) = delete;
162 : : CCheckQueue& operator=(CCheckQueue&&) = delete;
163 : :
164 : : //! Join the execution until completion. If at least one evaluation wasn't successful, return
165 : : //! its error.
166 : 176653 : std::optional<R> Complete() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
167 : : {
168 : 176653 : return Loop(true /* master thread */);
169 : : }
170 : :
171 : : //! Add a batch of checks to the queue
172 : 2822698 : void Add(std::vector<T>&& vChecks) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
173 : : {
174 [ + + ]: 2822698 : if (vChecks.empty()) {
175 : : return;
176 : : }
177 : :
178 : : {
179 : 2518964 : LOCK(m_mutex);
180 [ + - ]: 2518964 : queue.insert(queue.end(), std::make_move_iterator(vChecks.begin()), std::make_move_iterator(vChecks.end()));
181 [ + - ]: 2518964 : nTodo += vChecks.size();
182 [ + + ]: 2518964 : }
183 : :
184 [ + + ]: 2518964 : if (vChecks.size() == 1) {
185 : 300751 : m_worker_cv.notify_one();
186 : : } else {
187 : 2218213 : m_worker_cv.notify_all();
188 : : }
189 : : }
190 : :
191 : 1125 : ~CCheckQueue()
192 : : {
193 [ + - ]: 2250 : WITH_LOCK(m_mutex, m_request_stop = true);
194 : 1125 : m_worker_cv.notify_all();
195 [ + + ]: 15563 : for (std::thread& t : m_worker_threads) {
196 : 14438 : t.join();
197 : : }
198 : 1125 : }
199 : :
200 : 175650 : bool HasThreads() const { return !m_worker_threads.empty(); }
201 : : };
202 : :
203 : : /**
204 : : * RAII-style controller object for a CCheckQueue that guarantees the passed
205 : : * queue is finished before continuing.
206 : : */
207 : : template <typename T, typename R = std::remove_cvref_t<decltype(std::declval<T>()().value())>>
208 : : class CCheckQueueControl
209 : : {
210 : : private:
211 : : CCheckQueue<T, R> * const pqueue;
212 : : bool fDone;
213 : :
214 : : public:
215 : : CCheckQueueControl() = delete;
216 : : CCheckQueueControl(const CCheckQueueControl&) = delete;
217 : : CCheckQueueControl& operator=(const CCheckQueueControl&) = delete;
218 : 177429 : explicit CCheckQueueControl(CCheckQueue<T> * const pqueueIn) : pqueue(pqueueIn), fDone(false)
219 : : {
220 : : // passed queue is supposed to be unused, or nullptr
221 [ + + + - : 177428 : if (pqueue != nullptr) {
+ - + - +
- + - + -
+ - ][ + + ]
222 [ + - + - : 176653 : ENTER_CRITICAL_SECTION(pqueue->m_control_mutex);
+ - + - +
- ][ + - #
# # # # #
# # ]
223 : : }
224 : : }
225 : :
226 : 178205 : std::optional<R> Complete()
227 : : {
228 [ + + ]: 178205 : if (pqueue == nullptr) return std::nullopt;
229 : 176653 : auto ret = pqueue->Complete();
230 : 176653 : fDone = true;
231 : 176653 : return ret;
232 : 174408 : }
233 : :
234 : 2823322 : void Add(std::vector<T>&& vChecks)
235 : : {
236 [ + + + - : 2823322 : if (pqueue != nullptr) {
+ - + - +
- - + ][ +
- # # # #
# # # # #
# ]
237 [ + - + - : 2822698 : pqueue->Add(std::move(vChecks));
+ - + - +
- + - ][ +
- # # # #
# # # # #
# ]
238 : : }
239 : : }
240 : :
241 : 177429 : ~CCheckQueueControl()
242 : : {
243 [ + + ]: 177429 : if (!fDone)
244 : 1781 : Complete();
245 [ + + ]: 177429 : if (pqueue != nullptr) {
246 : 176653 : LEAVE_CRITICAL_SECTION(pqueue->m_control_mutex);
247 : : }
248 : 177429 : }
249 : : };
250 : :
251 : : #endif // BITCOIN_CHECKQUEUE_H
|