Branch data Line data Source code
1 : : // Copyright (c) 2015-2022 The Bitcoin Core developers
2 : : // Distributed under the MIT software license, see the accompanying
3 : : // file COPYING or http://www.opensource.org/licenses/mit-license.php.
4 : :
5 : : #include <zmq/zmqnotificationinterface.h>
6 : :
7 : : #include <common/args.h>
8 : : #include <kernel/chain.h>
9 : : #include <kernel/mempool_entry.h>
10 : : #include <logging.h>
11 : : #include <netbase.h>
12 : : #include <primitives/block.h>
13 : : #include <primitives/transaction.h>
14 : : #include <validationinterface.h>
15 : : #include <zmq/zmqabstractnotifier.h>
16 : : #include <zmq/zmqpublishnotifier.h>
17 : : #include <zmq/zmqutil.h>
18 : :
19 : : #include <zmq.h>
20 : :
21 : : #include <cassert>
22 : : #include <map>
23 : : #include <string>
24 : : #include <utility>
25 : : #include <vector>
26 : :
27 : 8 : CZMQNotificationInterface::CZMQNotificationInterface() = default;
28 : :
29 : 16 : CZMQNotificationInterface::~CZMQNotificationInterface()
30 : : {
31 : 8 : Shutdown();
32 : 16 : }
33 : :
34 : 2 : std::list<const CZMQAbstractNotifier*> CZMQNotificationInterface::GetActiveNotifiers() const
35 : : {
36 : 2 : std::list<const CZMQAbstractNotifier*> result;
37 [ + + ]: 10 : for (const auto& n : notifiers) {
38 [ + - ]: 8 : result.push_back(n.get());
39 : : }
40 : 2 : return result;
41 : 0 : }
42 : :
43 : 913 : std::unique_ptr<CZMQNotificationInterface> CZMQNotificationInterface::Create(std::function<bool(std::vector<uint8_t>&, const CBlockIndex&)> get_block_by_index)
44 : : {
45 [ + - ]: 913 : std::map<std::string, CZMQNotifierFactory> factories;
46 [ + - + - ]: 913 : factories["pubhashblock"] = CZMQAbstractNotifier::Create<CZMQPublishHashBlockNotifier>;
47 [ + - + - ]: 913 : factories["pubhashtx"] = CZMQAbstractNotifier::Create<CZMQPublishHashTransactionNotifier>;
48 [ + - + - ]: 913 : factories["pubrawblock"] = [&get_block_by_index]() -> std::unique_ptr<CZMQAbstractNotifier> {
49 : 2 : return std::make_unique<CZMQPublishRawBlockNotifier>(get_block_by_index);
50 : 913 : };
51 [ + - + - ]: 913 : factories["pubrawtx"] = CZMQAbstractNotifier::Create<CZMQPublishRawTransactionNotifier>;
52 [ + - + - ]: 913 : factories["pubsequence"] = CZMQAbstractNotifier::Create<CZMQPublishSequenceNotifier>;
53 : :
54 : 913 : std::list<std::unique_ptr<CZMQAbstractNotifier>> notifiers;
55 [ + + ]: 5478 : for (const auto& entry : factories)
56 : : {
57 [ + - ]: 4565 : std::string arg("-zmq" + entry.first);
58 : 4565 : const auto& factory = entry.second;
59 [ + - + + ]: 4583 : for (std::string& address : gArgs.GetArgs(arg)) {
60 : : // libzmq uses prefix "ipc://" for UNIX domain sockets
61 [ + - + + ]: 18 : if (address.substr(0, ADDR_PREFIX_UNIX.length()) == ADDR_PREFIX_UNIX) {
62 [ + - ]: 4 : address.replace(0, ADDR_PREFIX_UNIX.length(), ADDR_PREFIX_IPC);
63 : : }
64 : :
65 [ + - ]: 18 : std::unique_ptr<CZMQAbstractNotifier> notifier = factory();
66 [ + - ]: 18 : notifier->SetType(entry.first);
67 [ + - ]: 18 : notifier->SetAddress(address);
68 [ + - + - : 36 : notifier->SetOutboundMessageHighWaterMark(static_cast<int>(gArgs.GetIntArg(arg + "hwm", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM)));
+ - ]
69 [ + - ]: 18 : notifiers.push_back(std::move(notifier));
70 : 4583 : }
71 : 4565 : }
72 : :
73 [ + + ]: 913 : if (!notifiers.empty())
74 : : {
75 [ + - + - ]: 8 : std::unique_ptr<CZMQNotificationInterface> notificationInterface(new CZMQNotificationInterface());
76 : 8 : notificationInterface->notifiers = std::move(notifiers);
77 : :
78 [ + - + + ]: 8 : if (notificationInterface->Initialize()) {
79 : : return notificationInterface;
80 : : }
81 : 2 : }
82 : :
83 : 907 : return nullptr;
84 : 913 : }
85 : :
86 : : // Called at startup to conditionally set up ZMQ socket(s)
87 : 8 : bool CZMQNotificationInterface::Initialize()
88 : : {
89 : 8 : int major = 0, minor = 0, patch = 0;
90 : 8 : zmq_version(&major, &minor, &patch);
91 [ + - ]: 8 : LogDebug(BCLog::ZMQ, "version %d.%d.%d\n", major, minor, patch);
92 : :
93 [ + - ]: 8 : LogDebug(BCLog::ZMQ, "Initialize notification interface\n");
94 [ - + ]: 8 : assert(!pcontext);
95 : :
96 : 8 : pcontext = zmq_ctx_new();
97 : :
98 [ - + ]: 8 : if (!pcontext)
99 : : {
100 [ # # ]: 0 : zmqError("Unable to initialize context");
101 : 0 : return false;
102 : : }
103 : :
104 [ + + ]: 22 : for (auto& notifier : notifiers) {
105 [ + + ]: 16 : if (notifier->Initialize(pcontext)) {
106 [ + - + - : 28 : LogDebug(BCLog::ZMQ, "Notifier %s ready (address = %s)\n", notifier->GetType(), notifier->GetAddress());
+ - ]
107 : : } else {
108 [ + - + - : 4 : LogDebug(BCLog::ZMQ, "Notifier %s failed (address = %s)\n", notifier->GetType(), notifier->GetAddress());
+ - ]
109 : 2 : return false;
110 : : }
111 : : }
112 : :
113 : : return true;
114 : : }
115 : :
116 : : // Called during shutdown sequence
117 : 8 : void CZMQNotificationInterface::Shutdown()
118 : : {
119 [ + - ]: 8 : LogDebug(BCLog::ZMQ, "Shutdown notification interface\n");
120 [ + - ]: 8 : if (pcontext)
121 : : {
122 [ + + ]: 26 : for (auto& notifier : notifiers) {
123 [ + - + - : 36 : LogDebug(BCLog::ZMQ, "Shutdown notifier %s at %s\n", notifier->GetType(), notifier->GetAddress());
+ - ]
124 : 18 : notifier->Shutdown();
125 : : }
126 : 8 : zmq_ctx_term(pcontext);
127 : :
128 : 8 : pcontext = nullptr;
129 : : }
130 : 8 : }
131 : :
132 : : namespace {
133 : :
134 : : template <typename Function>
135 : 173 : void TryForEachAndRemoveFailed(std::list<std::unique_ptr<CZMQAbstractNotifier>>& notifiers, const Function& func)
136 : : {
137 [ + + ]: 522 : for (auto i = notifiers.begin(); i != notifiers.end(); ) {
138 : 349 : CZMQAbstractNotifier* notifier = i->get();
139 [ + - ]: 349 : if (func(notifier)) {
140 : 349 : ++i;
141 : : } else {
142 : 0 : notifier->Shutdown();
143 : 0 : i = notifiers.erase(i);
144 : : }
145 : : }
146 : 173 : }
147 : :
148 : : } // anonymous namespace
149 : :
150 : 32 : void CZMQNotificationInterface::UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload)
151 : : {
152 [ + + ]: 32 : if (fInitialDownload || pindexNew == pindexFork) // In IBD or blocks were disconnected without any new ones
153 : : return;
154 : :
155 : 31 : TryForEachAndRemoveFailed(notifiers, [pindexNew](CZMQAbstractNotifier* notifier) {
156 : 79 : return notifier->NotifyBlock(pindexNew);
157 : : });
158 : : }
159 : :
160 : 29 : void CZMQNotificationInterface::TransactionAddedToMempool(const NewMempoolTransactionInfo& ptx, uint64_t mempool_sequence)
161 : : {
162 : 29 : const CTransaction& tx = *(ptx.info.m_tx);
163 : :
164 : 29 : TryForEachAndRemoveFailed(notifiers, [&tx, mempool_sequence](CZMQAbstractNotifier* notifier) {
165 [ + - - + ]: 38 : return notifier->NotifyTransaction(tx) && notifier->NotifyTransactionAcceptance(tx, mempool_sequence);
166 : : });
167 : 29 : }
168 : :
169 : 4 : void CZMQNotificationInterface::TransactionRemovedFromMempool(const CTransactionRef& ptx, MemPoolRemovalReason reason, uint64_t mempool_sequence)
170 : : {
171 : : // Called for all non-block inclusion reasons
172 : 4 : const CTransaction& tx = *ptx;
173 : :
174 : 4 : TryForEachAndRemoveFailed(notifiers, [&tx, mempool_sequence](CZMQAbstractNotifier* notifier) {
175 : 4 : return notifier->NotifyTransactionRemoval(tx, mempool_sequence);
176 : : });
177 : 4 : }
178 : :
179 : 34 : void CZMQNotificationInterface::BlockConnected(ChainstateRole role, const std::shared_ptr<const CBlock>& pblock, const CBlockIndex* pindexConnected)
180 : : {
181 [ + - ]: 34 : if (role == ChainstateRole::BACKGROUND) {
182 : : return;
183 : : }
184 [ + + ]: 94 : for (const CTransactionRef& ptx : pblock->vtx) {
185 : 60 : const CTransaction& tx = *ptx;
186 : 60 : TryForEachAndRemoveFailed(notifiers, [&tx](CZMQAbstractNotifier* notifier) {
187 : 119 : return notifier->NotifyTransaction(tx);
188 : : });
189 : : }
190 : :
191 : : // Next we notify BlockConnect listeners for *all* blocks
192 : 34 : TryForEachAndRemoveFailed(notifiers, [pindexConnected](CZMQAbstractNotifier* notifier) {
193 : 84 : return notifier->NotifyBlockConnect(pindexConnected);
194 : : });
195 : : }
196 : :
197 : 6 : void CZMQNotificationInterface::BlockDisconnected(const std::shared_ptr<const CBlock>& pblock, const CBlockIndex* pindexDisconnected)
198 : : {
199 [ + + ]: 15 : for (const CTransactionRef& ptx : pblock->vtx) {
200 : 9 : const CTransaction& tx = *ptx;
201 : 9 : TryForEachAndRemoveFailed(notifiers, [&tx](CZMQAbstractNotifier* notifier) {
202 : 15 : return notifier->NotifyTransaction(tx);
203 : : });
204 : : }
205 : :
206 : : // Next we notify BlockDisconnect listeners for *all* blocks
207 : 6 : TryForEachAndRemoveFailed(notifiers, [pindexDisconnected](CZMQAbstractNotifier* notifier) {
208 : 10 : return notifier->NotifyBlockDisconnect(pindexDisconnected);
209 : : });
210 : 6 : }
211 : :
212 : : std::unique_ptr<CZMQNotificationInterface> g_zmq_notification_interface;
|