Branch data Line data Source code
1 : : // Copyright (c) 2020-present The Bitcoin Core developers
2 : : // Distributed under the MIT software license, see the accompanying
3 : : // file COPYING or http://www.opensource.org/licenses/mit-license.php.
4 : :
5 : : #include <test/util/net.h>
6 : :
7 : : #include <net.h>
8 : : #include <net_processing.h>
9 : : #include <netaddress.h>
10 : : #include <netmessagemaker.h>
11 : : #include <node/connection_types.h>
12 : : #include <node/eviction.h>
13 : : #include <protocol.h>
14 : : #include <random.h>
15 : : #include <serialize.h>
16 : : #include <span.h>
17 : : #include <sync.h>
18 : :
19 : : #include <chrono>
20 : : #include <optional>
21 : : #include <vector>
22 : :
23 : 18646 : void ConnmanTestMsg::Handshake(CNode& node,
24 : : bool successfully_connected,
25 : : ServiceFlags remote_services,
26 : : ServiceFlags local_services,
27 : : int32_t version,
28 : : bool relay_txs)
29 : : {
30 : 18646 : auto& peerman{static_cast<PeerManager&>(*m_msgproc)};
31 : 18646 : auto& connman{*this};
32 : :
33 : 18646 : peerman.InitializeNode(node, local_services);
34 : 18646 : peerman.SendMessages(&node);
35 : 18646 : FlushSendBuffer(node); // Drop the version message added by SendMessages.
36 : :
37 : 18646 : CSerializedNetMsg msg_version{
38 [ + - ]: 37292 : NetMsg::Make(NetMsgType::VERSION,
39 : : version, //
40 [ + - ]: 18646 : Using<CustomUintFormatter<8>>(remote_services), //
41 : 18646 : int64_t{}, // dummy time
42 : 18646 : int64_t{}, // ignored service bits
43 [ + - ]: 37292 : CNetAddr::V1(CService{}), // dummy
44 : 18646 : int64_t{}, // ignored service bits
45 [ + - ]: 37292 : CNetAddr::V1(CService{}), // ignored
46 : 18646 : uint64_t{1}, // dummy nonce
47 [ + - ]: 18646 : std::string{}, // dummy subver
48 [ + - ]: 18646 : int32_t{}, // dummy starting_height
49 : : relay_txs),
50 [ + - ]: 18646 : };
51 : :
52 [ + - ]: 18646 : (void)connman.ReceiveMsgFrom(node, std::move(msg_version));
53 [ + - ]: 18646 : node.fPauseSend = false;
54 [ + - ]: 18646 : connman.ProcessMessagesOnce(node);
55 [ + - ]: 18646 : peerman.SendMessages(&node);
56 [ + - ]: 18646 : FlushSendBuffer(node); // Drop the verack message added by SendMessages.
57 [ + + ]: 18646 : if (node.fDisconnect) return;
58 [ - + ]: 15226 : assert(node.nVersion == version);
59 [ + + - + ]: 20046 : assert(node.GetCommonVersion() == std::min(version, PROTOCOL_VERSION));
60 [ + - ]: 15226 : CNodeStateStats statestats;
61 [ + - - + ]: 15226 : assert(peerman.GetNodeStateStats(node.GetId(), statestats));
62 [ + + + + : 16398 : assert(statestats.m_relay_txs == (relay_txs && !node.IsBlockOnlyConn()));
- + ]
63 [ - + ]: 15226 : assert(statestats.their_services == remote_services);
64 [ + + ]: 15226 : if (successfully_connected) {
65 [ + - + - ]: 11173 : CSerializedNetMsg msg_verack{NetMsg::Make(NetMsgType::VERACK)};
66 [ + - ]: 11173 : (void)connman.ReceiveMsgFrom(node, std::move(msg_verack));
67 [ + - ]: 11173 : node.fPauseSend = false;
68 [ + - ]: 11173 : connman.ProcessMessagesOnce(node);
69 [ + - ]: 11173 : peerman.SendMessages(&node);
70 [ - + ]: 11173 : assert(node.fSuccessfullyConnected == true);
71 : 11173 : }
72 : 33872 : }
73 : :
74 : 10548 : void ConnmanTestMsg::ResetAddrCache() { m_addr_response_caches = {}; }
75 : :
76 : 10548 : void ConnmanTestMsg::ResetMaxOutboundCycle()
77 : : {
78 : 10548 : LOCK(m_total_bytes_sent_mutex);
79 : 10548 : nMaxOutboundCycleStartTime = 0s;
80 [ + - ]: 10548 : nMaxOutboundTotalBytesSentInCycle = 0;
81 : 10548 : }
82 : :
83 : 334253 : void ConnmanTestMsg::NodeReceiveMsgBytes(CNode& node, std::span<const uint8_t> msg_bytes, bool& complete) const
84 : : {
85 [ - + ]: 334253 : assert(node.ReceiveMsgBytes(msg_bytes, complete));
86 [ + + ]: 334253 : if (complete) {
87 : 164176 : node.MarkReceivedMsgsForProcessing();
88 : : }
89 : 334253 : }
90 : :
91 : 182914 : void ConnmanTestMsg::FlushSendBuffer(CNode& node) const
92 : : {
93 : 182914 : LOCK(node.cs_vSend);
94 : 182914 : node.vSendMsg.clear();
95 : 182914 : node.m_send_memusage = 0;
96 : 52422 : while (true) {
97 [ + + ]: 235336 : const auto& [to_send, _more, _msg_type] = node.m_transport->GetBytesToSend(false);
98 [ + + ]: 235336 : if (to_send.empty()) break;
99 : 52422 : node.m_transport->MarkBytesSent(to_send.size());
100 : 52422 : }
101 : 182914 : }
102 : :
103 : 175441 : bool ConnmanTestMsg::ReceiveMsgFrom(CNode& node, CSerializedNetMsg&& ser_msg) const
104 : : {
105 : 175441 : bool queued = node.m_transport->SetMessageToSend(ser_msg);
106 [ - + ]: 175441 : assert(queued);
107 : 175441 : bool complete{false};
108 : 334253 : while (true) {
109 [ + + ]: 509694 : const auto& [to_send, _more, _msg_type] = node.m_transport->GetBytesToSend(false);
110 [ + + ]: 509694 : if (to_send.empty()) break;
111 : 334253 : NodeReceiveMsgBytes(node, to_send, complete);
112 : 334253 : node.m_transport->MarkBytesSent(to_send.size());
113 : 334253 : }
114 : 175441 : return complete;
115 : : }
116 : :
117 : 0 : CNode* ConnmanTestMsg::ConnectNodePublic(PeerManager& peerman, const char* pszDest, ConnectionType conn_type)
118 : : {
119 [ # # ]: 0 : CNode* node = ConnectNode(CAddress{}, pszDest, /*fCountFailure=*/false, conn_type, /*use_v2transport=*/true);
120 [ # # ]: 0 : if (!node) return nullptr;
121 : 0 : node->SetCommonVersion(PROTOCOL_VERSION);
122 : 0 : peerman.InitializeNode(*node, ServiceFlags(NODE_NETWORK | NODE_WITNESS));
123 : 0 : node->fSuccessfullyConnected = true;
124 : 0 : AddTestNode(*node);
125 : 0 : return node;
126 : : }
127 : :
128 : 0 : std::vector<NodeEvictionCandidate> GetRandomNodeEvictionCandidates(int n_candidates, FastRandomContext& random_context)
129 : : {
130 : 0 : std::vector<NodeEvictionCandidate> candidates;
131 [ # # ]: 0 : candidates.reserve(n_candidates);
132 [ # # ]: 0 : for (int id = 0; id < n_candidates; ++id) {
133 [ # # ]: 0 : candidates.push_back({
134 : 0 : .id=id,
135 : 0 : .m_connected=std::chrono::seconds{random_context.randrange(100)},
136 : 0 : .m_min_ping_time=std::chrono::microseconds{random_context.randrange(100)},
137 : 0 : .m_last_block_time=std::chrono::seconds{random_context.randrange(100)},
138 : 0 : .m_last_tx_time=std::chrono::seconds{random_context.randrange(100)},
139 : 0 : .fRelevantServices=random_context.randbool(),
140 : 0 : .m_relay_txs=random_context.randbool(),
141 : 0 : .fBloomFilter=random_context.randbool(),
142 : 0 : .nKeyedNetGroup=random_context.randrange(100u),
143 : 0 : .prefer_evict=random_context.randbool(),
144 : 0 : .m_is_local=random_context.randbool(),
145 [ # # ]: 0 : .m_network=ALL_NETWORKS[random_context.randrange(ALL_NETWORKS.size())],
146 : : .m_noban=false,
147 : : .m_conn_type=ConnectionType::INBOUND,
148 : : });
149 : : }
150 : 0 : return candidates;
151 : 0 : }
152 : :
153 : : // Have different ZeroSock (or others that inherit from it) objects have different
154 : : // m_socket because EqualSharedPtrSock compares m_socket and we want to avoid two
155 : : // different objects comparing as equal.
156 : : static std::atomic<SOCKET> g_mocked_sock_fd{0};
157 : :
158 : 0 : ZeroSock::ZeroSock() : Sock{g_mocked_sock_fd++} {}
159 : :
160 : : // Sock::~Sock() would try to close(2) m_socket if it is not INVALID_SOCKET, avoid that.
161 : 0 : ZeroSock::~ZeroSock() { m_socket = INVALID_SOCKET; }
162 : :
163 : 0 : ssize_t ZeroSock::Send(const void*, size_t len, int) const { return len; }
164 : :
165 : 0 : ssize_t ZeroSock::Recv(void* buf, size_t len, int flags) const
166 : : {
167 : 0 : memset(buf, 0x0, len);
168 : 0 : return len;
169 : : }
170 : :
171 : 0 : int ZeroSock::Connect(const sockaddr*, socklen_t) const { return 0; }
172 : :
173 : 0 : int ZeroSock::Bind(const sockaddr*, socklen_t) const { return 0; }
174 : :
175 : 0 : int ZeroSock::Listen(int) const { return 0; }
176 : :
177 : 0 : std::unique_ptr<Sock> ZeroSock::Accept(sockaddr* addr, socklen_t* addr_len) const
178 : : {
179 [ # # ]: 0 : if (addr != nullptr) {
180 : : // Pretend all connections come from 5.5.5.5:6789
181 [ # # ]: 0 : memset(addr, 0x00, *addr_len);
182 : 0 : const socklen_t write_len = static_cast<socklen_t>(sizeof(sockaddr_in));
183 [ # # ]: 0 : if (*addr_len >= write_len) {
184 : 0 : *addr_len = write_len;
185 : 0 : sockaddr_in* addr_in = reinterpret_cast<sockaddr_in*>(addr);
186 : 0 : addr_in->sin_family = AF_INET;
187 : 0 : memset(&addr_in->sin_addr, 0x05, sizeof(addr_in->sin_addr));
188 : 0 : addr_in->sin_port = htons(6789);
189 : : }
190 : : }
191 : 0 : return std::make_unique<ZeroSock>();
192 : : }
193 : :
194 : 0 : int ZeroSock::GetSockOpt(int level, int opt_name, void* opt_val, socklen_t* opt_len) const
195 : : {
196 : 0 : std::memset(opt_val, 0x0, *opt_len);
197 : 0 : return 0;
198 : : }
199 : :
200 : 0 : int ZeroSock::SetSockOpt(int, int, const void*, socklen_t) const { return 0; }
201 : :
202 : 0 : int ZeroSock::GetSockName(sockaddr* name, socklen_t* name_len) const
203 : : {
204 : 0 : std::memset(name, 0x0, *name_len);
205 : 0 : return 0;
206 : : }
207 : :
208 : 0 : bool ZeroSock::SetNonBlocking() const { return true; }
209 : :
210 : 0 : bool ZeroSock::IsSelectable() const { return true; }
211 : :
212 : 0 : bool ZeroSock::Wait(std::chrono::milliseconds timeout, Event requested, Event* occurred) const
213 : : {
214 [ # # ]: 0 : if (occurred != nullptr) {
215 : 0 : *occurred = requested;
216 : : }
217 : 0 : return true;
218 : : }
219 : :
220 : 0 : bool ZeroSock::WaitMany(std::chrono::milliseconds timeout, EventsPerSock& events_per_sock) const
221 : : {
222 [ # # ]: 0 : for (auto& [sock, events] : events_per_sock) {
223 : 0 : (void)sock;
224 : 0 : events.occurred = events.requested;
225 : : }
226 : 0 : return true;
227 : : }
228 : :
229 : 0 : ZeroSock& ZeroSock::operator=(Sock&& other)
230 : : {
231 : 0 : assert(false && "Move of Sock into ZeroSock not allowed.");
232 : : return *this;
233 : : }
234 : :
235 : 0 : StaticContentsSock::StaticContentsSock(const std::string& contents)
236 [ # # ]: 0 : : m_contents{contents}
237 : : {
238 : 0 : }
239 : :
240 : 0 : ssize_t StaticContentsSock::Recv(void* buf, size_t len, int flags) const
241 : : {
242 [ # # ]: 0 : const size_t consume_bytes{std::min(len, m_contents.size() - m_consumed)};
243 [ # # ]: 0 : std::memcpy(buf, m_contents.data() + m_consumed, consume_bytes);
244 [ # # ]: 0 : if ((flags & MSG_PEEK) == 0) {
245 : 0 : m_consumed += consume_bytes;
246 : : }
247 : 0 : return consume_bytes;
248 : : }
249 : :
250 : 0 : StaticContentsSock& StaticContentsSock::operator=(Sock&& other)
251 : : {
252 : 0 : assert(false && "Move of Sock into StaticContentsSock not allowed.");
253 : : return *this;
254 : : }
255 : :
256 : 0 : ssize_t DynSock::Pipe::GetBytes(void* buf, size_t len, int flags)
257 : : {
258 : 0 : WAIT_LOCK(m_mutex, lock);
259 : :
260 [ # # ]: 0 : if (m_data.empty()) {
261 [ # # ]: 0 : if (m_eof) {
262 : : return 0;
263 : : }
264 : 0 : errno = EAGAIN; // Same as recv(2) on a non-blocking socket.
265 : 0 : return -1;
266 : : }
267 : :
268 [ # # ]: 0 : const size_t read_bytes{std::min(len, m_data.size())};
269 : :
270 [ # # ]: 0 : std::memcpy(buf, m_data.data(), read_bytes);
271 [ # # ]: 0 : if ((flags & MSG_PEEK) == 0) {
272 : 0 : m_data.erase(m_data.begin(), m_data.begin() + read_bytes);
273 : : }
274 : :
275 : 0 : return read_bytes;
276 : 0 : }
277 : :
278 : 0 : std::optional<CNetMessage> DynSock::Pipe::GetNetMsg()
279 : : {
280 : 0 : V1Transport transport{NodeId{0}};
281 : :
282 : 0 : {
283 [ # # ]: 0 : WAIT_LOCK(m_mutex, lock);
284 : :
285 [ # # ]: 0 : WaitForDataOrEof(lock);
286 [ # # # # ]: 0 : if (m_eof && m_data.empty()) {
287 : 0 : return std::nullopt;
288 : : }
289 : :
290 : 0 : for (;;) {
291 [ # # ]: 0 : std::span<const uint8_t> s{m_data};
292 [ # # # # ]: 0 : if (!transport.ReceivedBytes(s)) { // Consumed bytes are removed from the front of s.
293 : 0 : return std::nullopt;
294 : : }
295 : 0 : m_data.erase(m_data.begin(), m_data.begin() + m_data.size() - s.size());
296 [ # # # # ]: 0 : if (transport.ReceivedMessageComplete()) {
297 : : break;
298 : : }
299 [ # # ]: 0 : if (m_data.empty()) {
300 [ # # ]: 0 : WaitForDataOrEof(lock);
301 [ # # # # ]: 0 : if (m_eof && m_data.empty()) {
302 : 0 : return std::nullopt;
303 : : }
304 : : }
305 : : }
306 : 0 : }
307 : :
308 : 0 : bool reject{false};
309 [ # # ]: 0 : CNetMessage msg{transport.GetReceivedMessage(/*time=*/{}, reject)};
310 [ # # ]: 0 : if (reject) {
311 : 0 : return std::nullopt;
312 : : }
313 : 0 : return std::make_optional<CNetMessage>(std::move(msg));
314 : 0 : }
315 : :
316 : 0 : void DynSock::Pipe::PushBytes(const void* buf, size_t len)
317 : : {
318 : 0 : LOCK(m_mutex);
319 : 0 : const uint8_t* b = static_cast<const uint8_t*>(buf);
320 [ # # ]: 0 : m_data.insert(m_data.end(), b, b + len);
321 [ # # ]: 0 : m_cond.notify_all();
322 : 0 : }
323 : :
324 : 0 : void DynSock::Pipe::Eof()
325 : : {
326 : 0 : LOCK(m_mutex);
327 : 0 : m_eof = true;
328 [ # # ]: 0 : m_cond.notify_all();
329 : 0 : }
330 : :
331 : 0 : void DynSock::Pipe::WaitForDataOrEof(UniqueLock<Mutex>& lock)
332 : : {
333 : 0 : Assert(lock.mutex() == &m_mutex);
334 : :
335 : 0 : m_cond.wait(lock, [&]() EXCLUSIVE_LOCKS_REQUIRED(m_mutex) {
336 : 0 : AssertLockHeld(m_mutex);
337 [ # # # # ]: 0 : return !m_data.empty() || m_eof;
338 : : });
339 : 0 : }
340 : :
341 : 0 : DynSock::DynSock(std::shared_ptr<Pipes> pipes, std::shared_ptr<Queue> accept_sockets)
342 [ # # # # ]: 0 : : m_pipes{pipes}, m_accept_sockets{accept_sockets}
343 : : {
344 : 0 : }
345 : :
346 : 0 : DynSock::~DynSock()
347 : : {
348 : 0 : m_pipes->send.Eof();
349 [ # # # # ]: 0 : }
350 : :
351 : 0 : ssize_t DynSock::Recv(void* buf, size_t len, int flags) const
352 : : {
353 : 0 : return m_pipes->recv.GetBytes(buf, len, flags);
354 : : }
355 : :
356 : 0 : ssize_t DynSock::Send(const void* buf, size_t len, int) const
357 : : {
358 : 0 : m_pipes->send.PushBytes(buf, len);
359 : 0 : return len;
360 : : }
361 : :
362 : 0 : std::unique_ptr<Sock> DynSock::Accept(sockaddr* addr, socklen_t* addr_len) const
363 : : {
364 : 0 : ZeroSock::Accept(addr, addr_len);
365 [ # # ]: 0 : return m_accept_sockets->Pop().value_or(nullptr);
366 : : }
367 : :
368 : 0 : bool DynSock::Wait(std::chrono::milliseconds timeout,
369 : : Event requested,
370 : : Event* occurred) const
371 : : {
372 [ # # ]: 0 : EventsPerSock ev;
373 [ # # ]: 0 : ev.emplace(this, Events{requested});
374 [ # # ]: 0 : const bool ret{WaitMany(timeout, ev)};
375 [ # # ]: 0 : if (occurred != nullptr) {
376 : 0 : *occurred = ev.begin()->second.occurred;
377 : : }
378 : 0 : return ret;
379 : 0 : }
380 : :
381 : 0 : bool DynSock::WaitMany(std::chrono::milliseconds timeout, EventsPerSock& events_per_sock) const
382 : : {
383 : 0 : const auto deadline = std::chrono::steady_clock::now() + timeout;
384 : 0 : bool at_least_one_event_occurred{false};
385 : :
386 : 0 : for (;;) {
387 : : // Check all sockets for readiness without waiting.
388 [ # # # # ]: 0 : for (auto& [sock, events] : events_per_sock) {
389 [ # # ]: 0 : if ((events.requested & Sock::SEND) != 0) {
390 : : // Always ready for Send().
391 : 0 : events.occurred |= Sock::SEND;
392 : 0 : at_least_one_event_occurred = true;
393 : : }
394 : :
395 [ # # ]: 0 : if ((events.requested & Sock::RECV) != 0) {
396 : 0 : auto dyn_sock = reinterpret_cast<const DynSock*>(sock.get());
397 : 0 : uint8_t b;
398 [ # # # # ]: 0 : if (dyn_sock->m_pipes->recv.GetBytes(&b, 1, MSG_PEEK) == 1 || !dyn_sock->m_accept_sockets->Empty()) {
399 : 0 : events.occurred |= Sock::RECV;
400 : 0 : at_least_one_event_occurred = true;
401 : : }
402 : : }
403 : : }
404 : :
405 [ # # # # ]: 0 : if (at_least_one_event_occurred || std::chrono::steady_clock::now() > deadline) {
406 : : break;
407 : : }
408 : :
409 : 0 : std::this_thread::sleep_for(10ms);
410 : 0 : }
411 : :
412 : 0 : return true;
413 : : }
414 : :
415 : 0 : DynSock& DynSock::operator=(Sock&&)
416 : : {
417 : 0 : assert(false && "Move of Sock into DynSock not allowed.");
418 : : return *this;
419 : : }
|