LCOV - code coverage report
Current view: top level - src/zmq - zmqpublishnotifier.cpp (source / functions) Coverage Total Hit
Test: total_coverage.info Lines: 86.3 % 146 126
Test Date: 2025-01-19 05:08:01 Functions: 100.0 % 14 14
Branches: 53.8 % 130 70

             Branch data     Line data    Source code
       1                 :             : // Copyright (c) 2015-2022 The Bitcoin Core developers
       2                 :             : // Distributed under the MIT software license, see the accompanying
       3                 :             : // file COPYING or http://www.opensource.org/licenses/mit-license.php.
       4                 :             : 
       5                 :             : #include <zmq/zmqpublishnotifier.h>
       6                 :             : 
       7                 :             : #include <chain.h>
       8                 :             : #include <chainparams.h>
       9                 :             : #include <crypto/common.h>
      10                 :             : #include <kernel/cs_main.h>
      11                 :             : #include <logging.h>
      12                 :             : #include <netaddress.h>
      13                 :             : #include <netbase.h>
      14                 :             : #include <node/blockstorage.h>
      15                 :             : #include <primitives/block.h>
      16                 :             : #include <primitives/transaction.h>
      17                 :             : #include <rpc/server.h>
      18                 :             : #include <serialize.h>
      19                 :             : #include <streams.h>
      20                 :             : #include <sync.h>
      21                 :             : #include <uint256.h>
      22                 :             : #include <zmq/zmqutil.h>
      23                 :             : 
      24                 :             : #include <zmq.h>
      25                 :             : 
      26                 :             : #include <cassert>
      27                 :             : #include <cstdarg>
      28                 :             : #include <cstddef>
      29                 :             : #include <cstdint>
      30                 :             : #include <cstring>
      31                 :             : #include <map>
      32                 :             : #include <optional>
      33                 :             : #include <string>
      34                 :             : #include <utility>
      35                 :             : #include <vector>
      36                 :             : 
      37                 :             : namespace Consensus {
      38                 :             : struct Params;
      39                 :             : }
      40                 :             : 
      41                 :             : static std::multimap<std::string, CZMQAbstractPublishNotifier*> mapPublishNotifiers;
      42                 :             : 
      43                 :             : static const char *MSG_HASHBLOCK = "hashblock";
      44                 :             : static const char *MSG_HASHTX    = "hashtx";
      45                 :             : static const char *MSG_RAWBLOCK  = "rawblock";
      46                 :             : static const char *MSG_RAWTX     = "rawtx";
      47                 :             : static const char *MSG_SEQUENCE  = "sequence";
      48                 :             : 
      49                 :             : // Internal function to send multipart message
      50                 :         127 : static int zmq_send_multipart(void *sock, const void* data, size_t size, ...)
      51                 :             : {
      52                 :         127 :     va_list args;
      53                 :         127 :     va_start(args, size);
      54                 :             : 
      55                 :         635 :     while (1)
      56                 :             :     {
      57                 :         381 :         zmq_msg_t msg;
      58                 :             : 
      59                 :         381 :         int rc = zmq_msg_init_size(&msg, size);
      60         [ -  + ]:         381 :         if (rc != 0)
      61                 :             :         {
      62         [ #  # ]:           0 :             zmqError("Unable to initialize ZMQ msg");
      63                 :           0 :             va_end(args);
      64                 :           0 :             return -1;
      65                 :             :         }
      66                 :             : 
      67                 :         381 :         void *buf = zmq_msg_data(&msg);
      68                 :         381 :         memcpy(buf, data, size);
      69                 :             : 
      70                 :         381 :         data = va_arg(args, const void*);
      71                 :             : 
      72         [ +  + ]:         508 :         rc = zmq_msg_send(&msg, sock, data ? ZMQ_SNDMORE : 0);
      73         [ -  + ]:         381 :         if (rc == -1)
      74                 :             :         {
      75         [ #  # ]:           0 :             zmqError("Unable to send ZMQ msg");
      76                 :           0 :             zmq_msg_close(&msg);
      77                 :           0 :             va_end(args);
      78                 :           0 :             return -1;
      79                 :             :         }
      80                 :             : 
      81                 :         381 :         zmq_msg_close(&msg);
      82                 :             : 
      83         [ +  + ]:         381 :         if (!data)
      84                 :             :             break;
      85                 :             : 
      86                 :         254 :         size = va_arg(args, size_t);
      87                 :         254 :     }
      88                 :         127 :     va_end(args);
      89                 :         127 :     return 0;
      90                 :             : }
      91                 :             : 
      92                 :           9 : static bool IsZMQAddressIPV6(const std::string &zmq_address)
      93                 :             : {
      94                 :           9 :     const std::string tcp_prefix = "tcp://";
      95                 :           9 :     const size_t tcp_index = zmq_address.rfind(tcp_prefix);
      96                 :           9 :     const size_t colon_index = zmq_address.rfind(':');
      97         [ +  + ]:           9 :     if (tcp_index == 0 && colon_index != std::string::npos) {
      98         [ +  - ]:           6 :         const std::string ip = zmq_address.substr(tcp_prefix.length(), colon_index - tcp_prefix.length());
      99   [ +  -  +  - ]:           6 :         const std::optional<CNetAddr> addr{LookupHost(ip, false)};
     100   [ +  -  -  + ]:           6 :         if (addr.has_value() && addr.value().IsIPv6()) return true;
     101                 :           6 :     }
     102                 :             :     return false;
     103                 :           9 : }
     104                 :             : 
     105                 :          16 : bool CZMQAbstractPublishNotifier::Initialize(void *pcontext)
     106                 :             : {
     107         [ -  + ]:          16 :     assert(!psocket);
     108                 :             : 
     109                 :             :     // check if address is being used by other publish notifier
     110                 :          16 :     std::multimap<std::string, CZMQAbstractPublishNotifier*>::iterator i = mapPublishNotifiers.find(address);
     111                 :             : 
     112         [ +  + ]:          16 :     if (i==mapPublishNotifiers.end())
     113                 :             :     {
     114                 :           9 :         psocket = zmq_socket(pcontext, ZMQ_PUB);
     115         [ -  + ]:           9 :         if (!psocket)
     116                 :             :         {
     117         [ #  # ]:           0 :             zmqError("Failed to create socket");
     118                 :           0 :             return false;
     119                 :             :         }
     120                 :             : 
     121         [ +  - ]:           9 :         LogDebug(BCLog::ZMQ, "Outbound message high water mark for %s at %s is %d\n", type, address, outbound_message_high_water_mark);
     122                 :             : 
     123                 :           9 :         int rc = zmq_setsockopt(psocket, ZMQ_SNDHWM, &outbound_message_high_water_mark, sizeof(outbound_message_high_water_mark));
     124         [ -  + ]:           9 :         if (rc != 0)
     125                 :             :         {
     126         [ #  # ]:           0 :             zmqError("Failed to set outbound message high water mark");
     127                 :           0 :             zmq_close(psocket);
     128                 :           0 :             return false;
     129                 :             :         }
     130                 :             : 
     131                 :           9 :         const int so_keepalive_option {1};
     132                 :           9 :         rc = zmq_setsockopt(psocket, ZMQ_TCP_KEEPALIVE, &so_keepalive_option, sizeof(so_keepalive_option));
     133         [ -  + ]:           9 :         if (rc != 0) {
     134         [ #  # ]:           0 :             zmqError("Failed to set SO_KEEPALIVE");
     135                 :           0 :             zmq_close(psocket);
     136                 :           0 :             return false;
     137                 :             :         }
     138                 :             : 
     139                 :             :         // On some systems (e.g. OpenBSD) the ZMQ_IPV6 must not be enabled, if the address to bind isn't IPv6
     140         [ +  - ]:           9 :         const int enable_ipv6 { IsZMQAddressIPV6(address) ? 1 : 0};
     141                 :           9 :         rc = zmq_setsockopt(psocket, ZMQ_IPV6, &enable_ipv6, sizeof(enable_ipv6));
     142         [ -  + ]:           9 :         if (rc != 0) {
     143         [ #  # ]:           0 :             zmqError("Failed to set ZMQ_IPV6");
     144                 :           0 :             zmq_close(psocket);
     145                 :           0 :             return false;
     146                 :             :         }
     147                 :             : 
     148                 :           9 :         rc = zmq_bind(psocket, address.c_str());
     149         [ +  + ]:           9 :         if (rc != 0)
     150                 :             :         {
     151         [ +  - ]:           2 :             zmqError("Failed to bind address");
     152                 :           2 :             zmq_close(psocket);
     153                 :           2 :             return false;
     154                 :             :         }
     155                 :             : 
     156                 :             :         // register this notifier for the address, so it can be reused for other publish notifier
     157         [ +  - ]:           7 :         mapPublishNotifiers.insert(std::make_pair(address, this));
     158                 :           7 :         return true;
     159                 :             :     }
     160                 :             :     else
     161                 :             :     {
     162         [ +  - ]:           7 :         LogDebug(BCLog::ZMQ, "Reusing socket for address %s\n", address);
     163         [ +  - ]:           7 :         LogDebug(BCLog::ZMQ, "Outbound message high water mark for %s at %s is %d\n", type, address, outbound_message_high_water_mark);
     164                 :             : 
     165                 :           7 :         psocket = i->second->psocket;
     166         [ +  - ]:           7 :         mapPublishNotifiers.insert(std::make_pair(address, this));
     167                 :             : 
     168                 :           7 :         return true;
     169                 :             :     }
     170                 :             : }
     171                 :             : 
     172                 :          18 : void CZMQAbstractPublishNotifier::Shutdown()
     173                 :             : {
     174                 :             :     // Early return if Initialize was not called
     175         [ +  + ]:          18 :     if (!psocket) return;
     176                 :             : 
     177                 :          16 :     int count = mapPublishNotifiers.count(address);
     178                 :             : 
     179                 :             :     // remove this notifier from the list of publishers using this address
     180                 :          16 :     typedef std::multimap<std::string, CZMQAbstractPublishNotifier*>::iterator iterator;
     181                 :          16 :     std::pair<iterator, iterator> iterpair = mapPublishNotifiers.equal_range(address);
     182                 :             : 
     183         [ +  + ]:          16 :     for (iterator it = iterpair.first; it != iterpair.second; ++it)
     184                 :             :     {
     185         [ +  - ]:          14 :         if (it->second==this)
     186                 :             :         {
     187                 :          14 :             mapPublishNotifiers.erase(it);
     188                 :          14 :             break;
     189                 :             :         }
     190                 :             :     }
     191                 :             : 
     192         [ +  + ]:          16 :     if (count == 1)
     193                 :             :     {
     194         [ +  - ]:           7 :         LogDebug(BCLog::ZMQ, "Close socket at address %s\n", address);
     195                 :           7 :         int linger = 0;
     196                 :           7 :         zmq_setsockopt(psocket, ZMQ_LINGER, &linger, sizeof(linger));
     197                 :           7 :         zmq_close(psocket);
     198                 :             :     }
     199                 :             : 
     200                 :          16 :     psocket = nullptr;
     201                 :             : }
     202                 :             : 
     203                 :         127 : bool CZMQAbstractPublishNotifier::SendZmqMessage(const char *command, const void* data, size_t size)
     204                 :             : {
     205         [ -  + ]:         127 :     assert(psocket);
     206                 :             : 
     207                 :             :     /* send three parts, command & data & a LE 4byte sequence number */
     208                 :         127 :     unsigned char msgseq[sizeof(uint32_t)];
     209                 :         127 :     WriteLE32(msgseq, nSequence);
     210                 :         127 :     int rc = zmq_send_multipart(psocket, command, strlen(command), data, size, msgseq, (size_t)sizeof(uint32_t), nullptr);
     211         [ +  - ]:         127 :     if (rc == -1)
     212                 :             :         return false;
     213                 :             : 
     214                 :             :     /* increment memory only sequence number after sending */
     215                 :         127 :     nSequence++;
     216                 :             : 
     217                 :         127 :     return true;
     218                 :             : }
     219                 :             : 
     220                 :          22 : bool CZMQPublishHashBlockNotifier::NotifyBlock(const CBlockIndex *pindex)
     221                 :             : {
     222                 :          22 :     uint256 hash = pindex->GetBlockHash();
     223   [ +  -  +  - ]:          44 :     LogDebug(BCLog::ZMQ, "Publish hashblock %s to %s\n", hash.GetHex(), this->address);
     224                 :             :     uint8_t data[32];
     225         [ +  + ]:         726 :     for (unsigned int i = 0; i < 32; i++) {
     226                 :         704 :         data[31 - i] = hash.begin()[i];
     227                 :             :     }
     228                 :          22 :     return SendZmqMessage(MSG_HASHBLOCK, data, 32);
     229                 :             : }
     230                 :             : 
     231                 :          31 : bool CZMQPublishHashTransactionNotifier::NotifyTransaction(const CTransaction &transaction)
     232                 :             : {
     233                 :          31 :     uint256 hash = transaction.GetHash();
     234   [ +  -  +  - ]:          62 :     LogDebug(BCLog::ZMQ, "Publish hashtx %s to %s\n", hash.GetHex(), this->address);
     235                 :             :     uint8_t data[32];
     236         [ +  + ]:        1023 :     for (unsigned int i = 0; i < 32; i++) {
     237                 :         992 :         data[31 - i] = hash.begin()[i];
     238                 :             :     }
     239                 :          31 :     return SendZmqMessage(MSG_HASHTX, data, 32);
     240                 :             : }
     241                 :             : 
     242                 :          14 : bool CZMQPublishRawBlockNotifier::NotifyBlock(const CBlockIndex *pindex)
     243                 :             : {
     244   [ +  -  +  - ]:          28 :     LogDebug(BCLog::ZMQ, "Publish rawblock %s to %s\n", pindex->GetBlockHash().GetHex(), this->address);
     245                 :             : 
     246                 :          14 :     std::vector<uint8_t> block{};
     247   [ +  -  -  + ]:          14 :     if (!m_get_block_by_index(block, *pindex)) {
     248   [ #  #  #  # ]:           0 :         zmqError("Can't read block from disk");
     249                 :           0 :         return false;
     250                 :             :     }
     251                 :             : 
     252         [ +  - ]:          14 :     return SendZmqMessage(MSG_RAWBLOCK, block.data(), block.size());
     253                 :          14 : }
     254                 :             : 
     255                 :          18 : bool CZMQPublishRawTransactionNotifier::NotifyTransaction(const CTransaction &transaction)
     256                 :             : {
     257                 :          18 :     uint256 hash = transaction.GetHash();
     258   [ +  -  +  - ]:          36 :     LogDebug(BCLog::ZMQ, "Publish rawtx %s to %s\n", hash.GetHex(), this->address);
     259                 :          18 :     DataStream ss;
     260         [ +  - ]:          18 :     ss << TX_WITH_WITNESS(transaction);
     261         [ +  - ]:          18 :     return SendZmqMessage(MSG_RAWTX, &(*ss.begin()), ss.size());
     262                 :          18 : }
     263                 :             : 
     264                 :             : // Helper function to send a 'sequence' topic message with the following structure:
     265                 :             : //    <32-byte hash> | <1-byte label> | <8-byte LE sequence> (optional)
     266                 :          42 : static bool SendSequenceMsg(CZMQAbstractPublishNotifier& notifier, uint256 hash, char label, std::optional<uint64_t> sequence = {})
     267                 :             : {
     268                 :          42 :     unsigned char data[sizeof(hash) + sizeof(label) + sizeof(uint64_t)];
     269         [ +  + ]:        1386 :     for (unsigned int i = 0; i < sizeof(hash); ++i) {
     270                 :        1344 :         data[sizeof(hash) - 1 - i] = hash.begin()[i];
     271                 :             :     }
     272                 :          42 :     data[sizeof(hash)] = label;
     273         [ +  + ]:          42 :     if (sequence) WriteLE64(data + sizeof(hash) + sizeof(label), *sequence);
     274         [ +  + ]:          56 :     return notifier.SendZmqMessage(MSG_SEQUENCE, data, sequence ? sizeof(data) : sizeof(hash) + sizeof(label));
     275                 :             : }
     276                 :             : 
     277                 :          12 : bool CZMQPublishSequenceNotifier::NotifyBlockConnect(const CBlockIndex *pindex)
     278                 :             : {
     279                 :          12 :     uint256 hash = pindex->GetBlockHash();
     280   [ +  -  +  - ]:          24 :     LogDebug(BCLog::ZMQ, "Publish sequence block connect %s to %s\n", hash.GetHex(), this->address);
     281                 :          12 :     return SendSequenceMsg(*this, hash, /* Block (C)onnect */ 'C');
     282                 :             : }
     283                 :             : 
     284                 :           2 : bool CZMQPublishSequenceNotifier::NotifyBlockDisconnect(const CBlockIndex *pindex)
     285                 :             : {
     286                 :           2 :     uint256 hash = pindex->GetBlockHash();
     287   [ +  -  +  - ]:           4 :     LogDebug(BCLog::ZMQ, "Publish sequence block disconnect %s to %s\n", hash.GetHex(), this->address);
     288                 :           2 :     return SendSequenceMsg(*this, hash, /* Block (D)isconnect */ 'D');
     289                 :             : }
     290                 :             : 
     291                 :          24 : bool CZMQPublishSequenceNotifier::NotifyTransactionAcceptance(const CTransaction &transaction, uint64_t mempool_sequence)
     292                 :             : {
     293                 :          24 :     uint256 hash = transaction.GetHash();
     294   [ +  -  +  - ]:          48 :     LogDebug(BCLog::ZMQ, "Publish hashtx mempool acceptance %s to %s\n", hash.GetHex(), this->address);
     295                 :          24 :     return SendSequenceMsg(*this, hash, /* Mempool (A)cceptance */ 'A', mempool_sequence);
     296                 :             : }
     297                 :             : 
     298                 :           4 : bool CZMQPublishSequenceNotifier::NotifyTransactionRemoval(const CTransaction &transaction, uint64_t mempool_sequence)
     299                 :             : {
     300                 :           4 :     uint256 hash = transaction.GetHash();
     301   [ +  -  +  - ]:           8 :     LogDebug(BCLog::ZMQ, "Publish hashtx mempool removal %s to %s\n", hash.GetHex(), this->address);
     302                 :           4 :     return SendSequenceMsg(*this, hash, /* Mempool (R)emoval */ 'R', mempool_sequence);
     303                 :             : }
        

Generated by: LCOV version 2.0-1