Branch data Line data Source code
1 : : // Copyright (c) The Bitcoin Core developers
2 : : // Distributed under the MIT software license, see the accompanying
3 : : // file COPYING or http://www.opensource.org/licenses/mit-license.php.
4 : :
5 : : #ifndef MP_PROXY_IO_H
6 : : #define MP_PROXY_IO_H
7 : :
8 : : #include <mp/proxy.h>
9 : : #include <mp/util.h>
10 : :
11 : : #include <mp/proxy.capnp.h>
12 : :
13 : : #include <capnp/rpc-twoparty.h>
14 : :
15 : : #include <assert.h>
16 : : #include <condition_variable>
17 : : #include <functional>
18 : : #include <kj/function.h>
19 : : #include <map>
20 : : #include <memory>
21 : : #include <optional>
22 : : #include <sstream>
23 : : #include <string>
24 : : #include <thread>
25 : :
26 : : namespace mp {
27 : : struct ThreadContext;
28 : :
29 : : struct InvokeContext
30 : : {
31 : : Connection& connection;
32 : : };
33 : :
34 : : struct ClientInvokeContext : InvokeContext
35 : : {
36 : : ThreadContext& thread_context;
37 : 30 : ClientInvokeContext(Connection& conn, ThreadContext& thread_context)
38 [ + - + - : 30 : : InvokeContext{conn}, thread_context{thread_context}
+ - + - +
- ][ + - +
- # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # ][ - -
+ - + - #
# # # # #
# # # # #
# # # # #
# # # # #
# # # ][ #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # ]
[ # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # ]
39 : : {
40 : : }
41 : : };
42 : :
43 : : template <typename ProxyServer, typename CallContext_>
44 : : struct ServerInvokeContext : InvokeContext
45 : : {
46 : : using CallContext = CallContext_;
47 : :
48 : : ProxyServer& proxy_server;
49 : : CallContext& call_context;
50 : : int req;
51 : : //! For IPC methods that execute asynchronously, not on the event-loop
52 : : //! thread: lock preventing the event-loop thread from freeing the params or
53 : : //! results structs if the request is canceled while the worker thread is
54 : : //! reading params (`call_context.getParams()`) or writing results
55 : : //! (`call_context.getResults()`).
56 : : Lock* cancel_lock{nullptr};
57 : : //! For IPC methods that execute asynchronously, not on the event-loop
58 : : //! thread, this is set to true if the IPC call was canceled by the client
59 : : //! or canceled by a disconnection. If the call runs on the event-loop
60 : : //! thread, it can't be canceled. This should be accessed with cancel_lock
61 : : //! held if it is not null, since in the asynchronous case it is accessed
62 : : //! from multiple threads.
63 : : bool request_canceled{false};
64 : :
65 : 48 : ServerInvokeContext(ProxyServer& proxy_server, CallContext& call_context, int req)
66 [ + - + - : 30 : : InvokeContext{*proxy_server.m_context.connection}, proxy_server{proxy_server}, call_context{call_context}, req{req}
+ - + - +
- + - # #
# # # # #
# # # # #
# # # # #
# # # #
# ][ # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# ][ - - -
- + - +
- ][ + - +
- # # #
# ][ # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # #
# ]
67 : : {
68 : : }
69 : : };
70 : :
71 : : template <typename Interface, typename Params, typename Results>
72 : : using ServerContext = ServerInvokeContext<ProxyServer<Interface>, ::capnp::CallContext<Params, Results>>;
73 : :
74 : : template <>
75 : : struct ProxyClient<Thread> : public ProxyClientBase<Thread, ::capnp::Void>
76 : : {
77 [ + - ]: 30 : using ProxyClientBase::ProxyClientBase;
78 : : // https://stackoverflow.com/questions/22357887/comparing-two-mapiterators-why-does-it-need-the-copy-constructor-of-stdpair
79 : : ProxyClient(const ProxyClient&) = delete;
80 : : ~ProxyClient();
81 : :
82 : : //! Reference to callback function that is run if there is a sudden
83 : : //! disconnect and the Connection object is destroyed before this
84 : : //! ProxyClient<Thread> object. The callback will destroy this object and
85 : : //! remove its entry from the thread's request_threads or callback_threads
86 : : //! map. It will also reset m_disconnect_cb so the destructor does not
87 : : //! access it. In the normal case where there is no sudden disconnect, the
88 : : //! destructor will unregister m_disconnect_cb so the callback is never run.
89 : : //! Since this variable is accessed from multiple threads, accesses should
90 : : //! be guarded with the associated Waiter::m_mutex.
91 : : std::optional<CleanupIt> m_disconnect_cb;
92 : : };
93 : :
94 : : template <>
95 : : struct ProxyServer<Thread> final : public Thread::Server
96 : : {
97 : : public:
98 : : ProxyServer(Connection& connection, ThreadContext& thread_context, std::thread&& thread);
99 : : ~ProxyServer();
100 : : kj::Promise<void> getName(GetNameContext context) override;
101 : :
102 : : //! Run a callback function fn returning T on this thread. The function will
103 : : //! be queued and executed as soon as the thread is idle, and when fn
104 : : //! returns, the promise returned by this method will be fulfilled with the
105 : : //! value fn returned.
106 : : template<typename T, typename Fn>
107 : : kj::Promise<T> post(Fn&& fn);
108 : :
109 : : EventLoopRef m_loop;
110 : : ThreadContext& m_thread_context;
111 : : std::thread m_thread;
112 : : //! Promise signaled when m_thread_context.waiter is ready and there is no
113 : : //! post() callback function waiting to execute.
114 : : kj::Promise<void> m_thread_ready{kj::READY_NOW};
115 : : };
116 : :
117 : : //! Handler for kj::TaskSet failed task events.
118 : : class LoggingErrorHandler : public kj::TaskSet::ErrorHandler
119 : : {
120 : : public:
121 [ + - ]: 17 : LoggingErrorHandler(EventLoop& loop) : m_loop(loop) {}
122 : : void taskFailed(kj::Exception&& exception) override;
123 : : EventLoop& m_loop;
124 : : };
125 : :
126 : : //! Log flags. Update stringify function if changed!
127 : : enum class Log {
128 : : Trace = 0,
129 : : Debug,
130 : : Info,
131 : : Warning,
132 : : Error,
133 : : Raise,
134 : : };
135 : :
136 : : kj::StringPtr KJ_STRINGIFY(Log flags);
137 : :
138 [ + - ]: 630 : struct LogMessage {
139 : :
140 : : //! Message to be logged
141 : : std::string message;
142 : :
143 : : //! The severity level of this message
144 : : Log level;
145 : : };
146 : :
147 : : using LogFn = std::function<void(LogMessage)>;
148 : :
149 [ + - ]: 11 : struct LogOptions {
150 : :
151 : : //! External logging callback.
152 : : LogFn log_fn;
153 : :
154 : : //! Maximum number of characters to use when representing
155 : : //! request and response structs as strings.
156 : : size_t max_chars{200};
157 : :
158 : : //! Messages with a severity level less than log_level will not be
159 : : //! reported.
160 : : Log log_level{Log::Trace};
161 : : };
162 : :
163 : : class Logger
164 : : {
165 : : public:
166 [ - - - - : 315 : Logger(const LogOptions& options, Log log_level) : m_options(options), m_log_level(log_level) {}
- - - - -
- - - - -
- - - - -
- - - - -
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # ]
[ - - - -
+ - + - -
- - - + -
+ - - - -
- + - + -
- - - - +
- + - - -
- - + - +
- - - - -
+ - + - ]
[ # # # # ]
[ # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # ]
[ - - - -
+ - + - -
- - - + -
+ - # # #
# # # # #
# # # # #
# # # ][ -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # #
# ][ - - -
- - - - -
- - - - -
- - - - -
- - + - +
- - - - -
+ - + - #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # ][ -
- - - - -
- - - - -
- - - - -
- - - - ]
[ # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # ][ + -
+ - # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # #
# ][ # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # ]
167 : :
168 : : Logger(Logger&&) = delete;
169 : : Logger& operator=(Logger&&) = delete;
170 : : Logger(const Logger&) = delete;
171 : : Logger& operator=(const Logger&) = delete;
172 : :
173 : 315 : ~Logger() noexcept(false)
174 : : {
175 [ + - ]: 630 : if (enabled()) m_options.log_fn({std::move(m_buffer).str(), m_log_level});
176 [ + - ]: 630 : }
177 : :
178 : : template <typename T>
179 : 1495 : friend Logger& operator<<(Logger& logger, T&& value)
180 : : {
181 [ - + ]: 1495 : if (logger.enabled()) logger.m_buffer << std::forward<T>(value);
182 : 1495 : return logger;
183 : : }
184 : :
185 : : template <typename T>
186 : : friend Logger& operator<<(Logger&& logger, T&& value)
187 : : {
188 : : return logger << std::forward<T>(value);
189 : : }
190 : :
191 : 315 : explicit operator bool() const
192 : : {
193 [ + - + - : 636 : return enabled();
- - - - -
- - - + -
+ - + - +
- - - - -
- - - - +
- + - + -
+ - - - -
- - - - -
+ - + - +
- + - - -
- - - - -
- + - + -
+ - + - +
- + - - -
- - + - +
- + - + -
+ - + - +
- + - + -
+ - # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # ][ -
- - - - -
- - - - -
- + - + -
+ - + - +
- + - - -
- - - - -
- - - - -
+ - + - +
- + - + -
+ - - - -
- - - - -
- - - - +
- + - + -
+ - + - +
- - - - -
- - - - -
- - - + -
+ - + - +
- + - + -
- - - - -
- - - - -
- - + - +
- + - + -
+ - + - +
- - - - -
- - - - -
- + - + -
+ - + - +
- + - ][ #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # ][ #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # #
# ][ # # #
# # # # #
# # ][ + -
+ - # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # ]
[ # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # ][ #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # ][ -
- - - - -
- - - - -
- + - + -
+ - + - +
- + - - -
- - - - -
- - - - -
+ - + - +
- + - + -
+ - # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # ][ - -
- - - - -
- - - - -
- - - - -
- - - + -
- - - - -
- - - + -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
+ - + - -
- - - - -
- - + - +
- + - + -
- - - - -
- - - + -
+ - ][ + -
- - - - -
- - - + -
- - - - -
- - - + -
- - - - -
- - - + -
- - - - -
- - - + -
+ - - - -
- - - - -
+ - + - +
- + - - -
- - - - -
- + - + -
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# ][ # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # #
# ][ + - +
- + - + -
- - # # #
# # # ]
194 : : }
195 : :
196 : : private:
197 : 2125 : bool enabled() const
198 : : {
199 [ + - + - : 2125 : return m_options.log_fn && m_log_level >= m_options.log_level;
+ - + - +
- + - + -
+ - + - +
- - - - -
+ - + - +
- + - - -
- - - - -
- - - - -
+ - + - +
- + - - -
- - - - -
- + - + -
+ - + - +
- + - + -
+ - - - -
- - - - -
+ - + - +
- + - + -
+ - + - +
- - - - -
- - - - +
- + - + -
+ - + - +
- + - + -
- - - - -
- - - + -
+ - + - +
- + - + -
+ - + - -
- - - - -
- - + - +
- + - + -
+ - + - +
- + - - -
- - - - -
- + - + -
+ - + - -
- - - # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # ][ - -
- - - - -
- + - + -
+ - + - +
- + - + -
+ - + - +
- + - + -
+ - + - -
- - - - -
- - - - -
- - - - -
+ - + - +
- + - + -
+ - + - +
- - - - -
- - - - -
- - - - -
- - + - +
- + - + -
+ - + - +
- + - - -
- - - - -
- - - - -
- - - - +
- + - + -
+ - + - +
- + - + -
- - - - -
- - - - -
- - - - -
- + - + -
+ - + - +
- + - + -
+ - - - -
- - - - -
- - - - -
- - - + -
+ - + - +
- + - + -
+ - + - +
- + - - -
- - - - -
- - - - -
+ - + - +
- + - + -
+ - + - +
- + - +
- ][ # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# ][ # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # ][ #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# ][ # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # #
# ][ - - -
- + - + -
+ - + - -
- - - - -
- - + - +
- + - + -
- - - - #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # ]
[ # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # ][ - -
- - + - +
- - - - -
- - - - -
- - - - -
- - + - +
- + - + -
- - - - -
- - - # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # ][ -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- + - + -
+ - + - +
- + - + -
+ - - - -
- - - - -
- - - - -
- - - + -
+ - + - +
- + - + -
+ - + - -
- - - # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # ][ - -
- - - - -
- - - - -
- - - - +
- + - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- + - + -
- - - - -
- - - - -
- - + - +
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- + - + -
+ - + - -
- - - - -
- - + - +
- + - + -
+ - + - +
- + - - -
- - - - -
- + - + -
+ - + - -
- - - # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # ][ - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - +
- + - + -
+ - + - +
- + - + -
- - - - -
- - - - -
- - - - -
- + - + -
+ - + - +
- + - + -
+ - - - -
- # # #
# ][ - - -
- + - - -
- - - - -
- - - - -
- - - - -
- - - - -
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# ][ - - -
- - - - -
- - - - -
- - - - -
- - + - +
- - - - -
- - - - -
- - - + -
+ - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- + - + -
- - - - -
- - - - -
- - + - +
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - + - +
- + - + -
- - - - -
- - - + -
+ - + - +
- + - + -
+ - + - -
- - - - -
- - + - +
- + - + -
- - - - ]
200 : : }
201 : :
202 : : const LogOptions& m_options;
203 : : Log m_log_level;
204 : : std::ostringstream m_buffer;
205 : : };
206 : :
207 : : #define MP_LOGPLAIN(loop, ...) if (mp::Logger logger{(loop).m_log_opts, __VA_ARGS__}; logger) logger
208 : :
209 : : #define MP_LOG(loop, ...) MP_LOGPLAIN(loop, __VA_ARGS__) << "{" << LongThreadName((loop).m_exe_name) << "} "
210 : :
211 : : std::string LongThreadName(const char* exe_name);
212 : :
213 : : //! Event loop implementation.
214 : : //!
215 : : //! Cap'n Proto threading model is very simple: all I/O operations are
216 : : //! asynchronous and must be performed on a single thread. This includes:
217 : : //!
218 : : //! - Code starting an asynchronous operation (calling a function that returns a
219 : : //! promise object)
220 : : //! - Code notifying that an asynchronous operation is complete (code using a
221 : : //! fulfiller object)
222 : : //! - Code handling a completed operation (code chaining or waiting for a promise)
223 : : //!
224 : : //! All of this code needs to access shared state, and there is no mutex that
225 : : //! can be acquired to lock this state because Cap'n Proto
226 : : //! assumes it will only be accessed from one thread. So all this code needs to
227 : : //! actually run on one thread, and the EventLoop::loop() method is the entry point for
228 : : //! this thread. ProxyClient and ProxyServer objects that use other threads and
229 : : //! need to perform I/O operations post to this thread using EventLoop::post()
230 : : //! and EventLoop::sync() methods.
231 : : //!
232 : : //! Specifically, because ProxyClient methods can be called from arbitrary
233 : : //! threads, and ProxyServer methods can run on arbitrary threads, ProxyClient
234 : : //! methods use the EventLoop thread to send requests, and ProxyServer methods
235 : : //! use the thread to return results.
236 : : //!
237 : : //! Based on https://groups.google.com/d/msg/capnproto/TuQFF1eH2-M/g81sHaTAAQAJ
238 : : class EventLoop
239 : : {
240 : : public:
241 : : //! Construct event loop object with default logging options.
242 : 1 : EventLoop(const char* exe_name, LogFn log_fn, void* context = nullptr)
243 [ + - ]: 2 : : EventLoop(exe_name, LogOptions{std::move(log_fn)}, context){}
244 : :
245 : : //! Construct event loop object with specified logging options.
246 : : EventLoop(const char* exe_name, LogOptions log_opts, void* context = nullptr);
247 : :
248 : : //! Backwards-compatible constructor for previous (deprecated) logging callback signature
249 : 1 : EventLoop(const char* exe_name, std::function<void(bool, std::string)> old_callback, void* context = nullptr)
250 : 1 : : EventLoop(exe_name,
251 [ - - + - : 110 : LogFn{[old_callback = std::move(old_callback)](LogMessage log_data) {old_callback(log_data.level == Log::Raise, std::move(log_data.message));}},
+ - ]
252 [ + - ]: 2 : context){}
253 : :
254 : : ~EventLoop();
255 : :
256 : : //! Run event loop. Does not return until shutdown. This should only be
257 : : //! called once from the m_thread_id thread. This will block until
258 : : //! the m_num_clients reference count is 0.
259 : : void loop();
260 : :
261 : : //! Run function on event loop thread. Does not return until function completes.
262 : : //! Must be called while the loop() function is active.
263 : : void post(kj::Function<void()> fn);
264 : :
265 : : //! Wrapper around EventLoop::post that takes advantage of the
266 : : //! fact that callable will not go out of scope to avoid requirement that it
267 : : //! be copyable.
268 : : template <typename Callable>
269 : 171 : void sync(Callable&& callable)
270 : : {
271 [ + - ]: 171 : post(std::forward<Callable>(callable));
272 : 171 : }
273 : :
274 : : //! Register cleanup function to run on asynchronous worker thread without
275 : : //! blocking the event loop thread.
276 : : void addAsyncCleanup(std::function<void()> fn);
277 : :
278 : : //! Start asynchronous worker thread if necessary. This is only done if
279 : : //! there are ProxyServerBase::m_impl objects that need to be destroyed
280 : : //! asynchronously, without tying up the event loop thread. This can happen
281 : : //! when an interface does not declare a destroy() method that would allow
282 : : //! the client to wait for the destructor to finish and run it on a
283 : : //! dedicated thread. It can also happen whenever this is a broken
284 : : //! connection and the client is no longer around to call the destructors
285 : : //! and the server objects need to be garbage collected. In both cases, it
286 : : //! is important that ProxyServer::m_impl destructors do not run on the
287 : : //! eventloop thread because they may need it to do I/O if they perform
288 : : //! other IPC calls.
289 : : void startAsyncThread() MP_REQUIRES(m_mutex);
290 : :
291 : : //! Check if loop should exit.
292 : : bool done() const MP_REQUIRES(m_mutex);
293 : :
294 : : //! Process name included in thread names so combined debug output from
295 : : //! multiple processes is easier to understand.
296 : : const char* m_exe_name;
297 : :
298 : : //! ID of the event loop thread
299 : : std::thread::id m_thread_id = std::this_thread::get_id();
300 : :
301 : : //! Handle of an async worker thread. Joined on destruction. Unset if async
302 : : //! method has not been called.
303 : : std::thread m_async_thread;
304 : :
305 : : //! Callback function to run on event loop thread during post() or sync() call.
306 : : kj::Function<void()>* m_post_fn MP_GUARDED_BY(m_mutex) = nullptr;
307 : :
308 : : //! Callback functions to run on async thread.
309 : : std::optional<CleanupList> m_async_fns MP_GUARDED_BY(m_mutex);
310 : :
311 : : //! Pipe read handle used to wake up the event loop thread.
312 : : int m_wait_fd = -1;
313 : :
314 : : //! Pipe write handle used to wake up the event loop thread.
315 : : int m_post_fd = -1;
316 : :
317 : : //! Number of clients holding references to ProxyServerBase objects that
318 : : //! reference this event loop.
319 : : int m_num_clients MP_GUARDED_BY(m_mutex) = 0;
320 : :
321 : : //! Mutex and condition variable used to post tasks to event loop and async
322 : : //! thread.
323 : : Mutex m_mutex;
324 : : std::condition_variable m_cv;
325 : :
326 : : //! Capnp IO context.
327 : : kj::AsyncIoContext m_io_context;
328 : :
329 : : //! Capnp error handler. Needs to outlive m_task_set.
330 : : LoggingErrorHandler m_error_handler{*this};
331 : :
332 : : //! Capnp list of pending promises.
333 : : std::unique_ptr<kj::TaskSet> m_task_set;
334 : :
335 : : //! List of connections.
336 : : std::list<Connection> m_incoming_connections;
337 : :
338 : : //! Logging options
339 : : LogOptions m_log_opts;
340 : :
341 : : //! External context pointer.
342 : : void* m_context;
343 : : };
344 : :
345 : : //! Single element task queue used to handle recursive capnp calls. (If the
346 : : //! server makes a callback into the client in the middle of a request, while the client
347 : : //! thread is blocked waiting for server response, this is what allows the
348 : : //! client to run the request in the same thread, the same way code would run in a
349 : : //! single process, with the callback sharing the same thread stack as the original
350 : : //! call.) To support this, the clientInvoke function calls Waiter::wait() to
351 : : //! block the client IPC thread while initial request is in progress. Then if
352 : : //! there is a callback, it is executed with Waiter::post().
353 : : //!
354 : : //! The Waiter class is also used server-side by `ProxyServer<Thread>::post()`
355 : : //! to execute IPC calls on worker threads.
356 : 7 : struct Waiter
357 : : {
358 : 7 : Waiter() = default;
359 : :
360 : : template <typename Fn>
361 : 18 : bool post(Fn&& fn)
362 : : {
363 : 18 : const Lock lock(m_mutex);
364 [ + - ]: 18 : if (m_fn) return false;
365 [ + - ]: 18 : m_fn = std::forward<Fn>(fn);
366 : 18 : m_cv.notify_all();
367 : 18 : return true;
368 : 18 : }
369 : :
370 : : template <class Predicate>
371 : 36 : void wait(Lock& lock, Predicate pred)
372 : : {
373 : 95 : m_cv.wait(lock.m_lock, [&]() MP_REQUIRES(m_mutex) {
374 : : // Important for this to be "while (m_fn)", not "if (m_fn)" to avoid
375 : : // a lost-wakeup bug. A new m_fn and m_cv notification might be sent
376 : : // after the fn() call and before the lock.lock() call in this loop
377 : : // in the case where a capnp response is sent and a brand new
378 : : // request is immediately received.
379 [ - + - + : 107 : while (m_fn) {
- + - + -
+ - + ][ -
+ - + # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # ][ -
- - - - +
- + # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # ][ #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # ]
[ + + ][ # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# ]
380 : 18 : auto fn = std::move(*m_fn);
381 : 18 : m_fn.reset();
382 [ # # # # : 18 : Unlock(lock, fn);
# # # # #
# # # ][ #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # ][ #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # ][ #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # ]
[ + - ][ # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# ]
383 : : }
384 : 89 : const bool done = pred();
385 : 89 : return done;
386 : : });
387 : 36 : }
388 : :
389 : : //! Mutex mainly used internally by waiter class, but also used externally
390 : : //! to guard access to related state. Specifically, since the thread_local
391 : : //! ThreadContext struct owns a Waiter, the Waiter::m_mutex is used to guard
392 : : //! access to other parts of the struct to avoid needing to deal with more
393 : : //! mutexes than necessary. This mutex can be held at the same time as
394 : : //! EventLoop::m_mutex as long as Waiter::mutex is locked first and
395 : : //! EventLoop::m_mutex is locked second.
396 : : Mutex m_mutex;
397 : : std::condition_variable m_cv;
398 : : std::optional<kj::Function<void()>> m_fn MP_GUARDED_BY(m_mutex);
399 : : };
400 : :
401 : : //! Object holding network & rpc state associated with either an incoming server
402 : : //! connection, or an outgoing client connection. It must be created and destroyed
403 : : //! on the event loop thread.
404 : : //! In addition to Cap'n Proto state, it also holds lists of callbacks to run
405 : : //! when the connection is closed.
406 : : class Connection
407 : : {
408 : : public:
409 : 7 : Connection(EventLoop& loop, kj::Own<kj::AsyncIoStream>&& stream_)
410 : 7 : : m_loop(loop), m_stream(kj::mv(stream_)),
411 [ + - + - ]: 7 : m_network(*m_stream, ::capnp::rpc::twoparty::Side::CLIENT, ::capnp::ReaderOptions()),
412 [ + - + - : 21 : m_rpc_system(::capnp::makeRpcClient(m_network)) {}
+ - + - ]
413 : 7 : Connection(EventLoop& loop,
414 : : kj::Own<kj::AsyncIoStream>&& stream_,
415 : : const std::function<::capnp::Capability::Client(Connection&)>& make_client)
416 : 7 : : m_loop(loop), m_stream(kj::mv(stream_)),
417 [ + - + - ]: 7 : m_network(*m_stream, ::capnp::rpc::twoparty::Side::SERVER, ::capnp::ReaderOptions()),
418 [ + - + - : 21 : m_rpc_system(::capnp::makeRpcServer(m_network, make_client(*this))) {}
+ - + - +
- + - ]
419 : :
420 : : //! Run cleanup functions. Must be called from the event loop thread. First
421 : : //! calls synchronous cleanup functions while blocked (to free capnp
422 : : //! Capability::Client handles owned by ProxyClient objects), then schedules
423 : : //! asynchronous cleanup functions to run in a worker thread (to run
424 : : //! destructors of m_impl instances owned by ProxyServer objects).
425 : : ~Connection();
426 : :
427 : : //! Register synchronous cleanup function to run on event loop thread (with
428 : : //! access to capnp thread local variables) when disconnect() is called.
429 : : //! any new i/o.
430 : : CleanupIt addSyncCleanup(std::function<void()> fn);
431 : : void removeSyncCleanup(CleanupIt it);
432 : :
433 : : //! Add disconnect handler.
434 : : template <typename F>
435 : 13 : void onDisconnect(F&& f)
436 : : {
437 : : // Add disconnect handler to local TaskSet to ensure it is canceled and
438 : : // will never run after connection object is destroyed. But when disconnect
439 : : // handler fires, do not call the function f right away, instead add it
440 : : // to the EventLoop TaskSet to avoid "Promise callback destroyed itself"
441 : : // error in the typical case where f deletes this Connection object.
442 [ + - - + ]: 39 : m_on_disconnect.add(m_network.onDisconnect().then(
443 [ + - + - ]: 27 : [f = std::forward<F>(f), this]() mutable { m_loop->m_task_set->add(kj::evalLater(kj::mv(f))); }));
[ + - ]
444 : 13 : }
445 : :
446 : : EventLoopRef m_loop;
447 : : kj::Own<kj::AsyncIoStream> m_stream;
448 : : LoggingErrorHandler m_error_handler{*m_loop};
449 : : //! TaskSet used to cancel the m_network.onDisconnect() handler for remote
450 : : //! disconnections, if the connection is closed locally first by deleting
451 : : //! this Connection object.
452 : : kj::TaskSet m_on_disconnect{m_error_handler};
453 : : ::capnp::TwoPartyVatNetwork m_network;
454 : : std::optional<::capnp::RpcSystem<::capnp::rpc::twoparty::VatId>> m_rpc_system;
455 : :
456 : : // ThreadMap interface client, used to create a remote server thread when an
457 : : // client IPC call is being made for the first time from a new thread.
458 : : ThreadMap::Client m_thread_map{nullptr};
459 : :
460 : : //! Collection of server-side IPC worker threads (ProxyServer<Thread> objects previously returned by
461 : : //! ThreadMap.makeThread) used to service requests to clients.
462 : : ::capnp::CapabilityServerSet<Thread> m_threads;
463 : :
464 : : //! Canceler for canceling promises that we want to discard when the
465 : : //! connection is destroyed. This is used to interrupt method calls that are
466 : : //! still executing at time of disconnection.
467 : : kj::Canceler m_canceler;
468 : :
469 : : //! Cleanup functions to run if connection is broken unexpectedly. List
470 : : //! will be empty if all ProxyClient are destroyed cleanly before the
471 : : //! connection is destroyed.
472 : : CleanupList m_sync_cleanup_fns;
473 : : };
474 : :
475 : : //! Vat id for server side of connection. Required argument to RpcSystem::bootStrap()
476 : : //!
477 : : //! "Vat" is Cap'n Proto nomenclature for a host of various objects that facilitates
478 : : //! bidirectional communication with other vats; it is often but not always 1-1 with
479 : : //! processes. Cap'n Proto doesn't reference clients or servers per se; instead everything
480 : : //! is just a vat.
481 : : //!
482 : : //! See also: https://github.com/capnproto/capnproto/blob/9021f0c722b36cb11e3690b0860939255ebad39c/c%2B%2B/src/capnp/rpc.capnp#L42-L56
483 [ + - + - ]: 1 : struct ServerVatId
484 : : {
485 : : ::capnp::word scratch[4]{};
486 : : ::capnp::MallocMessageBuilder message{scratch};
487 : : ::capnp::rpc::twoparty::VatId::Builder vat_id{message.getRoot<::capnp::rpc::twoparty::VatId>()};
488 [ + - ]: 7 : ServerVatId() { vat_id.setSide(::capnp::rpc::twoparty::Side::SERVER); }
489 : : };
490 : :
491 : : template <typename Interface, typename Impl>
492 : 43 : ProxyClientBase<Interface, Impl>::ProxyClientBase(typename Interface::Client client,
493 : : Connection* connection,
494 : : bool destroy_connection)
495 [ + - ]: 43 : : m_client(std::move(client)), m_context(connection)
496 : :
497 : : {
498 : : // Handler for the connection getting destroyed before this client object.
499 [ + - ]: 43 : auto disconnect_cb = m_context.connection->addSyncCleanup([this]() {
500 : : // Release client capability by move-assigning to temporary.
501 : : {
502 : 0 : typename Interface::Client(std::move(m_client));
503 : : }
504 : 0 : Lock lock{m_context.loop->m_mutex};
505 : 0 : m_context.connection = nullptr;
506 : 0 : });
507 : :
508 : : // Two shutdown sequences are supported:
509 : : //
510 : : // - A normal sequence where client proxy objects are deleted by external
511 : : // code that no longer needs them
512 : : //
513 : : // - A garbage collection sequence where the connection or event loop shuts
514 : : // down while external code is still holding client references.
515 : : //
516 : : // The first case is handled here when m_context.connection is not null. The
517 : : // second case is handled by the disconnect_cb function, which sets
518 : : // m_context.connection to null so nothing happens here.
519 [ + - ]: 55 : m_context.cleanup_fns.emplace_front([this, destroy_connection, disconnect_cb]{
520 : : {
521 : : // If the capnp interface defines a destroy method, call it to destroy
522 : : // the remote object, waiting for it to be deleted server side. If the
523 : : // capnp interface does not define a destroy method, this will just call
524 : : // an empty stub defined in the ProxyClientBase class and do nothing.
525 : 6 : Sub::destroy(*this);
526 : :
527 : : // FIXME: Could just invoke removed addCleanup fn here instead of duplicating code
528 : 86 : m_context.loop->sync([&]() {
529 : : // Remove disconnect callback on cleanup so it doesn't run and try
530 : : // to access this object after it's destroyed. This call needs to
531 : : // run inside loop->sync() on the event loop thread because
532 : : // otherwise, if there were an ill-timed disconnect, the
533 : : // onDisconnect handler could fire and delete the Connection object
534 : : // before the removeSyncCleanup call.
535 [ + - # # ]: 43 : if (m_context.connection) m_context.connection->removeSyncCleanup(disconnect_cb);
[ - - + - ]
[ # # # #
# # ]
536 : :
537 : : // Release client capability by move-assigning to temporary.
538 : : {
539 : 43 : typename Interface::Client(std::move(m_client));
540 : : }
541 [ + + # # ]: 43 : if (destroy_connection) {
[ - - - + ]
[ # # # #
# # ]
542 [ + - + - : 7 : delete m_context.connection;
# # # # ]
[ # # # #
# # # # ]
[ # # # #
# # # # #
# # # ]
543 : 7 : m_context.connection = nullptr;
544 : : }
545 : : });
546 : : }
547 : : });
548 [ + - ]: 43 : Sub::construct(*this);
549 : 43 : }
550 : :
551 : : template <typename Interface, typename Impl>
552 : 43 : ProxyClientBase<Interface, Impl>::~ProxyClientBase() noexcept
553 : : {
554 : 43 : CleanupRun(m_context.cleanup_fns);
555 : 43 : }
556 : :
557 : : template <typename Interface, typename Impl>
558 : 13 : ProxyServerBase<Interface, Impl>::ProxyServerBase(std::shared_ptr<Impl> impl, Connection& connection)
559 [ + - ]: 13 : : m_impl(std::move(impl)), m_context(&connection)
560 : : {
561 [ - + ]: 13 : assert(m_impl);
562 [ - - ]: 13 : }
563 : :
564 : : //! ProxyServer destructor, called from the EventLoop thread by Cap'n Proto
565 : : //! garbage collection code after there are no more references to this object.
566 : : //! This will typically happen when the corresponding ProxyClient object on the
567 : : //! other side of the connection is destroyed. It can also happen earlier if the
568 : : //! connection is broken or destroyed. In the latter case this destructor will
569 : : //! typically be called inside m_rpc_system.reset() call in the ~Connection
570 : : //! destructor while the Connection object still exists. However, because
571 : : //! ProxyServer objects are refcounted, and the Connection object could be
572 : : //! destroyed while asynchronous IPC calls are still in-flight, it's possible
573 : : //! for this destructor to be called after the Connection object no longer
574 : : //! exists, so it is NOT valid to dereference the m_context.connection pointer
575 : : //! from this function.
576 : : template <typename Interface, typename Impl>
577 : 13 : ProxyServerBase<Interface, Impl>::~ProxyServerBase()
578 : : {
579 : 13 : if (m_impl) {
580 : : // If impl is non-null at this point, it means no client is waiting for
581 : : // the m_impl server object to be destroyed synchronously. This can
582 : : // happen either if the interface did not define a "destroy" method (see
583 : : // invokeDestroy method below), or if a destroy method was defined, but
584 : : // the connection was broken before it could be called.
585 : : //
586 : : // In either case, be conservative and run the cleanup on an
587 : : // asynchronous thread, to avoid destructors or cleanup functions
588 : : // blocking or deadlocking the current EventLoop thread, since they
589 : : // could be making IPC calls.
590 : : //
591 : : // Technically this is a little too conservative since if the interface
592 : : // defines a "destroy" method, but the destroy method does not accept a
593 : : // Context parameter specifying a worker thread, the cleanup method
594 : : // would run on the EventLoop thread normally (when connection is
595 : : // unbroken), but will not run on the EventLoop thread now (when
596 : : // connection is broken). Probably some refactoring of the destructor
597 : : // and invokeDestroy function is possible to make this cleaner and more
598 : : // consistent.
599 : 7 : m_context.loop->addAsyncCleanup([impl=std::move(m_impl), fns=std::move(m_context.cleanup_fns)]() mutable {
600 : 7 : impl.reset();
601 : 7 : CleanupRun(fns);
602 : : });
603 : : }
604 [ - + ]: 13 : assert(m_context.cleanup_fns.empty());
605 [ + + - + ]: 26 : }
606 : :
607 : : //! If the capnp interface defined a special "destroy" method, as described the
608 : : //! ProxyClientBase class, this method will be called and synchronously destroy
609 : : //! m_impl before returning to the client.
610 : : //!
611 : : //! If the capnp interface does not define a "destroy" method, this will never
612 : : //! be called and the ~ProxyServerBase destructor will be responsible for
613 : : //! deleting m_impl asynchronously, whenever the ProxyServer object gets garbage
614 : : //! collected by Cap'n Proto.
615 : : //!
616 : : //! This method is called in the same way other proxy server methods are called,
617 : : //! via the serverInvoke function. Basically serverInvoke just calls this as a
618 : : //! substitute for a non-existent m_impl->destroy() method. If the destroy
619 : : //! method has any parameters or return values they will be handled in the
620 : : //! normal way by PassField/ReadField/BuildField functions. Particularly if a
621 : : //! Context.thread parameter was passed, this method will run on the worker
622 : : //! thread specified by the client. Otherwise it will run on the EventLoop
623 : : //! thread, like other server methods without an assigned thread.
624 : : template <typename Interface, typename Impl>
625 : 6 : void ProxyServerBase<Interface, Impl>::invokeDestroy()
626 : : {
627 : 6 : m_impl.reset();
628 : 6 : CleanupRun(m_context.cleanup_fns);
629 : 6 : }
630 : :
631 : : //! Map from Connection to local or remote thread handle which will be used over
632 : : //! that connection. This map will typically only contain one entry, but can
633 : : //! contain multiple if a single thread makes IPC calls over multiple
634 : : //! connections. A std::optional value type is used to avoid the map needing to
635 : : //! be locked while ProxyClient<Thread> objects are constructed, see
636 : : //! ThreadContext "Synchronization note" below.
637 : : using ConnThreads = std::map<Connection*, std::optional<ProxyClient<Thread>>>;
638 : : using ConnThread = ConnThreads::iterator;
639 : :
640 : : // Retrieve ProxyClient<Thread> object associated with this connection from a
641 : : // map, or create a new one and insert it into the map. Return map iterator and
642 : : // inserted bool.
643 : : std::tuple<ConnThread, bool> SetThread(GuardedRef<ConnThreads> threads, Connection* connection, const std::function<Thread::Client()>& make_thread);
644 : :
645 : : //! The thread_local ThreadContext g_thread_context struct provides information
646 : : //! about individual threads and a way of communicating between them. Because
647 : : //! it's a thread local struct, each ThreadContext instance is initialized by
648 : : //! the thread that owns it.
649 : : //!
650 : : //! ThreadContext is used for any client threads created externally which make
651 : : //! IPC calls, and for server threads created by
652 : : //! ProxyServer<ThreadMap>::makeThread() which execute IPC calls for clients.
653 : : //!
654 : : //! In both cases, the struct holds information like the thread name, and a
655 : : //! Waiter object where the EventLoop can post incoming IPC requests to execute
656 : : //! on the thread. The struct also holds ConnThread maps associating the thread
657 : : //! with local and remote ProxyClient<Thread> objects.
658 : : struct ThreadContext
659 : : {
660 : : //! Identifying string for debug.
661 : : std::string thread_name;
662 : :
663 : : //! Waiter object used to allow remote clients to execute code on this
664 : : //! thread. For server threads created by
665 : : //! ProxyServer<ThreadMap>::makeThread(), this is initialized in that
666 : : //! function. Otherwise, for client threads created externally, this is
667 : : //! initialized the first time the thread tries to make an IPC call. Having
668 : : //! a waiter is necessary for threads making IPC calls in case a server they
669 : : //! are calling expects them to execute a callback during the call, before
670 : : //! it sends a response.
671 : : //!
672 : : //! For IPC client threads, the Waiter pointer is never cleared and the Waiter
673 : : //! just gets destroyed when the thread does. For server threads created by
674 : : //! makeThread(), this pointer is set to null in the ~ProxyServer<Thread> as
675 : : //! a signal for the thread to exit and destroy itself. In both cases, the
676 : : //! same Waiter object is used across different calls and only created and
677 : : //! destroyed once for the lifetime of the thread.
678 : : std::unique_ptr<Waiter> waiter = nullptr;
679 : :
680 : : //! When client is making a request to a server, this is the
681 : : //! `callbackThread` argument it passes in the request, used by the server
682 : : //! in case it needs to make callbacks into the client that need to execute
683 : : //! while the client is waiting. This will be set to a local thread object.
684 : : //!
685 : : //! Synchronization note: The callback_thread and request_thread maps are
686 : : //! only ever accessed internally by this thread's destructor and externally
687 : : //! by Cap'n Proto event loop threads. Since it's possible for IPC client
688 : : //! threads to make calls over different connections that could have
689 : : //! different event loops, these maps are guarded by Waiter::m_mutex in case
690 : : //! different event loop threads add or remove map entries simultaneously.
691 : : //! However, individual ProxyClient<Thread> objects in the maps will only be
692 : : //! associated with one event loop and guarded by EventLoop::m_mutex. So
693 : : //! Waiter::m_mutex does not need to be held while accessing individual
694 : : //! ProxyClient<Thread> instances, and may even need to be released to
695 : : //! respect lock order and avoid locking Waiter::m_mutex before
696 : : //! EventLoop::m_mutex.
697 : : ConnThreads callback_threads MP_GUARDED_BY(waiter->m_mutex);
698 : :
699 : : //! When client is making a request to a server, this is the `thread`
700 : : //! argument it passes in the request, used to control which thread on
701 : : //! server will be responsible for executing it. If client call is being
702 : : //! made from a local thread, this will be a remote thread object returned
703 : : //! by makeThread. If a client call is being made from a thread currently
704 : : //! handling a server request, this will be set to the `callbackThread`
705 : : //! request thread argument passed in that request.
706 : : //!
707 : : //! Synchronization note: \ref callback_threads note applies here as well.
708 : : ConnThreads request_threads MP_GUARDED_BY(waiter->m_mutex);
709 : :
710 : : //! Whether this thread is a capnp event loop thread. Not really used except
711 : : //! to assert false if there's an attempt to execute a blocking operation
712 : : //! which could deadlock the thread.
713 : : bool loop_thread = false;
714 : : };
715 : :
716 : : template<typename T, typename Fn>
717 : 18 : kj::Promise<T> ProxyServer<Thread>::post(Fn&& fn)
718 : : {
719 : 18 : auto ready = kj::newPromiseAndFulfiller<void>(); // Signaled when waiter is ready to post again.
720 [ + - ]: 18 : auto cancel_monitor_ptr = kj::heap<CancelMonitor>();
721 : 18 : CancelMonitor& cancel_monitor = *cancel_monitor_ptr;
722 : : // Keep a reference to the ProxyServer<Thread> instance by assigning it to
723 : : // the self variable. ProxyServer instances are reference-counted and if the
724 : : // client drops its reference, this variable keeps the instance alive until
725 : : // the thread finishes executing. The self variable needs to be destroyed on
726 : : // the event loop thread so it is freed in a sync() call below.
727 [ + - ]: 18 : auto self = thisCap();
728 [ + - + - ]: 54 : auto ret = m_thread_ready.then([this, self = std::move(self), fn = std::forward<Fn>(fn), ready_fulfiller = kj::mv(ready.fulfiller), cancel_monitor_ptr = kj::mv(cancel_monitor_ptr)]() mutable {
729 : 18 : auto result = kj::newPromiseAndFulfiller<T>(); // Signaled when fn() is called, with its return value.
730 [ # # # # : 54 : bool posted = m_thread_context.waiter->post([this, self = std::move(self), fn = std::forward<Fn>(fn), ready_fulfiller = kj::mv(ready_fulfiller), result_fulfiller = kj::mv(result.fulfiller), cancel_monitor_ptr = kj::mv(cancel_monitor_ptr)]() mutable {
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# ][ + - +
- + - +
- ][ # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# ]
731 : : // Fulfill ready.promise now, as soon as the Waiter starts executing
732 : : // this lambda, so the next ProxyServer<Thread>::post() call can
733 : : // immediately call waiter->post(). It is important to do this
734 : : // before calling fn() because fn() can make an IPC call back to the
735 : : // client, which can make another IPC call to this server thread.
736 : : // (This typically happens when IPC methods take std::function
737 : : // parameters.) When this happens the second call to the server
738 : : // thread should not be blocked waiting for the first call.
739 [ # # # # : 72 : m_loop->sync([ready_fulfiller = kj::mv(ready_fulfiller)]() mutable {
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# ][ + - +
- + - +
- ][ # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# ]
740 : 18 : ready_fulfiller->fulfill();
741 : 18 : ready_fulfiller = nullptr;
742 : : });
743 : 18 : std::optional<T> result_value;
744 [ - + ]: 36 : kj::Maybe<kj::Exception> exception{kj::runCatchingExceptions([&]{ result_value.emplace(fn(*cancel_monitor_ptr)); })};
745 [ # # # # : 54 : m_loop->sync([this, &result_value, &exception, self = kj::mv(self), result_fulfiller = kj::mv(result_fulfiller), cancel_monitor_ptr = kj::mv(cancel_monitor_ptr)]() mutable {
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# ][ + - +
- + - +
- ][ # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# ]
746 : : // Destroy CancelMonitor here before fulfilling or rejecting the
747 : : // promise so it doesn't get triggered when the promise is
748 : : // destroyed.
749 : 18 : cancel_monitor_ptr = nullptr;
750 : : // Send results to the fulfiller. Technically it would be ok to
751 : : // skip this if promise was canceled, but it's simpler to just
752 : : // do it unconditionally.
753 [ # # # # : 18 : KJ_IF_MAYBE(e, exception) {
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# ]
[ + - + - ]
[ # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # ]
754 [ # # # # : 0 : assert(!result_value);
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# ]
[ # # # # ]
[ # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # ]
755 : 0 : result_fulfiller->reject(kj::mv(*e));
756 : : } else {
757 [ # # # # : 18 : assert(result_value);
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# ]
[ - + - + ]
[ # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # ]
758 : 18 : result_fulfiller->fulfill(kj::mv(*result_value));
759 [ # # # # : 18 : result_value.reset();
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# ]
[ + - + - ]
[ # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # ]
760 : : }
761 : 18 : result_fulfiller = nullptr;
762 : : // Use evalLater to destroy the ProxyServer<Thread> self
763 : : // reference, if it is the last reference, because the
764 : : // ProxyServer<Thread> destructor needs to join the thread,
765 : : // which can't happen until this sync() block has exited.
766 [ # # # # : 72 : m_loop->m_task_set->add(kj::evalLater([self = kj::mv(self)] {}));
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# ][ + - +
- + - + -
+ - + - ]
[ # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # ]
767 : : });
768 : 18 : });
769 : : // Assert that calling Waiter::post did not fail. It could only return
770 : : // false if a new function was posted before the previous one finished
771 : : // executing, but new functions are only posted when m_thread_ready is
772 : : // signaled, so this should never happen.
773 [ # # # # : 18 : assert(posted);
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# ]
[ - + - + ]
[ # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # ]
774 [ # # # # : 18 : return kj::mv(result.promise);
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# ]
[ + - + - ]
[ # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # ]
775 [ # # # # : 36 : }).attach(kj::heap<CancelProbe>(cancel_monitor));
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# ]
[ + - + - ]
[ # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # ]
776 [ - + ]: 18 : m_thread_ready = kj::mv(ready.promise);
777 [ + - ]: 36 : return ret;
778 [ + - + - : 18 : }
- - ]
779 : :
780 : : //! Given stream file descriptor, make a new ProxyClient object to send requests
781 : : //! over the stream. Also create a new Connection object embedded in the
782 : : //! client that is freed when the client is closed.
783 : : template <typename InitInterface>
784 : 6 : std::unique_ptr<ProxyClient<InitInterface>> ConnectStream(EventLoop& loop, int fd)
785 : : {
786 : 6 : typename InitInterface::Client init_client(nullptr);
787 : 6 : std::unique_ptr<Connection> connection;
788 [ + - ]: 6 : loop.sync([&] {
789 : 6 : auto stream =
790 : 6 : loop.m_io_context.lowLevelProvider->wrapSocketFd(fd, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP);
791 [ + - ]: 6 : connection = std::make_unique<Connection>(loop, kj::mv(stream));
792 [ + - + - : 12 : init_client = connection->m_rpc_system->bootstrap(ServerVatId().vat_id).castAs<InitInterface>();
+ - + - +
- + - +
- ]
793 [ + - ]: 6 : Connection* connection_ptr = connection.get();
794 [ + - ]: 6 : connection->onDisconnect([&loop, connection_ptr] {
795 [ # # # # : 0 : MP_LOG(loop, Log::Warning) << "IPC client: unexpected network disconnect.";
# # # # #
# # # ]
796 [ # # # # ]: 0 : delete connection_ptr;
797 : : });
798 : 6 : });
799 : : return std::make_unique<ProxyClient<InitInterface>>(
800 [ + - ]: 6 : kj::mv(init_client), connection.release(), /* destroy_connection= */ true);
801 [ + - ]: 6 : }
802 : :
803 : : //! Given stream and init objects, construct a new ProxyServer object that
804 : : //! handles requests from the stream by calling the init object. Embed the
805 : : //! ProxyServer in a Connection object that is stored and erased if
806 : : //! disconnected. This should be called from the event loop thread.
807 : : template <typename InitInterface, typename InitImpl>
808 : 6 : void _Serve(EventLoop& loop, kj::Own<kj::AsyncIoStream>&& stream, InitImpl& init)
809 : : {
810 : 6 : loop.m_incoming_connections.emplace_front(loop, kj::mv(stream), [&](Connection& connection) {
811 : : // Disable deleter so proxy server object doesn't attempt to delete the
812 : : // init implementation when the proxy client is destroyed or
813 : : // disconnected.
814 [ + - - + ]: 6 : return kj::heap<ProxyServer<InitInterface>>(std::shared_ptr<InitImpl>(&init, [](InitImpl*){}), connection);
815 : : });
816 : 6 : auto it = loop.m_incoming_connections.begin();
817 : 12 : it->onDisconnect([&loop, it] {
818 [ + - + - : 18 : MP_LOG(loop, Log::Info) << "IPC server: socket disconnected.";
+ - + - +
- + - ]
819 : 6 : loop.m_incoming_connections.erase(it);
820 : : });
821 : 6 : }
822 : :
823 : : //! Given connection receiver and an init object, handle incoming connections by
824 : : //! calling _Serve, to create ProxyServer objects and forward requests to the
825 : : //! init object.
826 : : template <typename InitInterface, typename InitImpl>
827 : 7 : void _Listen(EventLoop& loop, kj::Own<kj::ConnectionReceiver>&& listener, InitImpl& init)
828 : : {
829 : 7 : auto* ptr = listener.get();
830 [ + - ]: 14 : loop.m_task_set->add(ptr->accept().then(
831 [ + - + - : 21 : [&loop, &init, listener = kj::mv(listener)](kj::Own<kj::AsyncIoStream>&& stream) mutable {
+ - - + ]
832 : 5 : _Serve<InitInterface>(loop, kj::mv(stream), init);
833 : 5 : _Listen<InitInterface>(loop, kj::mv(listener), init);
834 : : }));
835 : 7 : }
836 : :
837 : : //! Given stream file descriptor and an init object, handle requests on the
838 : : //! stream by calling methods on the Init object.
839 : : template <typename InitInterface, typename InitImpl>
840 : 1 : void ServeStream(EventLoop& loop, int fd, InitImpl& init)
841 : : {
842 [ + - ]: 1 : _Serve<InitInterface>(
843 : 1 : loop, loop.m_io_context.lowLevelProvider->wrapSocketFd(fd, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP), init);
844 : 1 : }
845 : :
846 : : //! Given listening socket file descriptor and an init object, handle incoming
847 : : //! connections and requests by calling methods on the Init object.
848 : : template <typename InitInterface, typename InitImpl>
849 : 2 : void ListenConnections(EventLoop& loop, int fd, InitImpl& init)
850 : : {
851 : 2 : loop.sync([&]() {
852 [ + - ]: 2 : _Listen<InitInterface>(loop,
853 : 4 : loop.m_io_context.lowLevelProvider->wrapListenSocketFd(fd, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP),
854 : : init);
855 : : });
856 : 2 : }
857 : :
858 [ - - - - : 84 : extern thread_local ThreadContext g_thread_context; // NOLINT(bitcoin-nontrivial-threadlocal)
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- + - + -
+ - ][ - -
- - - - -
- - - - -
# # # # #
# # # # #
# # ][ - -
- - - - -
- - - - -
- - - - -
- - - - -
- - # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # #
# ][ + - #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # ]
[ # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # ][ #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # ]
859 : : // Silence nonstandard bitcoin tidy error "Variable with non-trivial destructor
860 : : // cannot be thread_local" which should not be a problem on modern platforms, and
861 : : // could lead to a small memory leak at worst on older ones.
862 : :
863 : : } // namespace mp
864 : :
865 : : #endif // MP_PROXY_IO_H
|