LCOV - code coverage report
Current view: top level - src/zmq - zmqnotificationinterface.cpp (source / functions) Coverage Total Hit
Test: total_coverage.info Lines: 95.1 % 102 97
Test Date: 2026-02-04 05:05:50 Functions: 100.0 % 20 20
Branches: 58.7 % 126 74

             Branch data     Line data    Source code
       1                 :             : // Copyright (c) 2015-present The Bitcoin Core developers
       2                 :             : // Distributed under the MIT software license, see the accompanying
       3                 :             : // file COPYING or http://www.opensource.org/licenses/mit-license.php.
       4                 :             : 
       5                 :             : #include <zmq/zmqnotificationinterface.h>
       6                 :             : 
       7                 :             : #include <common/args.h>
       8                 :             : #include <kernel/mempool_entry.h>
       9                 :             : #include <kernel/types.h>
      10                 :             : #include <logging.h>
      11                 :             : #include <netbase.h>
      12                 :             : #include <primitives/block.h>
      13                 :             : #include <primitives/transaction.h>
      14                 :             : #include <util/check.h>
      15                 :             : #include <zmq/zmqabstractnotifier.h>
      16                 :             : #include <zmq/zmqpublishnotifier.h>
      17                 :             : #include <zmq/zmqutil.h>
      18                 :             : 
      19                 :             : #include <zmq.h>
      20                 :             : 
      21                 :             : #include <map>
      22                 :             : #include <string>
      23                 :             : #include <utility>
      24                 :             : #include <vector>
      25                 :             : 
      26                 :             : using kernel::ChainstateRole;
      27                 :             : 
      28                 :           8 : CZMQNotificationInterface::CZMQNotificationInterface() = default;
      29                 :             : 
      30                 :           8 : CZMQNotificationInterface::~CZMQNotificationInterface()
      31                 :             : {
      32                 :           8 :     Shutdown();
      33                 :           8 : }
      34                 :             : 
      35                 :           2 : std::list<const CZMQAbstractNotifier*> CZMQNotificationInterface::GetActiveNotifiers() const
      36                 :             : {
      37                 :           2 :     std::list<const CZMQAbstractNotifier*> result;
      38         [ +  + ]:          10 :     for (const auto& n : notifiers) {
      39         [ +  - ]:           8 :         result.push_back(n.get());
      40                 :             :     }
      41                 :           2 :     return result;
      42                 :           0 : }
      43                 :             : 
      44                 :        1065 : std::unique_ptr<CZMQNotificationInterface> CZMQNotificationInterface::Create(std::function<bool(std::vector<std::byte>&, const CBlockIndex&)> get_block_by_index)
      45                 :             : {
      46         [ +  - ]:        1065 :     std::map<std::string, CZMQNotifierFactory> factories;
      47   [ +  -  +  - ]:        1065 :     factories["pubhashblock"] = CZMQAbstractNotifier::Create<CZMQPublishHashBlockNotifier>;
      48   [ +  -  +  - ]:        1065 :     factories["pubhashtx"] = CZMQAbstractNotifier::Create<CZMQPublishHashTransactionNotifier>;
      49   [ +  -  +  - ]:        1065 :     factories["pubrawblock"] = [&get_block_by_index]() -> std::unique_ptr<CZMQAbstractNotifier> {
      50                 :           2 :         return std::make_unique<CZMQPublishRawBlockNotifier>(get_block_by_index);
      51                 :        1065 :     };
      52   [ +  -  +  - ]:        1065 :     factories["pubrawtx"] = CZMQAbstractNotifier::Create<CZMQPublishRawTransactionNotifier>;
      53   [ +  -  +  - ]:        1065 :     factories["pubsequence"] = CZMQAbstractNotifier::Create<CZMQPublishSequenceNotifier>;
      54                 :             : 
      55                 :        1065 :     std::list<std::unique_ptr<CZMQAbstractNotifier>> notifiers;
      56         [ +  + ]:        6390 :     for (const auto& entry : factories)
      57                 :             :     {
      58         [ +  - ]:        5325 :         std::string arg("-zmq" + entry.first);
      59                 :        5325 :         const auto& factory = entry.second;
      60   [ +  -  +  + ]:        5343 :         for (std::string& address : gArgs.GetArgs(arg)) {
      61                 :             :             // libzmq uses prefix "ipc://" for UNIX domain sockets
      62   [ -  +  -  +  :          18 :             if (address.starts_with(ADDR_PREFIX_UNIX)) {
                   +  + ]
      63         [ -  + ]:           4 :                 address.replace(0, ADDR_PREFIX_UNIX.length(), ADDR_PREFIX_IPC);
      64                 :             :             }
      65                 :             : 
      66         [ +  - ]:          18 :             std::unique_ptr<CZMQAbstractNotifier> notifier = factory();
      67         [ +  - ]:          18 :             notifier->SetType(entry.first);
      68         [ +  - ]:          18 :             notifier->SetAddress(address);
      69   [ +  -  +  -  :          36 :             notifier->SetOutboundMessageHighWaterMark(static_cast<int>(gArgs.GetIntArg(arg + "hwm", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM)));
                   +  - ]
      70         [ +  - ]:          18 :             notifiers.push_back(std::move(notifier));
      71                 :        5343 :         }
      72                 :        5325 :     }
      73                 :             : 
      74         [ +  + ]:        1065 :     if (!notifiers.empty())
      75                 :             :     {
      76   [ +  -  +  - ]:           8 :         std::unique_ptr<CZMQNotificationInterface> notificationInterface(new CZMQNotificationInterface());
      77                 :           8 :         notificationInterface->notifiers = std::move(notifiers);
      78                 :             : 
      79   [ +  -  +  + ]:           8 :         if (notificationInterface->Initialize()) {
      80                 :             :             return notificationInterface;
      81                 :             :         }
      82                 :           2 :     }
      83                 :             : 
      84                 :        1059 :     return nullptr;
      85                 :        1065 : }
      86                 :             : 
      87                 :             : // Called at startup to conditionally set up ZMQ socket(s)
      88                 :           8 : bool CZMQNotificationInterface::Initialize()
      89                 :             : {
      90                 :           8 :     int major = 0, minor = 0, patch = 0;
      91                 :           8 :     zmq_version(&major, &minor, &patch);
      92         [ +  - ]:           8 :     LogDebug(BCLog::ZMQ, "version %d.%d.%d\n", major, minor, patch);
      93                 :             : 
      94         [ +  - ]:           8 :     LogDebug(BCLog::ZMQ, "Initialize notification interface\n");
      95         [ -  + ]:           8 :     assert(!pcontext);
      96                 :             : 
      97                 :           8 :     pcontext = zmq_ctx_new();
      98                 :             : 
      99         [ -  + ]:           8 :     if (!pcontext)
     100                 :             :     {
     101         [ #  # ]:           0 :         zmqError("Unable to initialize context");
     102                 :           0 :         return false;
     103                 :             :     }
     104                 :             : 
     105         [ +  + ]:          22 :     for (auto& notifier : notifiers) {
     106         [ +  + ]:          16 :         if (notifier->Initialize(pcontext)) {
     107   [ +  -  -  +  :          56 :             LogDebug(BCLog::ZMQ, "Notifier %s ready (address = %s)\n", notifier->GetType(), notifier->GetAddress());
             -  +  +  - ]
     108                 :             :         } else {
     109   [ +  -  -  +  :           8 :             LogDebug(BCLog::ZMQ, "Notifier %s failed (address = %s)\n", notifier->GetType(), notifier->GetAddress());
             -  +  +  - ]
     110                 :           2 :             return false;
     111                 :             :         }
     112                 :             :     }
     113                 :             : 
     114                 :             :     return true;
     115                 :             : }
     116                 :             : 
     117                 :             : // Called during shutdown sequence
     118                 :           8 : void CZMQNotificationInterface::Shutdown()
     119                 :             : {
     120         [ +  - ]:           8 :     LogDebug(BCLog::ZMQ, "Shutdown notification interface\n");
     121         [ +  - ]:           8 :     if (pcontext)
     122                 :             :     {
     123         [ +  + ]:          26 :         for (auto& notifier : notifiers) {
     124   [ +  -  -  +  :          72 :             LogDebug(BCLog::ZMQ, "Shutdown notifier %s at %s\n", notifier->GetType(), notifier->GetAddress());
             -  +  +  - ]
     125                 :          18 :             notifier->Shutdown();
     126                 :             :         }
     127                 :           8 :         zmq_ctx_term(pcontext);
     128                 :             : 
     129                 :           8 :         pcontext = nullptr;
     130                 :             :     }
     131                 :           8 : }
     132                 :             : 
     133                 :             : namespace {
     134                 :             : 
     135                 :             : template <typename Function>
     136                 :         169 : void TryForEachAndRemoveFailed(std::list<std::unique_ptr<CZMQAbstractNotifier>>& notifiers, const Function& func)
     137                 :             : {
     138         [ +  + ]:         510 :     for (auto i = notifiers.begin(); i != notifiers.end(); ) {
     139                 :         341 :         CZMQAbstractNotifier* notifier = i->get();
     140         [ +  - ]:         341 :         if (func(notifier)) {
     141                 :         341 :             ++i;
     142                 :             :         } else {
     143                 :           0 :             notifier->Shutdown();
     144                 :           0 :             i = notifiers.erase(i);
     145                 :             :         }
     146                 :             :     }
     147                 :         169 : }
     148                 :             : 
     149                 :             : } // anonymous namespace
     150                 :             : 
     151                 :          32 : void CZMQNotificationInterface::UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload)
     152                 :             : {
     153         [ +  - ]:          32 :     if (fInitialDownload || pindexNew == pindexFork) // In IBD or blocks were disconnected without any new ones
     154                 :             :         return;
     155                 :             : 
     156                 :          32 :     TryForEachAndRemoveFailed(notifiers, [pindexNew](CZMQAbstractNotifier* notifier) {
     157                 :          81 :         return notifier->NotifyBlock(pindexNew);
     158                 :             :     });
     159                 :             : }
     160                 :             : 
     161                 :          28 : void CZMQNotificationInterface::TransactionAddedToMempool(const NewMempoolTransactionInfo& ptx, uint64_t mempool_sequence)
     162                 :             : {
     163                 :          28 :     const CTransaction& tx = *(ptx.info.m_tx);
     164                 :             : 
     165                 :          28 :     TryForEachAndRemoveFailed(notifiers, [&tx, mempool_sequence](CZMQAbstractNotifier* notifier) {
     166   [ +  -  -  + ]:          36 :         return notifier->NotifyTransaction(tx) && notifier->NotifyTransactionAcceptance(tx, mempool_sequence);
     167                 :             :     });
     168                 :          28 : }
     169                 :             : 
     170                 :           4 : void CZMQNotificationInterface::TransactionRemovedFromMempool(const CTransactionRef& ptx, MemPoolRemovalReason reason, uint64_t mempool_sequence)
     171                 :             : {
     172                 :             :     // Called for all non-block inclusion reasons
     173                 :           4 :     const CTransaction& tx = *ptx;
     174                 :             : 
     175                 :           4 :     TryForEachAndRemoveFailed(notifiers, [&tx, mempool_sequence](CZMQAbstractNotifier* notifier) {
     176                 :           4 :         return notifier->NotifyTransactionRemoval(tx, mempool_sequence);
     177                 :             :     });
     178                 :           4 : }
     179                 :             : 
     180                 :          34 : void CZMQNotificationInterface::BlockConnected(const ChainstateRole& role, const std::shared_ptr<const CBlock>& pblock, const CBlockIndex* pindexConnected)
     181                 :             : {
     182         [ +  - ]:          34 :     if (role.historical) {
     183                 :             :         return;
     184                 :             :     }
     185         [ +  + ]:          93 :     for (const CTransactionRef& ptx : pblock->vtx) {
     186                 :          59 :         const CTransaction& tx = *ptx;
     187                 :          59 :         TryForEachAndRemoveFailed(notifiers, [&tx](CZMQAbstractNotifier* notifier) {
     188                 :         117 :             return notifier->NotifyTransaction(tx);
     189                 :             :         });
     190                 :             :     }
     191                 :             : 
     192                 :             :     // Next we notify BlockConnect listeners for *all* blocks
     193                 :          34 :     TryForEachAndRemoveFailed(notifiers, [pindexConnected](CZMQAbstractNotifier* notifier) {
     194                 :          84 :         return notifier->NotifyBlockConnect(pindexConnected);
     195                 :             :     });
     196                 :             : }
     197                 :             : 
     198                 :           5 : void CZMQNotificationInterface::BlockDisconnected(const std::shared_ptr<const CBlock>& pblock, const CBlockIndex* pindexDisconnected)
     199                 :             : {
     200         [ +  + ]:          12 :     for (const CTransactionRef& ptx : pblock->vtx) {
     201                 :           7 :         const CTransaction& tx = *ptx;
     202                 :           7 :         TryForEachAndRemoveFailed(notifiers, [&tx](CZMQAbstractNotifier* notifier) {
     203                 :          11 :             return notifier->NotifyTransaction(tx);
     204                 :             :         });
     205                 :             :     }
     206                 :             : 
     207                 :             :     // Next we notify BlockDisconnect listeners for *all* blocks
     208                 :           5 :     TryForEachAndRemoveFailed(notifiers, [pindexDisconnected](CZMQAbstractNotifier* notifier) {
     209                 :           8 :         return notifier->NotifyBlockDisconnect(pindexDisconnected);
     210                 :             :     });
     211                 :           5 : }
     212                 :             : 
     213                 :             : std::unique_ptr<CZMQNotificationInterface> g_zmq_notification_interface;
        

Generated by: LCOV version 2.0-1