include/boost/corosio/detail/timer_service.hpp

90.4% Lines (330/365) 97.7% Functions (43/44)
Line TLA Hits Source Code
1 //
2 // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3 // Copyright (c) 2026 Steve Gerbino
4 //
5 // Distributed under the Boost Software License, Version 1.0. (See accompanying
6 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
7 //
8 // Official repository: https://github.com/cppalliance/corosio
9 //
10
11 #ifndef BOOST_COROSIO_DETAIL_TIMER_SERVICE_HPP
12 #define BOOST_COROSIO_DETAIL_TIMER_SERVICE_HPP
13
14 #include <boost/corosio/timer.hpp>
15 #include <boost/corosio/io_context.hpp>
16 #include <boost/corosio/detail/scheduler_op.hpp>
17 #include <boost/corosio/native/native_scheduler.hpp>
18 #include <boost/corosio/detail/intrusive.hpp>
19 #include <boost/corosio/detail/thread_local_ptr.hpp>
20 #include <boost/capy/error.hpp>
21 #include <boost/capy/ex/execution_context.hpp>
22 #include <boost/capy/ex/executor_ref.hpp>
23 #include <system_error>
24
25 #include <atomic>
26 #include <chrono>
27 #include <coroutine>
28 #include <cstddef>
29 #include <limits>
30 #include <mutex>
31 #include <optional>
32 #include <stop_token>
33 #include <utility>
34 #include <vector>
35
36 namespace boost::corosio::detail {
37
38 struct scheduler;
39
40 /*
41 Timer Service
42 =============
43
44 Data Structures
45 ---------------
46 waiter_node holds per-waiter state: coroutine handle, executor,
47 error output, stop_token, embedded completion_op. Each concurrent
48 co_await t.wait() allocates one waiter_node.
49
50 timer_service::implementation holds per-timer state: expiry,
51 heap index, and an intrusive_list of waiter_nodes. Multiple
52 coroutines can wait on the same timer simultaneously.
53
54 timer_service owns a min-heap of active timers, a free list
55 of recycled impls, and a free list of recycled waiter_nodes. The
56 heap is ordered by expiry time; the scheduler queries
57 nearest_expiry() to set the epoll/timerfd timeout.
58
59 Optimization Strategy
60 ---------------------
61 1. Deferred heap insertion — expires_after() stores the expiry
62 but does not insert into the heap. Insertion happens in wait().
63 2. Thread-local impl cache — single-slot per-thread cache.
64 3. Embedded completion_op — eliminates heap allocation per fire/cancel.
65 4. Cached nearest expiry — atomic avoids mutex in nearest_expiry().
66 5. might_have_pending_waits_ flag — skips lock when no wait issued.
67 6. Thread-local waiter cache — single-slot per-thread cache.
68
69 Concurrency
70 -----------
71 stop_token callbacks can fire from any thread. The impl_
72 pointer on waiter_node is used as a "still in list" marker.
73 */
74
// Forward declarations: waiter_node is defined after the timer_service
// class body; timer_service_invalidate_cache() is defined alongside the
// thread-local caches it clears.
struct BOOST_COROSIO_SYMBOL_VISIBLE waiter_node;

inline void timer_service_invalidate_cache() noexcept;
78
79 // timer_service class body — member function definitions are
80 // out-of-class (after implementation and waiter_node are complete)
class BOOST_COROSIO_DECL timer_service final
    : public capy::execution_context::service
    , public io_object::io_service
{
public:
    using clock_type = std::chrono::steady_clock;
    using time_point = clock_type::time_point;

    // Minimal type-erased notification: a context pointer plus a
    // function pointer (no allocation, no std::function). Used to tell
    // the scheduler that the earliest expiry changed. Invoking a
    // default-constructed callback is a no-op.
    class callback
    {
        void* ctx_ = nullptr;
        void (*fn_)(void*) = nullptr;

    public:
        callback() = default;
        callback(void* ctx, void (*fn)(void*)) noexcept : ctx_(ctx), fn_(fn) {}

        // True when a target function has been installed.
        explicit operator bool() const noexcept
        {
            return fn_ != nullptr;
        }
        void operator()() const
        {
            if (fn_)
                fn_(ctx_);
        }
    };

    struct implementation;

private:
    // One min-heap slot: expiry plus a back-pointer to the owning
    // timer; timer_->heap_index_ mirrors the slot's position.
    struct heap_entry
    {
        time_point time_;
        implementation* timer_;
    };

    scheduler* sched_ = nullptr;
    mutable std::mutex mutex_;                // guards heap_ and both free lists
    std::vector<heap_entry> heap_;            // min-heap ordered by time_
    implementation* free_list_ = nullptr;     // recycled impls (next_free_ chain)
    waiter_node* waiter_free_list_ = nullptr; // recycled waiter_nodes
    callback on_earliest_changed_;            // re-arms the scheduler timeout
    // Avoids mutex in nearest_expiry() and empty()
    mutable std::atomic<std::int64_t> cached_nearest_ns_{
        (std::numeric_limits<std::int64_t>::max)()};

public:
    inline timer_service(capy::execution_context&, scheduler& sched)
        : sched_(&sched)
    {
    }

    inline scheduler& get_scheduler() noexcept
    {
        return *sched_;
    }

    ~timer_service() override = default;

    timer_service(timer_service const&) = delete;
    timer_service& operator=(timer_service const&) = delete;

    inline void set_on_earliest_changed(callback cb)
    {
        on_earliest_changed_ = cb;
    }

    // True when no timer is armed. Lock-free: reads the cached expiry,
    // whose max() sentinel means "heap empty".
    inline bool empty() const noexcept
    {
        return cached_nearest_ns_.load(std::memory_order_acquire) ==
            (std::numeric_limits<std::int64_t>::max)();
    }

    // Earliest armed expiry (sentinel max when none). Lock-free: reads
    // the atomic cache that refresh_cached_nearest() maintains under
    // mutex_.
    inline time_point nearest_expiry() const noexcept
    {
        auto ns = cached_nearest_ns_.load(std::memory_order_acquire);
        return time_point(time_point::duration(ns));
    }

    inline void shutdown() override;
    inline io_object::implementation* construct() override;
    inline void destroy(io_object::implementation* p) override;
    inline void destroy_impl(implementation& impl);
    inline waiter_node* create_waiter();
    inline void destroy_waiter(waiter_node* w);
    inline std::size_t update_timer(implementation& impl, time_point new_time);
    inline void insert_waiter(implementation& impl, waiter_node* w);
    inline std::size_t cancel_timer(implementation& impl);
    inline void cancel_waiter(waiter_node* w);
    inline std::size_t cancel_one_waiter(implementation& impl);
    inline std::size_t process_expired();

private:
    // Re-publish heap_[0] (or the "empty" sentinel) into
    // cached_nearest_ns_. Every caller holds mutex_.
    inline void refresh_cached_nearest() noexcept
    {
        auto ns = heap_.empty() ? (std::numeric_limits<std::int64_t>::max)()
            : heap_[0].time_.time_since_epoch().count();
        cached_nearest_ns_.store(ns, std::memory_order_release);
    }

    inline void remove_timer_impl(implementation& impl);
    inline void up_heap(std::size_t index);
    inline void down_heap(std::size_t index);
    inline void swap_heap(std::size_t i1, std::size_t i2);
};
187
struct BOOST_COROSIO_SYMBOL_VISIBLE waiter_node
    : intrusive_list<waiter_node>::node
{
    // Embedded completion op — avoids heap allocation per fire/cancel
    struct completion_op final : scheduler_op
    {
        waiter_node* waiter_ = nullptr;

        // scheduler_op function-pointer entry; casts base back to
        // completion_op and forwards to operator().
        static void do_complete(
            void* owner, scheduler_op* base, std::uint32_t, std::uint32_t);

        completion_op() noexcept : scheduler_op(&do_complete) {}

        void operator()() override;
        void destroy() override;
    };

    // Per-waiter stop_token cancellation
    struct canceller
    {
        waiter_node* waiter_;
        void operator()() const;
    };

    // nullptr once removed from timer's waiter list (concurrency marker)
    timer_service::implementation* impl_ = nullptr;
    timer_service* svc_ = nullptr;       // owning service (for recycling)
    std::coroutine_handle<> h_;          // suspended awaiter to resume
    capy::executor_ref d_;               // executor the coroutine resumes on
    std::error_code* ec_out_ = nullptr;  // caller's error output slot
    std::stop_token token_;
    std::optional<std::stop_callback<canceller>> stop_cb_;
    completion_op op_;                   // posted to the scheduler on fire/cancel
    std::error_code ec_value_;           // result copied into *ec_out_ on completion
    waiter_node* next_free_ = nullptr;   // free-list linkage

    waiter_node() noexcept
    {
        // Back-pointer so the posted op can find its node.
        op_.waiter_ = this;
    }
};
229
struct timer_service::implementation final : timer::implementation
{
    using clock_type = std::chrono::steady_clock;
    using time_point = clock_type::time_point;
    using duration = clock_type::duration;

    // Owning service; also acts as the validity tag when this impl sits
    // in the thread-local cache (see try_pop_tl_cache).
    timer_service* svc_ = nullptr;
    // Coroutines currently awaiting this timer (may be more than one).
    intrusive_list<waiter_node> waiters_;

    // Free list linkage (reused when impl is on free_list)
    implementation* next_free_ = nullptr;

    inline explicit implementation(timer_service& svc) noexcept;

    // Suspends the awaiting coroutine until expiry, cancellation, or a
    // stop request. Defined out-of-class below.
    inline std::coroutine_handle<> wait(
        std::coroutine_handle<>,
        capy::executor_ref,
        std::stop_token,
        std::error_code*) override;
};
250
// Thread-local caches avoid hot-path mutex acquisitions:
// 1. Impl cache — single-slot, validated by comparing svc_
// 2. Waiter cache — single-slot, no service affinity
// All caches are cleared by timer_service_invalidate_cache() during shutdown.
// Each slot holds at most one recycled object per thread; when a slot is
// already full, callers fall back to the mutex-protected free lists.

inline thread_local_ptr<timer_service::implementation> tl_cached_impl;
inline thread_local_ptr<waiter_node> tl_cached_waiter;
258
259 inline timer_service::implementation*
260 8576x try_pop_tl_cache(timer_service* svc) noexcept
261 {
262 8576x auto* impl = tl_cached_impl.get();
263 8576x if (impl)
264 {
265 8349x tl_cached_impl.set(nullptr);
266 8349x if (impl->svc_ == svc)
267 8349x return impl;
268 // Stale impl from a destroyed service
269 delete impl;
270 }
271 227x return nullptr;
272 }
273
274 inline bool
275 8574x try_push_tl_cache(timer_service::implementation* impl) noexcept
276 {
277 8574x if (!tl_cached_impl.get())
278 {
279 8500x tl_cached_impl.set(impl);
280 8500x return true;
281 }
282 74x return false;
283 }
284
285 inline waiter_node*
286 8300x try_pop_waiter_tl_cache() noexcept
287 {
288 8300x auto* w = tl_cached_waiter.get();
289 8300x if (w)
290 {
291 8104x tl_cached_waiter.set(nullptr);
292 8104x return w;
293 }
294 196x return nullptr;
295 }
296
297 inline bool
298 8290x try_push_waiter_tl_cache(waiter_node* w) noexcept
299 {
300 8290x if (!tl_cached_waiter.get())
301 {
302 8210x tl_cached_waiter.set(w);
303 8210x return true;
304 }
305 80x return false;
306 }
307
308 inline void
309 412x timer_service_invalidate_cache() noexcept
310 {
311 412x delete tl_cached_impl.get();
312 412x tl_cached_impl.set(nullptr);
313
314 412x delete tl_cached_waiter.get();
315 412x tl_cached_waiter.set(nullptr);
316 412x }
317
318 // timer_service out-of-class member function definitions
319
// Binds the impl to its owning service; every other member keeps its
// in-class default (not armed, no waiters, not free-listed).
inline timer_service::implementation::implementation(
    timer_service& svc) noexcept
    : svc_(&svc)
{
}
325
inline void
timer_service::shutdown()
{
    // Service teardown: clear thread-local caches, destroy every timer
    // still armed in the heap (destroying suspended coroutine frames),
    // then drain both free lists. No mutex is taken here — presumably
    // shutdown runs single-threaded during context destruction; TODO
    // confirm against execution_context's service-shutdown contract.
    timer_service_invalidate_cache();

    // Cancel waiting timers still in the heap.
    // Each waiter called work_started() in implementation::wait().
    // On IOCP the scheduler shutdown loop exits when outstanding_work_
    // reaches zero, so we must call work_finished() here to balance it.
    // On other backends this is harmless (their drain loops exit when
    // the queue is empty, not based on outstanding_work_).
    for (auto& entry : heap_)
    {
        auto* impl = entry.timer_;
        while (auto* w = impl->waiters_.pop_front())
        {
            w->stop_cb_.reset();
            // Take the handle before deleting the node, then destroy
            // the coroutine frame (it will never be resumed).
            auto h = std::exchange(w->h_, {});
            sched_->work_finished();
            if (h)
                h.destroy();
            delete w;
        }
        impl->heap_index_ = (std::numeric_limits<std::size_t>::max)();
        delete impl;
    }
    heap_.clear();
    cached_nearest_ns_.store(
        (std::numeric_limits<std::int64_t>::max)(), std::memory_order_release);

    // Delete free-listed impls
    while (free_list_)
    {
        auto* next = free_list_->next_free_;
        delete free_list_;
        free_list_ = next;
    }

    // Delete free-listed waiters
    while (waiter_free_list_)
    {
        auto* next = waiter_free_list_->next_free_;
        delete waiter_free_list_;
        waiter_free_list_ = next;
    }
}
372
373 inline io_object::implementation*
374 8576x timer_service::construct()
375 {
376 8576x implementation* impl = try_pop_tl_cache(this);
377 8576x if (impl)
378 {
379 8349x impl->svc_ = this;
380 8349x impl->heap_index_ = (std::numeric_limits<std::size_t>::max)();
381 8349x impl->might_have_pending_waits_ = false;
382 8349x return impl;
383 }
384
385 227x std::lock_guard lock(mutex_);
386 227x if (free_list_)
387 {
388 impl = free_list_;
389 free_list_ = impl->next_free_;
390 impl->next_free_ = nullptr;
391 impl->svc_ = this;
392 impl->heap_index_ = (std::numeric_limits<std::size_t>::max)();
393 impl->might_have_pending_waits_ = false;
394 }
395 else
396 {
397 227x impl = new implementation(*this);
398 }
399 227x return impl;
400 227x }
401
// io_service hook: downcast to our concrete impl and forward.
inline void
timer_service::destroy(io_object::implementation* p)
{
    destroy_impl(static_cast<implementation&>(*p));
}
407
inline void
timer_service::destroy_impl(implementation& impl)
{
    // Timer destruction: cancel any pending waits, detach from the
    // heap if still armed, then recycle (thread-local slot first,
    // shared free list as fallback).
    cancel_timer(impl);

    // Defensive: cancel_timer() normally leaves heap_index_ at the
    // "not in heap" sentinel; this branch is a safety net (unhit in
    // coverage) in case a heap slot survives.
    if (impl.heap_index_ != (std::numeric_limits<std::size_t>::max)())
    {
        std::lock_guard lock(mutex_);
        remove_timer_impl(impl);
        refresh_cached_nearest();
    }

    if (try_push_tl_cache(&impl))
        return;

    // TL slot full — push onto the mutex-protected free list.
    std::lock_guard lock(mutex_);
    impl.next_free_ = free_list_;
    free_list_ = &impl;
}
427
428 inline waiter_node*
429 8300x timer_service::create_waiter()
430 {
431 8300x if (auto* w = try_pop_waiter_tl_cache())
432 8104x return w;
433
434 196x std::lock_guard lock(mutex_);
435 196x if (waiter_free_list_)
436 {
437 2x auto* w = waiter_free_list_;
438 2x waiter_free_list_ = w->next_free_;
439 2x w->next_free_ = nullptr;
440 2x return w;
441 }
442
443 194x return new waiter_node();
444 196x }
445
446 inline void
447 8290x timer_service::destroy_waiter(waiter_node* w)
448 {
449 8290x if (try_push_waiter_tl_cache(w))
450 8210x return;
451
452 80x std::lock_guard lock(mutex_);
453 80x w->next_free_ = waiter_free_list_;
454 80x waiter_free_list_ = w;
455 80x }
456
inline std::size_t
timer_service::update_timer(implementation& impl, time_point new_time)
{
    // Re-arm the timer with a new expiry. Every current waiter
    // completes with error::canceled; returns how many were canceled.
    bool in_heap =
        (impl.heap_index_ != (std::numeric_limits<std::size_t>::max)());
    if (!in_heap && impl.waiters_.empty())
        return 0; // not armed and nobody waiting — nothing to do

    bool notify = false;
    intrusive_list<waiter_node> canceled;

    {
        std::lock_guard lock(mutex_);

        // Detach every waiter. impl_ = nullptr is the "no longer in
        // the list" marker a racing stop_token callback checks.
        while (auto* w = impl.waiters_.pop_front())
        {
            w->impl_ = nullptr;
            canceled.push_back(w);
        }

        // heap_index_ is the max() sentinel when not armed, so this
        // comparison doubles as the in-heap test.
        if (impl.heap_index_ < heap_.size())
        {
            time_point old_time = heap_[impl.heap_index_].time_;
            heap_[impl.heap_index_].time_ = new_time;

            // Earlier expiry bubbles up; later (or equal) sifts down.
            if (new_time < old_time)
                up_heap(impl.heap_index_);
            else
                down_heap(impl.heap_index_);

            // Only a new root changes the scheduler's wake-up time.
            notify = (impl.heap_index_ == 0);
        }

        refresh_cached_nearest();
    }

    // Post completions and fire the callback outside the lock.
    std::size_t count = 0;
    while (auto* w = canceled.pop_front())
    {
        w->ec_value_ = make_error_code(capy::error::canceled);
        sched_->post(&w->op_);
        ++count;
    }

    if (notify)
        on_earliest_changed_();

    return count;
}
506
inline void
timer_service::insert_waiter(implementation& impl, waiter_node* w)
{
    // Queue `w` on the timer, lazily arming the timer in the heap.
    // (Deferred insertion: expires_after only records the expiry; the
    // heap entry is created here, on the first wait.)
    bool notify = false;
    {
        std::lock_guard lock(mutex_);
        if (impl.heap_index_ == (std::numeric_limits<std::size_t>::max)())
        {
            impl.heap_index_ = heap_.size();
            heap_.push_back({impl.expiry_, &impl});
            up_heap(heap_.size() - 1);
            // Landed at the root → the scheduler timeout must shrink.
            notify = (impl.heap_index_ == 0);
            refresh_cached_nearest();
        }
        impl.waiters_.push_back(w);
    }
    // Fire the callback outside the lock.
    if (notify)
        on_earliest_changed_();
}
526
inline std::size_t
timer_service::cancel_timer(implementation& impl)
{
    // Cancel all pending waits on `impl`; each completes with
    // error::canceled. Returns the number of waiters canceled.

    // Fast path: no wait was ever issued — skip the mutex entirely.
    if (!impl.might_have_pending_waits_)
        return 0;

    // Not in heap and no waiters — just clear the flag
    if (impl.heap_index_ == (std::numeric_limits<std::size_t>::max)() &&
        impl.waiters_.empty())
    {
        impl.might_have_pending_waits_ = false;
        return 0;
    }

    intrusive_list<waiter_node> canceled;

    {
        std::lock_guard lock(mutex_);
        remove_timer_impl(impl);
        // impl_ = nullptr detaches each waiter so a racing stop_token
        // callback backs off (see cancel_waiter).
        while (auto* w = impl.waiters_.pop_front())
        {
            w->impl_ = nullptr;
            canceled.push_back(w);
        }
        refresh_cached_nearest();
    }

    impl.might_have_pending_waits_ = false;

    // Post completions outside the lock.
    std::size_t count = 0;
    while (auto* w = canceled.pop_front())
    {
        w->ec_value_ = make_error_code(capy::error::canceled);
        sched_->post(&w->op_);
        ++count;
    }

    return count;
}
566
inline void
timer_service::cancel_waiter(waiter_node* w)
{
    // stop_token cancellation path — can run on any thread, hence the
    // lock and the impl_ marker check.
    {
        std::lock_guard lock(mutex_);
        // Already removed by cancel_timer or process_expired
        if (!w->impl_)
            return;
        auto* impl = w->impl_;
        w->impl_ = nullptr;
        impl->waiters_.remove(w);
        // Last waiter gone → the timer no longer needs its heap slot.
        if (impl->waiters_.empty())
        {
            remove_timer_impl(*impl);
            impl->might_have_pending_waits_ = false;
        }
        refresh_cached_nearest();
    }

    // Complete with canceled, outside the lock.
    w->ec_value_ = make_error_code(capy::error::canceled);
    sched_->post(&w->op_);
}
589
inline std::size_t
timer_service::cancel_one_waiter(implementation& impl)
{
    // Cancel exactly one pending wait (the oldest, FIFO order).
    // Returns 0 when nothing is pending, 1 otherwise.
    if (!impl.might_have_pending_waits_)
        return 0;

    waiter_node* w = nullptr;

    {
        std::lock_guard lock(mutex_);
        w = impl.waiters_.pop_front();
        if (!w)
            return 0;
        w->impl_ = nullptr; // detach — racing stop callbacks back off
        // That was the last waiter → release the heap slot too.
        if (impl.waiters_.empty())
        {
            remove_timer_impl(impl);
            impl.might_have_pending_waits_ = false;
        }
        refresh_cached_nearest();
    }

    // Complete outside the lock.
    w->ec_value_ = make_error_code(capy::error::canceled);
    sched_->post(&w->op_);
    return 1;
}
616
inline std::size_t
timer_service::process_expired()
{
    // Scheduler tick: pop every timer whose expiry has passed, collect
    // all their waiters, and post each completion with a cleared error
    // (success). Returns the number of completions posted.
    intrusive_list<waiter_node> expired;

    {
        std::lock_guard lock(mutex_);
        auto now = clock_type::now();

        // The root is always the nearest expiry, so stop at the first
        // unexpired root.
        while (!heap_.empty() && heap_[0].time_ <= now)
        {
            implementation* t = heap_[0].timer_;
            remove_timer_impl(*t);
            while (auto* w = t->waiters_.pop_front())
            {
                w->impl_ = nullptr; // detach (see concurrency note atop)
                w->ec_value_ = {};  // success
                expired.push_back(w);
            }
            t->might_have_pending_waits_ = false;
        }

        refresh_cached_nearest();
    }

    // Post outside the lock.
    std::size_t count = 0;
    while (auto* w = expired.pop_front())
    {
        sched_->post(&w->op_);
        ++count;
    }

    return count;
}
651
inline void
timer_service::remove_timer_impl(implementation& impl)
{
    // Unlink `impl` from the heap; no-op when not armed (heap_index_
    // holds the max() sentinel, which fails the bounds check). Every
    // caller holds mutex_. heap_index_ is reset to the sentinel.
    std::size_t index = impl.heap_index_;
    if (index >= heap_.size())
        return; // Not in heap

    if (index == heap_.size() - 1)
    {
        // Last element, just pop
        impl.heap_index_ = (std::numeric_limits<std::size_t>::max)();
        heap_.pop_back();
    }
    else
    {
        // Swap with last and reheapify
        swap_heap(index, heap_.size() - 1);
        impl.heap_index_ = (std::numeric_limits<std::size_t>::max)();
        heap_.pop_back();

        // The moved-in entry may violate the heap property in either
        // direction, so sift whichever way the parent comparison says.
        if (index > 0 && heap_[index].time_ < heap_[(index - 1) / 2].time_)
            up_heap(index);
        else
            down_heap(index);
    }
}
678
679 inline void
680 8284x timer_service::up_heap(std::size_t index)
681 {
682 16403x while (index > 0)
683 {
684 8146x std::size_t parent = (index - 1) / 2;
685 8146x if (!(heap_[index].time_ < heap_[parent].time_))
686 27x break;
687 8119x swap_heap(index, parent);
688 8119x index = parent;
689 }
690 8284x }
691
692 inline void
693 8140x timer_service::down_heap(std::size_t index)
694 {
695 8140x std::size_t child = index * 2 + 1;
696 8140x while (child < heap_.size())
697 {
698 6x std::size_t min_child = (child + 1 == heap_.size() ||
699 heap_[child].time_ < heap_[child + 1].time_)
700 6x ? child
701 6x : child + 1;
702
703 6x if (heap_[index].time_ < heap_[min_child].time_)
704 6x break;
705
706 swap_heap(index, min_child);
707 index = min_child;
708 child = index * 2 + 1;
709 }
710 8140x }
711
712 inline void
713 16259x timer_service::swap_heap(std::size_t i1, std::size_t i2)
714 {
715 16259x heap_entry tmp = heap_[i1];
716 16259x heap_[i1] = heap_[i2];
717 16259x heap_[i2] = tmp;
718 16259x heap_[i1].timer_->heap_index_ = i1;
719 16259x heap_[i2].timer_->heap_index_ = i2;
720 16259x }
721
722 // waiter_node out-of-class member function definitions
723
// Invoked by std::stop_callback when the awaiter's stop_token is
// triggered; may run on any thread (cancel_waiter takes the mutex and
// checks the impl_ marker before touching the node).
inline void
waiter_node::canceller::operator()() const
{
    waiter_->svc_->cancel_waiter(waiter_);
}
729
// scheduler_op function-pointer entry: forwards to the virtual call
// operator on the concrete completion_op.
inline void
waiter_node::completion_op::do_complete(
    [[maybe_unused]] void* owner,
    scheduler_op* base,
    std::uint32_t,
    std::uint32_t)
{
    // owner is always non-null here. The destroy path (owner == nullptr)
    // is unreachable because completion_op overrides destroy() directly,
    // bypassing scheduler_op::destroy() which would call func_(nullptr, ...).
    BOOST_COROSIO_ASSERT(owner);
    static_cast<completion_op*>(base)->operator()();
}
743
inline void
waiter_node::completion_op::operator()()
{
    // Runs on a scheduler thread after being post()ed (timer fired or
    // wait canceled). Publishes the result, recycles the node, then
    // resumes the coroutine on its executor and balances the
    // work_started() from implementation::wait().
    auto* w = waiter_;
    w->stop_cb_.reset(); // unregister before the coroutine can resume
    if (w->ec_out_)
        *w->ec_out_ = w->ec_value_;

    // Copy everything still needed out of the node: destroy_waiter()
    // may hand it to another thread immediately.
    auto h = w->h_;
    auto d = w->d_;
    auto* svc = w->svc_;
    auto& sched = svc->get_scheduler();

    svc->destroy_waiter(w);

    d.post(h);
    sched.work_finished();
}
762
763 // GCC 14 false-positive: inlining ~optional<stop_callback> through
764 // delete loses track that stop_cb_ was already .reset() above.
765 #if defined(__GNUC__) && !defined(__clang__)
766 #pragma GCC diagnostic push
767 #pragma GCC diagnostic ignored "-Wmaybe-uninitialized"
768 #endif
inline void
waiter_node::completion_op::destroy()
{
    // Called during scheduler shutdown drain when this completion_op is
    // in the scheduler's ready queue (posted by cancel_timer() or
    // process_expired()). Balances the work_started() from
    // implementation::wait(). The scheduler drain loop separately
    // balances the work_started() from post(). On IOCP both decrements
    // are required for outstanding_work_ to reach zero; on other
    // backends this is harmless.
    //
    // This override also prevents scheduler_op::destroy() from calling
    // do_complete(nullptr, ...). See also: timer_service::shutdown()
    // which drains waiters still in the timer heap (the other path).
    auto* w = waiter_;
    w->stop_cb_.reset();
    // Take the handle before deleting the node so `w` is never touched
    // after delete; the never-resumed coroutine frame is destroyed last.
    auto h = std::exchange(w->h_, {});
    auto& sched = w->svc_->get_scheduler();
    delete w;
    sched.work_finished();
    if (h)
        h.destroy();
}
792 #if defined(__GNUC__) && !defined(__clang__)
793 #pragma GCC diagnostic pop
794 #endif
795
inline std::coroutine_handle<>
timer_service::implementation::wait(
    std::coroutine_handle<> h,
    capy::executor_ref d,
    std::stop_token token,
    std::error_code* ec)
{
    // Already-expired fast path — no waiter_node, no mutex.
    // Post instead of dispatch so the coroutine yields to the
    // scheduler, allowing other queued work to run.
    if (heap_index_ == (std::numeric_limits<std::size_t>::max)())
    {
        if (expiry_ == (time_point::min)() || expiry_ <= clock_type::now())
        {
            if (ec)
                *ec = {}; // success
            d.post(h);
            return std::noop_coroutine();
        }
    }

    // Slow path: recycle (or allocate) a waiter and arm the timer.
    auto* w = svc_->create_waiter();
    w->impl_ = this;
    w->svc_ = svc_;
    w->h_ = h;
    w->d_ = d;
    w->token_ = std::move(token);
    w->ec_out_ = ec;

    svc_->insert_waiter(*this, w);
    might_have_pending_waits_ = true;
    // Balanced by work_finished() in completion_op::operator()(),
    // completion_op::destroy(), or timer_service::shutdown().
    svc_->get_scheduler().work_started();

    // Register the stop callback last: an already-stopped token fires
    // it immediately, and cancel_waiter() needs the fully initialized
    // node (impl_, svc_, h_, ...).
    if (w->token_.stop_possible())
        w->stop_cb_.emplace(w->token_, waiter_node::canceller{w});

    return std::noop_coroutine();
}
834
835 // Free functions
836
// Access shim: reaches io_context's private scheduler pointer on
// behalf of the free functions below.
struct timer_service_access
{
    static native_scheduler& get_scheduler(io_context& ctx) noexcept
    {
        return static_cast<native_scheduler&>(*ctx.sched_);
    }
};
844
845 // Bypass find_service() mutex by reading the scheduler's cached pointer
// Bypass find_service() mutex by reading the scheduler's cached pointer
// NOTE(review): the static_cast assumes ctx is always an io_context —
// presumably guaranteed by callers; confirm at the call sites.
inline io_object::io_service&
timer_service_direct(capy::execution_context& ctx) noexcept
{
    return *timer_service_access::get_scheduler(static_cast<io_context&>(ctx))
        .timer_svc_;
}
852
// Re-arms the timer with the expiry already stored on the impl
// (expires_after records it there); returns how many waiters the
// re-arm canceled.
inline std::size_t
timer_service_update_expiry(timer::implementation& base)
{
    auto& impl = static_cast<timer_service::implementation&>(base);
    return impl.svc_->update_timer(impl, impl.expiry_);
}
859
// Cancels every pending wait on the timer; returns the count canceled.
inline std::size_t
timer_service_cancel(timer::implementation& base) noexcept
{
    auto& impl = static_cast<timer_service::implementation&>(base);
    return impl.svc_->cancel_timer(impl);
}
866
// Cancels at most one pending wait (the oldest); returns 0 or 1.
inline std::size_t
timer_service_cancel_one(timer::implementation& base) noexcept
{
    auto& impl = static_cast<timer_service::implementation&>(base);
    return impl.svc_->cancel_one_waiter(impl);
}
873
// Obtains the context's timer_service via make_service, binding it to
// the given scheduler.
inline timer_service&
get_timer_service(capy::execution_context& ctx, scheduler& sched)
{
    return ctx.make_service<timer_service>(sched);
}
879
880 } // namespace boost::corosio::detail
881
882 #endif
883