Branch data Line data Source code
1 : : // Copyright (c) The Bitcoin Core developers
2 : : // Distributed under the MIT software license, see the accompanying
3 : : // file COPYING or http://www.opensource.org/licenses/mit-license.php.
4 : :
5 : : #ifndef MP_PROXY_IO_H
6 : : #define MP_PROXY_IO_H
7 : :
8 : : #include <mp/proxy.h>
9 : : #include <mp/util.h>
10 : :
11 : : #include <mp/proxy.capnp.h>
12 : :
13 : : #include <capnp/rpc-twoparty.h>
14 : :
15 : : #include <assert.h>
16 : : #include <condition_variable>
17 : : #include <functional>
18 : : #include <kj/function.h>
19 : : #include <map>
20 : : #include <memory>
21 : : #include <optional>
22 : : #include <sstream>
23 : : #include <string>
24 : : #include <thread>
25 : :
26 : : namespace mp {
27 : : struct ThreadContext;
28 : :
29 : : struct InvokeContext
30 : : {
31 : : Connection& connection;
32 : : };
33 : :
34 : : struct ClientInvokeContext : InvokeContext
35 : : {
36 : : ThreadContext& thread_context;
37 : 30 : ClientInvokeContext(Connection& conn, ThreadContext& thread_context)
38 [ + - + - : 30 : : InvokeContext{conn}, thread_context{thread_context}
+ - + - +
- ][ - - +
- + - # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # ][ +
- + - #
# ][ # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# ]
[ # # # # ]
39 : : {
40 : : }
41 : : };
42 : :
43 : : template <typename ProxyServer, typename CallContext_>
44 : : struct ServerInvokeContext : InvokeContext
45 : : {
46 : : using CallContext = CallContext_;
47 : :
48 : : ProxyServer& proxy_server;
49 : : CallContext& call_context;
50 : : int req;
51 : :
52 : 48 : ServerInvokeContext(ProxyServer& proxy_server, CallContext& call_context, int req)
53 [ + - + - : 30 : : InvokeContext{*proxy_server.m_context.connection}, proxy_server{proxy_server}, call_context{call_context}, req{req}
+ - + - +
- + - ][ -
- + - + -
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# ][ + - +
- # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# ][ # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# ][ # # #
# # # ]
54 : : {
55 : : }
56 : : };
57 : :
58 : : template <typename Interface, typename Params, typename Results>
59 : : using ServerContext = ServerInvokeContext<ProxyServer<Interface>, ::capnp::CallContext<Params, Results>>;
60 : :
61 : : template <>
62 : : struct ProxyClient<Thread> : public ProxyClientBase<Thread, ::capnp::Void>
63 : : {
64 [ + - ]: 30 : using ProxyClientBase::ProxyClientBase;
65 : : // https://stackoverflow.com/questions/22357887/comparing-two-mapiterators-why-does-it-need-the-copy-constructor-of-stdpair
66 : : ProxyClient(const ProxyClient&) = delete;
67 : : ~ProxyClient();
68 : :
69 : : //! Reference to callback function that is run if there is a sudden
70 : : //! disconnect and the Connection object is destroyed before this
71 : : //! ProxyClient<Thread> object. The callback will destroy this object and
72 : : //! remove its entry from the thread's request_threads or callback_threads
73 : : //! map. It will also reset m_disconnect_cb so the destructor does not
74 : : //! access it. In the normal case where there is no sudden disconnect, the
75 : : //! destructor will unregister m_disconnect_cb so the callback is never run.
76 : : //! Since this variable is accessed from multiple threads, accesses should
77 : : //! be guarded with the associated Waiter::m_mutex.
78 : : std::optional<CleanupIt> m_disconnect_cb;
79 : : };
80 : :
81 : : template <>
82 : : struct ProxyServer<Thread> final : public Thread::Server
83 : : {
84 : : public:
85 : : ProxyServer(ThreadContext& thread_context, std::thread&& thread);
86 : : ~ProxyServer();
87 : : kj::Promise<void> getName(GetNameContext context) override;
88 : : ThreadContext& m_thread_context;
89 : : std::thread m_thread;
90 : : };
91 : :
92 : : //! Handler for kj::TaskSet failed task events.
93 : : class LoggingErrorHandler : public kj::TaskSet::ErrorHandler
94 : : {
95 : : public:
96 [ + - ]: 17 : LoggingErrorHandler(EventLoop& loop) : m_loop(loop) {}
97 : : void taskFailed(kj::Exception&& exception) override;
98 : : EventLoop& m_loop;
99 : : };
100 : :
101 : : using LogFn = std::function<void(bool raise, std::string message)>;
102 : :
103 : : class Logger
104 : : {
105 : : public:
106 : 177 : Logger(bool raise, LogFn& fn) : m_raise(raise), m_fn(fn) {}
107 : : Logger(Logger&& logger) : m_raise(logger.m_raise), m_fn(logger.m_fn), m_buffer(std::move(logger.m_buffer)) {}
108 : 177 : ~Logger() noexcept(false)
109 : : {
110 [ + - + - : 354 : if (m_fn) m_fn(m_raise, m_buffer.str());
+ - ]
111 : 177 : }
112 : :
113 : : template <typename T>
114 : 1225 : friend Logger& operator<<(Logger& logger, T&& value)
115 : : {
116 [ + - + - : 949 : if (logger.m_fn) logger.m_buffer << std::forward<T>(value);
+ - + - +
- + - + -
+ - + - -
- + - + -
+ - + - +
- + - + -
+ - + - +
- + - + -
+ - + - +
- + - + -
+ - - - +
- + - + -
+ - + - +
- + - + -
+ - + - +
- + - + -
+ - + - +
- + - + -
- - + - +
- + - + -
+ - + - +
- + - + -
+ - + - +
- + - + -
+ - + - +
- + - - -
+ - + - +
- + - + -
+ - + - +
- + - + -
+ - + - +
- + - + -
+ - + - +
- - - + -
+ - + - +
- + - + -
+ - + - +
- + - + -
+ - + - +
- + - + -
+ - + - -
- + - + -
+ - + - +
- + - + -
+ - + - -
- ][ + - -
+ - - - -
+ - + - +
- + - + -
+ - + - +
- + - + -
+ - + - +
- + - - -
- - - - -
- + - + -
+ - + - +
- + - + -
+ - + - +
- + - + -
+ - + - -
- - - - -
- - + - +
- + - + -
+ - + - +
- + - + -
+ - + - +
- + - + -
- - - - -
- - - + -
+ - + - +
- + - + -
+ - + - +
- + - + -
+ - + - +
- - - - -
- - - - +
- + - + -
+ - + - +
- + - + -
+ - + - +
- + - + -
+ - + - -
- - - - -
+ - + - +
- + - + -
+ - + - +
- + - + -
+ - + - +
- + - # #
# # # # #
# # # # #
# # ][ # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # #
# ][ # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # ]
[ # # # #
# # # # #
# ][ # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # ]
[ + - + -
+ - ]
[ - - # # ]
[ + - - -
- - - - -
- - - - -
- - + - +
- + - + -
+ - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- + - + -
+ - + - +
- + - + -
+ - + - -
- + - + -
+ - + - +
- + - + -
+ - + - +
- + - + -
+ - + - +
- + - + -
+ - - - +
- + - + -
+ - + - +
- + - + -
+ - - - ]
[ + - + -
+ - + - +
- - - - -
- - + - +
- + - + -
+ - - - -
- - - + -
+ - + - +
- + - + -
+ - + - +
- - - + -
+ - + - +
- + - + -
+ - + - +
- + - + -
+ - + - +
- + - + -
+ - + - -
- + - + -
+ - + - +
- + - + -
+ - + - -
- # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# ][ - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- + - + -
+ - + - +
- + - + -
+ - + - +
- + - + -
+ - + - -
- - - - -
- - + - +
- + - + -
+ - + - +
- + - + -
+ - + - +
- + - + -
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # ][ #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # ][ # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# ][ - - -
- - - - -
+ - + - +
- + - + -
+ - + - +
- + - + -
+ - + - +
- + - - -
- - - - -
- + - + -
+ - + - +
- + - + -
+ - + - +
- + - + -
+ - + - #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # ]
117 : 476 : return logger;
118 : : }
119 : :
120 : : template <typename T>
121 [ - - + - : 177 : friend Logger& operator<<(Logger&& logger, T&& value)
- - + - +
- - - + -
+ - - - +
- + - - -
+ - + - -
- + - + -
- - + - #
# # # # #
# # # # #
# ][ - - -
- - - + -
+ - - - -
- + - + -
- - - - +
- + - - -
- - + - +
- - - - -
+ - + - +
- - - + -
+ - ][ # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# ][ # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # #
# ][ # # #
# # # #
# ][ # # #
# # # # #
# # # # #
# # # #
# ]
[ + - + - ]
[ + - ][ - -
- - + - -
- - - - -
- - - - +
- - - + -
+ - - - +
- ][ + - -
- - - + -
- - + - -
- + - + -
- - + - #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # #
# ][ - - -
- - - - -
- - - - -
- + - + -
- - - - +
- + - # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # ][ # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # ][ # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # ]
[ - - - -
- - + - +
- - - - -
+ - + - #
# # # ]
122 : : {
123 [ + - + - : 177 : return logger << std::forward<T>(value);
+ - + - +
- + - + -
+ - + - +
- + - + -
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# ][ - - -
- - - + -
+ - - - -
- - - + -
+ - - - -
- - - + -
+ - - - -
- - - + -
+ - - - -
- - - + -
+ - + - -
- - - + -
+ - ][ # #
# # # # #
# ]
[ - + + - ]
[ - - - -
+ - + - +
- - - + -
+ - + - +
- ][ + - -
- + - - -
+ - + - +
- + - # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# ][ - - -
- - - - -
- - - - -
- - - + -
+ - - - -
- - - + -
+ - # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # ][ # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# ][ # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # ][ # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # ][ #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # ]
124 : : }
125 : :
126 : : bool m_raise;
127 : : LogFn& m_fn;
128 : : std::ostringstream m_buffer;
129 : : };
130 : :
131 [ + - ]: 6 : struct LogOptions {
132 : :
133 : : //! External logging callback.
134 : : LogFn log_fn;
135 : :
136 : : //! Maximum number of characters to use when representing
137 : : //! request and response structs as strings.
138 : : size_t max_chars{200};
139 : : };
140 : :
141 : : std::string LongThreadName(const char* exe_name);
142 : :
143 : : //! Event loop implementation.
144 : : //!
145 : : //! Cap'n Proto threading model is very simple: all I/O operations are
146 : : //! asynchronous and must be performed on a single thread. This includes:
147 : : //!
148 : : //! - Code starting an asynchronous operation (calling a function that returns a
149 : : //! promise object)
150 : : //! - Code notifying that an asynchronous operation is complete (code using a
151 : : //! fulfiller object)
152 : : //! - Code handling a completed operation (code chaining or waiting for a promise)
153 : : //!
154 : : //! All of this code needs to access shared state, and there is no mutex that
155 : : //! can be acquired to lock this state because Cap'n Proto
156 : : //! assumes it will only be accessed from one thread. So all this code needs to
157 : : //! actually run on one thread, and the EventLoop::loop() method is the entry point for
158 : : //! this thread. ProxyClient and ProxyServer objects that use other threads and
159 : : //! need to perform I/O operations post to this thread using EventLoop::post()
160 : : //! and EventLoop::sync() methods.
161 : : //!
162 : : //! Specifically, because ProxyClient methods can be called from arbitrary
163 : : //! threads, and ProxyServer methods can run on arbitrary threads, ProxyClient
164 : : //! methods use the EventLoop thread to send requests, and ProxyServer methods
165 : : //! use the thread to return results.
166 : : //!
167 : : //! Based on https://groups.google.com/d/msg/capnproto/TuQFF1eH2-M/g81sHaTAAQAJ
168 : : class EventLoop
169 : : {
170 : : public:
171 : : //! Construct event loop object.
172 : : EventLoop(const char* exe_name, LogFn log_fn, void* context = nullptr);
173 : : ~EventLoop();
174 : :
175 : : //! Run event loop. Does not return until shutdown. This should only be
176 : : //! called once from the m_thread_id thread. This will block until
177 : : //! the m_num_clients reference count is 0.
178 : : void loop();
179 : :
180 : : //! Run function on event loop thread. Does not return until function completes.
181 : : //! Must be called while the loop() function is active.
182 : : void post(kj::Function<void()> fn);
183 : :
184 : : //! Wrapper around EventLoop::post that takes advantage of the
185 : : //! fact that callable will not go out of scope to avoid requirement that it
186 : : //! be copyable.
187 : : template <typename Callable>
188 : 153 : void sync(Callable&& callable)
189 : : {
190 [ + - ]: 153 : post(std::forward<Callable>(callable));
191 : 153 : }
192 : :
193 : : //! Register cleanup function to run on asynchronous worker thread without
194 : : //! blocking the event loop thread.
195 : : void addAsyncCleanup(std::function<void()> fn);
196 : :
197 : : //! Start asynchronous worker thread if necessary. This is only done if
198 : : //! there are ProxyServerBase::m_impl objects that need to be destroyed
199 : : //! asynchronously, without tying up the event loop thread. This can happen
200 : : //! when an interface does not declare a destroy() method that would allow
201 : : //! the client to wait for the destructor to finish and run it on a
202 : : //! dedicated thread. It can also happen whenever this is a broken
203 : : //! connection and the client is no longer around to call the destructors
204 : : //! and the server objects need to be garbage collected. In both cases, it
205 : : //! is important that ProxyServer::m_impl destructors do not run on the
206 : : //! eventloop thread because they may need it to do I/O if they perform
207 : : //! other IPC calls.
208 : : void startAsyncThread() MP_REQUIRES(m_mutex);
209 : :
210 : : //! Check if loop should exit.
211 : : bool done() const MP_REQUIRES(m_mutex);
212 : :
213 : 116 : Logger log()
214 : : {
215 : 116 : Logger logger(false, m_log_opts.log_fn);
216 [ + - + - : 232 : logger << "{" << LongThreadName(m_exe_name) << "} ";
+ - + - ]
217 : 116 : return logger;
218 : 0 : }
219 [ - - - - : 61 : Logger logPlain() { return {false, m_log_opts.log_fn}; }
+ - + - +
- - - - -
+ - + - +
- - - - -
+ - + - +
- - - - -
+ - + - +
- - - - -
+ - + - +
- + - - -
+ - + - +
- ][ - - -
- - - - -
- - - - -
- + - + -
+ - - - -
- + - + -
+ - # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # ][ - -
- - + - +
- + - - -
- - + - +
- + - # #
# # # # #
# # # ][ #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # ]
[ # # # #
# # # # #
# # # # #
# # # # #
# ]
220 [ # # # # : 0 : Logger raise() { return {true, m_log_opts.log_fn}; }
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # ]
[ # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # ]
[ # # # #
# # # # #
# # # # #
# # # # #
# # # #
# ][ # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # ][ #
# # # # #
# # # # #
# # # #
# ]
221 : :
222 : : //! Process name included in thread names so combined debug output from
223 : : //! multiple processes is easier to understand.
224 : : const char* m_exe_name;
225 : :
226 : : //! ID of the event loop thread
227 : : std::thread::id m_thread_id = std::this_thread::get_id();
228 : :
229 : : //! Handle of an async worker thread. Joined on destruction. Unset if async
230 : : //! method has not been called.
231 : : std::thread m_async_thread;
232 : :
233 : : //! Callback function to run on event loop thread during post() or sync() call.
234 : : kj::Function<void()>* m_post_fn MP_GUARDED_BY(m_mutex) = nullptr;
235 : :
236 : : //! Callback functions to run on async thread.
237 : : std::optional<CleanupList> m_async_fns MP_GUARDED_BY(m_mutex);
238 : :
239 : : //! Pipe read handle used to wake up the event loop thread.
240 : : int m_wait_fd = -1;
241 : :
242 : : //! Pipe write handle used to wake up the event loop thread.
243 : : int m_post_fd = -1;
244 : :
245 : : //! Number of clients holding references to ProxyServerBase objects that
246 : : //! reference this event loop.
247 : : int m_num_clients MP_GUARDED_BY(m_mutex) = 0;
248 : :
249 : : //! Mutex and condition variable used to post tasks to event loop and async
250 : : //! thread.
251 : : Mutex m_mutex;
252 : : std::condition_variable m_cv;
253 : :
254 : : //! Capnp IO context.
255 : : kj::AsyncIoContext m_io_context;
256 : :
257 : : //! Capnp error handler. Needs to outlive m_task_set.
258 : : LoggingErrorHandler m_error_handler{*this};
259 : :
260 : : //! Capnp list of pending promises.
261 : : std::unique_ptr<kj::TaskSet> m_task_set;
262 : :
263 : : //! List of connections.
264 : : std::list<Connection> m_incoming_connections;
265 : :
266 : : //! Logging options
267 : : LogOptions m_log_opts;
268 : :
269 : : //! External context pointer.
270 : : void* m_context;
271 : : };
272 : :
273 : : //! Single element task queue used to handle recursive capnp calls. (If server
274 : : //! makes an callback into the client in the middle of a request, while client
275 : : //! thread is blocked waiting for server response, this is what allows the
276 : : //! client to run the request in the same thread, the same way code would run in
277 : : //! single process, with the callback sharing same thread stack as the original
278 : : //! call.
279 : 7 : struct Waiter
280 : : {
281 : 7 : Waiter() = default;
282 : :
283 : : template <typename Fn>
284 : 18 : void post(Fn&& fn)
285 : : {
286 : 18 : const Lock lock(m_mutex);
287 [ - + ]: 18 : assert(!m_fn);
288 [ + - ]: 18 : m_fn = std::forward<Fn>(fn);
289 [ + - ]: 18 : m_cv.notify_all();
290 : 18 : }
291 : :
292 : : template <class Predicate>
293 : 36 : void wait(Lock& lock, Predicate pred)
294 : : {
295 : 96 : m_cv.wait(lock.m_lock, [&]() MP_REQUIRES(m_mutex) {
296 : : // Important for this to be "while (m_fn)", not "if (m_fn)" to avoid
297 : : // a lost-wakeup bug. A new m_fn and m_cv notification might be sent
298 : : // after the fn() call and before the lock.lock() call in this loop
299 : : // in the case where a capnp response is sent and a brand new
300 : : // request is immediately received.
301 [ - + - + : 108 : while (m_fn) {
- + - + -
+ - + ]
[ + + ][ - -
- + - + #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # #
# ][ - + -
+ # # ][ #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # ]
[ # # # # ]
302 : 18 : auto fn = std::move(*m_fn);
303 : 18 : m_fn.reset();
304 [ # # # # : 18 : Unlock(lock, fn);
# # # # #
# # # ]
[ + - ][ # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # #
# ][ # # #
# # # ][ #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # ]
[ # # # # ]
305 : : }
306 : 90 : const bool done = pred();
307 : 90 : return done;
308 : : });
309 : 36 : }
310 : :
311 : : //! Mutex mainly used internally by waiter class, but also used externally
312 : : //! to guard access to related state. Specifically, since the thread_local
313 : : //! ThreadContext struct owns a Waiter, the Waiter::m_mutex is used to guard
314 : : //! access to other parts of the struct to avoid needing to deal with more
315 : : //! mutexes than necessary. This mutex can be held at the same time as
316 : : //! EventLoop::m_mutex as long as Waiter::mutex is locked first and
317 : : //! EventLoop::m_mutex is locked second.
318 : : Mutex m_mutex;
319 : : std::condition_variable m_cv;
320 : : std::optional<kj::Function<void()>> m_fn MP_GUARDED_BY(m_mutex);
321 : : };
322 : :
323 : : //! Object holding network & rpc state associated with either an incoming server
324 : : //! connection, or an outgoing client connection. It must be created and destroyed
325 : : //! on the event loop thread.
326 : : //! In addition to Cap'n Proto state, it also holds lists of callbacks to run
327 : : //! when the connection is closed.
328 : : class Connection
329 : : {
330 : : public:
331 : 7 : Connection(EventLoop& loop, kj::Own<kj::AsyncIoStream>&& stream_)
332 : 7 : : m_loop(loop), m_stream(kj::mv(stream_)),
333 [ + - + - ]: 7 : m_network(*m_stream, ::capnp::rpc::twoparty::Side::CLIENT, ::capnp::ReaderOptions()),
334 [ + - + - : 21 : m_rpc_system(::capnp::makeRpcClient(m_network)) {}
+ - + - ]
335 : 7 : Connection(EventLoop& loop,
336 : : kj::Own<kj::AsyncIoStream>&& stream_,
337 : : const std::function<::capnp::Capability::Client(Connection&)>& make_client)
338 : 7 : : m_loop(loop), m_stream(kj::mv(stream_)),
339 [ + - + - ]: 7 : m_network(*m_stream, ::capnp::rpc::twoparty::Side::SERVER, ::capnp::ReaderOptions()),
340 [ + - + - : 21 : m_rpc_system(::capnp::makeRpcServer(m_network, make_client(*this))) {}
+ - + - +
- + - ]
341 : :
342 : : //! Run cleanup functions. Must be called from the event loop thread. First
343 : : //! calls synchronous cleanup functions while blocked (to free capnp
344 : : //! Capability::Client handles owned by ProxyClient objects), then schedules
345 : : //! asynchronous cleanup functions to run in a worker thread (to run
346 : : //! destructors of m_impl instances owned by ProxyServer objects).
347 : : ~Connection();
348 : :
349 : : //! Register synchronous cleanup function to run on event loop thread (with
350 : : //! access to capnp thread local variables) when disconnect() is called.
351 : : //! any new i/o.
352 : : CleanupIt addSyncCleanup(std::function<void()> fn);
353 : : void removeSyncCleanup(CleanupIt it);
354 : :
355 : : //! Add disconnect handler.
356 : : template <typename F>
357 : 13 : void onDisconnect(F&& f)
358 : : {
359 : : // Add disconnect handler to local TaskSet to ensure it is cancelled and
360 : : // will never run after connection object is destroyed. But when disconnect
361 : : // handler fires, do not call the function f right away, instead add it
362 : : // to the EventLoop TaskSet to avoid "Promise callback destroyed itself"
363 : : // error in cases where f deletes this Connection object.
364 [ + - - + ]: 39 : m_on_disconnect.add(m_network.onDisconnect().then(
365 [ + - + - ]: 27 : [f = std::forward<F>(f), this]() mutable { m_loop->m_task_set->add(kj::evalLater(kj::mv(f))); }));
[ + - ]
366 : 13 : }
367 : :
368 : : EventLoopRef m_loop;
369 : : kj::Own<kj::AsyncIoStream> m_stream;
370 : : LoggingErrorHandler m_error_handler{*m_loop};
371 : : kj::TaskSet m_on_disconnect{m_error_handler};
372 : : ::capnp::TwoPartyVatNetwork m_network;
373 : : std::optional<::capnp::RpcSystem<::capnp::rpc::twoparty::VatId>> m_rpc_system;
374 : :
375 : : // ThreadMap interface client, used to create a remote server thread when an
376 : : // client IPC call is being made for the first time from a new thread.
377 : : ThreadMap::Client m_thread_map{nullptr};
378 : :
379 : : //! Collection of server-side IPC worker threads (ProxyServer<Thread> objects previously returned by
380 : : //! ThreadMap.makeThread) used to service requests to clients.
381 : : ::capnp::CapabilityServerSet<Thread> m_threads;
382 : :
383 : : //! Cleanup functions to run if connection is broken unexpectedly. List
384 : : //! will be empty if all ProxyClient are destroyed cleanly before the
385 : : //! connection is destroyed.
386 : : CleanupList m_sync_cleanup_fns;
387 : : };
388 : :
389 : : //! Vat id for server side of connection. Required argument to RpcSystem::bootStrap()
390 : : //!
391 : : //! "Vat" is Cap'n Proto nomenclature for a host of various objects that facilitates
392 : : //! bidirectional communication with other vats; it is often but not always 1-1 with
393 : : //! processes. Cap'n Proto doesn't reference clients or servers per se; instead everything
394 : : //! is just a vat.
395 : : //!
396 : : //! See also: https://github.com/capnproto/capnproto/blob/9021f0c722b36cb11e3690b0860939255ebad39c/c%2B%2B/src/capnp/rpc.capnp#L42-L56
397 [ + - + - ]: 1 : struct ServerVatId
398 : : {
399 : : ::capnp::word scratch[4]{};
400 : : ::capnp::MallocMessageBuilder message{scratch};
401 : : ::capnp::rpc::twoparty::VatId::Builder vat_id{message.getRoot<::capnp::rpc::twoparty::VatId>()};
402 [ + - ]: 7 : ServerVatId() { vat_id.setSide(::capnp::rpc::twoparty::Side::SERVER); }
403 : : };
404 : :
405 : : template <typename Interface, typename Impl>
406 : 43 : ProxyClientBase<Interface, Impl>::ProxyClientBase(typename Interface::Client client,
407 : : Connection* connection,
408 : : bool destroy_connection)
409 [ + - ]: 43 : : m_client(std::move(client)), m_context(connection)
410 : :
411 : : {
412 : : // Handler for the connection getting destroyed before this client object.
413 [ + - ]: 43 : auto disconnect_cb = m_context.connection->addSyncCleanup([this]() {
414 : : // Release client capability by move-assigning to temporary.
415 : : {
416 : 0 : typename Interface::Client(std::move(m_client));
417 : : }
418 : 0 : Lock lock{m_context.loop->m_mutex};
419 : 0 : m_context.connection = nullptr;
420 : 0 : });
421 : :
422 : : // Two shutdown sequences are supported:
423 : : //
424 : : // - A normal sequence where client proxy objects are deleted by external
425 : : // code that no longer needs them
426 : : //
427 : : // - A garbage collection sequence where the connection or event loop shuts
428 : : // down while external code is still holding client references.
429 : : //
430 : : // The first case is handled here when m_context.connection is not null. The
431 : : // second case is handled by the disconnect_cb function, which sets
432 : : // m_context.connection to null so nothing happens here.
433 [ + - ]: 55 : m_context.cleanup_fns.emplace_front([this, destroy_connection, disconnect_cb]{
434 : : {
435 : : // If the capnp interface defines a destroy method, call it to destroy
436 : : // the remote object, waiting for it to be deleted server side. If the
437 : : // capnp interface does not define a destroy method, this will just call
438 : : // an empty stub defined in the ProxyClientBase class and do nothing.
439 : 6 : Sub::destroy(*this);
440 : :
441 : : // FIXME: Could just invoke removed addCleanup fn here instead of duplicating code
442 : 86 : m_context.loop->sync([&]() {
443 : : // Remove disconnect callback on cleanup so it doesn't run and try
444 : : // to access this object after it's destroyed. This call needs to
445 : : // run inside loop->sync() on the event loop thread because
446 : : // otherwise, if there were an ill-timed disconnect, the
447 : : // onDisconnect handler could fire and delete the Connection object
448 : : // before the removeSyncCleanup call.
449 [ + - ][ - - : 43 : if (m_context.connection) m_context.connection->removeSyncCleanup(disconnect_cb);
+ - # # ]
[ # # # # ]
450 : :
451 : : // Release client capability by move-assigning to temporary.
452 : : {
453 : 43 : typename Interface::Client(std::move(m_client));
454 : : }
455 [ + + ][ - - : 43 : if (destroy_connection) {
- + # # ]
[ # # # # ]
456 [ + - + - ]: 7 : delete m_context.connection;
[ # # # #
# # # # #
# # # ][ #
# # # # #
# # ]
457 : 7 : m_context.connection = nullptr;
458 : : }
459 : : });
460 : : }
461 : : });
462 [ + - ]: 43 : Sub::construct(*this);
463 : 43 : }
464 : :
465 : : template <typename Interface, typename Impl>
466 : 43 : ProxyClientBase<Interface, Impl>::~ProxyClientBase() noexcept
467 : : {
468 : 43 : CleanupRun(m_context.cleanup_fns);
469 : 43 : }
470 : :
471 : : template <typename Interface, typename Impl>
472 : 13 : ProxyServerBase<Interface, Impl>::ProxyServerBase(std::shared_ptr<Impl> impl, Connection& connection)
473 [ + - ]: 13 : : m_impl(std::move(impl)), m_context(&connection)
474 : : {
475 [ - + ]: 13 : assert(m_impl);
476 [ - - ]: 13 : }
477 : :
478 : : //! ProxyServer destructor, called from the EventLoop thread by Cap'n Proto
479 : : //! garbage collection code after there are no more references to this object.
480 : : //! This will typically happen when the corresponding ProxyClient object on the
481 : : //! other side of the connection is destroyed. It can also happen earlier if the
482 : : //! connection is broken or destroyed. In the latter case this destructor will
483 : : //! typically be called inside m_rpc_system.reset() call in the ~Connection
484 : : //! destructor while the Connection object still exists. However, because
485 : : //! ProxyServer objects are refcounted, and the Connection object could be
486 : : //! destroyed while asynchronous IPC calls are still in-flight, it's possible
487 : : //! for this destructor to be called after the Connection object no longer
488 : : //! exists, so it is NOT valid to dereference the m_context.connection pointer
489 : : //! from this function.
490 : : template <typename Interface, typename Impl>
491 : 13 : ProxyServerBase<Interface, Impl>::~ProxyServerBase()
492 : : {
493 : 13 : if (m_impl) {
494 : : // If impl is non-null at this point, it means no client is waiting for
495 : : // the m_impl server object to be destroyed synchronously. This can
496 : : // happen either if the interface did not define a "destroy" method (see
497 : : // invokeDestroy method below), or if a destroy method was defined, but
498 : : // the connection was broken before it could be called.
499 : : //
500 : : // In either case, be conservative and run the cleanup on an
501 : : // asynchronous thread, to avoid destructors or cleanup functions
502 : : // blocking or deadlocking the current EventLoop thread, since they
503 : : // could be making IPC calls.
504 : : //
505 : : // Technically this is a little too conservative since if the interface
506 : : // defines a "destroy" method, but the destroy method does not accept a
507 : : // Context parameter specifying a worker thread, the cleanup method
508 : : // would run on the EventLoop thread normally (when connection is
509 : : // unbroken), but will not run on the EventLoop thread now (when
510 : : // connection is broken). Probably some refactoring of the destructor
511 : : // and invokeDestroy function is possible to make this cleaner and more
512 : : // consistent.
513 : 7 : m_context.loop->addAsyncCleanup([impl=std::move(m_impl), fns=std::move(m_context.cleanup_fns)]() mutable {
514 : 7 : impl.reset();
515 : 7 : CleanupRun(fns);
516 : : });
517 : : }
518 [ - + ]: 13 : assert(m_context.cleanup_fns.empty());
519 [ + + - + ]: 26 : }
520 : :
521 : : //! If the capnp interface defined a special "destroy" method, as described the
522 : : //! ProxyClientBase class, this method will be called and synchronously destroy
523 : : //! m_impl before returning to the client.
524 : : //!
525 : : //! If the capnp interface does not define a "destroy" method, this will never
526 : : //! be called and the ~ProxyServerBase destructor will be responsible for
527 : : //! deleting m_impl asynchronously, whenever the ProxyServer object gets garbage
528 : : //! collected by Cap'n Proto.
529 : : //!
530 : : //! This method is called in the same way other proxy server methods are called,
531 : : //! via the serverInvoke function. Basically serverInvoke just calls this as a
532 : : //! substitute for a non-existent m_impl->destroy() method. If the destroy
533 : : //! method has any parameters or return values they will be handled in the
534 : : //! normal way by PassField/ReadField/BuildField functions. Particularly if a
535 : : //! Context.thread parameter was passed, this method will run on the worker
536 : : //! thread specified by the client. Otherwise it will run on the EventLoop
537 : : //! thread, like other server methods without an assigned thread.
538 : : template <typename Interface, typename Impl>
539 : 6 : void ProxyServerBase<Interface, Impl>::invokeDestroy()
540 : : {
541 : 6 : m_impl.reset();
542 : 6 : CleanupRun(m_context.cleanup_fns);
543 : 6 : }
544 : :
545 : : //! Map from Connection to local or remote thread handle which will be used over
546 : : //! that connection. This map will typically only contain one entry, but can
547 : : //! contain multiple if a single thread makes IPC calls over multiple
548 : : //! connections. A std::optional value type is used to avoid the map needing to
549 : : //! be locked while ProxyClient<Thread> objects are constructed, see
550 : : //! ThreadContext "Synchronization note" below.
551 : : using ConnThreads = std::map<Connection*, std::optional<ProxyClient<Thread>>>;
552 : : using ConnThread = ConnThreads::iterator;
553 : :
554 : : // Retrieve ProxyClient<Thread> object associated with this connection from a
555 : : // map, or create a new one and insert it into the map. Return map iterator and
556 : : // inserted bool.
557 : : std::tuple<ConnThread, bool> SetThread(GuardedRef<ConnThreads> threads, Connection* connection, const std::function<Thread::Client()>& make_thread);
558 : :
559 : : //! The thread_local ThreadContext g_thread_context struct provides information
560 : : //! about individual threads and a way of communicating between them. Because
561 : : //! it's a thread local struct, each ThreadContext instance is initialized by
562 : : //! the thread that owns it.
563 : : //!
564 : : //! ThreadContext is used for any client threads created externally which make
565 : : //! IPC calls, and for server threads created by
566 : : //! ProxyServer<ThreadMap>::makeThread() which execute IPC calls for clients.
567 : : //!
568 : : //! In both cases, the struct holds information like the thread name, and a
569 : : //! Waiter object where the EventLoop can post incoming IPC requests to execute
570 : : //! on the thread. The struct also holds ConnThread maps associating the thread
571 : : //! with local and remote ProxyClient<Thread> objects.
572 : : struct ThreadContext
573 : : {
574 : : //! Identifying string for debug.
575 : : std::string thread_name;
576 : :
577 : : //! Waiter object used to allow remote clients to execute code on this
578 : : //! thread. For server threads created by
579 : : //! ProxyServer<ThreadMap>::makeThread(), this is initialized in that
580 : : //! function. Otherwise, for client threads created externally, this is
581 : : //! initialized the first time the thread tries to make an IPC call. Having
582 : : //! a waiter is necessary for threads making IPC calls in case a server they
583 : : //! are calling expects them to execute a callback during the call, before
584 : : //! it sends a response.
585 : : //!
586 : : //! For IPC client threads, the Waiter pointer is never cleared and the Waiter
587 : : //! just gets destroyed when the thread does. For server threads created by
588 : : //! makeThread(), this pointer is set to null in the ~ProxyServer<Thread> as
589 : : //! a signal for the thread to exit and destroy itself. In both cases, the
590 : : //! same Waiter object is used across different calls and only created and
591 : : //! destroyed once for the lifetime of the thread.
592 : : std::unique_ptr<Waiter> waiter = nullptr;
593 : :
594 : : //! When client is making a request to a server, this is the
595 : : //! `callbackThread` argument it passes in the request, used by the server
596 : : //! in case it needs to make callbacks into the client that need to execute
597 : : //! while the client is waiting. This will be set to a local thread object.
598 : : //!
599 : : //! Synchronization note: The callback_thread and request_thread maps are
600 : : //! only ever accessed internally by this thread's destructor and externally
601 : : //! by Cap'n Proto event loop threads. Since it's possible for IPC client
602 : : //! threads to make calls over different connections that could have
603 : : //! different event loops, these maps are guarded by Waiter::m_mutex in case
604 : : //! different event loop threads add or remove map entries simultaneously.
605 : : //! However, individual ProxyClient<Thread> objects in the maps will only be
606 : : //! associated with one event loop and guarded by EventLoop::m_mutex. So
607 : : //! Waiter::m_mutex does not need to be held while accessing individual
608 : : //! ProxyClient<Thread> instances, and may even need to be released to
609 : : //! respect lock order and avoid locking Waiter::m_mutex before
610 : : //! EventLoop::m_mutex.
611 : : ConnThreads callback_threads MP_GUARDED_BY(waiter->m_mutex);
612 : :
613 : : //! When client is making a request to a server, this is the `thread`
614 : : //! argument it passes in the request, used to control which thread on
615 : : //! server will be responsible for executing it. If client call is being
616 : : //! made from a local thread, this will be a remote thread object returned
617 : : //! by makeThread. If a client call is being made from a thread currently
618 : : //! handling a server request, this will be set to the `callbackThread`
619 : : //! request thread argument passed in that request.
620 : : //!
621 : : //! Synchronization note: \ref callback_threads note applies here as well.
622 : : ConnThreads request_threads MP_GUARDED_BY(waiter->m_mutex);
623 : :
624 : : //! Whether this thread is a capnp event loop thread. Not really used except
625 : : //! to assert false if there's an attempt to execute a blocking operation
626 : : //! which could deadlock the thread.
627 : : bool loop_thread = false;
628 : : };
629 : :
630 : : //! Given stream file descriptor, make a new ProxyClient object to send requests
631 : : //! over the stream. Also create a new Connection object embedded in the
632 : : //! client that is freed when the client is closed.
633 : : template <typename InitInterface>
634 : 6 : std::unique_ptr<ProxyClient<InitInterface>> ConnectStream(EventLoop& loop, int fd)
635 : : {
636 : 6 : typename InitInterface::Client init_client(nullptr);
637 : 6 : std::unique_ptr<Connection> connection;
638 [ + - ]: 6 : loop.sync([&] {
639 : 6 : auto stream =
640 : 6 : loop.m_io_context.lowLevelProvider->wrapSocketFd(fd, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP);
641 [ + - ]: 6 : connection = std::make_unique<Connection>(loop, kj::mv(stream));
642 [ + - + - : 12 : init_client = connection->m_rpc_system->bootstrap(ServerVatId().vat_id).castAs<InitInterface>();
+ - + - +
- + - +
- ]
643 [ + - ]: 6 : Connection* connection_ptr = connection.get();
644 [ + - ]: 6 : connection->onDisconnect([&loop, connection_ptr] {
645 [ # # ]: 0 : loop.log() << "IPC client: unexpected network disconnect.";
646 [ # # # # ]: 0 : delete connection_ptr;
647 : : });
648 : 6 : });
649 : : return std::make_unique<ProxyClient<InitInterface>>(
650 [ + - ]: 6 : kj::mv(init_client), connection.release(), /* destroy_connection= */ true);
651 [ + - ]: 6 : }
652 : :
653 : : //! Given stream and init objects, construct a new ProxyServer object that
654 : : //! handles requests from the stream by calling the init object. Embed the
655 : : //! ProxyServer in a Connection object that is stored and erased if
656 : : //! disconnected. This should be called from the event loop thread.
657 : : template <typename InitInterface, typename InitImpl>
658 : 6 : void _Serve(EventLoop& loop, kj::Own<kj::AsyncIoStream>&& stream, InitImpl& init)
659 : : {
660 : 6 : loop.m_incoming_connections.emplace_front(loop, kj::mv(stream), [&](Connection& connection) {
661 : : // Disable deleter so proxy server object doesn't attempt to delete the
662 : : // init implementation when the proxy client is destroyed or
663 : : // disconnected.
664 [ + - - + ]: 6 : return kj::heap<ProxyServer<InitInterface>>(std::shared_ptr<InitImpl>(&init, [](InitImpl*){}), connection);
665 : : });
666 : 6 : auto it = loop.m_incoming_connections.begin();
667 : 12 : it->onDisconnect([&loop, it] {
668 [ + - ]: 6 : loop.log() << "IPC server: socket disconnected.";
669 : 6 : loop.m_incoming_connections.erase(it);
670 : : });
671 : 6 : }
672 : :
673 : : //! Given connection receiver and an init object, handle incoming connections by
674 : : //! calling _Serve, to create ProxyServer objects and forward requests to the
675 : : //! init object.
676 : : template <typename InitInterface, typename InitImpl>
677 : 7 : void _Listen(EventLoop& loop, kj::Own<kj::ConnectionReceiver>&& listener, InitImpl& init)
678 : : {
679 : 7 : auto* ptr = listener.get();
680 [ + - ]: 14 : loop.m_task_set->add(ptr->accept().then(
681 [ + - + - : 21 : [&loop, &init, listener = kj::mv(listener)](kj::Own<kj::AsyncIoStream>&& stream) mutable {
+ - - + ]
682 : 5 : _Serve<InitInterface>(loop, kj::mv(stream), init);
683 : 5 : _Listen<InitInterface>(loop, kj::mv(listener), init);
684 : : }));
685 : 7 : }
686 : :
687 : : //! Given stream file descriptor and an init object, handle requests on the
688 : : //! stream by calling methods on the Init object.
689 : : template <typename InitInterface, typename InitImpl>
690 : 1 : void ServeStream(EventLoop& loop, int fd, InitImpl& init)
691 : : {
692 [ + - ]: 1 : _Serve<InitInterface>(
693 : 1 : loop, loop.m_io_context.lowLevelProvider->wrapSocketFd(fd, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP), init);
694 : 1 : }
695 : :
696 : : //! Given listening socket file descriptor and an init object, handle incoming
697 : : //! connections and requests by calling methods on the Init object.
698 : : template <typename InitInterface, typename InitImpl>
699 : 2 : void ListenConnections(EventLoop& loop, int fd, InitImpl& init)
700 : : {
701 : 2 : loop.sync([&]() {
702 [ + - ]: 2 : _Listen<InitInterface>(loop,
703 : 4 : loop.m_io_context.lowLevelProvider->wrapListenSocketFd(fd, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP),
704 : : init);
705 : : });
706 : 2 : }
707 : :
708 [ - - - - : 84 : extern thread_local ThreadContext g_thread_context; // NOLINT(bitcoin-nontrivial-threadlocal)
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- + - + -
+ - ][ - -
- - - - -
- - - - -
- - - - -
- # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # ]
[ + - ][ - -
- - - - -
- - - - -
# # # # #
# ][ # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # ]
[ # # # #
# # # # #
# # # ]
709 : : // Silence nonstandard bitcoin tidy error "Variable with non-trivial destructor
710 : : // cannot be thread_local" which should not be a problem on modern platforms, and
711 : : // could lead to a small memory leak at worst on older ones.
712 : :
713 : : } // namespace mp
714 : :
715 : : #endif // MP_PROXY_IO_H
|