LCOV - code coverage report
Current view: top level - src/zmq - zmqnotificationinterface.cpp (source / functions) Coverage Total Hit
Test: total_coverage.info Lines: 95.1 % 102 97
Test Date: 2024-11-04 05:10:19 Functions: 100.0 % 21 21
Branches: 60.2 % 118 71

             Branch data     Line data    Source code
       1                 :             : // Copyright (c) 2015-2022 The Bitcoin Core developers
       2                 :             : // Distributed under the MIT software license, see the accompanying
       3                 :             : // file COPYING or http://www.opensource.org/licenses/mit-license.php.
       4                 :             : 
       5                 :             : #include <zmq/zmqnotificationinterface.h>
       6                 :             : 
       7                 :             : #include <common/args.h>
       8                 :             : #include <kernel/chain.h>
       9                 :             : #include <kernel/mempool_entry.h>
      10                 :             : #include <logging.h>
      11                 :             : #include <netbase.h>
      12                 :             : #include <primitives/block.h>
      13                 :             : #include <primitives/transaction.h>
      14                 :             : #include <validationinterface.h>
      15                 :             : #include <zmq/zmqabstractnotifier.h>
      16                 :             : #include <zmq/zmqpublishnotifier.h>
      17                 :             : #include <zmq/zmqutil.h>
      18                 :             : 
      19                 :             : #include <zmq.h>
      20                 :             : 
      21                 :             : #include <cassert>
      22                 :             : #include <map>
      23                 :             : #include <string>
      24                 :             : #include <utility>
      25                 :             : #include <vector>
      26                 :             : 
      27                 :           8 : CZMQNotificationInterface::CZMQNotificationInterface() = default;
      28                 :             : 
      29                 :          16 : CZMQNotificationInterface::~CZMQNotificationInterface()
      30                 :             : {
      31                 :           8 :     Shutdown();
      32                 :          16 : }
      33                 :             : 
      34                 :           2 : std::list<const CZMQAbstractNotifier*> CZMQNotificationInterface::GetActiveNotifiers() const
      35                 :             : {
      36                 :           2 :     std::list<const CZMQAbstractNotifier*> result;
      37         [ +  + ]:          10 :     for (const auto& n : notifiers) {
      38         [ +  - ]:           8 :         result.push_back(n.get());
      39                 :             :     }
      40                 :           2 :     return result;
      41                 :           0 : }
      42                 :             : 
      43                 :         913 : std::unique_ptr<CZMQNotificationInterface> CZMQNotificationInterface::Create(std::function<bool(std::vector<uint8_t>&, const CBlockIndex&)> get_block_by_index)
      44                 :             : {
      45         [ +  - ]:         913 :     std::map<std::string, CZMQNotifierFactory> factories;
      46   [ +  -  +  - ]:         913 :     factories["pubhashblock"] = CZMQAbstractNotifier::Create<CZMQPublishHashBlockNotifier>;
      47   [ +  -  +  - ]:         913 :     factories["pubhashtx"] = CZMQAbstractNotifier::Create<CZMQPublishHashTransactionNotifier>;
      48   [ +  -  +  - ]:         913 :     factories["pubrawblock"] = [&get_block_by_index]() -> std::unique_ptr<CZMQAbstractNotifier> {
      49                 :           2 :         return std::make_unique<CZMQPublishRawBlockNotifier>(get_block_by_index);
      50                 :         913 :     };
      51   [ +  -  +  - ]:         913 :     factories["pubrawtx"] = CZMQAbstractNotifier::Create<CZMQPublishRawTransactionNotifier>;
      52   [ +  -  +  - ]:         913 :     factories["pubsequence"] = CZMQAbstractNotifier::Create<CZMQPublishSequenceNotifier>;
      53                 :             : 
      54                 :         913 :     std::list<std::unique_ptr<CZMQAbstractNotifier>> notifiers;
      55         [ +  + ]:        5478 :     for (const auto& entry : factories)
      56                 :             :     {
      57         [ +  - ]:        4565 :         std::string arg("-zmq" + entry.first);
      58                 :        4565 :         const auto& factory = entry.second;
      59   [ +  -  +  + ]:        4583 :         for (std::string& address : gArgs.GetArgs(arg)) {
      60                 :             :             // libzmq uses prefix "ipc://" for UNIX domain sockets
      61   [ +  -  +  + ]:          18 :             if (address.substr(0, ADDR_PREFIX_UNIX.length()) == ADDR_PREFIX_UNIX) {
      62         [ +  - ]:           4 :                 address.replace(0, ADDR_PREFIX_UNIX.length(), ADDR_PREFIX_IPC);
      63                 :             :             }
      64                 :             : 
      65         [ +  - ]:          18 :             std::unique_ptr<CZMQAbstractNotifier> notifier = factory();
      66         [ +  - ]:          18 :             notifier->SetType(entry.first);
      67         [ +  - ]:          18 :             notifier->SetAddress(address);
      68   [ +  -  +  -  :          36 :             notifier->SetOutboundMessageHighWaterMark(static_cast<int>(gArgs.GetIntArg(arg + "hwm", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM)));
                   +  - ]
      69         [ +  - ]:          18 :             notifiers.push_back(std::move(notifier));
      70                 :        4583 :         }
      71                 :        4565 :     }
      72                 :             : 
      73         [ +  + ]:         913 :     if (!notifiers.empty())
      74                 :             :     {
      75   [ +  -  +  - ]:           8 :         std::unique_ptr<CZMQNotificationInterface> notificationInterface(new CZMQNotificationInterface());
      76                 :           8 :         notificationInterface->notifiers = std::move(notifiers);
      77                 :             : 
      78   [ +  -  +  + ]:           8 :         if (notificationInterface->Initialize()) {
      79                 :             :             return notificationInterface;
      80                 :             :         }
      81                 :           2 :     }
      82                 :             : 
      83                 :         907 :     return nullptr;
      84                 :         913 : }
      85                 :             : 
      86                 :             : // Called at startup to conditionally set up ZMQ socket(s)
      87                 :           8 : bool CZMQNotificationInterface::Initialize()
      88                 :             : {
      89                 :           8 :     int major = 0, minor = 0, patch = 0;
      90                 :           8 :     zmq_version(&major, &minor, &patch);
      91         [ +  - ]:           8 :     LogDebug(BCLog::ZMQ, "version %d.%d.%d\n", major, minor, patch);
      92                 :             : 
      93         [ +  - ]:           8 :     LogDebug(BCLog::ZMQ, "Initialize notification interface\n");
      94         [ -  + ]:           8 :     assert(!pcontext);
      95                 :             : 
      96                 :           8 :     pcontext = zmq_ctx_new();
      97                 :             : 
      98         [ -  + ]:           8 :     if (!pcontext)
      99                 :             :     {
     100         [ #  # ]:           0 :         zmqError("Unable to initialize context");
     101                 :           0 :         return false;
     102                 :             :     }
     103                 :             : 
     104         [ +  + ]:          22 :     for (auto& notifier : notifiers) {
     105         [ +  + ]:          16 :         if (notifier->Initialize(pcontext)) {
     106   [ +  -  +  -  :          28 :             LogDebug(BCLog::ZMQ, "Notifier %s ready (address = %s)\n", notifier->GetType(), notifier->GetAddress());
                   +  - ]
     107                 :             :         } else {
     108   [ +  -  +  -  :           4 :             LogDebug(BCLog::ZMQ, "Notifier %s failed (address = %s)\n", notifier->GetType(), notifier->GetAddress());
                   +  - ]
     109                 :           2 :             return false;
     110                 :             :         }
     111                 :             :     }
     112                 :             : 
     113                 :             :     return true;
     114                 :             : }
     115                 :             : 
     116                 :             : // Called during shutdown sequence
     117                 :           8 : void CZMQNotificationInterface::Shutdown()
     118                 :             : {
     119         [ +  - ]:           8 :     LogDebug(BCLog::ZMQ, "Shutdown notification interface\n");
     120         [ +  - ]:           8 :     if (pcontext)
     121                 :             :     {
     122         [ +  + ]:          26 :         for (auto& notifier : notifiers) {
     123   [ +  -  +  -  :          36 :             LogDebug(BCLog::ZMQ, "Shutdown notifier %s at %s\n", notifier->GetType(), notifier->GetAddress());
                   +  - ]
     124                 :          18 :             notifier->Shutdown();
     125                 :             :         }
     126                 :           8 :         zmq_ctx_term(pcontext);
     127                 :             : 
     128                 :           8 :         pcontext = nullptr;
     129                 :             :     }
     130                 :           8 : }
     131                 :             : 
     132                 :             : namespace {
     133                 :             : 
     134                 :             : template <typename Function>
     135                 :         173 : void TryForEachAndRemoveFailed(std::list<std::unique_ptr<CZMQAbstractNotifier>>& notifiers, const Function& func)
     136                 :             : {
     137         [ +  + ]:         522 :     for (auto i = notifiers.begin(); i != notifiers.end(); ) {
     138                 :         349 :         CZMQAbstractNotifier* notifier = i->get();
     139         [ +  - ]:         349 :         if (func(notifier)) {
     140                 :         349 :             ++i;
     141                 :             :         } else {
     142                 :           0 :             notifier->Shutdown();
     143                 :           0 :             i = notifiers.erase(i);
     144                 :             :         }
     145                 :             :     }
     146                 :         173 : }
     147                 :             : 
     148                 :             : } // anonymous namespace
     149                 :             : 
     150                 :          32 : void CZMQNotificationInterface::UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload)
     151                 :             : {
     152         [ +  + ]:          32 :     if (fInitialDownload || pindexNew == pindexFork) // In IBD or blocks were disconnected without any new ones
     153                 :             :         return;
     154                 :             : 
     155                 :          31 :     TryForEachAndRemoveFailed(notifiers, [pindexNew](CZMQAbstractNotifier* notifier) {
     156                 :          79 :         return notifier->NotifyBlock(pindexNew);
     157                 :             :     });
     158                 :             : }
     159                 :             : 
     160                 :          29 : void CZMQNotificationInterface::TransactionAddedToMempool(const NewMempoolTransactionInfo& ptx, uint64_t mempool_sequence)
     161                 :             : {
     162                 :          29 :     const CTransaction& tx = *(ptx.info.m_tx);
     163                 :             : 
     164                 :          29 :     TryForEachAndRemoveFailed(notifiers, [&tx, mempool_sequence](CZMQAbstractNotifier* notifier) {
     165   [ +  -  -  + ]:          38 :         return notifier->NotifyTransaction(tx) && notifier->NotifyTransactionAcceptance(tx, mempool_sequence);
     166                 :             :     });
     167                 :          29 : }
     168                 :             : 
     169                 :           4 : void CZMQNotificationInterface::TransactionRemovedFromMempool(const CTransactionRef& ptx, MemPoolRemovalReason reason, uint64_t mempool_sequence)
     170                 :             : {
     171                 :             :     // Called for all non-block inclusion reasons
     172                 :           4 :     const CTransaction& tx = *ptx;
     173                 :             : 
     174                 :           4 :     TryForEachAndRemoveFailed(notifiers, [&tx, mempool_sequence](CZMQAbstractNotifier* notifier) {
     175                 :           4 :         return notifier->NotifyTransactionRemoval(tx, mempool_sequence);
     176                 :             :     });
     177                 :           4 : }
     178                 :             : 
     179                 :          34 : void CZMQNotificationInterface::BlockConnected(ChainstateRole role, const std::shared_ptr<const CBlock>& pblock, const CBlockIndex* pindexConnected)
     180                 :             : {
     181         [ +  - ]:          34 :     if (role == ChainstateRole::BACKGROUND) {
     182                 :             :         return;
     183                 :             :     }
     184         [ +  + ]:          94 :     for (const CTransactionRef& ptx : pblock->vtx) {
     185                 :          60 :         const CTransaction& tx = *ptx;
     186                 :          60 :         TryForEachAndRemoveFailed(notifiers, [&tx](CZMQAbstractNotifier* notifier) {
     187                 :         119 :             return notifier->NotifyTransaction(tx);
     188                 :             :         });
     189                 :             :     }
     190                 :             : 
     191                 :             :     // Next we notify BlockConnect listeners for *all* blocks
     192                 :          34 :     TryForEachAndRemoveFailed(notifiers, [pindexConnected](CZMQAbstractNotifier* notifier) {
     193                 :          84 :         return notifier->NotifyBlockConnect(pindexConnected);
     194                 :             :     });
     195                 :             : }
     196                 :             : 
     197                 :           6 : void CZMQNotificationInterface::BlockDisconnected(const std::shared_ptr<const CBlock>& pblock, const CBlockIndex* pindexDisconnected)
     198                 :             : {
     199         [ +  + ]:          15 :     for (const CTransactionRef& ptx : pblock->vtx) {
     200                 :           9 :         const CTransaction& tx = *ptx;
     201                 :           9 :         TryForEachAndRemoveFailed(notifiers, [&tx](CZMQAbstractNotifier* notifier) {
     202                 :          15 :             return notifier->NotifyTransaction(tx);
     203                 :             :         });
     204                 :             :     }
     205                 :             : 
     206                 :             :     // Next we notify BlockDisconnect listeners for *all* blocks
     207                 :           6 :     TryForEachAndRemoveFailed(notifiers, [pindexDisconnected](CZMQAbstractNotifier* notifier) {
     208                 :          10 :         return notifier->NotifyBlockDisconnect(pindexDisconnected);
     209                 :             :     });
     210                 :           6 : }
     211                 :             : 
     212                 :             : std::unique_ptr<CZMQNotificationInterface> g_zmq_notification_interface;
        

Generated by: LCOV version 2.0-1