Branch data Line data Source code
1 : : // Copyright (c) 2012-2022 The Bitcoin Core developers
2 : : // Distributed under the MIT software license, see the accompanying
3 : : // file COPYING or http://www.opensource.org/licenses/mit-license.php.
4 : :
5 : : #ifndef BITCOIN_CHECKQUEUE_H
6 : : #define BITCOIN_CHECKQUEUE_H
7 : :
8 : : #include <sync.h>
9 : : #include <tinyformat.h>
10 : : #include <util/threadnames.h>
11 : :
12 : : #include <algorithm>
13 : : #include <iterator>
14 : : #include <vector>
15 : :
16 : : /**
17 : : * Queue for verifications that have to be performed.
18 : : * The verifications are represented by a type T, which must provide an
19 : : * operator(), returning a bool.
20 : : *
21 : : * One thread (the master) is assumed to push batches of verifications
22 : : * onto the queue, where they are processed by N-1 worker threads. When
23 : : * the master is done adding work, it temporarily joins the worker pool
24 : : * as an N'th worker, until all jobs are done.
25 : : */
26 : : template <typename T>
27 : : class CCheckQueue
28 : : {
29 : : private:
30 : : //! Mutex to protect the inner state
31 : : Mutex m_mutex;
32 : :
33 : : //! Worker threads block on this when out of work
34 : : std::condition_variable m_worker_cv;
35 : :
36 : : //! Master thread blocks on this when out of work
37 : : std::condition_variable m_master_cv;
38 : :
39 : : //! The queue of elements to be processed.
40 : : //! As the order of booleans doesn't matter, it is used as a LIFO (stack)
41 : : std::vector<T> queue GUARDED_BY(m_mutex);
42 : :
43 : : //! The number of workers (including the master) that are idle.
44 : : int nIdle GUARDED_BY(m_mutex){0};
45 : :
46 : : //! The total number of workers (including the master).
47 : : int nTotal GUARDED_BY(m_mutex){0};
48 : :
49 : : //! The temporary evaluation result.
50 : : bool fAllOk GUARDED_BY(m_mutex){true};
51 : :
52 : : /**
53 : : * Number of verifications that haven't completed yet.
54 : : * This includes elements that are no longer queued, but still in the
55 : : * worker's own batches.
56 : : */
57 : : unsigned int nTodo GUARDED_BY(m_mutex){0};
58 : :
59 : : //! The maximum number of elements to be processed in one batch
60 : : const unsigned int nBatchSize;
61 : :
62 : : std::vector<std::thread> m_worker_threads;
63 : : bool m_request_stop GUARDED_BY(m_mutex){false};
64 : :
65 : : /** Internal function that does bulk of the verification work. */
66 : 16976 : bool Loop(bool fMaster) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
67 : : {
68 [ + + ]: 16976 : std::condition_variable& cond = fMaster ? m_master_cv : m_worker_cv;
69 : 16976 : std::vector<T> vChecks;
70 [ + - ]: 16976 : vChecks.reserve(nBatchSize);
71 : : unsigned int nNow = 0;
72 : : bool fOk = true;
73 : 238432 : do {
74 : : {
75 [ + - ]: 3977365 : WAIT_LOCK(m_mutex, lock);
76 : : // first do the clean-up of the previous loop run (allowing us to do it in the same critsect)
77 [ + + ]: 3977365 : if (nNow) {
78 : 3960389 : fAllOk &= fOk;
79 : 3960389 : nTodo -= nNow;
80 [ + + + + ]: 3960389 : if (nTodo == 0 && !fMaster)
81 : : // We processed the last element; inform the master it can exit and return the result
82 : 183341 : m_master_cv.notify_one();
83 : : } else {
84 : : // first iteration
85 : 16976 : nTotal++;
86 : : }
87 : : // logically, the do loop starts here
88 [ + + + + ]: 4533743 : while (queue.empty() && !m_request_stop) {
89 [ + + + + ]: 572966 : if (fMaster && nTodo == 0) {
90 : 16588 : nTotal--;
91 : 16588 : bool fRet = fAllOk;
92 : : // reset the status for new work later
93 : 16588 : fAllOk = true;
94 : : // return the current status
95 : 16588 : return fRet;
96 : : }
97 : 556378 : nIdle++;
98 [ + - ]: 556378 : cond.wait(lock); // wait
99 : 556378 : nIdle--;
100 : : }
101 [ + + ]: 3960777 : if (m_request_stop) {
102 : : return false;
103 : : }
104 : :
105 : : // Decide how many work units to process now.
106 : : // * Do not try to do everything at once, but aim for increasingly smaller batches so
107 : : // all workers finish approximately simultaneously.
108 : : // * Try to account for idle jobs which will instantly start helping.
109 : : // * Don't do batches smaller than 1 (duh), or larger than nBatchSize.
110 [ + + + + ]: 7918617 : nNow = std::max(1U, std::min(nBatchSize, (unsigned int)queue.size() / (nTotal + nIdle + 1)));
111 [ + - ]: 3960389 : auto start_it = queue.end() - nNow;
112 [ + - ]: 3960389 : vChecks.assign(std::make_move_iterator(start_it), std::make_move_iterator(queue.end()));
113 : 3960389 : queue.erase(start_it, queue.end());
114 : : // Check whether we need to do work at all
115 [ + - ]: 3960389 : fOk = fAllOk;
116 : 1275 : }
117 : : // execute work
118 [ + + ]: 16926838 : for (T& check : vChecks)
119 [ + + ]: 12966449 : if (fOk)
120 [ + - ]: 12946917 : fOk = check();
121 [ + - ]: 3961664 : vChecks.clear();
122 : 238148 : } while (true);
123 : 16976 : }
124 : :
125 : : public:
126 : : //! Mutex to ensure only one concurrent CCheckQueueControl
127 : : Mutex m_control_mutex;
128 : :
129 : : //! Create a new check queue
130 : 183 : explicit CCheckQueue(unsigned int batch_size, int worker_threads_num)
131 [ + - ]: 183 : : nBatchSize(batch_size)
132 : : {
133 [ + - ]: 183 : m_worker_threads.reserve(worker_threads_num);
134 [ + + ]: 571 : for (int n = 0; n < worker_threads_num; ++n) {
135 [ + - ]: 776 : m_worker_threads.emplace_back([this, n]() {
136 [ + - ][ + - : 388 : util::ThreadRename(strprintf("scriptch.%i", n));
+ - + - +
- + - +
- ]
137 : 388 : Loop(false /* worker thread */);
138 : : });
139 : : }
140 : 183 : }
141 : :
142 : : // Since this class manages its own resources, which is a thread
143 : : // pool `m_worker_threads`, copy and move operations are not appropriate.
144 : : CCheckQueue(const CCheckQueue&) = delete;
145 : : CCheckQueue& operator=(const CCheckQueue&) = delete;
146 : : CCheckQueue(CCheckQueue&&) = delete;
147 : : CCheckQueue& operator=(CCheckQueue&&) = delete;
148 : :
149 : : //! Wait until execution finishes, and return whether all evaluations were successful.
150 : 16588 : bool Wait() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
151 : : {
152 : 16588 : return Loop(true /* master thread */);
153 : : }
154 : :
155 : : //! Add a batch of checks to the queue
156 : 2885244 : void Add(std::vector<T>&& vChecks) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
157 : : {
158 [ + + ]: 2885244 : if (vChecks.empty()) {
159 : : return;
160 : : }
161 : :
162 : : {
163 : 2598109 : LOCK(m_mutex);
164 [ + - ]: 2598109 : queue.insert(queue.end(), std::make_move_iterator(vChecks.begin()), std::make_move_iterator(vChecks.end()));
165 [ + - ]: 2598109 : nTodo += vChecks.size();
166 [ + + ]: 2598109 : }
167 : :
168 [ + + ]: 2598109 : if (vChecks.size() == 1) {
169 : 294173 : m_worker_cv.notify_one();
170 : : } else {
171 : 2303936 : m_worker_cv.notify_all();
172 : : }
173 : : }
174 : :
175 : 183 : ~CCheckQueue()
176 : : {
177 [ + - ]: 366 : WITH_LOCK(m_mutex, m_request_stop = true);
178 : 183 : m_worker_cv.notify_all();
179 [ + + ]: 571 : for (std::thread& t : m_worker_threads) {
180 : 388 : t.join();
181 : : }
182 : 183 : }
183 : :
184 : 14661 : bool HasThreads() const { return !m_worker_threads.empty(); }
185 : : };
186 : :
187 : : /**
188 : : * RAII-style controller object for a CCheckQueue that guarantees the passed
189 : : * queue is finished before continuing.
190 : : */
191 : : template <typename T>
192 : : class CCheckQueueControl
193 : : {
194 : : private:
195 : : CCheckQueue<T> * const pqueue;
196 : : bool fDone;
197 : :
198 : : public:
199 : : CCheckQueueControl() = delete;
200 : : CCheckQueueControl(const CCheckQueueControl&) = delete;
201 : : CCheckQueueControl& operator=(const CCheckQueueControl&) = delete;
202 : 16749 : explicit CCheckQueueControl(CCheckQueue<T> * const pqueueIn) : pqueue(pqueueIn), fDone(false)
203 : : {
204 : : // passed queue is supposed to be unused, or nullptr
205 [ + + ][ + - : 16748 : if (pqueue != nullptr) {
+ - + - +
- + - + -
+ - + - ]
206 [ + - ][ + - : 16588 : ENTER_CRITICAL_SECTION(pqueue->m_control_mutex);
+ - + - +
- + - ]
207 : : }
208 : : }
209 : :
210 : 16910 : bool Wait()
211 : : {
212 [ + + ]: 16910 : if (pqueue == nullptr)
213 : : return true;
214 : 16588 : bool fRet = pqueue->Wait();
215 : 16588 : fDone = true;
216 : 16588 : return fRet;
217 : : }
218 : :
219 : 2885244 : void Add(std::vector<T>&& vChecks)
220 : : {
221 [ + - ][ + - : 2885244 : if (pqueue != nullptr) {
+ - + - +
- + - -
+ ]
222 [ + - ][ + - : 2885244 : pqueue->Add(std::move(vChecks));
+ - + - +
- + - +
- ]
223 : : }
224 : : }
225 : :
226 : 16749 : ~CCheckQueueControl()
227 : : {
228 [ + + ]: 16749 : if (!fDone)
229 : 1171 : Wait();
230 [ + + ]: 16749 : if (pqueue != nullptr) {
231 : 16588 : LEAVE_CRITICAL_SECTION(pqueue->m_control_mutex);
232 : : }
233 : 16749 : }
234 : : };
235 : :
236 : : #endif // BITCOIN_CHECKQUEUE_H
|