LCOV - code coverage report
Current view: top level - src/ipc/libmultiprocess/include/mp - type-context.h (source / functions) Coverage Total Hit
Test: total_coverage.info Lines: 87.9 % 58 51
Test Date: 2025-10-04 05:03:45 Functions: 11.0 % 436 48
Branches: 3.0 % 2056 61

             Branch data     Line data    Source code
       1                 :             : // Copyright (c) The Bitcoin Core developers
       2                 :             : // Distributed under the MIT software license, see the accompanying
       3                 :             : // file COPYING or http://www.opensource.org/licenses/mit-license.php.
       4                 :             : 
       5                 :             : #ifndef MP_PROXY_TYPE_CONTEXT_H
       6                 :             : #define MP_PROXY_TYPE_CONTEXT_H
       7                 :             : 
       8                 :             : #include <mp/proxy-io.h>
       9                 :             : #include <mp/util.h>
      10                 :             : 
      11                 :             : namespace mp {
      12                 :             : template <typename Output>
      13                 :          18 : void CustomBuildField(TypeList<>,
      14                 :             :     Priority<1>,
      15                 :             :     ClientInvokeContext& invoke_context,
      16                 :             :     Output&& output,
      17                 :             :     typename std::enable_if<std::is_same<decltype(output.get()), Context::Builder>::value>::type* enable = nullptr)
      18                 :             : {
      19                 :          18 :     auto& connection = invoke_context.connection;
      20                 :          18 :     auto& thread_context = invoke_context.thread_context;
      21                 :             : 
      22                 :             :     // Create local Thread::Server object corresponding to the current thread
      23                 :             :     // and pass a Thread::Client reference to it in the Context.callbackThread
      24                 :             :     // field so the function being called can make callbacks to this thread.
      25                 :             :     // Also store the Thread::Client reference in the callback_threads map so
      26                 :             :     // future calls over this connection can reuse it.
      27         [ +  - ]:          18 :     auto [callback_thread, _]{SetThread(
      28         [ +  - ]:          18 :         GuardedRef{thread_context.waiter->m_mutex, thread_context.callback_threads}, &connection,
      29   [ +  -  +  -  :          24 :         [&] { return connection.m_threads.add(kj::heap<ProxyServer<Thread>>(thread_context, std::thread{})); })};
          +  -  +  -  -  
                      - ]
      30                 :             : 
      31                 :             :     // Call remote ThreadMap.makeThread function so server will create a
      32                 :             :     // dedicated worker thread to run function calls from this thread. Store the
      33                 :             :     // Thread::Client reference it returns in the request_threads map.
      34         [ -  - ]:          18 :     auto make_request_thread{[&]{
      35                 :             :         // This code will only run if an IPC client call is being made for the
      36                 :             :         // first time on this thread. After the first call, subsequent calls
      37                 :             :         // will use the existing request thread. This code will also never run at
      38                 :             :         // all if the current thread is a request thread created for a different
      39                 :             :         // IPC client, because in that case PassField code (below) will have set
      40                 :             :         // request_thread to point to the calling thread.
      41   [ -  -  +  -  :           6 :         auto request = connection.m_thread_map.makeThreadRequest();
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
           #  # ][ #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
           #  #  # ][ #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
             #  #  #  # ]
      42   [ -  -  -  -  :           6 :         request.setName(thread_context.thread_name);
          -  +  +  -  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
           #  #  # ][ #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
             #  #  #  # ]
           [ #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
                      # ]
      43   [ -  -  -  -  :           6 :         return request.send().getResult(); // Nonblocking due to capnp request pipelining.
          -  -  +  -  +  
          -  +  -  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
             #  #  #  # ]
           [ #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
           # ][ #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
                   #  # ]
      44                 :           6 :     }};
      45         [ +  - ]:          18 :     auto [request_thread, _1]{SetThread(
      46                 :          18 :         GuardedRef{thread_context.waiter->m_mutex, thread_context.request_threads},
      47         [ +  - ]:          36 :         &connection, make_request_thread)};
      48                 :             : 
      49                 :          18 :     auto context = output.init();
      50                 :          18 :     context.setThread(request_thread->second->m_client);
      51                 :          18 :     context.setCallbackThread(callback_thread->second->m_client);
      52                 :          18 : }
      53                 :             : 
      54                 :             : //! PassField override for mp.Context arguments. Return asynchronously and call
      55                 :             : //! function on other thread found in context.
      56                 :             : template <typename Accessor, typename ServerContext, typename Fn, typename... Args>
      57                 :          18 : auto PassField(Priority<1>, TypeList<>, ServerContext& server_context, const Fn& fn, Args&&... args) ->
      58                 :             :     typename std::enable_if<
      59                 :             :         std::is_same<decltype(Accessor::get(server_context.call_context.getParams())), Context::Reader>::value,
      60                 :             :         kj::Promise<typename ServerContext::CallContext>>::type
      61                 :             : {
      62                 :          18 :     const auto& params = server_context.call_context.getParams();
      63                 :          18 :     Context::Reader context_arg = Accessor::get(params);
      64                 :          18 :     auto future = kj::newPromiseAndFulfiller<typename ServerContext::CallContext>();
      65                 :          18 :     auto& server = server_context.proxy_server;
      66                 :          18 :     int req = server_context.req;
      67   [ +  -  +  -  :          72 :     auto invoke = [fulfiller = kj::mv(future.fulfiller),
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
           # ][ #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
                   #  # ]
      68                 :          18 :          call_context = kj::mv(server_context.call_context), &server, req, fn, args...]() mutable {
      69                 :          18 :                 const auto& params = call_context.getParams();
      70                 :          18 :                 Context::Reader context_arg = Accessor::get(params);
      71                 :          18 :                 ServerContext server_context{server, call_context, req};
      72                 :             :                 {
      73                 :             :                     // Before invoking the function, store a reference to the
      74                 :             :                     // callbackThread provided by the client in the
      75                 :             :                     // thread_local.request_threads map. This way, if this
      76                 :             :                     // server thread needs to execute any RPCs that call back to
      77                 :             :                     // the client, they will happen on the same client thread
      78                 :             :                     // that is waiting for this function, just like what would
      79                 :             :                     // happen if this were a normal function call made on the
      80                 :             :                     // local stack.
      81                 :             :                     //
      82                 :             :                     // If the request_threads map already has an entry for this
      83                 :             :                     // connection, it will be left unchanged, and it indicates
      84                 :             :                     // that the current thread is an RPC client thread which is
      85                 :             :                     // in the middle of an RPC call, and the current RPC call is
      86                 :             :                     // a nested call from the remote thread handling that RPC
      87                 :             :                     // call. In this case, the callbackThread value should point
      88                 :             :                     // to the same thread already in the map, so there is no
      89                 :             :                     // need to update the map.
      90                 :          18 :                     auto& thread_context = g_thread_context;
      91                 :          18 :                     auto& request_threads = thread_context.request_threads;
      92                 :          18 :                     ConnThread request_thread;
      93                 :             :                     bool inserted;
      94                 :          54 :                     server.m_context.loop->sync([&] {
      95                 :          18 :                         std::tie(request_thread, inserted) = SetThread(
      96   [ +  -  +  -  :          18 :                             GuardedRef{thread_context.waiter->m_mutex, request_threads}, server.m_context.connection,
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
           # ][ #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
                   #  # ]
      97   [ +  -  +  -  :          54 :                             [&] { return context_arg.getCallbackThread(); });
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
           # ][ #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
                   #  # ]
      98                 :             :                     });
      99                 :             : 
     100                 :             :                     // If an entry was inserted into the request_threads map,
     101                 :             :                     // remove it after calling fn.invoke. If an entry was not
     102                 :             :                     // inserted, one already existed, meaning this must be a
     103                 :             :                     // recursive call (IPC call calling back to the caller which
     104                 :             :                     // makes another IPC call), so avoid modifying the map.
     105                 :          18 :                     const bool erase_thread{inserted};
     106   [ +  -  -  +  :          90 :                     KJ_DEFER(if (erase_thread) {
             +  -  +  - ]
     107                 :             :                         // Erase the request_threads entry on the event loop
     108                 :             :                         // thread with loop->sync(), so if the connection is
     109                 :             :                         // broken there is not a race between this thread and
     110                 :             :                         // the disconnect handler trying to destroy the thread
     111                 :             :                         // client object.
     112                 :             :                         server.m_context.loop->sync([&] {
     113                 :             :                             // Look up the thread again without using existing
     114                 :             :                             // iterator since entry may no longer be there after
     115                 :             :                             // a disconnect. Destroy node after releasing
     116                 :             :                             // Waiter::m_mutex, so the ProxyClient<Thread>
     117                 :             :                             // destructor is able to use EventLoop::mutex
     118                 :             :                             // without violating lock order.
     119                 :             :                             ConnThreads::node_type removed;
     120                 :             :                             {
     121                 :             :                                 Lock lock(thread_context.waiter->m_mutex);
     122                 :             :                                 removed = request_threads.extract(server.m_context.connection);
     123                 :             :                             }
     124                 :             :                         });
     125                 :             :                     });
     126   [ +  -  +  -  :          18 :                     fn.invoke(server_context, args...);
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
           # ][ #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
                   #  # ]
     127                 :          18 :                 }
     128   [ +  +  -  +  :          72 :                 KJ_IF_MAYBE(exception, kj::runCatchingExceptions([&]() {
          -  +  -  +  -  
          +  -  +  -  +  
          -  +  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
           # ][ #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
                   #  # ]
     129                 :             :                     server.m_context.loop->sync([&] {
     130                 :             :                         auto fulfiller_dispose = kj::mv(fulfiller);
     131                 :             :                         fulfiller_dispose->fulfill(kj::mv(call_context));
     132                 :             :                     });
     133                 :             :                 }))
     134                 :             :                 {
     135   [ #  #  #  #  :           0 :                     server.m_context.loop->sync([&]() {
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
           # ][ #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
                   #  # ]
     136   [ #  #  #  #  :           0 :                         auto fulfiller_dispose = kj::mv(fulfiller);
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
           # ][ #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
                   #  # ]
     137   [ #  #  #  #  :           0 :                         fulfiller_dispose->reject(kj::mv(*exception));
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
           # ][ #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
                   #  # ]
     138                 :           0 :                     });
     139                 :             :                 }
     140                 :             :             };
     141                 :             : 
     142                 :             :     // Lookup Thread object specified by the client. The specified thread should
     143                 :             :     // be a local Thread::Server object, but it needs to be looked up
     144                 :             :     // asynchronously with getLocalServer().
     145         [ +  - ]:          18 :     auto thread_client = context_arg.getThread();
     146   [ +  -  +  - ]:          36 :     return server.m_context.connection->m_threads.getLocalServer(thread_client)
     147   [ +  -  +  -  :          90 :         .then([&server, invoke = kj::mv(invoke), req](const kj::Maybe<Thread::Server&>& perhaps) mutable {
          -  +  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
           # ][ #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
                   #  # ]
     148                 :             :             // Assuming the thread object is found, pass it a pointer to the
     149                 :             :             // `invoke` lambda above which will invoke the function on that
     150                 :             :             // thread.
     151   [ +  -  +  -  :          18 :             KJ_IF_MAYBE (thread_server, perhaps) {
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
           # ][ #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
                   #  # ]
     152                 :          18 :                 const auto& thread = static_cast<ProxyServer<Thread>&>(*thread_server);
     153   [ +  -  +  -  :          36 :                 server.m_context.loop->log()
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
           # ][ #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
                   #  # ]
     154   [ +  -  +  -  :          54 :                     << "IPC server post request  #" << req << " {" << thread.m_thread_context.thread_name << "}";
          +  -  +  -  +  
          -  +  -  +  -  
          +  -  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
           # ][ #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
                   #  # ]
     155                 :          18 :                 thread.m_thread_context.waiter->post(std::move(invoke));
     156                 :             :             } else {
     157   [ #  #  #  #  :           0 :                 server.m_context.loop->log()
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
           # ][ #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
                   #  # ]
     158   [ #  #  #  #  :           0 :                     << "IPC server error request #" << req << ", missing thread to execute request";
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
           # ][ #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
                   #  # ]
     159   [ #  #  #  #  :           0 :                 throw std::runtime_error("invalid thread handle");
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
           # ][ #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
                   #  # ]
     160                 :             :             }
     161                 :             :         })
     162                 :             :         // Wait for the invocation to finish before returning to the caller.
     163   [ +  +  -  +  :          90 :         .then([invoke_wait = kj::mv(future.promise)]() mutable { return kj::mv(invoke_wait); });
          -  +  -  -  +  
          -  +  -  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
           # ][ #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
          #  #  #  #  #  
                   #  # ]
     164   [ +  -  +  -  :          18 : }
                   -  - ]
     165                 :             : } // namespace mp
     166                 :             : 
     167                 :             : #endif // MP_PROXY_TYPE_CONTEXT_H
        

Generated by: LCOV version 2.0-1