include/boost/corosio/detail/timer_service.hpp

86.2% Lines (305/354) 95.5% Functions (42/44)
include/boost/corosio/detail/timer_service.hpp
Line Hits Source Code
1 //
2 // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3 // Copyright (c) 2026 Steve Gerbino
4 //
5 // Distributed under the Boost Software License, Version 1.0. (See accompanying
6 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
7 //
8 // Official repository: https://github.com/cppalliance/corosio
9 //
10
11 #ifndef BOOST_COROSIO_DETAIL_TIMER_SERVICE_HPP
12 #define BOOST_COROSIO_DETAIL_TIMER_SERVICE_HPP
13
14 #include <boost/corosio/timer.hpp>
15 #include <boost/corosio/io_context.hpp>
16 #include <boost/corosio/detail/scheduler_op.hpp>
17 #include <boost/corosio/native/native_scheduler.hpp>
18 #include <boost/corosio/detail/intrusive.hpp>
19 #include <boost/corosio/detail/thread_local_ptr.hpp>
20 #include <boost/capy/error.hpp>
21 #include <boost/capy/ex/execution_context.hpp>
22 #include <boost/capy/ex/executor_ref.hpp>
23 #include <system_error>
24
25 #include <atomic>
26 #include <chrono>
27 #include <coroutine>
28 #include <cstddef>
29 #include <limits>
30 #include <mutex>
31 #include <optional>
32 #include <stop_token>
33 #include <vector>
34
35 namespace boost::corosio::detail {
36
37 struct scheduler;
38
39 /*
40 Timer Service
41 =============
42
43 Data Structures
44 ---------------
45 waiter_node holds per-waiter state: coroutine handle, executor,
46 error output, stop_token, embedded completion_op. Each concurrent
47 co_await t.wait() allocates one waiter_node.
48
49 timer_service::implementation holds per-timer state: expiry,
50 heap index, and an intrusive_list of waiter_nodes. Multiple
51 coroutines can wait on the same timer simultaneously.
52
53 timer_service owns a min-heap of active timers, a free list
54 of recycled impls, and a free list of recycled waiter_nodes. The
55 heap is ordered by expiry time; the scheduler queries
56 nearest_expiry() to set the epoll/timerfd timeout.
57
58 Optimization Strategy
59 ---------------------
60 1. Deferred heap insertion — expires_after() stores the expiry
61 but does not insert into the heap. Insertion happens in wait().
62 2. Thread-local impl cache — single-slot per-thread cache.
63 3. Embedded completion_op — eliminates heap allocation per fire/cancel.
64 4. Cached nearest expiry — atomic avoids mutex in nearest_expiry().
65 5. might_have_pending_waits_ flag — skips lock when no wait issued.
66 6. Thread-local waiter cache — single-slot per-thread cache.
67
68 Concurrency
69 -----------
70 stop_token callbacks can fire from any thread. The impl_
71 pointer on waiter_node is used as a "still in list" marker.
72 */
73
74 struct BOOST_COROSIO_SYMBOL_VISIBLE waiter_node;
75
76 inline void timer_service_invalidate_cache() noexcept;
77
78 // timer_service class body — member function definitions are
79 // out-of-class (after implementation and waiter_node are complete)
80 class BOOST_COROSIO_DECL timer_service final
81 : public capy::execution_context::service
82 , public io_object::io_service
83 {
84 public:
85 using clock_type = std::chrono::steady_clock;
86 using time_point = clock_type::time_point;
87
88 class callback
89 {
90 void* ctx_ = nullptr;
91 void (*fn_)(void*) = nullptr;
92
93 public:
94 340 callback() = default;
95 340 callback(void* ctx, void (*fn)(void*)) noexcept : ctx_(ctx), fn_(fn) {}
96
97 explicit operator bool() const noexcept
98 {
99 return fn_ != nullptr;
100 }
101 8156 void operator()() const
102 {
103 8156 if (fn_)
104 8156 fn_(ctx_);
105 8156 }
106 };
107
108 struct implementation;
109
110 private:
111 struct heap_entry
112 {
113 time_point time_;
114 implementation* timer_;
115 };
116
117 scheduler* sched_ = nullptr;
118 mutable std::mutex mutex_;
119 std::vector<heap_entry> heap_;
120 implementation* free_list_ = nullptr;
121 waiter_node* waiter_free_list_ = nullptr;
122 callback on_earliest_changed_;
123 // Avoids mutex in nearest_expiry() and empty()
124 mutable std::atomic<std::int64_t> cached_nearest_ns_{
125 (std::numeric_limits<std::int64_t>::max)()};
126
127 public:
128 340 inline timer_service(capy::execution_context&, scheduler& sched)
129 340 : sched_(&sched)
130 {
131 340 }
132
133 16370 inline scheduler& get_scheduler() noexcept
134 {
135 16370 return *sched_;
136 }
137
138 680 ~timer_service() override = default;
139
140 timer_service(timer_service const&) = delete;
141 timer_service& operator=(timer_service const&) = delete;
142
143 340 inline void set_on_earliest_changed(callback cb)
144 {
145 340 on_earliest_changed_ = cb;
146 340 }
147
148 inline bool empty() const noexcept
149 {
150 return cached_nearest_ns_.load(std::memory_order_acquire) ==
151 (std::numeric_limits<std::int64_t>::max)();
152 }
153
154 19242 inline time_point nearest_expiry() const noexcept
155 {
156 19242 auto ns = cached_nearest_ns_.load(std::memory_order_acquire);
157 19242 return time_point(time_point::duration(ns));
158 }
159
160 inline void shutdown() override;
161 inline io_object::implementation* construct() override;
162 inline void destroy(io_object::implementation* p) override;
163 inline void destroy_impl(implementation& impl);
164 inline waiter_node* create_waiter();
165 inline void destroy_waiter(waiter_node* w);
166 inline std::size_t update_timer(implementation& impl, time_point new_time);
167 inline void insert_waiter(implementation& impl, waiter_node* w);
168 inline std::size_t cancel_timer(implementation& impl);
169 inline void cancel_waiter(waiter_node* w);
170 inline std::size_t cancel_one_waiter(implementation& impl);
171 inline std::size_t process_expired();
172
173 private:
174 137796 inline void refresh_cached_nearest() noexcept
175 {
176 137796 auto ns = heap_.empty() ? (std::numeric_limits<std::int64_t>::max)()
177 137405 : heap_[0].time_.time_since_epoch().count();
178 137796 cached_nearest_ns_.store(ns, std::memory_order_release);
179 137796 }
180
181 inline void remove_timer_impl(implementation& impl);
182 inline void up_heap(std::size_t index);
183 inline void down_heap(std::size_t index);
184 inline void swap_heap(std::size_t i1, std::size_t i2);
185 };
186
187 struct BOOST_COROSIO_SYMBOL_VISIBLE waiter_node
188 : intrusive_list<waiter_node>::node
189 {
190 // Embedded completion op — avoids heap allocation per fire/cancel
191 struct completion_op final : scheduler_op
192 {
193 waiter_node* waiter_ = nullptr;
194
195 static void do_complete(
196 void* owner, scheduler_op* base, std::uint32_t, std::uint32_t);
197
198 142 completion_op() noexcept : scheduler_op(&do_complete) {}
199
200 void operator()() override;
201 // No-op — lifetime owned by waiter_node, not the scheduler queue
202 void destroy() override {}
203 };
204
205 // Per-waiter stop_token cancellation
206 struct canceller
207 {
208 waiter_node* waiter_;
209 void operator()() const;
210 };
211
212 // nullptr once removed from timer's waiter list (concurrency marker)
213 timer_service::implementation* impl_ = nullptr;
214 timer_service* svc_ = nullptr;
215 std::coroutine_handle<> h_;
216 capy::executor_ref d_;
217 std::error_code* ec_out_ = nullptr;
218 std::stop_token token_;
219 std::optional<std::stop_callback<canceller>> stop_cb_;
220 completion_op op_;
221 std::error_code ec_value_;
222 waiter_node* next_free_ = nullptr;
223
224 142 waiter_node() noexcept
225 142 {
226 142 op_.waiter_ = this;
227 142 }
228 };
229
230 struct timer_service::implementation final : timer::implementation
231 {
232 using clock_type = std::chrono::steady_clock;
233 using time_point = clock_type::time_point;
234 using duration = clock_type::duration;
235
236 timer_service* svc_ = nullptr;
237 intrusive_list<waiter_node> waiters_;
238
239 // Free list linkage (reused when impl is on free_list)
240 implementation* next_free_ = nullptr;
241
242 inline explicit implementation(timer_service& svc) noexcept;
243
244 inline std::coroutine_handle<> wait(
245 std::coroutine_handle<>,
246 capy::executor_ref,
247 std::stop_token,
248 std::error_code*) override;
249 };
250
251 // Thread-local caches avoid hot-path mutex acquisitions:
252 // 1. Impl cache — single-slot, validated by comparing svc_
253 // 2. Waiter cache — single-slot, no service affinity
254 // timer_service_invalidate_cache() empties both slots, as seen by the calling thread, during shutdown.
255
256 inline thread_local_ptr<timer_service::implementation> tl_cached_impl;
257 inline thread_local_ptr<waiter_node> tl_cached_waiter;
258
259 inline timer_service::implementation*
260 8475 try_pop_tl_cache(timer_service* svc) noexcept
261 {
262 8475 auto* impl = tl_cached_impl.get();
263 8475 if (impl)
264 {
265 8302 tl_cached_impl.set(nullptr);
266 8302 if (impl->svc_ == svc)
267 8302 return impl;
268 // Stale impl from a destroyed service
269 delete impl;
270 }
271 173 return nullptr;
272 }
273
274 inline bool
275 8475 try_push_tl_cache(timer_service::implementation* impl) noexcept
276 {
277 8475 if (!tl_cached_impl.get())
278 {
279 8427 tl_cached_impl.set(impl);
280 8427 return true;
281 }
282 48 return false;
283 }
284
285 inline waiter_node*
286 8185 try_pop_waiter_tl_cache() noexcept
287 {
288 8185 auto* w = tl_cached_waiter.get();
289 8185 if (w)
290 {
291 8043 tl_cached_waiter.set(nullptr);
292 8043 return w;
293 }
294 142 return nullptr;
295 }
296
297 inline bool
298 8185 try_push_waiter_tl_cache(waiter_node* w) noexcept
299 {
300 8185 if (!tl_cached_waiter.get())
301 {
302 8127 tl_cached_waiter.set(w);
303 8127 return true;
304 }
305 58 return false;
306 }
307
308 inline void
309 340 timer_service_invalidate_cache() noexcept
310 {
311 340 delete tl_cached_impl.get();
312 340 tl_cached_impl.set(nullptr);
313
314 340 delete tl_cached_waiter.get();
315 340 tl_cached_waiter.set(nullptr);
316 340 }
317
318 // timer_service out-of-class member function definitions
319
320 173 inline timer_service::implementation::implementation(
321 173 timer_service& svc) noexcept
322 173 : svc_(&svc)
323 {
324 173 }
325
326 inline void
327 340 timer_service::shutdown()
328 {
329 340 timer_service_invalidate_cache();
330
331 // Cancel waiting timers still in the heap
332 340 for (auto& entry : heap_)
333 {
334 auto* impl = entry.timer_;
335 while (auto* w = impl->waiters_.pop_front())
336 {
337 w->stop_cb_.reset();
338 w->h_.destroy();
339 sched_->work_finished();
340 delete w;
341 }
342 impl->heap_index_ = (std::numeric_limits<std::size_t>::max)();
343 delete impl;
344 }
345 340 heap_.clear();
346 340 cached_nearest_ns_.store(
347 (std::numeric_limits<std::int64_t>::max)(), std::memory_order_release);
348
349 // Delete free-listed impls
350 388 while (free_list_)
351 {
352 48 auto* next = free_list_->next_free_;
353 48 delete free_list_;
354 48 free_list_ = next;
355 }
356
357 // Delete free-listed waiters
358 398 while (waiter_free_list_)
359 {
360 58 auto* next = waiter_free_list_->next_free_;
361 58 delete waiter_free_list_;
362 58 waiter_free_list_ = next;
363 }
364 340 }
365
366 inline io_object::implementation*
367 8475 timer_service::construct()
368 {
369 8475 implementation* impl = try_pop_tl_cache(this);
370 8475 if (impl)
371 {
372 8302 impl->svc_ = this;
373 8302 impl->heap_index_ = (std::numeric_limits<std::size_t>::max)();
374 8302 impl->might_have_pending_waits_ = false;
375 8302 return impl;
376 }
377
378 173 std::lock_guard lock(mutex_);
379 173 if (free_list_)
380 {
381 impl = free_list_;
382 free_list_ = impl->next_free_;
383 impl->next_free_ = nullptr;
384 impl->svc_ = this;
385 impl->heap_index_ = (std::numeric_limits<std::size_t>::max)();
386 impl->might_have_pending_waits_ = false;
387 }
388 else
389 {
390 173 impl = new implementation(*this);
391 }
392 173 return impl;
393 173 }
394
395 inline void
396 8475 timer_service::destroy(io_object::implementation* p)
397 {
398 8475 destroy_impl(static_cast<implementation&>(*p));
399 8475 }
400
401 inline void
402 8475 timer_service::destroy_impl(implementation& impl)
403 {
404 8475 cancel_timer(impl);
405
406 8475 if (impl.heap_index_ != (std::numeric_limits<std::size_t>::max)())
407 {
408 std::lock_guard lock(mutex_);
409 remove_timer_impl(impl);
410 refresh_cached_nearest();
411 }
412
413 8475 if (try_push_tl_cache(&impl))
414 8427 return;
415
416 48 std::lock_guard lock(mutex_);
417 48 impl.next_free_ = free_list_;
418 48 free_list_ = &impl;
419 48 }
420
421 inline waiter_node*
422 8185 timer_service::create_waiter()
423 {
424 8185 if (auto* w = try_pop_waiter_tl_cache())
425 8043 return w;
426
427 142 std::lock_guard lock(mutex_);
428 142 if (waiter_free_list_)
429 {
430 auto* w = waiter_free_list_;
431 waiter_free_list_ = w->next_free_;
432 w->next_free_ = nullptr;
433 return w;
434 }
435
436 142 return new waiter_node();
437 142 }
438
439 inline void
440 8185 timer_service::destroy_waiter(waiter_node* w)
441 {
442 8185 if (try_push_waiter_tl_cache(w))
443 8127 return;
444
445 58 std::lock_guard lock(mutex_);
446 58 w->next_free_ = waiter_free_list_;
447 58 waiter_free_list_ = w;
448 58 }
449
450 inline std::size_t
451 6 timer_service::update_timer(implementation& impl, time_point new_time)
452 {
453 bool in_heap =
454 6 (impl.heap_index_ != (std::numeric_limits<std::size_t>::max)());
455 6 if (!in_heap && impl.waiters_.empty())
456 return 0;
457
458 6 bool notify = false;
459 6 intrusive_list<waiter_node> canceled;
460
461 {
462 6 std::lock_guard lock(mutex_);
463
464 16 while (auto* w = impl.waiters_.pop_front())
465 {
466 10 w->impl_ = nullptr;
467 10 canceled.push_back(w);
468 10 }
469
470 6 if (impl.heap_index_ < heap_.size())
471 {
472 6 time_point old_time = heap_[impl.heap_index_].time_;
473 6 heap_[impl.heap_index_].time_ = new_time;
474
475 6 if (new_time < old_time)
476 6 up_heap(impl.heap_index_);
477 else
478 down_heap(impl.heap_index_);
479
480 6 notify = (impl.heap_index_ == 0);
481 }
482
483 6 refresh_cached_nearest();
484 6 }
485
486 6 std::size_t count = 0;
487 16 while (auto* w = canceled.pop_front())
488 {
489 10 w->ec_value_ = make_error_code(capy::error::canceled);
490 10 sched_->post(&w->op_);
491 10 ++count;
492 10 }
493
494 6 if (notify)
495 6 on_earliest_changed_();
496
497 6 return count;
498 }
499
500 inline void
501 8185 timer_service::insert_waiter(implementation& impl, waiter_node* w)
502 {
503 8185 bool notify = false;
504 {
505 8185 std::lock_guard lock(mutex_);
506 8185 if (impl.heap_index_ == (std::numeric_limits<std::size_t>::max)())
507 {
508 8163 impl.heap_index_ = heap_.size();
509 8163 heap_.push_back({impl.expiry_, &impl});
510 8163 up_heap(heap_.size() - 1);
511 8163 notify = (impl.heap_index_ == 0);
512 8163 refresh_cached_nearest();
513 }
514 8185 impl.waiters_.push_back(w);
515 8185 }
516 8185 if (notify)
517 8150 on_earliest_changed_();
518 8185 }
519
520 inline std::size_t
521 8483 timer_service::cancel_timer(implementation& impl)
522 {
523 8483 if (!impl.might_have_pending_waits_)
524 8467 return 0;
525
526 // Not in heap and no waiters — just clear the flag
527 16 if (impl.heap_index_ == (std::numeric_limits<std::size_t>::max)() &&
528 impl.waiters_.empty())
529 {
530 impl.might_have_pending_waits_ = false;
531 return 0;
532 }
533
534 16 intrusive_list<waiter_node> canceled;
535
536 {
537 16 std::lock_guard lock(mutex_);
538 16 remove_timer_impl(impl);
539 36 while (auto* w = impl.waiters_.pop_front())
540 {
541 20 w->impl_ = nullptr;
542 20 canceled.push_back(w);
543 20 }
544 16 refresh_cached_nearest();
545 16 }
546
547 16 impl.might_have_pending_waits_ = false;
548
549 16 std::size_t count = 0;
550 36 while (auto* w = canceled.pop_front())
551 {
552 20 w->ec_value_ = make_error_code(capy::error::canceled);
553 20 sched_->post(&w->op_);
554 20 ++count;
555 20 }
556
557 16 return count;
558 }
559
560 inline void
561 4 timer_service::cancel_waiter(waiter_node* w)
562 {
563 {
564 4 std::lock_guard lock(mutex_);
565 // Already removed by cancel_timer or process_expired
566 4 if (!w->impl_)
567 return;
568 4 auto* impl = w->impl_;
569 4 w->impl_ = nullptr;
570 4 impl->waiters_.remove(w);
571 4 if (impl->waiters_.empty())
572 {
573 2 remove_timer_impl(*impl);
574 2 impl->might_have_pending_waits_ = false;
575 }
576 4 refresh_cached_nearest();
577 4 }
578
579 4 w->ec_value_ = make_error_code(capy::error::canceled);
580 4 sched_->post(&w->op_);
581 }
582
583 inline std::size_t
584 2 timer_service::cancel_one_waiter(implementation& impl)
585 {
586 2 if (!impl.might_have_pending_waits_)
587 return 0;
588
589 2 waiter_node* w = nullptr;
590
591 {
592 2 std::lock_guard lock(mutex_);
593 2 w = impl.waiters_.pop_front();
594 2 if (!w)
595 return 0;
596 2 w->impl_ = nullptr;
597 2 if (impl.waiters_.empty())
598 {
599 remove_timer_impl(impl);
600 impl.might_have_pending_waits_ = false;
601 }
602 2 refresh_cached_nearest();
603 2 }
604
605 2 w->ec_value_ = make_error_code(capy::error::canceled);
606 2 sched_->post(&w->op_);
607 2 return 1;
608 }
609
610 inline std::size_t
611 129605 timer_service::process_expired()
612 {
613 129605 intrusive_list<waiter_node> expired;
614
615 {
616 129605 std::lock_guard lock(mutex_);
617 129605 auto now = clock_type::now();
618
619 137750 while (!heap_.empty() && heap_[0].time_ <= now)
620 {
621 8145 implementation* t = heap_[0].timer_;
622 8145 remove_timer_impl(*t);
623 16294 while (auto* w = t->waiters_.pop_front())
624 {
625 8149 w->impl_ = nullptr;
626 8149 w->ec_value_ = {};
627 8149 expired.push_back(w);
628 8149 }
629 8145 t->might_have_pending_waits_ = false;
630 }
631
632 129605 refresh_cached_nearest();
633 129605 }
634
635 129605 std::size_t count = 0;
636 137754 while (auto* w = expired.pop_front())
637 {
638 8149 sched_->post(&w->op_);
639 8149 ++count;
640 8149 }
641
642 129605 return count;
643 }
644
645 inline void
646 8163 timer_service::remove_timer_impl(implementation& impl)
647 {
648 8163 std::size_t index = impl.heap_index_;
649 8163 if (index >= heap_.size())
650 return; // Not in heap
651
652 8163 if (index == heap_.size() - 1)
653 {
654 // Last element, just pop
655 102 impl.heap_index_ = (std::numeric_limits<std::size_t>::max)();
656 102 heap_.pop_back();
657 }
658 else
659 {
660 // Swap with last and reheapify
661 8061 swap_heap(index, heap_.size() - 1);
662 8061 impl.heap_index_ = (std::numeric_limits<std::size_t>::max)();
663 8061 heap_.pop_back();
664
665 8061 if (index > 0 && heap_[index].time_ < heap_[(index - 1) / 2].time_)
666 up_heap(index);
667 else
668 8061 down_heap(index);
669 }
670 }
671
672 inline void
673 8169 timer_service::up_heap(std::size_t index)
674 {
675 16219 while (index > 0)
676 {
677 8063 std::size_t parent = (index - 1) / 2;
678 8063 if (!(heap_[index].time_ < heap_[parent].time_))
679 13 break;
680 8050 swap_heap(index, parent);
681 8050 index = parent;
682 }
683 8169 }
684
685 inline void
686 8061 timer_service::down_heap(std::size_t index)
687 {
688 8061 std::size_t child = index * 2 + 1;
689 8061 while (child < heap_.size())
690 {
691 4 std::size_t min_child = (child + 1 == heap_.size() ||
692 heap_[child].time_ < heap_[child + 1].time_)
693 4 ? child
694 4 : child + 1;
695
696 4 if (heap_[index].time_ < heap_[min_child].time_)
697 4 break;
698
699 swap_heap(index, min_child);
700 index = min_child;
701 child = index * 2 + 1;
702 }
703 8061 }
704
705 inline void
706 16111 timer_service::swap_heap(std::size_t i1, std::size_t i2)
707 {
708 16111 heap_entry tmp = heap_[i1];
709 16111 heap_[i1] = heap_[i2];
710 16111 heap_[i2] = tmp;
711 16111 heap_[i1].timer_->heap_index_ = i1;
712 16111 heap_[i2].timer_->heap_index_ = i2;
713 16111 }
714
715 // waiter_node out-of-class member function definitions
716
717 inline void
718 4 waiter_node::canceller::operator()() const
719 {
720 4 waiter_->svc_->cancel_waiter(waiter_);
721 4 }
722
723 inline void
724 waiter_node::completion_op::do_complete(
725 void* owner, scheduler_op* base, std::uint32_t, std::uint32_t)
726 {
727 if (!owner)
728 return;
729 static_cast<completion_op*>(base)->operator()();
730 }
731
732 inline void
733 8185 waiter_node::completion_op::operator()()
734 {
735 8185 auto* w = waiter_;
736 8185 w->stop_cb_.reset();
737 8185 if (w->ec_out_)
738 8185 *w->ec_out_ = w->ec_value_;
739
740 8185 auto h = w->h_;
741 8185 auto d = w->d_;
742 8185 auto* svc = w->svc_;
743 8185 auto& sched = svc->get_scheduler();
744
745 8185 svc->destroy_waiter(w);
746
747 8185 d.post(h);
748 8185 sched.work_finished();
749 8185 }
750
751 inline std::coroutine_handle<>
752 8185 timer_service::implementation::wait(
753 std::coroutine_handle<> h,
754 capy::executor_ref d,
755 std::stop_token token,
756 std::error_code* ec)
757 {
758 // Already-expired fast path — no waiter_node, no mutex.
759 // Post instead of dispatch so the coroutine yields to the
760 // scheduler, allowing other queued work to run.
761 8185 if (heap_index_ == (std::numeric_limits<std::size_t>::max)())
762 {
763 8163 if (expiry_ == (time_point::min)() || expiry_ <= clock_type::now())
764 {
765 if (ec)
766 *ec = {};
767 d.post(h);
768 return std::noop_coroutine();
769 }
770 }
771
772 8185 auto* w = svc_->create_waiter();
773 8185 w->impl_ = this;
774 8185 w->svc_ = svc_;
775 8185 w->h_ = h;
776 8185 w->d_ = d;
777 8185 w->token_ = std::move(token);
778 8185 w->ec_out_ = ec;
779
780 8185 svc_->insert_waiter(*this, w);
781 8185 might_have_pending_waits_ = true;
782 8185 svc_->get_scheduler().work_started();
783
784 8185 if (w->token_.stop_possible())
785 4 w->stop_cb_.emplace(w->token_, waiter_node::canceller{w});
786
787 8185 return std::noop_coroutine();
788 }
789
790 // Free functions
791
792 struct timer_service_access
793 {
794 8475 static native_scheduler& get_scheduler(io_context& ctx) noexcept
795 {
796 8475 return static_cast<native_scheduler&>(*ctx.sched_);
797 }
798 };
799
800 // Bypass find_service() mutex by reading the scheduler's cached pointer
801 inline io_object::io_service&
802 8475 timer_service_direct(capy::execution_context& ctx) noexcept
803 {
804 8475 return *timer_service_access::get_scheduler(static_cast<io_context&>(ctx))
805 8475 .timer_svc_;
806 }
807
808 inline std::size_t
809 6 timer_service_update_expiry(timer::implementation& base)
810 {
811 6 auto& impl = static_cast<timer_service::implementation&>(base);
812 6 return impl.svc_->update_timer(impl, impl.expiry_);
813 }
814
815 inline std::size_t
816 8 timer_service_cancel(timer::implementation& base) noexcept
817 {
818 8 auto& impl = static_cast<timer_service::implementation&>(base);
819 8 return impl.svc_->cancel_timer(impl);
820 }
821
822 inline std::size_t
823 2 timer_service_cancel_one(timer::implementation& base) noexcept
824 {
825 2 auto& impl = static_cast<timer_service::implementation&>(base);
826 2 return impl.svc_->cancel_one_waiter(impl);
827 }
828
829 inline timer_service&
830 340 get_timer_service(capy::execution_context& ctx, scheduler& sched)
831 {
832 340 return ctx.make_service<timer_service>(sched);
833 }
834
835 } // namespace boost::corosio::detail
836
837 #endif
838