TLA Line data Source code
1 : //
2 : // Copyright (c) 2026 Steve Gerbino
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/corosio
8 : //
9 :
10 : #ifndef BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_OP_HPP
11 : #define BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_OP_HPP
12 :
13 : #include <boost/corosio/detail/platform.hpp>
14 :
15 : #if BOOST_COROSIO_HAS_EPOLL
16 :
17 : #include <boost/corosio/detail/config.hpp>
18 : #include <boost/corosio/io/io_object.hpp>
19 : #include <boost/corosio/endpoint.hpp>
20 : #include <boost/capy/ex/executor_ref.hpp>
21 : #include <coroutine>
22 : #include <boost/capy/error.hpp>
23 : #include <system_error>
24 :
25 : #include <boost/corosio/detail/make_err.hpp>
26 : #include <boost/corosio/detail/dispatch_coro.hpp>
27 : #include <boost/corosio/detail/scheduler_op.hpp>
28 : #include <boost/corosio/detail/endpoint_convert.hpp>
29 :
30 : #include <unistd.h>
31 : #include <errno.h>
32 :
33 : #include <atomic>
34 : #include <cstddef>
35 : #include <memory>
36 : #include <mutex>
37 : #include <optional>
38 : #include <stop_token>
39 :
40 : #include <netinet/in.h>
41 : #include <sys/socket.h>
42 : #include <sys/uio.h>
43 :
44 : /*
45 : epoll Operation State
46 : =====================
47 :
48 : Each async I/O operation has a corresponding epoll_op-derived struct that
49 : holds the operation's state while it's in flight. The socket impl owns
50 : fixed slots for each operation type (conn_, rd_, wr_), so only one
51 : operation of each type can be pending per socket at a time.
52 :
53 : Persistent Registration
54 : -----------------------
55 : File descriptors are registered with epoll once (via descriptor_state) and
56 : stay registered until closed. The descriptor_state tracks which operations
57 : are pending (read_op, write_op, connect_op). When an event arrives, the
58 : reactor dispatches to the appropriate pending operation.
59 :
60 : Impl Lifetime Management
61 : ------------------------
62 : When cancel() posts an op to the scheduler's ready queue, the socket impl
63 : might be destroyed before the scheduler processes the op. The `impl_ptr`
64 : member holds a shared_ptr to the impl, keeping it alive until the op
65 : completes. This is set by cancel() and cleared in operator() after the
66 : coroutine is resumed.
67 :
68 : EOF Detection
69 : -------------
70 : For reads, 0 bytes with no error means EOF. But an empty user buffer also
71 : returns 0 bytes. The `empty_buffer_read` flag distinguishes these cases.
72 :
73 : SIGPIPE Prevention
74 : ------------------
75 : Writes use sendmsg() with MSG_NOSIGNAL instead of writev() to prevent
76 : SIGPIPE when the peer has closed.
77 : */
78 :
79 : namespace boost::corosio::detail {
80 :
81 : // Forward declarations
82 : class epoll_socket;
83 : class epoll_acceptor;
84 : struct epoll_op;
85 :
86 : // Forward declaration
87 : class epoll_scheduler;
88 :
89 : /** Per-descriptor state for persistent epoll registration.
90 :
91 : Tracks pending operations for a file descriptor. The fd is registered
92 : once with epoll and stays registered until closed.
93 :
94 : This struct extends scheduler_op to support deferred I/O processing.
95 : When epoll events arrive, the reactor sets ready_events and queues
96 : this descriptor for processing. When popped from the scheduler queue,
97 : operator() performs the actual I/O and queues completion handlers.
98 :
99 : @par Deferred I/O Model
100 : The reactor no longer performs I/O directly. Instead:
101 : 1. Reactor sets ready_events and queues descriptor_state
102 : 2. Scheduler pops descriptor_state and calls operator()
103 : 3. operator() performs I/O under mutex and queues completions
104 :
105 : This eliminates per-descriptor mutex locking from the reactor hot path.
106 :
107 : @par Thread Safety
108 : The mutex protects operation pointers and ready flags during I/O.
109 : ready_events_ and is_enqueued_ are atomic for lock-free reactor access.
110 : */
111 : struct descriptor_state final : scheduler_op
112 : {
113 : std::mutex mutex;
114 :
115 : // Protected by mutex
116 : epoll_op* read_op = nullptr;
117 : epoll_op* write_op = nullptr;
118 : epoll_op* connect_op = nullptr;
119 :
120 : // Caches edge events that arrived before an op was registered
121 : bool read_ready = false;
122 : bool write_ready = false;
123 :
124 : // Deferred cancellation: set by cancel() when the target op is not
125 : // parked (e.g. completing inline via speculative I/O). Checked when
126 : // the next op parks; if set, the op is immediately self-cancelled.
127 : // This matches IOCP semantics where CancelIoEx always succeeds.
128 : bool read_cancel_pending = false;
129 : bool write_cancel_pending = false;
130 : bool connect_cancel_pending = false;
131 :
132 : // Set during registration only (no mutex needed)
133 : std::uint32_t registered_events = 0;
134 : int fd = -1;
135 :
136 : // For deferred I/O - set by reactor, read by scheduler
137 : std::atomic<std::uint32_t> ready_events_{0};
138 : std::atomic<bool> is_enqueued_{false};
139 : epoll_scheduler const* scheduler_ = nullptr;
140 :
141 : // Prevents impl destruction while this descriptor_state is queued.
142 : // Set by close_socket() when is_enqueued_ is true, cleared by operator().
143 : std::shared_ptr<void> impl_ref_;
144 :
145 : /// Add ready events atomically.
146 HIT 57547 : void add_ready_events(std::uint32_t ev) noexcept
147 : {
148 57547 : ready_events_.fetch_or(ev, std::memory_order_relaxed);
149 57547 : }
150 :
151 : /// Perform deferred I/O and queue completions.
152 : void operator()() override;
153 :
154 : /// Destroy without invoking.
155 : /// Called during scheduler::shutdown() drain. Clear impl_ref_ to break
156 : /// the self-referential cycle set by close_socket().
157 29 : void destroy() override
158 : {
159 29 : impl_ref_.reset();
160 29 : }
161 : };
162 :
163 : struct epoll_op : scheduler_op
164 : {
165 : struct canceller
166 : {
167 : epoll_op* op;
168 : void operator()() const noexcept;
169 : };
170 :
171 : std::coroutine_handle<> h;
172 : capy::executor_ref ex;
173 : std::error_code* ec_out = nullptr;
174 : std::size_t* bytes_out = nullptr;
175 :
176 : int fd = -1;
177 : int errn = 0;
178 : std::size_t bytes_transferred = 0;
179 :
180 : std::atomic<bool> cancelled{false};
181 : std::optional<std::stop_callback<canceller>> stop_cb;
182 :
183 : // Prevents use-after-free when socket is closed with pending ops.
184 : // See "Impl Lifetime Management" in file header.
185 : std::shared_ptr<void> impl_ptr;
186 :
187 : // For stop_token cancellation - pointer to owning socket/acceptor impl.
188 : // When stop is requested, we call back to the impl to perform actual I/O cancellation.
189 : epoll_socket* socket_impl_ = nullptr;
190 : epoll_acceptor* acceptor_impl_ = nullptr;
191 :
192 40972 : epoll_op() = default;
193 :
194 360560 : void reset() noexcept
195 : {
196 360560 : fd = -1;
197 360560 : errn = 0;
198 360560 : bytes_transferred = 0;
199 360560 : cancelled.store(false, std::memory_order_relaxed);
200 360560 : impl_ptr.reset();
201 360560 : socket_impl_ = nullptr;
202 360560 : acceptor_impl_ = nullptr;
203 360560 : }
204 :
205 : // Defined in sockets.cpp where epoll_socket is complete
206 : void operator()() override;
207 :
208 35098 : virtual bool is_read_operation() const noexcept
209 : {
210 35098 : return false;
211 : }
212 : virtual void cancel() noexcept = 0;
213 :
214 MIS 0 : void destroy() override
215 : {
216 0 : stop_cb.reset();
217 0 : impl_ptr.reset();
218 0 : }
219 :
220 HIT 123547 : void request_cancel() noexcept
221 : {
222 123547 : cancelled.store(true, std::memory_order_release);
223 123547 : }
224 :
225 : // NOLINTNEXTLINE(performance-unnecessary-value-param)
226 74913 : void start(std::stop_token token, epoll_socket* impl)
227 : {
228 74913 : cancelled.store(false, std::memory_order_release);
229 74913 : stop_cb.reset();
230 74913 : socket_impl_ = impl;
231 74913 : acceptor_impl_ = nullptr;
232 :
233 74913 : if (token.stop_possible())
234 100 : stop_cb.emplace(token, canceller{this});
235 74913 : }
236 :
237 : // NOLINTNEXTLINE(performance-unnecessary-value-param)
238 4535 : void start(std::stop_token token, epoll_acceptor* impl)
239 : {
240 4535 : cancelled.store(false, std::memory_order_release);
241 4535 : stop_cb.reset();
242 4535 : socket_impl_ = nullptr;
243 4535 : acceptor_impl_ = impl;
244 :
245 4535 : if (token.stop_possible())
246 9 : stop_cb.emplace(token, canceller{this});
247 4535 : }
248 :
249 79383 : void complete(int err, std::size_t bytes) noexcept
250 : {
251 79383 : errn = err;
252 79383 : bytes_transferred = bytes;
253 79383 : }
254 :
255 MIS 0 : virtual void perform_io() noexcept {}
256 : };
257 :
258 : struct epoll_connect_op final : epoll_op
259 : {
260 : endpoint target_endpoint;
261 :
262 HIT 4527 : void reset() noexcept
263 : {
264 4527 : epoll_op::reset();
265 4527 : target_endpoint = endpoint{};
266 4527 : }
267 :
268 4527 : void perform_io() noexcept override
269 : {
270 : // connect() completion status is retrieved via SO_ERROR, not return value
271 4527 : int err = 0;
272 4527 : socklen_t len = sizeof(err);
273 4527 : if (::getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &len) < 0)
274 MIS 0 : err = errno;
275 HIT 4527 : complete(err, 0);
276 4527 : }
277 :
278 : // Defined in sockets.cpp where epoll_socket is complete
279 : void operator()() override;
280 : void cancel() noexcept override;
281 : };
282 :
283 : struct epoll_read_op final : epoll_op
284 : {
285 : static constexpr std::size_t max_buffers = 16;
286 : iovec iovecs[max_buffers];
287 : int iovec_count = 0;
288 : bool empty_buffer_read = false;
289 :
290 35082 : bool is_read_operation() const noexcept override
291 : {
292 35082 : return !empty_buffer_read;
293 : }
294 :
295 175849 : void reset() noexcept
296 : {
297 175849 : epoll_op::reset();
298 175849 : iovec_count = 0;
299 175849 : empty_buffer_read = false;
300 175849 : }
301 :
302 146 : void perform_io() noexcept override
303 : {
304 : ssize_t n;
305 : do
306 : {
307 146 : n = ::readv(fd, iovecs, iovec_count);
308 : }
309 146 : while (n < 0 && errno == EINTR);
310 :
311 146 : if (n >= 0)
312 4 : complete(0, static_cast<std::size_t>(n));
313 : else
314 142 : complete(errno, 0);
315 146 : }
316 :
317 : void cancel() noexcept override;
318 : };
319 :
320 : struct epoll_write_op final : epoll_op
321 : {
322 : static constexpr std::size_t max_buffers = 16;
323 : iovec iovecs[max_buffers];
324 : int iovec_count = 0;
325 :
326 175649 : void reset() noexcept
327 : {
328 175649 : epoll_op::reset();
329 175649 : iovec_count = 0;
330 175649 : }
331 :
332 MIS 0 : void perform_io() noexcept override
333 : {
334 0 : msghdr msg{};
335 0 : msg.msg_iov = iovecs;
336 0 : msg.msg_iovlen = static_cast<std::size_t>(iovec_count);
337 :
338 : ssize_t n;
339 : do
340 : {
341 0 : n = ::sendmsg(fd, &msg, MSG_NOSIGNAL);
342 : }
343 0 : while (n < 0 && errno == EINTR);
344 :
345 0 : if (n >= 0)
346 0 : complete(0, static_cast<std::size_t>(n));
347 : else
348 0 : complete(errno, 0);
349 0 : }
350 :
351 : void cancel() noexcept override;
352 : };
353 :
354 : struct epoll_accept_op final : epoll_op
355 : {
356 : int accepted_fd = -1;
357 : io_object::implementation** impl_out = nullptr;
358 : sockaddr_in peer_addr{};
359 :
360 HIT 4535 : void reset() noexcept
361 : {
362 4535 : epoll_op::reset();
363 4535 : accepted_fd = -1;
364 4535 : impl_out = nullptr;
365 4535 : peer_addr = {};
366 4535 : }
367 :
368 4524 : void perform_io() noexcept override
369 : {
370 4524 : socklen_t addrlen = sizeof(peer_addr);
371 : int new_fd;
372 : do
373 : {
374 9048 : new_fd = ::accept4(
375 4524 : fd, reinterpret_cast<sockaddr*>(&peer_addr), &addrlen,
376 : SOCK_NONBLOCK | SOCK_CLOEXEC);
377 : }
378 4524 : while (new_fd < 0 && errno == EINTR);
379 :
380 4524 : if (new_fd >= 0)
381 : {
382 4524 : accepted_fd = new_fd;
383 4524 : complete(0, 0);
384 : }
385 : else
386 : {
387 MIS 0 : complete(errno, 0);
388 : }
389 HIT 4524 : }
390 :
391 : // Defined in acceptors.cpp where epoll_acceptor is complete
392 : void operator()() override;
393 : void cancel() noexcept override;
394 : };
395 :
396 : } // namespace boost::corosio::detail
397 :
398 : #endif // BOOST_COROSIO_HAS_EPOLL
399 :
400 : #endif // BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_OP_HPP
|