LCOV - code coverage report
Current view: top level - corosio/native/detail/select - select_socket_service.hpp (source / functions) Coverage Total Hit Missed
Test: coverage_remapped.info Lines: 75.6 % 398 301 97
Test Date: 2026-02-17 20:54:14 Functions: 94.7 % 38 36 2

           TLA  Line data    Source code
       1                 : //
       2                 : // Copyright (c) 2026 Steve Gerbino
       3                 : //
       4                 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       5                 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       6                 : //
       7                 : // Official repository: https://github.com/cppalliance/corosio
       8                 : //
       9                 : 
      10                 : #ifndef BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SOCKET_SERVICE_HPP
      11                 : #define BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SOCKET_SERVICE_HPP
      12                 : 
      13                 : #include <boost/corosio/detail/platform.hpp>
      14                 : 
      15                 : #if BOOST_COROSIO_HAS_SELECT
      16                 : 
      17                 : #include <boost/corosio/detail/config.hpp>
      18                 : #include <boost/capy/ex/execution_context.hpp>
      19                 : #include <boost/corosio/detail/socket_service.hpp>
      20                 : 
      21                 : #include <boost/corosio/native/detail/select/select_socket.hpp>
      22                 : #include <boost/corosio/native/detail/select/select_scheduler.hpp>
      23                 : 
      24                 : #include <boost/corosio/detail/endpoint_convert.hpp>
      25                 : #include <boost/corosio/detail/dispatch_coro.hpp>
      26                 : #include <boost/corosio/detail/make_err.hpp>
      27                 : 
      28                 : #include <boost/corosio/detail/except.hpp>
      29                 : 
      30                 : #include <boost/capy/buffers.hpp>
      31                 : 
      32                 : #include <errno.h>
      33                 : #include <fcntl.h>
      34                 : #include <netinet/in.h>
      35                 : #include <netinet/tcp.h>
      36                 : #include <sys/socket.h>
      37                 : #include <unistd.h>
      38                 : 
      39                 : #include <memory>
      40                 : #include <mutex>
      41                 : #include <unordered_map>
      42                 : 
      43                 : /*
      44                 :     select Socket Implementation
      45                 :     ============================
      46                 : 
      47                 :     This mirrors the epoll_sockets design for behavioral consistency.
      48                 :     Each I/O operation follows the same pattern:
      49                 :       1. Try the syscall immediately (non-blocking socket)
      50                 :       2. If it succeeds or fails with a real error, post to completion queue
      51                 :       3. If EAGAIN/EWOULDBLOCK, register with select scheduler and wait
      52                 : 
      53                 :     Cancellation
      54                 :     ------------
      55                 :     See op.hpp for the completion/cancellation race handling via the
      56                 :     `registered` atomic. cancel() must complete pending operations (post
      57                 :     them with cancelled flag) so coroutines waiting on them can resume.
      58                 :     close_socket() calls cancel() first to ensure this.
      59                 : 
      60                 :     Impl Lifetime with shared_ptr
      61                 :     -----------------------------
      62                 :     Socket impls use enable_shared_from_this. The service owns impls via
      63                 :     shared_ptr maps (socket_ptrs_) keyed by raw pointer for O(1) lookup and
      64                 :     removal. When a user calls close(), we call cancel() which posts pending
      65                 :     ops to the scheduler.
      66                 : 
      67                 :     CRITICAL: The posted ops must keep the impl alive until they complete.
      68                 :     Otherwise the scheduler would process a freed op (use-after-free). The
      69                 :     cancel() method captures shared_from_this() into op.impl_ptr before
      70                 :     posting. When the op completes, impl_ptr is cleared, allowing the impl
      71                 :     to be destroyed if no other references exist.
      72                 : 
      73                 :     Service Ownership
      74                 :     -----------------
      75                 :     select_socket_service owns all socket impls. destroy() removes the
      76                 :     shared_ptr from the map, but the impl may survive if ops still hold
      77                 :     impl_ptr refs. shutdown() closes all sockets and clears the map; any
      78                 :     in-flight ops will complete and release their refs.
      79                 : */
      80                 : 
      81                 : namespace boost::corosio::detail {
      82                 : 
      83                 : /** State for select socket service. */
      84                 : class select_socket_state
      85                 : {
      86                 : public:
      87 HIT         135 :     explicit select_socket_state(select_scheduler& sched) noexcept
      88             135 :         : sched_(sched)
      89                 :     {
      90             135 :     }
      91                 : 
      92                 :     select_scheduler& sched_;
      93                 :     std::mutex mutex_;
      94                 :     intrusive_list<select_socket> socket_list_;
      95                 :     std::unordered_map<select_socket*, std::shared_ptr<select_socket>>
      96                 :         socket_ptrs_;
      97                 : };
      98                 : 
      99                 : /** select socket service implementation.
     100                 : 
     101                 :     Inherits from socket_service to enable runtime polymorphism.
     102                 :     Uses key_type = socket_service for service lookup.
     103                 : */
     104                 : class BOOST_COROSIO_DECL select_socket_service final : public socket_service
     105                 : {
     106                 : public:
     107                 :     explicit select_socket_service(capy::execution_context& ctx);
     108                 :     ~select_socket_service() override;
     109                 : 
     110                 :     select_socket_service(select_socket_service const&)            = delete;
     111                 :     select_socket_service& operator=(select_socket_service const&) = delete;
     112                 : 
     113                 :     void shutdown() override;
     114                 : 
     115                 :     io_object::implementation* construct() override;
     116                 :     void destroy(io_object::implementation*) override;
     117                 :     void close(io_object::handle&) override;
     118                 :     std::error_code open_socket(tcp_socket::implementation& impl) override;
     119                 : 
     120           10077 :     select_scheduler& scheduler() const noexcept
     121                 :     {
     122           10077 :         return state_->sched_;
     123                 :     }
     124                 :     void post(select_op* op);
     125                 :     void work_started() noexcept;
     126                 :     void work_finished() noexcept;
     127                 : 
     128                 : private:
     129                 :     std::unique_ptr<select_socket_state> state_;
     130                 : };
     131                 : 
     132                 : // Backward compatibility alias
     133                 : using select_sockets = select_socket_service;
     134                 : 
     135                 : inline void
     136              99 : select_op::canceller::operator()() const noexcept
     137                 : {
     138              99 :     op->cancel();
     139              99 : }
     140                 : 
     141                 : inline void
     142 MIS           0 : select_connect_op::cancel() noexcept
     143                 : {
     144               0 :     if (socket_impl_)
     145               0 :         socket_impl_->cancel_single_op(*this);
     146                 :     else
     147               0 :         request_cancel();
     148               0 : }
     149                 : 
     150                 : inline void
     151 HIT          99 : select_read_op::cancel() noexcept
     152                 : {
     153              99 :     if (socket_impl_)
     154              99 :         socket_impl_->cancel_single_op(*this);
     155                 :     else
     156 MIS           0 :         request_cancel();
     157 HIT          99 : }
     158                 : 
     159                 : inline void
     160 MIS           0 : select_write_op::cancel() noexcept
     161                 : {
     162               0 :     if (socket_impl_)
     163               0 :         socket_impl_->cancel_single_op(*this);
     164                 :     else
     165               0 :         request_cancel();
     166               0 : }
     167                 : 
     168                 : inline void
     169 HIT        3208 : select_connect_op::operator()()
     170                 : {
     171            3208 :     stop_cb.reset();
     172                 : 
     173            3208 :     bool success = (errn == 0 && !cancelled.load(std::memory_order_acquire));
     174                 : 
     175                 :     // Cache endpoints on successful connect
     176            3208 :     if (success && socket_impl_)
     177                 :     {
     178                 :         // Query local endpoint via getsockname (may fail, but remote is always known)
     179            3207 :         endpoint local_ep;
     180            3207 :         sockaddr_in local_addr{};
     181            3207 :         socklen_t local_len = sizeof(local_addr);
     182            3207 :         if (::getsockname(
     183            3207 :                 fd, reinterpret_cast<sockaddr*>(&local_addr), &local_len) == 0)
     184            3207 :             local_ep = from_sockaddr_in(local_addr);
     185                 :         // Always cache remote endpoint; local may be default if getsockname failed
     186            3207 :         static_cast<select_socket*>(socket_impl_)
     187            3207 :             ->set_endpoints(local_ep, target_endpoint);
     188                 :     }
     189                 : 
     190            3208 :     if (ec_out)
     191                 :     {
     192            3208 :         if (cancelled.load(std::memory_order_acquire))
     193 MIS           0 :             *ec_out = capy::error::canceled;
     194 HIT        3208 :         else if (errn != 0)
     195               1 :             *ec_out = make_err(errn);
     196                 :         else
     197            3207 :             *ec_out = {};
     198                 :     }
     199                 : 
     200            3208 :     if (bytes_out)
     201 MIS           0 :         *bytes_out = bytes_transferred;
     202                 : 
     203                 :     // Move to stack before destroying the frame
     204 HIT        3208 :     capy::executor_ref saved_ex(ex);
     205            3208 :     std::coroutine_handle<> saved_h(h);
     206            3208 :     impl_ptr.reset();
     207            3208 :     dispatch_coro(saved_ex, saved_h).resume();
     208            3208 : }
     209                 : 
     210            9641 : inline select_socket::select_socket(select_socket_service& svc) noexcept
     211            9641 :     : svc_(svc)
     212                 : {
     213            9641 : }
     214                 : 
     215                 : inline std::coroutine_handle<>
     216            3208 : select_socket::connect(
     217                 :     std::coroutine_handle<> h,
     218                 :     capy::executor_ref ex,
     219                 :     endpoint ep,
     220                 :     std::stop_token token,
     221                 :     std::error_code* ec)
     222                 : {
     223            3208 :     auto& op = conn_;
     224            3208 :     op.reset();
     225            3208 :     op.h               = h;
     226            3208 :     op.ex              = ex;
     227            3208 :     op.ec_out          = ec;
     228            3208 :     op.fd              = fd_;
     229            3208 :     op.target_endpoint = ep; // Store target for endpoint caching
     230            3208 :     op.start(token, this);
     231                 : 
     232            3208 :     sockaddr_in addr = detail::to_sockaddr_in(ep);
     233                 :     int result =
     234            3208 :         ::connect(fd_, reinterpret_cast<sockaddr*>(&addr), sizeof(addr));
     235                 : 
     236            3208 :     if (result == 0)
     237                 :     {
     238                 :         // Sync success - cache endpoints immediately
     239 MIS           0 :         sockaddr_in local_addr{};
     240               0 :         socklen_t local_len = sizeof(local_addr);
     241               0 :         if (::getsockname(
     242               0 :                 fd_, reinterpret_cast<sockaddr*>(&local_addr), &local_len) == 0)
     243               0 :             local_endpoint_ = detail::from_sockaddr_in(local_addr);
     244               0 :         remote_endpoint_ = ep;
     245                 : 
     246               0 :         op.complete(0, 0);
     247               0 :         op.impl_ptr = shared_from_this();
     248               0 :         svc_.post(&op);
     249                 :         // completion is always posted to scheduler queue, never inline.
     250               0 :         return std::noop_coroutine();
     251                 :     }
     252                 : 
     253 HIT        3208 :     if (errno == EINPROGRESS)
     254                 :     {
     255            3208 :         svc_.work_started();
     256            3208 :         op.impl_ptr = shared_from_this();
     257                 : 
     258                 :         // Set registering BEFORE register_fd to close the race window where
     259                 :         // reactor sees an event before we set registered. The reactor treats
     260                 :         // registering the same as registered when claiming the op.
     261            3208 :         op.registered.store(
     262                 :             select_registration_state::registering, std::memory_order_release);
     263            3208 :         svc_.scheduler().register_fd(fd_, &op, select_scheduler::event_write);
     264                 : 
     265                 :         // Transition to registered. If this fails, reactor or cancel already
     266                 :         // claimed the op (state is now unregistered), so we're done. However,
     267                 :         // we must still deregister the fd because cancel's deregister_fd may
     268                 :         // have run before our register_fd, leaving the fd orphaned.
     269            3208 :         auto expected = select_registration_state::registering;
     270            3208 :         if (!op.registered.compare_exchange_strong(
     271                 :                 expected, select_registration_state::registered,
     272                 :                 std::memory_order_acq_rel))
     273                 :         {
     274 MIS           0 :             svc_.scheduler().deregister_fd(fd_, select_scheduler::event_write);
     275                 :             // completion is always posted to scheduler queue, never inline.
     276               0 :             return std::noop_coroutine();
     277                 :         }
     278                 : 
     279                 :         // If cancelled was set before we registered, handle it now.
     280 HIT        3208 :         if (op.cancelled.load(std::memory_order_acquire))
     281                 :         {
     282 MIS           0 :             auto prev = op.registered.exchange(
     283                 :                 select_registration_state::unregistered,
     284                 :                 std::memory_order_acq_rel);
     285               0 :             if (prev != select_registration_state::unregistered)
     286                 :             {
     287               0 :                 svc_.scheduler().deregister_fd(
     288                 :                     fd_, select_scheduler::event_write);
     289               0 :                 op.impl_ptr = shared_from_this();
     290               0 :                 svc_.post(&op);
     291               0 :                 svc_.work_finished();
     292                 :             }
     293                 :         }
     294                 :         // completion is always posted to scheduler queue, never inline.
     295 HIT        3208 :         return std::noop_coroutine();
     296                 :     }
     297                 : 
     298 MIS           0 :     op.complete(errno, 0);
     299               0 :     op.impl_ptr = shared_from_this();
     300               0 :     svc_.post(&op);
     301                 :     // completion is always posted to scheduler queue, never inline.
     302               0 :     return std::noop_coroutine();
     303                 : }
     304                 : 
     305                 : inline std::coroutine_handle<>
     306 HIT      107446 : select_socket::read_some(
     307                 :     std::coroutine_handle<> h,
     308                 :     capy::executor_ref ex,
     309                 :     io_buffer_param param,
     310                 :     std::stop_token token,
     311                 :     std::error_code* ec,
     312                 :     std::size_t* bytes_out)
     313                 : {
     314          107446 :     auto& op = rd_;
     315          107446 :     op.reset();
     316          107446 :     op.h         = h;
     317          107446 :     op.ex        = ex;
     318          107446 :     op.ec_out    = ec;
     319          107446 :     op.bytes_out = bytes_out;
     320          107446 :     op.fd        = fd_;
     321          107446 :     op.start(token, this);
     322                 : 
     323          107446 :     capy::mutable_buffer bufs[select_read_op::max_buffers];
     324          107446 :     op.iovec_count =
     325          107446 :         static_cast<int>(param.copy_to(bufs, select_read_op::max_buffers));
     326                 : 
     327          107446 :     if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
     328                 :     {
     329               1 :         op.empty_buffer_read = true;
     330               1 :         op.complete(0, 0);
     331               1 :         op.impl_ptr = shared_from_this();
     332               1 :         svc_.post(&op);
     333               1 :         return std::noop_coroutine();
     334                 :     }
     335                 : 
     336          214890 :     for (int i = 0; i < op.iovec_count; ++i)
     337                 :     {
     338          107445 :         op.iovecs[i].iov_base = bufs[i].data();
     339          107445 :         op.iovecs[i].iov_len  = bufs[i].size();
     340                 :     }
     341                 : 
     342          107445 :     ssize_t n = ::readv(fd_, op.iovecs, op.iovec_count);
     343                 : 
     344          107445 :     if (n > 0)
     345                 :     {
     346          107157 :         op.complete(0, static_cast<std::size_t>(n));
     347          107157 :         op.impl_ptr = shared_from_this();
     348          107157 :         svc_.post(&op);
     349          107157 :         return std::noop_coroutine();
     350                 :     }
     351                 : 
     352             288 :     if (n == 0)
     353                 :     {
     354               5 :         op.complete(0, 0);
     355               5 :         op.impl_ptr = shared_from_this();
     356               5 :         svc_.post(&op);
     357               5 :         return std::noop_coroutine();
     358                 :     }
     359                 : 
     360             283 :     if (errno == EAGAIN || errno == EWOULDBLOCK)
     361                 :     {
     362             283 :         svc_.work_started();
     363             283 :         op.impl_ptr = shared_from_this();
     364                 : 
     365                 :         // Set registering BEFORE register_fd to close the race window where
     366                 :         // reactor sees an event before we set registered.
     367             283 :         op.registered.store(
     368                 :             select_registration_state::registering, std::memory_order_release);
     369             283 :         svc_.scheduler().register_fd(fd_, &op, select_scheduler::event_read);
     370                 : 
     371                 :         // Transition to registered. If this fails, reactor or cancel already
     372                 :         // claimed the op (state is now unregistered), so we're done. However,
     373                 :         // we must still deregister the fd because cancel's deregister_fd may
     374                 :         // have run before our register_fd, leaving the fd orphaned.
     375             283 :         auto expected = select_registration_state::registering;
     376             283 :         if (!op.registered.compare_exchange_strong(
     377                 :                 expected, select_registration_state::registered,
     378                 :                 std::memory_order_acq_rel))
     379                 :         {
     380 MIS           0 :             svc_.scheduler().deregister_fd(fd_, select_scheduler::event_read);
     381               0 :             return std::noop_coroutine();
     382                 :         }
     383                 : 
     384                 :         // If cancelled was set before we registered, handle it now.
     385 HIT         283 :         if (op.cancelled.load(std::memory_order_acquire))
     386                 :         {
     387 MIS           0 :             auto prev = op.registered.exchange(
     388                 :                 select_registration_state::unregistered,
     389                 :                 std::memory_order_acq_rel);
     390               0 :             if (prev != select_registration_state::unregistered)
     391                 :             {
     392               0 :                 svc_.scheduler().deregister_fd(
     393                 :                     fd_, select_scheduler::event_read);
     394               0 :                 op.impl_ptr = shared_from_this();
     395               0 :                 svc_.post(&op);
     396               0 :                 svc_.work_finished();
     397                 :             }
     398                 :         }
     399 HIT         283 :         return std::noop_coroutine();
     400                 :     }
     401                 : 
     402 MIS           0 :     op.complete(errno, 0);
     403               0 :     op.impl_ptr = shared_from_this();
     404               0 :     svc_.post(&op);
     405               0 :     return std::noop_coroutine();
     406                 : }
     407                 : 
     408                 : inline std::coroutine_handle<>
     409 HIT      107282 : select_socket::write_some(
     410                 :     std::coroutine_handle<> h,
     411                 :     capy::executor_ref ex,
     412                 :     io_buffer_param param,
     413                 :     std::stop_token token,
     414                 :     std::error_code* ec,
     415                 :     std::size_t* bytes_out)
     416                 : {
     417          107282 :     auto& op = wr_;
     418          107282 :     op.reset();
     419          107282 :     op.h         = h;
     420          107282 :     op.ex        = ex;
     421          107282 :     op.ec_out    = ec;
     422          107282 :     op.bytes_out = bytes_out;
     423          107282 :     op.fd        = fd_;
     424          107282 :     op.start(token, this);
     425                 : 
     426          107282 :     capy::mutable_buffer bufs[select_write_op::max_buffers];
     427          107282 :     op.iovec_count =
     428          107282 :         static_cast<int>(param.copy_to(bufs, select_write_op::max_buffers));
     429                 : 
     430          107282 :     if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
     431                 :     {
     432               1 :         op.complete(0, 0);
     433               1 :         op.impl_ptr = shared_from_this();
     434               1 :         svc_.post(&op);
     435               1 :         return std::noop_coroutine();
     436                 :     }
     437                 : 
     438          214562 :     for (int i = 0; i < op.iovec_count; ++i)
     439                 :     {
     440          107281 :         op.iovecs[i].iov_base = bufs[i].data();
     441          107281 :         op.iovecs[i].iov_len  = bufs[i].size();
     442                 :     }
     443                 : 
     444          107281 :     msghdr msg{};
     445          107281 :     msg.msg_iov    = op.iovecs;
     446          107281 :     msg.msg_iovlen = static_cast<std::size_t>(op.iovec_count);
     447                 : 
     448          107281 :     ssize_t n = ::sendmsg(fd_, &msg, MSG_NOSIGNAL);
     449                 : 
     450          107281 :     if (n > 0)
     451                 :     {
     452          107280 :         op.complete(0, static_cast<std::size_t>(n));
     453          107280 :         op.impl_ptr = shared_from_this();
     454          107280 :         svc_.post(&op);
     455          107280 :         return std::noop_coroutine();
     456                 :     }
     457                 : 
     458               1 :     if (errno == EAGAIN || errno == EWOULDBLOCK)
     459                 :     {
     460 MIS           0 :         svc_.work_started();
     461               0 :         op.impl_ptr = shared_from_this();
     462                 : 
     463                 :         // Set registering BEFORE register_fd to close the race window where
     464                 :         // reactor sees an event before we set registered.
     465               0 :         op.registered.store(
     466                 :             select_registration_state::registering, std::memory_order_release);
     467               0 :         svc_.scheduler().register_fd(fd_, &op, select_scheduler::event_write);
     468                 : 
     469                 :         // Transition to registered. If this fails, reactor or cancel already
     470                 :         // claimed the op (state is now unregistered), so we're done. However,
     471                 :         // we must still deregister the fd because cancel's deregister_fd may
     472                 :         // have run before our register_fd, leaving the fd orphaned.
     473               0 :         auto expected = select_registration_state::registering;
     474               0 :         if (!op.registered.compare_exchange_strong(
     475                 :                 expected, select_registration_state::registered,
     476                 :                 std::memory_order_acq_rel))
     477                 :         {
     478               0 :             svc_.scheduler().deregister_fd(fd_, select_scheduler::event_write);
     479               0 :             return std::noop_coroutine();
     480                 :         }
     481                 : 
     482                 :         // If cancelled was set before we registered, handle it now.
     483               0 :         if (op.cancelled.load(std::memory_order_acquire))
     484                 :         {
     485               0 :             auto prev = op.registered.exchange(
     486                 :                 select_registration_state::unregistered,
     487                 :                 std::memory_order_acq_rel);
     488               0 :             if (prev != select_registration_state::unregistered)
     489                 :             {
     490               0 :                 svc_.scheduler().deregister_fd(
     491                 :                     fd_, select_scheduler::event_write);
     492               0 :                 op.impl_ptr = shared_from_this();
     493               0 :                 svc_.post(&op);
     494               0 :                 svc_.work_finished();
     495                 :             }
     496                 :         }
     497               0 :         return std::noop_coroutine();
     498                 :     }
     499                 : 
     500 HIT           1 :     op.complete(errno ? errno : EIO, 0);
     501               1 :     op.impl_ptr = shared_from_this();
     502               1 :     svc_.post(&op);
     503               1 :     return std::noop_coroutine();
     504                 : }
     505                 : 
     506                 : inline std::error_code
     507               3 : select_socket::shutdown(tcp_socket::shutdown_type what) noexcept
     508                 : {
     509                 :     int how;
     510               3 :     switch (what)
     511                 :     {
     512               1 :     case tcp_socket::shutdown_receive:
     513               1 :         how = SHUT_RD;
     514               1 :         break;
     515               1 :     case tcp_socket::shutdown_send:
     516               1 :         how = SHUT_WR;
     517               1 :         break;
     518               1 :     case tcp_socket::shutdown_both:
     519               1 :         how = SHUT_RDWR;
     520               1 :         break;
     521 MIS           0 :     default:
     522               0 :         return make_err(EINVAL);
     523                 :     }
     524 HIT           3 :     if (::shutdown(fd_, how) != 0)
     525 MIS           0 :         return make_err(errno);
     526 HIT           3 :     return {};
     527                 : }
     528                 : 
     529                 : inline std::error_code
     530               5 : select_socket::set_no_delay(bool value) noexcept
     531                 : {
     532               5 :     int flag = value ? 1 : 0;
     533               5 :     if (::setsockopt(fd_, IPPROTO_TCP, TCP_NODELAY, &flag, sizeof(flag)) != 0)
     534 MIS           0 :         return make_err(errno);
     535 HIT           5 :     return {};
     536                 : }
     537                 : 
     538                 : inline bool
     539               5 : select_socket::no_delay(std::error_code& ec) const noexcept
     540                 : {
     541               5 :     int flag      = 0;
     542               5 :     socklen_t len = sizeof(flag);
     543               5 :     if (::getsockopt(fd_, IPPROTO_TCP, TCP_NODELAY, &flag, &len) != 0)
     544                 :     {
     545 MIS           0 :         ec = make_err(errno);
     546               0 :         return false;
     547                 :     }
     548 HIT           5 :     ec = {};
     549               5 :     return flag != 0;
     550                 : }
     551                 : 
     552                 : inline std::error_code
     553               4 : select_socket::set_keep_alive(bool value) noexcept
     554                 : {
     555               4 :     int flag = value ? 1 : 0;
     556               4 :     if (::setsockopt(fd_, SOL_SOCKET, SO_KEEPALIVE, &flag, sizeof(flag)) != 0)
     557 MIS           0 :         return make_err(errno);
     558 HIT           4 :     return {};
     559                 : }
     560                 : 
     561                 : inline bool
     562               4 : select_socket::keep_alive(std::error_code& ec) const noexcept
     563                 : {
     564               4 :     int flag      = 0;
     565               4 :     socklen_t len = sizeof(flag);
     566               4 :     if (::getsockopt(fd_, SOL_SOCKET, SO_KEEPALIVE, &flag, &len) != 0)
     567                 :     {
     568 MIS           0 :         ec = make_err(errno);
     569               0 :         return false;
     570                 :     }
     571 HIT           4 :     ec = {};
     572               4 :     return flag != 0;
     573                 : }
     574                 : 
     575                 : inline std::error_code
     576               1 : select_socket::set_receive_buffer_size(int size) noexcept
     577                 : {
     578               1 :     if (::setsockopt(fd_, SOL_SOCKET, SO_RCVBUF, &size, sizeof(size)) != 0)
     579 MIS           0 :         return make_err(errno);
     580 HIT           1 :     return {};
     581                 : }
     582                 : 
     583                 : inline int
     584               3 : select_socket::receive_buffer_size(std::error_code& ec) const noexcept
     585                 : {
     586               3 :     int size      = 0;
     587               3 :     socklen_t len = sizeof(size);
     588               3 :     if (::getsockopt(fd_, SOL_SOCKET, SO_RCVBUF, &size, &len) != 0)
     589                 :     {
     590 MIS           0 :         ec = make_err(errno);
     591               0 :         return 0;
     592                 :     }
     593 HIT           3 :     ec = {};
     594               3 :     return size;
     595                 : }
     596                 : 
     597                 : inline std::error_code
     598               1 : select_socket::set_send_buffer_size(int size) noexcept
     599                 : {
     600               1 :     if (::setsockopt(fd_, SOL_SOCKET, SO_SNDBUF, &size, sizeof(size)) != 0)
     601 MIS           0 :         return make_err(errno);
     602 HIT           1 :     return {};
     603                 : }
     604                 : 
     605                 : inline int
     606               3 : select_socket::send_buffer_size(std::error_code& ec) const noexcept
     607                 : {
     608               3 :     int size      = 0;
     609               3 :     socklen_t len = sizeof(size);
     610               3 :     if (::getsockopt(fd_, SOL_SOCKET, SO_SNDBUF, &size, &len) != 0)
     611                 :     {
     612 MIS           0 :         ec = make_err(errno);
     613               0 :         return 0;
     614                 :     }
     615 HIT           3 :     ec = {};
     616               3 :     return size;
     617                 : }
     618                 : 
     619                 : inline std::error_code
     620               6 : select_socket::set_linger(bool enabled, int timeout) noexcept
     621                 : {
     622               6 :     if (timeout < 0)
     623               1 :         return make_err(EINVAL);
     624                 :     struct ::linger lg;
     625               5 :     lg.l_onoff  = enabled ? 1 : 0;
     626               5 :     lg.l_linger = timeout;
     627               5 :     if (::setsockopt(fd_, SOL_SOCKET, SO_LINGER, &lg, sizeof(lg)) != 0)
     628 MIS           0 :         return make_err(errno);
     629 HIT           5 :     return {};
     630                 : }
     631                 : 
     632                 : inline tcp_socket::linger_options
     633               3 : select_socket::linger(std::error_code& ec) const noexcept
     634                 : {
     635               3 :     struct ::linger lg{};
     636               3 :     socklen_t len = sizeof(lg);
     637               3 :     if (::getsockopt(fd_, SOL_SOCKET, SO_LINGER, &lg, &len) != 0)
     638                 :     {
     639 MIS           0 :         ec = make_err(errno);
     640               0 :         return {};
     641                 :     }
     642 HIT           3 :     ec = {};
     643               3 :     return {.enabled = lg.l_onoff != 0, .timeout = lg.l_linger};
     644                 : }
     645                 : 
     646                 : inline void
     647             177 : select_socket::cancel() noexcept
     648                 : {
     649             177 :     auto self = weak_from_this().lock();
     650             177 :     if (!self)
     651 MIS           0 :         return;
     652                 : 
     653 HIT         531 :     auto cancel_op = [this, &self](select_op& op, int events) {
     654             531 :         auto prev = op.registered.exchange(
     655                 :             select_registration_state::unregistered, std::memory_order_acq_rel);
     656             531 :         op.request_cancel();
     657             531 :         if (prev != select_registration_state::unregistered)
     658                 :         {
     659              92 :             svc_.scheduler().deregister_fd(fd_, events);
     660              92 :             op.impl_ptr = self;
     661              92 :             svc_.post(&op);
     662              92 :             svc_.work_finished();
     663                 :         }
     664             708 :     };
     665                 : 
     666             177 :     cancel_op(conn_, select_scheduler::event_write);
     667             177 :     cancel_op(rd_, select_scheduler::event_read);
     668             177 :     cancel_op(wr_, select_scheduler::event_write);
     669             177 : }
     670                 : 
     671                 : inline void
     672              99 : select_socket::cancel_single_op(select_op& op) noexcept
     673                 : {
     674              99 :     auto self = weak_from_this().lock();
     675              99 :     if (!self)
     676 MIS           0 :         return;
     677                 : 
     678                 :     // Called from stop_token callback to cancel a specific pending operation.
     679 HIT          99 :     auto prev = op.registered.exchange(
     680                 :         select_registration_state::unregistered, std::memory_order_acq_rel);
     681              99 :     op.request_cancel();
     682                 : 
     683              99 :     if (prev != select_registration_state::unregistered)
     684                 :     {
     685                 :         // Determine which event type to deregister
     686              67 :         int events = 0;
     687              67 :         if (&op == &conn_ || &op == &wr_)
     688 MIS           0 :             events = select_scheduler::event_write;
     689 HIT          67 :         else if (&op == &rd_)
     690              67 :             events = select_scheduler::event_read;
     691                 : 
     692              67 :         svc_.scheduler().deregister_fd(fd_, events);
     693                 : 
     694              67 :         op.impl_ptr = self;
     695              67 :         svc_.post(&op);
     696              67 :         svc_.work_finished();
     697                 :     }
     698              99 : }
     699                 : 
     700                 : inline void
     701           28927 : select_socket::close_socket() noexcept
     702                 : {
     703           28927 :     auto self = weak_from_this().lock();
     704           28927 :     if (self)
     705                 :     {
     706           86781 :         auto cancel_op = [this, &self](select_op& op, int events) {
     707           86781 :             auto prev = op.registered.exchange(
     708                 :                 select_registration_state::unregistered,
     709                 :                 std::memory_order_acq_rel);
     710           86781 :             op.request_cancel();
     711           86781 :             if (prev != select_registration_state::unregistered)
     712                 :             {
     713               1 :                 svc_.scheduler().deregister_fd(fd_, events);
     714               1 :                 op.impl_ptr = self;
     715               1 :                 svc_.post(&op);
     716               1 :                 svc_.work_finished();
     717                 :             }
     718          115708 :         };
     719                 : 
     720           28927 :         cancel_op(conn_, select_scheduler::event_write);
     721           28927 :         cancel_op(rd_, select_scheduler::event_read);
     722           28927 :         cancel_op(wr_, select_scheduler::event_write);
     723                 :     }
     724                 : 
     725           28927 :     if (fd_ >= 0)
     726                 :     {
     727            6426 :         svc_.scheduler().deregister_fd(
     728                 :             fd_, select_scheduler::event_read | select_scheduler::event_write);
     729            6426 :         ::close(fd_);
     730            6426 :         fd_ = -1;
     731                 :     }
     732                 : 
     733           28927 :     local_endpoint_  = endpoint{};
     734           28927 :     remote_endpoint_ = endpoint{};
     735           28927 : }
     736                 : 
     737             135 : inline select_socket_service::select_socket_service(
     738             135 :     capy::execution_context& ctx)
     739             135 :     : state_(
     740                 :           std::make_unique<select_socket_state>(
     741             135 :               ctx.use_service<select_scheduler>()))
     742                 : {
     743             135 : }
     744                 : 
     745             270 : inline select_socket_service::~select_socket_service() {}
     746                 : 
     747                 : inline void
     748             135 : select_socket_service::shutdown()
     749                 : {
     750             135 :     std::lock_guard lock(state_->mutex_);
     751                 : 
     752             135 :     while (auto* impl = state_->socket_list_.pop_front())
     753 MIS           0 :         impl->close_socket();
     754                 : 
     755                 :     // Don't clear socket_ptrs_ here. The scheduler shuts down after us and
     756                 :     // drains completed_ops_, calling destroy() on each queued op. Letting
     757                 :     // ~state_ release the ptrs (during service destruction, after scheduler
     758                 :     // shutdown) keeps every impl alive until all ops have been drained.
     759 HIT         135 : }
     760                 : 
     761                 : inline io_object::implementation*
     762            9641 : select_socket_service::construct()
     763                 : {
     764            9641 :     auto impl = std::make_shared<select_socket>(*this);
     765            9641 :     auto* raw = impl.get();
     766                 : 
     767                 :     {
     768            9641 :         std::lock_guard lock(state_->mutex_);
     769            9641 :         state_->socket_list_.push_back(raw);
     770            9641 :         state_->socket_ptrs_.emplace(raw, std::move(impl));
     771            9641 :     }
     772                 : 
     773            9641 :     return raw;
     774            9641 : }
     775                 : 
     776                 : inline void
     777            9641 : select_socket_service::destroy(io_object::implementation* impl)
     778                 : {
     779            9641 :     auto* select_impl = static_cast<select_socket*>(impl);
     780            9641 :     select_impl->close_socket();
     781            9641 :     std::lock_guard lock(state_->mutex_);
     782            9641 :     state_->socket_list_.remove(select_impl);
     783            9641 :     state_->socket_ptrs_.erase(select_impl);
     784            9641 : }
     785                 : 
     786                 : inline std::error_code
     787            3219 : select_socket_service::open_socket(tcp_socket::implementation& impl)
     788                 : {
     789            3219 :     auto* select_impl = static_cast<select_socket*>(&impl);
     790            3219 :     select_impl->close_socket();
     791                 : 
     792            3219 :     int fd = ::socket(AF_INET, SOCK_STREAM, 0);
     793            3219 :     if (fd < 0)
     794 MIS           0 :         return make_err(errno);
     795                 : 
     796                 :     // Set non-blocking and close-on-exec
     797 HIT        3219 :     int flags = ::fcntl(fd, F_GETFL, 0);
     798            3219 :     if (flags == -1)
     799                 :     {
     800 MIS           0 :         int errn = errno;
     801               0 :         ::close(fd);
     802               0 :         return make_err(errn);
     803                 :     }
     804 HIT        3219 :     if (::fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1)
     805                 :     {
     806 MIS           0 :         int errn = errno;
     807               0 :         ::close(fd);
     808               0 :         return make_err(errn);
     809                 :     }
     810 HIT        3219 :     if (::fcntl(fd, F_SETFD, FD_CLOEXEC) == -1)
     811                 :     {
     812 MIS           0 :         int errn = errno;
     813               0 :         ::close(fd);
     814               0 :         return make_err(errn);
     815                 :     }
     816                 : 
     817                 :     // Check fd is within select() limits
     818 HIT        3219 :     if (fd >= FD_SETSIZE)
     819                 :     {
     820 MIS           0 :         ::close(fd);
     821               0 :         return make_err(EMFILE); // Too many open files
     822                 :     }
     823                 : 
     824 HIT        3219 :     select_impl->fd_ = fd;
     825            3219 :     return {};
     826                 : }
     827                 : 
     828                 : inline void
     829           16067 : select_socket_service::close(io_object::handle& h)
     830                 : {
     831           16067 :     static_cast<select_socket*>(h.get())->close_socket();
     832           16067 : }
     833                 : 
     834                 : inline void
     835          214605 : select_socket_service::post(select_op* op)
     836                 : {
     837          214605 :     state_->sched_.post(op);
     838          214605 : }
     839                 : 
     840                 : inline void
     841            3491 : select_socket_service::work_started() noexcept
     842                 : {
     843            3491 :     state_->sched_.work_started();
     844            3491 : }
     845                 : 
     846                 : inline void
     847             160 : select_socket_service::work_finished() noexcept
     848                 : {
     849             160 :     state_->sched_.work_finished();
     850             160 : }
     851                 : 
     852                 : } // namespace boost::corosio::detail
     853                 : 
     854                 : #endif // BOOST_COROSIO_HAS_SELECT
     855                 : 
     856                 : #endif // BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SOCKET_SERVICE_HPP
        

Generated by: LCOV version 2.3