LCOV - code coverage report
Current view: top level - src/zmq - zmqnotificationinterface.cpp (source / functions) Coverage Total Hit
Test: test_bitcoin_coverage.info Lines: 17.6 % 102 18
Test Date: 2026-01-07 04:44:43 Functions: 10.0 % 20 2
Branches: 13.5 % 126 17

             Branch data     Line data    Source code
       1                 :             : // Copyright (c) 2015-present The Bitcoin Core developers
       2                 :             : // Distributed under the MIT software license, see the accompanying
       3                 :             : // file COPYING or http://www.opensource.org/licenses/mit-license.php.
       4                 :             : 
       5                 :             : #include <zmq/zmqnotificationinterface.h>
       6                 :             : 
       7                 :             : #include <common/args.h>
       8                 :             : #include <kernel/mempool_entry.h>
       9                 :             : #include <kernel/types.h>
      10                 :             : #include <logging.h>
      11                 :             : #include <netbase.h>
      12                 :             : #include <primitives/block.h>
      13                 :             : #include <primitives/transaction.h>
      14                 :             : #include <validationinterface.h>
      15                 :             : #include <zmq/zmqabstractnotifier.h>
      16                 :             : #include <zmq/zmqpublishnotifier.h>
      17                 :             : #include <zmq/zmqutil.h>
      18                 :             : 
      19                 :             : #include <zmq.h>
      20                 :             : 
      21                 :             : #include <cassert>
      22                 :             : #include <map>
      23                 :             : #include <string>
      24                 :             : #include <utility>
      25                 :             : #include <vector>
      26                 :             : 
      27                 :             : using kernel::ChainstateRole;
      28                 :             : 
      29                 :           0 : CZMQNotificationInterface::CZMQNotificationInterface() = default;
      30                 :             : 
      31                 :           0 : CZMQNotificationInterface::~CZMQNotificationInterface()
      32                 :             : {
      33                 :           0 :     Shutdown();
      34                 :           0 : }
      35                 :             : 
      36                 :           0 : std::list<const CZMQAbstractNotifier*> CZMQNotificationInterface::GetActiveNotifiers() const
      37                 :             : {
      38                 :           0 :     std::list<const CZMQAbstractNotifier*> result;
      39         [ #  # ]:           0 :     for (const auto& n : notifiers) {
      40         [ #  # ]:           0 :         result.push_back(n.get());
      41                 :             :     }
      42                 :           0 :     return result;
      43                 :           0 : }
      44                 :             : 
      45                 :           1 : std::unique_ptr<CZMQNotificationInterface> CZMQNotificationInterface::Create(std::function<bool(std::vector<std::byte>&, const CBlockIndex&)> get_block_by_index)
      46                 :             : {
      47         [ +  - ]:           1 :     std::map<std::string, CZMQNotifierFactory> factories;
      48   [ +  -  +  - ]:           1 :     factories["pubhashblock"] = CZMQAbstractNotifier::Create<CZMQPublishHashBlockNotifier>;
      49   [ +  -  +  - ]:           1 :     factories["pubhashtx"] = CZMQAbstractNotifier::Create<CZMQPublishHashTransactionNotifier>;
      50   [ +  -  +  - ]:           1 :     factories["pubrawblock"] = [&get_block_by_index]() -> std::unique_ptr<CZMQAbstractNotifier> {
      51                 :           0 :         return std::make_unique<CZMQPublishRawBlockNotifier>(get_block_by_index);
      52                 :           1 :     };
      53   [ +  -  +  - ]:           1 :     factories["pubrawtx"] = CZMQAbstractNotifier::Create<CZMQPublishRawTransactionNotifier>;
      54   [ +  -  +  - ]:           1 :     factories["pubsequence"] = CZMQAbstractNotifier::Create<CZMQPublishSequenceNotifier>;
      55                 :             : 
      56                 :           1 :     std::list<std::unique_ptr<CZMQAbstractNotifier>> notifiers;
      57         [ +  + ]:           6 :     for (const auto& entry : factories)
      58                 :             :     {
      59         [ +  - ]:           5 :         std::string arg("-zmq" + entry.first);
      60                 :           5 :         const auto& factory = entry.second;
      61   [ +  -  -  + ]:           5 :         for (std::string& address : gArgs.GetArgs(arg)) {
      62                 :             :             // libzmq uses prefix "ipc://" for UNIX domain sockets
      63   [ #  #  #  #  :           0 :             if (address.starts_with(ADDR_PREFIX_UNIX)) {
                   #  # ]
      64         [ #  # ]:           0 :                 address.replace(0, ADDR_PREFIX_UNIX.length(), ADDR_PREFIX_IPC);
      65                 :             :             }
      66                 :             : 
      67         [ #  # ]:           0 :             std::unique_ptr<CZMQAbstractNotifier> notifier = factory();
      68         [ #  # ]:           0 :             notifier->SetType(entry.first);
      69         [ #  # ]:           0 :             notifier->SetAddress(address);
      70   [ #  #  #  #  :           0 :             notifier->SetOutboundMessageHighWaterMark(static_cast<int>(gArgs.GetIntArg(arg + "hwm", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM)));
                   #  # ]
      71         [ #  # ]:           0 :             notifiers.push_back(std::move(notifier));
      72                 :           5 :         }
      73                 :           5 :     }
      74                 :             : 
      75         [ -  + ]:           1 :     if (!notifiers.empty())
      76                 :             :     {
      77   [ #  #  #  # ]:           0 :         std::unique_ptr<CZMQNotificationInterface> notificationInterface(new CZMQNotificationInterface());
      78                 :           0 :         notificationInterface->notifiers = std::move(notifiers);
      79                 :             : 
      80   [ #  #  #  # ]:           0 :         if (notificationInterface->Initialize()) {
      81                 :             :             return notificationInterface;
      82                 :             :         }
      83                 :           0 :     }
      84                 :             : 
      85                 :           1 :     return nullptr;
      86                 :           1 : }
      87                 :             : 
      88                 :             : // Called at startup to conditionally set up ZMQ socket(s)
      89                 :           0 : bool CZMQNotificationInterface::Initialize()
      90                 :             : {
      91                 :           0 :     int major = 0, minor = 0, patch = 0;
      92                 :           0 :     zmq_version(&major, &minor, &patch);
      93         [ #  # ]:           0 :     LogDebug(BCLog::ZMQ, "version %d.%d.%d\n", major, minor, patch);
      94                 :             : 
      95         [ #  # ]:           0 :     LogDebug(BCLog::ZMQ, "Initialize notification interface\n");
      96         [ #  # ]:           0 :     assert(!pcontext);
      97                 :             : 
      98                 :           0 :     pcontext = zmq_ctx_new();
      99                 :             : 
     100         [ #  # ]:           0 :     if (!pcontext)
     101                 :             :     {
     102         [ #  # ]:           0 :         zmqError("Unable to initialize context");
     103                 :           0 :         return false;
     104                 :             :     }
     105                 :             : 
     106         [ #  # ]:           0 :     for (auto& notifier : notifiers) {
     107         [ #  # ]:           0 :         if (notifier->Initialize(pcontext)) {
     108   [ #  #  #  #  :           0 :             LogDebug(BCLog::ZMQ, "Notifier %s ready (address = %s)\n", notifier->GetType(), notifier->GetAddress());
             #  #  #  # ]
     109                 :             :         } else {
     110   [ #  #  #  #  :           0 :             LogDebug(BCLog::ZMQ, "Notifier %s failed (address = %s)\n", notifier->GetType(), notifier->GetAddress());
             #  #  #  # ]
     111                 :           0 :             return false;
     112                 :             :         }
     113                 :             :     }
     114                 :             : 
     115                 :             :     return true;
     116                 :             : }
     117                 :             : 
     118                 :             : // Called during shutdown sequence
     119                 :           0 : void CZMQNotificationInterface::Shutdown()
     120                 :             : {
     121         [ #  # ]:           0 :     LogDebug(BCLog::ZMQ, "Shutdown notification interface\n");
     122         [ #  # ]:           0 :     if (pcontext)
     123                 :             :     {
     124         [ #  # ]:           0 :         for (auto& notifier : notifiers) {
     125   [ #  #  #  #  :           0 :             LogDebug(BCLog::ZMQ, "Shutdown notifier %s at %s\n", notifier->GetType(), notifier->GetAddress());
             #  #  #  # ]
     126                 :           0 :             notifier->Shutdown();
     127                 :             :         }
     128                 :           0 :         zmq_ctx_term(pcontext);
     129                 :             : 
     130                 :           0 :         pcontext = nullptr;
     131                 :             :     }
     132                 :           0 : }
     133                 :             : 
     134                 :             : namespace {
     135                 :             : 
     136                 :             : template <typename Function>
     137                 :           0 : void TryForEachAndRemoveFailed(std::list<std::unique_ptr<CZMQAbstractNotifier>>& notifiers, const Function& func)
     138                 :             : {
     139         [ #  # ]:           0 :     for (auto i = notifiers.begin(); i != notifiers.end(); ) {
     140                 :           0 :         CZMQAbstractNotifier* notifier = i->get();
     141         [ #  # ]:           0 :         if (func(notifier)) {
     142                 :           0 :             ++i;
     143                 :             :         } else {
     144                 :           0 :             notifier->Shutdown();
     145                 :           0 :             i = notifiers.erase(i);
     146                 :             :         }
     147                 :             :     }
     148                 :           0 : }
     149                 :             : 
     150                 :             : } // anonymous namespace
     151                 :             : 
     152                 :           0 : void CZMQNotificationInterface::UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload)
     153                 :             : {
     154         [ #  # ]:           0 :     if (fInitialDownload || pindexNew == pindexFork) // In IBD or blocks were disconnected without any new ones
     155                 :             :         return;
     156                 :             : 
     157                 :           0 :     TryForEachAndRemoveFailed(notifiers, [pindexNew](CZMQAbstractNotifier* notifier) {
     158                 :           0 :         return notifier->NotifyBlock(pindexNew);
     159                 :             :     });
     160                 :             : }
     161                 :             : 
     162                 :           0 : void CZMQNotificationInterface::TransactionAddedToMempool(const NewMempoolTransactionInfo& ptx, uint64_t mempool_sequence)
     163                 :             : {
     164                 :           0 :     const CTransaction& tx = *(ptx.info.m_tx);
     165                 :             : 
     166                 :           0 :     TryForEachAndRemoveFailed(notifiers, [&tx, mempool_sequence](CZMQAbstractNotifier* notifier) {
     167   [ #  #  #  # ]:           0 :         return notifier->NotifyTransaction(tx) && notifier->NotifyTransactionAcceptance(tx, mempool_sequence);
     168                 :             :     });
     169                 :           0 : }
     170                 :             : 
     171                 :           0 : void CZMQNotificationInterface::TransactionRemovedFromMempool(const CTransactionRef& ptx, MemPoolRemovalReason reason, uint64_t mempool_sequence)
     172                 :             : {
     173                 :             :     // Called for all non-block inclusion reasons
     174                 :           0 :     const CTransaction& tx = *ptx;
     175                 :             : 
     176                 :           0 :     TryForEachAndRemoveFailed(notifiers, [&tx, mempool_sequence](CZMQAbstractNotifier* notifier) {
     177                 :           0 :         return notifier->NotifyTransactionRemoval(tx, mempool_sequence);
     178                 :             :     });
     179                 :           0 : }
     180                 :             : 
     181                 :           0 : void CZMQNotificationInterface::BlockConnected(const ChainstateRole& role, const std::shared_ptr<const CBlock>& pblock, const CBlockIndex* pindexConnected)
     182                 :             : {
     183         [ #  # ]:           0 :     if (role.historical) {
     184                 :             :         return;
     185                 :             :     }
     186         [ #  # ]:           0 :     for (const CTransactionRef& ptx : pblock->vtx) {
     187                 :           0 :         const CTransaction& tx = *ptx;
     188                 :           0 :         TryForEachAndRemoveFailed(notifiers, [&tx](CZMQAbstractNotifier* notifier) {
     189                 :           0 :             return notifier->NotifyTransaction(tx);
     190                 :             :         });
     191                 :             :     }
     192                 :             : 
     193                 :             :     // Next we notify BlockConnect listeners for *all* blocks
     194                 :           0 :     TryForEachAndRemoveFailed(notifiers, [pindexConnected](CZMQAbstractNotifier* notifier) {
     195                 :           0 :         return notifier->NotifyBlockConnect(pindexConnected);
     196                 :             :     });
     197                 :             : }
     198                 :             : 
     199                 :           0 : void CZMQNotificationInterface::BlockDisconnected(const std::shared_ptr<const CBlock>& pblock, const CBlockIndex* pindexDisconnected)
     200                 :             : {
     201         [ #  # ]:           0 :     for (const CTransactionRef& ptx : pblock->vtx) {
     202                 :           0 :         const CTransaction& tx = *ptx;
     203                 :           0 :         TryForEachAndRemoveFailed(notifiers, [&tx](CZMQAbstractNotifier* notifier) {
     204                 :           0 :             return notifier->NotifyTransaction(tx);
     205                 :             :         });
     206                 :             :     }
     207                 :             : 
     208                 :             :     // Next we notify BlockDisconnect listeners for *all* blocks
     209                 :           0 :     TryForEachAndRemoveFailed(notifiers, [pindexDisconnected](CZMQAbstractNotifier* notifier) {
     210                 :           0 :         return notifier->NotifyBlockDisconnect(pindexDisconnected);
     211                 :             :     });
     212                 :           0 : }
     213                 :             : 
     214                 :             : std::unique_ptr<CZMQNotificationInterface> g_zmq_notification_interface;
        

Generated by: LCOV version 2.0-1