LCOV - code coverage report
Current view: top level - corosio/native/detail/epoll - epoll_op.hpp (source / functions) Coverage Total Hit Missed
Test: coverage_remapped.info Lines: 83.2 % 101 84 17
Test Date: 2026-02-17 20:54:14 Functions: 85.0 % 20 17 3

           TLA  Line data    Source code
       1                 : //
       2                 : // Copyright (c) 2026 Steve Gerbino
       3                 : //
       4                 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       5                 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       6                 : //
       7                 : // Official repository: https://github.com/cppalliance/corosio
       8                 : //
       9                 : 
      10                 : #ifndef BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_OP_HPP
      11                 : #define BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_OP_HPP
      12                 : 
      13                 : #include <boost/corosio/detail/platform.hpp>
      14                 : 
      15                 : #if BOOST_COROSIO_HAS_EPOLL
      16                 : 
      17                 : #include <boost/corosio/detail/config.hpp>
      18                 : #include <boost/corosio/io/io_object.hpp>
      19                 : #include <boost/corosio/endpoint.hpp>
      20                 : #include <boost/capy/ex/executor_ref.hpp>
      21                 : #include <coroutine>
      22                 : #include <boost/capy/error.hpp>
      23                 : #include <system_error>
      24                 : 
      25                 : #include <boost/corosio/detail/make_err.hpp>
      26                 : #include <boost/corosio/detail/dispatch_coro.hpp>
      27                 : #include <boost/corosio/detail/scheduler_op.hpp>
      28                 : #include <boost/corosio/detail/endpoint_convert.hpp>
      29                 : 
      30                 : #include <unistd.h>
      31                 : #include <errno.h>
      32                 : 
      33                 : #include <atomic>
      34                 : #include <cstddef>
      35                 : #include <memory>
      36                 : #include <mutex>
      37                 : #include <optional>
      38                 : #include <stop_token>
      39                 : 
      40                 : #include <netinet/in.h>
      41                 : #include <sys/socket.h>
      42                 : #include <sys/uio.h>
      43                 : 
      44                 : /*
      45                 :     epoll Operation State
      46                 :     =====================
      47                 : 
      48                 :     Each async I/O operation has a corresponding epoll_op-derived struct that
      49                 :     holds the operation's state while it's in flight. The socket impl owns
      50                 :     fixed slots for each operation type (conn_, rd_, wr_), so only one
      51                 :     operation of each type can be pending per socket at a time.
      52                 : 
      53                 :     Persistent Registration
      54                 :     -----------------------
      55                 :     File descriptors are registered with epoll once (via descriptor_state) and
      56                 :     stay registered until closed. The descriptor_state tracks which operations
      57                 :     are pending (read_op, write_op, connect_op). When an event arrives, the
      58                 :     reactor dispatches to the appropriate pending operation.
      59                 : 
      60                 :     Impl Lifetime Management
      61                 :     ------------------------
      62                 :     When cancel() posts an op to the scheduler's ready queue, the socket impl
      63                 :     might be destroyed before the scheduler processes the op. The `impl_ptr`
      64                 :     member holds a shared_ptr to the impl, keeping it alive until the op
      65                 :     completes. This is set by cancel() and cleared in operator() after the
      66                 :     coroutine is resumed.
      67                 : 
      68                 :     EOF Detection
      69                 :     -------------
      70                 :     For reads, 0 bytes with no error means EOF. But an empty user buffer also
      71                 :     returns 0 bytes. The `empty_buffer_read` flag distinguishes these cases.
      72                 : 
      73                 :     SIGPIPE Prevention
      74                 :     ------------------
      75                 :     Writes use sendmsg() with MSG_NOSIGNAL instead of writev() to prevent
      76                 :     SIGPIPE when the peer has closed.
      77                 : */
      78                 : 
      79                 : namespace boost::corosio::detail {
      80                 : 
      81                 : // Forward declarations
      82                 : class epoll_socket;
      83                 : class epoll_acceptor;
      84                 : struct epoll_op;
      85                 : 
      86                 : // Forward declaration
      87                 : class epoll_scheduler;
      88                 : 
      89                 : /** Per-descriptor state for persistent epoll registration.
      90                 : 
      91                 :     Tracks pending operations for a file descriptor. The fd is registered
      92                 :     once with epoll and stays registered until closed.
      93                 : 
      94                 :     This struct extends scheduler_op to support deferred I/O processing.
      95                 :     When epoll events arrive, the reactor sets ready_events and queues
      96                 :     this descriptor for processing. When popped from the scheduler queue,
      97                 :     operator() performs the actual I/O and queues completion handlers.
      98                 : 
      99                 :     @par Deferred I/O Model
     100                 :     The reactor no longer performs I/O directly. Instead:
     101                 :     1. Reactor sets ready_events and queues descriptor_state
     102                 :     2. Scheduler pops descriptor_state and calls operator()
     103                 :     3. operator() performs I/O under mutex and queues completions
     104                 : 
     105                 :     This eliminates per-descriptor mutex locking from the reactor hot path.
     106                 : 
     107                 :     @par Thread Safety
     108                 :     The mutex protects operation pointers and ready flags during I/O.
     109                 :     ready_events_ and is_enqueued_ are atomic for lock-free reactor access.
     110                 : */
     111                 : struct descriptor_state final : scheduler_op
     112                 : {
     113                 :     std::mutex mutex;
     114                 : 
     115                 :     // Protected by mutex
     116                 :     epoll_op* read_op    = nullptr;
     117                 :     epoll_op* write_op   = nullptr;
     118                 :     epoll_op* connect_op = nullptr;
     119                 : 
     120                 :     // Caches edge events that arrived before an op was registered
     121                 :     bool read_ready  = false;
     122                 :     bool write_ready = false;
     123                 : 
     124                 :     // Deferred cancellation: set by cancel() when the target op is not
     125                 :     // parked (e.g. completing inline via speculative I/O). Checked when
     126                 :     // the next op parks; if set, the op is immediately self-cancelled.
     127                 :     // This matches IOCP semantics where CancelIoEx always succeeds.
     128                 :     bool read_cancel_pending    = false;
     129                 :     bool write_cancel_pending   = false;
     130                 :     bool connect_cancel_pending = false;
     131                 : 
     132                 :     // Set during registration only (no mutex needed)
     133                 :     std::uint32_t registered_events = 0;
     134                 :     int fd                          = -1;
     135                 : 
     136                 :     // For deferred I/O - set by reactor, read by scheduler
     137                 :     std::atomic<std::uint32_t> ready_events_{0};
     138                 :     std::atomic<bool> is_enqueued_{false};
     139                 :     epoll_scheduler const* scheduler_ = nullptr;
     140                 : 
     141                 :     // Prevents impl destruction while this descriptor_state is queued.
     142                 :     // Set by close_socket() when is_enqueued_ is true, cleared by operator().
     143                 :     std::shared_ptr<void> impl_ref_;
     144                 : 
     145                 :     /// Add ready events atomically.
     146 HIT       57547 :     void add_ready_events(std::uint32_t ev) noexcept
     147                 :     {
     148           57547 :         ready_events_.fetch_or(ev, std::memory_order_relaxed);
     149           57547 :     }
     150                 : 
     151                 :     /// Perform deferred I/O and queue completions.
     152                 :     void operator()() override;
     153                 : 
     154                 :     /// Destroy without invoking.
     155                 :     /// Called during scheduler::shutdown() drain. Clear impl_ref_ to break
     156                 :     /// the self-referential cycle set by close_socket().
     157              29 :     void destroy() override
     158                 :     {
     159              29 :         impl_ref_.reset();
     160              29 :     }
     161                 : };
     162                 : 
     163                 : struct epoll_op : scheduler_op
     164                 : {
     165                 :     struct canceller
     166                 :     {
     167                 :         epoll_op* op;
     168                 :         void operator()() const noexcept;
     169                 :     };
     170                 : 
     171                 :     std::coroutine_handle<> h;
     172                 :     capy::executor_ref ex;
     173                 :     std::error_code* ec_out = nullptr;
     174                 :     std::size_t* bytes_out  = nullptr;
     175                 : 
     176                 :     int fd                        = -1;
     177                 :     int errn                      = 0;
     178                 :     std::size_t bytes_transferred = 0;
     179                 : 
     180                 :     std::atomic<bool> cancelled{false};
     181                 :     std::optional<std::stop_callback<canceller>> stop_cb;
     182                 : 
     183                 :     // Prevents use-after-free when socket is closed with pending ops.
     184                 :     // See "Impl Lifetime Management" in file header.
     185                 :     std::shared_ptr<void> impl_ptr;
     186                 : 
     187                 :     // For stop_token cancellation - pointer to owning socket/acceptor impl.
     188                 :     // When stop is requested, we call back to the impl to perform actual I/O cancellation.
     189                 :     epoll_socket* socket_impl_     = nullptr;
     190                 :     epoll_acceptor* acceptor_impl_ = nullptr;
     191                 : 
     192           40972 :     epoll_op() = default;
     193                 : 
     194          360560 :     void reset() noexcept
     195                 :     {
     196          360560 :         fd                = -1;
     197          360560 :         errn              = 0;
     198          360560 :         bytes_transferred = 0;
     199          360560 :         cancelled.store(false, std::memory_order_relaxed);
     200          360560 :         impl_ptr.reset();
     201          360560 :         socket_impl_   = nullptr;
     202          360560 :         acceptor_impl_ = nullptr;
     203          360560 :     }
     204                 : 
     205                 :     // Defined in sockets.cpp where epoll_socket is complete
     206                 :     void operator()() override;
     207                 : 
     208           35098 :     virtual bool is_read_operation() const noexcept
     209                 :     {
     210           35098 :         return false;
     211                 :     }
     212                 :     virtual void cancel() noexcept = 0;
     213                 : 
     214 MIS           0 :     void destroy() override
     215                 :     {
     216               0 :         stop_cb.reset();
     217               0 :         impl_ptr.reset();
     218               0 :     }
     219                 : 
     220 HIT      123547 :     void request_cancel() noexcept
     221                 :     {
     222          123547 :         cancelled.store(true, std::memory_order_release);
     223          123547 :     }
     224                 : 
     225                 :     // NOLINTNEXTLINE(performance-unnecessary-value-param)
     226           74913 :     void start(std::stop_token token, epoll_socket* impl)
     227                 :     {
     228           74913 :         cancelled.store(false, std::memory_order_release);
     229           74913 :         stop_cb.reset();
     230           74913 :         socket_impl_   = impl;
     231           74913 :         acceptor_impl_ = nullptr;
     232                 : 
     233           74913 :         if (token.stop_possible())
     234             100 :             stop_cb.emplace(token, canceller{this});
     235           74913 :     }
     236                 : 
     237                 :     // NOLINTNEXTLINE(performance-unnecessary-value-param)
     238            4535 :     void start(std::stop_token token, epoll_acceptor* impl)
     239                 :     {
     240            4535 :         cancelled.store(false, std::memory_order_release);
     241            4535 :         stop_cb.reset();
     242            4535 :         socket_impl_   = nullptr;
     243            4535 :         acceptor_impl_ = impl;
     244                 : 
     245            4535 :         if (token.stop_possible())
     246               9 :             stop_cb.emplace(token, canceller{this});
     247            4535 :     }
     248                 : 
     249           79383 :     void complete(int err, std::size_t bytes) noexcept
     250                 :     {
     251           79383 :         errn              = err;
     252           79383 :         bytes_transferred = bytes;
     253           79383 :     }
     254                 : 
     255 MIS           0 :     virtual void perform_io() noexcept {}
     256                 : };
     257                 : 
     258                 : struct epoll_connect_op final : epoll_op
     259                 : {
     260                 :     endpoint target_endpoint;
     261                 : 
     262 HIT        4527 :     void reset() noexcept
     263                 :     {
     264            4527 :         epoll_op::reset();
     265            4527 :         target_endpoint = endpoint{};
     266            4527 :     }
     267                 : 
     268            4527 :     void perform_io() noexcept override
     269                 :     {
     270                 :         // connect() completion status is retrieved via SO_ERROR, not return value
     271            4527 :         int err       = 0;
     272            4527 :         socklen_t len = sizeof(err);
     273            4527 :         if (::getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &len) < 0)
     274 MIS           0 :             err = errno;
     275 HIT        4527 :         complete(err, 0);
     276            4527 :     }
     277                 : 
     278                 :     // Defined in sockets.cpp where epoll_socket is complete
     279                 :     void operator()() override;
     280                 :     void cancel() noexcept override;
     281                 : };
     282                 : 
     283                 : struct epoll_read_op final : epoll_op
     284                 : {
     285                 :     static constexpr std::size_t max_buffers = 16;
     286                 :     iovec iovecs[max_buffers];
     287                 :     int iovec_count        = 0;
     288                 :     bool empty_buffer_read = false;
     289                 : 
     290           35082 :     bool is_read_operation() const noexcept override
     291                 :     {
     292           35082 :         return !empty_buffer_read;
     293                 :     }
     294                 : 
     295          175849 :     void reset() noexcept
     296                 :     {
     297          175849 :         epoll_op::reset();
     298          175849 :         iovec_count       = 0;
     299          175849 :         empty_buffer_read = false;
     300          175849 :     }
     301                 : 
     302             146 :     void perform_io() noexcept override
     303                 :     {
     304                 :         ssize_t n;
     305                 :         do
     306                 :         {
     307             146 :             n = ::readv(fd, iovecs, iovec_count);
     308                 :         }
     309             146 :         while (n < 0 && errno == EINTR);
     310                 : 
     311             146 :         if (n >= 0)
     312               4 :             complete(0, static_cast<std::size_t>(n));
     313                 :         else
     314             142 :             complete(errno, 0);
     315             146 :     }
     316                 : 
     317                 :     void cancel() noexcept override;
     318                 : };
     319                 : 
     320                 : struct epoll_write_op final : epoll_op
     321                 : {
     322                 :     static constexpr std::size_t max_buffers = 16;
     323                 :     iovec iovecs[max_buffers];
     324                 :     int iovec_count = 0;
     325                 : 
     326          175649 :     void reset() noexcept
     327                 :     {
     328          175649 :         epoll_op::reset();
     329          175649 :         iovec_count = 0;
     330          175649 :     }
     331                 : 
     332 MIS           0 :     void perform_io() noexcept override
     333                 :     {
     334               0 :         msghdr msg{};
     335               0 :         msg.msg_iov    = iovecs;
     336               0 :         msg.msg_iovlen = static_cast<std::size_t>(iovec_count);
     337                 : 
     338                 :         ssize_t n;
     339                 :         do
     340                 :         {
     341               0 :             n = ::sendmsg(fd, &msg, MSG_NOSIGNAL);
     342                 :         }
     343               0 :         while (n < 0 && errno == EINTR);
     344                 : 
     345               0 :         if (n >= 0)
     346               0 :             complete(0, static_cast<std::size_t>(n));
     347                 :         else
     348               0 :             complete(errno, 0);
     349               0 :     }
     350                 : 
     351                 :     void cancel() noexcept override;
     352                 : };
     353                 : 
     354                 : struct epoll_accept_op final : epoll_op
     355                 : {
     356                 :     int accepted_fd                      = -1;
     357                 :     io_object::implementation** impl_out = nullptr;
     358                 :     sockaddr_in peer_addr{};
     359                 : 
     360 HIT        4535 :     void reset() noexcept
     361                 :     {
     362            4535 :         epoll_op::reset();
     363            4535 :         accepted_fd = -1;
     364            4535 :         impl_out    = nullptr;
     365            4535 :         peer_addr   = {};
     366            4535 :     }
     367                 : 
     368            4524 :     void perform_io() noexcept override
     369                 :     {
     370            4524 :         socklen_t addrlen = sizeof(peer_addr);
     371                 :         int new_fd;
     372                 :         do
     373                 :         {
     374            9048 :             new_fd = ::accept4(
     375            4524 :                 fd, reinterpret_cast<sockaddr*>(&peer_addr), &addrlen,
     376                 :                 SOCK_NONBLOCK | SOCK_CLOEXEC);
     377                 :         }
     378            4524 :         while (new_fd < 0 && errno == EINTR);
     379                 : 
     380            4524 :         if (new_fd >= 0)
     381                 :         {
     382            4524 :             accepted_fd = new_fd;
     383            4524 :             complete(0, 0);
     384                 :         }
     385                 :         else
     386                 :         {
     387 MIS           0 :             complete(errno, 0);
     388                 :         }
     389 HIT        4524 :     }
     390                 : 
     391                 :     // Defined in acceptors.cpp where epoll_acceptor is complete
     392                 :     void operator()() override;
     393                 :     void cancel() noexcept override;
     394                 : };
     395                 : 
     396                 : } // namespace boost::corosio::detail
     397                 : 
     398                 : #endif // BOOST_COROSIO_HAS_EPOLL
     399                 : 
     400                 : #endif // BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_OP_HPP
        

Generated by: LCOV version 2.3