LCOV - code coverage report
Current view: top level - src/zmq - zmqnotificationinterface.cpp (source / functions) Coverage Total Hit
Test: test_bitcoin_coverage.info Lines: 17.6 % 102 18
Test Date: 2026-02-22 04:55:57 Functions: 10.0 % 20 2
Branches: 13.7 % 124 17

             Branch data     Line data    Source code
       1                 :             : // Copyright (c) 2015-present The Bitcoin Core developers
       2                 :             : // Distributed under the MIT software license, see the accompanying
       3                 :             : // file COPYING or http://www.opensource.org/licenses/mit-license.php.
       4                 :             : 
       5                 :             : #include <zmq/zmqnotificationinterface.h>
       6                 :             : 
       7                 :             : #include <common/args.h>
       8                 :             : #include <kernel/mempool_entry.h>
       9                 :             : #include <kernel/types.h>
      10                 :             : #include <logging.h>
      11                 :             : #include <netbase.h>
      12                 :             : #include <primitives/block.h>
      13                 :             : #include <primitives/transaction.h>
      14                 :             : #include <util/check.h>
      15                 :             : #include <zmq/zmqabstractnotifier.h>
      16                 :             : #include <zmq/zmqpublishnotifier.h>
      17                 :             : #include <zmq/zmqutil.h>
      18                 :             : 
      19                 :             : #include <zmq.h>
      20                 :             : 
      21                 :             : #include <map>
      22                 :             : #include <string>
      23                 :             : #include <utility>
      24                 :             : #include <vector>
      25                 :             : 
      26                 :             : using kernel::ChainstateRole;
      27                 :             : 
      28                 :           0 : CZMQNotificationInterface::CZMQNotificationInterface() = default;
      29                 :             : 
      30                 :           0 : CZMQNotificationInterface::~CZMQNotificationInterface()
      31                 :             : {
      32                 :           0 :     Shutdown();
      33                 :           0 : }
      34                 :             : 
      35                 :           0 : std::list<const CZMQAbstractNotifier*> CZMQNotificationInterface::GetActiveNotifiers() const
      36                 :             : {
      37                 :           0 :     std::list<const CZMQAbstractNotifier*> result;
      38         [ #  # ]:           0 :     for (const auto& n : notifiers) {
      39         [ #  # ]:           0 :         result.push_back(n.get());
      40                 :             :     }
      41                 :           0 :     return result;
      42                 :           0 : }
      43                 :             : 
      44                 :           1 : std::unique_ptr<CZMQNotificationInterface> CZMQNotificationInterface::Create(std::function<bool(std::vector<std::byte>&, const CBlockIndex&)> get_block_by_index)
      45                 :             : {
      46         [ +  - ]:           1 :     std::map<std::string, CZMQNotifierFactory> factories;
      47   [ +  -  +  - ]:           1 :     factories["pubhashblock"] = CZMQAbstractNotifier::Create<CZMQPublishHashBlockNotifier>;
      48   [ +  -  +  - ]:           1 :     factories["pubhashtx"] = CZMQAbstractNotifier::Create<CZMQPublishHashTransactionNotifier>;
      49   [ +  -  +  - ]:           1 :     factories["pubrawblock"] = [&get_block_by_index]() -> std::unique_ptr<CZMQAbstractNotifier> {
      50                 :           0 :         return std::make_unique<CZMQPublishRawBlockNotifier>(get_block_by_index);
      51                 :           1 :     };
      52   [ +  -  +  - ]:           1 :     factories["pubrawtx"] = CZMQAbstractNotifier::Create<CZMQPublishRawTransactionNotifier>;
      53   [ +  -  +  - ]:           1 :     factories["pubsequence"] = CZMQAbstractNotifier::Create<CZMQPublishSequenceNotifier>;
      54                 :             : 
      55                 :           1 :     std::list<std::unique_ptr<CZMQAbstractNotifier>> notifiers;
      56         [ +  + ]:           6 :     for (const auto& entry : factories)
      57                 :             :     {
      58         [ +  - ]:           5 :         std::string arg("-zmq" + entry.first);
      59                 :           5 :         const auto& factory = entry.second;
      60   [ +  -  -  + ]:           5 :         for (std::string& address : gArgs.GetArgs(arg)) {
      61                 :             :             // libzmq uses prefix "ipc://" for UNIX domain sockets
      62   [ #  #  #  #  :           0 :             if (address.starts_with(ADDR_PREFIX_UNIX)) {
                   #  # ]
      63         [ #  # ]:           0 :                 address.replace(0, ADDR_PREFIX_UNIX.length(), ADDR_PREFIX_IPC);
      64                 :             :             }
      65                 :             : 
      66         [ #  # ]:           0 :             std::unique_ptr<CZMQAbstractNotifier> notifier = factory();
      67         [ #  # ]:           0 :             notifier->SetType(entry.first);
      68         [ #  # ]:           0 :             notifier->SetAddress(address);
      69   [ #  #  #  # ]:           0 :             notifier->SetOutboundMessageHighWaterMark(static_cast<int>(gArgs.GetIntArg(arg + "hwm", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM)));
      70         [ #  # ]:           0 :             notifiers.push_back(std::move(notifier));
      71                 :           5 :         }
      72                 :           5 :     }
      73                 :             : 
      74         [ -  + ]:           1 :     if (!notifiers.empty())
      75                 :             :     {
      76   [ #  #  #  # ]:           0 :         std::unique_ptr<CZMQNotificationInterface> notificationInterface(new CZMQNotificationInterface());
      77                 :           0 :         notificationInterface->notifiers = std::move(notifiers);
      78                 :             : 
      79   [ #  #  #  # ]:           0 :         if (notificationInterface->Initialize()) {
      80                 :             :             return notificationInterface;
      81                 :             :         }
      82                 :           0 :     }
      83                 :             : 
      84                 :           1 :     return nullptr;
      85                 :           1 : }
      86                 :             : 
      87                 :             : // Called at startup to conditionally set up ZMQ socket(s)
      88                 :           0 : bool CZMQNotificationInterface::Initialize()
      89                 :             : {
      90                 :           0 :     int major = 0, minor = 0, patch = 0;
      91                 :           0 :     zmq_version(&major, &minor, &patch);
      92         [ #  # ]:           0 :     LogDebug(BCLog::ZMQ, "version %d.%d.%d\n", major, minor, patch);
      93                 :             : 
      94         [ #  # ]:           0 :     LogDebug(BCLog::ZMQ, "Initialize notification interface\n");
      95         [ #  # ]:           0 :     assert(!pcontext);
      96                 :             : 
      97                 :           0 :     pcontext = zmq_ctx_new();
      98                 :             : 
      99         [ #  # ]:           0 :     if (!pcontext)
     100                 :             :     {
     101         [ #  # ]:           0 :         zmqError("Unable to initialize context");
     102                 :           0 :         return false;
     103                 :             :     }
     104                 :             : 
     105         [ #  # ]:           0 :     for (auto& notifier : notifiers) {
     106         [ #  # ]:           0 :         if (notifier->Initialize(pcontext)) {
     107   [ #  #  #  #  :           0 :             LogDebug(BCLog::ZMQ, "Notifier %s ready (address = %s)\n", notifier->GetType(), notifier->GetAddress());
             #  #  #  # ]
     108                 :             :         } else {
     109   [ #  #  #  #  :           0 :             LogDebug(BCLog::ZMQ, "Notifier %s failed (address = %s)\n", notifier->GetType(), notifier->GetAddress());
             #  #  #  # ]
     110                 :           0 :             return false;
     111                 :             :         }
     112                 :             :     }
     113                 :             : 
     114                 :             :     return true;
     115                 :             : }
     116                 :             : 
     117                 :             : // Called during shutdown sequence
     118                 :           0 : void CZMQNotificationInterface::Shutdown()
     119                 :             : {
     120         [ #  # ]:           0 :     LogDebug(BCLog::ZMQ, "Shutdown notification interface\n");
     121         [ #  # ]:           0 :     if (pcontext)
     122                 :             :     {
     123         [ #  # ]:           0 :         for (auto& notifier : notifiers) {
     124   [ #  #  #  #  :           0 :             LogDebug(BCLog::ZMQ, "Shutdown notifier %s at %s\n", notifier->GetType(), notifier->GetAddress());
             #  #  #  # ]
     125                 :           0 :             notifier->Shutdown();
     126                 :             :         }
     127                 :           0 :         zmq_ctx_term(pcontext);
     128                 :             : 
     129                 :           0 :         pcontext = nullptr;
     130                 :             :     }
     131                 :           0 : }
     132                 :             : 
     133                 :             : namespace {
     134                 :             : 
     135                 :             : template <typename Function>
     136                 :           0 : void TryForEachAndRemoveFailed(std::list<std::unique_ptr<CZMQAbstractNotifier>>& notifiers, const Function& func)
     137                 :             : {
     138         [ #  # ]:           0 :     for (auto i = notifiers.begin(); i != notifiers.end(); ) {
     139                 :           0 :         CZMQAbstractNotifier* notifier = i->get();
     140         [ #  # ]:           0 :         if (func(notifier)) {
     141                 :           0 :             ++i;
     142                 :             :         } else {
     143                 :           0 :             notifier->Shutdown();
     144                 :           0 :             i = notifiers.erase(i);
     145                 :             :         }
     146                 :             :     }
     147                 :           0 : }
     148                 :             : 
     149                 :             : } // anonymous namespace
     150                 :             : 
     151                 :           0 : void CZMQNotificationInterface::UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload)
     152                 :             : {
     153         [ #  # ]:           0 :     if (fInitialDownload || pindexNew == pindexFork) // In IBD or blocks were disconnected without any new ones
     154                 :             :         return;
     155                 :             : 
     156                 :           0 :     TryForEachAndRemoveFailed(notifiers, [pindexNew](CZMQAbstractNotifier* notifier) {
     157                 :           0 :         return notifier->NotifyBlock(pindexNew);
     158                 :             :     });
     159                 :             : }
     160                 :             : 
     161                 :           0 : void CZMQNotificationInterface::TransactionAddedToMempool(const NewMempoolTransactionInfo& ptx, uint64_t mempool_sequence)
     162                 :             : {
     163                 :           0 :     const CTransaction& tx = *(ptx.info.m_tx);
     164                 :             : 
     165                 :           0 :     TryForEachAndRemoveFailed(notifiers, [&tx, mempool_sequence](CZMQAbstractNotifier* notifier) {
     166   [ #  #  #  # ]:           0 :         return notifier->NotifyTransaction(tx) && notifier->NotifyTransactionAcceptance(tx, mempool_sequence);
     167                 :             :     });
     168                 :           0 : }
     169                 :             : 
     170                 :           0 : void CZMQNotificationInterface::TransactionRemovedFromMempool(const CTransactionRef& ptx, MemPoolRemovalReason reason, uint64_t mempool_sequence)
     171                 :             : {
     172                 :             :     // Called for all non-block inclusion reasons
     173                 :           0 :     const CTransaction& tx = *ptx;
     174                 :             : 
     175                 :           0 :     TryForEachAndRemoveFailed(notifiers, [&tx, mempool_sequence](CZMQAbstractNotifier* notifier) {
     176                 :           0 :         return notifier->NotifyTransactionRemoval(tx, mempool_sequence);
     177                 :             :     });
     178                 :           0 : }
     179                 :             : 
     180                 :           0 : void CZMQNotificationInterface::BlockConnected(const ChainstateRole& role, const std::shared_ptr<const CBlock>& pblock, const CBlockIndex* pindexConnected)
     181                 :             : {
     182         [ #  # ]:           0 :     if (role.historical) {
     183                 :             :         return;
     184                 :             :     }
     185         [ #  # ]:           0 :     for (const CTransactionRef& ptx : pblock->vtx) {
     186                 :           0 :         const CTransaction& tx = *ptx;
     187                 :           0 :         TryForEachAndRemoveFailed(notifiers, [&tx](CZMQAbstractNotifier* notifier) {
     188                 :           0 :             return notifier->NotifyTransaction(tx);
     189                 :             :         });
     190                 :             :     }
     191                 :             : 
     192                 :             :     // Next we notify BlockConnect listeners for *all* blocks
     193                 :           0 :     TryForEachAndRemoveFailed(notifiers, [pindexConnected](CZMQAbstractNotifier* notifier) {
     194                 :           0 :         return notifier->NotifyBlockConnect(pindexConnected);
     195                 :             :     });
     196                 :             : }
     197                 :             : 
     198                 :           0 : void CZMQNotificationInterface::BlockDisconnected(const std::shared_ptr<const CBlock>& pblock, const CBlockIndex* pindexDisconnected)
     199                 :             : {
     200         [ #  # ]:           0 :     for (const CTransactionRef& ptx : pblock->vtx) {
     201                 :           0 :         const CTransaction& tx = *ptx;
     202                 :           0 :         TryForEachAndRemoveFailed(notifiers, [&tx](CZMQAbstractNotifier* notifier) {
     203                 :           0 :             return notifier->NotifyTransaction(tx);
     204                 :             :         });
     205                 :             :     }
     206                 :             : 
     207                 :             :     // Next we notify BlockDisconnect listeners for *all* blocks
     208                 :           0 :     TryForEachAndRemoveFailed(notifiers, [pindexDisconnected](CZMQAbstractNotifier* notifier) {
     209                 :           0 :         return notifier->NotifyBlockDisconnect(pindexDisconnected);
     210                 :             :     });
     211                 :           0 : }
     212                 :             : 
     213                 :             : std::unique_ptr<CZMQNotificationInterface> g_zmq_notification_interface;
        

Generated by: LCOV version 2.0-1