Branch data Line data Source code
1 : : // Copyright (c) 2015-present The Bitcoin Core developers
2 : : // Distributed under the MIT software license, see the accompanying
3 : : // file COPYING or http://www.opensource.org/licenses/mit-license.php.
4 : :
5 : : #include <zmq/zmqnotificationinterface.h>
6 : :
7 : : #include <common/args.h>
8 : : #include <kernel/mempool_entry.h>
9 : : #include <kernel/types.h>
10 : : #include <logging.h>
11 : : #include <netbase.h>
12 : : #include <primitives/block.h>
13 : : #include <primitives/transaction.h>
14 : : #include <validationinterface.h>
15 : : #include <zmq/zmqabstractnotifier.h>
16 : : #include <zmq/zmqpublishnotifier.h>
17 : : #include <zmq/zmqutil.h>
18 : :
19 : : #include <zmq.h>
20 : :
21 : : #include <cassert>
22 : : #include <map>
23 : : #include <string>
24 : : #include <utility>
25 : : #include <vector>
26 : :
27 : : using kernel::ChainstateRole;
28 : :
29 : 0 : CZMQNotificationInterface::CZMQNotificationInterface() = default;
30 : :
31 : 0 : CZMQNotificationInterface::~CZMQNotificationInterface()
32 : : {
33 : 0 : Shutdown();
34 : 0 : }
35 : :
36 : 0 : std::list<const CZMQAbstractNotifier*> CZMQNotificationInterface::GetActiveNotifiers() const
37 : : {
38 : 0 : std::list<const CZMQAbstractNotifier*> result;
39 [ # # ]: 0 : for (const auto& n : notifiers) {
40 [ # # ]: 0 : result.push_back(n.get());
41 : : }
42 : 0 : return result;
43 : 0 : }
44 : :
45 : 1 : std::unique_ptr<CZMQNotificationInterface> CZMQNotificationInterface::Create(std::function<bool(std::vector<std::byte>&, const CBlockIndex&)> get_block_by_index)
46 : : {
47 [ + - ]: 1 : std::map<std::string, CZMQNotifierFactory> factories;
48 [ + - + - ]: 1 : factories["pubhashblock"] = CZMQAbstractNotifier::Create<CZMQPublishHashBlockNotifier>;
49 [ + - + - ]: 1 : factories["pubhashtx"] = CZMQAbstractNotifier::Create<CZMQPublishHashTransactionNotifier>;
50 [ + - + - ]: 1 : factories["pubrawblock"] = [&get_block_by_index]() -> std::unique_ptr<CZMQAbstractNotifier> {
51 : 0 : return std::make_unique<CZMQPublishRawBlockNotifier>(get_block_by_index);
52 : 1 : };
53 [ + - + - ]: 1 : factories["pubrawtx"] = CZMQAbstractNotifier::Create<CZMQPublishRawTransactionNotifier>;
54 [ + - + - ]: 1 : factories["pubsequence"] = CZMQAbstractNotifier::Create<CZMQPublishSequenceNotifier>;
55 : :
56 : 1 : std::list<std::unique_ptr<CZMQAbstractNotifier>> notifiers;
57 [ + + ]: 6 : for (const auto& entry : factories)
58 : : {
59 [ + - ]: 5 : std::string arg("-zmq" + entry.first);
60 : 5 : const auto& factory = entry.second;
61 [ + - - + ]: 5 : for (std::string& address : gArgs.GetArgs(arg)) {
62 : : // libzmq uses prefix "ipc://" for UNIX domain sockets
63 [ # # # # : 0 : if (address.starts_with(ADDR_PREFIX_UNIX)) {
# # ]
64 [ # # ]: 0 : address.replace(0, ADDR_PREFIX_UNIX.length(), ADDR_PREFIX_IPC);
65 : : }
66 : :
67 [ # # ]: 0 : std::unique_ptr<CZMQAbstractNotifier> notifier = factory();
68 [ # # ]: 0 : notifier->SetType(entry.first);
69 [ # # ]: 0 : notifier->SetAddress(address);
70 [ # # # # : 0 : notifier->SetOutboundMessageHighWaterMark(static_cast<int>(gArgs.GetIntArg(arg + "hwm", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM)));
# # ]
71 [ # # ]: 0 : notifiers.push_back(std::move(notifier));
72 : 5 : }
73 : 5 : }
74 : :
75 [ - + ]: 1 : if (!notifiers.empty())
76 : : {
77 [ # # # # ]: 0 : std::unique_ptr<CZMQNotificationInterface> notificationInterface(new CZMQNotificationInterface());
78 : 0 : notificationInterface->notifiers = std::move(notifiers);
79 : :
80 [ # # # # ]: 0 : if (notificationInterface->Initialize()) {
81 : : return notificationInterface;
82 : : }
83 : 0 : }
84 : :
85 : 1 : return nullptr;
86 : 1 : }
87 : :
88 : : // Called at startup to conditionally set up ZMQ socket(s)
89 : 0 : bool CZMQNotificationInterface::Initialize()
90 : : {
91 : 0 : int major = 0, minor = 0, patch = 0;
92 : 0 : zmq_version(&major, &minor, &patch);
93 [ # # ]: 0 : LogDebug(BCLog::ZMQ, "version %d.%d.%d\n", major, minor, patch);
94 : :
95 [ # # ]: 0 : LogDebug(BCLog::ZMQ, "Initialize notification interface\n");
96 [ # # ]: 0 : assert(!pcontext);
97 : :
98 : 0 : pcontext = zmq_ctx_new();
99 : :
100 [ # # ]: 0 : if (!pcontext)
101 : : {
102 [ # # ]: 0 : zmqError("Unable to initialize context");
103 : 0 : return false;
104 : : }
105 : :
106 [ # # ]: 0 : for (auto& notifier : notifiers) {
107 [ # # ]: 0 : if (notifier->Initialize(pcontext)) {
108 [ # # # # : 0 : LogDebug(BCLog::ZMQ, "Notifier %s ready (address = %s)\n", notifier->GetType(), notifier->GetAddress());
# # # # ]
109 : : } else {
110 [ # # # # : 0 : LogDebug(BCLog::ZMQ, "Notifier %s failed (address = %s)\n", notifier->GetType(), notifier->GetAddress());
# # # # ]
111 : 0 : return false;
112 : : }
113 : : }
114 : :
115 : : return true;
116 : : }
117 : :
118 : : // Called during shutdown sequence
119 : 0 : void CZMQNotificationInterface::Shutdown()
120 : : {
121 [ # # ]: 0 : LogDebug(BCLog::ZMQ, "Shutdown notification interface\n");
122 [ # # ]: 0 : if (pcontext)
123 : : {
124 [ # # ]: 0 : for (auto& notifier : notifiers) {
125 [ # # # # : 0 : LogDebug(BCLog::ZMQ, "Shutdown notifier %s at %s\n", notifier->GetType(), notifier->GetAddress());
# # # # ]
126 : 0 : notifier->Shutdown();
127 : : }
128 : 0 : zmq_ctx_term(pcontext);
129 : :
130 : 0 : pcontext = nullptr;
131 : : }
132 : 0 : }
133 : :
134 : : namespace {
135 : :
136 : : template <typename Function>
137 : 0 : void TryForEachAndRemoveFailed(std::list<std::unique_ptr<CZMQAbstractNotifier>>& notifiers, const Function& func)
138 : : {
139 [ # # ]: 0 : for (auto i = notifiers.begin(); i != notifiers.end(); ) {
140 : 0 : CZMQAbstractNotifier* notifier = i->get();
141 [ # # ]: 0 : if (func(notifier)) {
142 : 0 : ++i;
143 : : } else {
144 : 0 : notifier->Shutdown();
145 : 0 : i = notifiers.erase(i);
146 : : }
147 : : }
148 : 0 : }
149 : :
150 : : } // anonymous namespace
151 : :
152 : 0 : void CZMQNotificationInterface::UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload)
153 : : {
154 [ # # ]: 0 : if (fInitialDownload || pindexNew == pindexFork) // In IBD or blocks were disconnected without any new ones
155 : : return;
156 : :
157 : 0 : TryForEachAndRemoveFailed(notifiers, [pindexNew](CZMQAbstractNotifier* notifier) {
158 : 0 : return notifier->NotifyBlock(pindexNew);
159 : : });
160 : : }
161 : :
162 : 0 : void CZMQNotificationInterface::TransactionAddedToMempool(const NewMempoolTransactionInfo& ptx, uint64_t mempool_sequence)
163 : : {
164 : 0 : const CTransaction& tx = *(ptx.info.m_tx);
165 : :
166 : 0 : TryForEachAndRemoveFailed(notifiers, [&tx, mempool_sequence](CZMQAbstractNotifier* notifier) {
167 [ # # # # ]: 0 : return notifier->NotifyTransaction(tx) && notifier->NotifyTransactionAcceptance(tx, mempool_sequence);
168 : : });
169 : 0 : }
170 : :
171 : 0 : void CZMQNotificationInterface::TransactionRemovedFromMempool(const CTransactionRef& ptx, MemPoolRemovalReason reason, uint64_t mempool_sequence)
172 : : {
173 : : // Called for all non-block inclusion reasons
174 : 0 : const CTransaction& tx = *ptx;
175 : :
176 : 0 : TryForEachAndRemoveFailed(notifiers, [&tx, mempool_sequence](CZMQAbstractNotifier* notifier) {
177 : 0 : return notifier->NotifyTransactionRemoval(tx, mempool_sequence);
178 : : });
179 : 0 : }
180 : :
181 : 0 : void CZMQNotificationInterface::BlockConnected(const ChainstateRole& role, const std::shared_ptr<const CBlock>& pblock, const CBlockIndex* pindexConnected)
182 : : {
183 [ # # ]: 0 : if (role.historical) {
184 : : return;
185 : : }
186 [ # # ]: 0 : for (const CTransactionRef& ptx : pblock->vtx) {
187 : 0 : const CTransaction& tx = *ptx;
188 : 0 : TryForEachAndRemoveFailed(notifiers, [&tx](CZMQAbstractNotifier* notifier) {
189 : 0 : return notifier->NotifyTransaction(tx);
190 : : });
191 : : }
192 : :
193 : : // Next we notify BlockConnect listeners for *all* blocks
194 : 0 : TryForEachAndRemoveFailed(notifiers, [pindexConnected](CZMQAbstractNotifier* notifier) {
195 : 0 : return notifier->NotifyBlockConnect(pindexConnected);
196 : : });
197 : : }
198 : :
199 : 0 : void CZMQNotificationInterface::BlockDisconnected(const std::shared_ptr<const CBlock>& pblock, const CBlockIndex* pindexDisconnected)
200 : : {
201 [ # # ]: 0 : for (const CTransactionRef& ptx : pblock->vtx) {
202 : 0 : const CTransaction& tx = *ptx;
203 : 0 : TryForEachAndRemoveFailed(notifiers, [&tx](CZMQAbstractNotifier* notifier) {
204 : 0 : return notifier->NotifyTransaction(tx);
205 : : });
206 : : }
207 : :
208 : : // Next we notify BlockDisconnect listeners for *all* blocks
209 : 0 : TryForEachAndRemoveFailed(notifiers, [pindexDisconnected](CZMQAbstractNotifier* notifier) {
210 : 0 : return notifier->NotifyBlockDisconnect(pindexDisconnected);
211 : : });
212 : 0 : }
213 : :
214 : : std::unique_ptr<CZMQNotificationInterface> g_zmq_notification_interface;
|