Branch data Line data Source code
1 : : // Copyright (c) The Bitcoin Core developers
2 : : // Distributed under the MIT software license, see the accompanying
3 : : // file COPYING or http://www.opensource.org/licenses/mit-license.php.
4 : :
5 : : #ifndef MP_PROXY_IO_H
6 : : #define MP_PROXY_IO_H
7 : :
8 : : #include <mp/proxy.h>
9 : : #include <mp/util.h>
10 : :
11 : : #include <mp/proxy.capnp.h>
12 : :
13 : : #include <capnp/rpc-twoparty.h>
14 : :
15 : : #include <assert.h>
16 : : #include <condition_variable>
17 : : #include <functional>
18 : : #include <kj/function.h>
19 : : #include <map>
20 : : #include <memory>
21 : : #include <optional>
22 : : #include <sstream>
23 : : #include <string>
24 : : #include <thread>
25 : :
26 : : namespace mp {
27 : : struct ThreadContext;
28 : :
29 : : struct InvokeContext
30 : : {
31 : : Connection& connection;
32 : : };
33 : :
34 : : struct ClientInvokeContext : InvokeContext
35 : : {
36 : : ThreadContext& thread_context;
37 : 30 : ClientInvokeContext(Connection& conn, ThreadContext& thread_context)
38 [ + - + - : 30 : : InvokeContext{conn}, thread_context{thread_context}
+ - + - +
- ][ + - +
- # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # ][ #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # ][ -
- + - +
- ][ # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# ]
39 : : {
40 : : }
41 : : };
42 : :
43 : : template <typename ProxyServer, typename CallContext_>
44 : : struct ServerInvokeContext : InvokeContext
45 : : {
46 : : using CallContext = CallContext_;
47 : :
48 : : ProxyServer& proxy_server;
49 : : CallContext& call_context;
50 : : int req;
51 : :
52 : 48 : ServerInvokeContext(ProxyServer& proxy_server, CallContext& call_context, int req)
53 [ + - + - : 30 : : InvokeContext{*proxy_server.m_context.connection}, proxy_server{proxy_server}, call_context{call_context}, req{req}
+ - + - +
- + - ][ +
- + - # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# ][ # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# ][ - - +
- + - # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# ][ # # #
# # # ]
54 : : {
55 : : }
56 : : };
57 : :
58 : : template <typename Interface, typename Params, typename Results>
59 : : using ServerContext = ServerInvokeContext<ProxyServer<Interface>, ::capnp::CallContext<Params, Results>>;
60 : :
61 : : template <>
62 : : struct ProxyClient<Thread> : public ProxyClientBase<Thread, ::capnp::Void>
63 : : {
64 [ + - ]: 30 : using ProxyClientBase::ProxyClientBase;
65 : : // https://stackoverflow.com/questions/22357887/comparing-two-mapiterators-why-does-it-need-the-copy-constructor-of-stdpair
66 : : ProxyClient(const ProxyClient&) = delete;
67 : : ~ProxyClient();
68 : :
69 : : void setDisconnectCallback(const std::function<void()>& fn);
70 : :
71 : : //! Reference to callback function that is run if there is a sudden
72 : : //! disconnect and the Connection object is destroyed before this
73 : : //! ProxyClient<Thread> object. The callback will destroy this object and
74 : : //! remove its entry from the thread's request_threads or callback_threads
75 : : //! map. It will also reset m_disconnect_cb so the destructor does not
76 : : //! access it. In the normal case where there is no sudden disconnect, the
77 : : //! destructor will unregister m_disconnect_cb so the callback is never run.
78 : : //! Since this variable is accessed from multiple threads, accesses should
79 : : //! be guarded with the associated Waiter::m_mutex.
80 : : std::optional<CleanupIt> m_disconnect_cb;
81 : : };
82 : :
83 : : template <>
84 : : struct ProxyServer<Thread> final : public Thread::Server
85 : : {
86 : : public:
87 : : ProxyServer(ThreadContext& thread_context, std::thread&& thread);
88 : : ~ProxyServer();
89 : : kj::Promise<void> getName(GetNameContext context) override;
90 : : ThreadContext& m_thread_context;
91 : : std::thread m_thread;
92 : : };
93 : :
94 : : //! Handler for kj::TaskSet failed task events.
95 : : class LoggingErrorHandler : public kj::TaskSet::ErrorHandler
96 : : {
97 : : public:
98 [ + - ]: 17 : LoggingErrorHandler(EventLoop& loop) : m_loop(loop) {}
99 : : void taskFailed(kj::Exception&& exception) override;
100 : : EventLoop& m_loop;
101 : : };
102 : :
103 : : using LogFn = std::function<void(bool raise, std::string message)>;
104 : :
105 : : class Logger
106 : : {
107 : : public:
108 : 177 : Logger(bool raise, LogFn& fn) : m_raise(raise), m_fn(fn) {}
109 : : Logger(Logger&& logger) : m_raise(logger.m_raise), m_fn(logger.m_fn), m_buffer(std::move(logger.m_buffer)) {}
110 : 177 : ~Logger() noexcept(false)
111 : : {
112 [ + - + - : 354 : if (m_fn) m_fn(m_raise, m_buffer.str());
+ - ]
113 : 177 : }
114 : :
115 : : template <typename T>
116 : 1225 : friend Logger& operator<<(Logger& logger, T&& value)
117 : : {
118 [ + - + - : 949 : if (logger.m_fn) logger.m_buffer << std::forward<T>(value);
+ - # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # ][ + -
+ - + - +
- + - + -
+ - + - +
- - - + -
+ - + - +
- + - + -
+ - + - +
- + - + -
+ - + - +
- + - + -
+ - + - -
- + - + -
+ - + - +
- + - + -
+ - + - +
- + - + -
+ - + - +
- + - + -
+ - - - +
- + - + -
+ - + - +
- + - + -
+ - + - +
- + - + -
+ - + - +
- + - + -
- - + - +
- + - + -
+ - + - +
- + - + -
+ - + - +
- + - + -
+ - + - +
- + - - -
+ - + - +
- + - + -
+ - + - +
- + - + -
+ - + - +
- + - + -
+ - + - +
- - - + -
+ - + - +
- + - + -
+ - + - +
- - - ][ +
- - - - -
- - - - -
- - - - -
+ - + - +
- + - + -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - +
- + - + -
+ - + - +
- + - + -
+ - - - +
- + - + -
+ - + - +
- + - + -
+ - + - +
- + - + -
+ - + - +
- + - + -
- - + - +
- + - + -
+ - + - +
- + - + -
- - # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# ][ # # #
# # # # #
# # ][ # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# ][ # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # #
# ][ + - -
+ - - - -
+ - + - +
- + - + -
+ - + - +
- + - + -
+ - + - +
- + - - -
- - - - -
- + - + -
+ - + - +
- + - + -
+ - + - +
- + - + -
+ - + - -
- - - - -
- - + - +
- + - + -
+ - + - +
- + - + -
+ - + - +
- + - + -
- - - - -
- - - + -
+ - + - +
- + - + -
+ - + - +
- + - + -
+ - + - +
- - - - -
- - - - +
- + - + -
+ - + - +
- + - + -
+ - + - +
- + - + -
+ - + - -
- - - - -
+ - + - +
- + - + -
+ - + - +
- + - + -
+ - + - +
- + - ]
[ - - # # ]
[ - - - -
- - - - +
- + - + -
+ - + - +
- + - + -
+ - + - +
- + - + -
+ - - - -
- - - - -
+ - + - +
- + - + -
+ - + - +
- + - + -
+ - + - +
- + - ][ #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # ]
[ + - + -
+ - + - +
- - - - -
- - + - +
- + - + -
+ - - - -
- - - + -
+ - + - +
- + - + -
+ - + - +
- - - + -
+ - + - +
- + - + -
+ - + - +
- + - + -
+ - + - +
- + - + -
+ - + - -
- + - + -
+ - + - +
- + - + -
+ - + - -
- # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # ][ # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # #
# ][ # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # #
# ][ - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- + - + -
+ - + - +
- + - + -
+ - + - +
- + - + -
+ - + - -
- - - - -
- - + - +
- + - + -
+ - + - +
- + - + -
+ - + - +
- + - +
- ]
119 : 476 : return logger;
120 : : }
121 : :
122 : : template <typename T>
123 [ + - + - : 177 : friend Logger& operator<<(Logger&& logger, T&& value)
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# ][ - - +
- - - + -
+ - - - +
- + - - -
+ - + - -
- + - + -
- - + - +
- - - +
- ][ - - -
- + - - -
- - - - -
- - - + -
- - + - +
- - - + -
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # #
# ][ # # #
# # # #
# ][ # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# ][ # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # #
# ][ - - -
- - - + -
+ - - - -
- + - + -
- - - - +
- + - - -
- - + - +
- - - - -
+ - + - +
- - - + -
+ - ][ + - ]
[ - - - -
- - + - +
- - - - -
+ - + - ]
[ # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # ]
[ + - - -
- - + - -
- + - - -
+ - + - -
- + - # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# ][ # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # #
# ][ # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# ][ - - -
- - - - -
- - - - -
- + - + -
- - - - +
- + - ]
124 : : {
125 [ - + + - : 177 : return logger << std::forward<T>(value);
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# ][ + - +
- + - + -
+ - + - +
- + - + -
+ - + - +
- ][ # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # #
# ][ - - -
- - - + -
+ - - - -
- - - + -
+ - - - -
- - - + -
+ - - - -
- - - + -
+ - - - -
- - - + -
+ - + - -
- - - + -
+ - ][ - -
- - + - +
- + - - -
+ - + - +
- + - ][ #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # ][ +
- - - + -
- - + - +
- + - + -
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # ]
[ # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# ][ # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # #
# ][ - - -
- - - - -
- - - - -
- - - + -
+ - - - -
- - - + -
+ - # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # ][ # #
# # # # #
# ]
126 : : }
127 : :
128 : : bool m_raise;
129 : : LogFn& m_fn;
130 : : std::ostringstream m_buffer;
131 : : };
132 : :
133 : : std::string LongThreadName(const char* exe_name);
134 : :
135 : : //! Event loop implementation.
136 : : //!
137 : : //! Cap'n Proto threading model is very simple: all I/O operations are
138 : : //! asynchronous and must be performed on a single thread. This includes:
139 : : //!
140 : : //! - Code starting an asynchronous operation (calling a function that returns a
141 : : //! promise object)
142 : : //! - Code notifying that an asynchronous operation is complete (code using a
143 : : //! fulfiller object)
144 : : //! - Code handling a completed operation (code chaining or waiting for a promise)
145 : : //!
146 : : //! All of this code needs to access shared state, and there is no mutex that
147 : : //! can be acquired to lock this state because Cap'n Proto
148 : : //! assumes it will only be accessed from one thread. So all this code needs to
149 : : //! actually run on one thread, and the EventLoop::loop() method is the entry point for
150 : : //! this thread. ProxyClient and ProxyServer objects that use other threads and
151 : : //! need to perform I/O operations post to this thread using EventLoop::post()
152 : : //! and EventLoop::sync() methods.
153 : : //!
154 : : //! Specifically, because ProxyClient methods can be called from arbitrary
155 : : //! threads, and ProxyServer methods can run on arbitrary threads, ProxyClient
156 : : //! methods use the EventLoop thread to send requests, and ProxyServer methods
157 : : //! use the thread to return results.
158 : : //!
159 : : //! Based on https://groups.google.com/d/msg/capnproto/TuQFF1eH2-M/g81sHaTAAQAJ
160 : : class EventLoop
161 : : {
162 : : public:
163 : : //! Construct event loop object.
164 : : EventLoop(const char* exe_name, LogFn log_fn, void* context = nullptr);
165 : : ~EventLoop();
166 : :
167 : : //! Run event loop. Does not return until shutdown. This should only be
168 : : //! called once from the m_thread_id thread. This will block until
169 : : //! the m_num_clients reference count is 0.
170 : : void loop();
171 : :
172 : : //! Run function on event loop thread. Does not return until function completes.
173 : : //! Must be called while the loop() function is active.
174 : : void post(kj::Function<void()> fn);
175 : :
176 : : //! Wrapper around EventLoop::post that takes advantage of the
177 : : //! fact that callable will not go out of scope to avoid requirement that it
178 : : //! be copyable.
179 : : template <typename Callable>
180 : 99 : void sync(Callable&& callable)
181 : : {
182 [ + - ]: 99 : post(std::forward<Callable>(callable));
183 : 99 : }
184 : :
185 : : //! Register cleanup function to run on asynchronous worker thread without
186 : : //! blocking the event loop thread.
187 : : void addAsyncCleanup(std::function<void()> fn);
188 : :
189 : : //! Start asynchronous worker thread if necessary. This is only done if
190 : : //! there are ProxyServerBase::m_impl objects that need to be destroyed
191 : : //! asynchronously, without tying up the event loop thread. This can happen
192 : : //! when an interface does not declare a destroy() method that would allow
193 : : //! the client to wait for the destructor to finish and run it on a
194 : : //! dedicated thread. It can also happen whenever this is a broken
195 : : //! connection and the client is no longer around to call the destructors
196 : : //! and the server objects need to be garbage collected. In both cases, it
197 : : //! is important that ProxyServer::m_impl destructors do not run on the
198 : : //! eventloop thread because they may need it to do I/O if they perform
199 : : //! other IPC calls.
200 : : void startAsyncThread() MP_REQUIRES(m_mutex);
201 : :
202 : : //! Check if loop should exit.
203 : : bool done() const MP_REQUIRES(m_mutex);
204 : :
205 : 116 : Logger log()
206 : : {
207 : 116 : Logger logger(false, m_log_fn);
208 [ + - + - : 232 : logger << "{" << LongThreadName(m_exe_name) << "} ";
+ - + - ]
209 : 116 : return logger;
210 : 0 : }
211 [ - - - - : 61 : Logger logPlain() { return {false, m_log_fn}; }
+ - + - +
- - - - -
+ - + - +
- - - - -
+ - + - +
- - - - -
+ - + - +
- - - - -
+ - + - +
- + - - -
+ - + - +
- ][ - - -
- + - + -
+ - - - -
- + - + -
+ - # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # ][ # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # ][ -
- - - - -
- - - - -
- - - + -
+ - + - -
- - - + -
+ - + - ]
[ # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# ]
212 [ # # # # : 0 : Logger raise() { return {true, m_log_fn}; }
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # ]
[ # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # ]
[ # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # ][ # #
# # # # #
# # # # #
# # # # #
# # # # #
# # ][ # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # ]
213 : :
214 : : //! Process name included in thread names so combined debug output from
215 : : //! multiple processes is easier to understand.
216 : : const char* m_exe_name;
217 : :
218 : : //! ID of the event loop thread
219 : : std::thread::id m_thread_id = std::this_thread::get_id();
220 : :
221 : : //! Handle of an async worker thread. Joined on destruction. Unset if async
222 : : //! method has not been called.
223 : : std::thread m_async_thread;
224 : :
225 : : //! Callback function to run on event loop thread during post() or sync() call.
226 : : kj::Function<void()>* m_post_fn MP_GUARDED_BY(m_mutex) = nullptr;
227 : :
228 : : //! Callback functions to run on async thread.
229 : : std::optional<CleanupList> m_async_fns MP_GUARDED_BY(m_mutex);
230 : :
231 : : //! Pipe read handle used to wake up the event loop thread.
232 : : int m_wait_fd = -1;
233 : :
234 : : //! Pipe write handle used to wake up the event loop thread.
235 : : int m_post_fd = -1;
236 : :
237 : : //! Number of clients holding references to ProxyServerBase objects that
238 : : //! reference this event loop.
239 : : int m_num_clients MP_GUARDED_BY(m_mutex) = 0;
240 : :
241 : : //! Mutex and condition variable used to post tasks to event loop and async
242 : : //! thread.
243 : : Mutex m_mutex;
244 : : std::condition_variable m_cv;
245 : :
246 : : //! Capnp IO context.
247 : : kj::AsyncIoContext m_io_context;
248 : :
249 : : //! Capnp error handler. Needs to outlive m_task_set.
250 : : LoggingErrorHandler m_error_handler{*this};
251 : :
252 : : //! Capnp list of pending promises.
253 : : std::unique_ptr<kj::TaskSet> m_task_set;
254 : :
255 : : //! List of connections.
256 : : std::list<Connection> m_incoming_connections;
257 : :
258 : : //! External logging callback.
259 : : LogFn m_log_fn;
260 : :
261 : : //! External context pointer.
262 : : void* m_context;
263 : : };
264 : :
265 : : //! Single element task queue used to handle recursive capnp calls. (If server
266 : : //! makes an callback into the client in the middle of a request, while client
267 : : //! thread is blocked waiting for server response, this is what allows the
268 : : //! client to run the request in the same thread, the same way code would run in
269 : : //! single process, with the callback sharing same thread stack as the original
270 : : //! call.
271 : 7 : struct Waiter
272 : : {
273 : 7 : Waiter() = default;
274 : :
275 : : template <typename Fn>
276 : 18 : void post(Fn&& fn)
277 : : {
278 : 18 : const std::unique_lock<std::mutex> lock(m_mutex);
279 [ - + ]: 18 : assert(!m_fn);
280 [ + - ]: 18 : m_fn = std::forward<Fn>(fn);
281 [ + - ]: 18 : m_cv.notify_all();
282 : 18 : }
283 : :
284 : : template <class Predicate>
285 : 36 : void wait(std::unique_lock<std::mutex>& lock, Predicate pred)
286 : : {
287 : 96 : m_cv.wait(lock, [&] {
288 : : // Important for this to be "while (m_fn)", not "if (m_fn)" to avoid
289 : : // a lost-wakeup bug. A new m_fn and m_cv notification might be sent
290 : : // after the fn() call and before the lock.lock() call in this loop
291 : : // in the case where a capnp response is sent and a brand new
292 : : // request is immediately received.
293 [ - + - + : 108 : while (m_fn) {
- + - + -
+ - + ]
[ + + ][ - +
- + # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # #
# ][ # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# ][ - - -
+ - + ][ #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # ]
294 : 18 : auto fn = std::move(*m_fn);
295 : 18 : m_fn.reset();
296 [ # # # # : 18 : Unlock(lock, fn);
# # # # #
# # # ]
[ + - ][ # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # #
# ][ # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# ][ # # #
# # # ][ #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # ]
297 : : }
298 : 90 : const bool done = pred();
299 : 90 : return done;
300 : : });
301 : 36 : }
302 : :
303 : : //! Mutex mainly used internally by waiter class, but also used externally
304 : : //! to guard access to related state. Specifically, since the thread_local
305 : : //! ThreadContext struct owns a Waiter, the Waiter::m_mutex is used to guard
306 : : //! access to other parts of the struct to avoid needing to deal with more
307 : : //! mutexes than necessary. This mutex can be held at the same time as
308 : : //! EventLoop::m_mutex as long as Waiter::mutex is locked first and
309 : : //! EventLoop::m_mutex is locked second.
310 : : std::mutex m_mutex;
311 : : std::condition_variable m_cv;
312 : : std::optional<kj::Function<void()>> m_fn;
313 : : };
314 : :
315 : : //! Object holding network & rpc state associated with either an incoming server
316 : : //! connection, or an outgoing client connection. It must be created and destroyed
317 : : //! on the event loop thread.
318 : : //! In addition to Cap'n Proto state, it also holds lists of callbacks to run
319 : : //! when the connection is closed.
320 : : class Connection
321 : : {
322 : : public:
323 : 7 : Connection(EventLoop& loop, kj::Own<kj::AsyncIoStream>&& stream_)
324 : 7 : : m_loop(loop), m_stream(kj::mv(stream_)),
325 [ + - + - ]: 7 : m_network(*m_stream, ::capnp::rpc::twoparty::Side::CLIENT, ::capnp::ReaderOptions()),
326 [ + - + - : 21 : m_rpc_system(::capnp::makeRpcClient(m_network)) {}
+ - + - ]
327 : 7 : Connection(EventLoop& loop,
328 : : kj::Own<kj::AsyncIoStream>&& stream_,
329 : : const std::function<::capnp::Capability::Client(Connection&)>& make_client)
330 : 7 : : m_loop(loop), m_stream(kj::mv(stream_)),
331 [ + - + - ]: 7 : m_network(*m_stream, ::capnp::rpc::twoparty::Side::SERVER, ::capnp::ReaderOptions()),
332 [ + - + - : 21 : m_rpc_system(::capnp::makeRpcServer(m_network, make_client(*this))) {}
+ - + - +
- + - ]
333 : :
334 : : //! Run cleanup functions. Must be called from the event loop thread. First
335 : : //! calls synchronous cleanup functions while blocked (to free capnp
336 : : //! Capability::Client handles owned by ProxyClient objects), then schedules
337 : : //! asynchronous cleanup functions to run in a worker thread (to run
338 : : //! destructors of m_impl instances owned by ProxyServer objects).
339 : : ~Connection();
340 : :
341 : : //! Register synchronous cleanup function to run on event loop thread (with
342 : : //! access to capnp thread local variables) when disconnect() is called.
343 : : //! any new i/o.
344 : : CleanupIt addSyncCleanup(std::function<void()> fn);
345 : : void removeSyncCleanup(CleanupIt it);
346 : :
347 : : //! Add disconnect handler.
348 : : template <typename F>
349 : 13 : void onDisconnect(F&& f)
350 : : {
351 : : // Add disconnect handler to local TaskSet to ensure it is cancelled and
352 : : // will never run after connection object is destroyed. But when disconnect
353 : : // handler fires, do not call the function f right away, instead add it
354 : : // to the EventLoop TaskSet to avoid "Promise callback destroyed itself"
355 : : // error in cases where f deletes this Connection object.
356 [ + - - + ]: 39 : m_on_disconnect.add(m_network.onDisconnect().then(
357 [ + - + - ]: 27 : [f = std::forward<F>(f), this]() mutable { m_loop->m_task_set->add(kj::evalLater(kj::mv(f))); }));
[ + - ]
358 : 13 : }
359 : :
360 : : EventLoopRef m_loop;
361 : : kj::Own<kj::AsyncIoStream> m_stream;
362 : : LoggingErrorHandler m_error_handler{*m_loop};
363 : : kj::TaskSet m_on_disconnect{m_error_handler};
364 : : ::capnp::TwoPartyVatNetwork m_network;
365 : : std::optional<::capnp::RpcSystem<::capnp::rpc::twoparty::VatId>> m_rpc_system;
366 : :
367 : : // ThreadMap interface client, used to create a remote server thread when an
368 : : // client IPC call is being made for the first time from a new thread.
369 : : ThreadMap::Client m_thread_map{nullptr};
370 : :
371 : : //! Collection of server-side IPC worker threads (ProxyServer<Thread> objects previously returned by
372 : : //! ThreadMap.makeThread) used to service requests to clients.
373 : : ::capnp::CapabilityServerSet<Thread> m_threads;
374 : :
375 : : //! Cleanup functions to run if connection is broken unexpectedly. List
376 : : //! will be empty if all ProxyClient are destroyed cleanly before the
377 : : //! connection is destroyed.
378 : : CleanupList m_sync_cleanup_fns;
379 : : };
380 : :
381 : : //! Vat id for server side of connection. Required argument to RpcSystem::bootStrap()
382 : : //!
383 : : //! "Vat" is Cap'n Proto nomenclature for a host of various objects that facilitates
384 : : //! bidirectional communication with other vats; it is often but not always 1-1 with
385 : : //! processes. Cap'n Proto doesn't reference clients or servers per se; instead everything
386 : : //! is just a vat.
387 : : //!
388 : : //! See also: https://github.com/capnproto/capnproto/blob/9021f0c722b36cb11e3690b0860939255ebad39c/c%2B%2B/src/capnp/rpc.capnp#L42-L56
389 [ + - + - ]: 1 : struct ServerVatId
390 : : {
391 : : ::capnp::word scratch[4]{};
392 : : ::capnp::MallocMessageBuilder message{scratch};
393 : : ::capnp::rpc::twoparty::VatId::Builder vat_id{message.getRoot<::capnp::rpc::twoparty::VatId>()};
394 [ + - ]: 7 : ServerVatId() { vat_id.setSide(::capnp::rpc::twoparty::Side::SERVER); }
395 : : };
396 : :
397 : : template <typename Interface, typename Impl>
398 : 43 : ProxyClientBase<Interface, Impl>::ProxyClientBase(typename Interface::Client client,
399 : : Connection* connection,
400 : : bool destroy_connection)
401 [ + - ]: 43 : : m_client(std::move(client)), m_context(connection)
402 : :
403 : : {
404 : : // Handler for the connection getting destroyed before this client object.
405 [ + - ]: 43 : auto disconnect_cb = m_context.connection->addSyncCleanup([this]() {
406 : : // Release client capability by move-assigning to temporary.
407 : : {
408 : 0 : typename Interface::Client(std::move(m_client));
409 : : }
410 : 0 : Lock lock{m_context.loop->m_mutex};
411 : 0 : m_context.connection = nullptr;
412 : 0 : });
413 : :
414 : : // Two shutdown sequences are supported:
415 : : //
416 : : // - A normal sequence where client proxy objects are deleted by external
417 : : // code that no longer needs them
418 : : //
419 : : // - A garbage collection sequence where the connection or event loop shuts
420 : : // down while external code is still holding client references.
421 : : //
422 : : // The first case is handled here when m_context.connection is not null. The
423 : : // second case is handled by the disconnect_cb function, which sets
424 : : // m_context.connection to null so nothing happens here.
425 [ + - ]: 55 : m_context.cleanup_fns.emplace_front([this, destroy_connection, disconnect_cb]{
426 : : {
427 : : // If the capnp interface defines a destroy method, call it to destroy
428 : : // the remote object, waiting for it to be deleted server side. If the
429 : : // capnp interface does not define a destroy method, this will just call
430 : : // an empty stub defined in the ProxyClientBase class and do nothing.
431 : 6 : Sub::destroy(*this);
432 : :
433 : : // FIXME: Could just invoke removed addCleanup fn here instead of duplicating code
434 : 86 : m_context.loop->sync([&]() {
435 : : // Remove disconnect callback on cleanup so it doesn't run and try
436 : : // to access this object after it's destroyed. This call needs to
437 : : // run inside loop->sync() on the event loop thread because
438 : : // otherwise, if there were an ill-timed disconnect, the
439 : : // onDisconnect handler could fire and delete the Connection object
440 : : // before the removeSyncCleanup call.
441 [ + - ][ - - : 43 : if (m_context.connection) m_context.connection->removeSyncCleanup(disconnect_cb);
+ - # # ]
[ # # # # ]
442 : :
443 : : // Release client capability by move-assigning to temporary.
444 : : {
445 : 43 : typename Interface::Client(std::move(m_client));
446 : : }
447 [ + + ][ - - : 43 : if (destroy_connection) {
- + # # ]
[ # # # # ]
448 [ + - + - ]: 7 : delete m_context.connection;
[ # # # #
# # # # #
# # # ][ #
# # # # #
# # ]
449 : 7 : m_context.connection = nullptr;
450 : : }
451 : : });
452 : : }
453 : : });
454 [ + - ]: 43 : Sub::construct(*this);
455 : 43 : }
456 : :
457 : : template <typename Interface, typename Impl>
458 : 43 : ProxyClientBase<Interface, Impl>::~ProxyClientBase() noexcept
459 : : {
460 : 43 : CleanupRun(m_context.cleanup_fns);
461 : 43 : }
462 : :
463 : : template <typename Interface, typename Impl>
464 : 13 : ProxyServerBase<Interface, Impl>::ProxyServerBase(std::shared_ptr<Impl> impl, Connection& connection)
465 [ + - ]: 13 : : m_impl(std::move(impl)), m_context(&connection)
466 : : {
467 [ - + ]: 13 : assert(m_impl);
468 [ - - ]: 13 : }
469 : :
470 : : //! ProxyServer destructor, called from the EventLoop thread by Cap'n Proto
471 : : //! garbage collection code after there are no more references to this object.
472 : : //! This will typically happen when the corresponding ProxyClient object on the
473 : : //! other side of the connection is destroyed. It can also happen earlier if the
474 : : //! connection is broken or destroyed. In the latter case this destructor will
475 : : //! typically be called inside m_rpc_system.reset() call in the ~Connection
476 : : //! destructor while the Connection object still exists. However, because
477 : : //! ProxyServer objects are refcounted, and the Connection object could be
478 : : //! destroyed while asynchronous IPC calls are still in-flight, it's possible
479 : : //! for this destructor to be called after the Connection object no longer
480 : : //! exists, so it is NOT valid to dereference the m_context.connection pointer
481 : : //! from this function.
482 : : template <typename Interface, typename Impl>
483 : 13 : ProxyServerBase<Interface, Impl>::~ProxyServerBase()
484 : : {
485 : 13 : if (m_impl) {
486 : : // If impl is non-null at this point, it means no client is waiting for
487 : : // the m_impl server object to be destroyed synchronously. This can
488 : : // happen either if the interface did not define a "destroy" method (see
489 : : // invokeDestroy method below), or if a destroy method was defined, but
490 : : // the connection was broken before it could be called.
491 : : //
492 : : // In either case, be conservative and run the cleanup on an
493 : : // asynchronous thread, to avoid destructors or cleanup functions
494 : : // blocking or deadlocking the current EventLoop thread, since they
495 : : // could be making IPC calls.
496 : : //
497 : : // Technically this is a little too conservative since if the interface
498 : : // defines a "destroy" method, but the destroy method does not accept a
499 : : // Context parameter specifying a worker thread, the cleanup method
500 : : // would run on the EventLoop thread normally (when connection is
501 : : // unbroken), but will not run on the EventLoop thread now (when
502 : : // connection is broken). Probably some refactoring of the destructor
503 : : // and invokeDestroy function is possible to make this cleaner and more
504 : : // consistent.
505 : 7 : m_context.loop->addAsyncCleanup([impl=std::move(m_impl), fns=std::move(m_context.cleanup_fns)]() mutable {
506 : 7 : impl.reset();
507 : 7 : CleanupRun(fns);
508 : : });
509 : : }
510 [ - + ]: 13 : assert(m_context.cleanup_fns.empty());
511 [ + + - + ]: 26 : }
512 : :
513 : : //! If the capnp interface defined a special "destroy" method, as described the
514 : : //! ProxyClientBase class, this method will be called and synchronously destroy
515 : : //! m_impl before returning to the client.
516 : : //!
517 : : //! If the capnp interface does not define a "destroy" method, this will never
518 : : //! be called and the ~ProxyServerBase destructor will be responsible for
519 : : //! deleting m_impl asynchronously, whenever the ProxyServer object gets garbage
520 : : //! collected by Cap'n Proto.
521 : : //!
522 : : //! This method is called in the same way other proxy server methods are called,
523 : : //! via the serverInvoke function. Basically serverInvoke just calls this as a
524 : : //! substitute for a non-existent m_impl->destroy() method. If the destroy
525 : : //! method has any parameters or return values they will be handled in the
526 : : //! normal way by PassField/ReadField/BuildField functions. Particularly if a
527 : : //! Context.thread parameter was passed, this method will run on the worker
528 : : //! thread specified by the client. Otherwise it will run on the EventLoop
529 : : //! thread, like other server methods without an assigned thread.
530 : : template <typename Interface, typename Impl>
531 : 6 : void ProxyServerBase<Interface, Impl>::invokeDestroy()
532 : : {
533 : 6 : m_impl.reset();
534 : 6 : CleanupRun(m_context.cleanup_fns);
535 : 6 : }
536 : :
537 : : using ConnThreads = std::map<Connection*, ProxyClient<Thread>>;
538 : : using ConnThread = ConnThreads::iterator;
539 : :
540 : : // Retrieve ProxyClient<Thread> object associated with this connection from a
541 : : // map, or create a new one and insert it into the map. Return map iterator and
542 : : // inserted bool.
543 : : std::tuple<ConnThread, bool> SetThread(ConnThreads& threads, std::mutex& mutex, Connection* connection, const std::function<Thread::Client()>& make_thread);
544 : :
545 : : struct ThreadContext
546 : : {
547 : : //! Identifying string for debug.
548 : : std::string thread_name;
549 : :
550 : : //! Waiter object used to allow client threads blocked waiting for a server
551 : : //! response to execute callbacks made from the client's corresponding
552 : : //! server thread.
553 : : std::unique_ptr<Waiter> waiter = nullptr;
554 : :
555 : : //! When client is making a request to a server, this is the
556 : : //! `callbackThread` argument it passes in the request, used by the server
557 : : //! in case it needs to make callbacks into the client that need to execute
558 : : //! while the client is waiting. This will be set to a local thread object.
559 : : ConnThreads callback_threads;
560 : :
561 : : //! When client is making a request to a server, this is the `thread`
562 : : //! argument it passes in the request, used to control which thread on
563 : : //! server will be responsible for executing it. If client call is being
564 : : //! made from a local thread, this will be a remote thread object returned
565 : : //! by makeThread. If a client call is being made from a thread currently
566 : : //! handling a server request, this will be set to the `callbackThread`
567 : : //! request thread argument passed in that request.
568 : : ConnThreads request_threads;
569 : :
570 : : //! Whether this thread is a capnp event loop thread. Not really used except
571 : : //! to assert false if there's an attempt to execute a blocking operation
572 : : //! which could deadlock the thread.
573 : : bool loop_thread = false;
574 : : };
575 : :
576 : : //! Given stream file descriptor, make a new ProxyClient object to send requests
577 : : //! over the stream. Also create a new Connection object embedded in the
578 : : //! client that is freed when the client is closed.
579 : : template <typename InitInterface>
580 : 6 : std::unique_ptr<ProxyClient<InitInterface>> ConnectStream(EventLoop& loop, int fd)
581 : : {
582 : 6 : typename InitInterface::Client init_client(nullptr);
583 : 6 : std::unique_ptr<Connection> connection;
584 [ + - ]: 6 : loop.sync([&] {
585 : 6 : auto stream =
586 : 6 : loop.m_io_context.lowLevelProvider->wrapSocketFd(fd, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP);
587 [ + - ]: 6 : connection = std::make_unique<Connection>(loop, kj::mv(stream));
588 [ + - + - : 12 : init_client = connection->m_rpc_system->bootstrap(ServerVatId().vat_id).castAs<InitInterface>();
+ - + - +
- + - +
- ]
589 [ + - ]: 6 : Connection* connection_ptr = connection.get();
590 [ + - ]: 6 : connection->onDisconnect([&loop, connection_ptr] {
591 [ # # ]: 0 : loop.log() << "IPC client: unexpected network disconnect.";
592 [ # # # # ]: 0 : delete connection_ptr;
593 : : });
594 : 6 : });
595 : : return std::make_unique<ProxyClient<InitInterface>>(
596 [ + - ]: 6 : kj::mv(init_client), connection.release(), /* destroy_connection= */ true);
597 [ + - ]: 6 : }
598 : :
599 : : //! Given stream and init objects, construct a new ProxyServer object that
600 : : //! handles requests from the stream by calling the init object. Embed the
601 : : //! ProxyServer in a Connection object that is stored and erased if
602 : : //! disconnected. This should be called from the event loop thread.
603 : : template <typename InitInterface, typename InitImpl>
604 : 6 : void _Serve(EventLoop& loop, kj::Own<kj::AsyncIoStream>&& stream, InitImpl& init)
605 : : {
606 : 6 : loop.m_incoming_connections.emplace_front(loop, kj::mv(stream), [&](Connection& connection) {
607 : : // Disable deleter so proxy server object doesn't attempt to delete the
608 : : // init implementation when the proxy client is destroyed or
609 : : // disconnected.
610 [ + - - + ]: 6 : return kj::heap<ProxyServer<InitInterface>>(std::shared_ptr<InitImpl>(&init, [](InitImpl*){}), connection);
611 : : });
612 : 6 : auto it = loop.m_incoming_connections.begin();
613 : 12 : it->onDisconnect([&loop, it] {
614 [ + - ]: 6 : loop.log() << "IPC server: socket disconnected.";
615 : 6 : loop.m_incoming_connections.erase(it);
616 : : });
617 : 6 : }
618 : :
619 : : //! Given connection receiver and an init object, handle incoming connections by
620 : : //! calling _Serve, to create ProxyServer objects and forward requests to the
621 : : //! init object.
622 : : template <typename InitInterface, typename InitImpl>
623 : 7 : void _Listen(EventLoop& loop, kj::Own<kj::ConnectionReceiver>&& listener, InitImpl& init)
624 : : {
625 : 7 : auto* ptr = listener.get();
626 [ + - ]: 14 : loop.m_task_set->add(ptr->accept().then(
627 [ + - + - : 21 : [&loop, &init, listener = kj::mv(listener)](kj::Own<kj::AsyncIoStream>&& stream) mutable {
+ - - + ]
628 : 5 : _Serve<InitInterface>(loop, kj::mv(stream), init);
629 : 5 : _Listen<InitInterface>(loop, kj::mv(listener), init);
630 : : }));
631 : 7 : }
632 : :
633 : : //! Given stream file descriptor and an init object, handle requests on the
634 : : //! stream by calling methods on the Init object.
635 : : template <typename InitInterface, typename InitImpl>
636 : 1 : void ServeStream(EventLoop& loop, int fd, InitImpl& init)
637 : : {
638 [ + - ]: 1 : _Serve<InitInterface>(
639 : 1 : loop, loop.m_io_context.lowLevelProvider->wrapSocketFd(fd, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP), init);
640 : 1 : }
641 : :
642 : : //! Given listening socket file descriptor and an init object, handle incoming
643 : : //! connections and requests by calling methods on the Init object.
644 : : template <typename InitInterface, typename InitImpl>
645 : 2 : void ListenConnections(EventLoop& loop, int fd, InitImpl& init)
646 : : {
647 : 2 : loop.sync([&]() {
648 [ + - ]: 2 : _Listen<InitInterface>(loop,
649 : 4 : loop.m_io_context.lowLevelProvider->wrapListenSocketFd(fd, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP),
650 : : init);
651 : : });
652 : 2 : }
653 : :
654 [ - - - - : 84 : extern thread_local ThreadContext g_thread_context; // NOLINT(bitcoin-nontrivial-threadlocal)
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- + - + -
+ - ][ - -
- - - - -
- - - - -
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # ]
[ + - # #
# # # # #
# # # # #
# # # # ]
[ # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # ][ -
- - - - -
- - - - -
- - - - -
- - ][ # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # #
# ]
655 : : // Silence nonstandard bitcoin tidy error "Variable with non-trivial destructor
656 : : // cannot be thread_local" which should not be a problem on modern platforms, and
657 : : // could lead to a small memory leak at worst on older ones.
658 : :
659 : : } // namespace mp
660 : :
661 : : #endif // MP_PROXY_IO_H
|