Branch data Line data Source code
1 : : // Copyright (c) 2020-2022 The Bitcoin Core developers
2 : : // Distributed under the MIT software license, see the accompanying
3 : : // file COPYING or http://www.opensource.org/licenses/mit-license.php.
4 : :
5 : : #include <test/util/net.h>
6 : :
7 : : #include <net.h>
8 : : #include <net_processing.h>
9 : : #include <netaddress.h>
10 : : #include <netmessagemaker.h>
11 : : #include <node/connection_types.h>
12 : : #include <node/eviction.h>
13 : : #include <protocol.h>
14 : : #include <random.h>
15 : : #include <serialize.h>
16 : : #include <span.h>
17 : : #include <sync.h>
18 : :
19 : : #include <chrono>
20 : : #include <optional>
21 : : #include <vector>
22 : :
23 : 1 : void ConnmanTestMsg::Handshake(CNode& node,
24 : : bool successfully_connected,
25 : : ServiceFlags remote_services,
26 : : ServiceFlags local_services,
27 : : int32_t version,
28 : : bool relay_txs)
29 : : {
30 : 1 : auto& peerman{static_cast<PeerManager&>(*m_msgproc)};
31 : 1 : auto& connman{*this};
32 : :
33 : 1 : peerman.InitializeNode(node, local_services);
34 : 1 : peerman.SendMessages(&node);
35 : 1 : FlushSendBuffer(node); // Drop the version message added by SendMessages.
36 : :
37 : 1 : CSerializedNetMsg msg_version{
38 [ + - ]: 2 : NetMsg::Make(NetMsgType::VERSION,
39 : : version, //
40 [ + - ]: 1 : Using<CustomUintFormatter<8>>(remote_services), //
41 : 1 : int64_t{}, // dummy time
42 : 1 : int64_t{}, // ignored service bits
43 [ + - ]: 2 : CNetAddr::V1(CService{}), // dummy
44 : 1 : int64_t{}, // ignored service bits
45 [ + - ]: 2 : CNetAddr::V1(CService{}), // ignored
46 : 1 : uint64_t{1}, // dummy nonce
47 [ + - ]: 1 : std::string{}, // dummy subver
48 [ + - ]: 1 : int32_t{}, // dummy starting_height
49 : : relay_txs),
50 [ + - ]: 1 : };
51 : :
52 [ + - ]: 1 : (void)connman.ReceiveMsgFrom(node, std::move(msg_version));
53 [ + - ]: 1 : node.fPauseSend = false;
54 [ + - ]: 1 : connman.ProcessMessagesOnce(node);
55 [ + - ]: 1 : peerman.SendMessages(&node);
56 [ + - ]: 1 : FlushSendBuffer(node); // Drop the verack message added by SendMessages.
57 [ - + ]: 1 : if (node.fDisconnect) return;
58 [ - + ]: 1 : assert(node.nVersion == version);
59 [ + - - + ]: 2 : assert(node.GetCommonVersion() == std::min(version, PROTOCOL_VERSION));
60 [ + - ]: 1 : CNodeStateStats statestats;
61 [ + - - + ]: 1 : assert(peerman.GetNodeStateStats(node.GetId(), statestats));
62 [ + - - + : 1 : assert(statestats.m_relay_txs == (relay_txs && !node.IsBlockOnlyConn()));
- + ]
63 [ - + ]: 1 : assert(statestats.their_services == remote_services);
64 [ + - ]: 1 : if (successfully_connected) {
65 [ + - + - ]: 1 : CSerializedNetMsg msg_verack{NetMsg::Make(NetMsgType::VERACK)};
66 [ + - ]: 1 : (void)connman.ReceiveMsgFrom(node, std::move(msg_verack));
67 [ + - ]: 1 : node.fPauseSend = false;
68 [ + - ]: 1 : connman.ProcessMessagesOnce(node);
69 [ + - ]: 1 : peerman.SendMessages(&node);
70 [ - + ]: 1 : assert(node.fSuccessfullyConnected == true);
71 : 1 : }
72 : 2 : }
73 : :
74 : 3 : void ConnmanTestMsg::NodeReceiveMsgBytes(CNode& node, Span<const uint8_t> msg_bytes, bool& complete) const
75 : : {
76 [ - + ]: 3 : assert(node.ReceiveMsgBytes(msg_bytes, complete));
77 [ + + ]: 3 : if (complete) {
78 : 2 : node.MarkReceivedMsgsForProcessing();
79 : : }
80 : 3 : }
81 : :
82 : 3 : void ConnmanTestMsg::FlushSendBuffer(CNode& node) const
83 : : {
84 : 3 : LOCK(node.cs_vSend);
85 : 3 : node.vSendMsg.clear();
86 : 3 : node.m_send_memusage = 0;
87 : 5 : while (true) {
88 [ + + ]: 8 : const auto& [to_send, _more, _msg_type] = node.m_transport->GetBytesToSend(false);
89 [ + + ]: 8 : if (to_send.empty()) break;
90 : 5 : node.m_transport->MarkBytesSent(to_send.size());
91 : 5 : }
92 : 3 : }
93 : :
94 : 2 : bool ConnmanTestMsg::ReceiveMsgFrom(CNode& node, CSerializedNetMsg&& ser_msg) const
95 : : {
96 : 2 : bool queued = node.m_transport->SetMessageToSend(ser_msg);
97 [ - + ]: 2 : assert(queued);
98 : 2 : bool complete{false};
99 : 3 : while (true) {
100 [ + + ]: 5 : const auto& [to_send, _more, _msg_type] = node.m_transport->GetBytesToSend(false);
101 [ + + ]: 5 : if (to_send.empty()) break;
102 : 3 : NodeReceiveMsgBytes(node, to_send, complete);
103 : 3 : node.m_transport->MarkBytesSent(to_send.size());
104 : 3 : }
105 : 2 : return complete;
106 : : }
107 : :
108 : 10 : CNode* ConnmanTestMsg::ConnectNodePublic(PeerManager& peerman, const char* pszDest, ConnectionType conn_type)
109 : : {
110 [ + - ]: 10 : CNode* node = ConnectNode(CAddress{}, pszDest, /*fCountFailure=*/false, conn_type, /*use_v2transport=*/true);
111 [ - + ]: 10 : if (!node) return nullptr;
112 : 0 : node->SetCommonVersion(PROTOCOL_VERSION);
113 : 0 : peerman.InitializeNode(*node, ServiceFlags(NODE_NETWORK | NODE_WITNESS));
114 : 0 : node->fSuccessfullyConnected = true;
115 : 0 : AddTestNode(*node);
116 : 0 : return node;
117 : : }
118 : :
119 : 1624 : std::vector<NodeEvictionCandidate> GetRandomNodeEvictionCandidates(int n_candidates, FastRandomContext& random_context)
120 : : {
121 : 1624 : std::vector<NodeEvictionCandidate> candidates;
122 [ + - ]: 1624 : candidates.reserve(n_candidates);
123 [ + + ]: 161042 : for (int id = 0; id < n_candidates; ++id) {
124 [ + - ]: 159418 : candidates.push_back({
125 : 159418 : .id=id,
126 : 318836 : .m_connected=std::chrono::seconds{random_context.randrange(100)},
127 : 318836 : .m_min_ping_time=std::chrono::microseconds{random_context.randrange(100)},
128 : 318836 : .m_last_block_time=std::chrono::seconds{random_context.randrange(100)},
129 : 318836 : .m_last_tx_time=std::chrono::seconds{random_context.randrange(100)},
130 : 159418 : .fRelevantServices=random_context.randbool(),
131 : 159418 : .m_relay_txs=random_context.randbool(),
132 : 159418 : .fBloomFilter=random_context.randbool(),
133 : 159418 : .nKeyedNetGroup=random_context.randrange(100u),
134 : 159418 : .prefer_evict=random_context.randbool(),
135 : 159418 : .m_is_local=random_context.randbool(),
136 [ + - ]: 318836 : .m_network=ALL_NETWORKS[random_context.randrange(ALL_NETWORKS.size())],
137 : : .m_noban=false,
138 : : .m_conn_type=ConnectionType::INBOUND,
139 : : });
140 : : }
141 : 1624 : return candidates;
142 : 0 : }
143 : :
144 : : // Have different ZeroSock (or others that inherit from it) objects have different
145 : : // m_socket because EqualSharedPtrSock compares m_socket and we want to avoid two
146 : : // different objects comparing as equal.
147 : : static std::atomic<SOCKET> g_mocked_sock_fd{0};
148 : :
149 : 16 : ZeroSock::ZeroSock() : Sock{g_mocked_sock_fd++} {}
150 : :
151 : : // Sock::~Sock() would try to close(2) m_socket if it is not INVALID_SOCKET, avoid that.
152 : 16 : ZeroSock::~ZeroSock() { m_socket = INVALID_SOCKET; }
153 : :
154 : 32 : ssize_t ZeroSock::Send(const void*, size_t len, int) const { return len; }
155 : :
156 : 0 : ssize_t ZeroSock::Recv(void* buf, size_t len, int flags) const
157 : : {
158 : 0 : memset(buf, 0x0, len);
159 : 0 : return len;
160 : : }
161 : :
162 : 16 : int ZeroSock::Connect(const sockaddr*, socklen_t) const { return 0; }
163 : :
164 : 0 : int ZeroSock::Bind(const sockaddr*, socklen_t) const { return 0; }
165 : :
166 : 0 : int ZeroSock::Listen(int) const { return 0; }
167 : :
168 : 0 : std::unique_ptr<Sock> ZeroSock::Accept(sockaddr* addr, socklen_t* addr_len) const
169 : : {
170 [ # # ]: 0 : if (addr != nullptr) {
171 : : // Pretend all connections come from 5.5.5.5:6789
172 [ # # ]: 0 : memset(addr, 0x00, *addr_len);
173 : 0 : const socklen_t write_len = static_cast<socklen_t>(sizeof(sockaddr_in));
174 [ # # ]: 0 : if (*addr_len >= write_len) {
175 : 0 : *addr_len = write_len;
176 : 0 : sockaddr_in* addr_in = reinterpret_cast<sockaddr_in*>(addr);
177 : 0 : addr_in->sin_family = AF_INET;
178 : 0 : memset(&addr_in->sin_addr, 0x05, sizeof(addr_in->sin_addr));
179 : 0 : addr_in->sin_port = htons(6789);
180 : : }
181 : : }
182 : 0 : return std::make_unique<ZeroSock>();
183 : : }
184 : :
185 : 0 : int ZeroSock::GetSockOpt(int level, int opt_name, void* opt_val, socklen_t* opt_len) const
186 : : {
187 : 0 : std::memset(opt_val, 0x0, *opt_len);
188 : 0 : return 0;
189 : : }
190 : :
191 : 0 : int ZeroSock::SetSockOpt(int, int, const void*, socklen_t) const { return 0; }
192 : :
193 : 0 : int ZeroSock::GetSockName(sockaddr* name, socklen_t* name_len) const
194 : : {
195 : 0 : std::memset(name, 0x0, *name_len);
196 : 0 : return 0;
197 : : }
198 : :
199 : 0 : bool ZeroSock::SetNonBlocking() const { return true; }
200 : :
201 : 0 : bool ZeroSock::IsSelectable() const { return true; }
202 : :
203 : 135 : bool ZeroSock::Wait(std::chrono::milliseconds timeout, Event requested, Event* occurred) const
204 : : {
205 [ + + ]: 135 : if (occurred != nullptr) {
206 : 5 : *occurred = requested;
207 : : }
208 : 135 : return true;
209 : : }
210 : :
211 : 0 : bool ZeroSock::WaitMany(std::chrono::milliseconds timeout, EventsPerSock& events_per_sock) const
212 : : {
213 [ # # ]: 0 : for (auto& [sock, events] : events_per_sock) {
214 : 0 : (void)sock;
215 : 0 : events.occurred = events.requested;
216 : : }
217 : 0 : return true;
218 : : }
219 : :
220 : 0 : ZeroSock& ZeroSock::operator=(Sock&& other)
221 : : {
222 : 0 : assert(false && "Move of Sock into ZeroSock not allowed.");
223 : : return *this;
224 : : }
225 : :
226 : 16 : StaticContentsSock::StaticContentsSock(const std::string& contents)
227 [ + - ]: 16 : : m_contents{contents}
228 : : {
229 : 16 : }
230 : :
231 : 332 : ssize_t StaticContentsSock::Recv(void* buf, size_t len, int flags) const
232 : : {
233 [ + + ]: 332 : const size_t consume_bytes{std::min(len, m_contents.size() - m_consumed)};
234 [ + + ]: 332 : std::memcpy(buf, m_contents.data() + m_consumed, consume_bytes);
235 [ + + ]: 332 : if ((flags & MSG_PEEK) == 0) {
236 : 166 : m_consumed += consume_bytes;
237 : : }
238 : 332 : return consume_bytes;
239 : : }
240 : :
241 : 0 : StaticContentsSock& StaticContentsSock::operator=(Sock&& other)
242 : : {
243 : 0 : assert(false && "Move of Sock into StaticContentsSock not allowed.");
244 : : return *this;
245 : : }
246 : :
247 : 0 : ssize_t DynSock::Pipe::GetBytes(void* buf, size_t len, int flags)
248 : : {
249 : 0 : WAIT_LOCK(m_mutex, lock);
250 : :
251 [ # # ]: 0 : if (m_data.empty()) {
252 [ # # ]: 0 : if (m_eof) {
253 : : return 0;
254 : : }
255 : 0 : errno = EAGAIN; // Same as recv(2) on a non-blocking socket.
256 : 0 : return -1;
257 : : }
258 : :
259 [ # # ]: 0 : const size_t read_bytes{std::min(len, m_data.size())};
260 : :
261 [ # # ]: 0 : std::memcpy(buf, m_data.data(), read_bytes);
262 [ # # ]: 0 : if ((flags & MSG_PEEK) == 0) {
263 : 0 : m_data.erase(m_data.begin(), m_data.begin() + read_bytes);
264 : : }
265 : :
266 : 0 : return read_bytes;
267 : 0 : }
268 : :
269 : 0 : std::optional<CNetMessage> DynSock::Pipe::GetNetMsg()
270 : : {
271 : 0 : V1Transport transport{NodeId{0}};
272 : :
273 : 0 : {
274 [ # # ]: 0 : WAIT_LOCK(m_mutex, lock);
275 : :
276 [ # # ]: 0 : WaitForDataOrEof(lock);
277 [ # # # # ]: 0 : if (m_eof && m_data.empty()) {
278 : 0 : return std::nullopt;
279 : : }
280 : :
281 : 0 : for (;;) {
282 [ # # ]: 0 : Span<const uint8_t> s{m_data};
283 [ # # # # ]: 0 : if (!transport.ReceivedBytes(s)) { // Consumed bytes are removed from the front of s.
284 : 0 : return std::nullopt;
285 : : }
286 : 0 : m_data.erase(m_data.begin(), m_data.begin() + m_data.size() - s.size());
287 [ # # # # ]: 0 : if (transport.ReceivedMessageComplete()) {
288 : : break;
289 : : }
290 [ # # ]: 0 : if (m_data.empty()) {
291 [ # # ]: 0 : WaitForDataOrEof(lock);
292 [ # # # # ]: 0 : if (m_eof && m_data.empty()) {
293 : 0 : return std::nullopt;
294 : : }
295 : : }
296 : : }
297 : 0 : }
298 : :
299 : 0 : bool reject{false};
300 [ # # ]: 0 : CNetMessage msg{transport.GetReceivedMessage(/*time=*/{}, reject)};
301 [ # # ]: 0 : if (reject) {
302 : 0 : return std::nullopt;
303 : : }
304 : 0 : return std::make_optional<CNetMessage>(std::move(msg));
305 : 0 : }
306 : :
307 : 0 : void DynSock::Pipe::PushBytes(const void* buf, size_t len)
308 : : {
309 : 0 : LOCK(m_mutex);
310 : 0 : const uint8_t* b = static_cast<const uint8_t*>(buf);
311 [ # # ]: 0 : m_data.insert(m_data.end(), b, b + len);
312 [ # # ]: 0 : m_cond.notify_all();
313 : 0 : }
314 : :
315 : 0 : void DynSock::Pipe::Eof()
316 : : {
317 : 0 : LOCK(m_mutex);
318 : 0 : m_eof = true;
319 [ # # ]: 0 : m_cond.notify_all();
320 : 0 : }
321 : :
322 : 0 : void DynSock::Pipe::WaitForDataOrEof(UniqueLock<Mutex>& lock)
323 : : {
324 : 0 : Assert(lock.mutex() == &m_mutex);
325 : :
326 : 0 : m_cond.wait(lock, [&]() EXCLUSIVE_LOCKS_REQUIRED(m_mutex) {
327 : 0 : AssertLockHeld(m_mutex);
328 [ # # # # ]: 0 : return !m_data.empty() || m_eof;
329 : : });
330 : 0 : }
331 : :
332 : 0 : DynSock::DynSock(std::shared_ptr<Pipes> pipes, std::shared_ptr<Queue> accept_sockets)
333 [ # # # # ]: 0 : : m_pipes{pipes}, m_accept_sockets{accept_sockets}
334 : : {
335 : 0 : }
336 : :
337 : 0 : DynSock::~DynSock()
338 : : {
339 : 0 : m_pipes->send.Eof();
340 [ # # # # ]: 0 : }
341 : :
342 : 0 : ssize_t DynSock::Recv(void* buf, size_t len, int flags) const
343 : : {
344 : 0 : return m_pipes->recv.GetBytes(buf, len, flags);
345 : : }
346 : :
347 : 0 : ssize_t DynSock::Send(const void* buf, size_t len, int) const
348 : : {
349 : 0 : m_pipes->send.PushBytes(buf, len);
350 : 0 : return len;
351 : : }
352 : :
353 : 0 : std::unique_ptr<Sock> DynSock::Accept(sockaddr* addr, socklen_t* addr_len) const
354 : : {
355 : 0 : ZeroSock::Accept(addr, addr_len);
356 [ # # ]: 0 : return m_accept_sockets->Pop().value_or(nullptr);
357 : : }
358 : :
359 : 0 : bool DynSock::Wait(std::chrono::milliseconds timeout,
360 : : Event requested,
361 : : Event* occurred) const
362 : : {
363 [ # # ]: 0 : EventsPerSock ev;
364 [ # # ]: 0 : ev.emplace(this, Events{requested});
365 [ # # ]: 0 : const bool ret{WaitMany(timeout, ev)};
366 [ # # ]: 0 : if (occurred != nullptr) {
367 : 0 : *occurred = ev.begin()->second.occurred;
368 : : }
369 : 0 : return ret;
370 : 0 : }
371 : :
372 : 0 : bool DynSock::WaitMany(std::chrono::milliseconds timeout, EventsPerSock& events_per_sock) const
373 : : {
374 : 0 : const auto deadline = std::chrono::steady_clock::now() + timeout;
375 : 0 : bool at_least_one_event_occurred{false};
376 : :
377 : 0 : for (;;) {
378 : : // Check all sockets for readiness without waiting.
379 [ # # # # ]: 0 : for (auto& [sock, events] : events_per_sock) {
380 [ # # ]: 0 : if ((events.requested & Sock::SEND) != 0) {
381 : : // Always ready for Send().
382 : 0 : events.occurred |= Sock::SEND;
383 : 0 : at_least_one_event_occurred = true;
384 : : }
385 : :
386 [ # # ]: 0 : if ((events.requested & Sock::RECV) != 0) {
387 : 0 : auto dyn_sock = reinterpret_cast<const DynSock*>(sock.get());
388 : 0 : uint8_t b;
389 [ # # # # ]: 0 : if (dyn_sock->m_pipes->recv.GetBytes(&b, 1, MSG_PEEK) == 1 || !dyn_sock->m_accept_sockets->Empty()) {
390 : 0 : events.occurred |= Sock::RECV;
391 : 0 : at_least_one_event_occurred = true;
392 : : }
393 : : }
394 : : }
395 : :
396 [ # # # # ]: 0 : if (at_least_one_event_occurred || std::chrono::steady_clock::now() > deadline) {
397 : : break;
398 : : }
399 : :
400 : 0 : std::this_thread::sleep_for(10ms);
401 : 0 : }
402 : :
403 : 0 : return true;
404 : : }
405 : :
406 : 0 : DynSock& DynSock::operator=(Sock&&)
407 : : {
408 : 0 : assert(false && "Move of Sock into DynSock not allowed.");
409 : : return *this;
410 : : }
|