include/boost/corosio/tcp_server.hpp

78.7% Lines (107/136) 91.2% Functions (31/34)
Line TLA Hits Source Code
1 //
2 // Copyright (c) 2026 Vinnie Falco (vinnie.falco@gmail.com)
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #ifndef BOOST_COROSIO_TCP_SERVER_HPP
11 #define BOOST_COROSIO_TCP_SERVER_HPP
12
13 #include <boost/corosio/detail/config.hpp>
14 #include <boost/corosio/detail/except.hpp>
15 #include <boost/corosio/tcp_acceptor.hpp>
16 #include <boost/corosio/tcp_socket.hpp>
17 #include <boost/corosio/io_context.hpp>
18 #include <boost/corosio/endpoint.hpp>
19 #include <boost/capy/task.hpp>
20 #include <boost/capy/concept/execution_context.hpp>
21 #include <boost/capy/concept/io_awaitable.hpp>
22 #include <boost/capy/concept/executor.hpp>
23 #include <boost/capy/ex/any_executor.hpp>
24 #include <boost/capy/ex/frame_allocator.hpp>
25 #include <boost/capy/ex/io_env.hpp>
26 #include <boost/capy/ex/run_async.hpp>
27
28 #include <coroutine>
29 #include <memory>
30 #include <ranges>
31 #include <vector>
32
33 namespace boost::corosio {
34
35 #ifdef _MSC_VER
36 #pragma warning(push)
37 #pragma warning(disable : 4251) // class needs to have dll-interface
38 #endif
39
40 /** TCP server with pooled workers.
41
42 This class manages a pool of reusable worker objects that handle
43 incoming connections. When a connection arrives, an idle worker
44 is dispatched to handle it. After the connection completes, the
45 worker returns to the pool for reuse, avoiding allocation overhead
46 per connection.
47
48 Workers are set via @ref set_workers as a forward range of
49 pointer-like objects (e.g., `unique_ptr<worker_base>`). The server
50 takes ownership of the container via type erasure.
51
52 @par Thread Safety
53 Distinct objects: Safe.
54 Shared objects: Unsafe.
55
56 @par Lifecycle
57 The server operates in three states:
58
59 - **Stopped**: Initial state, or after @ref join completes.
60 - **Running**: After @ref start, actively accepting connections.
61 - **Stopping**: After @ref stop, draining active work.
62
63 State transitions:
64 @code
65 [Stopped] --start()--> [Running] --stop()--> [Stopping] --join()--> [Stopped]
66 @endcode
67
68 @par Running the Server
69 @code
70 io_context ioc;
71 tcp_server srv(ioc, ioc.get_executor());
72 srv.set_workers(make_workers(ioc, 100));
73 srv.bind(endpoint{address_v4::any(), 8080});
74 srv.start();
75 ioc.run(); // Blocks until all work completes
76 @endcode
77
78 @par Graceful Shutdown
79 To shut down gracefully, call @ref stop then drain the io_context:
80 @code
81 // From a signal handler or timer callback:
82 srv.stop();
83
84 // ioc.run() returns after pending work drains.
85 // Then from the thread that called ioc.run():
86 srv.join(); // Wait for accept loops to finish
87 @endcode
88
89 @par Restart After Stop
90 The server can be restarted after a complete shutdown cycle.
91 You must drain the io_context and call @ref join before restarting:
92 @code
93 srv.start();
94 ioc.run_for( 10s ); // Run for a while
95 srv.stop(); // Signal shutdown
96 ioc.run(); // REQUIRED: drain pending completions
97 srv.join(); // REQUIRED: wait for accept loops
98
99 // Now safe to restart
100 srv.start();
101 ioc.run();
102 @endcode
103
104 @par WARNING: What NOT to Do
105 - Do NOT call @ref join from inside a worker coroutine (deadlock).
106 - Do NOT call @ref join from a thread running `ioc.run()` (deadlock).
107 - Do NOT call @ref start without completing @ref join after @ref stop.
108 - Do NOT call `ioc.stop()` for graceful shutdown; use @ref stop instead.
109
110 @par Example
111 @code
112 class my_worker : public tcp_server::worker_base
113 {
114 corosio::tcp_socket sock_;
115 capy::any_executor ex_;
116 public:
117 my_worker(io_context& ctx)
118 : sock_(ctx)
119 , ex_(ctx.get_executor())
120 {
121 }
122
123 corosio::tcp_socket& socket() override { return sock_; }
124
125 void run(launcher launch) override
126 {
127 launch(ex_, [](corosio::tcp_socket* sock) -> capy::task<>
128 {
129 // handle connection using sock
130 co_return;
131 }(&sock_));
132 }
133 };
134
135 auto make_workers(io_context& ctx, int n)
136 {
137 std::vector<std::unique_ptr<tcp_server::worker_base>> v;
138 v.reserve(n);
139 for(int i = 0; i < n; ++i)
140 v.push_back(std::make_unique<my_worker>(ctx));
141 return v;
142 }
143
144 io_context ioc;
145 tcp_server srv(ioc, ioc.get_executor());
146 srv.set_workers(make_workers(ioc, 100));
147 @endcode
148
149 @see worker_base, set_workers, launcher
150 */
151 class BOOST_COROSIO_DECL tcp_server
152 {
153 public:
154 class worker_base; ///< Abstract base for connection handlers.
155 class launcher; ///< Move-only handle to launch worker coroutines.
156
157 private:
158 struct waiter
159 {
160 waiter* next;
161 std::coroutine_handle<> h;
162 worker_base* w;
163 };
164
165 struct impl;
166
167 static impl* make_impl(capy::execution_context& ctx);
168
169 impl* impl_;
170 capy::any_executor ex_;
171 waiter* waiters_ = nullptr;
172 worker_base* idle_head_ = nullptr; // Forward list: available workers
173 worker_base* active_head_ =
174 nullptr; // Doubly linked: workers handling connections
175 worker_base* active_tail_ = nullptr; // Tail for O(1) push_back
176 std::size_t active_accepts_ = 0; // Number of active do_accept coroutines
177 std::shared_ptr<void> storage_; // Owns the worker container (type-erased)
178 bool running_ = false;
179
180 // Idle list (forward/singly linked) - push front, pop front
181 45x void idle_push(worker_base* w) noexcept
182 {
183 45x w->next_ = idle_head_;
184 45x idle_head_ = w;
185 45x }
186
187 9x worker_base* idle_pop() noexcept
188 {
189 9x auto* w = idle_head_;
190 9x if (w)
191 9x idle_head_ = w->next_;
192 9x return w;
193 }
194
195 9x bool idle_empty() const noexcept
196 {
197 9x return idle_head_ == nullptr;
198 }
199
200 // Active list (doubly linked) - push back, remove anywhere
201 3x void active_push(worker_base* w) noexcept
202 {
203 3x w->next_ = nullptr;
204 3x w->prev_ = active_tail_;
205 3x if (active_tail_)
206 active_tail_->next_ = w;
207 else
208 3x active_head_ = w;
209 3x active_tail_ = w;
210 3x }
211
212 9x void active_remove(worker_base* w) noexcept
213 {
214 // Skip if not in active list (e.g., after failed accept)
215 9x if (w != active_head_ && w->prev_ == nullptr)
216 6x return;
217 3x if (w->prev_)
218 w->prev_->next_ = w->next_;
219 else
220 3x active_head_ = w->next_;
221 3x if (w->next_)
222 w->next_->prev_ = w->prev_;
223 else
224 3x active_tail_ = w->prev_;
225 3x w->prev_ = nullptr; // Mark as not in active list
226 }
227
228 template<capy::Executor Ex>
229 struct launch_wrapper
230 {
231 struct promise_type
232 {
233 Ex ex; // Executor stored directly in frame (outlives child tasks)
234 capy::io_env env_;
235
236 // For regular coroutines: first arg is executor, second is stop token
237 template<class E, class S, class... Args>
238 requires capy::Executor<std::decay_t<E>>
239 promise_type(E e, S s, Args&&...)
240 : ex(std::move(e))
241 , env_{
242 capy::executor_ref(ex), std::move(s),
243 capy::get_current_frame_allocator()}
244 {
245 }
246
247 // For lambda coroutines: first arg is closure, second is executor, third is stop token
248 template<class Closure, class E, class S, class... Args>
249 requires(!capy::Executor<std::decay_t<Closure>> &&
250 capy::Executor<std::decay_t<E>>)
251 3x promise_type(Closure&&, E e, S s, Args&&...)
252 3x : ex(std::move(e))
253 3x , env_{
254 3x capy::executor_ref(ex), std::move(s),
255 3x capy::get_current_frame_allocator()}
256 {
257 3x }
258
259 3x launch_wrapper get_return_object() noexcept
260 {
261 return {
262 3x std::coroutine_handle<promise_type>::from_promise(*this)};
263 }
264 3x std::suspend_always initial_suspend() noexcept
265 {
266 3x return {};
267 }
268 3x std::suspend_never final_suspend() noexcept
269 {
270 3x return {};
271 }
272 3x void return_void() noexcept {}
273 void unhandled_exception()
274 {
275 std::terminate();
276 }
277
278 // Inject io_env for IoAwaitable
279 template<capy::IoAwaitable Awaitable>
280 6x auto await_transform(Awaitable&& a)
281 {
282 using AwaitableT = std::decay_t<Awaitable>;
283 struct adapter
284 {
285 AwaitableT aw;
286 capy::io_env const* env;
287
288 bool await_ready()
289 {
290 return aw.await_ready();
291 }
292 decltype(auto) await_resume()
293 {
294 return aw.await_resume();
295 }
296
297 auto await_suspend(std::coroutine_handle<promise_type> h)
298 {
299 return aw.await_suspend(h, env);
300 }
301 };
302 9x return adapter{std::forward<Awaitable>(a), &env_};
303 3x }
304 };
305
306 std::coroutine_handle<promise_type> h;
307
308 3x launch_wrapper(std::coroutine_handle<promise_type> handle) noexcept
309 3x : h(handle)
310 {
311 3x }
312
313 3x ~launch_wrapper()
314 {
315 3x if (h)
316 h.destroy();
317 3x }
318
319 launch_wrapper(launch_wrapper&& o) noexcept
320 : h(std::exchange(o.h, nullptr))
321 {
322 }
323
324 launch_wrapper(launch_wrapper const&) = delete;
325 launch_wrapper& operator=(launch_wrapper const&) = delete;
326 launch_wrapper& operator=(launch_wrapper&&) = delete;
327 };
328
329 // Named functor to avoid incomplete lambda type in coroutine promise
330 template<class Executor>
331 struct launch_coro
332 {
333 3x launch_wrapper<Executor> operator()(
334 Executor,
335 std::stop_token,
336 tcp_server* self,
337 capy::task<void> t,
338 worker_base* wp)
339 {
340 // Executor and stop token stored in promise via constructor
341 co_await std::move(t);
342 co_await self->push(*wp); // worker goes back to idle list
343 6x }
344 };
345
346 class push_awaitable
347 {
348 tcp_server& self_;
349 worker_base& w_;
350
351 public:
352 9x push_awaitable(tcp_server& self, worker_base& w) noexcept
353 9x : self_(self)
354 9x , w_(w)
355 {
356 9x }
357
358 9x bool await_ready() const noexcept
359 {
360 9x return false;
361 }
362
363 std::coroutine_handle<>
364 9x await_suspend(std::coroutine_handle<> h, capy::io_env const*) noexcept
365 {
366 // Symmetric transfer to server's executor
367 9x return self_.ex_.dispatch(h);
368 }
369
370 9x void await_resume() noexcept
371 {
372 // Running on server executor - safe to modify lists
373 // Remove from active (if present), then wake waiter or add to idle
374 9x self_.active_remove(&w_);
375 9x if (self_.waiters_)
376 {
377 auto* wait = self_.waiters_;
378 self_.waiters_ = wait->next;
379 wait->w = &w_;
380 self_.ex_.post(wait->h);
381 }
382 else
383 {
384 9x self_.idle_push(&w_);
385 }
386 9x }
387 };
388
389 class pop_awaitable
390 {
391 tcp_server& self_;
392 waiter wait_;
393
394 public:
395 9x pop_awaitable(tcp_server& self) noexcept : self_(self), wait_{} {}
396
397 9x bool await_ready() const noexcept
398 {
399 9x return !self_.idle_empty();
400 }
401
402 bool
403 await_suspend(std::coroutine_handle<> h, capy::io_env const*) noexcept
404 {
405 // Running on server executor (do_accept runs there)
406 wait_.h = h;
407 wait_.w = nullptr;
408 wait_.next = self_.waiters_;
409 self_.waiters_ = &wait_;
410 return true;
411 }
412
413 9x worker_base& await_resume() noexcept
414 {
415 // Running on server executor
416 9x if (wait_.w)
417 return *wait_.w; // Woken by push_awaitable
418 9x return *self_.idle_pop();
419 }
420 };
421
422 9x push_awaitable push(worker_base& w)
423 {
424 9x return push_awaitable{*this, w};
425 }
426
427 // Synchronous version for destructor/guard paths
428 // Must be called from server executor context
429 void push_sync(worker_base& w) noexcept
430 {
431 active_remove(&w);
432 if (waiters_)
433 {
434 auto* wait = waiters_;
435 waiters_ = wait->next;
436 wait->w = &w;
437 ex_.post(wait->h);
438 }
439 else
440 {
441 idle_push(&w);
442 }
443 }
444
445 9x pop_awaitable pop()
446 {
447 9x return pop_awaitable{*this};
448 }
449
450 capy::task<void> do_accept(tcp_acceptor& acc);
451
452 public:
453 /** Abstract base class for connection handlers.
454
455 Derive from this class to implement custom connection handling.
456 Each worker owns a socket and is reused across multiple
457 connections to avoid per-connection allocation.
458
459 @see tcp_server, launcher
460 */
461 class BOOST_COROSIO_DECL worker_base
462 {
463 // Ordered largest to smallest for optimal packing
464 std::stop_source stop_; // ~16 bytes
465 worker_base* next_ = nullptr; // 8 bytes - used by idle and active lists
466 worker_base* prev_ = nullptr; // 8 bytes - used only by active list
467
468 friend class tcp_server;
469
470 public:
471 /// Construct a worker.
472 worker_base();
473
474 /// Destroy the worker.
475 virtual ~worker_base();
476
477 /** Handle an accepted connection.
478
479 Called when this worker is dispatched to handle a new
480 connection. The implementation must invoke the launcher
481 exactly once to start the handling coroutine.
482
483 @param launch Handle to launch the connection coroutine.
484 */
485 virtual void run(launcher launch) = 0;
486
487 /// Return the socket used for connections.
488 virtual corosio::tcp_socket& socket() = 0;
489 };
490
491 /** Move-only handle to launch a worker coroutine.
492
493 Passed to @ref worker_base::run to start the connection-handling
494 coroutine. The launcher ensures the worker returns to the idle
495 pool when the coroutine completes or if launching fails.
496
497 The launcher must be invoked exactly once via `operator()`.
498 If destroyed without invoking, the worker is returned to the
499 idle pool automatically.
500
501 @see worker_base::run
502 */
503 class BOOST_COROSIO_DECL launcher
504 {
505 tcp_server* srv_;
506 worker_base* w_;
507
508 friend class tcp_server;
509
510 3x launcher(tcp_server& srv, worker_base& w) noexcept : srv_(&srv), w_(&w)
511 {
512 3x }
513
514 public:
515 /// Return the worker to the pool if not launched.
516 3x ~launcher()
517 {
518 3x if (w_)
519 srv_->push_sync(*w_);
520 3x }
521
522 launcher(launcher&& o) noexcept
523 : srv_(o.srv_)
524 , w_(std::exchange(o.w_, nullptr))
525 {
526 }
527 launcher(launcher const&) = delete;
528 launcher& operator=(launcher const&) = delete;
529 launcher& operator=(launcher&&) = delete;
530
531 /** Launch the connection-handling coroutine.
532
533 Starts the given coroutine on the specified executor. When
534 the coroutine completes, the worker is automatically returned
535 to the idle pool.
536
537 @param ex The executor to run the coroutine on.
538 @param task The coroutine to execute.
539
540 @throws std::logic_error If this launcher was already invoked.
541 */
542 template<class Executor>
543 3x void operator()(Executor const& ex, capy::task<void> task)
544 {
545 3x if (!w_)
546 detail::throw_logic_error(); // launcher already invoked
547
548 3x auto* w = std::exchange(w_, nullptr);
549
550 // Worker is being dispatched - add to active list
551 3x srv_->active_push(w);
552
553 // Return worker to pool if coroutine setup throws
554 struct guard_t
555 {
556 tcp_server* srv;
557 worker_base* w;
558 3x ~guard_t()
559 {
560 3x if (w)
561 srv->push_sync(*w);
562 3x }
563 3x } guard{srv_, w};
564
565 // Reset worker's stop source for this connection
566 3x w->stop_ = {};
567 3x auto st = w->stop_.get_token();
568
569 3x auto wrapper =
570 3x launch_coro<Executor>{}(ex, st, srv_, std::move(task), w);
571
572 // Executor and stop token stored in promise via constructor
573 3x ex.post(std::exchange(wrapper.h, nullptr)); // Release before post
574 3x guard.w = nullptr; // Success - dismiss guard
575 3x }
576 };
577
578 /** Construct a TCP server.
579
580 @tparam Ctx Execution context type satisfying ExecutionContext.
581 @tparam Ex Executor type satisfying Executor.
582
583 @param ctx The execution context for socket operations.
584 @param ex The executor for dispatching coroutines.
585
586 @par Example
587 @code
588 tcp_server srv(ctx, ctx.get_executor());
589 srv.set_workers(make_workers(ctx, 100));
590 srv.bind(endpoint{...});
591 srv.start();
592 @endcode
593 */
594 template<capy::ExecutionContext Ctx, capy::Executor Ex>
595 9x tcp_server(Ctx& ctx, Ex ex) : impl_(make_impl(ctx))
596 9x , ex_(std::move(ex))
597 {
598 9x }
599
600 public:
601 ~tcp_server();
602 tcp_server(tcp_server const&) = delete;
603 tcp_server& operator=(tcp_server const&) = delete;
604 tcp_server(tcp_server&& o) noexcept;
605 tcp_server& operator=(tcp_server&& o) noexcept;
606
607 /** Bind to a local endpoint.
608
609 Creates an acceptor listening on the specified endpoint.
610 Multiple endpoints can be bound by calling this method
611 multiple times before @ref start.
612
613 @param ep The local endpoint to bind to.
614
615 @return The error code if binding fails.
616 */
617 std::error_code bind(endpoint ep);
618
619 /** Set the worker pool.
620
621 Replaces any existing workers with the given range. Any
622 previous workers are released and the idle/active lists
623 are cleared before populating with new workers.
624
625 @tparam Range Forward range of pointer-like objects to worker_base.
626
627 @param workers Range of workers to manage. Each element must
628 support `std::to_address()` yielding a pointer convertible to `worker_base*`.
629
630 @par Example
631 @code
632 std::vector<std::unique_ptr<my_worker>> workers;
633 for(int i = 0; i < 100; ++i)
634 workers.push_back(std::make_unique<my_worker>(ctx));
635 srv.set_workers(std::move(workers));
636 @endcode
637 */
638 template<std::ranges::forward_range Range>
639 requires std::convertible_to<
640 decltype(std::to_address(
641 std::declval<std::ranges::range_value_t<Range>&>())),
642 worker_base*>
643 9x void set_workers(Range&& workers)
644 {
645 // Clear existing state
646 9x storage_.reset();
647 9x idle_head_ = nullptr;
648 9x active_head_ = nullptr;
649 9x active_tail_ = nullptr;
650
651 // Take ownership and populate idle list
652 using StorageType = std::decay_t<Range>;
653 9x auto* p = new StorageType(std::forward<Range>(workers));
654 9x storage_ = std::shared_ptr<void>(
655 9x p, [](void* ptr) { delete static_cast<StorageType*>(ptr); });
656 45x for (auto&& elem : *static_cast<StorageType*>(p))
657 36x idle_push(std::to_address(elem));
658 9x }
659
660 /** Start accepting connections.
661
662 Launches accept loops for all bound endpoints. Incoming
663 connections are dispatched to idle workers from the pool.
664
665 Calling `start()` on an already-running server has no effect.
666
667 @par Preconditions
668 - At least one endpoint bound via @ref bind.
669 - Workers provided via @ref set_workers.
670 - If restarting, @ref join must have completed first.
671
672 @par Effects
673 Creates one accept coroutine per bound endpoint. Each coroutine
674 runs on the server's executor, waiting for connections and
675 dispatching them to idle workers.
676
677 @par Restart Sequence
678 To restart after stopping, complete the full shutdown cycle:
679 @code
680 srv.start();
681 ioc.run_for( 1s );
682 srv.stop(); // 1. Signal shutdown
683 ioc.run(); // 2. Drain remaining completions
684 srv.join(); // 3. Wait for accept loops
685
686 // Now safe to restart
687 srv.start();
688 ioc.run();
689 @endcode
690
691 @par Thread Safety
692 Not thread safe.
693
694 @throws std::logic_error If a previous session has not been
695 joined (accept loops still active).
696 */
697 void start();
698
699 /** Stop accepting connections.
700
701 Signals all listening ports to stop accepting new connections
702 and requests cancellation of active workers via their stop tokens.
703
704 This function returns immediately; it does not wait for workers
705 to finish. Pending I/O operations complete asynchronously.
706
707 Calling `stop()` on a non-running server has no effect.
708
709 @par Effects
710 - Closes all acceptors (pending accepts complete with error).
711 - Requests stop on each active worker's stop token.
712 - Workers observing their stop token should exit promptly.
713
714 @par Postconditions
715 No new connections will be accepted. Active workers continue
716 until they observe their stop token or complete naturally.
717
718 @par What Happens Next
719 After calling `stop()`:
720 1. Let `ioc.run()` return (drains pending completions).
721 2. Call @ref join to wait for accept loops to finish.
722 3. Only then is it safe to restart or destroy the server.
723
724 @par Thread Safety
725 Not thread safe.
726
727 @see join, start
728 */
729 void stop();
730
731 /** Block until all accept loops complete.
732
733 Blocks the calling thread until all accept coroutines launched
734 by @ref start have finished executing. This synchronizes the
735 shutdown sequence, ensuring the server is fully stopped before
736 restarting or destroying it.
737
738 @par Preconditions
739 @ref stop has been called and `ioc.run()` has returned.
740
741 @par Postconditions
742 All accept loops have completed. The server is in the stopped
743 state and may be restarted via @ref start.
744
745 @par Example (Correct Usage)
746 @code
747 // main thread
748 srv.start();
749 ioc.run(); // Blocks until work completes
750 srv.join(); // Safe: called after ioc.run() returns
751 @endcode
752
753 @par WARNING: Deadlock Scenarios
754 Calling `join()` from the wrong context causes deadlock:
755
756 @code
757 // WRONG: calling join() from inside a worker coroutine
758 void run( launcher launch ) override
759 {
760 launch( ex, [this]() -> capy::task<>
761 {
762 srv_.join(); // DEADLOCK: blocks the executor
763 co_return;
764 }());
765 }
766
767 // WRONG: calling join() while ioc.run() is still active
768 std::thread t( [&]{ ioc.run(); } );
769 srv.stop();
770 srv.join(); // DEADLOCK: ioc.run() still running in thread t
771 @endcode
772
773 @par Thread Safety
774 May be called from any thread, but will deadlock if called
775 from within the io_context event loop or from a worker coroutine.
776
777 @see stop, start
778 */
779 void join();
780
781 private:
782 capy::task<> do_stop();
783 };
784
785 #ifdef _MSC_VER
786 #pragma warning(pop)
787 #endif
788
789 } // namespace boost::corosio
790
791 #endif
792