TLA Line data Source code
1 : //
2 : // Copyright (c) 2026 Steve Gerbino
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/corosio
8 : //
9 :
10 : #ifndef BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_ACCEPTOR_SERVICE_HPP
11 : #define BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_ACCEPTOR_SERVICE_HPP
12 :
13 : #include <boost/corosio/detail/platform.hpp>
14 :
15 : #if BOOST_COROSIO_HAS_EPOLL
16 :
17 : #include <boost/corosio/detail/config.hpp>
18 : #include <boost/capy/ex/execution_context.hpp>
19 : #include <boost/corosio/detail/acceptor_service.hpp>
20 :
21 : #include <boost/corosio/native/detail/epoll/epoll_acceptor.hpp>
22 : #include <boost/corosio/native/detail/epoll/epoll_socket_service.hpp>
23 : #include <boost/corosio/native/detail/epoll/epoll_scheduler.hpp>
24 :
25 : #include <boost/corosio/detail/endpoint_convert.hpp>
26 : #include <boost/corosio/detail/dispatch_coro.hpp>
27 : #include <boost/corosio/detail/make_err.hpp>
28 :
29 : #include <memory>
30 : #include <mutex>
31 : #include <unordered_map>
32 : #include <utility>
33 :
34 : #include <errno.h>
35 : #include <netinet/in.h>
36 : #include <sys/epoll.h>
37 : #include <sys/socket.h>
38 : #include <unistd.h>
39 :
40 : namespace boost::corosio::detail {
41 :
42 : /** State for epoll acceptor service. */
43 : class epoll_acceptor_state
44 : {
45 : public:
46 HIT 205 : explicit epoll_acceptor_state(epoll_scheduler& sched) noexcept
47 205 : : sched_(sched)
48 : {
49 205 : }
50 :
51 : epoll_scheduler& sched_;
52 : std::mutex mutex_;
53 : intrusive_list<epoll_acceptor> acceptor_list_;
54 : std::unordered_map<epoll_acceptor*, std::shared_ptr<epoll_acceptor>>
55 : acceptor_ptrs_;
56 : };
57 :
58 : /** epoll acceptor service implementation.
59 :
60 : Inherits from acceptor_service to enable runtime polymorphism.
61 : Uses key_type = acceptor_service for service lookup.
62 : */
63 : class BOOST_COROSIO_DECL epoll_acceptor_service final : public acceptor_service
64 : {
65 : public:
66 : explicit epoll_acceptor_service(capy::execution_context& ctx);
67 : ~epoll_acceptor_service() override;
68 :
69 : epoll_acceptor_service(epoll_acceptor_service const&) = delete;
70 : epoll_acceptor_service& operator=(epoll_acceptor_service const&) = delete;
71 :
72 : void shutdown() override;
73 :
74 : io_object::implementation* construct() override;
75 : void destroy(io_object::implementation*) override;
76 : void close(io_object::handle&) override;
77 : std::error_code open_acceptor(
78 : tcp_acceptor::implementation& impl, endpoint ep, int backlog) override;
79 :
80 4665 : epoll_scheduler& scheduler() const noexcept
81 : {
82 4665 : return state_->sched_;
83 : }
84 : void post(epoll_op* op);
85 : void work_started() noexcept;
86 : void work_finished() noexcept;
87 :
88 : /** Get the socket service for creating peer sockets during accept. */
89 : epoll_socket_service* socket_service() const noexcept;
90 :
91 : private:
92 : capy::execution_context& ctx_;
93 : std::unique_ptr<epoll_acceptor_state> state_;
94 : };
95 :
96 : //--------------------------------------------------------------------------
97 : //
98 : // Implementation
99 : //
100 : //--------------------------------------------------------------------------
101 :
102 : inline void
103 6 : epoll_accept_op::cancel() noexcept
104 : {
105 6 : if (acceptor_impl_)
106 6 : acceptor_impl_->cancel_single_op(*this);
107 : else
108 MIS 0 : request_cancel();
109 HIT 6 : }
110 :
111 : inline void
112 4535 : epoll_accept_op::operator()()
113 : {
114 4535 : stop_cb.reset();
115 :
116 4535 : static_cast<epoll_acceptor*>(acceptor_impl_)
117 4535 : ->service()
118 4535 : .scheduler()
119 4535 : .reset_inline_budget();
120 :
121 4535 : bool success = (errn == 0 && !cancelled.load(std::memory_order_acquire));
122 :
123 4535 : if (cancelled.load(std::memory_order_acquire))
124 9 : *ec_out = capy::error::canceled;
125 4526 : else if (errn != 0)
126 MIS 0 : *ec_out = make_err(errn);
127 : else
128 HIT 4526 : *ec_out = {};
129 :
130 : // Set up the peer socket on success
131 4535 : if (success && accepted_fd >= 0 && acceptor_impl_)
132 : {
133 4526 : auto* socket_svc = static_cast<epoll_acceptor*>(acceptor_impl_)
134 4526 : ->service()
135 4526 : .socket_service();
136 4526 : if (socket_svc)
137 : {
138 4526 : auto& impl = static_cast<epoll_socket&>(*socket_svc->construct());
139 4526 : impl.set_socket(accepted_fd);
140 :
141 4526 : impl.desc_state_.fd = accepted_fd;
142 : {
143 4526 : std::lock_guard lock(impl.desc_state_.mutex);
144 4526 : impl.desc_state_.read_op = nullptr;
145 4526 : impl.desc_state_.write_op = nullptr;
146 4526 : impl.desc_state_.connect_op = nullptr;
147 4526 : }
148 4526 : socket_svc->scheduler().register_descriptor(
149 : accepted_fd, &impl.desc_state_);
150 :
151 4526 : impl.set_endpoints(
152 4526 : static_cast<epoll_acceptor*>(acceptor_impl_)->local_endpoint(),
153 4526 : from_sockaddr_in(peer_addr));
154 :
155 4526 : if (impl_out)
156 4526 : *impl_out = &impl;
157 4526 : accepted_fd = -1;
158 : }
159 : else
160 : {
161 : // No socket service — treat as error
162 MIS 0 : *ec_out = make_err(ENOENT);
163 0 : success = false;
164 : }
165 : }
166 :
167 HIT 4535 : if (!success || !acceptor_impl_)
168 : {
169 9 : if (accepted_fd >= 0)
170 : {
171 MIS 0 : ::close(accepted_fd);
172 0 : accepted_fd = -1;
173 : }
174 HIT 9 : if (impl_out)
175 9 : *impl_out = nullptr;
176 : }
177 :
178 : // Move to stack before resuming. See epoll_op::operator()() for rationale.
179 4535 : capy::executor_ref saved_ex(ex);
180 4535 : std::coroutine_handle<> saved_h(h);
181 4535 : auto prevent_premature_destruction = std::move(impl_ptr);
182 4535 : dispatch_coro(saved_ex, saved_h).resume();
183 4535 : }
184 :
185 67 : inline epoll_acceptor::epoll_acceptor(epoll_acceptor_service& svc) noexcept
186 67 : : svc_(svc)
187 : {
188 67 : }
189 :
190 : inline std::coroutine_handle<>
191 4535 : epoll_acceptor::accept(
192 : std::coroutine_handle<> h,
193 : capy::executor_ref ex,
194 : std::stop_token token,
195 : std::error_code* ec,
196 : io_object::implementation** impl_out)
197 : {
198 4535 : auto& op = acc_;
199 4535 : op.reset();
200 4535 : op.h = h;
201 4535 : op.ex = ex;
202 4535 : op.ec_out = ec;
203 4535 : op.impl_out = impl_out;
204 4535 : op.fd = fd_;
205 4535 : op.start(token, this);
206 :
207 4535 : sockaddr_in addr{};
208 4535 : socklen_t addrlen = sizeof(addr);
209 : int accepted;
210 : do
211 : {
212 4535 : accepted = ::accept4(
213 : fd_, reinterpret_cast<sockaddr*>(&addr), &addrlen,
214 : SOCK_NONBLOCK | SOCK_CLOEXEC);
215 : }
216 4535 : while (accepted < 0 && errno == EINTR);
217 :
218 4535 : if (accepted >= 0)
219 : {
220 : {
221 2 : std::lock_guard lock(desc_state_.mutex);
222 2 : desc_state_.read_ready = false;
223 2 : }
224 :
225 2 : if (svc_.scheduler().try_consume_inline_budget())
226 : {
227 MIS 0 : auto* socket_svc = svc_.socket_service();
228 0 : if (socket_svc)
229 : {
230 : auto& impl =
231 0 : static_cast<epoll_socket&>(*socket_svc->construct());
232 0 : impl.set_socket(accepted);
233 :
234 0 : impl.desc_state_.fd = accepted;
235 : {
236 0 : std::lock_guard lock(impl.desc_state_.mutex);
237 0 : impl.desc_state_.read_op = nullptr;
238 0 : impl.desc_state_.write_op = nullptr;
239 0 : impl.desc_state_.connect_op = nullptr;
240 0 : }
241 0 : socket_svc->scheduler().register_descriptor(
242 : accepted, &impl.desc_state_);
243 :
244 0 : impl.set_endpoints(local_endpoint_, from_sockaddr_in(addr));
245 :
246 0 : *ec = {};
247 0 : if (impl_out)
248 0 : *impl_out = &impl;
249 : }
250 : else
251 : {
252 0 : ::close(accepted);
253 0 : *ec = make_err(ENOENT);
254 0 : if (impl_out)
255 0 : *impl_out = nullptr;
256 : }
257 0 : return dispatch_coro(ex, h);
258 : }
259 :
260 HIT 2 : op.accepted_fd = accepted;
261 2 : op.peer_addr = addr;
262 2 : op.complete(0, 0);
263 2 : op.impl_ptr = shared_from_this();
264 2 : svc_.post(&op);
265 2 : return std::noop_coroutine();
266 : }
267 :
268 4533 : if (errno == EAGAIN || errno == EWOULDBLOCK)
269 : {
270 4533 : op.impl_ptr = shared_from_this();
271 4533 : svc_.work_started();
272 :
273 4533 : std::lock_guard lock(desc_state_.mutex);
274 4533 : bool io_done = false;
275 4533 : if (desc_state_.read_ready)
276 : {
277 MIS 0 : desc_state_.read_ready = false;
278 0 : op.perform_io();
279 0 : io_done = (op.errn != EAGAIN && op.errn != EWOULDBLOCK);
280 0 : if (!io_done)
281 0 : op.errn = 0;
282 : }
283 :
284 HIT 4533 : if (io_done || op.cancelled.load(std::memory_order_acquire))
285 : {
286 MIS 0 : svc_.post(&op);
287 0 : svc_.work_finished();
288 : }
289 : else
290 : {
291 HIT 4533 : desc_state_.read_op = &op;
292 : }
293 4533 : return std::noop_coroutine();
294 4533 : }
295 :
296 MIS 0 : op.complete(errno, 0);
297 0 : op.impl_ptr = shared_from_this();
298 0 : svc_.post(&op);
299 : // completion is always posted to scheduler queue, never inline.
300 0 : return std::noop_coroutine();
301 : }
302 :
303 : inline void
304 HIT 1 : epoll_acceptor::cancel() noexcept
305 : {
306 1 : cancel_single_op(acc_);
307 1 : }
308 :
309 : inline void
310 7 : epoll_acceptor::cancel_single_op(epoll_op& op) noexcept
311 : {
312 7 : auto self = weak_from_this().lock();
313 7 : if (!self)
314 MIS 0 : return;
315 :
316 HIT 7 : op.request_cancel();
317 :
318 7 : epoll_op* claimed = nullptr;
319 : {
320 7 : std::lock_guard lock(desc_state_.mutex);
321 7 : if (desc_state_.read_op == &op)
322 7 : claimed = std::exchange(desc_state_.read_op, nullptr);
323 7 : }
324 7 : if (claimed)
325 : {
326 7 : op.impl_ptr = self;
327 7 : svc_.post(&op);
328 7 : svc_.work_finished();
329 : }
330 7 : }
331 :
332 : inline void
333 264 : epoll_acceptor::close_socket() noexcept
334 : {
335 264 : auto self = weak_from_this().lock();
336 264 : if (self)
337 : {
338 264 : acc_.request_cancel();
339 :
340 264 : epoll_op* claimed = nullptr;
341 : {
342 264 : std::lock_guard lock(desc_state_.mutex);
343 264 : claimed = std::exchange(desc_state_.read_op, nullptr);
344 264 : desc_state_.read_ready = false;
345 264 : desc_state_.write_ready = false;
346 264 : }
347 :
348 264 : if (claimed)
349 : {
350 2 : acc_.impl_ptr = self;
351 2 : svc_.post(&acc_);
352 2 : svc_.work_finished();
353 : }
354 :
355 264 : if (desc_state_.is_enqueued_.load(std::memory_order_acquire))
356 MIS 0 : desc_state_.impl_ref_ = self;
357 : }
358 :
359 HIT 264 : if (fd_ >= 0)
360 : {
361 64 : if (desc_state_.registered_events != 0)
362 64 : svc_.scheduler().deregister_descriptor(fd_);
363 64 : ::close(fd_);
364 64 : fd_ = -1;
365 : }
366 :
367 264 : desc_state_.fd = -1;
368 264 : desc_state_.registered_events = 0;
369 :
370 264 : local_endpoint_ = endpoint{};
371 264 : }
372 :
373 205 : inline epoll_acceptor_service::epoll_acceptor_service(
374 205 : capy::execution_context& ctx)
375 205 : : ctx_(ctx)
376 205 : , state_(
377 : std::make_unique<epoll_acceptor_state>(
378 205 : ctx.use_service<epoll_scheduler>()))
379 : {
380 205 : }
381 :
382 410 : inline epoll_acceptor_service::~epoll_acceptor_service() {}
383 :
384 : inline void
385 205 : epoll_acceptor_service::shutdown()
386 : {
387 205 : std::lock_guard lock(state_->mutex_);
388 :
389 205 : while (auto* impl = state_->acceptor_list_.pop_front())
390 MIS 0 : impl->close_socket();
391 :
392 : // Don't clear acceptor_ptrs_ here — same rationale as
393 : // epoll_socket_service::shutdown(). Let ~state_ release ptrs
394 : // after scheduler shutdown has drained all queued ops.
395 HIT 205 : }
396 :
397 : inline io_object::implementation*
398 67 : epoll_acceptor_service::construct()
399 : {
400 67 : auto impl = std::make_shared<epoll_acceptor>(*this);
401 67 : auto* raw = impl.get();
402 :
403 67 : std::lock_guard lock(state_->mutex_);
404 67 : state_->acceptor_list_.push_back(raw);
405 67 : state_->acceptor_ptrs_.emplace(raw, std::move(impl));
406 :
407 67 : return raw;
408 67 : }
409 :
410 : inline void
411 67 : epoll_acceptor_service::destroy(io_object::implementation* impl)
412 : {
413 67 : auto* epoll_impl = static_cast<epoll_acceptor*>(impl);
414 67 : epoll_impl->close_socket();
415 67 : std::lock_guard lock(state_->mutex_);
416 67 : state_->acceptor_list_.remove(epoll_impl);
417 67 : state_->acceptor_ptrs_.erase(epoll_impl);
418 67 : }
419 :
420 : inline void
421 131 : epoll_acceptor_service::close(io_object::handle& h)
422 : {
423 131 : static_cast<epoll_acceptor*>(h.get())->close_socket();
424 131 : }
425 :
426 : inline std::error_code
427 66 : epoll_acceptor_service::open_acceptor(
428 : tcp_acceptor::implementation& impl, endpoint ep, int backlog)
429 : {
430 66 : auto* epoll_impl = static_cast<epoll_acceptor*>(&impl);
431 66 : epoll_impl->close_socket();
432 :
433 66 : int fd = ::socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, 0);
434 66 : if (fd < 0)
435 MIS 0 : return make_err(errno);
436 :
437 HIT 66 : int reuse = 1;
438 66 : ::setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse));
439 :
440 66 : sockaddr_in addr = detail::to_sockaddr_in(ep);
441 66 : if (::bind(fd, reinterpret_cast<sockaddr*>(&addr), sizeof(addr)) < 0)
442 : {
443 2 : int errn = errno;
444 2 : ::close(fd);
445 2 : return make_err(errn);
446 : }
447 :
448 64 : if (::listen(fd, backlog) < 0)
449 : {
450 MIS 0 : int errn = errno;
451 0 : ::close(fd);
452 0 : return make_err(errn);
453 : }
454 :
455 HIT 64 : epoll_impl->fd_ = fd;
456 :
457 : // Register fd with epoll (edge-triggered mode)
458 64 : epoll_impl->desc_state_.fd = fd;
459 : {
460 64 : std::lock_guard lock(epoll_impl->desc_state_.mutex);
461 64 : epoll_impl->desc_state_.read_op = nullptr;
462 64 : }
463 64 : scheduler().register_descriptor(fd, &epoll_impl->desc_state_);
464 :
465 : // Cache the local endpoint (queries OS for ephemeral port if port was 0)
466 64 : sockaddr_in local_addr{};
467 64 : socklen_t local_len = sizeof(local_addr);
468 64 : if (::getsockname(
469 64 : fd, reinterpret_cast<sockaddr*>(&local_addr), &local_len) == 0)
470 64 : epoll_impl->set_local_endpoint(detail::from_sockaddr_in(local_addr));
471 :
472 64 : return {};
473 : }
474 :
475 : inline void
476 11 : epoll_acceptor_service::post(epoll_op* op)
477 : {
478 11 : state_->sched_.post(op);
479 11 : }
480 :
481 : inline void
482 4533 : epoll_acceptor_service::work_started() noexcept
483 : {
484 4533 : state_->sched_.work_started();
485 4533 : }
486 :
487 : inline void
488 9 : epoll_acceptor_service::work_finished() noexcept
489 : {
490 9 : state_->sched_.work_finished();
491 9 : }
492 :
493 : inline epoll_socket_service*
494 4526 : epoll_acceptor_service::socket_service() const noexcept
495 : {
496 4526 : auto* svc = ctx_.find_service<detail::socket_service>();
497 4526 : return svc ? dynamic_cast<epoll_socket_service*>(svc) : nullptr;
498 : }
499 :
500 : } // namespace boost::corosio::detail
501 :
502 : #endif // BOOST_COROSIO_HAS_EPOLL
503 :
504 : #endif // BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_ACCEPTOR_SERVICE_HPP
|