TLA Line data Source code
1 : //
2 : // Copyright (c) 2026 Steve Gerbino
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/corosio
8 : //
9 :
10 : #ifndef BOOST_COROSIO_DETAIL_THREAD_POOL_HPP
11 : #define BOOST_COROSIO_DETAIL_THREAD_POOL_HPP
12 :
13 : #include <boost/corosio/detail/config.hpp>
14 : #include <boost/corosio/detail/intrusive.hpp>
15 : #include <boost/capy/ex/execution_context.hpp>
16 :
17 : #include <condition_variable>
18 : #include <mutex>
19 : #include <stdexcept>
20 : #include <thread>
21 : #include <vector>
22 :
23 : namespace boost::corosio::detail {
24 :
25 : /** Base class for thread pool work items.
26 :
27 : Derive from this to create work that can be posted to a
28 : @ref thread_pool. Uses static function pointer dispatch,
29 : consistent with the IOCP `op` pattern.
30 :
31 : @par Example
32 : @code
33 : struct my_work : pool_work_item
34 : {
35 : int* result;
36 : static void execute( pool_work_item* w ) noexcept
37 : {
38 : auto* self = static_cast<my_work*>( w );
39 : *self->result = 42;
40 : }
41 : };
42 :
43 : my_work w;
44 : w.func_ = &my_work::execute;
45 : w.result = &r;
46 : pool.post( &w );
47 : @endcode
48 : */
49 : struct pool_work_item : intrusive_queue<pool_work_item>::node
50 : {
51 : /// Static dispatch function signature.
52 : using func_type = void (*)(pool_work_item*) noexcept;
53 :
54 : /// Completion handler invoked by the worker thread.
55 : func_type func_ = nullptr;
56 : };
57 :
58 : /** Shared thread pool for dispatching blocking operations.
59 :
60 : Provides a fixed pool of reusable worker threads for operations
61 : that cannot be integrated with async I/O (e.g. blocking DNS
62 : calls). Registered as an `execution_context::service` so it
63 : is a singleton per io_context.
64 :
65 : Threads are created eagerly in the constructor. The default
66 : thread count is 1.
67 :
68 : @par Thread Safety
69 : All public member functions are thread-safe.
70 :
71 : @par Shutdown
72 : Sets a shutdown flag, notifies all threads, and joins them.
73 : In-flight blocking calls complete naturally before the thread
74 : exits.
75 : */
76 : class thread_pool final
77 : : public capy::execution_context::service
78 : {
79 : std::mutex mutex_;
80 : std::condition_variable cv_;
81 : intrusive_queue<pool_work_item> work_queue_;
82 : std::vector<std::thread> threads_;
83 : bool shutdown_ = false;
84 :
85 : void worker_loop();
86 :
87 : public:
88 : using key_type = thread_pool;
89 :
90 : /** Construct the thread pool service.
91 :
92 : Eagerly creates all worker threads.
93 :
94 : @par Exception Safety
95 : Strong guarantee. If thread creation fails, all
96 : already-created threads are shut down and joined
97 : before the exception propagates.
98 :
99 : @param ctx Reference to the owning execution_context.
100 : @param num_threads Number of worker threads. Must be
101 : at least 1.
102 :
103 : @throws std::logic_error If `num_threads` is 0.
104 : */
105 HIT 414 : explicit thread_pool(
106 : capy::execution_context& ctx,
107 : unsigned num_threads = 1)
108 414 : {
109 : (void)ctx;
110 414 : if (!num_threads)
111 : throw std::logic_error(
112 1 : "thread_pool requires at least 1 thread");
113 413 : threads_.reserve(num_threads);
114 : try
115 : {
116 829 : for (unsigned i = 0; i < num_threads; ++i)
117 832 : threads_.emplace_back([this] { worker_loop(); });
118 : }
119 MIS 0 : catch (...)
120 : {
121 0 : shutdown();
122 0 : throw;
123 0 : }
124 HIT 416 : }
125 :
126 825 : ~thread_pool() override = default;
127 :
128 : thread_pool(thread_pool const&) = delete;
129 : thread_pool& operator=(thread_pool const&) = delete;
130 :
131 : /** Enqueue a work item for execution on the thread pool.
132 :
133 : Zero-allocation: the caller owns the work item's storage.
134 :
135 : @param w The work item to execute. Must remain valid until
136 : its `func_` has been called.
137 :
138 : @return `true` if the item was enqueued, `false` if the
139 : pool has already shut down.
140 : */
141 : bool post(pool_work_item* w) noexcept;
142 :
143 : /** Shut down the thread pool.
144 :
145 : Signals all threads to exit after draining any
146 : remaining queued work, then joins them.
147 : */
148 : void shutdown() override;
149 : };
150 :
151 : inline void
152 416 : thread_pool::worker_loop()
153 : {
154 : for (;;)
155 : {
156 : pool_work_item* w;
157 : {
158 457 : std::unique_lock<std::mutex> lock(mutex_);
159 457 : cv_.wait(lock, [this] {
160 633 : return shutdown_ || !work_queue_.empty();
161 : });
162 :
163 457 : w = work_queue_.pop();
164 457 : if (!w)
165 : {
166 416 : if (shutdown_)
167 832 : return;
168 MIS 0 : continue;
169 : }
170 HIT 457 : }
171 41 : w->func_(w);
172 41 : }
173 : }
174 :
175 : inline bool
176 42 : thread_pool::post(pool_work_item* w) noexcept
177 : {
178 : {
179 42 : std::lock_guard<std::mutex> lock(mutex_);
180 42 : if (shutdown_)
181 1 : return false;
182 41 : work_queue_.push(w);
183 42 : }
184 41 : cv_.notify_one();
185 41 : return true;
186 : }
187 :
188 : inline void
189 417 : thread_pool::shutdown()
190 : {
191 : {
192 417 : std::lock_guard<std::mutex> lock(mutex_);
193 417 : shutdown_ = true;
194 417 : }
195 417 : cv_.notify_all();
196 :
197 833 : for (auto& t : threads_)
198 : {
199 416 : if (t.joinable())
200 416 : t.join();
201 : }
202 417 : threads_.clear();
203 :
204 : {
205 417 : std::lock_guard<std::mutex> lock(mutex_);
206 417 : while (work_queue_.pop())
207 : ;
208 417 : }
209 417 : }
210 :
211 : } // namespace boost::corosio::detail
212 :
213 : #endif // BOOST_COROSIO_DETAIL_THREAD_POOL_HPP
|