LCOV - code coverage report
Current view: top level - corosio/native/detail/select - select_scheduler.hpp (source / functions) Coverage Total Hit Missed
Test: coverage_remapped.info Lines: 72.7 % 359 261 98
Test Date: 2026-02-17 20:54:14 Functions: 82.9 % 35 29 6

           TLA  Line data    Source code
       1                 : //
       2                 : // Copyright (c) 2026 Steve Gerbino
       3                 : //
       4                 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       5                 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       6                 : //
       7                 : // Official repository: https://github.com/cppalliance/corosio
       8                 : //
       9                 : 
      10                 : #ifndef BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SCHEDULER_HPP
      11                 : #define BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SCHEDULER_HPP
      12                 : 
      13                 : #include <boost/corosio/detail/platform.hpp>
      14                 : 
      15                 : #if BOOST_COROSIO_HAS_SELECT
      16                 : 
      17                 : #include <boost/corosio/detail/config.hpp>
      18                 : #include <boost/capy/ex/execution_context.hpp>
      19                 : 
      20                 : #include <boost/corosio/native/native_scheduler.hpp>
      21                 : #include <boost/corosio/detail/scheduler_op.hpp>
      22                 : 
      23                 : #include <boost/corosio/native/detail/select/select_op.hpp>
      24                 : #include <boost/corosio/detail/timer_service.hpp>
      25                 : #include <boost/corosio/detail/make_err.hpp>
      26                 : #include <boost/corosio/native/detail/posix/posix_resolver_service.hpp>
      27                 : #include <boost/corosio/native/detail/posix/posix_signal_service.hpp>
      28                 : 
      29                 : #include <boost/corosio/detail/except.hpp>
      30                 : #include <boost/corosio/detail/thread_local_ptr.hpp>
      31                 : 
      32                 : #include <sys/select.h>
      33                 : #include <sys/socket.h>
      34                 : #include <unistd.h>
      35                 : #include <errno.h>
      36                 : #include <fcntl.h>
      37                 : 
      38                 : #include <algorithm>
      39                 : #include <atomic>
      40                 : #include <chrono>
      41                 : #include <condition_variable>
      42                 : #include <cstddef>
      43                 : #include <limits>
      44                 : #include <mutex>
      45                 : #include <unordered_map>
      46                 : 
      47                 : namespace boost::corosio::detail {
      48                 : 
      49                 : struct select_op;
      50                 : 
      51                 : /** POSIX scheduler using select() for I/O multiplexing.
      52                 : 
      53                 :     This scheduler implements the scheduler interface using the POSIX select()
      54                 :     call for I/O event notification. It uses a single reactor model
      55                 :     where one thread runs select() while other threads wait on a condition
      56                 :     variable for handler work. This design provides:
      57                 : 
      58                 :     - Handler parallelism: N posted handlers can execute on N threads
      59                 :     - No thundering herd: posting work wakes at most one waiting thread
      60                 :     - Portability: Works on all POSIX systems
      61                 : 
      62                 :     The design mirrors epoll_scheduler for behavioral consistency:
      63                 :     - Same single-reactor thread coordination model
      64                 :     - Same work counting semantics
      65                 :     - Same timer integration pattern
      66                 : 
      67                 :     Known Limitations:
      68                 :     - FD_SETSIZE (~1024) limits maximum concurrent connections
      69                 :     - O(n) scanning: rebuilds fd_sets each iteration
      70                 :     - Level-triggered only (no edge-triggered mode)
      71                 : 
      72                 :     @par Thread Safety
      73                 :     All public member functions are thread-safe.
      74                 : */
      75                 : class BOOST_COROSIO_DECL select_scheduler final
      76                 :     : public native_scheduler
      77                 :     , public capy::execution_context::service
      78                 : {
      79                 : public:
      80                 :     using key_type = scheduler; // NOTE(review): presumably the key under which execution_context looks this service up — confirm
      81                 : 
      82                 :     /** Construct the scheduler.
      83                 : 
      84                 :         Creates a self-pipe for reactor interruption.
      85                 :         Both ends are set non-blocking and close-on-exec; pipe()/fcntl() failures are reported via detail::throw_system_error.
      86                 :         @param ctx Reference to the owning execution_context.
      87                 :         @param concurrency_hint Hint for expected thread count (unused).
      88                 :     */
      89                 :     select_scheduler(capy::execution_context& ctx, int concurrency_hint = -1);
      90                 : 
      91                 :     ~select_scheduler() override; // closes both self-pipe descriptors
      92                 : 
      93                 :     select_scheduler(select_scheduler const&)            = delete;
      94                 :     select_scheduler& operator=(select_scheduler const&) = delete;
      95                 : 
      96                 :     void shutdown() override; // destroys queued ops, zeroes the work count, wakes every thread
      97                 :     void post(std::coroutine_handle<> h) const override; // queues a coroutine resumption as one unit of work
      98                 :     void post(scheduler_op* h) const override; // queues an existing op as one unit of work
      99                 :     bool running_in_this_thread() const noexcept override; // true if this scheduler is on the calling thread's context stack
     100                 :     void stop() override; // sets stopped_, wakes all waiters, interrupts the reactor
     101                 :     bool stopped() const noexcept override; // reads stopped_
     102                 :     void restart() override; // clears stopped_
     103                 :     std::size_t run() override; // repeats do_one(-1) until it returns 0; returns the iteration count
     104                 :     std::size_t run_one() override; // a single do_one(-1)
     105                 :     std::size_t wait_one(long usec) override; // a single do_one(usec)
     106                 :     std::size_t poll() override; // repeats do_one(0) until it returns 0; returns the iteration count
     107                 :     std::size_t poll_one() override; // a single do_one(0)
     108                 : 
     109                 :     /** Return the maximum file descriptor value supported.
     110                 : 
     111                 :         Returns FD_SETSIZE - 1, the maximum fd value that can be
     112                 :         monitored by select(). Operations with fd >= FD_SETSIZE
     113                 :         will fail with EINVAL.
     114                 : 
     115                 :         @return The maximum supported file descriptor value.
     116                 :     */
     117                 :     static constexpr int max_fd() noexcept
     118                 :     {
     119                 :         return FD_SETSIZE - 1;
     120                 :     }
     121                 : 
     122                 :     /** Register a file descriptor for monitoring, then interrupt the reactor.
     123                 : 
     124                 :         @param fd The file descriptor to register; a system error (EINVAL) is thrown unless 0 <= fd < FD_SETSIZE.
     125                 :         @param op The operation associated with this fd; overwrites any op already stored for the selected direction(s).
     126                 :         @param events Event mask: 1 = read, 2 = write, 3 = both.
     127                 :     */
     128                 :     void register_fd(int fd, select_op* op, int events) const;
     129                 : 
     130                 :     /** Unregister a file descriptor from monitoring.
     131                 : 
     132                 :         @param fd The file descriptor to unregister; an fd that is not registered is ignored.
     133                 :         @param events Event mask to remove: 1 = read, 2 = write, 3 = both.
     134                 :     */
     135                 :     void deregister_fd(int fd, int events) const;
     136                 : 
     137                 :     void work_started() noexcept override; // increments outstanding_work_
     138                 :     void work_finished() noexcept override; // decrements outstanding_work_; finishing the last unit calls stop()
     139                 : 
     140                 :     // Event flags for register_fd/deregister_fd
     141                 :     static constexpr int event_read  = 1;
     142                 :     static constexpr int event_write = 2;
     143                 : 
     144                 : private:
     145                 :     std::size_t do_one(long timeout_us); // callers here pass -1, 0, or a caller-supplied microsecond value
     146                 :     void run_reactor(std::unique_lock<std::mutex>& lock);
     147                 :     void wake_one_thread_and_unlock(std::unique_lock<std::mutex>& lock) const; // releases `lock` on every path
     148                 :     void interrupt_reactor() const; // writes one byte to pipe_fds_[1]
     149                 :     long calculate_timeout(long requested_timeout_us) const;
     150                 : 
     151                 :     // Self-pipe for interrupting select()
     152                 :     int pipe_fds_[2]; // [0]=read, [1]=write
     153                 : 
     154                 :     mutable std::mutex mutex_; // held around completed_ops_, registered_fds_ and max_fd_ updates in the code shown here
     155                 :     mutable std::condition_variable wakeup_event_; // notify_all from stop()/shutdown(); notify_one when posting with an idle thread
     156                 :     mutable op_queue completed_ops_; // queued handlers plus the task_op_ sentinel
     157                 :     mutable std::atomic<long> outstanding_work_; // pending units of work; run()/poll() etc. stop when it reads 0
     158                 :     std::atomic<bool> stopped_; // set by stop(), cleared by restart()
     159                 :     bool shutdown_; // set under mutex_ by shutdown()
     160                 : 
     161                 :     // Per-fd state for tracking registered operations
     162                 :     struct fd_state
     163                 :     {
     164                 :         select_op* read_op  = nullptr; // op registered with event_read, if any
     165                 :         select_op* write_op = nullptr; // op registered with event_write, if any
     166                 :     };
     167                 :     mutable std::unordered_map<int, fd_state> registered_fds_; // keyed by fd
     168                 :     mutable int max_fd_ = -1; // highest registered fd; reset to at least pipe_fds_[0] when the top fd is removed
     169                 : 
     170                 :     // Single reactor thread coordination
     171                 :     mutable bool reactor_running_     = false;
     172                 :     mutable bool reactor_interrupted_ = false; // set when a post has already written to the self-pipe
     173                 :     mutable int idle_thread_count_    = 0; // NOTE(review): presumably the number of threads blocked on wakeup_event_ — confirm in do_one
     174                 : 
     175                 :     // Sentinel operation for interleaving reactor runs with handler execution.
     176                 :     // Ensures the reactor runs periodically even when handlers are continuously
     177                 :     // posted, preventing timer starvation.
     178                 :     struct task_op final : scheduler_op
     179                 :     {
     180 MIS           0 :         void operator()() override {} // intentionally empty: the sentinel carries no work
     181               0 :         void destroy() override {} // intentionally empty: storage is owned by the scheduler (task_op_ member)
     182                 :     };
     183                 :     task_op task_op_; // pushed onto completed_ops_ by the constructor; skipped by shutdown()
     184                 : };
     185                 : 
     186                 : /*
     187                 :     select Scheduler - Single Reactor Model
     188                 :     =======================================
     189                 : 
     190                 :     This scheduler mirrors the epoll_scheduler design but uses select() instead
     191                 :     of epoll for I/O multiplexing. The thread coordination strategy is identical:
     192                 :     one thread becomes the "reactor" while others wait on a condition variable.
     193                 : 
     194                 :     Thread Model
     195                 :     ------------
     196                 :     - ONE thread runs select() at a time (the reactor thread)
     197                 :     - OTHER threads wait on wakeup_event_ (condition variable) for handlers
     198                 :     - When work is posted, one idle thread is woken via notify_one(); if none is idle, the running reactor is interrupted instead
     199                 : 
     200                 :     Key Differences from epoll
     201                 :     --------------------------
     202                 :     - Uses self-pipe instead of eventfd for interruption (more portable)
     203                 :     - fd_set rebuilding each iteration (O(n) vs O(1) for epoll)
     204                 :     - FD_SETSIZE limit (~1024 fds on most systems)
     205                 :     - Level-triggered only (no edge-triggered mode)
     206                 : 
     207                 :     Self-Pipe Pattern
     208                 :     -----------------
     209                 :     To interrupt a blocking select() call (e.g., when work is posted, an fd is
     210                 :     registered, or the earliest timer changes), we write a byte to pipe_fds_[1].
     211                 :     The read end pipe_fds_[0] is always in the read_fds set, so select() returns
     212                 :     immediately. We drain the pipe to clear the readable state.
     213                 : 
     214                 :     fd-to-op Mapping
     215                 :     ----------------
     216                 :     We use an unordered_map<int, fd_state> to track which operations are
     217                 :     registered for each fd. This allows O(1) lookup when select() returns
     218                 :     ready fds. Each fd can have at most one read op and one write op registered.
     219                 : */
     220                 : 
     221                 : namespace select { // helpers for per-thread scheduler tracking and work accounting
     222                 : 
     223                 : struct BOOST_COROSIO_SYMBOL_VISIBLE scheduler_context // one frame of the per-thread stack of running schedulers
     224                 : {
     225                 :     select_scheduler const* key; // scheduler this frame belongs to
     226                 :     scheduler_context* next; // frame that was on top when this one was pushed (may be null)
     227                 : };
     228                 : 
     229                 : inline thread_local_ptr<scheduler_context> context_stack; // top of the stack; NOTE(review): presumably one pointer per thread — confirm in thread_local_ptr
     230                 : 
     231                 : struct thread_context_guard // pushes a frame for `ctx` on construction and pops it on destruction
     232                 : {
     233                 :     scheduler_context frame_; // lives inside the guard, so the frame has the guard's lifetime
     234                 : 
     235 HIT         124 :     explicit thread_context_guard(select_scheduler const* ctx) noexcept
     236             124 :         : frame_{ctx, context_stack.get()} // link to the previous top
     237                 :     {
     238             124 :         context_stack.set(&frame_); // this frame becomes the new top
     239             124 :     }
     240                 : 
     241             124 :     ~thread_context_guard() noexcept
     242                 :     {
     243             124 :         context_stack.set(frame_.next); // restore the previous top
     244             124 :     }
     245                 : };
     246                 : 
     247                 : struct work_guard // calls work_finished() on the scheduler when it goes out of scope
     248                 : {
     249                 :     select_scheduler* self; // must be non-null: dereferenced unconditionally in the destructor
     250          228359 :     ~work_guard()
     251                 :     {
     252          228359 :         self->work_finished();
     253          228359 :     }
     254                 : };
     255                 : 
     256                 : } // namespace select
     257                 : 
     258             135 : inline select_scheduler::select_scheduler(capy::execution_context& ctx, int) // second parameter (concurrency hint) is unnamed and unused
     259             135 :     : pipe_fds_{-1, -1} // -1 marks "not open" until pipe() succeeds
     260             135 :     , outstanding_work_(0)
     261             135 :     , stopped_(false)
     262             135 :     , shutdown_(false)
     263             135 :     , max_fd_(-1) // NOTE(review): this and the next three repeat the in-class default initializers
     264             135 :     , reactor_running_(false)
     265             135 :     , reactor_interrupted_(false)
     266             270 :     , idle_thread_count_(0)
     267                 : {
     268                 :     // Create self-pipe for interrupting select()
     269             135 :     if (::pipe(pipe_fds_) < 0)
     270 MIS           0 :         detail::throw_system_error(make_err(errno), "pipe");
     271                 : 
     272                 :     // Set both ends to non-blocking and close-on-exec
     273 HIT         405 :     for (int i = 0; i < 2; ++i)
     274                 :     {
     275             270 :         int flags = ::fcntl(pipe_fds_[i], F_GETFL, 0);
     276             270 :         if (flags == -1)
     277                 :         {
     278 MIS           0 :             int errn = errno; // saved first because close() may overwrite errno
     279               0 :             ::close(pipe_fds_[0]); // the destructor will not run when the constructor throws, so close here
     280               0 :             ::close(pipe_fds_[1]);
     281               0 :             detail::throw_system_error(make_err(errn), "fcntl F_GETFL");
     282                 :         }
     283 HIT         270 :         if (::fcntl(pipe_fds_[i], F_SETFL, flags | O_NONBLOCK) == -1)
     284                 :         {
     285 MIS           0 :             int errn = errno;
     286               0 :             ::close(pipe_fds_[0]);
     287               0 :             ::close(pipe_fds_[1]);
     288               0 :             detail::throw_system_error(make_err(errn), "fcntl F_SETFL");
     289                 :         }
     290 HIT         270 :         if (::fcntl(pipe_fds_[i], F_SETFD, FD_CLOEXEC) == -1)
     291                 :         {
     292 MIS           0 :             int errn = errno;
     293               0 :             ::close(pipe_fds_[0]);
     294               0 :             ::close(pipe_fds_[1]);
     295               0 :             detail::throw_system_error(make_err(errn), "fcntl F_SETFD");
     296                 :         }
     297                 :     }
     298                 : 
     299 HIT         135 :     timer_svc_ = &get_timer_service(ctx, *this); // timer_svc_ is not declared in this class; presumably a native_scheduler member — confirm
     300             135 :     timer_svc_->set_on_earliest_changed(
     301            3555 :         timer_service::callback(this, [](void* p) { // capture-less lambda; `this` travels as the void* argument
     302            3420 :             static_cast<select_scheduler*>(p)->interrupt_reactor(); // wake select() so it can pick up the new earliest timer
     303            3420 :         }));
     304                 : 
     305                 :     // Initialize resolver service
     306             135 :     get_resolver_service(ctx, *this); // NOTE(review): if this or the calls around it can throw, the pipe fds opened above appear to leak — confirm
     307                 : 
     308                 :     // Initialize signal service
     309             135 :     get_signal_service(ctx, *this);
     310                 : 
     311                 :     // Push task sentinel to interleave reactor runs with handler execution
     312             135 :     completed_ops_.push(&task_op_);
     313             135 : }
     314                 : 
     315             270 : inline select_scheduler::~select_scheduler() // releases the self-pipe
     316                 : {
     317             135 :     if (pipe_fds_[0] >= 0) // negative means the descriptor was never opened
     318             135 :         ::close(pipe_fds_[0]);
     319             135 :     if (pipe_fds_[1] >= 0)
     320             135 :         ::close(pipe_fds_[1]);
     321             270 : }
     322                 : 
     323                 : inline void // Discards every queued operation, zeroes the work count and wakes all threads.
     324             135 : select_scheduler::shutdown()
     325                 : {
     326                 :     {
     327             135 :         std::unique_lock lock(mutex_);
     328             135 :         shutdown_ = true;
     329                 : 
     330             270 :         while (auto* h = completed_ops_.pop()) // drain until the queue is empty
     331                 :         {
     332             135 :             if (h == &task_op_) // the sentinel is a member of this object, nothing to destroy
     333             135 :                 continue;
     334 MIS           0 :             lock.unlock(); // destroy() runs without mutex_ held
     335               0 :             h->destroy();
     336               0 :             lock.lock();
     337 HIT         135 :         }
     338             135 :     }
     339                 : 
     340             135 :     outstanding_work_.store(0, std::memory_order_release);
     341                 : 
     342             135 :     if (pipe_fds_[1] >= 0) // only if the write end is open
     343             135 :         interrupt_reactor();
     344                 : 
     345             135 :     wakeup_event_.notify_all(); // called after mutex_ has been released
     346             135 : }
     347                 : 
     348                 : inline void // Queues resumption of coroutine `h` as one unit of outstanding work.
     349            3767 : select_scheduler::post(std::coroutine_handle<> h) const
     350                 : {
     351                 :     struct post_handler final : scheduler_op // heap-allocated, self-deleting wrapper around the handle
     352                 :     {
     353                 :         std::coroutine_handle<> h_;
     354                 : 
     355            3767 :         explicit post_handler(std::coroutine_handle<> h) : h_(h) {}
     356                 : 
     357            7534 :         ~post_handler() override = default;
     358                 : 
     359            3767 :         void operator()() override
     360                 :         {
     361            3767 :             auto h = h_; // copy out first: h_ is gone once `this` is deleted
     362            3767 :             delete this; // free the wrapper before resuming
     363            3767 :             h.resume();
     364            3767 :         }
     365                 : 
     366 MIS           0 :         void destroy() override // discard without resuming the coroutine
     367                 :         {
     368               0 :             delete this;
     369               0 :         }
     370                 :     };
     371                 : 
     372 HIT        3767 :     auto ph = std::make_unique<post_handler>(h); // NOTE(review): <memory> and <coroutine> are not among the visible includes; presumably transitive — confirm
     373            3767 :     outstanding_work_.fetch_add(1, std::memory_order_relaxed); // counted before the op becomes visible in the queue
     374                 : 
     375            3767 :     std::unique_lock lock(mutex_);
     376            3767 :     completed_ops_.push(ph.release()); // ownership passes to the queue; the op deletes itself when run or destroyed
     377            3767 :     wake_one_thread_and_unlock(lock); // releases `lock`
     378            3767 : }
     379                 : 
     380                 : inline void // Queues an existing operation as one unit of outstanding work.
     381          218056 : select_scheduler::post(scheduler_op* h) const
     382                 : {
     383          218056 :     outstanding_work_.fetch_add(1, std::memory_order_relaxed); // counted before the op becomes visible in the queue
     384                 : 
     385          218056 :     std::unique_lock lock(mutex_);
     386          218056 :     completed_ops_.push(h); // `h` is pushed as-is; no null check is performed here
     387          218056 :     wake_one_thread_and_unlock(lock); // releases `lock`
     388          218056 : }
     389                 : 
     390                 : inline bool // True if a thread_context_guard for this scheduler is on the calling thread's context stack.
     391             559 : select_scheduler::running_in_this_thread() const noexcept
     392                 : {
     393             559 :     for (auto* c = select::context_stack.get(); c != nullptr; c = c->next) // walk from the top frame downwards
     394             369 :         if (c->key == this)
     395             369 :             return true;
     396             190 :     return false; // no frame on this stack refers to this scheduler
     397                 : }
     398                 : 
     399                 : inline void // Marks the scheduler stopped; only the call that flips the flag wakes the threads.
     400             103 : select_scheduler::stop()
     401                 : {
     402             103 :     bool expected = false;
     403             103 :     if (stopped_.compare_exchange_strong( // succeeds only for the caller that changes false -> true
     404                 :             expected, true, std::memory_order_release,
     405                 :             std::memory_order_relaxed))
     406                 :     {
     407                 :         // Wake all threads so they notice stopped_ and exit
     408                 :         {
     409             103 :             std::lock_guard lock(mutex_); // notify while holding mutex_
     410             103 :             wakeup_event_.notify_all();
     411             103 :         }
     412             103 :         interrupt_reactor(); // also wake a thread blocked in select()
     413                 :     }
     414             103 : }
     415                 : 
     416                 : inline bool // Returns the current value of the stopped_ flag.
     417               3 : select_scheduler::stopped() const noexcept
     418                 : {
     419               3 :     return stopped_.load(std::memory_order_acquire); // pairs with the release store/exchange in stop() and restart()
     420                 : }
     421                 : 
     422                 : inline void // Clears the stopped_ flag so that run()/poll()/wait_one() no longer return early on it.
     423              37 : select_scheduler::restart()
     424                 : {
     425              37 :     stopped_.store(false, std::memory_order_release); // only the flag is reset; no other state is touched here
     426              37 : }
     427                 : 
     428                 : inline std::size_t // Calls do_one(-1) until it returns 0 and returns how many calls were non-zero.
     429             100 : select_scheduler::run()
     430                 : {
     431             100 :     if (stopped_.load(std::memory_order_acquire)) // already stopped: nothing to run
     432 MIS           0 :         return 0;
     433                 : 
     434 HIT         200 :     if (outstanding_work_.load(std::memory_order_acquire) == 0) // no pending work: stop and return
     435                 :     {
     436 MIS           0 :         stop();
     437               0 :         return 0;
     438                 :     }
     439                 : 
     440 HIT         100 :     select::thread_context_guard ctx(this); // makes running_in_this_thread() true for the duration of the call
     441                 : 
     442             100 :     std::size_t n = 0;
     443          228435 :     while (do_one(-1)) // -1 timeout; NOTE(review): presumably "block until work is available" — confirm in do_one
     444          228335 :         if (n != (std::numeric_limits<std::size_t>::max)()) // saturate instead of wrapping; parentheses avoid a max() macro
     445          228335 :             ++n;
     446             100 :     return n;
     447             100 : }
     448                 : 
     449                 : inline std::size_t // Performs a single do_one(-1) and returns its result.
     450 MIS           0 : select_scheduler::run_one()
     451                 : {
     452               0 :     if (stopped_.load(std::memory_order_acquire)) // already stopped: nothing to run
     453               0 :         return 0;
     454                 : 
     455               0 :     if (outstanding_work_.load(std::memory_order_acquire) == 0) // no pending work: stop and return
     456                 :     {
     457               0 :         stop();
     458               0 :         return 0;
     459                 :     }
     460                 : 
     461               0 :     select::thread_context_guard ctx(this); // makes running_in_this_thread() true for the duration of the call
     462               0 :     return do_one(-1);
     463               0 : }
     464                 : 
     465                 : inline std::size_t // Performs a single do_one(usec) and returns its result.
     466 HIT          27 : select_scheduler::wait_one(long usec)
     467                 : {
     468              27 :     if (stopped_.load(std::memory_order_acquire)) // already stopped: nothing to run
     469               3 :         return 0;
     470                 : 
     471              48 :     if (outstanding_work_.load(std::memory_order_acquire) == 0) // no pending work: stop and return
     472                 :     {
     473 MIS           0 :         stop();
     474               0 :         return 0;
     475                 :     }
     476                 : 
     477 HIT          24 :     select::thread_context_guard ctx(this); // makes running_in_this_thread() true for the duration of the call
     478              24 :     return do_one(usec); // `usec` is forwarded unchanged as do_one's timeout_us
     479              24 : }
     480                 : 
     481                 : inline std::size_t // Calls do_one(0) until it returns 0 and returns how many calls were non-zero.
     482 MIS           0 : select_scheduler::poll()
     483                 : {
     484               0 :     if (stopped_.load(std::memory_order_acquire)) // already stopped: nothing to run
     485               0 :         return 0;
     486                 : 
     487               0 :     if (outstanding_work_.load(std::memory_order_acquire) == 0) // no pending work: stop and return
     488                 :     {
     489               0 :         stop();
     490               0 :         return 0;
     491                 :     }
     492                 : 
     493               0 :     select::thread_context_guard ctx(this); // makes running_in_this_thread() true for the duration of the call
     494                 : 
     495               0 :     std::size_t n = 0;
     496               0 :     while (do_one(0)) // zero timeout; NOTE(review): presumably non-blocking — confirm in do_one
     497               0 :         if (n != (std::numeric_limits<std::size_t>::max)()) // saturate instead of wrapping
     498               0 :             ++n;
     499               0 :     return n;
     500               0 : }
     501                 : 
     502                 : inline std::size_t // Performs a single do_one(0) and returns its result.
     503               0 : select_scheduler::poll_one()
     504                 : {
     505               0 :     if (stopped_.load(std::memory_order_acquire)) // already stopped: nothing to run
     506               0 :         return 0;
     507                 : 
     508               0 :     if (outstanding_work_.load(std::memory_order_acquire) == 0) // no pending work: stop and return
     509                 :     {
     510               0 :         stop();
     511               0 :         return 0;
     512                 :     }
     513                 : 
     514               0 :     select::thread_context_guard ctx(this); // makes running_in_this_thread() true for the duration of the call
     515               0 :     return do_one(0);
     516               0 : }
     517                 : 
     518                 : inline void // Records `op` as the read and/or write operation for `fd`, then interrupts the reactor.
     519 HIT        6699 : select_scheduler::register_fd(int fd, select_op* op, int events) const
     520                 : {
     521                 :     // Validate fd is within select() limits
     522            6699 :     if (fd < 0 || fd >= FD_SETSIZE)
     523 MIS           0 :         detail::throw_system_error(make_err(EINVAL), "select: fd out of range");
     524                 : 
     525                 :     {
     526 HIT        6699 :         std::lock_guard lock(mutex_);
     527                 : 
     528            6699 :         auto& state = registered_fds_[fd]; // operator[] creates an empty entry on first registration
     529            6699 :         if (events & event_read)
     530            3491 :             state.read_op = op; // overwrites any read op already stored for this fd
     531            6699 :         if (events & event_write)
     532            3208 :             state.write_op = op; // overwrites any write op already stored for this fd
     533                 : 
     534            6699 :         if (fd > max_fd_) // keep the running maximum up to date
     535             232 :             max_fd_ = fd;
     536            6699 :     }
     537                 : 
     538                 :     // Wake the reactor so a thread blocked in select() rebuilds its fd_sets
     539                 :     // with the newly registered fd.
     540            6699 :     interrupt_reactor(); // called after mutex_ has been released
     541            6699 : }
     542                 : 
     543                 : inline void // Clears the read and/or write operation for `fd`; drops the entry once both are empty.
     544            6633 : select_scheduler::deregister_fd(int fd, int events) const
     545                 : {
     546            6633 :     std::lock_guard lock(mutex_); // held for the whole function
     547                 : 
     548            6633 :     auto it = registered_fds_.find(fd);
     549            6633 :     if (it == registered_fds_.end()) // fd is not registered: nothing to do
     550            6470 :         return;
     551                 : 
     552             163 :     if (events & event_read)
     553             163 :         it->second.read_op = nullptr;
     554             163 :     if (events & event_write)
     555 MIS           0 :         it->second.write_op = nullptr;
     556                 : 
     557                 :     // Remove entry if both are null
     558 HIT         163 :     if (!it->second.read_op && !it->second.write_op)
     559                 :     {
     560             163 :         registered_fds_.erase(it); // `it` must not be used after this line
     561                 : 
     562                 :         // Recalculate max_fd_ if needed
     563             163 :         if (fd == max_fd_) // only the removal of the current maximum can lower it
     564                 :         {
     565             162 :             max_fd_ = pipe_fds_[0]; // At minimum, the pipe read end
     566             162 :             for (auto& [registered_fd, state] : registered_fds_) // linear scan of the remaining entries; `state` is unused
     567                 :             {
     568 MIS           0 :                 if (registered_fd > max_fd_)
     569               0 :                     max_fd_ = registered_fd;
     570                 :             }
     571                 :         }
     572                 :     }
     573 HIT        6633 : }
     574                 : 
     575                 : inline void // Adds one unit to the outstanding work count.
     576           10701 : select_scheduler::work_started() noexcept
     577                 : {
     578           10701 :     outstanding_work_.fetch_add(1, std::memory_order_relaxed);
     579           10701 : }
     580                 : 
     581                 : inline void // Removes one unit from the outstanding work count; stops the scheduler when the last unit finishes.
     582          232524 : select_scheduler::work_finished() noexcept
     583                 : {
     584          465048 :     if (outstanding_work_.fetch_sub(1, std::memory_order_acq_rel) == 1) // fetch_sub returns the previous value, so 1 means the count is now 0
     585             103 :         stop();
     586          232524 : }
     587                 : 
     588                 : inline void
     589           13768 : select_scheduler::interrupt_reactor() const
     590                 : {
     591           13768 :     char byte               = 1;
     592           13768 :     [[maybe_unused]] auto r = ::write(pipe_fds_[1], &byte, 1);
     593           13768 : }
     594                 : 
     595                 : inline void
     596          221823 : select_scheduler::wake_one_thread_and_unlock(
     597                 :     std::unique_lock<std::mutex>& lock) const
     598                 : {
     599          221823 :     if (idle_thread_count_ > 0)
     600                 :     {
     601                 :         // Idle worker exists - wake it via condvar
     602 MIS           0 :         wakeup_event_.notify_one();
     603               0 :         lock.unlock();
     604                 :     }
     605 HIT      221823 :     else if (reactor_running_ && !reactor_interrupted_)
     606                 :     {
     607                 :         // No idle workers but reactor is running - interrupt it
     608            3411 :         reactor_interrupted_ = true;
     609            3411 :         lock.unlock();
     610            3411 :         interrupt_reactor();
     611                 :     }
     612                 :     else
     613                 :     {
     614                 :         // No one to wake
     615          218412 :         lock.unlock();
     616                 :     }
     617          221823 : }
     618                 : 
     619                 : inline long
     620            9789 : select_scheduler::calculate_timeout(long requested_timeout_us) const
     621                 : {
     622            9789 :     if (requested_timeout_us == 0)
     623 MIS           0 :         return 0;
     624                 : 
     625 HIT        9789 :     auto nearest = timer_svc_->nearest_expiry();
     626            9789 :     if (nearest == timer_service::time_point::max())
     627              37 :         return requested_timeout_us;
     628                 : 
     629            9752 :     auto now = std::chrono::steady_clock::now();
     630            9752 :     if (nearest <= now)
     631             160 :         return 0;
     632                 : 
     633                 :     auto timer_timeout_us =
     634            9592 :         std::chrono::duration_cast<std::chrono::microseconds>(nearest - now)
     635            9592 :             .count();
     636                 : 
     637                 :     // Clamp to [0, LONG_MAX] to prevent truncation on 32-bit long platforms
     638            9592 :     constexpr auto long_max =
     639                 :         static_cast<long long>((std::numeric_limits<long>::max)());
     640                 :     auto capped_timer_us =
     641            9592 :         (std::min)((std::max)(static_cast<long long>(timer_timeout_us),
     642            9592 :                               static_cast<long long>(0)),
     643            9592 :                    long_max);
     644                 : 
     645            9592 :     if (requested_timeout_us < 0)
     646            9592 :         return static_cast<long>(capped_timer_us);
     647                 : 
     648                 :     // requested_timeout_us is already long, so min() result fits in long
     649                 :     return static_cast<long>(
     650 MIS           0 :         (std::min)(static_cast<long long>(requested_timeout_us),
     651               0 :                    capped_timer_us));
     652                 : }
     653                 : 
     654                 : inline void
     655 HIT      124876 : select_scheduler::run_reactor(std::unique_lock<std::mutex>& lock)
     656                 : {
     657                 :     // Calculate timeout considering timers, use 0 if interrupted
     658                 :     long effective_timeout_us =
     659          124876 :         reactor_interrupted_ ? 0 : calculate_timeout(-1);
     660                 : 
     661                 :     // Build fd_sets from registered_fds_
     662                 :     fd_set read_fds, write_fds, except_fds;
     663         2122892 :     FD_ZERO(&read_fds);
     664         2122892 :     FD_ZERO(&write_fds);
     665         2122892 :     FD_ZERO(&except_fds);
     666                 : 
     667                 :     // Always include the interrupt pipe
     668          124876 :     FD_SET(pipe_fds_[0], &read_fds);
     669          124876 :     int nfds = pipe_fds_[0];
     670                 : 
     671                 :     // Add registered fds
     672          140812 :     for (auto& [fd, state] : registered_fds_)
     673                 :     {
     674           15936 :         if (state.read_op)
     675           12728 :             FD_SET(fd, &read_fds);
     676           15936 :         if (state.write_op)
     677                 :         {
     678            3208 :             FD_SET(fd, &write_fds);
     679                 :             // Also monitor for errors on connect operations
     680            3208 :             FD_SET(fd, &except_fds);
     681                 :         }
     682           15936 :         if (fd > nfds)
     683           12731 :             nfds = fd;
     684                 :     }
     685                 : 
     686                 :     // Convert timeout to timeval
     687                 :     struct timeval tv;
     688          124876 :     struct timeval* tv_ptr = nullptr;
     689          124876 :     if (effective_timeout_us >= 0)
     690                 :     {
     691          124839 :         tv.tv_sec  = effective_timeout_us / 1000000;
     692          124839 :         tv.tv_usec = effective_timeout_us % 1000000;
     693          124839 :         tv_ptr     = &tv;
     694                 :     }
     695                 : 
     696          124876 :     lock.unlock();
     697                 : 
     698          124876 :     int ready = ::select(nfds + 1, &read_fds, &write_fds, &except_fds, tv_ptr);
     699          124876 :     int saved_errno = errno;
     700                 : 
     701                 :     // Process timers outside the lock
     702          124876 :     timer_svc_->process_expired();
     703                 : 
     704          124876 :     if (ready < 0 && saved_errno != EINTR)
     705 MIS           0 :         detail::throw_system_error(make_err(saved_errno), "select");
     706                 : 
     707                 :     // Re-acquire lock before modifying completed_ops_
     708 HIT      124876 :     lock.lock();
     709                 : 
     710                 :     // Drain the interrupt pipe if readable
     711          124876 :     if (ready > 0 && FD_ISSET(pipe_fds_[0], &read_fds))
     712                 :     {
     713                 :         char buf[256];
     714           20306 :         while (::read(pipe_fds_[0], buf, sizeof(buf)) > 0)
     715                 :         {
     716                 :         }
     717                 :     }
     718                 : 
     719                 :     // Process I/O completions
     720          124876 :     int completions_queued = 0;
     721          124876 :     if (ready > 0)
     722                 :     {
     723                 :         // Iterate over registered fds (copy keys to avoid iterator invalidation)
     724           10153 :         std::vector<int> fds_to_check;
     725           10153 :         fds_to_check.reserve(registered_fds_.size());
     726           22918 :         for (auto& [fd, state] : registered_fds_)
     727           12765 :             fds_to_check.push_back(fd);
     728                 : 
     729           22918 :         for (int fd : fds_to_check)
     730                 :         {
     731           12765 :             auto it = registered_fds_.find(fd);
     732           12765 :             if (it == registered_fds_.end())
     733 MIS           0 :                 continue;
     734                 : 
     735 HIT       12765 :             auto& state = it->second;
     736                 : 
     737                 :             // Check for errors (especially for connect operations)
     738           12765 :             bool has_error = FD_ISSET(fd, &except_fds);
     739                 : 
     740                 :             // Process read readiness
     741           12765 :             if (state.read_op && (FD_ISSET(fd, &read_fds) || has_error))
     742                 :             {
     743            3328 :                 auto* op = state.read_op;
     744                 :                 // Claim the op by exchanging to unregistered. Both registering and
     745                 :                 // registered states mean the op is ours to complete.
     746            3328 :                 auto prev = op->registered.exchange(
     747                 :                     select_registration_state::unregistered,
     748                 :                     std::memory_order_acq_rel);
     749            3328 :                 if (prev != select_registration_state::unregistered)
     750                 :                 {
     751            3328 :                     state.read_op = nullptr;
     752                 : 
     753            3328 :                     if (has_error)
     754                 :                     {
     755 MIS           0 :                         int errn      = 0;
     756               0 :                         socklen_t len = sizeof(errn);
     757               0 :                         if (::getsockopt(
     758               0 :                                 fd, SOL_SOCKET, SO_ERROR, &errn, &len) < 0)
     759               0 :                             errn = errno;
     760               0 :                         if (errn == 0)
     761               0 :                             errn = EIO;
     762               0 :                         op->complete(errn, 0);
     763                 :                     }
     764                 :                     else
     765                 :                     {
     766 HIT        3328 :                         op->perform_io();
     767                 :                     }
     768                 : 
     769            3328 :                     completed_ops_.push(op);
     770            3328 :                     ++completions_queued;
     771                 :                 }
     772                 :             }
     773                 : 
     774                 :             // Process write readiness
     775           12765 :             if (state.write_op && (FD_ISSET(fd, &write_fds) || has_error))
     776                 :             {
     777            3208 :                 auto* op = state.write_op;
     778                 :                 // Claim the op by exchanging to unregistered. Both registering and
     779                 :                 // registered states mean the op is ours to complete.
     780            3208 :                 auto prev = op->registered.exchange(
     781                 :                     select_registration_state::unregistered,
     782                 :                     std::memory_order_acq_rel);
     783            3208 :                 if (prev != select_registration_state::unregistered)
     784                 :                 {
     785            3208 :                     state.write_op = nullptr;
     786                 : 
     787            3208 :                     if (has_error)
     788                 :                     {
     789 MIS           0 :                         int errn      = 0;
     790               0 :                         socklen_t len = sizeof(errn);
     791               0 :                         if (::getsockopt(
     792               0 :                                 fd, SOL_SOCKET, SO_ERROR, &errn, &len) < 0)
     793               0 :                             errn = errno;
     794               0 :                         if (errn == 0)
     795               0 :                             errn = EIO;
     796               0 :                         op->complete(errn, 0);
     797                 :                     }
     798                 :                     else
     799                 :                     {
     800 HIT        3208 :                         op->perform_io();
     801                 :                     }
     802                 : 
     803            3208 :                     completed_ops_.push(op);
     804            3208 :                     ++completions_queued;
     805                 :                 }
     806                 :             }
     807                 : 
     808                 :             // Clean up empty entries
     809           12765 :             if (!state.read_op && !state.write_op)
     810            6536 :                 registered_fds_.erase(it);
     811                 :         }
     812           10153 :     }
     813                 : 
     814          124876 :     if (completions_queued > 0)
     815                 :     {
     816            3331 :         if (completions_queued == 1)
     817             126 :             wakeup_event_.notify_one();
     818                 :         else
     819            3205 :             wakeup_event_.notify_all();
     820                 :     }
     821          124876 : }
     822                 : 
     823                 : inline std::size_t
     824          228459 : select_scheduler::do_one(long timeout_us)
     825                 : {
     826          228459 :     std::unique_lock lock(mutex_);
     827                 : 
     828                 :     for (;;)
     829                 :     {
     830          353335 :         if (stopped_.load(std::memory_order_acquire))
     831             100 :             return 0;
     832                 : 
     833          353235 :         scheduler_op* op = completed_ops_.pop();
     834                 : 
     835          353235 :         if (op == &task_op_)
     836                 :         {
     837          124876 :             bool more_handlers = !completed_ops_.empty();
     838                 : 
     839          124876 :             if (!more_handlers)
     840                 :             {
     841           19578 :                 if (outstanding_work_.load(std::memory_order_acquire) == 0)
     842                 :                 {
     843 MIS           0 :                     completed_ops_.push(&task_op_);
     844               0 :                     return 0;
     845                 :                 }
     846 HIT        9789 :                 if (timeout_us == 0)
     847                 :                 {
     848 MIS           0 :                     completed_ops_.push(&task_op_);
     849               0 :                     return 0;
     850                 :                 }
     851                 :             }
     852                 : 
     853 HIT      124876 :             reactor_interrupted_ = more_handlers || timeout_us == 0;
     854          124876 :             reactor_running_     = true;
     855                 : 
     856          124876 :             if (more_handlers && idle_thread_count_ > 0)
     857 MIS           0 :                 wakeup_event_.notify_one();
     858                 : 
     859 HIT      124876 :             run_reactor(lock);
     860                 : 
     861          124876 :             reactor_running_ = false;
     862          124876 :             completed_ops_.push(&task_op_);
     863          124876 :             continue;
     864          124876 :         }
     865                 : 
     866          228359 :         if (op != nullptr)
     867                 :         {
     868          228359 :             lock.unlock();
     869          228359 :             select::work_guard g{this};
     870          228359 :             (*op)();
     871          228359 :             return 1;
     872          228359 :         }
     873                 : 
     874 MIS           0 :         if (outstanding_work_.load(std::memory_order_acquire) == 0)
     875               0 :             return 0;
     876                 : 
     877               0 :         if (timeout_us == 0)
     878               0 :             return 0;
     879                 : 
     880               0 :         ++idle_thread_count_;
     881               0 :         if (timeout_us < 0)
     882               0 :             wakeup_event_.wait(lock);
     883                 :         else
     884               0 :             wakeup_event_.wait_for(lock, std::chrono::microseconds(timeout_us));
     885               0 :         --idle_thread_count_;
     886 HIT      124876 :     }
     887          228459 : }
     888                 : 
     889                 : } // namespace boost::corosio::detail
     890                 : 
     891                 : #endif // BOOST_COROSIO_HAS_SELECT
     892                 : 
     893                 : #endif // BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SCHEDULER_HPP
        

Generated by: LCOV version 2.3