Branch data Line data Source code
1 : : // Copyright (c) The Bitcoin Core developers
2 : : // Distributed under the MIT software license, see the accompanying
3 : : // file COPYING or http://www.opensource.org/licenses/mit-license.php.
4 : :
5 : : #ifndef MP_PROXY_IO_H
6 : : #define MP_PROXY_IO_H
7 : :
8 : : #include <mp/proxy.h>
9 : : #include <mp/util.h>
10 : :
11 : : #include <mp/proxy.capnp.h>
12 : :
13 : : #include <capnp/rpc-twoparty.h>
14 : :
15 : : #include <assert.h>
16 : : #include <condition_variable>
17 : : #include <functional>
18 : : #include <kj/function.h>
19 : : #include <map>
20 : : #include <memory>
21 : : #include <optional>
22 : : #include <sstream>
23 : : #include <string>
24 : : #include <thread>
25 : :
26 : : namespace mp {
27 : : struct ThreadContext;
28 : :
29 : : struct InvokeContext
30 : : {
31 : : Connection& connection;
32 : : };
33 : :
34 : : struct ClientInvokeContext : InvokeContext
35 : : {
36 : : ThreadContext& thread_context;
37 : 42 : ClientInvokeContext(Connection& conn, ThreadContext& thread_context)
38 [ + - + - : 42 : : InvokeContext{conn}, thread_context{thread_context}
+ - + - +
- ][ + - +
- - - +
- ][ + - -
- + - + -
# # # # #
# # # # #
# # # # #
# # # # #
# # ][ # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # ][ #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # ]
[ # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# ]
39 : : {
40 : : }
41 : : };
42 : :
43 : : template <typename ProxyServer, typename CallContext_>
44 : : struct ServerInvokeContext : InvokeContext
45 : : {
46 : : using CallContext = CallContext_;
47 : :
48 : : ProxyServer& proxy_server;
49 : : CallContext& call_context;
50 : : int req;
51 : : //! For IPC methods that execute asynchronously, not on the event-loop
52 : : //! thread: lock preventing the event-loop thread from freeing the params or
53 : : //! results structs if the request is canceled while the worker thread is
54 : : //! reading params (`call_context.getParams()`) or writing results
55 : : //! (`call_context.getResults()`).
56 : : Lock* cancel_lock{nullptr};
57 : : //! For IPC methods that execute asynchronously, not on the event-loop
58 : : //! thread, this is set to true if the IPC call was canceled by the client
59 : : //! or canceled by a disconnection. If the call runs on the event-loop
60 : : //! thread, it can't be canceled. This should be accessed with cancel_lock
61 : : //! held if it is not null, since in the asynchronous case it is accessed
62 : : //! from multiple threads.
63 : : bool request_canceled{false};
64 : :
65 : 68 : ServerInvokeContext(ProxyServer& proxy_server, CallContext& call_context, int req)
66 [ + - + - : 42 : : InvokeContext{*proxy_server.m_context.connection}, proxy_server{proxy_server}, call_context{call_context}, req{req}
+ - + - +
- + - # #
# # # # #
# # # # #
# # # # #
# # # #
# ][ + - #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# ][ + - -
- - - - -
+ - ][ - -
- - - - +
- + - ][ +
- + - # #
# # # # ]
[ # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# ]
67 : : {
68 : : }
69 : : };
70 : :
71 : : template <typename Interface, typename Params, typename Results>
72 : : using ServerContext = ServerInvokeContext<ProxyServer<Interface>, ::capnp::CallContext<Params, Results>>;
73 : :
74 : : template <>
75 : : struct ProxyClient<Thread> : public ProxyClientBase<Thread, ::capnp::Void>
76 : : {
77 [ + - ]: 46 : using ProxyClientBase::ProxyClientBase;
78 : : // https://stackoverflow.com/questions/22357887/comparing-two-mapiterators-why-does-it-need-the-copy-constructor-of-stdpair
79 : : ProxyClient(const ProxyClient&) = delete;
80 : : ~ProxyClient();
81 : :
82 : : //! Reference to callback function that is run if there is a sudden
83 : : //! disconnect and the Connection object is destroyed before this
84 : : //! ProxyClient<Thread> object. The callback will destroy this object and
85 : : //! remove its entry from the thread's request_threads or callback_threads
86 : : //! map. It will also reset m_disconnect_cb so the destructor does not
87 : : //! access it. In the normal case where there is no sudden disconnect, the
88 : : //! destructor will unregister m_disconnect_cb so the callback is never run.
89 : : //! Since this variable is accessed from multiple threads, accesses should
90 : : //! be guarded with the associated Waiter::m_mutex.
91 : : std::optional<CleanupIt> m_disconnect_cb;
92 : : };
93 : :
94 : : template <>
95 : : struct ProxyServer<Thread> final : public Thread::Server
96 : : {
97 : : public:
98 : : ProxyServer(Connection& connection, ThreadContext& thread_context, std::thread&& thread);
99 : : ~ProxyServer();
100 : : kj::Promise<void> getName(GetNameContext context) override;
101 : :
102 : : //! Run a callback function fn returning T on this thread. The function will
103 : : //! be queued and executed as soon as the thread is idle, and when fn
104 : : //! returns, the promise returned by this method will be fulfilled with the
105 : : //! value fn returned.
106 : : template<typename T, typename Fn>
107 : : kj::Promise<T> post(Fn&& fn);
108 : :
109 : : EventLoopRef m_loop;
110 : : ThreadContext& m_thread_context;
111 : : std::thread m_thread;
112 : : //! Promise signaled when m_thread_context.waiter is ready and there is no
113 : : //! post() callback function waiting to execute.
114 : : kj::Promise<void> m_thread_ready{kj::READY_NOW};
115 : : };
116 : :
117 : : //! Handler for kj::TaskSet failed task events.
118 : : class LoggingErrorHandler : public kj::TaskSet::ErrorHandler
119 : : {
120 : : public:
121 [ + - ]: 30 : LoggingErrorHandler(EventLoop& loop) : m_loop(loop) {}
122 : : void taskFailed(kj::Exception&& exception) override;
123 : : EventLoop& m_loop;
124 : : };
125 : :
//! Log severity levels, numbered in increasing order so they can be compared
//! with relational operators. Update the stringify function if this list
//! changes!
enum class Log {
    Trace = 0,
    Debug = 1,
    Info = 2,
    Warning = 3,
    Error = 4,
    Raise = 5,
};
135 : :
136 : : kj::StringPtr KJ_STRINGIFY(Log flags);
137 : :
138 [ + - ]: 830 : struct LogMessage {
139 : :
140 : : //! Message to be logged
141 : : std::string message;
142 : :
143 : : //! The severity level of this message
144 : : Log level;
145 : : };
146 : :
147 : : using LogFn = std::function<void(LogMessage)>;
148 : :
149 [ + - ]: 31 : struct LogOptions {
150 : :
151 : : //! External logging callback.
152 : : LogFn log_fn;
153 : :
154 : : //! Maximum number of characters to use when representing
155 : : //! request and response structs as strings.
156 : : size_t max_chars{200};
157 : :
158 : : //! Messages with a severity level less than log_level will not be
159 : : //! reported.
160 : : Log log_level{Log::Trace};
161 : : };
162 : :
163 : : class Logger
164 : : {
165 : : public:
166 [ - - - - : 471 : Logger(const LogOptions& options, Log log_level) : m_options(options), m_log_level(log_level) {}
- - - - -
- - - - -
- - - - -
- - - - -
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # ]
[ - - - -
+ - + - -
- - - + -
+ - - - -
- + - + -
- - - - +
- + - - -
- - + - +
- - - - -
+ - + - ]
[ + - + -
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # ][ # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # ]
[ # # # # ]
[ - - - -
- - - - -
- # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # ]
[ - - - -
+ - + - -
- - - + -
+ - - - -
- - - - -
- - - - -
- - - - -
- - - - ]
[ - - - -
+ - + - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - + - +
- ][ - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - -
- ][ - - -
- + - + -
- - - - -
- - - - -
- - - - -
- - - - -
+ - + - -
- - - + -
+ - # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # ][ -
- - - - -
- - - - -
- - - - -
- - - - #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # #
# ][ + - +
- # # # #
# # # # #
# # # ][ #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # #
# ]
167 : :
168 : : Logger(Logger&&) = delete;
169 : : Logger& operator=(Logger&&) = delete;
170 : : Logger(const Logger&) = delete;
171 : : Logger& operator=(const Logger&) = delete;
172 : :
173 : 471 : ~Logger() noexcept(false)
174 : : {
175 [ + - ]: 830 : if (enabled()) m_options.log_fn({std::move(m_buffer).str(), m_log_level});
176 [ + - ]: 886 : }
177 : :
178 : : template <typename T>
179 : 2035 : friend Logger& operator<<(Logger& logger, T&& value)
180 : : {
181 [ - + ]: 2035 : if (logger.enabled()) logger.m_buffer << std::forward<T>(value);
182 : 2035 : return logger;
183 : : }
184 : :
185 : : template <typename T>
186 : : friend Logger& operator<<(Logger&& logger, T&& value)
187 : : {
188 : : return logger << std::forward<T>(value);
189 : : }
190 : :
191 : 471 : explicit operator bool() const
192 : : {
193 [ + - + - : 906 : return enabled();
- - - - -
- - - + -
+ - + - +
- - - - -
- - - - +
- + - + -
+ - - - -
- - - - -
+ - + - +
- + - - -
- - - - -
- + - + -
+ - + - -
- - - - -
- - + - +
- + - + -
- - - - -
- - - + -
+ - # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # ][ -
- - - - -
- - - - -
- + - + -
+ - + - +
- + - - -
- - - - -
- - - - -
+ - + - +
- + - + -
+ - - - -
- - - - -
- - - - +
- + - + -
+ - + - +
- - - - -
- - - - -
- - - + -
+ - + - +
- + - + -
- - - - -
- - - - -
- - + - +
- + - + -
+ - + - +
- - - - -
- - - - -
- + - + -
+ - + - +
- + - ][ +
- + - + -
+ - - - #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # ][ # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# ][ + - +
- + - + -
- - # # #
# # # ][ #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # ]
[ # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# ][ # # #
# # # # #
# # ][ + -
+ - # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # ]
[ + - - -
- - - - -
- + - - -
- - - - -
- + - + -
- - - - -
- - - + -
+ - # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # ][ +
- - - # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# ][ + - -
- - - - -
- - + - +
- + - + -
+ - + - +
- - - - -
- - - - -
- - - + -
+ - + - +
- + - + -
- - - - -
- - - - -
- - + - +
- - - - -
- - - - +
- + - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- + - + -
- - - - -
- - - + -
+ - ][ - -
- - - - -
- - - - -
- - - - -
- + - - -
+ - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - +
- - - - -
- - - - -
- - - - -
- - + - -
- + - ][ -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - +
- - - - -
- - - - +
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - + - +
- - - - -
- - - - +
- + - + -
+ - - - -
- - - - -
+ - + - #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # ][ - -
- - - - -
- - - - -
- - - - -
- + - - -
+ - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - + - +
- + - + -
+ - + - -
- - - - -
- - - - -
- + - + -
+ - + - +
- + - ][ +
- - - - -
- - - - +
- - - - -
- - - - +
- - - - -
- - - - +
- - - - -
- - - - +
- + - - -
- - - - -
- + - + -
+ - + - -
- - - - -
- - + - +
- # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # ]
194 : : }
195 : :
196 : : private:
197 : 2977 : bool enabled() const
198 : : {
199 [ + - + - : 2977 : return m_options.log_fn && m_log_level >= m_options.log_level;
+ - + - +
- + - + -
+ - + - +
- - - - -
+ - + - +
- + - - -
- - - - -
- - - - -
+ - + - +
- + - - -
- - - - -
- + - + -
+ - + - +
- + - + -
+ - - - -
- - - - -
+ - + - +
- + - + -
+ - + - +
- - - - -
- - - - +
- + - + -
+ - + - +
- + - + -
- - - - -
- - - + -
+ - + - +
- + - + -
+ - + - -
- - - - -
- - + - +
- + - + -
+ - + - +
- + - - -
- - - - -
- + - + -
+ - + - -
- - - # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # ][ - -
- - + - +
- + - + -
+ - + - +
- + - + -
+ - + - +
- + - + -
+ - + - -
- - - - -
- - - - -
- - - - -
+ - + - +
- + - + -
+ - + - +
- - - - -
- - - - -
- - - - -
- - + - +
- + - + -
+ - + - +
- + - - -
- - - - -
- - - - -
- - - - +
- + - + -
+ - + - +
- + - + -
- - - - -
- - - - -
- - - - -
- + - + -
+ - + - +
- + - + -
+ - - - -
- - - - -
- - - - -
- - - + -
+ - + - +
- + - + -
+ - + - +
- + - - -
- - - - -
- - - - -
+ - + - +
- + - + -
+ - + - +
- + - +
- ][ - - -
- - - - -
- - - - -
- - - - -
- - + - +
- - - - -
- - - - -
- - - + -
+ - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- + - + -
- - - - -
- - - - -
- - + - +
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - + - +
- + - + -
- - - - -
- - - + -
+ - + - +
- + - + -
+ - + - -
- - - - -
- - + - +
- + - + -
- - - - #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# ][ # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# ][ # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# ][ # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # ][ #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # #
# ][ # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # ][ - -
- - + - +
- + - + -
- - - - -
- - - + -
+ - + - +
- - - - -
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # #
# ][ - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - + -
+ - - - -
- - - - -
- - - - +
- + - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - +
- + - + -
+ - - - -
- - - - -
+ - + - +
- + - - -
- - # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # ][ -
- - - + -
+ - - - -
- - - - -
- - - - +
- + - + -
+ + + - +
- - - - -
- - - - #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# ][ - - -
- - - - -
- - - - -
- - - + -
+ - + - +
- - - - -
- - - - -
- - - + -
+ - - - -
- - - - -
- - - - +
- + - + -
+ - + - +
- + - + -
- - - - -
- - - - -
- - - - -
- + - + -
+ - + - +
- + - + -
+ - - - -
- + - + -
- - - - +
- + - + -
+ - + - +
- - - - -
+ - + - +
- + - - -
- - - - -
- - - - -
+ - + - +
- + - - -
- - - - -
- + - + -
+ - + - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- + - + -
+ - + - -
- - - - -
- - + - +
- + - + -
- - - - ]
[ - - - -
- - - - -
- - - - -
- - + - +
- - - - -
- - - - +
- + - + -
+ - - - -
- - - - -
- - - - -
- - - + -
- + + - -
+ + - - +
+ - - + -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
+ - + - -
- - - - -
- - - - -
- + - - +
+ - - + +
- - + + -
- + - - -
- ][ + - +
- + - + -
+ - + - +
- + - + -
+ - + - +
- - - - -
- - + - +
+ - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - + -
+ - - - -
- - - - -
- - - - +
- + - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - + - +
- + - + -
- - - - -
- - - + -
+ - + - +
- + - + -
+ - + - -
- - - - -
- - + - +
- + - + -
- - - - ]
[ - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - +
- + - + -
+ - + - +
- + - + -
- - - - -
- - - - -
- - - - -
- + - + -
+ - + - +
- + - + -
+ - - - -
- # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # #
# ][ - - -
- + - + -
+ - + - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - +
- - + + -
- + + - -
+ + - - +
- - - - ]
200 : : }
201 : :
202 : : const LogOptions& m_options;
203 : : Log m_log_level;
204 : : std::ostringstream m_buffer;
205 : : };
206 : :
207 : : #define MP_LOGPLAIN(loop, ...) if (mp::Logger logger{(loop).m_log_opts, __VA_ARGS__}; logger) logger
208 : :
209 : : #define MP_LOG(loop, ...) MP_LOGPLAIN(loop, __VA_ARGS__) << "{" << LongThreadName((loop).m_exe_name) << "} "
210 : :
211 : : std::string LongThreadName(const char* exe_name);
212 : :
213 : : //! Event loop implementation.
214 : : //!
215 : : //! Cap'n Proto threading model is very simple: all I/O operations are
216 : : //! asynchronous and must be performed on a single thread. This includes:
217 : : //!
218 : : //! - Code starting an asynchronous operation (calling a function that returns a
219 : : //! promise object)
220 : : //! - Code notifying that an asynchronous operation is complete (code using a
221 : : //! fulfiller object)
222 : : //! - Code handling a completed operation (code chaining or waiting for a promise)
223 : : //!
224 : : //! All of this code needs to access shared state, and there is no mutex that
225 : : //! can be acquired to lock this state because Cap'n Proto
226 : : //! assumes it will only be accessed from one thread. So all this code needs to
227 : : //! actually run on one thread, and the EventLoop::loop() method is the entry point for
228 : : //! this thread. ProxyClient and ProxyServer objects that use other threads and
229 : : //! need to perform I/O operations post to this thread using EventLoop::post()
230 : : //! and EventLoop::sync() methods.
231 : : //!
232 : : //! Specifically, because ProxyClient methods can be called from arbitrary
233 : : //! threads, and ProxyServer methods can run on arbitrary threads, ProxyClient
234 : : //! methods use the EventLoop thread to send requests, and ProxyServer methods
235 : : //! use the thread to return results.
236 : : //!
237 : : //! Based on https://groups.google.com/d/msg/capnproto/TuQFF1eH2-M/g81sHaTAAQAJ
238 : : class EventLoop
239 : : {
240 : : public:
241 : : //! Construct event loop object with default logging options.
242 : 1 : EventLoop(const char* exe_name, LogFn log_fn, void* context = nullptr)
243 [ + - ]: 2 : : EventLoop(exe_name, LogOptions{std::move(log_fn)}, context){}
244 : :
245 : : //! Construct event loop object with specified logging options.
246 : : EventLoop(const char* exe_name, LogOptions log_opts, void* context = nullptr);
247 : :
248 : : //! Backwards-compatible constructor for previous (deprecated) logging callback signature
249 : 1 : EventLoop(const char* exe_name, std::function<void(bool, std::string)> old_callback, void* context = nullptr)
250 : 1 : : EventLoop(exe_name,
251 [ - - + - : 110 : LogFn{[old_callback = std::move(old_callback)](LogMessage log_data) {old_callback(log_data.level == Log::Raise, std::move(log_data.message));}},
+ - ]
252 [ + - ]: 2 : context){}
253 : :
254 : : ~EventLoop();
255 : :
256 : : //! Run event loop. Does not return until shutdown. This should only be
257 : : //! called once from the m_thread_id thread. This will block until
258 : : //! the m_num_clients reference count is 0.
259 : : void loop();
260 : :
261 : : //! Run function on event loop thread. Does not return until function completes.
262 : : //! Must be called while the loop() function is active.
263 : : void post(kj::Function<void()> fn);
264 : :
265 : : //! Wrapper around EventLoop::post that takes advantage of the
266 : : //! fact that callable will not go out of scope to avoid requirement that it
267 : : //! be copyable.
268 : : template <typename Callable>
269 : 253 : void sync(Callable&& callable)
270 : : {
271 [ + - ]: 253 : post(std::forward<Callable>(callable));
272 : 253 : }
273 : :
274 : : //! Register cleanup function to run on asynchronous worker thread without
275 : : //! blocking the event loop thread.
276 : : void addAsyncCleanup(std::function<void()> fn);
277 : :
278 : : //! Start asynchronous worker thread if necessary. This is only done if
279 : : //! there are ProxyServerBase::m_impl objects that need to be destroyed
280 : : //! asynchronously, without tying up the event loop thread. This can happen
281 : : //! when an interface does not declare a destroy() method that would allow
282 : : //! the client to wait for the destructor to finish and run it on a
283 : : //! dedicated thread. It can also happen whenever this is a broken
284 : : //! connection and the client is no longer around to call the destructors
285 : : //! and the server objects need to be garbage collected. In both cases, it
286 : : //! is important that ProxyServer::m_impl destructors do not run on the
287 : : //! eventloop thread because they may need it to do I/O if they perform
288 : : //! other IPC calls.
289 : : void startAsyncThread() MP_REQUIRES(m_mutex);
290 : :
291 : : //! Check if loop should exit.
292 : : bool done() const MP_REQUIRES(m_mutex);
293 : :
294 : : //! Process name included in thread names so combined debug output from
295 : : //! multiple processes is easier to understand.
296 : : const char* m_exe_name;
297 : :
298 : : //! ID of the event loop thread
299 : : std::thread::id m_thread_id = std::this_thread::get_id();
300 : :
301 : : //! Handle of an async worker thread. Joined on destruction. Unset if async
302 : : //! method has not been called.
303 : : std::thread m_async_thread;
304 : :
305 : : //! Callback function to run on event loop thread during post() or sync() call.
306 : : kj::Function<void()>* m_post_fn MP_GUARDED_BY(m_mutex) = nullptr;
307 : :
308 : : //! Callback functions to run on async thread.
309 : : std::optional<CleanupList> m_async_fns MP_GUARDED_BY(m_mutex);
310 : :
311 : : //! Pipe read handle used to wake up the event loop thread.
312 : : int m_wait_fd = -1;
313 : :
314 : : //! Pipe write handle used to wake up the event loop thread.
315 : : int m_post_fd = -1;
316 : :
317 : : //! Number of clients holding references to ProxyServerBase objects that
318 : : //! reference this event loop.
319 : : int m_num_clients MP_GUARDED_BY(m_mutex) = 0;
320 : :
321 : : //! Mutex and condition variable used to post tasks to event loop and async
322 : : //! thread.
323 : : Mutex m_mutex;
324 : : std::condition_variable m_cv;
325 : :
326 : : //! Capnp IO context.
327 : : kj::AsyncIoContext m_io_context;
328 : :
329 : : //! Capnp error handler. Needs to outlive m_task_set.
330 : : LoggingErrorHandler m_error_handler{*this};
331 : :
332 : : //! Capnp list of pending promises.
333 : : std::unique_ptr<kj::TaskSet> m_task_set;
334 : :
335 : : //! List of connections.
336 : : std::list<Connection> m_incoming_connections;
337 : :
338 : : //! Logging options
339 : : LogOptions m_log_opts;
340 : :
341 : : //! External context pointer.
342 : : void* m_context;
343 : :
344 : : //! Hook called when ProxyServer<ThreadMap>::makeThread() is called.
345 : : std::function<void()> testing_hook_makethread;
346 : :
347 : : //! Hook called on the worker thread inside makeThread(), after the thread
348 : : //! context is set up and thread_context promise is fulfilled, but before it
349 : : //! starts waiting for requests.
350 : : std::function<void()> testing_hook_makethread_created;
351 : :
352 : : //! Hook called on the worker thread when it starts to execute an async
353 : : //! request. Used by tests to control timing or inject behavior at this
354 : : //! point in execution.
355 : : std::function<void()> testing_hook_async_request_start;
356 : :
357 : : //! Hook called on the worker thread just before returning results.
358 : : std::function<void()> testing_hook_async_request_done;
359 : : };
360 : :
361 : : //! Single element task queue used to handle recursive capnp calls. (If the
362 : : //! server makes a callback into the client in the middle of a request, while the client
363 : : //! thread is blocked waiting for server response, this is what allows the
364 : : //! client to run the request in the same thread, the same way code would run in a
365 : : //! single process, with the callback sharing the same thread stack as the original
366 : : //! call.) To support this, the clientInvoke function calls Waiter::wait() to
367 : : //! block the client IPC thread while initial request is in progress. Then if
368 : : //! there is a callback, it is executed with Waiter::post().
369 : : //!
370 : : //! The Waiter class is also used server-side by `ProxyServer<Thread>::post()`
371 : : //! to execute IPC calls on worker threads.
372 : 15 : struct Waiter
373 : : {
374 : 15 : Waiter() = default;
375 : :
376 : : template <typename Fn>
377 : 26 : bool post(Fn&& fn)
378 : : {
379 : 26 : const Lock lock(m_mutex);
380 [ + - ]: 26 : if (m_fn) return false;
381 [ + - ]: 26 : m_fn = std::forward<Fn>(fn);
382 : 26 : m_cv.notify_all();
383 : 26 : return true;
384 : 26 : }
385 : :
386 : : template <class Predicate>
387 : 52 : void wait(Lock& lock, Predicate pred)
388 : : {
389 : 128 : m_cv.wait(lock.m_lock, [&]() MP_REQUIRES(m_mutex) {
390 : : // Important for this to be "while (m_fn)", not "if (m_fn)" to avoid
391 : : // a lost-wakeup bug. A new m_fn and m_cv notification might be sent
392 : : // after the fn() call and before the lock.lock() call in this loop
393 : : // in the case where a capnp response is sent and a brand new
394 : : // request is immediately received.
395 [ - + - + : 156 : while (m_fn) {
- + - + -
+ - + ][ -
+ - + - -
- - - + ]
[ + + - -
- - - + -
+ # # # #
# # # # #
# # # # #
# # # # #
# # # #
# ][ + + #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# ][ # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# ][ # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # ]
396 : 26 : auto fn = std::move(*m_fn);
397 : 26 : m_fn.reset();
398 [ # # # # : 26 : Unlock(lock, fn);
# # # # #
# # # ][ #
# # # # #
# # # # ]
[ + - # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # #
# ][ + - #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# ][ # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# ][ # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # ]
399 : : }
400 : 130 : const bool done = pred();
401 : 130 : return done;
402 : : });
403 : 52 : }
404 : :
405 : : //! Mutex mainly used internally by waiter class, but also used externally
406 : : //! to guard access to related state. Specifically, since the thread_local
407 : : //! ThreadContext struct owns a Waiter, the Waiter::m_mutex is used to guard
408 : : //! access to other parts of the struct to avoid needing to deal with more
409 : : //! mutexes than necessary. This mutex can be held at the same time as
410 : : //! EventLoop::m_mutex as long as Waiter::m_mutex is locked first and
411 : : //! EventLoop::m_mutex is locked second.
412 : : Mutex m_mutex;
413 : : std::condition_variable m_cv;
414 : : std::optional<kj::Function<void()>> m_fn MP_GUARDED_BY(m_mutex);
415 : : };
416 : :
417 : : //! Object holding network & rpc state associated with either an incoming server
418 : : //! connection, or an outgoing client connection. It must be created and destroyed
419 : : //! on the event loop thread.
420 : : //! In addition to Cap'n Proto state, it also holds lists of callbacks to run
421 : : //! when the connection is closed.
422 : : class Connection
423 : : {
424 : : public:
425 : 11 : Connection(EventLoop& loop, kj::Own<kj::AsyncIoStream>&& stream_)
426 : 11 : : m_loop(loop), m_stream(kj::mv(stream_)),
427 [ + - + - ]: 11 : m_network(*m_stream, ::capnp::rpc::twoparty::Side::CLIENT, ::capnp::ReaderOptions()),
428 [ + - + - : 33 : m_rpc_system(::capnp::makeRpcClient(m_network)) {}
+ - + - ]
429 : 11 : Connection(EventLoop& loop,
430 : : kj::Own<kj::AsyncIoStream>&& stream_,
431 : : const std::function<::capnp::Capability::Client(Connection&)>& make_client)
432 : 11 : : m_loop(loop), m_stream(kj::mv(stream_)),
433 [ + - + - ]: 11 : m_network(*m_stream, ::capnp::rpc::twoparty::Side::SERVER, ::capnp::ReaderOptions()),
434 [ + - + - : 33 : m_rpc_system(::capnp::makeRpcServer(m_network, make_client(*this))) {}
+ - + - +
- + - ]
435 : :
436 : : //! Run cleanup functions. Must be called from the event loop thread. First
437 : : //! calls synchronous cleanup functions while blocked (to free capnp
438 : : //! Capability::Client handles owned by ProxyClient objects), then schedules
439 : : //! asynchronous cleanup functions to run in a worker thread (to run
440 : : //! destructors of m_impl instances owned by ProxyServer objects).
441 : : ~Connection();
442 : :
443 : : //! Register synchronous cleanup function to run on event loop thread (with
444 : : //! access to capnp thread local variables) when the Connection object is
445 : : //! destroyed.
446 : : CleanupIt addSyncCleanup(std::function<void()> fn);
447 : : void removeSyncCleanup(CleanupIt it);
448 : :
449 : : //! Add disconnect handler.
450 : : template <typename F>
451 : 21 : void onDisconnect(F&& f)
452 : : {
453 : : // Add disconnect handler to local TaskSet to ensure it is canceled and
454 : : // will never run after connection object is destroyed. But when disconnect
455 : : // handler fires, do not call the function f right away, instead add it
456 : : // to the EventLoop TaskSet to avoid "Promise callback destroyed itself"
457 : : // error in the typical case where f deletes this Connection object.
458 [ + - - + ]: 63 : m_on_disconnect.add(m_network.onDisconnect().then(
459 [ + - + - ]: 43 : [f = std::forward<F>(f), this]() mutable { m_loop->m_task_set->add(kj::evalLater(kj::mv(f))); }));
[ + - ]
460 : 21 : }
461 : :
462 : : EventLoopRef m_loop;
463 : : kj::Own<kj::AsyncIoStream> m_stream;
464 : : LoggingErrorHandler m_error_handler{*m_loop};
465 : : //! TaskSet used to cancel the m_network.onDisconnect() handler for remote
466 : : //! disconnections, if the connection is closed locally first by deleting
467 : : //! this Connection object.
468 : : kj::TaskSet m_on_disconnect{m_error_handler};
469 : : ::capnp::TwoPartyVatNetwork m_network;
470 : : std::optional<::capnp::RpcSystem<::capnp::rpc::twoparty::VatId>> m_rpc_system;
471 : :
472 : : // ThreadMap interface client, used to create a remote server thread when a
473 : : // client IPC call is being made for the first time from a new thread.
474 : : ThreadMap::Client m_thread_map{nullptr};
475 : :
476 : : //! Collection of server-side IPC worker threads (ProxyServer<Thread> objects previously returned by
477 : : //! ThreadMap.makeThread) used to service requests to clients.
478 : : ::capnp::CapabilityServerSet<Thread> m_threads;
479 : :
480 : : //! Canceler for canceling promises that we want to discard when the
481 : : //! connection is destroyed. This is used to interrupt method calls that are
482 : : //! still executing at time of disconnection.
483 : : kj::Canceler m_canceler;
484 : :
485 : : //! Cleanup functions to run if connection is broken unexpectedly. List
486 : : //! will be empty if all ProxyClient are destroyed cleanly before the
487 : : //! connection is destroyed.
488 : : CleanupList m_sync_cleanup_fns;
489 : : };
490 : :
491 : : //! Vat id for server side of connection. Required argument to RpcSystem::bootstrap()
492 : : //!
493 : : //! "Vat" is Cap'n Proto nomenclature for a host of various objects that facilitates
494 : : //! bidirectional communication with other vats; it is often but not always 1-1 with
495 : : //! processes. Cap'n Proto doesn't reference clients or servers per se; instead everything
496 : : //! is just a vat.
497 : : //!
498 : : //! See also: https://github.com/capnproto/capnproto/blob/9021f0c722b36cb11e3690b0860939255ebad39c/c%2B%2B/src/capnp/rpc.capnp#L42-L56
499 [ + - + - ]: 1 : struct ServerVatId
500 : : {
501 : : ::capnp::word scratch[4]{};
502 : : ::capnp::MallocMessageBuilder message{scratch};
503 : : ::capnp::rpc::twoparty::VatId::Builder vat_id{message.getRoot<::capnp::rpc::twoparty::VatId>()};
504 [ + - ]: 11 : ServerVatId() { vat_id.setSide(::capnp::rpc::twoparty::Side::SERVER); }
505 : : };
506 : :
507 : : template <typename Interface, typename Impl>
508 : 67 : ProxyClientBase<Interface, Impl>::ProxyClientBase(typename Interface::Client client,
509 : : Connection* connection,
510 : : bool destroy_connection)
511 [ + - ]: 67 : : m_client(std::move(client)), m_context(connection)
512 : :
513 : : {
514 : : // Handler for the connection getting destroyed before this client object.
515 [ + - ]: 67 : auto disconnect_cb = m_context.connection->addSyncCleanup([this]() {
516 : : // Release client capability by moving it into a temporary that is destroyed immediately.
517 : : {
518 : 0 : typename Interface::Client(std::move(m_client));
519 : : }
520 : 0 : Lock lock{m_context.loop->m_mutex};
521 : 0 : m_context.connection = nullptr;
522 : 0 : });
523 : :
524 : : // Two shutdown sequences are supported:
525 : : //
526 : : // - A normal sequence where client proxy objects are deleted by external
527 : : // code that no longer needs them
528 : : //
529 : : // - A garbage collection sequence where the connection or event loop shuts
530 : : // down while external code is still holding client references.
531 : : //
532 : : // The first case is handled here when m_context.connection is not null. The
533 : : // second case is handled by the disconnect_cb function, which sets
534 : : // m_context.connection to null so nothing happens here.
535 [ + - ]: 87 : m_context.cleanup_fns.emplace_front([this, destroy_connection, disconnect_cb]{
536 : : {
537 : : // If the capnp interface defines a destroy method, call it to destroy
538 : : // the remote object, waiting for it to be deleted server side. If the
539 : : // capnp interface does not define a destroy method, this will just call
540 : : // an empty stub defined in the ProxyClientBase class and do nothing.
541 : 6 : Sub::destroy(*this);
542 : :
543 : : // FIXME: Could just invoke removed addCleanup fn here instead of duplicating code
544 : 134 : m_context.loop->sync([&]() {
545 : : // Remove disconnect callback on cleanup so it doesn't run and try
546 : : // to access this object after it's destroyed. This call needs to
547 : : // run inside loop->sync() on the event loop thread because
548 : : // otherwise, if there were an ill-timed disconnect, the
549 : : // onDisconnect handler could fire and delete the Connection object
550 : : // before the removeSyncCleanup call.
551 [ + - - - : 67 : if (m_context.connection) m_context.connection->removeSyncCleanup(disconnect_cb);
- - ][ + -
- - + - ]
552 : :
553 : : // Release client capability by move-assigning to temporary.
554 : : {
555 : 67 : typename Interface::Client(std::move(m_client));
556 : : }
557 [ + + - - : 67 : if (destroy_connection) {
- - ][ + +
- - - + ]
558 [ + - + - : 11 : delete m_context.connection;
# # # # #
# # # ][ +
- + - # #
# # # # #
# ]
559 : 11 : m_context.connection = nullptr;
560 : : }
561 : : });
562 : : }
563 : : });
564 [ + - ]: 67 : Sub::construct(*this);
565 : 67 : }
566 : :
567 : : template <typename Interface, typename Impl>
568 : 67 : ProxyClientBase<Interface, Impl>::~ProxyClientBase() noexcept
569 : : {
570 : 67 : CleanupRun(m_context.cleanup_fns);
571 : 67 : }
572 : :
573 : : template <typename Interface, typename Impl>
574 : 21 : ProxyServerBase<Interface, Impl>::ProxyServerBase(std::shared_ptr<Impl> impl, Connection& connection)
575 [ + - ]: 21 : : m_impl(std::move(impl)), m_context(&connection)
576 : : {
577 [ - + ]: 21 : assert(m_impl);
578 [ - - ]: 21 : }
579 : :
580 : : //! ProxyServer destructor, called from the EventLoop thread by Cap'n Proto
581 : : //! garbage collection code after there are no more references to this object.
582 : : //! This will typically happen when the corresponding ProxyClient object on the
583 : : //! other side of the connection is destroyed. It can also happen earlier if the
584 : : //! connection is broken or destroyed. In the latter case this destructor will
585 : : //! typically be called inside m_rpc_system.reset() call in the ~Connection
586 : : //! destructor while the Connection object still exists. However, because
587 : : //! ProxyServer objects are refcounted, and the Connection object could be
588 : : //! destroyed while asynchronous IPC calls are still in-flight, it's possible
589 : : //! for this destructor to be called after the Connection object no longer
590 : : //! exists, so it is NOT valid to dereference the m_context.connection pointer
591 : : //! from this function.
592 : : template <typename Interface, typename Impl>
593 : 21 : ProxyServerBase<Interface, Impl>::~ProxyServerBase()
594 : : {
595 : 21 : if (m_impl) {
596 : : // If impl is non-null at this point, it means no client is waiting for
597 : : // the m_impl server object to be destroyed synchronously. This can
598 : : // happen either if the interface did not define a "destroy" method (see
599 : : // invokeDestroy method below), or if a destroy method was defined, but
600 : : // the connection was broken before it could be called.
601 : : //
602 : : // In either case, be conservative and run the cleanup on an
603 : : // asynchronous thread, to avoid destructors or cleanup functions
604 : : // blocking or deadlocking the current EventLoop thread, since they
605 : : // could be making IPC calls.
606 : : //
607 : : // Technically this is a little too conservative since if the interface
608 : : // defines a "destroy" method, but the destroy method does not accept a
609 : : // Context parameter specifying a worker thread, the cleanup method
610 : : // would run on the EventLoop thread normally (when connection is
611 : : // unbroken), but will not run on the EventLoop thread now (when
612 : : // connection is broken). Probably some refactoring of the destructor
613 : : // and invokeDestroy function is possible to make this cleaner and more
614 : : // consistent.
615 : 15 : m_context.loop->addAsyncCleanup([impl=std::move(m_impl), fns=std::move(m_context.cleanup_fns)]() mutable {
616 : 15 : impl.reset();
617 : 15 : CleanupRun(fns);
618 : : });
619 : : }
620 [ - + ]: 21 : assert(m_context.cleanup_fns.empty());
621 [ + + - + ]: 42 : }
622 : :
623 : : //! If the capnp interface defined a special "destroy" method, as described in the
624 : : //! ProxyClientBase class, this method will be called and synchronously destroy
625 : : //! m_impl before returning to the client.
626 : : //!
627 : : //! If the capnp interface does not define a "destroy" method, this will never
628 : : //! be called and the ~ProxyServerBase destructor will be responsible for
629 : : //! deleting m_impl asynchronously, whenever the ProxyServer object gets garbage
630 : : //! collected by Cap'n Proto.
631 : : //!
632 : : //! This method is called in the same way other proxy server methods are called,
633 : : //! via the serverInvoke function. Basically serverInvoke just calls this as a
634 : : //! substitute for a non-existent m_impl->destroy() method. If the destroy
635 : : //! method has any parameters or return values they will be handled in the
636 : : //! normal way by PassField/ReadField/BuildField functions. Particularly if a
637 : : //! Context.thread parameter was passed, this method will run on the worker
638 : : //! thread specified by the client. Otherwise it will run on the EventLoop
639 : : //! thread, like other server methods without an assigned thread.
640 : : template <typename Interface, typename Impl>
641 : 6 : void ProxyServerBase<Interface, Impl>::invokeDestroy()
642 : : {
643 : 6 : m_impl.reset();
644 : 6 : CleanupRun(m_context.cleanup_fns);
645 : 6 : }
646 : :
647 : : //! Map from Connection to local or remote thread handle which will be used over
648 : : //! that connection. This map will typically only contain one entry, but can
649 : : //! contain multiple if a single thread makes IPC calls over multiple
650 : : //! connections. A std::optional value type is used to avoid the map needing to
651 : : //! be locked while ProxyClient<Thread> objects are constructed, see
652 : : //! ThreadContext "Synchronization note" below.
653 : : using ConnThreads = std::map<Connection*, std::optional<ProxyClient<Thread>>>;
654 : : using ConnThread = ConnThreads::iterator;
655 : :
656 : : // Retrieve ProxyClient<Thread> object associated with this connection from a
657 : : // map, or create a new one and insert it into the map. Return map iterator and
658 : : // inserted bool.
659 : : std::tuple<ConnThread, bool> SetThread(GuardedRef<ConnThreads> threads, Connection* connection, const std::function<Thread::Client()>& make_thread);
660 : :
661 : : //! The thread_local ThreadContext g_thread_context struct provides information
662 : : //! about individual threads and a way of communicating between them. Because
663 : : //! it's a thread local struct, each ThreadContext instance is initialized by
664 : : //! the thread that owns it.
665 : : //!
666 : : //! ThreadContext is used for any client threads created externally which make
667 : : //! IPC calls, and for server threads created by
668 : : //! ProxyServer<ThreadMap>::makeThread() which execute IPC calls for clients.
669 : : //!
670 : : //! In both cases, the struct holds information like the thread name, and a
671 : : //! Waiter object where the EventLoop can post incoming IPC requests to execute
672 : : //! on the thread. The struct also holds ConnThread maps associating the thread
673 : : //! with local and remote ProxyClient<Thread> objects.
674 : : struct ThreadContext
675 : : {
676 : : //! Identifying string for debug.
677 : : std::string thread_name;
678 : :
679 : : //! Waiter object used to allow remote clients to execute code on this
680 : : //! thread. For server threads created by
681 : : //! ProxyServer<ThreadMap>::makeThread(), this is initialized in that
682 : : //! function. Otherwise, for client threads created externally, this is
683 : : //! initialized the first time the thread tries to make an IPC call. Having
684 : : //! a waiter is necessary for threads making IPC calls in case a server they
685 : : //! are calling expects them to execute a callback during the call, before
686 : : //! it sends a response.
687 : : //!
688 : : //! For IPC client threads, the Waiter pointer is never cleared and the Waiter
689 : : //! just gets destroyed when the thread does. For server threads created by
690 : : //! makeThread(), this pointer is set to null in the ~ProxyServer<Thread> as
691 : : //! a signal for the thread to exit and destroy itself. In both cases, the
692 : : //! same Waiter object is used across different calls and only created and
693 : : //! destroyed once for the lifetime of the thread.
694 : : std::unique_ptr<Waiter> waiter = nullptr;
695 : :
696 : : //! When client is making a request to a server, this is the
697 : : //! `callbackThread` argument it passes in the request, used by the server
698 : : //! in case it needs to make callbacks into the client that need to execute
699 : : //! while the client is waiting. This will be set to a local thread object.
700 : : //!
701 : : //! Synchronization note: The callback_threads and request_threads maps are
702 : : //! only ever accessed internally by this thread's destructor and externally
703 : : //! by Cap'n Proto event loop threads. Since it's possible for IPC client
704 : : //! threads to make calls over different connections that could have
705 : : //! different event loops, these maps are guarded by Waiter::m_mutex in case
706 : : //! different event loop threads add or remove map entries simultaneously.
707 : : //! However, individual ProxyClient<Thread> objects in the maps will only be
708 : : //! associated with one event loop and guarded by EventLoop::m_mutex. So
709 : : //! Waiter::m_mutex does not need to be held while accessing individual
710 : : //! ProxyClient<Thread> instances, and may even need to be released to
711 : : //! respect lock order and avoid locking Waiter::m_mutex before
712 : : //! EventLoop::m_mutex.
713 : : ConnThreads callback_threads MP_GUARDED_BY(waiter->m_mutex);
714 : :
715 : : //! When client is making a request to a server, this is the `thread`
716 : : //! argument it passes in the request, used to control which thread on
717 : : //! server will be responsible for executing it. If client call is being
718 : : //! made from a local thread, this will be a remote thread object returned
719 : : //! by makeThread. If a client call is being made from a thread currently
720 : : //! handling a server request, this will be set to the `callbackThread`
721 : : //! request thread argument passed in that request.
722 : : //!
723 : : //! Synchronization note: \ref callback_threads note applies here as well.
724 : : ConnThreads request_threads MP_GUARDED_BY(waiter->m_mutex);
725 : :
726 : : //! Whether this thread is a capnp event loop thread. Not really used except
727 : : //! to assert false if there's an attempt to execute a blocking operation
728 : : //! which could deadlock the thread.
729 : : bool loop_thread = false;
730 : : };
731 : :
732 : : template<typename T, typename Fn>
733 : 26 : kj::Promise<T> ProxyServer<Thread>::post(Fn&& fn)
734 : : {
735 : 26 : auto ready = kj::newPromiseAndFulfiller<void>(); // Signaled when waiter is ready to post again.
736 [ + - ]: 26 : auto cancel_monitor_ptr = kj::heap<CancelMonitor>();
737 : 26 : CancelMonitor& cancel_monitor = *cancel_monitor_ptr;
738 : : // Keep a reference to the ProxyServer<Thread> instance by assigning it to
739 : : // the self variable. ProxyServer instances are reference-counted and if the
740 : : // client drops its reference, this variable keeps the instance alive until
741 : : // the thread finishes executing. The self variable needs to be destroyed on
742 : : // the event loop thread so it is freed in a sync() call below.
743 [ + - ]: 26 : auto self = thisCap();
744 [ + - + - ]: 74 : auto ret = m_thread_ready.then([this, self = std::move(self), fn = std::forward<Fn>(fn), ready_fulfiller = kj::mv(ready.fulfiller), cancel_monitor_ptr = kj::mv(cancel_monitor_ptr)]() mutable {
745 : 26 : auto result = kj::newPromiseAndFulfiller<T>(); // Signaled when fn() is called, with its return value.
746 [ # # # # : 70 : bool posted = m_thread_context.waiter->post([this, self = std::move(self), fn = std::forward<Fn>(fn), ready_fulfiller = kj::mv(ready_fulfiller), result_fulfiller = kj::mv(result.fulfiller), cancel_monitor_ptr = kj::mv(cancel_monitor_ptr)]() mutable {
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# ][ + - +
- # # #
# ][ + - +
- - - - -
+ - + - ]
[ + - + -
+ - + - #
# # # ][ #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # ]
747 : : // Fulfill ready.promise now, as soon as the Waiter starts executing
748 : : // this lambda, so the next ProxyServer<Thread>::post() call can
749 : : // immediately call waiter->post(). It is important to do this
750 : : // before calling fn() because fn() can make an IPC call back to the
751 : : // client, which can make another IPC call to this server thread.
752 : : // (This typically happens when IPC methods take std::function
753 : : // parameters.) When this happens the second call to the server
754 : : // thread should not be blocked waiting for the first call.
755 [ # # # # : 96 : m_loop->sync([ready_fulfiller = kj::mv(ready_fulfiller)]() mutable {
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# ][ + - +
- # # #
# ][ + - -
- + - + -
- - + - ]
[ + - + -
+ - + - #
# # # ][ #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # ]
756 : 26 : ready_fulfiller->fulfill();
757 : 26 : ready_fulfiller = nullptr;
758 : : });
759 : 26 : std::optional<T> result_value;
760 [ - + ]: 52 : kj::Maybe<kj::Exception> exception{kj::runCatchingExceptions([&]{ result_value.emplace(fn(*cancel_monitor_ptr)); })};
761 [ # # # # : 70 : m_loop->sync([this, &result_value, &exception, self = kj::mv(self), result_fulfiller = kj::mv(result_fulfiller), cancel_monitor_ptr = kj::mv(cancel_monitor_ptr)]() mutable {
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# ][ + - +
- # # #
# ][ + - +
- - - - -
+ - + - ]
[ + - + -
+ - + - #
# # # ][ #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # ]
762 : : // Destroy CancelMonitor here before fulfilling or rejecting the
763 : : // promise so it doesn't get triggered when the promise is
764 : : // destroyed.
765 : 26 : cancel_monitor_ptr = nullptr;
766 : : // Send results to the fulfiller. Technically it would be ok to
767 : : // skip this if promise was canceled, but it's simpler to just
768 : : // do it unconditionally.
769 [ # # # # : 26 : KJ_IF_MAYBE(e, exception) {
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# ]
[ + - # # ]
[ + - - -
+ - ][ + -
+ - # # ]
[ # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # ]
770 [ # # # # : 0 : assert(!result_value);
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# ]
[ # # # # ]
[ # # # #
# # ][ # #
# # # # ]
[ # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # ]
771 : 0 : result_fulfiller->reject(kj::mv(*e));
772 : : } else {
773 [ # # # # : 26 : assert(result_value);
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# ]
[ - + # # ]
[ - + - -
- + ][ - +
- + # # ]
[ # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # ]
774 : 26 : result_fulfiller->fulfill(kj::mv(*result_value));
775 [ # # # # : 26 : result_value.reset();
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# ]
[ + - # # ]
[ + - - -
+ - ][ + -
+ - # # ]
[ # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # ]
776 : : }
777 : 26 : result_fulfiller = nullptr;
778 : : // Use evalLater to destroy the ProxyServer<Thread> self
779 : : // reference, if it is the last reference, because the
780 : : // ProxyServer<Thread> destructor needs to join the thread,
781 : : // which can't happen until this sync() block has exited.
782 [ # # # # : 104 : m_loop->m_task_set->add(kj::evalLater([self = kj::mv(self)] {}));
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# ][ + - +
- + - # #
# # # # ]
[ + - - -
+ - + - +
- - - - -
+ - + - ]
[ + - + -
+ - + - +
- + - # #
# # # # ]
[ # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # ]
783 : : });
784 : 26 : });
785 : : // Assert that calling Waiter::post did not fail. It could only return
786 : : // false if a new function was posted before the previous one finished
787 : : // executing, but new functions are only posted when m_thread_ready is
788 : : // signaled, so this should never happen.
789 [ # # # # : 26 : assert(posted);
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# ]
[ - + # # ]
[ - + - -
- + ][ - +
- + # # ]
[ # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # ]
790 [ # # # # : 26 : return kj::mv(result.promise);
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# ]
[ + - # # ]
[ + - - -
+ - ][ + -
+ - # # ]
[ # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # ]
791 [ # # # # : 52 : }).attach(kj::heap<CancelProbe>(cancel_monitor));
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# ]
[ + - + - ]
[ + - - -
+ - ][ # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # #
# ]
792 [ - + ]: 26 : m_thread_ready = kj::mv(ready.promise);
793 [ + - ]: 52 : return ret;
794 [ + - + - : 26 : }
- - ]
795 : :
796 : : //! Given stream file descriptor, make a new ProxyClient object to send requests
797 : : //! over the stream. Also create a new Connection object embedded in the
798 : : //! client that is freed when the client is closed.
799 : : template <typename InitInterface>
800 : 10 : std::unique_ptr<ProxyClient<InitInterface>> ConnectStream(EventLoop& loop, int fd)
801 : : {
802 : 10 : typename InitInterface::Client init_client(nullptr);
803 : 10 : std::unique_ptr<Connection> connection;
804 [ + - ]: 10 : loop.sync([&] {
805 : 10 : auto stream =
806 : 10 : loop.m_io_context.lowLevelProvider->wrapSocketFd(fd, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP);
807 [ + - ]: 10 : connection = std::make_unique<Connection>(loop, kj::mv(stream));
808 [ + - + - : 20 : init_client = connection->m_rpc_system->bootstrap(ServerVatId().vat_id).castAs<InitInterface>();
+ - + - +
- + - +
- ]
809 [ + - ]: 10 : Connection* connection_ptr = connection.get();
810 [ + - ]: 10 : connection->onDisconnect([&loop, connection_ptr] {
811 [ # # # # : 0 : MP_LOG(loop, Log::Warning) << "IPC client: unexpected network disconnect.";
# # # # #
# # # ]
812 [ # # # # ]: 0 : delete connection_ptr;
813 : : });
814 : 10 : });
815 : : return std::make_unique<ProxyClient<InitInterface>>(
816 [ + - ]: 10 : kj::mv(init_client), connection.release(), /* destroy_connection= */ true);
817 [ + - ]: 10 : }
818 : :
819 : : //! Given stream and init objects, construct a new ProxyServer object that
820 : : //! handles requests from the stream by calling the init object. Embed the
821 : : //! ProxyServer in a Connection object that is stored and erased if
822 : : //! disconnected. This should be called from the event loop thread.
823 : : template <typename InitInterface, typename InitImpl>
824 : 10 : void _Serve(EventLoop& loop, kj::Own<kj::AsyncIoStream>&& stream, InitImpl& init)
825 : : {
826 : 10 : loop.m_incoming_connections.emplace_front(loop, kj::mv(stream), [&](Connection& connection) {
827 : : // Disable deleter so proxy server object doesn't attempt to delete the
828 : : // init implementation when the proxy client is destroyed or
829 : : // disconnected.
830 [ + - - + ]: 10 : return kj::heap<ProxyServer<InitInterface>>(std::shared_ptr<InitImpl>(&init, [](InitImpl*){}), connection);
831 : : });
832 : 10 : auto it = loop.m_incoming_connections.begin();
833 [ + - + - : 30 : MP_LOG(loop, Log::Info) << "IPC server: socket connected.";
+ - + - +
- + - ]
834 : 20 : it->onDisconnect([&loop, it] {
835 [ + - + - : 30 : MP_LOG(loop, Log::Info) << "IPC server: socket disconnected.";
+ - + - +
- + - ]
836 : 10 : loop.m_incoming_connections.erase(it);
837 : : });
838 : 10 : }
839 : :
840 : : //! Given connection receiver and an init object, handle incoming connections by
841 : : //! calling _Serve, to create ProxyServer objects and forward requests to the
842 : : //! init object.
843 : : template <typename InitInterface, typename InitImpl>
844 : 12 : void _Listen(EventLoop& loop, kj::Own<kj::ConnectionReceiver>&& listener, InitImpl& init)
845 : : {
846 : 12 : auto* ptr = listener.get();
847 [ + - ]: 24 : loop.m_task_set->add(ptr->accept().then(
848 [ + - + - : 36 : [&loop, &init, listener = kj::mv(listener)](kj::Own<kj::AsyncIoStream>&& stream) mutable {
+ - - + ]
849 : 9 : _Serve<InitInterface>(loop, kj::mv(stream), init);
850 : 9 : _Listen<InitInterface>(loop, kj::mv(listener), init);
851 : : }));
852 : 12 : }
853 : :
854 : : //! Given stream file descriptor and an init object, handle requests on the
855 : : //! stream by calling methods on the Init object.
856 : : template <typename InitInterface, typename InitImpl>
857 : 1 : void ServeStream(EventLoop& loop, int fd, InitImpl& init)
858 : : {
859 [ + - ]: 1 : _Serve<InitInterface>(
860 : 1 : loop, loop.m_io_context.lowLevelProvider->wrapSocketFd(fd, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP), init);
861 : 1 : }
862 : :
863 : : //! Given listening socket file descriptor and an init object, handle incoming
864 : : //! connections and requests by calling methods on the Init object.
865 : : template <typename InitInterface, typename InitImpl>
866 : 3 : void ListenConnections(EventLoop& loop, int fd, InitImpl& init)
867 : : {
868 : 3 : loop.sync([&]() {
869 [ + - ]: 3 : _Listen<InitInterface>(loop,
870 : 6 : loop.m_io_context.lowLevelProvider->wrapListenSocketFd(fd, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP),
871 : : init);
872 : : });
873 : 3 : }
874 : :
875 [ - - - - : 136 : extern thread_local ThreadContext g_thread_context; // NOLINT(bitcoin-nontrivial-threadlocal)
- - - - -
- - - - -
- - - - -
- - - - -
+ - + - +
- + - + -
+ - ][ - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - - -
- - - ][ +
- - - - -
- - - - -
- ][ + - -
- - - ][ -
- - - +
- ][ + - +
- # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # ]
[ # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # ][ #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# ][ # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # # # #
# # ]
876 : : // The NOLINT above silences the nonstandard bitcoin tidy error "Variable with
877 : : // non-trivial destructor cannot be thread_local", which should not be a problem
878 : : // on modern platforms, and could lead to a small memory leak at worst on older ones.
879 : :
880 : : } // namespace mp
881 : :
882 : : #endif // MP_PROXY_IO_H
|