Branch data Line data Source code
1 : : // Copyright (c) The Bitcoin Core developers
2 : : // Distributed under the MIT software license, see the accompanying
3 : : // file COPYING or http://www.opensource.org/licenses/mit-license.php.
4 : :
5 : : #ifndef MP_PROXY_IO_H
6 : : #define MP_PROXY_IO_H
7 : :
8 : : #include <mp/proxy.h>
9 : : #include <mp/util.h>
10 : :
11 : : #include <mp/proxy.capnp.h>
12 : :
13 : : #include <capnp/rpc-twoparty.h>
14 : :
15 : : #include <assert.h>
16 : : #include <condition_variable>
17 : : #include <functional>
18 : : #include <kj/function.h>
19 : : #include <map>
20 : : #include <memory>
21 : : #include <optional>
22 : : #include <sstream>
23 : : #include <string>
24 : : #include <thread>
25 : :
26 : : namespace mp {
27 : : struct ThreadContext;
28 : :
29 : : struct InvokeContext
30 : : {
31 : : Connection& connection;
32 : : };
33 : :
34 : : struct ClientInvokeContext : InvokeContext
35 : : {
36 : : ThreadContext& thread_context;
37 : 30 : ClientInvokeContext(Connection& conn, ThreadContext& thread_context)
38 [ + - + - : 30 : : InvokeContext{conn}, thread_context{thread_context}
+ - + - +
- ][ + - +
- # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # ][ #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # ][ -
- + - +
- ][ # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# ]
39 : : {
40 : : }
41 : : };
42 : :
43 : : template <typename ProxyServer, typename CallContext_>
44 : : struct ServerInvokeContext : InvokeContext
45 : : {
46 : : using CallContext = CallContext_;
47 : :
48 : : ProxyServer& proxy_server;
49 : : CallContext& call_context;
50 : : int req;
51 : :
52 : 48 : ServerInvokeContext(ProxyServer& proxy_server, CallContext& call_context, int req)
53 [ + - + - : 30 : : InvokeContext{*proxy_server.m_context.connection}, proxy_server{proxy_server}, call_context{call_context}, req{req}
+ - + - +
- + - ][ +
- + - # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# ][ # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# ][ - - +
- + - # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# ][ # # #
# # # ]
54 : : {
55 : : }
56 : : };
57 : :
58 : : template <typename Interface, typename Params, typename Results>
59 : : using ServerContext = ServerInvokeContext<ProxyServer<Interface>, ::capnp::CallContext<Params, Results>>;
60 : :
61 : : template <>
62 : : struct ProxyClient<Thread> : public ProxyClientBase<Thread, ::capnp::Void>
63 : : {
64 [ + - ]: 30 : using ProxyClientBase::ProxyClientBase;
65 : : // https://stackoverflow.com/questions/22357887/comparing-two-mapiterators-why-does-it-need-the-copy-constructor-of-stdpair
66 : : ProxyClient(const ProxyClient&) = delete;
67 : : ~ProxyClient();
68 : :
69 : : void setDisconnectCallback(const std::function<void()>& fn);
70 : :
71 : : //! Reference to callback function that is run if there is a sudden
72 : : //! disconnect and the Connection object is destroyed before this
73 : : //! ProxyClient<Thread> object. The callback will destroy this object and
74 : : //! remove its entry from the thread's request_threads or callback_threads
75 : : //! map. It will also reset m_disconnect_cb so the destructor does not
76 : : //! access it. In the normal case where there is no sudden disconnect, the
77 : : //! destructor will unregister m_disconnect_cb so the callback is never run.
78 : : //! Since this variable is accessed from multiple threads, accesses should
79 : : //! be guarded with the associated Waiter::m_mutex.
80 : : std::optional<CleanupIt> m_disconnect_cb;
81 : : };
82 : :
83 : : template <>
84 : : struct ProxyServer<Thread> final : public Thread::Server
85 : : {
86 : : public:
87 : : ProxyServer(ThreadContext& thread_context, std::thread&& thread);
88 : : ~ProxyServer();
89 : : kj::Promise<void> getName(GetNameContext context) override;
90 : : ThreadContext& m_thread_context;
91 : : std::thread m_thread;
92 : : };
93 : :
94 : : //! Handler for kj::TaskSet failed task events.
95 : : class LoggingErrorHandler : public kj::TaskSet::ErrorHandler
96 : : {
97 : : public:
98 [ + - ]: 17 : LoggingErrorHandler(EventLoop& loop) : m_loop(loop) {}
99 : : void taskFailed(kj::Exception&& exception) override;
100 : : EventLoop& m_loop;
101 : : };
102 : :
//! Signature of the external logging callback. `raise` is the flag the Logger
//! was constructed with and `message` is the text accumulated by the Logger.
using LogFn = std::function<void(bool raise, std::string message)>;

//! Stream-style log message builder. Values streamed in with operator<< are
//! collected in an internal buffer, and the destructor hands the finished
//! message to the LogFn callback. If the callback is empty, nothing is
//! buffered and nothing is emitted.
class Logger
{
public:
    //! @param raise  Flag forwarded unchanged to the callback.
    //! @param fn     Callback to receive the message. Stored by reference, so
    //!               it must outlive the Logger.
    Logger(bool raise, LogFn& fn) : m_raise(raise), m_fn(fn) {}

    //! Move constructor. Takes over the buffered text and deactivates the
    //! source object, so the source's destructor does not invoke the callback
    //! a second time with a moved-from buffer when the move is not elided.
    Logger(Logger&& logger)
        : m_raise(logger.m_raise), m_fn(logger.m_fn), m_buffer(std::move(logger.m_buffer))
    {
        logger.m_active = false;
    }

    //! Pass the accumulated message to the callback. Declared noexcept(false)
    //! so an exception thrown by the callback can propagate to the caller.
    ~Logger() noexcept(false)
    {
        if (m_active && m_fn) m_fn(m_raise, m_buffer.str());
    }

    //! Append a value to the message. Does nothing when the callback is empty.
    template <typename T>
    friend Logger& operator<<(Logger& logger, T&& value)
    {
        if (logger.m_fn) logger.m_buffer << std::forward<T>(value);
        return logger;
    }

    //! Overload that allows streaming into a temporary Logger, for example
    //! `Logger(false, fn) << "text"`. Forwards to the lvalue overload.
    template <typename T>
    friend Logger& operator<<(Logger&& logger, T&& value)
    {
        return logger << std::forward<T>(value);
    }

    bool m_raise;
    LogFn& m_fn;
    std::ostringstream m_buffer;
    //! Cleared on an object that has been moved from, so that only the
    //! object currently owning the buffered text emits it on destruction.
    bool m_active{true};
};
132 : :
133 [ + - ]: 6 : struct LogOptions {
134 : :
135 : : //! External logging callback.
136 : : LogFn log_fn;
137 : :
138 : : //! Maximum number of characters to use when representing
139 : : //! request and response structs as strings.
140 : : size_t max_chars{200};
141 : : };
142 : :
143 : : std::string LongThreadName(const char* exe_name);
144 : :
145 : : //! Event loop implementation.
146 : : //!
147 : : //! Cap'n Proto threading model is very simple: all I/O operations are
148 : : //! asynchronous and must be performed on a single thread. This includes:
149 : : //!
150 : : //! - Code starting an asynchronous operation (calling a function that returns a
151 : : //! promise object)
152 : : //! - Code notifying that an asynchronous operation is complete (code using a
153 : : //! fulfiller object)
154 : : //! - Code handling a completed operation (code chaining or waiting for a promise)
155 : : //!
156 : : //! All of this code needs to access shared state, and there is no mutex that
157 : : //! can be acquired to lock this state because Cap'n Proto
158 : : //! assumes it will only be accessed from one thread. So all this code needs to
159 : : //! actually run on one thread, and the EventLoop::loop() method is the entry point for
160 : : //! this thread. ProxyClient and ProxyServer objects that use other threads and
161 : : //! need to perform I/O operations post to this thread using EventLoop::post()
162 : : //! and EventLoop::sync() methods.
163 : : //!
164 : : //! Specifically, because ProxyClient methods can be called from arbitrary
165 : : //! threads, and ProxyServer methods can run on arbitrary threads, ProxyClient
166 : : //! methods use the EventLoop thread to send requests, and ProxyServer methods
167 : : //! use the thread to return results.
168 : : //!
169 : : //! Based on https://groups.google.com/d/msg/capnproto/TuQFF1eH2-M/g81sHaTAAQAJ
170 : : class EventLoop
171 : : {
172 : : public:
173 : : //! Construct event loop object.
174 : : EventLoop(const char* exe_name, LogFn log_fn, void* context = nullptr);
175 : : ~EventLoop();
176 : :
177 : : //! Run event loop. Does not return until shutdown. This should only be
178 : : //! called once from the m_thread_id thread. This will block until
179 : : //! the m_num_clients reference count is 0.
180 : : void loop();
181 : :
182 : : //! Run function on event loop thread. Does not return until function completes.
183 : : //! Must be called while the loop() function is active.
184 : : void post(kj::Function<void()> fn);
185 : :
186 : : //! Wrapper around EventLoop::post that takes advantage of the
187 : : //! fact that callable will not go out of scope to avoid requirement that it
188 : : //! be copyable.
189 : : template <typename Callable>
190 : 99 : void sync(Callable&& callable)
191 : : {
192 [ + - ]: 99 : post(std::forward<Callable>(callable));
193 : 99 : }
194 : :
195 : : //! Register cleanup function to run on asynchronous worker thread without
196 : : //! blocking the event loop thread.
197 : : void addAsyncCleanup(std::function<void()> fn);
198 : :
199 : : //! Start asynchronous worker thread if necessary. This is only done if
200 : : //! there are ProxyServerBase::m_impl objects that need to be destroyed
201 : : //! asynchronously, without tying up the event loop thread. This can happen
202 : : //! when an interface does not declare a destroy() method that would allow
203 : : //! the client to wait for the destructor to finish and run it on a
204 : : //! dedicated thread. It can also happen whenever this is a broken
205 : : //! connection and the client is no longer around to call the destructors
206 : : //! and the server objects need to be garbage collected. In both cases, it
207 : : //! is important that ProxyServer::m_impl destructors do not run on the
208 : : //! eventloop thread because they may need it to do I/O if they perform
209 : : //! other IPC calls.
210 : : void startAsyncThread() MP_REQUIRES(m_mutex);
211 : :
212 : : //! Check if loop should exit.
213 : : bool done() const MP_REQUIRES(m_mutex);
214 : :
215 : 116 : Logger log()
216 : : {
217 : 116 : Logger logger(false, m_log_opts.log_fn);
218 [ + - + - : 232 : logger << "{" << LongThreadName(m_exe_name) << "} ";
+ - + - ]
219 : 116 : return logger;
220 : 0 : }
221 [ - - - - : 61 : Logger logPlain() { return {false, m_log_opts.log_fn}; }
+ - + - +
- - - - -
+ - + - +
- - - - -
+ - + - +
- - - - -
+ - + - +
- - - - -
+ - + - +
- + - - -
+ - + - +
- ][ - - -
- + - + -
+ - - - -
- + - + -
+ - # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # ][ # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # ][ -
- - - - -
- - - - -
- - - + -
+ - + - -
- - - + -
+ - + - ]
[ # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# ]
222 [ # # # # : 0 : Logger raise() { return {true, m_log_opts.log_fn}; }
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # ]
[ # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # ]
[ # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # ][ # #
# # # # #
# # # # #
# # # # #
# # # # #
# # ][ # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # ]
223 : :
224 : : //! Process name included in thread names so combined debug output from
225 : : //! multiple processes is easier to understand.
226 : : const char* m_exe_name;
227 : :
228 : : //! ID of the event loop thread
229 : : std::thread::id m_thread_id = std::this_thread::get_id();
230 : :
231 : : //! Handle of an async worker thread. Joined on destruction. Unset if async
232 : : //! method has not been called.
233 : : std::thread m_async_thread;
234 : :
235 : : //! Callback function to run on event loop thread during post() or sync() call.
236 : : kj::Function<void()>* m_post_fn MP_GUARDED_BY(m_mutex) = nullptr;
237 : :
238 : : //! Callback functions to run on async thread.
239 : : std::optional<CleanupList> m_async_fns MP_GUARDED_BY(m_mutex);
240 : :
241 : : //! Pipe read handle used to wake up the event loop thread.
242 : : int m_wait_fd = -1;
243 : :
244 : : //! Pipe write handle used to wake up the event loop thread.
245 : : int m_post_fd = -1;
246 : :
247 : : //! Number of clients holding references to ProxyServerBase objects that
248 : : //! reference this event loop.
249 : : int m_num_clients MP_GUARDED_BY(m_mutex) = 0;
250 : :
251 : : //! Mutex and condition variable used to post tasks to event loop and async
252 : : //! thread.
253 : : Mutex m_mutex;
254 : : std::condition_variable m_cv;
255 : :
256 : : //! Capnp IO context.
257 : : kj::AsyncIoContext m_io_context;
258 : :
259 : : //! Capnp error handler. Needs to outlive m_task_set.
260 : : LoggingErrorHandler m_error_handler{*this};
261 : :
262 : : //! Capnp list of pending promises.
263 : : std::unique_ptr<kj::TaskSet> m_task_set;
264 : :
265 : : //! List of connections.
266 : : std::list<Connection> m_incoming_connections;
267 : :
268 : : //! Logging options
269 : : LogOptions m_log_opts;
270 : :
271 : : //! External context pointer.
272 : : void* m_context;
273 : : };
274 : :
275 : : //! Single element task queue used to handle recursive capnp calls. (If server
276 : : //! makes a callback into the client in the middle of a request, while client
277 : : //! thread is blocked waiting for server response, this is what allows the
278 : : //! client to run the request in the same thread, the same way code would run in
279 : : //! a single process, with the callback sharing the same thread stack as the original
280 : : //! call.)
281 : 7 : struct Waiter
282 : : {
283 : 7 : Waiter() = default;
284 : :
285 : : template <typename Fn>
286 : 18 : void post(Fn&& fn)
287 : : {
288 : 18 : const std::unique_lock<std::mutex> lock(m_mutex);
289 [ - + ]: 18 : assert(!m_fn);
290 [ + - ]: 18 : m_fn = std::forward<Fn>(fn);
291 [ + - ]: 18 : m_cv.notify_all();
292 : 18 : }
293 : :
294 : : template <class Predicate>
295 : 36 : void wait(std::unique_lock<std::mutex>& lock, Predicate pred)
296 : : {
297 : 96 : m_cv.wait(lock, [&] {
298 : : // Important for this to be "while (m_fn)", not "if (m_fn)" to avoid
299 : : // a lost-wakeup bug. A new m_fn and m_cv notification might be sent
300 : : // after the fn() call and before the lock.lock() call in this loop
301 : : // in the case where a capnp response is sent and a brand new
302 : : // request is immediately received.
303 [ - + - + : 108 : while (m_fn) {
- + - + -
+ - + ]
[ + + ][ - +
- + # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # #
# ][ # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# ][ - - -
+ - + ][ #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # ]
304 : 18 : auto fn = std::move(*m_fn);
305 : 18 : m_fn.reset();
306 [ # # # # : 18 : Unlock(lock, fn);
# # # # #
# # # ]
[ + - ][ # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # #
# ][ # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# ][ # # #
# # # ][ #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # ]
307 : : }
308 : 90 : const bool done = pred();
309 : 90 : return done;
310 : : });
311 : 36 : }
312 : :
313 : : //! Mutex mainly used internally by waiter class, but also used externally
314 : : //! to guard access to related state. Specifically, since the thread_local
315 : : //! ThreadContext struct owns a Waiter, the Waiter::m_mutex is used to guard
316 : : //! access to other parts of the struct to avoid needing to deal with more
317 : : //! mutexes than necessary. This mutex can be held at the same time as
318 : : //! EventLoop::m_mutex as long as Waiter::m_mutex is locked first and
319 : : //! EventLoop::m_mutex is locked second.
320 : : std::mutex m_mutex;
321 : : std::condition_variable m_cv;
322 : : std::optional<kj::Function<void()>> m_fn;
323 : : };
324 : :
325 : : //! Object holding network & rpc state associated with either an incoming server
326 : : //! connection, or an outgoing client connection. It must be created and destroyed
327 : : //! on the event loop thread.
328 : : //! In addition to Cap'n Proto state, it also holds lists of callbacks to run
329 : : //! when the connection is closed.
330 : : class Connection
331 : : {
332 : : public:
333 : 7 : Connection(EventLoop& loop, kj::Own<kj::AsyncIoStream>&& stream_)
334 : 7 : : m_loop(loop), m_stream(kj::mv(stream_)),
335 [ + - + - ]: 7 : m_network(*m_stream, ::capnp::rpc::twoparty::Side::CLIENT, ::capnp::ReaderOptions()),
336 [ + - + - : 21 : m_rpc_system(::capnp::makeRpcClient(m_network)) {}
+ - + - ]
337 : 7 : Connection(EventLoop& loop,
338 : : kj::Own<kj::AsyncIoStream>&& stream_,
339 : : const std::function<::capnp::Capability::Client(Connection&)>& make_client)
340 : 7 : : m_loop(loop), m_stream(kj::mv(stream_)),
341 [ + - + - ]: 7 : m_network(*m_stream, ::capnp::rpc::twoparty::Side::SERVER, ::capnp::ReaderOptions()),
342 [ + - + - : 21 : m_rpc_system(::capnp::makeRpcServer(m_network, make_client(*this))) {}
+ - + - +
- + - ]
343 : :
344 : : //! Run cleanup functions. Must be called from the event loop thread. First
345 : : //! calls synchronous cleanup functions while blocked (to free capnp
346 : : //! Capability::Client handles owned by ProxyClient objects), then schedules
347 : : //! asynchronous cleanup functions to run in a worker thread (to run
348 : : //! destructors of m_impl instances owned by ProxyServer objects).
349 : : ~Connection();
350 : :
351 : : //! Register synchronous cleanup function to run on event loop thread (with
352 : : //! access to capnp thread local variables) when the connection is destroyed,
353 : : //! before any asynchronous cleanup functions are scheduled.
354 : : CleanupIt addSyncCleanup(std::function<void()> fn);
355 : : void removeSyncCleanup(CleanupIt it);
356 : :
357 : : //! Add disconnect handler.
358 : : template <typename F>
359 : 13 : void onDisconnect(F&& f)
360 : : {
361 : : // Add disconnect handler to local TaskSet to ensure it is cancelled and
362 : : // will never run after connection object is destroyed. But when disconnect
363 : : // handler fires, do not call the function f right away, instead add it
364 : : // to the EventLoop TaskSet to avoid "Promise callback destroyed itself"
365 : : // error in cases where f deletes this Connection object.
366 [ + - - + ]: 39 : m_on_disconnect.add(m_network.onDisconnect().then(
367 [ + - + - ]: 27 : [f = std::forward<F>(f), this]() mutable { m_loop->m_task_set->add(kj::evalLater(kj::mv(f))); }));
[ + - ]
368 : 13 : }
369 : :
370 : : EventLoopRef m_loop;
371 : : kj::Own<kj::AsyncIoStream> m_stream;
372 : : LoggingErrorHandler m_error_handler{*m_loop};
373 : : kj::TaskSet m_on_disconnect{m_error_handler};
374 : : ::capnp::TwoPartyVatNetwork m_network;
375 : : std::optional<::capnp::RpcSystem<::capnp::rpc::twoparty::VatId>> m_rpc_system;
376 : :
377 : : // ThreadMap interface client, used to create a remote server thread when a
378 : : // client IPC call is being made for the first time from a new thread.
379 : : ThreadMap::Client m_thread_map{nullptr};
380 : :
381 : : //! Collection of server-side IPC worker threads (ProxyServer<Thread> objects previously returned by
382 : : //! ThreadMap.makeThread) used to service requests to clients.
383 : : ::capnp::CapabilityServerSet<Thread> m_threads;
384 : :
385 : : //! Cleanup functions to run if connection is broken unexpectedly. List
386 : : //! will be empty if all ProxyClient are destroyed cleanly before the
387 : : //! connection is destroyed.
388 : : CleanupList m_sync_cleanup_fns;
389 : : };
390 : :
391 : : //! Vat id for server side of connection. Required argument to RpcSystem::bootstrap().
392 : : //!
393 : : //! "Vat" is Cap'n Proto nomenclature for a host of various objects that facilitates
394 : : //! bidirectional communication with other vats; it is often but not always 1-1 with
395 : : //! processes. Cap'n Proto doesn't reference clients or servers per se; instead everything
396 : : //! is just a vat.
397 : : //!
398 : : //! See also: https://github.com/capnproto/capnproto/blob/9021f0c722b36cb11e3690b0860939255ebad39c/c%2B%2B/src/capnp/rpc.capnp#L42-L56
399 [ + - + - ]: 1 : struct ServerVatId
400 : : {
401 : : ::capnp::word scratch[4]{};
402 : : ::capnp::MallocMessageBuilder message{scratch};
403 : : ::capnp::rpc::twoparty::VatId::Builder vat_id{message.getRoot<::capnp::rpc::twoparty::VatId>()};
404 [ + - ]: 7 : ServerVatId() { vat_id.setSide(::capnp::rpc::twoparty::Side::SERVER); }
405 : : };
406 : :
407 : : template <typename Interface, typename Impl>
408 : 43 : ProxyClientBase<Interface, Impl>::ProxyClientBase(typename Interface::Client client,
409 : : Connection* connection,
410 : : bool destroy_connection)
411 [ + - ]: 43 : : m_client(std::move(client)), m_context(connection)
412 : :
413 : : {
414 : : // Handler for the connection getting destroyed before this client object.
415 [ + - ]: 43 : auto disconnect_cb = m_context.connection->addSyncCleanup([this]() {
416 : : // Release client capability by moving it into a temporary that is immediately destroyed.
417 : : {
418 : 0 : typename Interface::Client(std::move(m_client));
419 : : }
420 : 0 : Lock lock{m_context.loop->m_mutex};
421 : 0 : m_context.connection = nullptr;
422 : 0 : });
423 : :
424 : : // Two shutdown sequences are supported:
425 : : //
426 : : // - A normal sequence where client proxy objects are deleted by external
427 : : // code that no longer needs them
428 : : //
429 : : // - A garbage collection sequence where the connection or event loop shuts
430 : : // down while external code is still holding client references.
431 : : //
432 : : // The first case is handled here when m_context.connection is not null. The
433 : : // second case is handled by the disconnect_cb function, which sets
434 : : // m_context.connection to null so nothing happens here.
435 [ + - ]: 55 : m_context.cleanup_fns.emplace_front([this, destroy_connection, disconnect_cb]{
436 : : {
437 : : // If the capnp interface defines a destroy method, call it to destroy
438 : : // the remote object, waiting for it to be deleted server side. If the
439 : : // capnp interface does not define a destroy method, this will just call
440 : : // an empty stub defined in the ProxyClientBase class and do nothing.
441 : 6 : Sub::destroy(*this);
442 : :
443 : : // FIXME: Could just invoke removed addCleanup fn here instead of duplicating code
444 : 86 : m_context.loop->sync([&]() {
445 : : // Remove disconnect callback on cleanup so it doesn't run and try
446 : : // to access this object after it's destroyed. This call needs to
447 : : // run inside loop->sync() on the event loop thread because
448 : : // otherwise, if there were an ill-timed disconnect, the
449 : : // onDisconnect handler could fire and delete the Connection object
450 : : // before the removeSyncCleanup call.
451 [ + - ][ - - : 43 : if (m_context.connection) m_context.connection->removeSyncCleanup(disconnect_cb);
+ - # # ]
[ # # # # ]
452 : :
453 : : // Release client capability by moving it into a temporary that is immediately destroyed.
454 : : {
455 : 43 : typename Interface::Client(std::move(m_client));
456 : : }
457 [ + + ][ - - : 43 : if (destroy_connection) {
- + # # ]
[ # # # # ]
458 [ + - + - ]: 7 : delete m_context.connection;
[ # # # #
# # # # #
# # # ][ #
# # # # #
# # ]
459 : 7 : m_context.connection = nullptr;
460 : : }
461 : : });
462 : : }
463 : : });
464 [ + - ]: 43 : Sub::construct(*this);
465 : 43 : }
466 : :
467 : : template <typename Interface, typename Impl>
468 : 43 : ProxyClientBase<Interface, Impl>::~ProxyClientBase() noexcept
469 : : {
470 : 43 : CleanupRun(m_context.cleanup_fns);
471 : 43 : }
472 : :
473 : : template <typename Interface, typename Impl>
474 : 13 : ProxyServerBase<Interface, Impl>::ProxyServerBase(std::shared_ptr<Impl> impl, Connection& connection)
475 [ + - ]: 13 : : m_impl(std::move(impl)), m_context(&connection)
476 : : {
477 [ - + ]: 13 : assert(m_impl);
478 [ - - ]: 13 : }
479 : :
480 : : //! ProxyServer destructor, called from the EventLoop thread by Cap'n Proto
481 : : //! garbage collection code after there are no more references to this object.
482 : : //! This will typically happen when the corresponding ProxyClient object on the
483 : : //! other side of the connection is destroyed. It can also happen earlier if the
484 : : //! connection is broken or destroyed. In the latter case this destructor will
485 : : //! typically be called inside m_rpc_system.reset() call in the ~Connection
486 : : //! destructor while the Connection object still exists. However, because
487 : : //! ProxyServer objects are refcounted, and the Connection object could be
488 : : //! destroyed while asynchronous IPC calls are still in-flight, it's possible
489 : : //! for this destructor to be called after the Connection object no longer
490 : : //! exists, so it is NOT valid to dereference the m_context.connection pointer
491 : : //! from this function.
492 : : template <typename Interface, typename Impl>
493 : 13 : ProxyServerBase<Interface, Impl>::~ProxyServerBase()
494 : : {
495 : 13 : if (m_impl) {
496 : : // If impl is non-null at this point, it means no client is waiting for
497 : : // the m_impl server object to be destroyed synchronously. This can
498 : : // happen either if the interface did not define a "destroy" method (see
499 : : // invokeDestroy method below), or if a destroy method was defined, but
500 : : // the connection was broken before it could be called.
501 : : //
502 : : // In either case, be conservative and run the cleanup on an
503 : : // asynchronous thread, to avoid destructors or cleanup functions
504 : : // blocking or deadlocking the current EventLoop thread, since they
505 : : // could be making IPC calls.
506 : : //
507 : : // Technically this is a little too conservative since if the interface
508 : : // defines a "destroy" method, but the destroy method does not accept a
509 : : // Context parameter specifying a worker thread, the cleanup method
510 : : // would run on the EventLoop thread normally (when connection is
511 : : // unbroken), but will not run on the EventLoop thread now (when
512 : : // connection is broken). Probably some refactoring of the destructor
513 : : // and invokeDestroy function is possible to make this cleaner and more
514 : : // consistent.
515 : 7 : m_context.loop->addAsyncCleanup([impl=std::move(m_impl), fns=std::move(m_context.cleanup_fns)]() mutable {
516 : 7 : impl.reset();
517 : 7 : CleanupRun(fns);
518 : : });
519 : : }
520 [ - + ]: 13 : assert(m_context.cleanup_fns.empty());
521 [ + + - + ]: 26 : }
522 : :
523 : : //! If the capnp interface defined a special "destroy" method, as described in the
524 : : //! ProxyClientBase class, this method will be called and synchronously destroy
525 : : //! m_impl before returning to the client.
526 : : //!
527 : : //! If the capnp interface does not define a "destroy" method, this will never
528 : : //! be called and the ~ProxyServerBase destructor will be responsible for
529 : : //! deleting m_impl asynchronously, whenever the ProxyServer object gets garbage
530 : : //! collected by Cap'n Proto.
531 : : //!
532 : : //! This method is called in the same way other proxy server methods are called,
533 : : //! via the serverInvoke function. Basically serverInvoke just calls this as a
534 : : //! substitute for a non-existent m_impl->destroy() method. If the destroy
535 : : //! method has any parameters or return values they will be handled in the
536 : : //! normal way by PassField/ReadField/BuildField functions. Particularly if a
537 : : //! Context.thread parameter was passed, this method will run on the worker
538 : : //! thread specified by the client. Otherwise it will run on the EventLoop
539 : : //! thread, like other server methods without an assigned thread.
540 : : template <typename Interface, typename Impl>
541 : 6 : void ProxyServerBase<Interface, Impl>::invokeDestroy()
542 : : {
543 : 6 : m_impl.reset();
544 : 6 : CleanupRun(m_context.cleanup_fns);
545 : 6 : }
546 : :
547 : : using ConnThreads = std::map<Connection*, ProxyClient<Thread>>;
548 : : using ConnThread = ConnThreads::iterator;
549 : :
550 : : // Retrieve ProxyClient<Thread> object associated with this connection from a
551 : : // map, or create a new one and insert it into the map. Return map iterator and
552 : : // inserted bool.
553 : : std::tuple<ConnThread, bool> SetThread(ConnThreads& threads, std::mutex& mutex, Connection* connection, const std::function<Thread::Client()>& make_thread);
554 : :
555 : : struct ThreadContext
556 : : {
557 : : //! Identifying string for debug.
558 : : std::string thread_name;
559 : :
560 : : //! Waiter object used to allow client threads blocked waiting for a server
561 : : //! response to execute callbacks made from the client's corresponding
562 : : //! server thread.
563 : : std::unique_ptr<Waiter> waiter = nullptr;
564 : :
565 : : //! When client is making a request to a server, this is the
566 : : //! `callbackThread` argument it passes in the request, used by the server
567 : : //! in case it needs to make callbacks into the client that need to execute
568 : : //! while the client is waiting. This will be set to a local thread object.
569 : : ConnThreads callback_threads;
570 : :
571 : : //! When client is making a request to a server, this is the `thread`
572 : : //! argument it passes in the request, used to control which thread on
573 : : //! server will be responsible for executing it. If client call is being
574 : : //! made from a local thread, this will be a remote thread object returned
575 : : //! by makeThread. If a client call is being made from a thread currently
576 : : //! handling a server request, this will be set to the `callbackThread`
577 : : //! request thread argument passed in that request.
578 : : ConnThreads request_threads;
579 : :
580 : : //! Whether this thread is a capnp event loop thread. Not really used except
581 : : //! to assert false if there's an attempt to execute a blocking operation
582 : : //! which could deadlock the thread.
583 : : bool loop_thread = false;
584 : : };
585 : :
586 : : //! Given stream file descriptor, make a new ProxyClient object to send requests
587 : : //! over the stream. Also create a new Connection object embedded in the
588 : : //! client that is freed when the client is closed.
589 : : template <typename InitInterface>
590 : 6 : std::unique_ptr<ProxyClient<InitInterface>> ConnectStream(EventLoop& loop, int fd)
591 : : {
592 : 6 : typename InitInterface::Client init_client(nullptr);
593 : 6 : std::unique_ptr<Connection> connection;
594 [ + - ]: 6 : loop.sync([&] {
595 : 6 : auto stream =
596 : 6 : loop.m_io_context.lowLevelProvider->wrapSocketFd(fd, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP);
597 [ + - ]: 6 : connection = std::make_unique<Connection>(loop, kj::mv(stream));
598 [ + - + - : 12 : init_client = connection->m_rpc_system->bootstrap(ServerVatId().vat_id).castAs<InitInterface>();
+ - + - +
- + - +
- ]
599 [ + - ]: 6 : Connection* connection_ptr = connection.get();
600 [ + - ]: 6 : connection->onDisconnect([&loop, connection_ptr] {
601 [ # # ]: 0 : loop.log() << "IPC client: unexpected network disconnect.";
602 [ # # # # ]: 0 : delete connection_ptr;
603 : : });
604 : 6 : });
605 : : return std::make_unique<ProxyClient<InitInterface>>(
606 [ + - ]: 6 : kj::mv(init_client), connection.release(), /* destroy_connection= */ true);
607 [ + - ]: 6 : }
608 : :
609 : : //! Given stream and init objects, construct a new ProxyServer object that
610 : : //! handles requests from the stream by calling the init object. Embed the
611 : : //! ProxyServer in a Connection object that is stored and erased if
612 : : //! disconnected. This should be called from the event loop thread.
613 : : template <typename InitInterface, typename InitImpl>
614 : 6 : void _Serve(EventLoop& loop, kj::Own<kj::AsyncIoStream>&& stream, InitImpl& init)
615 : : {
616 : 6 : loop.m_incoming_connections.emplace_front(loop, kj::mv(stream), [&](Connection& connection) {
617 : : // Disable deleter so proxy server object doesn't attempt to delete the
618 : : // init implementation when the proxy client is destroyed or
619 : : // disconnected.
620 [ + - - + ]: 6 : return kj::heap<ProxyServer<InitInterface>>(std::shared_ptr<InitImpl>(&init, [](InitImpl*){}), connection);
621 : : });
622 : 6 : auto it = loop.m_incoming_connections.begin();
623 : 12 : it->onDisconnect([&loop, it] {
624 [ + - ]: 6 : loop.log() << "IPC server: socket disconnected.";
625 : 6 : loop.m_incoming_connections.erase(it);
626 : : });
627 : 6 : }
628 : :
629 : : //! Given connection receiver and an init object, handle incoming connections by
630 : : //! calling _Serve, to create ProxyServer objects and forward requests to the
631 : : //! init object.
632 : : template <typename InitInterface, typename InitImpl>
633 : 7 : void _Listen(EventLoop& loop, kj::Own<kj::ConnectionReceiver>&& listener, InitImpl& init)
634 : : {
635 : 7 : auto* ptr = listener.get();
636 [ + - ]: 14 : loop.m_task_set->add(ptr->accept().then(
637 [ + - + - : 21 : [&loop, &init, listener = kj::mv(listener)](kj::Own<kj::AsyncIoStream>&& stream) mutable {
+ - - + ]
638 : 5 : _Serve<InitInterface>(loop, kj::mv(stream), init);
639 : 5 : _Listen<InitInterface>(loop, kj::mv(listener), init);
640 : : }));
641 : 7 : }
642 : :
643 : : //! Given stream file descriptor and an init object, handle requests on the
644 : : //! stream by calling methods on the Init object.
645 : : template <typename InitInterface, typename InitImpl>
646 : 1 : void ServeStream(EventLoop& loop, int fd, InitImpl& init)
647 : : {
648 [ + - ]: 1 : _Serve<InitInterface>(
649 : 1 : loop, loop.m_io_context.lowLevelProvider->wrapSocketFd(fd, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP), init);
650 : 1 : }
651 : :
652 : : //! Given listening socket file descriptor and an init object, handle incoming
653 : : //! connections and requests by calling methods on the Init object.
654 : : template <typename InitInterface, typename InitImpl>
655 : 2 : void ListenConnections(EventLoop& loop, int fd, InitImpl& init)
656 : : {
657 : 2 : loop.sync([&]() {
658 [ + - ]: 2 : _Listen<InitInterface>(loop,
659 : 4 : loop.m_io_context.lowLevelProvider->wrapListenSocketFd(fd, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP),
660 : : init);
661 : : });
662 : 2 : }
663 : :
664 [ - - - - : 84 : extern thread_local ThreadContext g_thread_context; // NOLINT(bitcoin-nontrivial-threadlocal)
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- + - + -
+ - ][ - -
- - - - -
- - - - -
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # ]
[ + - # #
# # # # #
# # # # #
# # # # ]
[ # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # ][ -
- - - - -
- - - - -
- - - - -
- - ][ # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # #
# ]
665 : : // Silence nonstandard bitcoin tidy error "Variable with non-trivial destructor
666 : : // cannot be thread_local" which should not be a problem on modern platforms, and
667 : : // could lead to a small memory leak at worst on older ones.
668 : :
669 : : } // namespace mp
670 : :
671 : : #endif // MP_PROXY_IO_H
|