//
// Copyright (c) 2026 Steve Gerbino
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
// Official repository: https://github.com/cppalliance/corosio
//

#ifndef BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SCHEDULER_HPP
#define BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SCHEDULER_HPP

#include <boost/corosio/detail/platform.hpp>

#if BOOST_COROSIO_HAS_EPOLL

#include <boost/corosio/detail/config.hpp>
#include <boost/capy/ex/execution_context.hpp>

#include <boost/corosio/native/native_scheduler.hpp>
#include <boost/corosio/detail/scheduler_op.hpp>

#include <boost/corosio/native/detail/epoll/epoll_op.hpp>
#include <boost/corosio/detail/timer_service.hpp>
#include <boost/corosio/detail/make_err.hpp>
#include <boost/corosio/native/detail/posix/posix_resolver_service.hpp>
#include <boost/corosio/native/detail/posix/posix_signal_service.hpp>

#include <boost/corosio/detail/except.hpp>
#include <boost/corosio/detail/thread_local_ptr.hpp>

#include <atomic>
#include <chrono>
#include <condition_variable>
#include <coroutine>
#include <cstddef>
#include <cstdint>
#include <limits>
#include <memory>
#include <mutex>
#include <utility>

#include <errno.h>
#include <fcntl.h>
#include <sys/epoll.h>
#include <sys/eventfd.h>
#include <sys/socket.h>
#include <sys/timerfd.h>
#include <unistd.h>

namespace boost::corosio::detail {

struct epoll_op;
struct descriptor_state;
namespace epoll {
struct BOOST_COROSIO_SYMBOL_VISIBLE scheduler_context;
} // namespace epoll

/** Linux scheduler using epoll for I/O multiplexing.

    This scheduler implements the scheduler interface using Linux epoll
    for efficient I/O event notification. It uses a single reactor model
    where one thread runs epoll_wait while the other threads wait on a
    condition variable for handler work. This design provides:

    - Handler parallelism: N posted handlers can execute on N threads
    - No thundering herd: the condition variable wakes exactly one thread
    - IOCP parity: behavior matches Windows I/O completion port semantics

    When threads call run(), they first try to execute queued handlers.
    If the queue is empty and no reactor is running, one thread becomes
    the reactor and runs epoll_wait. The other threads wait on a condition
    variable until handlers are available.

    @par Thread Safety
    All public member functions are thread-safe.
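
    @par Example
    A minimal sketch of driving the scheduler with a small thread pool.
    Direct construction is shown only for brevity; in real use the
    scheduler is owned by its execution_context as a service, and the
    posted handle `h` is assumed to be a valid std::coroutine_handle<>:

    @code
    capy::execution_context ctx;
    epoll_scheduler sched(ctx);

    std::vector<std::thread> pool;
    for (int i = 0; i < 4; ++i)
        pool.emplace_back([&] { sched.run(); }); // one thread becomes the reactor

    sched.post(h);  // wakes exactly one waiting thread
    sched.stop();   // makes run() return on every thread
    for (auto& t : pool)
        t.join();
    @endcode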
*/
class BOOST_COROSIO_DECL epoll_scheduler final
    : public native_scheduler
    , public capy::execution_context::service
{
public:
    using key_type = scheduler;

    /** Construct the scheduler.

        Creates an epoll instance, an eventfd for reactor interruption,
        and a timerfd for kernel-managed timer expiry.

        @param ctx Reference to the owning execution_context.
        @param concurrency_hint Hint for the expected thread count (unused).
    */
    epoll_scheduler(capy::execution_context& ctx, int concurrency_hint = -1);

    /// Destroy the scheduler.
    ~epoll_scheduler() override;

    epoll_scheduler(epoll_scheduler const&) = delete;
    epoll_scheduler& operator=(epoll_scheduler const&) = delete;

    void shutdown() override;
    void post(std::coroutine_handle<> h) const override;
    void post(scheduler_op* h) const override;
    bool running_in_this_thread() const noexcept override;
    void stop() override;
    bool stopped() const noexcept override;
    void restart() override;
    std::size_t run() override;
    std::size_t run_one() override;
    std::size_t wait_one(long usec) override;
    std::size_t poll() override;
    std::size_t poll_one() override;

    /** Return the epoll file descriptor.

        Used by socket services to register file descriptors
        for I/O event notification.

        @return The epoll file descriptor.
    */
    int epoll_fd() const noexcept
    {
        return epoll_fd_;
    }

    /** Reset the thread's inline completion budget.

        Called at the start of each posted completion handler to
        grant a fresh budget for speculative inline completions.
    */
    void reset_inline_budget() const noexcept;

    /** Consume one unit of inline budget if available.

        @return True if budget was available and consumed.
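
        @par Example
        A sketch of the intended pairing with reset_inline_budget(); the
        I/O helpers named here are hypothetical:

        @code
        sched.reset_inline_budget();  // start of a posted completion handler
        while (has_ready_io() && sched.try_consume_inline_budget())
            complete_one_operation_inline();  // speculative inline completion
        @endcode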
    */
    bool try_consume_inline_budget() const noexcept;

    /** Register a descriptor for persistent monitoring.

        The fd is registered once and stays registered until explicitly
        deregistered. Events are dispatched via descriptor_state, which
        tracks pending read/write/connect operations.

        @param fd The file descriptor to register.
        @param desc Pointer to descriptor data (stored in epoll_event.data.ptr).
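
        @par Example
        A sketch of the registration lifecycle as a socket service might
        drive it; how the descriptor_state is allocated is left out and
        the helper name is hypothetical:

        @code
        descriptor_state* desc = acquire_descriptor_state();
        sched.register_descriptor(fd, desc);  // once, when the fd is created
        // ... the reactor dispatches EPOLLIN/EPOLLOUT through *desc ...
        sched.deregister_descriptor(fd);      // once, before closing the fd
        @endcode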
    */
    void register_descriptor(int fd, descriptor_state* desc) const;

    /** Deregister a persistently registered descriptor.

        @param fd The file descriptor to deregister.
    */
    void deregister_descriptor(int fd) const;

    void work_started() noexcept override;
    void work_finished() noexcept override;

    /** Offset the forthcoming work_finished() issued by work_cleanup.

        Called by descriptor_state when all I/O returned EAGAIN and no
        handler will be executed. Must be called from a scheduler thread.
    */
    void compensating_work_started() const noexcept;

    /** Drain work from a thread context's private queue to the global queue.

        Called by the thread_context_guard destructor when a thread exits
        run(). Transfers pending work to the global queue under mutex
        protection.

        @param queue The private queue to drain.
        @param count Item count for wakeup decisions (wakes another thread if positive).
    */
    void drain_thread_queue(op_queue& queue, long count) const;

    /** Post completed operations for deferred invocation.

        If called from a thread running this scheduler, operations go to
        the thread's private queue (fast path). Otherwise, operations are
        added to the global queue under mutex and a waiter is signaled.

        @par Preconditions
        work_started() must have been called for each operation.

        @param ops Queue of operations to post.
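
        @par Example
        A sketch of deferring a batch of completed operations; op1 and op2
        stand for operations for which work_started() was already called:

        @code
        op_queue ops;
        ops.push(op1);
        ops.push(op2);
        sched.post_deferred_completions(ops);  // ops is left empty
        @endcode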
    */
    void post_deferred_completions(op_queue& ops) const;

private:
    struct work_cleanup
    {
        epoll_scheduler* scheduler;
        std::unique_lock<std::mutex>* lock;
        epoll::scheduler_context* ctx;
        ~work_cleanup();
    };

    struct task_cleanup
    {
        epoll_scheduler const* scheduler;
        std::unique_lock<std::mutex>* lock;
        epoll::scheduler_context* ctx;
        ~task_cleanup();
    };

    std::size_t do_one(
        std::unique_lock<std::mutex>& lock,
        long timeout_us,
        epoll::scheduler_context* ctx);
    void
    run_task(std::unique_lock<std::mutex>& lock, epoll::scheduler_context* ctx);
    void wake_one_thread_and_unlock(std::unique_lock<std::mutex>& lock) const;
    void interrupt_reactor() const;
    void update_timerfd() const;

    /** Set the signaled state and wake all waiting threads.

        @par Preconditions
        Mutex must be held.

        @param lock The held mutex lock.
    */
    void signal_all(std::unique_lock<std::mutex>& lock) const;

    /** Set the signaled state and wake one waiter if any exist.

        Only unlocks and signals if at least one thread is waiting.
        Use this when the caller needs to perform a fallback action
        (such as interrupting the reactor) when no waiters exist.

        @par Preconditions
        Mutex must be held.

        @param lock The held mutex lock.

        @return `true` if unlocked and signaled, `false` if the lock is still held.
    */
    bool maybe_unlock_and_signal_one(std::unique_lock<std::mutex>& lock) const;

    /** Set the signaled state, unlock, and wake one waiter if any exist.

        Always unlocks the mutex. Use this when the caller will release
        the lock regardless of whether a waiter exists.

        @par Preconditions
        Mutex must be held.

        @param lock The held mutex lock.

        @return `true` if a waiter was signaled, `false` otherwise.
    */
    bool unlock_and_signal_one(std::unique_lock<std::mutex>& lock) const;

    /** Clear the signaled state before waiting.

        @par Preconditions
        Mutex must be held.
    */
    void clear_signal() const;

    /** Block until the signaled state is set.

        Returns immediately if already signaled (fast-path). Otherwise
        increments the waiter count, waits on the condition variable,
        and decrements the waiter count upon waking.

        @par Preconditions
        Mutex must be held.

        @param lock The held mutex lock.
    */
    void wait_for_signal(std::unique_lock<std::mutex>& lock) const;

    /** Block until signaled or the timeout expires.

        @par Preconditions
        Mutex must be held.

        @param lock The held mutex lock.
        @param timeout_us Maximum time to wait in microseconds.
    */
    void wait_for_signal_for(
        std::unique_lock<std::mutex>& lock, long timeout_us) const;

    int epoll_fd_;
    int event_fd_; // for interrupting the reactor
    int timer_fd_; // timerfd for kernel-managed timer expiry
    mutable std::mutex mutex_;
    mutable std::condition_variable cond_;
    mutable op_queue completed_ops_;
    mutable std::atomic<long> outstanding_work_;
    bool stopped_;
    bool shutdown_;

    // True while a thread is blocked in epoll_wait. Used by
    // wake_one_thread_and_unlock and work_finished to know when
    // an eventfd interrupt is needed instead of a condvar signal.
    mutable std::atomic<bool> task_running_{false};

    // True when the reactor has been told to do a non-blocking poll
    // (more handlers queued or poll mode). Prevents redundant eventfd
    // writes and controls the epoll_wait timeout.
    mutable bool task_interrupted_ = false;

    // Signaling state: bit 0 = signaled flag; upper bits = waiter count
    // (each waiter adds 2 before blocking)
    mutable std::size_t state_ = 0;

    // Edge-triggered eventfd state
    mutable std::atomic<bool> eventfd_armed_{false};

    // Set when the earliest timer changes; flushed before epoll_wait
    // blocks. Avoids timerfd_settime syscalls for timers that are
    // scheduled then cancelled without being waited on.
    mutable std::atomic<bool> timerfd_stale_{false};

    // Sentinel operation for interleaving reactor runs with handler execution.
    // Ensures the reactor runs periodically even when handlers are continuously
    // posted, preventing starvation of I/O events, timers, and signals.
    struct task_op final : scheduler_op
    {
        void operator()() override {}
        void destroy() override {}
    };
    task_op task_op_;
};

//--------------------------------------------------------------------------
//
// Implementation
//
//--------------------------------------------------------------------------

/*
epoll Scheduler - Single Reactor Model
======================================

This scheduler uses a thread coordination strategy to provide handler
parallelism and avoid the thundering herd problem.
Instead of all threads blocking on epoll_wait(), one thread becomes the
"reactor" while others wait on a condition variable for handler work.

Thread Model
------------
- ONE thread runs epoll_wait() at a time (the reactor thread)
- OTHER threads wait on cond_ (condition variable) for handlers
- When work is posted, exactly one waiting thread wakes via notify_one()
- This matches Windows IOCP semantics where N posted items wake N threads

Event Loop Structure (do_one)
-----------------------------
1. Lock mutex, try to pop a handler from the queue
2. If we got a handler: execute it (unlocked), return
3. If the queue is empty and no reactor is running: become the reactor
   - Run epoll_wait (unlocked), queue I/O completions, loop back
4. If the queue is empty and a reactor is running: wait on the condvar

The task_running_ flag ensures only one thread owns epoll_wait().
After the reactor queues I/O completions, it loops back to try getting
a handler, giving priority to handler execution over more I/O polling.

Signaling State (state_)
------------------------
The state_ variable encodes two pieces of information:
- Bit 0: signaled flag (1 = signaled, persists until cleared)
- Upper bits: waiter count (each waiter adds 2 before blocking)

This allows efficient coordination:
- Signalers only call notify when waiters exist (state_ > 1)
- Waiters check if already signaled before blocking (fast-path)
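
For example (a worked trace of the encoding, not code from this file):
- state_ == 0: idle (not signaled, no waiters)
- state_ == 4: two waiters blocked, not signaled
- state_ == 5: signaled while two waiters are still blocked
- state_ == 1: signaled, no waiters; the next wait returns immediately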

Wake Coordination (wake_one_thread_and_unlock)
----------------------------------------------
When posting work:
- If waiters exist (state_ > 1): signal and notify_one()
- Else if the reactor is running: interrupt it via an eventfd write
- Else: no-op (the thread will find the work when it checks the queue)

This avoids waking threads unnecessarily. With cascading wakes,
each handler execution wakes at most one additional thread if
more work exists in the queue.

Work Counting
-------------
outstanding_work_ tracks pending operations. When it hits zero, run()
returns. Each operation increments the count on start and decrements
it on completion.

Timer Integration
-----------------
Timers are handled by timer_service. The reactor adjusts the epoll_wait
timeout to wake for the nearest timer expiry. When a new timer is
scheduled earlier than the current nearest one, timer_service calls
interrupt_reactor() so the timeout is re-evaluated.
*/

namespace epoll {

struct BOOST_COROSIO_SYMBOL_VISIBLE scheduler_context
{
    epoll_scheduler const* key;
    scheduler_context* next;
    op_queue private_queue;
    long private_outstanding_work;
    int inline_budget;
    int inline_budget_max;
    bool unassisted;

    scheduler_context(epoll_scheduler const* k, scheduler_context* n)
        : key(k)
        , next(n)
        , private_outstanding_work(0)
        , inline_budget(0)
        , inline_budget_max(2)
        , unassisted(false)
    {
    }
};

inline thread_local_ptr<scheduler_context> context_stack;

struct thread_context_guard
{
    scheduler_context frame_;

    explicit thread_context_guard(epoll_scheduler const* ctx) noexcept
        : frame_(ctx, context_stack.get())
    {
        context_stack.set(&frame_);
    }

    ~thread_context_guard() noexcept
    {
        if (!frame_.private_queue.empty())
            frame_.key->drain_thread_queue(
                frame_.private_queue, frame_.private_outstanding_work);
        context_stack.set(frame_.next);
    }
};

inline scheduler_context*
find_context(epoll_scheduler const* self) noexcept
{
    for (auto* c = context_stack.get(); c != nullptr; c = c->next)
        if (c->key == self)
            return c;
    return nullptr;
}

} // namespace epoll

inline void
epoll_scheduler::reset_inline_budget() const noexcept
{
    if (auto* ctx = epoll::find_context(this))
    {
        // Cap when no other thread absorbed queued work. A moderate
        // cap (4) amortizes scheduling for small buffers while avoiding
        // bursty I/O that fills socket buffers and stalls large transfers.
        if (ctx->unassisted)
        {
            ctx->inline_budget_max = 4;
            ctx->inline_budget = 4;
            return;
        }
        // Ramp up when the previous cycle fully consumed the budget.
        // Reset on partial consumption (EAGAIN hit or a peer got scheduled).
        if (ctx->inline_budget == 0)
            ctx->inline_budget_max = (std::min)(ctx->inline_budget_max * 2, 16);
        else if (ctx->inline_budget < ctx->inline_budget_max)
            ctx->inline_budget_max = 2;
        ctx->inline_budget = ctx->inline_budget_max;
    }
}

inline bool
epoll_scheduler::try_consume_inline_budget() const noexcept
{
    if (auto* ctx = epoll::find_context(this))
    {
        if (ctx->inline_budget > 0)
        {
            --ctx->inline_budget;
            return true;
        }
    }
    return false;
}

inline void
descriptor_state::operator()()
{
    is_enqueued_.store(false, std::memory_order_relaxed);

    // Take ownership of the impl ref set by close_socket() to prevent
    // the owning impl from being freed while we're executing
    auto prevent_impl_destruction = std::move(impl_ref_);

    std::uint32_t ev = ready_events_.exchange(0, std::memory_order_acquire);
    if (ev == 0)
    {
        scheduler_->compensating_work_started();
        return;
    }

    op_queue local_ops;

    int err = 0;
    if (ev & EPOLLERR)
    {
        socklen_t len = sizeof(err);
        if (::getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &len) < 0)
            err = errno;
        if (err == 0)
            err = EIO;
    }

    {
        std::lock_guard lock(mutex);
        if (ev & EPOLLIN)
        {
            if (read_op)
            {
                auto* rd = read_op;
                if (err)
                    rd->complete(err, 0);
                else
                    rd->perform_io();

                if (rd->errn == EAGAIN || rd->errn == EWOULDBLOCK)
                {
                    rd->errn = 0;
                }
                else
                {
                    read_op = nullptr;
                    local_ops.push(rd);
                }
            }
            else
            {
                read_ready = true;
            }
        }
        if (ev & EPOLLOUT)
        {
            bool had_write_op = (connect_op || write_op);
            if (connect_op)
            {
                auto* cn = connect_op;
                if (err)
                    cn->complete(err, 0);
                else
                    cn->perform_io();
                connect_op = nullptr;
                local_ops.push(cn);
            }
            if (write_op)
            {
                auto* wr = write_op;
                if (err)
                    wr->complete(err, 0);
                else
                    wr->perform_io();

                if (wr->errn == EAGAIN || wr->errn == EWOULDBLOCK)
                {
                    wr->errn = 0;
                }
                else
                {
                    write_op = nullptr;
                    local_ops.push(wr);
                }
            }
            if (!had_write_op)
                write_ready = true;
        }
        if (err)
        {
            if (read_op)
            {
                read_op->complete(err, 0);
                local_ops.push(std::exchange(read_op, nullptr));
            }
            if (write_op)
            {
                write_op->complete(err, 0);
                local_ops.push(std::exchange(write_op, nullptr));
            }
            if (connect_op)
            {
                connect_op->complete(err, 0);
                local_ops.push(std::exchange(connect_op, nullptr));
            }
        }
    }

    // Execute the first handler inline; the scheduler's work_cleanup
    // accounts for this as the "consumed" work item
    scheduler_op* first = local_ops.pop();
    if (first)
    {
        scheduler_->post_deferred_completions(local_ops);
        (*first)();
    }
    else
    {
        scheduler_->compensating_work_started();
    }
}
608 :
609 205 : inline epoll_scheduler::epoll_scheduler(capy::execution_context& ctx, int)
610 205 : : epoll_fd_(-1)
611 205 : , event_fd_(-1)
612 205 : , timer_fd_(-1)
613 205 : , outstanding_work_(0)
614 205 : , stopped_(false)
615 205 : , shutdown_(false)
616 205 : , task_running_{false}
617 205 : , task_interrupted_(false)
618 410 : , state_(0)
619 : {
620 205 : epoll_fd_ = ::epoll_create1(EPOLL_CLOEXEC);
621 205 : if (epoll_fd_ < 0)
622 MIS 0 : detail::throw_system_error(make_err(errno), "epoll_create1");
623 :
624 HIT 205 : event_fd_ = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
625 205 : if (event_fd_ < 0)
626 : {
627 MIS 0 : int errn = errno;
628 0 : ::close(epoll_fd_);
629 0 : detail::throw_system_error(make_err(errn), "eventfd");
630 : }
631 :
632 HIT 205 : timer_fd_ = ::timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);
633 205 : if (timer_fd_ < 0)
634 : {
635 MIS 0 : int errn = errno;
636 0 : ::close(event_fd_);
637 0 : ::close(epoll_fd_);
638 0 : detail::throw_system_error(make_err(errn), "timerfd_create");
639 : }
640 :
641 HIT 205 : epoll_event ev{};
642 205 : ev.events = EPOLLIN | EPOLLET;
643 205 : ev.data.ptr = nullptr;
644 205 : if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, event_fd_, &ev) < 0)
645 : {
646 MIS 0 : int errn = errno;
647 0 : ::close(timer_fd_);
648 0 : ::close(event_fd_);
649 0 : ::close(epoll_fd_);
650 0 : detail::throw_system_error(make_err(errn), "epoll_ctl");
651 : }
652 :
653 HIT 205 : epoll_event timer_ev{};
654 205 : timer_ev.events = EPOLLIN | EPOLLERR;
655 205 : timer_ev.data.ptr = &timer_fd_;
656 205 : if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, timer_fd_, &timer_ev) < 0)
657 : {
658 MIS 0 : int errn = errno;
659 0 : ::close(timer_fd_);
660 0 : ::close(event_fd_);
661 0 : ::close(epoll_fd_);
662 0 : detail::throw_system_error(make_err(errn), "epoll_ctl (timerfd)");
663 : }
664 :
665 HIT 205 : timer_svc_ = &get_timer_service(ctx, *this);
666 205 : timer_svc_->set_on_earliest_changed(
667 4941 : timer_service::callback(this, [](void* p) {
668 4736 : auto* self = static_cast<epoll_scheduler*>(p);
669 4736 : self->timerfd_stale_.store(true, std::memory_order_release);
670 4736 : if (self->task_running_.load(std::memory_order_acquire))
671 MIS 0 : self->interrupt_reactor();
672 HIT 4736 : }));
673 :
674 : // Initialize resolver service
675 205 : get_resolver_service(ctx, *this);
676 :
677 : // Initialize signal service
678 205 : get_signal_service(ctx, *this);
679 :
680 : // Push task sentinel to interleave reactor runs with handler execution
681 205 : completed_ops_.push(&task_op_);
682 205 : }
683 :
684 410 : inline epoll_scheduler::~epoll_scheduler()
685 : {
686 205 : if (timer_fd_ >= 0)
687 205 : ::close(timer_fd_);
688 205 : if (event_fd_ >= 0)
689 205 : ::close(event_fd_);
690 205 : if (epoll_fd_ >= 0)
691 205 : ::close(epoll_fd_);
692 410 : }
693 :
694 : inline void
695 205 : epoll_scheduler::shutdown()
696 : {
697 : {
698 205 : std::unique_lock lock(mutex_);
699 205 : shutdown_ = true;
700 :
701 439 : while (auto* h = completed_ops_.pop())
702 : {
703 234 : if (h == &task_op_)
704 205 : continue;
705 29 : lock.unlock();
706 29 : h->destroy();
707 29 : lock.lock();
708 234 : }
709 :
710 205 : signal_all(lock);
711 205 : }
712 :
713 205 : outstanding_work_.store(0, std::memory_order_release);
714 :
715 205 : if (event_fd_ >= 0)
716 205 : interrupt_reactor();
717 205 : }
718 :
719 : inline void
720 6578 : epoll_scheduler::post(std::coroutine_handle<> h) const
721 : {
722 : struct post_handler final : scheduler_op
723 : {
724 : std::coroutine_handle<> h_;
725 :
726 6578 : explicit post_handler(std::coroutine_handle<> h) : h_(h) {}
727 :
728 13156 : ~post_handler() override = default;
729 :
730 6578 : void operator()() override
731 : {
732 6578 : auto h = h_;
733 6578 : delete this;
734 6578 : h.resume();
735 6578 : }
736 :
737 MIS 0 : void destroy() override
738 : {
739 0 : delete this;
740 0 : }
741 : };
742 :
743 HIT 6578 : auto ph = std::make_unique<post_handler>(h);
744 :
745 : // Fast path: same thread posts to private queue
746 : // Only count locally; work_cleanup batches to global counter
747 6578 : if (auto* ctx = epoll::find_context(this))
748 : {
749 4919 : ++ctx->private_outstanding_work;
750 4919 : ctx->private_queue.push(ph.release());
751 4919 : return;
752 : }
753 :
754 : // Slow path: cross-thread post requires mutex
755 1659 : outstanding_work_.fetch_add(1, std::memory_order_relaxed);
756 :
757 1659 : std::unique_lock lock(mutex_);
758 1659 : completed_ops_.push(ph.release());
759 1659 : wake_one_thread_and_unlock(lock);
760 6578 : }
761 :
762 : inline void
763 75180 : epoll_scheduler::post(scheduler_op* h) const
764 : {
765 : // Fast path: same thread posts to private queue
766 : // Only count locally; work_cleanup batches to global counter
767 75180 : if (auto* ctx = epoll::find_context(this))
768 : {
769 75154 : ++ctx->private_outstanding_work;
770 75154 : ctx->private_queue.push(h);
771 75154 : return;
772 : }
773 :
774 : // Slow path: cross-thread post requires mutex
775 26 : outstanding_work_.fetch_add(1, std::memory_order_relaxed);
776 :
777 26 : std::unique_lock lock(mutex_);
778 26 : completed_ops_.push(h);
779 26 : wake_one_thread_and_unlock(lock);
780 26 : }

inline bool
epoll_scheduler::running_in_this_thread() const noexcept
{
    for (auto* c = epoll::context_stack.get(); c != nullptr; c = c->next)
        if (c->key == this)
            return true;
    return false;
}

inline void
epoll_scheduler::stop()
{
    std::unique_lock lock(mutex_);
    if (!stopped_)
    {
        stopped_ = true;
        signal_all(lock);
        interrupt_reactor();
    }
}

inline bool
epoll_scheduler::stopped() const noexcept
{
    std::unique_lock lock(mutex_);
    return stopped_;
}

inline void
epoll_scheduler::restart()
{
    std::unique_lock lock(mutex_);
    stopped_ = false;
}

inline std::size_t
epoll_scheduler::run()
{
    if (outstanding_work_.load(std::memory_order_acquire) == 0)
    {
        stop();
        return 0;
    }

    epoll::thread_context_guard ctx(this);
    std::unique_lock lock(mutex_);

    std::size_t n = 0;
    for (;;)
    {
        if (!do_one(lock, -1, &ctx.frame_))
            break;
        if (n != (std::numeric_limits<std::size_t>::max)())
            ++n;
        if (!lock.owns_lock())
            lock.lock();
    }
    return n;
}

inline std::size_t
epoll_scheduler::run_one()
{
    if (outstanding_work_.load(std::memory_order_acquire) == 0)
    {
        stop();
        return 0;
    }

    epoll::thread_context_guard ctx(this);
    std::unique_lock lock(mutex_);
    return do_one(lock, -1, &ctx.frame_);
}

inline std::size_t
epoll_scheduler::wait_one(long usec)
{
    if (outstanding_work_.load(std::memory_order_acquire) == 0)
    {
        stop();
        return 0;
    }

    epoll::thread_context_guard ctx(this);
    std::unique_lock lock(mutex_);
    return do_one(lock, usec, &ctx.frame_);
}

inline std::size_t
epoll_scheduler::poll()
{
    if (outstanding_work_.load(std::memory_order_acquire) == 0)
    {
        stop();
        return 0;
    }

    epoll::thread_context_guard ctx(this);
    std::unique_lock lock(mutex_);

    std::size_t n = 0;
    for (;;)
    {
        if (!do_one(lock, 0, &ctx.frame_))
            break;
        if (n != (std::numeric_limits<std::size_t>::max)())
            ++n;
        if (!lock.owns_lock())
            lock.lock();
    }
    return n;
}

inline std::size_t
epoll_scheduler::poll_one()
{
    if (outstanding_work_.load(std::memory_order_acquire) == 0)
    {
        stop();
        return 0;
    }

    epoll::thread_context_guard ctx(this);
    std::unique_lock lock(mutex_);
    return do_one(lock, 0, &ctx.frame_);
}

inline void
epoll_scheduler::register_descriptor(int fd, descriptor_state* desc) const
{
    epoll_event ev{};
    ev.events = EPOLLIN | EPOLLOUT | EPOLLET | EPOLLERR | EPOLLHUP;
    ev.data.ptr = desc;

    if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &ev) < 0)
        detail::throw_system_error(make_err(errno), "epoll_ctl (register)");

    desc->registered_events = ev.events;
    desc->fd = fd;
    desc->scheduler_ = this;

    std::lock_guard lock(desc->mutex);
    desc->read_ready = false;
    desc->write_ready = false;
}

inline void
epoll_scheduler::deregister_descriptor(int fd) const
{
    ::epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, fd, nullptr);
}

inline void
epoll_scheduler::work_started() noexcept
{
    outstanding_work_.fetch_add(1, std::memory_order_relaxed);
}

inline void
epoll_scheduler::work_finished() noexcept
{
    if (outstanding_work_.fetch_sub(1, std::memory_order_acq_rel) == 1)
        stop();
}

inline void
epoll_scheduler::compensating_work_started() const noexcept
{
    auto* ctx = epoll::find_context(this);
    if (ctx)
        ++ctx->private_outstanding_work;
}

inline void
epoll_scheduler::drain_thread_queue(op_queue& queue, long count) const
{
    // Note: outstanding_work_ was already incremented when posting
    std::unique_lock lock(mutex_);
    completed_ops_.splice(queue);
    if (count > 0)
        maybe_unlock_and_signal_one(lock);
}

inline void
epoll_scheduler::post_deferred_completions(op_queue& ops) const
{
    if (ops.empty())
        return;

    // Fast path: if on a scheduler thread, use the private queue
    if (auto* ctx = epoll::find_context(this))
    {
        ctx->private_queue.splice(ops);
        return;
    }

    // Slow path: add to the global queue and wake a thread
    std::unique_lock lock(mutex_);
    completed_ops_.splice(ops);
    wake_one_thread_and_unlock(lock);
}

inline void
epoll_scheduler::interrupt_reactor() const
{
    // Only write if not already armed to avoid redundant writes
    bool expected = false;
    if (eventfd_armed_.compare_exchange_strong(
            expected, true, std::memory_order_release,
            std::memory_order_relaxed))
    {
        std::uint64_t val = 1;
        [[maybe_unused]] auto r = ::write(event_fd_, &val, sizeof(val));
    }
}

inline void
epoll_scheduler::signal_all(std::unique_lock<std::mutex>&) const
{
    state_ |= 1;
    cond_.notify_all();
}

inline bool
epoll_scheduler::maybe_unlock_and_signal_one(
    std::unique_lock<std::mutex>& lock) const
{
    state_ |= 1;
    if (state_ > 1)
    {
        lock.unlock();
        cond_.notify_one();
        return true;
    }
    return false;
}

inline bool
epoll_scheduler::unlock_and_signal_one(std::unique_lock<std::mutex>& lock) const
{
    state_ |= 1;
    bool have_waiters = state_ > 1;
    lock.unlock();
    if (have_waiters)
        cond_.notify_one();
    return have_waiters;
}

inline void
epoll_scheduler::clear_signal() const
{
    state_ &= ~std::size_t(1);
}

inline void
epoll_scheduler::wait_for_signal(std::unique_lock<std::mutex>& lock) const
{
    while ((state_ & 1) == 0)
    {
        state_ += 2;
        cond_.wait(lock);
        state_ -= 2;
    }
}

inline void
epoll_scheduler::wait_for_signal_for(
    std::unique_lock<std::mutex>& lock, long timeout_us) const
{
    if ((state_ & 1) == 0)
    {
        state_ += 2;
        cond_.wait_for(lock, std::chrono::microseconds(timeout_us));
        state_ -= 2;
    }
}

inline void
epoll_scheduler::wake_one_thread_and_unlock(
    std::unique_lock<std::mutex>& lock) const
{
    if (maybe_unlock_and_signal_one(lock))
        return;

    if (task_running_.load(std::memory_order_relaxed) && !task_interrupted_)
    {
        task_interrupted_ = true;
        lock.unlock();
        interrupt_reactor();
    }
    else
    {
        lock.unlock();
    }
}

inline epoll_scheduler::work_cleanup::~work_cleanup()
{
    if (ctx)
    {
        long produced = ctx->private_outstanding_work;
        if (produced > 1)
            scheduler->outstanding_work_.fetch_add(
                produced - 1, std::memory_order_relaxed);
        else if (produced < 1)
            scheduler->work_finished();
        ctx->private_outstanding_work = 0;

        if (!ctx->private_queue.empty())
        {
            lock->lock();
            scheduler->completed_ops_.splice(ctx->private_queue);
        }
    }
    else
    {
        scheduler->work_finished();
    }
}

inline epoll_scheduler::task_cleanup::~task_cleanup()
{
    if (!ctx)
        return;

    if (ctx->private_outstanding_work > 0)
    {
        scheduler->outstanding_work_.fetch_add(
            ctx->private_outstanding_work, std::memory_order_relaxed);
        ctx->private_outstanding_work = 0;
    }

    if (!ctx->private_queue.empty())
    {
        if (!lock->owns_lock())
            lock->lock();
        scheduler->completed_ops_.splice(ctx->private_queue);
    }
}

inline void
epoll_scheduler::update_timerfd() const
{
    auto nearest = timer_svc_->nearest_expiry();

    itimerspec ts{};
    int flags = 0;

    if (nearest == timer_service::time_point::max())
    {
        // No timers - disarm by setting to 0 (relative)
    }
    else
    {
        auto now = std::chrono::steady_clock::now();
        if (nearest <= now)
        {
            // Use 1ns instead of 0 - zero disarms the timerfd
            ts.it_value.tv_nsec = 1;
        }
        else
        {
            auto nsec = std::chrono::duration_cast<std::chrono::nanoseconds>(
                nearest - now)
                .count();
            ts.it_value.tv_sec = nsec / 1000000000;
            ts.it_value.tv_nsec = nsec % 1000000000;
            // Ensure non-zero to avoid disarming if the duration rounds to 0
            if (ts.it_value.tv_sec == 0 && ts.it_value.tv_nsec == 0)
                ts.it_value.tv_nsec = 1;
        }
    }

    if (::timerfd_settime(timer_fd_, flags, &ts, nullptr) < 0)
        detail::throw_system_error(make_err(errno), "timerfd_settime");
}

inline void
epoll_scheduler::run_task(
    std::unique_lock<std::mutex>& lock, epoll::scheduler_context* ctx)
{
    int timeout_ms = task_interrupted_ ? 0 : -1;

    if (lock.owns_lock())
        lock.unlock();

    task_cleanup on_exit{this, &lock, ctx};

    // Flush deferred timerfd programming before blocking
    if (timerfd_stale_.exchange(false, std::memory_order_acquire))
        update_timerfd();

    // The event loop runs without the mutex held
    epoll_event events[128];
    int nfds = ::epoll_wait(epoll_fd_, events, 128, timeout_ms);

    if (nfds < 0 && errno != EINTR)
        detail::throw_system_error(make_err(errno), "epoll_wait");

    bool check_timers = false;
    op_queue local_ops;

    // Process events without holding the mutex
    for (int i = 0; i < nfds; ++i)
    {
        if (events[i].data.ptr == nullptr)
        {
            std::uint64_t val;
            // Mutex released above; the analyzer can't track unlock via ref
            // NOLINTNEXTLINE(clang-analyzer-unix.BlockInCriticalSection)
            [[maybe_unused]] auto r = ::read(event_fd_, &val, sizeof(val));
            eventfd_armed_.store(false, std::memory_order_relaxed);
            continue;
        }

        if (events[i].data.ptr == &timer_fd_)
        {
            std::uint64_t expirations;
            // NOLINTNEXTLINE(clang-analyzer-unix.BlockInCriticalSection)
            [[maybe_unused]] auto r =
                ::read(timer_fd_, &expirations, sizeof(expirations));
            check_timers = true;
            continue;
        }

        // Deferred I/O: just set the ready events and enqueue the descriptor.
        // No per-descriptor mutex locking in the reactor hot path!
        auto* desc = static_cast<descriptor_state*>(events[i].data.ptr);
        desc->add_ready_events(events[i].events);

        // Only enqueue if not already enqueued
        bool expected = false;
        if (desc->is_enqueued_.compare_exchange_strong(
                expected, true, std::memory_order_release,
                std::memory_order_relaxed))
        {
            local_ops.push(desc);
        }
    }

    // Process timers only when the timerfd fires
    if (check_timers)
    {
        timer_svc_->process_expired();
        update_timerfd();
    }

    lock.lock();

    if (!local_ops.empty())
        completed_ops_.splice(local_ops);
}

inline std::size_t
epoll_scheduler::do_one(
    std::unique_lock<std::mutex>& lock,
    long timeout_us,
    epoll::scheduler_context* ctx)
{
    for (;;)
    {
        if (stopped_)
            return 0;

        scheduler_op* op = completed_ops_.pop();

        // Handle the reactor sentinel - time to poll for I/O
        if (op == &task_op_)
        {
            bool more_handlers = !completed_ops_.empty();

            // Nothing to run the reactor for: no pending work to wait on,
            // or the caller requested a non-blocking poll
            if (!more_handlers &&
                (outstanding_work_.load(std::memory_order_acquire) == 0 ||
                 timeout_us == 0))
            {
                completed_ops_.push(&task_op_);
                return 0;
            }

            task_interrupted_ = more_handlers || timeout_us == 0;
            task_running_.store(true, std::memory_order_release);

            if (more_handlers)
                unlock_and_signal_one(lock);

            run_task(lock, ctx);

            task_running_.store(false, std::memory_order_relaxed);
            completed_ops_.push(&task_op_);
            continue;
        }

        // Handle an operation
        if (op != nullptr)
        {
            bool more = !completed_ops_.empty();

            if (more)
                ctx->unassisted = !unlock_and_signal_one(lock);
            else
            {
                ctx->unassisted = false;
                lock.unlock();
            }

            work_cleanup on_exit{this, &lock, ctx};

            (*op)();
            return 1;
        }

        // No pending work to wait on, or the caller requested a non-blocking poll
        if (outstanding_work_.load(std::memory_order_acquire) == 0 ||
            timeout_us == 0)
            return 0;

        clear_signal();
        if (timeout_us < 0)
            wait_for_signal(lock);
        else
            wait_for_signal_for(lock, timeout_us);
    }
}

} // namespace boost::corosio::detail

#endif // BOOST_COROSIO_HAS_EPOLL

#endif // BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SCHEDULER_HPP
|