Branch data Line data Source code
1 : : // Copyright (c) The Bitcoin Core developers
2 : : // Distributed under the MIT software license, see the accompanying
3 : : // file COPYING or http://www.opensource.org/licenses/mit-license.php.
4 : :
5 : : #ifndef MP_PROXY_TYPE_CONTEXT_H
6 : : #define MP_PROXY_TYPE_CONTEXT_H
7 : :
8 : : #include <mp/proxy-io.h>
9 : : #include <mp/util.h>
10 : :
11 : : namespace mp {
//! CustomBuildField overload that fills in an mp.Context field on the client
//! side of a call. It participates in overload resolution only when
//! output.get() returns a Context::Builder. It obtains a callback-thread entry
//! and a request-thread entry for invoke_context.connection via SetThread()
//! and stores their m_client handles in the Context's callbackThread and
//! thread fields respectively.
12 : : template <typename Output>
13 : 18 : void CustomBuildField(TypeList<>,
14 : : Priority<1>,
15 : : ClientInvokeContext& invoke_context,
16 : : Output&& output,
17 : : typename std::enable_if<std::is_same<decltype(output.get()), Context::Builder>::value>::type* enable = nullptr)
18 : : {
19 : 18 : auto& connection = invoke_context.connection;
20 : 18 : auto& thread_context = invoke_context.thread_context;
21 : :
22 : : // Create local Thread::Server object corresponding to the current thread
23 : : // and pass a Thread::Client reference to it in the Context.callbackThread
24 : : // field so the function being called can make callbacks to this thread.
25 : : // Also store the Thread::Client reference in the callback_threads map so
26 : : // future calls over this connection can reuse it.
// The second value returned by SetThread is not needed here, hence `_`.
27 [ + - ]: 18 : auto [callback_thread, _]{SetThread(
28 [ + - ]: 18 : thread_context.callback_threads, thread_context.waiter->m_mutex, &connection,
29 [ + - + - : 24 : [&] { return connection.m_threads.add(kj::heap<ProxyServer<Thread>>(thread_context, std::thread{})); })};
+ - + - -
- ]
30 : :
31 : : // Call remote ThreadMap.makeThread function so server will create a
32 : : // dedicated worker thread to run function calls from this thread. Store the
33 : : // Thread::Client reference it returns in the request_threads map.
34 [ - - ]: 18 : auto make_request_thread{[&]{
35 : : // This code will only run if an IPC client call is being made for the
36 : : // first time on this thread. After the first call, subsequent calls
37 : : // will use the existing request thread. This code will also never run at
38 : : // all if the current thread is a request thread created for a different
39 : : // IPC client, because in that case PassField code (below) will have set
40 : : // request_thread to point to the calling thread.
41 [ - - + - : 6 : auto request = connection.m_thread_map.makeThreadRequest();
# # # # #
# # # # #
# # # # #
# # # # #
# # ][ # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # ][ #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # ]
42 [ - - - - : 6 : request.setName(thread_context.thread_name);
- + + - #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # ][ #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # ]
[ # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# ]
43 [ - - - - : 6 : return request.send().getResult(); // Nonblocking due to capnp request pipelining.
- - + - +
- + - # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # ]
[ # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# ][ # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # ]
44 : 6 : }};
45 [ + - ]: 18 : auto [request_thread, _1]{SetThread(
46 : 18 : thread_context.request_threads, thread_context.waiter->m_mutex,
47 [ + - ]: 36 : &connection, make_request_thread)};
48 : :
// Initialize the Context field and record both thread handles in it.
49 : 18 : auto context = output.init();
50 : 18 : context.setThread(request_thread->second.m_client);
51 : 18 : context.setCallbackThread(callback_thread->second.m_client);
52 : 18 : }
53 : :
54 : : //! PassField override for mp.Context arguments. Return asynchronously and call
55 : : //! function on other thread found in context.
//!
//! Selected only when Accessor::get() applied to the call's params yields a
//! Context::Reader. The returned promise is fulfilled with the call context
//! after fn.invoke() has returned, or rejected with the exception caught while
//! fulfilling it.
56 : : template <typename Accessor, typename ServerContext, typename Fn, typename... Args>
57 : 18 : auto PassField(Priority<1>, TypeList<>, ServerContext& server_context, const Fn& fn, Args&&... args) ->
58 : : typename std::enable_if<
59 : : std::is_same<decltype(Accessor::get(server_context.call_context.getParams())), Context::Reader>::value,
60 : : kj::Promise<typename ServerContext::CallContext>>::type
61 : : {
62 : 18 : const auto& params = server_context.call_context.getParams();
63 : 18 : Context::Reader context_arg = Accessor::get(params);
// future.fulfiller is moved into the invoke lambda below; future.promise is
// handed back to the caller through the final .then() of this function.
64 : 18 : auto future = kj::newPromiseAndFulfiller<typename ServerContext::CallContext>();
65 : 18 : auto& server = server_context.proxy_server;
66 : 18 : int req = server_context.req;
// The lambda takes over call_context (moved out of server_context), so it
// reads params and context_arg again from its own call_context. fn, req and
// args are captured by copy, server by reference.
67 [ + - + - : 72 : auto invoke = [fulfiller = kj::mv(future.fulfiller),
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# ][ # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # ]
68 : 18 : call_context = kj::mv(server_context.call_context), &server, req, fn, args...]() mutable {
69 : 18 : const auto& params = call_context.getParams();
70 : 18 : Context::Reader context_arg = Accessor::get(params);
// Rebuild a ServerContext that refers to the lambda's own call_context.
71 : 18 : ServerContext server_context{server, call_context, req};
72 : : {
73 : : // Before invoking the function, store a reference to the
74 : : // callbackThread provided by the client in the
75 : : // thread_local.request_threads map. This way, if this
76 : : // server thread needs to execute any RPCs that call back to
77 : : // the client, they will happen on the same client thread
78 : : // that is waiting for this function, just like what would
79 : : // happen if this were a normal function call made on the
80 : : // local stack.
81 : : //
82 : : // If the request_threads map already has an entry for this
83 : : // connection, it will be left unchanged, and it indicates
84 : : // that the current thread is an RPC client thread which is
85 : : // in the middle of an RPC call, and the current RPC call is
86 : : // a nested call from the remote thread handling that RPC
87 : : // call. In this case, the callbackThread value should point
88 : : // to the same thread already in the map, so there is no
89 : : // need to update the map.
90 : 18 : auto& thread_context = g_thread_context;
91 : 18 : auto& request_threads = thread_context.request_threads;
92 : 18 : auto [request_thread, inserted]{SetThread(
93 [ + - + - : 18 : request_threads, thread_context.waiter->m_mutex,
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# ][ # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # ]
94 : : server.m_context.connection,
95 [ + - + - : 36 : [&] { return context_arg.getCallbackThread(); })};
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# ][ # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # ]
96 : :
97 : : // If an entry was inserted into the request_threads map,
98 : : // remove it after calling fn.invoke. If an entry was not
99 : : // inserted, one already existed, meaning this must be a
100 : : // recursive call (IPC call calling back to the caller which
101 : : // makes another IPC call), so avoid modifying the map.
102 : 18 : const bool erase_thread{inserted};
103 [ + - + - ]: 54 : KJ_DEFER(if (erase_thread) {
104 : : std::unique_lock<std::mutex> lock(thread_context.waiter->m_mutex);
105 : : // Call erase here with a Connection* argument instead
106 : : // of an iterator argument, because the `request_thread`
107 : : // iterator may be invalid if the connection is closed
108 : : // during this function call. More specifically, the
109 : : // iterator may be invalid because SetThread adds a
110 : : // cleanup callback to the Connection destructor that
111 : : // erases the thread from the map, and also because the
112 : : // ProxyServer<Thread> destructor calls
113 : : // request_threads.clear().
114 : : request_threads.erase(server.m_context.connection);
115 : : });
116 [ + - + - : 18 : fn.invoke(server_context, args...);
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# ][ # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # ]
117 : 18 : }
// Hand the call context back through the promise. Only exceptions thrown by
// this sync() call are caught here; fn.invoke() above is outside the
// runCatchingExceptions scope. In both the fulfill and the reject path the
// fulfiller is first moved into a local, so it is destroyed when the sync
// callback returns.
// NOTE(review): presumably sync() runs its callback on the event loop
// thread -- confirm against the EventLoop::sync definition.
118 [ + + - + : 72 : KJ_IF_MAYBE(exception, kj::runCatchingExceptions([&]() {
- + - + -
+ - + - +
- + # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# ][ # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # ]
119 : : server.m_context.loop->sync([&] {
120 : : auto fulfiller_dispose = kj::mv(fulfiller);
121 : : fulfiller_dispose->fulfill(kj::mv(call_context));
122 : : });
123 : : }))
124 : : {
125 [ # # # # : 0 : server.m_context.loop->sync([&]() {
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# ][ # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # ]
126 [ # # # # : 0 : auto fulfiller_dispose = kj::mv(fulfiller);
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# ][ # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # ]
127 [ # # # # : 0 : fulfiller_dispose->reject(kj::mv(*exception));
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# ][ # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # ]
128 : 0 : });
129 : : }
130 : : };
131 : :
132 : : // Look up the Thread object specified by the client. The specified thread
133 : : // should be a local Thread::Server object, but it needs to be looked up
134 : : // asynchronously with getLocalServer().
135 [ + - ]: 18 : auto thread_client = context_arg.getThread();
136 [ + - + - ]: 36 : return server.m_context.connection->m_threads.getLocalServer(thread_client)
137 [ + - + - : 90 : .then([&server, invoke = kj::mv(invoke), req](const kj::Maybe<Thread::Server&>& perhaps) mutable {
- + # # #
# # # # #
# # # # #
# # # # #
# # # # #
# ][ # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # ]
138 : : // Assuming the thread object is found, pass it a pointer to the
139 : : // `invoke` lambda above which will invoke the function on that
140 : : // thread.
141 [ + - + - : 18 : KJ_IF_MAYBE (thread_server, perhaps) {
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# ][ # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # ]
// The local server found for the handle is treated as a ProxyServer<Thread>.
142 : 18 : const auto& thread = static_cast<ProxyServer<Thread>&>(*thread_server);
143 [ + - + - : 36 : server.m_context.loop->log()
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# ][ # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # ]
144 [ + - + - : 54 : << "IPC server post request #" << req << " {" << thread.m_thread_context.thread_name << "}";
+ - + - +
- + - + -
+ - # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# ][ # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # ]
145 : 18 : thread.m_thread_context.waiter->post(std::move(invoke));
146 : : } else {
147 [ # # # # : 0 : server.m_context.loop->log()
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# ][ # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # ]
148 [ # # # # : 0 : << "IPC server error request #" << req << ", missing thread to execute request";
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# ][ # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # ]
149 [ # # # # : 0 : throw std::runtime_error("invalid thread handle");
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# ][ # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # ]
150 : : }
151 : : })
152 : : // Wait for the invocation to finish before returning to the caller.
153 [ + + - + : 90 : .then([invoke_wait = kj::mv(future.promise)]() mutable { return kj::mv(invoke_wait); });
- + - - +
- + - # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# ][ # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # ]
154 [ + - + - : 18 : }
- - ]
155 : : } // namespace mp
156 : :
157 : : #endif // MP_PROXY_TYPE_CONTEXT_H
|