include/boost/corosio/detail/thread_pool.hpp

89.1% Lines (41/46) 100.0% Functions (7/7)
Line TLA Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #ifndef BOOST_COROSIO_DETAIL_THREAD_POOL_HPP
11 #define BOOST_COROSIO_DETAIL_THREAD_POOL_HPP
12
13 #include <boost/corosio/detail/config.hpp>
14 #include <boost/corosio/detail/intrusive.hpp>
15 #include <boost/capy/ex/execution_context.hpp>
16
17 #include <condition_variable>
18 #include <mutex>
19 #include <stdexcept>
20 #include <thread>
21 #include <vector>
22
23 namespace boost::corosio::detail {
24
25 /** Base class for thread pool work items.
26
27 Derive from this to create work that can be posted to a
28 @ref thread_pool. Uses static function pointer dispatch,
29 consistent with the IOCP `op` pattern.
30
31 @par Example
32 @code
33 struct my_work : pool_work_item
34 {
35 int* result;
36 static void execute( pool_work_item* w ) noexcept
37 {
38 auto* self = static_cast<my_work*>( w );
39 *self->result = 42;
40 }
41 };
42
43 my_work w;
44 w.func_ = &my_work::execute;
45 w.result = &r;
46 pool.post( &w );
47 @endcode
48 */
49 struct pool_work_item : intrusive_queue<pool_work_item>::node
50 {
51 /// Static dispatch function signature.
52 using func_type = void (*)(pool_work_item*) noexcept;
53
54 /// Completion handler invoked by the worker thread.
55 func_type func_ = nullptr;
56 };
57
58 /** Shared thread pool for dispatching blocking operations.
59
60 Provides a fixed pool of reusable worker threads for operations
61 that cannot be integrated with async I/O (e.g. blocking DNS
62 calls). Registered as an `execution_context::service` so it
63 is a singleton per io_context.
64
65 Threads are created eagerly in the constructor. The default
66 thread count is 1.
67
68 @par Thread Safety
69 All public member functions are thread-safe.
70
71 @par Shutdown
72 Sets a shutdown flag, notifies all threads, and joins them.
73 In-flight blocking calls complete naturally before the thread
74 exits.
75 */
76 class thread_pool final
77 : public capy::execution_context::service
78 {
79 std::mutex mutex_;
80 std::condition_variable cv_;
81 intrusive_queue<pool_work_item> work_queue_;
82 std::vector<std::thread> threads_;
83 bool shutdown_ = false;
84
85 void worker_loop();
86
87 public:
88 using key_type = thread_pool;
89
90 /** Construct the thread pool service.
91
92 Eagerly creates all worker threads.
93
94 @par Exception Safety
95 Strong guarantee. If thread creation fails, all
96 already-created threads are shut down and joined
97 before the exception propagates.
98
99 @param ctx Reference to the owning execution_context.
100 @param num_threads Number of worker threads. Must be
101 at least 1.
102
103 @throws std::logic_error If `num_threads` is 0.
104 */
105 414x explicit thread_pool(
106 capy::execution_context& ctx,
107 unsigned num_threads = 1)
108 414x {
109 (void)ctx;
110 414x if (!num_threads)
111 throw std::logic_error(
112 1x "thread_pool requires at least 1 thread");
113 413x threads_.reserve(num_threads);
114 try
115 {
116 829x for (unsigned i = 0; i < num_threads; ++i)
117 832x threads_.emplace_back([this] { worker_loop(); });
118 }
119 catch (...)
120 {
121 shutdown();
122 throw;
123 }
124 416x }
125
126 825x ~thread_pool() override = default;
127
128 thread_pool(thread_pool const&) = delete;
129 thread_pool& operator=(thread_pool const&) = delete;
130
131 /** Enqueue a work item for execution on the thread pool.
132
133 Zero-allocation: the caller owns the work item's storage.
134
135 @param w The work item to execute. Must remain valid until
136 its `func_` has been called.
137
138 @return `true` if the item was enqueued, `false` if the
139 pool has already shut down.
140 */
141 bool post(pool_work_item* w) noexcept;
142
143 /** Shut down the thread pool.
144
145 Signals all threads to exit after draining any
146 remaining queued work, then joins them.
147 */
148 void shutdown() override;
149 };
150
151 inline void
152 416x thread_pool::worker_loop()
153 {
154 for (;;)
155 {
156 pool_work_item* w;
157 {
158 457x std::unique_lock<std::mutex> lock(mutex_);
159 457x cv_.wait(lock, [this] {
160 633x return shutdown_ || !work_queue_.empty();
161 });
162
163 457x w = work_queue_.pop();
164 457x if (!w)
165 {
166 416x if (shutdown_)
167 832x return;
168 continue;
169 }
170 457x }
171 41x w->func_(w);
172 41x }
173 }
174
175 inline bool
176 42x thread_pool::post(pool_work_item* w) noexcept
177 {
178 {
179 42x std::lock_guard<std::mutex> lock(mutex_);
180 42x if (shutdown_)
181 1x return false;
182 41x work_queue_.push(w);
183 42x }
184 41x cv_.notify_one();
185 41x return true;
186 }
187
188 inline void
189 417x thread_pool::shutdown()
190 {
191 {
192 417x std::lock_guard<std::mutex> lock(mutex_);
193 417x shutdown_ = true;
194 417x }
195 417x cv_.notify_all();
196
197 833x for (auto& t : threads_)
198 {
199 416x if (t.joinable())
200 416x t.join();
201 }
202 417x threads_.clear();
203
204 {
205 417x std::lock_guard<std::mutex> lock(mutex_);
206 417x while (work_queue_.pop())
207 ;
208 417x }
209 417x }
210
211 } // namespace boost::corosio::detail
212
213 #endif // BOOST_COROSIO_DETAIL_THREAD_POOL_HPP
214