Branch data Line data Source code
1 : : // Copyright (c) 2015-present The Bitcoin Core developers
2 : : // Distributed under the MIT software license, see the accompanying
3 : : // file COPYING or http://www.opensource.org/licenses/mit-license.php.
4 : :
5 : : #include <zmq/zmqnotificationinterface.h>
6 : :
7 : : #include <common/args.h>
8 : : #include <kernel/mempool_entry.h>
9 : : #include <kernel/types.h>
10 : : #include <logging.h>
11 : : #include <netbase.h>
12 : : #include <primitives/block.h>
13 : : #include <primitives/transaction.h>
14 : : #include <util/check.h>
15 : : #include <zmq/zmqabstractnotifier.h>
16 : : #include <zmq/zmqpublishnotifier.h>
17 : : #include <zmq/zmqutil.h>
18 : :
19 : : #include <zmq.h>
20 : :
21 : : #include <map>
22 : : #include <string>
23 : : #include <utility>
24 : : #include <vector>
25 : :
26 : : using kernel::ChainstateRole;
27 : :
28 : 8 : CZMQNotificationInterface::CZMQNotificationInterface() = default;
29 : :
30 : 8 : CZMQNotificationInterface::~CZMQNotificationInterface()
31 : : {
32 : 8 : Shutdown();
33 : 8 : }
34 : :
35 : 2 : std::list<const CZMQAbstractNotifier*> CZMQNotificationInterface::GetActiveNotifiers() const
36 : : {
37 : 2 : std::list<const CZMQAbstractNotifier*> result;
38 [ + + ]: 10 : for (const auto& n : notifiers) {
39 [ + - ]: 8 : result.push_back(n.get());
40 : : }
41 : 2 : return result;
42 : 0 : }
43 : :
44 : 1065 : std::unique_ptr<CZMQNotificationInterface> CZMQNotificationInterface::Create(std::function<bool(std::vector<std::byte>&, const CBlockIndex&)> get_block_by_index)
45 : : {
46 [ + - ]: 1065 : std::map<std::string, CZMQNotifierFactory> factories;
47 [ + - + - ]: 1065 : factories["pubhashblock"] = CZMQAbstractNotifier::Create<CZMQPublishHashBlockNotifier>;
48 [ + - + - ]: 1065 : factories["pubhashtx"] = CZMQAbstractNotifier::Create<CZMQPublishHashTransactionNotifier>;
49 [ + - + - ]: 1065 : factories["pubrawblock"] = [&get_block_by_index]() -> std::unique_ptr<CZMQAbstractNotifier> {
50 : 2 : return std::make_unique<CZMQPublishRawBlockNotifier>(get_block_by_index);
51 : 1065 : };
52 [ + - + - ]: 1065 : factories["pubrawtx"] = CZMQAbstractNotifier::Create<CZMQPublishRawTransactionNotifier>;
53 [ + - + - ]: 1065 : factories["pubsequence"] = CZMQAbstractNotifier::Create<CZMQPublishSequenceNotifier>;
54 : :
55 : 1065 : std::list<std::unique_ptr<CZMQAbstractNotifier>> notifiers;
56 [ + + ]: 6390 : for (const auto& entry : factories)
57 : : {
58 [ + - ]: 5325 : std::string arg("-zmq" + entry.first);
59 : 5325 : const auto& factory = entry.second;
60 [ + - + + ]: 5343 : for (std::string& address : gArgs.GetArgs(arg)) {
61 : : // libzmq uses prefix "ipc://" for UNIX domain sockets
62 [ - + - + : 18 : if (address.starts_with(ADDR_PREFIX_UNIX)) {
+ + ]
63 [ - + ]: 4 : address.replace(0, ADDR_PREFIX_UNIX.length(), ADDR_PREFIX_IPC);
64 : : }
65 : :
66 [ + - ]: 18 : std::unique_ptr<CZMQAbstractNotifier> notifier = factory();
67 [ + - ]: 18 : notifier->SetType(entry.first);
68 [ + - ]: 18 : notifier->SetAddress(address);
69 [ + - + - : 36 : notifier->SetOutboundMessageHighWaterMark(static_cast<int>(gArgs.GetIntArg(arg + "hwm", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM)));
+ - ]
70 [ + - ]: 18 : notifiers.push_back(std::move(notifier));
71 : 5343 : }
72 : 5325 : }
73 : :
74 [ + + ]: 1065 : if (!notifiers.empty())
75 : : {
76 [ + - + - ]: 8 : std::unique_ptr<CZMQNotificationInterface> notificationInterface(new CZMQNotificationInterface());
77 : 8 : notificationInterface->notifiers = std::move(notifiers);
78 : :
79 [ + - + + ]: 8 : if (notificationInterface->Initialize()) {
80 : : return notificationInterface;
81 : : }
82 : 2 : }
83 : :
84 : 1059 : return nullptr;
85 : 1065 : }
86 : :
87 : : // Called at startup to conditionally set up ZMQ socket(s)
88 : 8 : bool CZMQNotificationInterface::Initialize()
89 : : {
90 : 8 : int major = 0, minor = 0, patch = 0;
91 : 8 : zmq_version(&major, &minor, &patch);
92 [ + - ]: 8 : LogDebug(BCLog::ZMQ, "version %d.%d.%d\n", major, minor, patch);
93 : :
94 [ + - ]: 8 : LogDebug(BCLog::ZMQ, "Initialize notification interface\n");
95 [ - + ]: 8 : assert(!pcontext);
96 : :
97 : 8 : pcontext = zmq_ctx_new();
98 : :
99 [ - + ]: 8 : if (!pcontext)
100 : : {
101 [ # # ]: 0 : zmqError("Unable to initialize context");
102 : 0 : return false;
103 : : }
104 : :
105 [ + + ]: 22 : for (auto& notifier : notifiers) {
106 [ + + ]: 16 : if (notifier->Initialize(pcontext)) {
107 [ + - - + : 56 : LogDebug(BCLog::ZMQ, "Notifier %s ready (address = %s)\n", notifier->GetType(), notifier->GetAddress());
- + + - ]
108 : : } else {
109 [ + - - + : 8 : LogDebug(BCLog::ZMQ, "Notifier %s failed (address = %s)\n", notifier->GetType(), notifier->GetAddress());
- + + - ]
110 : 2 : return false;
111 : : }
112 : : }
113 : :
114 : : return true;
115 : : }
116 : :
117 : : // Called during shutdown sequence
118 : 8 : void CZMQNotificationInterface::Shutdown()
119 : : {
120 [ + - ]: 8 : LogDebug(BCLog::ZMQ, "Shutdown notification interface\n");
121 [ + - ]: 8 : if (pcontext)
122 : : {
123 [ + + ]: 26 : for (auto& notifier : notifiers) {
124 [ + - - + : 72 : LogDebug(BCLog::ZMQ, "Shutdown notifier %s at %s\n", notifier->GetType(), notifier->GetAddress());
- + + - ]
125 : 18 : notifier->Shutdown();
126 : : }
127 : 8 : zmq_ctx_term(pcontext);
128 : :
129 : 8 : pcontext = nullptr;
130 : : }
131 : 8 : }
132 : :
133 : : namespace {
134 : :
135 : : template <typename Function>
136 : 169 : void TryForEachAndRemoveFailed(std::list<std::unique_ptr<CZMQAbstractNotifier>>& notifiers, const Function& func)
137 : : {
138 [ + + ]: 510 : for (auto i = notifiers.begin(); i != notifiers.end(); ) {
139 : 341 : CZMQAbstractNotifier* notifier = i->get();
140 [ + - ]: 341 : if (func(notifier)) {
141 : 341 : ++i;
142 : : } else {
143 : 0 : notifier->Shutdown();
144 : 0 : i = notifiers.erase(i);
145 : : }
146 : : }
147 : 169 : }
148 : :
149 : : } // anonymous namespace
150 : :
151 : 32 : void CZMQNotificationInterface::UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload)
152 : : {
153 [ + - ]: 32 : if (fInitialDownload || pindexNew == pindexFork) // In IBD or blocks were disconnected without any new ones
154 : : return;
155 : :
156 : 32 : TryForEachAndRemoveFailed(notifiers, [pindexNew](CZMQAbstractNotifier* notifier) {
157 : 81 : return notifier->NotifyBlock(pindexNew);
158 : : });
159 : : }
160 : :
161 : 28 : void CZMQNotificationInterface::TransactionAddedToMempool(const NewMempoolTransactionInfo& ptx, uint64_t mempool_sequence)
162 : : {
163 : 28 : const CTransaction& tx = *(ptx.info.m_tx);
164 : :
165 : 28 : TryForEachAndRemoveFailed(notifiers, [&tx, mempool_sequence](CZMQAbstractNotifier* notifier) {
166 [ + - - + ]: 36 : return notifier->NotifyTransaction(tx) && notifier->NotifyTransactionAcceptance(tx, mempool_sequence);
167 : : });
168 : 28 : }
169 : :
170 : 4 : void CZMQNotificationInterface::TransactionRemovedFromMempool(const CTransactionRef& ptx, MemPoolRemovalReason reason, uint64_t mempool_sequence)
171 : : {
172 : : // Called for all non-block inclusion reasons
173 : 4 : const CTransaction& tx = *ptx;
174 : :
175 : 4 : TryForEachAndRemoveFailed(notifiers, [&tx, mempool_sequence](CZMQAbstractNotifier* notifier) {
176 : 4 : return notifier->NotifyTransactionRemoval(tx, mempool_sequence);
177 : : });
178 : 4 : }
179 : :
180 : 34 : void CZMQNotificationInterface::BlockConnected(const ChainstateRole& role, const std::shared_ptr<const CBlock>& pblock, const CBlockIndex* pindexConnected)
181 : : {
182 [ + - ]: 34 : if (role.historical) {
183 : : return;
184 : : }
185 [ + + ]: 93 : for (const CTransactionRef& ptx : pblock->vtx) {
186 : 59 : const CTransaction& tx = *ptx;
187 : 59 : TryForEachAndRemoveFailed(notifiers, [&tx](CZMQAbstractNotifier* notifier) {
188 : 117 : return notifier->NotifyTransaction(tx);
189 : : });
190 : : }
191 : :
192 : : // Next we notify BlockConnect listeners for *all* blocks
193 : 34 : TryForEachAndRemoveFailed(notifiers, [pindexConnected](CZMQAbstractNotifier* notifier) {
194 : 84 : return notifier->NotifyBlockConnect(pindexConnected);
195 : : });
196 : : }
197 : :
198 : 5 : void CZMQNotificationInterface::BlockDisconnected(const std::shared_ptr<const CBlock>& pblock, const CBlockIndex* pindexDisconnected)
199 : : {
200 [ + + ]: 12 : for (const CTransactionRef& ptx : pblock->vtx) {
201 : 7 : const CTransaction& tx = *ptx;
202 : 7 : TryForEachAndRemoveFailed(notifiers, [&tx](CZMQAbstractNotifier* notifier) {
203 : 11 : return notifier->NotifyTransaction(tx);
204 : : });
205 : : }
206 : :
207 : : // Next we notify BlockDisconnect listeners for *all* blocks
208 : 5 : TryForEachAndRemoveFailed(notifiers, [pindexDisconnected](CZMQAbstractNotifier* notifier) {
209 : 8 : return notifier->NotifyBlockDisconnect(pindexDisconnected);
210 : : });
211 : 5 : }
212 : :
213 : : std::unique_ptr<CZMQNotificationInterface> g_zmq_notification_interface;
|