Branch data Line data Source code
1 : : // Copyright (c) The Bitcoin Core developers
2 : : // Distributed under the MIT software license, see the accompanying
3 : : // file COPYING or http://www.opensource.org/licenses/mit-license.php.
4 : :
5 : : #ifndef MP_PROXY_IO_H
6 : : #define MP_PROXY_IO_H
7 : :
8 : : #include <mp/proxy.h>
9 : : #include <mp/util.h>
10 : :
11 : : #include <mp/proxy.capnp.h>
12 : :
13 : : #include <capnp/rpc-twoparty.h>
14 : :
15 : : #include <assert.h>
16 : : #include <condition_variable>
17 : : #include <functional>
18 : : #include <kj/function.h>
19 : : #include <map>
20 : : #include <memory>
21 : : #include <optional>
22 : : #include <sstream>
23 : : #include <string>
24 : : #include <thread>
25 : :
26 : : namespace mp {
27 : : struct ThreadContext;
28 : :
29 : : struct InvokeContext
30 : : {
31 : : Connection& connection;
32 : : };
33 : :
34 : : struct ClientInvokeContext : InvokeContext
35 : : {
36 : : ThreadContext& thread_context;
37 : 30 : ClientInvokeContext(Connection& conn, ThreadContext& thread_context)
38 [ # # # # : 30 : : InvokeContext{conn}, thread_context{thread_context}
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # ][ - -
+ - + - ]
[ + - + -
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # #
# ][ + - +
- + - + -
+ - ][ # #
# # # # #
# # # ]
39 : : {
40 : : }
41 : : };
42 : :
43 : : template <typename ProxyServer, typename CallContext_>
44 : : struct ServerInvokeContext : InvokeContext
45 : : {
46 : : using CallContext = CallContext_;
47 : :
48 : : ProxyServer& proxy_server;
49 : : CallContext& call_context;
50 : : int req;
51 : :
52 : 48 : ServerInvokeContext(ProxyServer& proxy_server, CallContext& call_context, int req)
53 [ # # # # : 30 : : InvokeContext{*proxy_server.m_context.connection}, proxy_server{proxy_server}, call_context{call_context}, req{req}
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # ][ - -
+ - + - ]
[ + - + -
# # ][ + -
+ - + - +
- + - + -
# # # # #
# # # # #
# # # # #
# # # # #
# # ][ # #
# # # # #
# # # #
# ]
54 : : {
55 : : }
56 : : };
57 : :
58 : : template <typename Interface, typename Params, typename Results>
59 : : using ServerContext = ServerInvokeContext<ProxyServer<Interface>, ::capnp::CallContext<Params, Results>>;
60 : :
61 : : template <>
62 : : struct ProxyClient<Thread> : public ProxyClientBase<Thread, ::capnp::Void>
63 : : {
64 [ + - ]: 30 : using ProxyClientBase::ProxyClientBase;
65 : : // https://stackoverflow.com/questions/22357887/comparing-two-mapiterators-why-does-it-need-the-copy-constructor-of-stdpair
66 : : ProxyClient(const ProxyClient&) = delete;
67 : : ~ProxyClient();
68 : :
69 : : //! Reference to callback function that is run if there is a sudden
70 : : //! disconnect and the Connection object is destroyed before this
71 : : //! ProxyClient<Thread> object. The callback will destroy this object and
72 : : //! remove its entry from the thread's request_threads or callback_threads
73 : : //! map. It will also reset m_disconnect_cb so the destructor does not
74 : : //! access it. In the normal case where there is no sudden disconnect, the
75 : : //! destructor will unregister m_disconnect_cb so the callback is never run.
76 : : //! Since this variable is accessed from multiple threads, accesses should
77 : : //! be guarded with the associated Waiter::m_mutex.
78 : : std::optional<CleanupIt> m_disconnect_cb;
79 : : };
80 : :
81 : : template <>
82 : : struct ProxyServer<Thread> final : public Thread::Server
83 : : {
84 : : public:
85 : : ProxyServer(ThreadContext& thread_context, std::thread&& thread);
86 : : ~ProxyServer();
87 : : kj::Promise<void> getName(GetNameContext context) override;
88 : : ThreadContext& m_thread_context;
89 : : std::thread m_thread;
90 : : };
91 : :
92 : : //! Handler for kj::TaskSet failed task events.
93 : : class LoggingErrorHandler : public kj::TaskSet::ErrorHandler
94 : : {
95 : : public:
96 [ + - ]: 17 : LoggingErrorHandler(EventLoop& loop) : m_loop(loop) {}
97 : : void taskFailed(kj::Exception&& exception) override;
98 : : EventLoop& m_loop;
99 : : };
100 : :
101 : : //! Log flags. Update stringify function if changed!
102 : : enum class Log {
103 : : Trace = 0,
104 : : Debug,
105 : : Info,
106 : : Warning,
107 : : Error,
108 : : Raise,
109 : : };
110 : :
111 : : kj::StringPtr KJ_STRINGIFY(Log flags);
112 : :
113 [ + - ]: 594 : struct LogMessage {
114 : :
115 : : //! Message to be logged
116 : : std::string message;
117 : :
118 : : //! The severity level of this message
119 : : Log level;
120 : : };
121 : :
122 : : using LogFn = std::function<void(LogMessage)>;
123 : :
124 [ + - ]: 11 : struct LogOptions {
125 : :
126 : : //! External logging callback.
127 : : LogFn log_fn;
128 : :
129 : : //! Maximum number of characters to use when representing
130 : : //! request and response structs as strings.
131 : : size_t max_chars{200};
132 : :
133 : : //! Messages with a severity level less than log_level will not be
134 : : //! reported.
135 : : Log log_level{Log::Trace};
136 : : };
137 : :
138 : : class Logger
139 : : {
140 : : public:
141 [ + - + - : 297 : Logger(const LogOptions& options, Log log_level) : m_options(options), m_log_level(log_level) {}
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # #
# ][ # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # ]
[ # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # ][ - -
- - - - -
- - - - -
# # # # #
# # # # #
# # ][ - -
- - - - -
- - - - -
+ - + - -
- - - + -
+ - ][ - -
- - + - +
- - - - -
+ - + - #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # ]
[ - - - -
- - - - #
# # # ][ -
- - - + -
+ - - - -
- + - + -
- - - - +
- + - - -
- - + - +
- - - - -
+ - + - -
- - - + -
+ - ][ # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# ][ # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # #
# ]
142 : :
143 : : Logger(Logger&&) = delete;
144 : : Logger& operator=(Logger&&) = delete;
145 : : Logger(const Logger&) = delete;
146 : : Logger& operator=(const Logger&) = delete;
147 : :
148 : 297 : ~Logger() noexcept(false)
149 : : {
150 [ + - ]: 594 : if (enabled()) m_options.log_fn({std::move(m_buffer).str(), m_log_level});
151 [ + - ]: 594 : }
152 : :
153 : : template <typename T>
154 : 1405 : friend Logger& operator<<(Logger& logger, T&& value)
155 : : {
156 [ - + ]: 1405 : if (logger.enabled()) logger.m_buffer << std::forward<T>(value);
157 : 1405 : return logger;
158 : : }
159 : :
160 : : template <typename T>
161 : : friend Logger& operator<<(Logger&& logger, T&& value)
162 : : {
163 : : return logger << std::forward<T>(value);
164 : : }
165 : :
166 : 297 : explicit operator bool() const
167 : : {
168 [ + - + - : 600 : return enabled();
+ - + - -
- # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # ][ #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# ][ + - +
- - - - -
- - - - +
- + - + -
+ - - - -
- - - - -
+ - + - +
- + - - -
- - - - -
- + - + -
+ - + - -
- - - - -
- - + - +
- + - + -
- - - - -
- - - + -
+ - + - +
- - - - -
- - - - +
- + - # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # ]
[ # # # #
# # # # ]
[ # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # #
# ][ # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # ][ #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # ][ - -
- - - - +
- - - - -
- - - - -
- - - - -
- - - - -
- + - + -
- - - - -
- - - + -
+ - + - +
- - - - -
- - - - +
- + - ][ -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
+ - + - +
- + - + -
+ - - - -
- - - - -
- - - - +
- + - + -
+ - + - +
- ][ - - -
- - - - -
- - - - +
- + - + -
+ - + - +
- - - - -
- - - - -
- - - + -
+ - + - +
- + - +
- ][ + - +
- # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# ][ + - -
- - - + -
- - - - +
- + - - -
- - - - -
- + - + -
+ - + - -
- - - - -
- - + - +
- # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # #
# ][ # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # ][ - -
- - - - -
- - - - -
+ - + - +
- + - + -
+ - - - -
- - - - -
- - - - +
- + - + -
+ - + - +
- - - - -
- - - - -
- - - + -
+ - + - +
- + - + -
- - - - -
- - - - -
- - + - +
- + - + -
+ - + - -
- - - - -
- - - - -
- + - + -
+ - + - +
- + - + -
- - - - -
- - - - -
+ - + - +
- + - + -
+ - ]
169 : : }
170 : :
171 : : private:
172 : 1999 : bool enabled() const
173 : : {
174 [ + - + - : 1999 : return m_options.log_fn && m_log_level >= m_options.log_level;
+ - + - -
- - - - -
- - - - -
- - - - -
+ - + - +
- + - - -
- - - - -
- # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # ][ # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# ][ - - -
- + - + -
+ - + - +
- + - + -
+ - - - -
- + - + -
+ - + - -
- - - - -
- - - - -
- + - + -
+ - + - -
- - - - -
- - + - +
- + - + -
+ - + - +
- + - - -
- - - - -
- + - + -
+ - + - +
- + - + -
+ - - - -
- - - - -
+ - + - +
- + - + -
+ - + - +
- - - - -
- - - - +
- + - + -
+ - + - +
- + - + -
- - - - -
- - - + -
+ - + - +
- + - + -
+ - + - -
- - - - -
- - + - +
- + - + -
- - - - ]
[ # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # ]
[ # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # ][ #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# ][ # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # ]
[ - - - -
- - - - -
- - - - -
- - - - -
- - - - -
+ - + - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
+ - + - +
- + - - -
- - - - -
- + - + -
+ - + - +
- + - + -
+ - - - -
- - - - -
+ - + - +
- + - - -
- - ][ - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - + - +
- + - + -
+ - + - +
- + - - -
- - - - -
- - - - -
- - - - +
- + - + -
+ - + - +
- + - + -
- - - - ]
[ - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - + -
+ - + - +
- + - + -
+ - + - -
- - - - -
- - - - -
- - - - -
+ - + - +
- + - + -
+ - + - +
- - - -
- ][ - - -
- + - + -
+ - + - -
- - - - -
- - + - +
- + - + -
- - - - #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # ]
[ - - - -
- - - - -
- - - + -
+ - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- + - + -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- + - + -
+ - + - -
- - - - -
- - + - +
- + - + -
+ - + - +
- + - - -
- - - - -
- + - + -
+ - + - -
- - - # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # #
# ][ - - -
- + - - -
- - - - -
- - - - -
- - - - -
- - - -
- ][ - - -
- - - - -
+ - + - +
- + - + -
+ - + - +
- + - + -
+ - + - +
- + - - -
- - - - -
- - - - -
- - - - +
- + - + -
+ - + - +
- + - + -
- - - - -
- - - - -
- - - - -
- + - + -
+ - + - +
- + - + -
+ - - - -
- - - - -
- - - - -
- - - + -
+ - + - +
- + - + -
+ - + - -
- - - - -
- - - - -
- - - - -
+ - + - +
- + - + -
+ - + - +
- - - - -
- - - - -
- - - - -
- - + - +
- + - + -
+ - + - +
- + - + -
+ - - - -
- - - - -
- - - - +
- + - + -
+ - + - +
- + - + -
+ - + - ]
175 : : }
176 : :
177 : : const LogOptions& m_options;
178 : : Log m_log_level;
179 : : std::ostringstream m_buffer;
180 : : };
181 : :
182 : : #define MP_LOGPLAIN(loop, ...) if (mp::Logger logger{(loop).m_log_opts, __VA_ARGS__}; logger) logger
183 : :
184 : : #define MP_LOG(loop, ...) MP_LOGPLAIN(loop, __VA_ARGS__) << "{" << LongThreadName((loop).m_exe_name) << "} "
185 : :
186 : : std::string LongThreadName(const char* exe_name);
187 : :
188 : : //! Event loop implementation.
189 : : //!
190 : : //! Cap'n Proto threading model is very simple: all I/O operations are
191 : : //! asynchronous and must be performed on a single thread. This includes:
192 : : //!
193 : : //! - Code starting an asynchronous operation (calling a function that returns a
194 : : //! promise object)
195 : : //! - Code notifying that an asynchronous operation is complete (code using a
196 : : //! fulfiller object)
197 : : //! - Code handling a completed operation (code chaining or waiting for a promise)
198 : : //!
199 : : //! All of this code needs to access shared state, and there is no mutex that
200 : : //! can be acquired to lock this state because Cap'n Proto
201 : : //! assumes it will only be accessed from one thread. So all this code needs to
202 : : //! actually run on one thread, and the EventLoop::loop() method is the entry point for
203 : : //! this thread. ProxyClient and ProxyServer objects that use other threads and
204 : : //! need to perform I/O operations post to this thread using EventLoop::post()
205 : : //! and EventLoop::sync() methods.
206 : : //!
207 : : //! Specifically, because ProxyClient methods can be called from arbitrary
208 : : //! threads, and ProxyServer methods can run on arbitrary threads, ProxyClient
209 : : //! methods use the EventLoop thread to send requests, and ProxyServer methods
210 : : //! use the thread to return results.
211 : : //!
212 : : //! Based on https://groups.google.com/d/msg/capnproto/TuQFF1eH2-M/g81sHaTAAQAJ
213 : : class EventLoop
214 : : {
215 : : public:
216 : : //! Construct event loop object with default logging options.
217 : 1 : EventLoop(const char* exe_name, LogFn log_fn, void* context = nullptr)
218 [ + - ]: 2 : : EventLoop(exe_name, LogOptions{std::move(log_fn)}, context){}
219 : :
220 : : //! Construct event loop object with specified logging options.
221 : : EventLoop(const char* exe_name, LogOptions log_opts, void* context = nullptr);
222 : :
223 : : //! Backwards-compatible constructor for previous (deprecated) logging callback signature
224 : 1 : EventLoop(const char* exe_name, std::function<void(bool, std::string)> old_callback, void* context = nullptr)
225 : 1 : : EventLoop(exe_name,
226 [ - - + - : 110 : LogFn{[old_callback = std::move(old_callback)](LogMessage log_data) {old_callback(log_data.level == Log::Raise, std::move(log_data.message));}},
+ - ]
227 [ + - ]: 2 : context){}
228 : :
229 : : ~EventLoop();
230 : :
231 : : //! Run event loop. Does not return until shutdown. This should only be
232 : : //! called once from the m_thread_id thread. This will block until
233 : : //! the m_num_clients reference count is 0.
234 : : void loop();
235 : :
236 : : //! Run function on event loop thread. Does not return until function completes.
237 : : //! Must be called while the loop() function is active.
238 : : void post(kj::Function<void()> fn);
239 : :
240 : : //! Wrapper around EventLoop::post that takes advantage of the
241 : : //! fact that callable will not go out of scope to avoid requirement that it
242 : : //! be copyable.
243 : : template <typename Callable>
244 : 153 : void sync(Callable&& callable)
245 : : {
246 [ + - ]: 153 : post(std::forward<Callable>(callable));
247 : 153 : }
248 : :
249 : : //! Register cleanup function to run on asynchronous worker thread without
250 : : //! blocking the event loop thread.
251 : : void addAsyncCleanup(std::function<void()> fn);
252 : :
253 : : //! Start asynchronous worker thread if necessary. This is only done if
254 : : //! there are ProxyServerBase::m_impl objects that need to be destroyed
255 : : //! asynchronously, without tying up the event loop thread. This can happen
256 : : //! when an interface does not declare a destroy() method that would allow
257 : : //! the client to wait for the destructor to finish and run it on a
258 : : //! dedicated thread. It can also happen whenever there is a broken
259 : : //! connection and the client is no longer around to call the destructors
260 : : //! and the server objects need to be garbage collected. In both cases, it
261 : : //! is important that ProxyServer::m_impl destructors do not run on the
262 : : //! eventloop thread because they may need it to do I/O if they perform
263 : : //! other IPC calls.
264 : : void startAsyncThread() MP_REQUIRES(m_mutex);
265 : :
266 : : //! Check if loop should exit.
267 : : bool done() const MP_REQUIRES(m_mutex);
268 : :
269 : : //! Process name included in thread names so combined debug output from
270 : : //! multiple processes is easier to understand.
271 : : const char* m_exe_name;
272 : :
273 : : //! ID of the event loop thread
274 : : std::thread::id m_thread_id = std::this_thread::get_id();
275 : :
276 : : //! Handle of an async worker thread. Joined on destruction. Unset if async
277 : : //! method has not been called.
278 : : std::thread m_async_thread;
279 : :
280 : : //! Callback function to run on event loop thread during post() or sync() call.
281 : : kj::Function<void()>* m_post_fn MP_GUARDED_BY(m_mutex) = nullptr;
282 : :
283 : : //! Callback functions to run on async thread.
284 : : std::optional<CleanupList> m_async_fns MP_GUARDED_BY(m_mutex);
285 : :
286 : : //! Pipe read handle used to wake up the event loop thread.
287 : : int m_wait_fd = -1;
288 : :
289 : : //! Pipe write handle used to wake up the event loop thread.
290 : : int m_post_fd = -1;
291 : :
292 : : //! Number of clients holding references to ProxyServerBase objects that
293 : : //! reference this event loop.
294 : : int m_num_clients MP_GUARDED_BY(m_mutex) = 0;
295 : :
296 : : //! Mutex and condition variable used to post tasks to event loop and async
297 : : //! thread.
298 : : Mutex m_mutex;
299 : : std::condition_variable m_cv;
300 : :
301 : : //! Capnp IO context.
302 : : kj::AsyncIoContext m_io_context;
303 : :
304 : : //! Capnp error handler. Needs to outlive m_task_set.
305 : : LoggingErrorHandler m_error_handler{*this};
306 : :
307 : : //! Capnp list of pending promises.
308 : : std::unique_ptr<kj::TaskSet> m_task_set;
309 : :
310 : : //! List of connections.
311 : : std::list<Connection> m_incoming_connections;
312 : :
313 : : //! Logging options
314 : : LogOptions m_log_opts;
315 : :
316 : : //! External context pointer.
317 : : void* m_context;
318 : : };
319 : :
320 : : //! Single element task queue used to handle recursive capnp calls. (If the server
321 : : //! makes a callback into the client in the middle of a request, while the client
322 : : //! thread is blocked waiting for the server response, this is what allows the
323 : : //! client to run the request in the same thread, the same way code would run in
324 : : //! a single process, with the callback sharing the same thread stack as the original
325 : : //! call.)
326 : 7 : struct Waiter
327 : : {
328 : 7 : Waiter() = default;
329 : :
330 : : template <typename Fn>
331 : 18 : bool post(Fn&& fn)
332 : : {
333 : 18 : const Lock lock(m_mutex);
334 [ + - ]: 18 : if (m_fn) return false;
335 [ + - ]: 18 : m_fn = std::forward<Fn>(fn);
336 : 18 : m_cv.notify_all();
337 : 18 : return true;
338 : 18 : }
339 : :
340 : : template <class Predicate>
341 : 36 : void wait(Lock& lock, Predicate pred)
342 : : {
343 : 94 : m_cv.wait(lock.m_lock, [&]() MP_REQUIRES(m_mutex) {
344 : : // Important for this to be "while (m_fn)", not "if (m_fn)" to avoid
345 : : // a lost-wakeup bug. A new m_fn and m_cv notification might be sent
346 : : // after the fn() call and before the lock.lock() call in this loop
347 : : // in the case where a capnp response is sent and a brand new
348 : : // request is immediately received.
349 [ + + # # : 105 : while (m_fn) {
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # ][ # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # ][ - -
- + - + ]
[ - + - +
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # #
# ][ - + -
+ - + - +
- + - + ]
[ # # # #
# # # # #
# # # ]
350 : 18 : auto fn = std::move(*m_fn);
351 : 18 : m_fn.reset();
352 [ + - # # : 18 : Unlock(lock, fn);
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # ][ # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # ][ # #
# # # # ]
[ # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # #
# ][ # # #
# # # # #
# # # # ]
[ # # # #
# # # # #
# # # ]
353 : : }
354 : 87 : const bool done = pred();
355 : 87 : return done;
356 : : });
357 : 36 : }
358 : :
359 : : //! Mutex mainly used internally by waiter class, but also used externally
360 : : //! to guard access to related state. Specifically, since the thread_local
361 : : //! ThreadContext struct owns a Waiter, the Waiter::m_mutex is used to guard
362 : : //! access to other parts of the struct to avoid needing to deal with more
363 : : //! mutexes than necessary. This mutex can be held at the same time as
364 : : //! EventLoop::m_mutex as long as Waiter::mutex is locked first and
365 : : //! EventLoop::m_mutex is locked second.
366 : : Mutex m_mutex;
367 : : std::condition_variable m_cv;
368 : : std::optional<kj::Function<void()>> m_fn MP_GUARDED_BY(m_mutex);
369 : : };
370 : :
371 : : //! Object holding network & rpc state associated with either an incoming server
372 : : //! connection, or an outgoing client connection. It must be created and destroyed
373 : : //! on the event loop thread.
374 : : //! In addition to Cap'n Proto state, it also holds lists of callbacks to run
375 : : //! when the connection is closed.
376 : : class Connection
377 : : {
378 : : public:
379 : 7 : Connection(EventLoop& loop, kj::Own<kj::AsyncIoStream>&& stream_)
380 : 7 : : m_loop(loop), m_stream(kj::mv(stream_)),
381 [ + - + - ]: 7 : m_network(*m_stream, ::capnp::rpc::twoparty::Side::CLIENT, ::capnp::ReaderOptions()),
382 [ + - + - : 21 : m_rpc_system(::capnp::makeRpcClient(m_network)) {}
+ - + - ]
383 : 7 : Connection(EventLoop& loop,
384 : : kj::Own<kj::AsyncIoStream>&& stream_,
385 : : const std::function<::capnp::Capability::Client(Connection&)>& make_client)
386 : 7 : : m_loop(loop), m_stream(kj::mv(stream_)),
387 [ + - + - ]: 7 : m_network(*m_stream, ::capnp::rpc::twoparty::Side::SERVER, ::capnp::ReaderOptions()),
388 [ + - + - : 21 : m_rpc_system(::capnp::makeRpcServer(m_network, make_client(*this))) {}
+ - + - +
- + - ]
389 : :
390 : : //! Run cleanup functions. Must be called from the event loop thread. First
391 : : //! calls synchronous cleanup functions while blocked (to free capnp
392 : : //! Capability::Client handles owned by ProxyClient objects), then schedules
393 : : //! asynchronous cleanup functions to run in a worker thread (to run
394 : : //! destructors of m_impl instances owned by ProxyServer objects).
395 : : ~Connection();
396 : :
397 : : //! Register synchronous cleanup function to run on event loop thread (with
398 : : //! access to capnp thread local variables) when disconnect() is called.
399 : : //! any new i/o.
400 : : CleanupIt addSyncCleanup(std::function<void()> fn);
401 : : void removeSyncCleanup(CleanupIt it);
402 : :
403 : : //! Add disconnect handler.
404 : : template <typename F>
405 : 13 : void onDisconnect(F&& f)
406 : : {
407 : : // Add disconnect handler to local TaskSet to ensure it is cancelled and
408 : : // will never run after connection object is destroyed. But when disconnect
409 : : // handler fires, do not call the function f right away, instead add it
410 : : // to the EventLoop TaskSet to avoid "Promise callback destroyed itself"
411 : : // error in cases where f deletes this Connection object.
412 [ + - - + ]: 39 : m_on_disconnect.add(m_network.onDisconnect().then(
413 [ + - # # ]: 27 : [f = std::forward<F>(f), this]() mutable { m_loop->m_task_set->add(kj::evalLater(kj::mv(f))); }));
[ + - + - ]
414 : 13 : }
415 : :
416 : : EventLoopRef m_loop;
417 : : kj::Own<kj::AsyncIoStream> m_stream;
418 : : LoggingErrorHandler m_error_handler{*m_loop};
419 : : kj::TaskSet m_on_disconnect{m_error_handler};
420 : : ::capnp::TwoPartyVatNetwork m_network;
421 : : std::optional<::capnp::RpcSystem<::capnp::rpc::twoparty::VatId>> m_rpc_system;
422 : :
423 : : // ThreadMap interface client, used to create a remote server thread when an
424 : : // client IPC call is being made for the first time from a new thread.
425 : : ThreadMap::Client m_thread_map{nullptr};
426 : :
427 : : //! Collection of server-side IPC worker threads (ProxyServer<Thread> objects previously returned by
428 : : //! ThreadMap.makeThread) used to service requests to clients.
429 : : ::capnp::CapabilityServerSet<Thread> m_threads;
430 : :
431 : : //! Cleanup functions to run if connection is broken unexpectedly. List
432 : : //! will be empty if all ProxyClient are destroyed cleanly before the
433 : : //! connection is destroyed.
434 : : CleanupList m_sync_cleanup_fns;
435 : : };
436 : :
//! Vat id for the server side of a connection. Required argument to
//! RpcSystem::bootstrap().
//!
//! "Vat" is Cap'n Proto nomenclature for a host of various objects that
//! facilitates bidirectional communication with other vats; it is often but not
//! always 1-1 with processes. Cap'n Proto doesn't reference clients or servers
//! per se; instead everything is just a vat.
//!
//! See also: https://github.com/capnproto/capnproto/blob/9021f0c722b36cb11e3690b0860939255ebad39c/c%2B%2B/src/capnp/rpc.capnp#L42-L56
struct ServerVatId
{
    // Declaration order matters here: `message` is constructed from `scratch`,
    // and `vat_id` is constructed from the root of `message`.

    //! Zero-initialized buffer passed to the message builder below.
    ::capnp::word scratch[4]{};
    //! Message holding the VatId struct.
    ::capnp::MallocMessageBuilder message{scratch};
    //! Builder for the root VatId of `message`.
    ::capnp::rpc::twoparty::VatId::Builder vat_id{message.getRoot<::capnp::rpc::twoparty::VatId>()};
    //! Mark the vat id as referring to the SERVER side.
    ServerVatId() { vat_id.setSide(::capnp::rpc::twoparty::Side::SERVER); }
};
452 : :
453 : : template <typename Interface, typename Impl>
454 : 43 : ProxyClientBase<Interface, Impl>::ProxyClientBase(typename Interface::Client client,
455 : : Connection* connection,
456 : : bool destroy_connection)
457 [ + - ]: 43 : : m_client(std::move(client)), m_context(connection)
458 : :
459 : : {
460 : : // Handler for the connection getting destroyed before this client object.
461 [ + - ]: 43 : auto disconnect_cb = m_context.connection->addSyncCleanup([this]() {
462 : : // Release client capability by move-assigning to temporary.
463 : : {
464 : 0 : typename Interface::Client(std::move(m_client));
465 : : }
466 : 0 : Lock lock{m_context.loop->m_mutex};
467 : 0 : m_context.connection = nullptr;
468 : 0 : });
469 : :
470 : : // Two shutdown sequences are supported:
471 : : //
472 : : // - A normal sequence where client proxy objects are deleted by external
473 : : // code that no longer needs them
474 : : //
475 : : // - A garbage collection sequence where the connection or event loop shuts
476 : : // down while external code is still holding client references.
477 : : //
478 : : // The first case is handled here when m_context.connection is not null. The
479 : : // second case is handled by the disconnect_cb function, which sets
480 : : // m_context.connection to null so nothing happens here.
481 [ + - ]: 55 : m_context.cleanup_fns.emplace_front([this, destroy_connection, disconnect_cb]{
482 : : {
483 : : // If the capnp interface defines a destroy method, call it to destroy
484 : : // the remote object, waiting for it to be deleted server side. If the
485 : : // capnp interface does not define a destroy method, this will just call
486 : : // an empty stub defined in the ProxyClientBase class and do nothing.
487 : 6 : Sub::destroy(*this);
488 : :
489 : : // FIXME: Could just invoke removed addCleanup fn here instead of duplicating code
490 : 86 : m_context.loop->sync([&]() {
491 : : // Remove disconnect callback on cleanup so it doesn't run and try
492 : : // to access this object after it's destroyed. This call needs to
493 : : // run inside loop->sync() on the event loop thread because
494 : : // otherwise, if there were an ill-timed disconnect, the
495 : : // onDisconnect handler could fire and delete the Connection object
496 : : // before the removeSyncCleanup call.
497 [ + - # # : 43 : if (m_context.connection) m_context.connection->removeSyncCleanup(disconnect_cb);
# # ]
[ - - + - ]
[ # # # # ]
498 : :
499 : : // Release client capability by move-assigning to temporary.
500 : : {
501 : 43 : typename Interface::Client(std::move(m_client));
502 : : }
503 [ + + # # : 43 : if (destroy_connection) {
# # ]
[ - - - + ]
[ # # # # ]
504 [ + - + - : 7 : delete m_context.connection;
# # # # #
# # # ][ #
# # # # #
# # ][ # #
# # # # #
# ]
505 : 7 : m_context.connection = nullptr;
506 : : }
507 : : });
508 : : }
509 : : });
510 [ + - ]: 43 : Sub::construct(*this);
511 : 43 : }
512 : :
//! Destructor. Hands the functions registered in m_context.cleanup_fns (the
//! constructor registers one at the front of that list) to CleanupRun.
template <typename Interface, typename Impl>
ProxyClientBase<Interface, Impl>::~ProxyClientBase() noexcept
{
    CleanupRun(m_context.cleanup_fns);
}
518 : :
519 : : template <typename Interface, typename Impl>
520 : 13 : ProxyServerBase<Interface, Impl>::ProxyServerBase(std::shared_ptr<Impl> impl, Connection& connection)
521 [ + - ]: 13 : : m_impl(std::move(impl)), m_context(&connection)
522 : : {
523 [ - + ]: 13 : assert(m_impl);
524 [ - - ]: 13 : }
525 : :
526 : : //! ProxyServer destructor, called from the EventLoop thread by Cap'n Proto
527 : : //! garbage collection code after there are no more references to this object.
528 : : //! This will typically happen when the corresponding ProxyClient object on the
529 : : //! other side of the connection is destroyed. It can also happen earlier if the
530 : : //! connection is broken or destroyed. In the latter case this destructor will
531 : : //! typically be called inside m_rpc_system.reset() call in the ~Connection
532 : : //! destructor while the Connection object still exists. However, because
533 : : //! ProxyServer objects are refcounted, and the Connection object could be
534 : : //! destroyed while asynchronous IPC calls are still in-flight, it's possible
535 : : //! for this destructor to be called after the Connection object no longer
536 : : //! exists, so it is NOT valid to dereference the m_context.connection pointer
537 : : //! from this function.
538 : : template <typename Interface, typename Impl>
539 : 13 : ProxyServerBase<Interface, Impl>::~ProxyServerBase()
540 : : {
541 : 13 : if (m_impl) {
542 : : // If impl is non-null at this point, it means no client is waiting for
543 : : // the m_impl server object to be destroyed synchronously. This can
544 : : // happen either if the interface did not define a "destroy" method (see
545 : : // invokeDestroy method below), or if a destroy method was defined, but
546 : : // the connection was broken before it could be called.
547 : : //
548 : : // In either case, be conservative and run the cleanup on an
549 : : // asynchronous thread, to avoid destructors or cleanup functions
550 : : // blocking or deadlocking the current EventLoop thread, since they
551 : : // could be making IPC calls.
552 : : //
553 : : // Technically this is a little too conservative since if the interface
554 : : // defines a "destroy" method, but the destroy method does not accept a
555 : : // Context parameter specifying a worker thread, the cleanup method
556 : : // would run on the EventLoop thread normally (when connection is
557 : : // unbroken), but will not run on the EventLoop thread now (when
558 : : // connection is broken). Probably some refactoring of the destructor
559 : : // and invokeDestroy function is possible to make this cleaner and more
560 : : // consistent.
561 : 7 : m_context.loop->addAsyncCleanup([impl=std::move(m_impl), fns=std::move(m_context.cleanup_fns)]() mutable {
562 : 7 : impl.reset();
563 : 7 : CleanupRun(fns);
564 : : });
565 : : }
566 [ - + ]: 13 : assert(m_context.cleanup_fns.empty());
567 [ + + - + ]: 26 : }
568 : :
569 : : //! If the capnp interface defined a special "destroy" method, as described the
570 : : //! ProxyClientBase class, this method will be called and synchronously destroy
571 : : //! m_impl before returning to the client.
572 : : //!
573 : : //! If the capnp interface does not define a "destroy" method, this will never
574 : : //! be called and the ~ProxyServerBase destructor will be responsible for
575 : : //! deleting m_impl asynchronously, whenever the ProxyServer object gets garbage
576 : : //! collected by Cap'n Proto.
577 : : //!
578 : : //! This method is called in the same way other proxy server methods are called,
579 : : //! via the serverInvoke function. Basically serverInvoke just calls this as a
580 : : //! substitute for a non-existent m_impl->destroy() method. If the destroy
581 : : //! method has any parameters or return values they will be handled in the
582 : : //! normal way by PassField/ReadField/BuildField functions. Particularly if a
583 : : //! Context.thread parameter was passed, this method will run on the worker
584 : : //! thread specified by the client. Otherwise it will run on the EventLoop
585 : : //! thread, like other server methods without an assigned thread.
586 : : template <typename Interface, typename Impl>
587 : 6 : void ProxyServerBase<Interface, Impl>::invokeDestroy()
588 : : {
589 : 6 : m_impl.reset();
590 : 6 : CleanupRun(m_context.cleanup_fns);
591 : 6 : }
592 : :
//! Map from Connection to the local or remote thread handle which will be used
//! over that connection. This map will typically only contain one entry, but
//! can contain multiple entries if a single thread makes IPC calls over
//! multiple connections. A std::optional value type is used to avoid the map
//! needing to be locked while ProxyClient<Thread> objects are constructed, see
//! the ThreadContext "Synchronization note" below.
using ConnThreads = std::map<Connection*, std::optional<ProxyClient<Thread>>>;
//! Iterator pointing at one entry of a ConnThreads map.
using ConnThread = ConnThreads::iterator;

//! Retrieve the ProxyClient<Thread> object associated with this connection
//! from a map, or create a new one and insert it into the map. Returns the map
//! iterator and a bool indicating whether an entry was inserted.
std::tuple<ConnThread, bool> SetThread(GuardedRef<ConnThreads> threads, Connection* connection, const std::function<Thread::Client()>& make_thread);
606 : :
//! The thread_local ThreadContext g_thread_context struct provides information
//! about individual threads and a way of communicating between them. Because
//! it's a thread local struct, each ThreadContext instance is initialized by
//! the thread that owns it.
//!
//! ThreadContext is used for any client threads created externally which make
//! IPC calls, and for server threads created by
//! ProxyServer<ThreadMap>::makeThread() which execute IPC calls for clients.
//!
//! In both cases, the struct holds information like the thread name, and a
//! Waiter object where the EventLoop can post incoming IPC requests to execute
//! on the thread. The struct also holds ConnThreads maps associating the thread
//! with local and remote ProxyClient<Thread> objects.
struct ThreadContext
{
    //! Identifying string for debugging.
    std::string thread_name;

    //! Waiter object used to allow remote clients to execute code on this
    //! thread. For server threads created by
    //! ProxyServer<ThreadMap>::makeThread(), this is initialized in that
    //! function. Otherwise, for client threads created externally, this is
    //! initialized the first time the thread tries to make an IPC call. Having
    //! a waiter is necessary for threads making IPC calls in case a server they
    //! are calling expects them to execute a callback during the call, before
    //! it sends a response.
    //!
    //! For IPC client threads, the Waiter pointer is never cleared and the
    //! Waiter just gets destroyed when the thread does. For server threads
    //! created by makeThread(), this pointer is set to null in the
    //! ~ProxyServer<Thread> destructor as a signal for the thread to exit and
    //! destroy itself. In both cases, the same Waiter object is used across
    //! different calls and is only created and destroyed once for the lifetime
    //! of the thread.
    std::unique_ptr<Waiter> waiter = nullptr;

    //! When a client is making a request to a server, this is the
    //! `callbackThread` argument it passes in the request, used by the server
    //! in case it needs to make callbacks into the client that need to execute
    //! while the client is waiting. This will be set to a local thread object.
    //!
    //! Synchronization note: The callback_threads and request_threads maps are
    //! only ever accessed internally by this thread's destructor and externally
    //! by Cap'n Proto event loop threads. Since it's possible for IPC client
    //! threads to make calls over different connections that could have
    //! different event loops, these maps are guarded by Waiter::m_mutex in case
    //! different event loop threads add or remove map entries simultaneously.
    //! However, individual ProxyClient<Thread> objects in the maps will only be
    //! associated with one event loop and guarded by EventLoop::m_mutex. So
    //! Waiter::m_mutex does not need to be held while accessing individual
    //! ProxyClient<Thread> instances, and may even need to be released to
    //! respect lock order and avoid locking Waiter::m_mutex before
    //! EventLoop::m_mutex.
    ConnThreads callback_threads MP_GUARDED_BY(waiter->m_mutex);

    //! When a client is making a request to a server, this is the `thread`
    //! argument it passes in the request, used to control which thread on the
    //! server will be responsible for executing it. If the client call is being
    //! made from a local thread, this will be a remote thread object returned
    //! by makeThread. If the client call is being made from a thread currently
    //! handling a server request, this will be set to the `callbackThread`
    //! request thread argument passed in that request.
    //!
    //! Synchronization note: \ref callback_threads note applies here as well.
    ConnThreads request_threads MP_GUARDED_BY(waiter->m_mutex);

    //! Whether this thread is a capnp event loop thread. Not really used except
    //! to assert false if there's an attempt to execute a blocking operation
    //! which could deadlock the thread.
    bool loop_thread = false;
};
677 : :
678 : : //! Given stream file descriptor, make a new ProxyClient object to send requests
679 : : //! over the stream. Also create a new Connection object embedded in the
680 : : //! client that is freed when the client is closed.
681 : : template <typename InitInterface>
682 : 6 : std::unique_ptr<ProxyClient<InitInterface>> ConnectStream(EventLoop& loop, int fd)
683 : : {
684 : 6 : typename InitInterface::Client init_client(nullptr);
685 : 6 : std::unique_ptr<Connection> connection;
686 [ + - ]: 6 : loop.sync([&] {
687 : 6 : auto stream =
688 : 6 : loop.m_io_context.lowLevelProvider->wrapSocketFd(fd, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP);
689 [ + - ]: 6 : connection = std::make_unique<Connection>(loop, kj::mv(stream));
690 [ + - + - : 12 : init_client = connection->m_rpc_system->bootstrap(ServerVatId().vat_id).castAs<InitInterface>();
+ - + - +
- + - +
- ]
691 [ + - ]: 6 : Connection* connection_ptr = connection.get();
692 [ + - ]: 6 : connection->onDisconnect([&loop, connection_ptr] {
693 [ # # # # : 0 : MP_LOG(loop, Log::Warning) << "IPC client: unexpected network disconnect.";
# # # # #
# # # ]
694 [ # # # # ]: 0 : delete connection_ptr;
695 : : });
696 : 6 : });
697 : : return std::make_unique<ProxyClient<InitInterface>>(
698 [ + - ]: 6 : kj::mv(init_client), connection.release(), /* destroy_connection= */ true);
699 [ + - ]: 6 : }
700 : :
701 : : //! Given stream and init objects, construct a new ProxyServer object that
702 : : //! handles requests from the stream by calling the init object. Embed the
703 : : //! ProxyServer in a Connection object that is stored and erased if
704 : : //! disconnected. This should be called from the event loop thread.
705 : : template <typename InitInterface, typename InitImpl>
706 : 6 : void _Serve(EventLoop& loop, kj::Own<kj::AsyncIoStream>&& stream, InitImpl& init)
707 : : {
708 : 6 : loop.m_incoming_connections.emplace_front(loop, kj::mv(stream), [&](Connection& connection) {
709 : : // Disable deleter so proxy server object doesn't attempt to delete the
710 : : // init implementation when the proxy client is destroyed or
711 : : // disconnected.
712 [ + - - + ]: 6 : return kj::heap<ProxyServer<InitInterface>>(std::shared_ptr<InitImpl>(&init, [](InitImpl*){}), connection);
713 : : });
714 : 6 : auto it = loop.m_incoming_connections.begin();
715 : 12 : it->onDisconnect([&loop, it] {
716 [ + - + - : 18 : MP_LOG(loop, Log::Info) << "IPC server: socket disconnected.";
+ - + - +
- + - ]
717 : 6 : loop.m_incoming_connections.erase(it);
718 : : });
719 : 6 : }
720 : :
721 : : //! Given connection receiver and an init object, handle incoming connections by
722 : : //! calling _Serve, to create ProxyServer objects and forward requests to the
723 : : //! init object.
724 : : template <typename InitInterface, typename InitImpl>
725 : 7 : void _Listen(EventLoop& loop, kj::Own<kj::ConnectionReceiver>&& listener, InitImpl& init)
726 : : {
727 : 7 : auto* ptr = listener.get();
728 [ + - ]: 14 : loop.m_task_set->add(ptr->accept().then(
729 [ + - + - : 21 : [&loop, &init, listener = kj::mv(listener)](kj::Own<kj::AsyncIoStream>&& stream) mutable {
+ - - + ]
730 : 5 : _Serve<InitInterface>(loop, kj::mv(stream), init);
731 : 5 : _Listen<InitInterface>(loop, kj::mv(listener), init);
732 : : }));
733 : 7 : }
734 : :
735 : : //! Given stream file descriptor and an init object, handle requests on the
736 : : //! stream by calling methods on the Init object.
737 : : template <typename InitInterface, typename InitImpl>
738 : 1 : void ServeStream(EventLoop& loop, int fd, InitImpl& init)
739 : : {
740 [ + - ]: 1 : _Serve<InitInterface>(
741 : 1 : loop, loop.m_io_context.lowLevelProvider->wrapSocketFd(fd, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP), init);
742 : 1 : }
743 : :
744 : : //! Given listening socket file descriptor and an init object, handle incoming
745 : : //! connections and requests by calling methods on the Init object.
746 : : template <typename InitInterface, typename InitImpl>
747 : 2 : void ListenConnections(EventLoop& loop, int fd, InitImpl& init)
748 : : {
749 : 2 : loop.sync([&]() {
750 [ + - ]: 2 : _Listen<InitInterface>(loop,
751 : 4 : loop.m_io_context.lowLevelProvider->wrapListenSocketFd(fd, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP),
752 : : init);
753 : : });
754 : 2 : }
755 : :
//! Declaration of the thread-local ThreadContext instance; each thread sees
//! its own object.
extern thread_local ThreadContext g_thread_context; // NOLINT(bitcoin-nontrivial-threadlocal)
// The NOLINT above silences the nonstandard bitcoin tidy error "Variable with
// non-trivial destructor cannot be thread_local", which should not be a problem
// on modern platforms, and could lead to a small memory leak at worst on older
// ones.
760 : :
761 : : } // namespace mp
762 : :
763 : : #endif // MP_PROXY_IO_H
|