Branch data Line data Source code
1 : : // Copyright (c) 2020-present The Bitcoin Core developers
2 : : // Distributed under the MIT software license, see the accompanying
3 : : // file COPYING or http://www.opensource.org/licenses/mit-license.php.
4 : :
5 : : #include <test/util/net.h>
6 : :
7 : : #include <net.h>
8 : : #include <net_processing.h>
9 : : #include <netaddress.h>
10 : : #include <netmessagemaker.h>
11 : : #include <node/connection_types.h>
12 : : #include <node/eviction.h>
13 : : #include <protocol.h>
14 : : #include <random.h>
15 : : #include <serialize.h>
16 : : #include <span.h>
17 : : #include <sync.h>
18 : :
19 : : #include <chrono>
20 : : #include <optional>
21 : : #include <vector>
22 : :
23 : 19165 : void ConnmanTestMsg::Handshake(CNode& node,
24 : : bool successfully_connected,
25 : : ServiceFlags remote_services,
26 : : ServiceFlags local_services,
27 : : int32_t version,
28 : : bool relay_txs)
29 : : {
30 : 19165 : auto& peerman{static_cast<PeerManager&>(*m_msgproc)};
31 : 19165 : auto& connman{*this};
32 : :
33 : 19165 : peerman.InitializeNode(node, local_services);
34 : 19165 : peerman.SendMessages(node);
35 : 19165 : FlushSendBuffer(node); // Drop the version message added by SendMessages.
36 : :
37 : 19165 : CSerializedNetMsg msg_version{
38 : 19165 : NetMsg::Make(NetMsgType::VERSION,
39 : : version, //
40 [ + - ]: 19165 : Using<CustomUintFormatter<8>>(remote_services), //
41 : 19165 : int64_t{}, // dummy time
42 : 19165 : int64_t{}, // ignored service bits
43 [ + - ]: 38330 : CNetAddr::V1(CService{}), // dummy
44 : 19165 : int64_t{}, // ignored service bits
45 [ + - ]: 38330 : CNetAddr::V1(CService{}), // ignored
46 : 19165 : uint64_t{1}, // dummy nonce
47 [ + - ]: 19165 : std::string{}, // dummy subver
48 [ + - ]: 19165 : int32_t{}, // dummy starting_height
49 : : relay_txs),
50 [ + - ]: 19165 : };
51 : :
52 [ + - ]: 19165 : (void)connman.ReceiveMsgFrom(node, std::move(msg_version));
53 [ + - ]: 19165 : node.fPauseSend = false;
54 [ + - ]: 19165 : connman.ProcessMessagesOnce(node);
55 [ + - ]: 19165 : peerman.SendMessages(node);
56 [ + - ]: 19165 : FlushSendBuffer(node); // Drop the verack message added by SendMessages.
57 [ + + ]: 19165 : if (node.fDisconnect) return;
58 [ - + ]: 11426 : assert(node.nVersion == version);
59 [ + + - + ]: 14381 : assert(node.GetCommonVersion() == std::min(version, PROTOCOL_VERSION));
60 [ + - ]: 11426 : CNodeStateStats statestats;
61 [ + - - + ]: 11426 : assert(peerman.GetNodeStateStats(node.GetId(), statestats));
62 [ + + + + : 12351 : assert(statestats.m_relay_txs == (relay_txs && !node.IsBlockOnlyConn()));
- + ]
63 [ - + ]: 11426 : assert(statestats.their_services == remote_services);
64 [ + + ]: 11426 : if (successfully_connected) {
65 [ + - + - ]: 8536 : CSerializedNetMsg msg_verack{NetMsg::Make(NetMsgType::VERACK)};
66 [ + - ]: 8536 : (void)connman.ReceiveMsgFrom(node, std::move(msg_verack));
67 [ + - ]: 8536 : node.fPauseSend = false;
68 [ + - ]: 8536 : connman.ProcessMessagesOnce(node);
69 [ + - ]: 8536 : peerman.SendMessages(node);
70 [ - + ]: 8536 : assert(node.fSuccessfullyConnected == true);
71 : 8536 : }
72 : 30591 : }
73 : :
74 : 12040 : void ConnmanTestMsg::ResetAddrCache() { m_addr_response_caches = {}; }
75 : :
76 : 12040 : void ConnmanTestMsg::ResetMaxOutboundCycle()
77 : : {
78 : 12040 : LOCK(m_total_bytes_sent_mutex);
79 : 12040 : nMaxOutboundCycleStartTime = 0s;
80 [ + - ]: 12040 : nMaxOutboundTotalBytesSentInCycle = 0;
81 : 12040 : }
82 : :
83 : 12040 : void ConnmanTestMsg::Reset()
84 : : {
85 : 12040 : ResetAddrCache();
86 : 12040 : ResetMaxOutboundCycle();
87 : 12040 : m_private_broadcast.m_outbound_tor_ok_at_least_once.store(false);
88 : 12040 : m_private_broadcast.m_num_to_open.store(0);
89 : 12040 : }
90 : :
91 : 333969 : void ConnmanTestMsg::NodeReceiveMsgBytes(CNode& node, std::span<const uint8_t> msg_bytes, bool& complete) const
92 : : {
93 [ - + ]: 333969 : assert(node.ReceiveMsgBytes(msg_bytes, complete));
94 [ + + ]: 333969 : if (complete) {
95 : 163969 : node.MarkReceivedMsgsForProcessing();
96 : : }
97 : 333969 : }
98 : :
99 : 184547 : void ConnmanTestMsg::FlushSendBuffer(CNode& node) const
100 : : {
101 : 184547 : LOCK(node.cs_vSend);
102 : 184547 : node.vSendMsg.clear();
103 : 184547 : node.m_send_memusage = 0;
104 : 45754 : while (true) {
105 [ + + ]: 230301 : const auto& [to_send, _more, _msg_type] = node.m_transport->GetBytesToSend(false);
106 [ + + ]: 230301 : if (to_send.empty()) break;
107 : 45754 : node.m_transport->MarkBytesSent(to_send.size());
108 : 45754 : }
109 : 184547 : }
110 : :
111 : 173918 : bool ConnmanTestMsg::ReceiveMsgFrom(CNode& node, CSerializedNetMsg&& ser_msg) const
112 : : {
113 : 173918 : bool queued = node.m_transport->SetMessageToSend(ser_msg);
114 [ - + ]: 173918 : assert(queued);
115 : 173918 : bool complete{false};
116 : 333969 : while (true) {
117 [ + + ]: 507887 : const auto& [to_send, _more, _msg_type] = node.m_transport->GetBytesToSend(false);
118 [ + + ]: 507887 : if (to_send.empty()) break;
119 : 333969 : NodeReceiveMsgBytes(node, to_send, complete);
120 : 333969 : node.m_transport->MarkBytesSent(to_send.size());
121 : 333969 : }
122 : 173918 : return complete;
123 : : }
124 : :
125 : 0 : CNode* ConnmanTestMsg::ConnectNodePublic(PeerManager& peerman, const char* pszDest, ConnectionType conn_type)
126 : : {
127 [ # # # # : 0 : CNode* node = ConnectNode(CAddress{}, pszDest, /*fCountFailure=*/false, conn_type, /*use_v2transport=*/true, /*proxy_override=*/std::nullopt);
# # ]
128 [ # # ]: 0 : if (!node) return nullptr;
129 : 0 : node->SetCommonVersion(PROTOCOL_VERSION);
130 : 0 : peerman.InitializeNode(*node, ServiceFlags(NODE_NETWORK | NODE_WITNESS));
131 : 0 : node->fSuccessfullyConnected = true;
132 : 0 : AddTestNode(*node);
133 : 0 : return node;
134 : : }
135 : :
136 : 0 : std::vector<NodeEvictionCandidate> GetRandomNodeEvictionCandidates(int n_candidates, FastRandomContext& random_context)
137 : : {
138 : 0 : std::vector<NodeEvictionCandidate> candidates;
139 [ # # ]: 0 : candidates.reserve(n_candidates);
140 [ # # ]: 0 : for (int id = 0; id < n_candidates; ++id) {
141 [ # # ]: 0 : candidates.push_back({
142 : 0 : .id=id,
143 : 0 : .m_connected=std::chrono::seconds{random_context.randrange(100)},
144 : 0 : .m_min_ping_time=std::chrono::microseconds{random_context.randrange(100)},
145 : 0 : .m_last_block_time=std::chrono::seconds{random_context.randrange(100)},
146 : 0 : .m_last_tx_time=std::chrono::seconds{random_context.randrange(100)},
147 : 0 : .fRelevantServices=random_context.randbool(),
148 : 0 : .m_relay_txs=random_context.randbool(),
149 : 0 : .fBloomFilter=random_context.randbool(),
150 : 0 : .nKeyedNetGroup=random_context.randrange(100u),
151 : 0 : .prefer_evict=random_context.randbool(),
152 : 0 : .m_is_local=random_context.randbool(),
153 [ # # ]: 0 : .m_network=ALL_NETWORKS[random_context.randrange(ALL_NETWORKS.size())],
154 : : .m_noban=false,
155 : : .m_conn_type=ConnectionType::INBOUND,
156 : : });
157 : : }
158 : 0 : return candidates;
159 : 0 : }
160 : :
161 : : // Have different ZeroSock (or others that inherit from it) objects have different
162 : : // m_socket because EqualSharedPtrSock compares m_socket and we want to avoid two
163 : : // different objects comparing as equal.
164 : : static std::atomic<SOCKET> g_mocked_sock_fd{0};
165 : :
166 : 0 : ZeroSock::ZeroSock() : Sock{g_mocked_sock_fd++} {}
167 : :
168 : : // Sock::~Sock() would try to close(2) m_socket if it is not INVALID_SOCKET, avoid that.
169 : 0 : ZeroSock::~ZeroSock() { m_socket = INVALID_SOCKET; }
170 : :
171 : 0 : ssize_t ZeroSock::Send(const void*, size_t len, int) const { return len; }
172 : :
173 : 0 : ssize_t ZeroSock::Recv(void* buf, size_t len, int flags) const
174 : : {
175 : 0 : memset(buf, 0x0, len);
176 : 0 : return len;
177 : : }
178 : :
179 : 0 : int ZeroSock::Connect(const sockaddr*, socklen_t) const { return 0; }
180 : :
181 : 0 : int ZeroSock::Bind(const sockaddr*, socklen_t) const { return 0; }
182 : :
183 : 0 : int ZeroSock::Listen(int) const { return 0; }
184 : :
185 : 0 : std::unique_ptr<Sock> ZeroSock::Accept(sockaddr* addr, socklen_t* addr_len) const
186 : : {
187 [ # # ]: 0 : if (addr != nullptr) {
188 : : // Pretend all connections come from 5.5.5.5:6789
189 [ # # ]: 0 : memset(addr, 0x00, *addr_len);
190 : 0 : const socklen_t write_len = static_cast<socklen_t>(sizeof(sockaddr_in));
191 [ # # ]: 0 : if (*addr_len >= write_len) {
192 : 0 : *addr_len = write_len;
193 : 0 : sockaddr_in* addr_in = reinterpret_cast<sockaddr_in*>(addr);
194 : 0 : addr_in->sin_family = AF_INET;
195 : 0 : memset(&addr_in->sin_addr, 0x05, sizeof(addr_in->sin_addr));
196 : 0 : addr_in->sin_port = htons(6789);
197 : : }
198 : : }
199 : 0 : return std::make_unique<ZeroSock>();
200 : : }
201 : :
202 : 0 : int ZeroSock::GetSockOpt(int level, int opt_name, void* opt_val, socklen_t* opt_len) const
203 : : {
204 : 0 : std::memset(opt_val, 0x0, *opt_len);
205 : 0 : return 0;
206 : : }
207 : :
208 : 0 : int ZeroSock::SetSockOpt(int, int, const void*, socklen_t) const { return 0; }
209 : :
210 : 0 : int ZeroSock::GetSockName(sockaddr* name, socklen_t* name_len) const
211 : : {
212 : 0 : std::memset(name, 0x0, *name_len);
213 : 0 : return 0;
214 : : }
215 : :
216 : 0 : bool ZeroSock::SetNonBlocking() const { return true; }
217 : :
218 : 0 : bool ZeroSock::IsSelectable() const { return true; }
219 : :
220 : 0 : bool ZeroSock::Wait(std::chrono::milliseconds timeout, Event requested, Event* occurred) const
221 : : {
222 [ # # ]: 0 : if (occurred != nullptr) {
223 : 0 : *occurred = requested;
224 : : }
225 : 0 : return true;
226 : : }
227 : :
228 : 0 : bool ZeroSock::WaitMany(std::chrono::milliseconds timeout, EventsPerSock& events_per_sock) const
229 : : {
230 [ # # ]: 0 : for (auto& [sock, events] : events_per_sock) {
231 : 0 : (void)sock;
232 : 0 : events.occurred = events.requested;
233 : : }
234 : 0 : return true;
235 : : }
236 : :
237 : 0 : ZeroSock& ZeroSock::operator=(Sock&& other)
238 : : {
239 : 0 : assert(false && "Move of Sock into ZeroSock not allowed.");
240 : : return *this;
241 : : }
242 : :
243 : 0 : StaticContentsSock::StaticContentsSock(const std::string& contents)
244 [ # # ]: 0 : : m_contents{contents}
245 : : {
246 : 0 : }
247 : :
248 : 0 : ssize_t StaticContentsSock::Recv(void* buf, size_t len, int flags) const
249 : : {
250 [ # # # # ]: 0 : const size_t consume_bytes{std::min(len, m_contents.size() - m_consumed)};
251 [ # # ]: 0 : std::memcpy(buf, m_contents.data() + m_consumed, consume_bytes);
252 [ # # ]: 0 : if ((flags & MSG_PEEK) == 0) {
253 : 0 : m_consumed += consume_bytes;
254 : : }
255 : 0 : return consume_bytes;
256 : : }
257 : :
258 : 0 : StaticContentsSock& StaticContentsSock::operator=(Sock&& other)
259 : : {
260 : 0 : assert(false && "Move of Sock into StaticContentsSock not allowed.");
261 : : return *this;
262 : : }
263 : :
264 : 0 : ssize_t DynSock::Pipe::GetBytes(void* buf, size_t len, int flags)
265 : : {
266 : 0 : WAIT_LOCK(m_mutex, lock);
267 : :
268 [ # # ]: 0 : if (m_data.empty()) {
269 [ # # ]: 0 : if (m_eof) {
270 : : return 0;
271 : : }
272 : 0 : errno = EAGAIN; // Same as recv(2) on a non-blocking socket.
273 : 0 : return -1;
274 : : }
275 : :
276 [ # # # # ]: 0 : const size_t read_bytes{std::min(len, m_data.size())};
277 : :
278 [ # # ]: 0 : std::memcpy(buf, m_data.data(), read_bytes);
279 [ # # ]: 0 : if ((flags & MSG_PEEK) == 0) {
280 : 0 : m_data.erase(m_data.begin(), m_data.begin() + read_bytes);
281 : : }
282 : :
283 : 0 : return read_bytes;
284 : 0 : }
285 : :
286 : 0 : std::optional<CNetMessage> DynSock::Pipe::GetNetMsg()
287 : : {
288 : 0 : V1Transport transport{NodeId{0}};
289 : :
290 : 0 : {
291 [ # # ]: 0 : WAIT_LOCK(m_mutex, lock);
292 : :
293 [ # # ]: 0 : WaitForDataOrEof(lock);
294 [ # # # # ]: 0 : if (m_eof && m_data.empty()) {
295 : 0 : return std::nullopt;
296 : : }
297 : :
298 : 0 : for (;;) {
299 [ # # ]: 0 : std::span<const uint8_t> s{m_data};
300 [ # # # # ]: 0 : if (!transport.ReceivedBytes(s)) { // Consumed bytes are removed from the front of s.
301 : 0 : return std::nullopt;
302 : : }
303 [ # # ]: 0 : m_data.erase(m_data.begin(), m_data.begin() + m_data.size() - s.size());
304 [ # # # # ]: 0 : if (transport.ReceivedMessageComplete()) {
305 : : break;
306 : : }
307 [ # # ]: 0 : if (m_data.empty()) {
308 [ # # ]: 0 : WaitForDataOrEof(lock);
309 [ # # # # ]: 0 : if (m_eof && m_data.empty()) {
310 : 0 : return std::nullopt;
311 : : }
312 : : }
313 : : }
314 : 0 : }
315 : :
316 : 0 : bool reject{false};
317 [ # # ]: 0 : CNetMessage msg{transport.GetReceivedMessage(/*time=*/{}, reject)};
318 [ # # ]: 0 : if (reject) {
319 : 0 : return std::nullopt;
320 : : }
321 : 0 : return std::make_optional<CNetMessage>(std::move(msg));
322 : 0 : }
323 : :
324 : 0 : void DynSock::Pipe::PushBytes(const void* buf, size_t len)
325 : : {
326 : 0 : LOCK(m_mutex);
327 : 0 : const uint8_t* b = static_cast<const uint8_t*>(buf);
328 [ # # ]: 0 : m_data.insert(m_data.end(), b, b + len);
329 [ # # ]: 0 : m_cond.notify_all();
330 : 0 : }
331 : :
332 : 0 : void DynSock::Pipe::Eof()
333 : : {
334 : 0 : LOCK(m_mutex);
335 : 0 : m_eof = true;
336 [ # # ]: 0 : m_cond.notify_all();
337 : 0 : }
338 : :
339 : 0 : void DynSock::Pipe::WaitForDataOrEof(UniqueLock<Mutex>& lock)
340 : : {
341 [ # # ]: 0 : Assert(lock.mutex() == &m_mutex);
342 : :
343 : 0 : m_cond.wait(lock, [&]() EXCLUSIVE_LOCKS_REQUIRED(m_mutex) {
344 : 0 : AssertLockHeld(m_mutex);
345 [ # # # # ]: 0 : return !m_data.empty() || m_eof;
346 : : });
347 : 0 : }
348 : :
349 : 0 : DynSock::DynSock(std::shared_ptr<Pipes> pipes, std::shared_ptr<Queue> accept_sockets)
350 [ # # # # ]: 0 : : m_pipes{pipes}, m_accept_sockets{accept_sockets}
351 : : {
352 : 0 : }
353 : :
354 : 0 : DynSock::~DynSock()
355 : : {
356 : 0 : m_pipes->send.Eof();
357 [ # # # # ]: 0 : }
358 : :
359 : 0 : ssize_t DynSock::Recv(void* buf, size_t len, int flags) const
360 : : {
361 : 0 : return m_pipes->recv.GetBytes(buf, len, flags);
362 : : }
363 : :
364 : 0 : ssize_t DynSock::Send(const void* buf, size_t len, int) const
365 : : {
366 : 0 : m_pipes->send.PushBytes(buf, len);
367 : 0 : return len;
368 : : }
369 : :
370 : 0 : std::unique_ptr<Sock> DynSock::Accept(sockaddr* addr, socklen_t* addr_len) const
371 : : {
372 : 0 : ZeroSock::Accept(addr, addr_len);
373 [ # # ]: 0 : return m_accept_sockets->Pop().value_or(nullptr);
374 : : }
375 : :
376 : 0 : bool DynSock::Wait(std::chrono::milliseconds timeout,
377 : : Event requested,
378 : : Event* occurred) const
379 : : {
380 [ # # ]: 0 : EventsPerSock ev;
381 [ # # ]: 0 : ev.emplace(this, Events{requested});
382 [ # # ]: 0 : const bool ret{WaitMany(timeout, ev)};
383 [ # # ]: 0 : if (occurred != nullptr) {
384 : 0 : *occurred = ev.begin()->second.occurred;
385 : : }
386 : 0 : return ret;
387 : 0 : }
388 : :
389 : 0 : bool DynSock::WaitMany(std::chrono::milliseconds timeout, EventsPerSock& events_per_sock) const
390 : : {
391 : 0 : const auto deadline = std::chrono::steady_clock::now() + timeout;
392 : 0 : bool at_least_one_event_occurred{false};
393 : :
394 : 0 : for (;;) {
395 : : // Check all sockets for readiness without waiting.
396 [ # # # # ]: 0 : for (auto& [sock, events] : events_per_sock) {
397 [ # # ]: 0 : if ((events.requested & Sock::SEND) != 0) {
398 : : // Always ready for Send().
399 : 0 : events.occurred |= Sock::SEND;
400 : 0 : at_least_one_event_occurred = true;
401 : : }
402 : :
403 [ # # ]: 0 : if ((events.requested & Sock::RECV) != 0) {
404 : 0 : auto dyn_sock = reinterpret_cast<const DynSock*>(sock.get());
405 : 0 : uint8_t b;
406 [ # # # # ]: 0 : if (dyn_sock->m_pipes->recv.GetBytes(&b, 1, MSG_PEEK) == 1 || !dyn_sock->m_accept_sockets->Empty()) {
407 : 0 : events.occurred |= Sock::RECV;
408 : 0 : at_least_one_event_occurred = true;
409 : : }
410 : : }
411 : : }
412 : :
413 [ # # # # ]: 0 : if (at_least_one_event_occurred || std::chrono::steady_clock::now() > deadline) {
414 : : break;
415 : : }
416 : :
417 : 0 : std::this_thread::sleep_for(10ms);
418 : 0 : }
419 : :
420 : 0 : return true;
421 : : }
422 : :
423 : 0 : DynSock& DynSock::operator=(Sock&&)
424 : : {
425 : 0 : assert(false && "Move of Sock into DynSock not allowed.");
426 : : return *this;
427 : : }
|