//
// Copyright (c) 2026 Steve Gerbino
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
// Official repository: https://github.com/cppalliance/corosio
//

#ifndef BOOST_COROSIO_DETAIL_THREAD_POOL_HPP
#define BOOST_COROSIO_DETAIL_THREAD_POOL_HPP

#include <boost/corosio/detail/config.hpp>
#include <boost/corosio/detail/intrusive.hpp>
#include <boost/capy/ex/execution_context.hpp>

#include <condition_variable>
#include <mutex>
#include <stdexcept>
#include <thread>
#include <vector>

namespace boost::corosio::detail {
25 +
/** Base class for thread pool work items.
 
26 +

 
27 +
    Derive from this to create work that can be posted to a
 
28 +
    @ref thread_pool. Uses static function pointer dispatch,
 
29 +
    consistent with the IOCP `op` pattern.
 
30 +

 
31 +
    @par Example
 
32 +
    @code
 
33 +
    struct my_work : pool_work_item
 
34 +
    {
 
35 +
        int* result;
 
36 +
        static void execute( pool_work_item* w ) noexcept
 
37 +
        {
 
38 +
            auto* self = static_cast<my_work*>( w );
 
39 +
            *self->result = 42;
 
40 +
        }
 
41 +
    };
 
42 +

 
43 +
    my_work w;
 
44 +
    w.func_ = &my_work::execute;
 
45 +
    w.result = &r;
 
46 +
    pool.post( &w );
 
47 +
    @endcode
 
48 +
*/
 
49 +
struct pool_work_item : intrusive_queue<pool_work_item>::node
 
50 +
{
 
51 +
    /// Static dispatch function signature.
 
52 +
    using func_type = void (*)(pool_work_item*) noexcept;
 
53 +

 
54 +
    /// Completion handler invoked by the worker thread.
 
55 +
    func_type func_ = nullptr;
 
56 +
};
 
57 +

 
58 +
/** Shared thread pool for dispatching blocking operations.
 
59 +

 
60 +
    Provides a fixed pool of reusable worker threads for operations
 
61 +
    that cannot be integrated with async I/O (e.g. blocking DNS
 
62 +
    calls). Registered as an `execution_context::service` so it
 
63 +
    is a singleton per io_context.
 
64 +

 
65 +
    Threads are created eagerly in the constructor. The default
 
66 +
    thread count is 1.
 
67 +

 
68 +
    @par Thread Safety
 
69 +
    All public member functions are thread-safe.
 
70 +

 
71 +
    @par Shutdown
 
72 +
    Sets a shutdown flag, notifies all threads, and joins them.
 
73 +
    In-flight blocking calls complete naturally before the thread
 
74 +
    exits.
 
75 +
*/
 
76 +
class thread_pool final
 
77 +
    : public capy::execution_context::service
 
78 +
{
 
79 +
    std::mutex mutex_;
 
80 +
    std::condition_variable cv_;
 
81 +
    intrusive_queue<pool_work_item> work_queue_;
 
82 +
    std::vector<std::thread> threads_;
 
83 +
    bool shutdown_ = false;
 
84 +

 
85 +
    void worker_loop();
 
86 +

 
87 +
public:
 
88 +
    using key_type = thread_pool;
 
89 +

 
90 +
    /** Construct the thread pool service.
 
91 +

 
92 +
        Eagerly creates all worker threads.
 
93 +

 
94 +
        @par Exception Safety
 
95 +
        Strong guarantee. If thread creation fails, all
 
96 +
        already-created threads are shut down and joined
 
97 +
        before the exception propagates.
 
98 +

 
99 +
        @param ctx Reference to the owning execution_context.
 
100 +
        @param num_threads Number of worker threads. Must be
 
101 +
               at least 1.
 
102 +

 
103 +
        @throws std::logic_error If `num_threads` is 0.
 
104 +
    */
 
105 +
    explicit thread_pool(
 
106 +
        capy::execution_context& ctx,
 
107 +
        unsigned num_threads = 1)
 
108 +
    {
 
109 +
        (void)ctx;
 
110 +
        if (!num_threads)
 
111 +
            throw std::logic_error(
 
112 +
                "thread_pool requires at least 1 thread");
 
113 +
        threads_.reserve(num_threads);
 
114 +
        try
 
115 +
        {
 
116 +
            for (unsigned i = 0; i < num_threads; ++i)
 
117 +
                threads_.emplace_back([this] { worker_loop(); });
 
118 +
        }
 
119 +
        catch (...)
 
120 +
        {
 
121 +
            shutdown();
 
122 +
            throw;
 
123 +
        }
 
124 +
    }
 
125 +

 
126 +
    ~thread_pool() override = default;
 
127 +

 
128 +
    thread_pool(thread_pool const&) = delete;
 
129 +
    thread_pool& operator=(thread_pool const&) = delete;
 
130 +

 
131 +
    /** Enqueue a work item for execution on the thread pool.
 
132 +

 
133 +
        Zero-allocation: the caller owns the work item's storage.
 
134 +

 
135 +
        @param w The work item to execute. Must remain valid until
 
136 +
                 its `func_` has been called.
 
137 +

 
138 +
        @return `true` if the item was enqueued, `false` if the
 
139 +
                pool has already shut down.
 
140 +
    */
 
141 +
    bool post(pool_work_item* w) noexcept;
 
142 +

 
143 +
    /** Shut down the thread pool.
 
144 +

 
145 +
        Signals all threads to exit after draining any
 
146 +
        remaining queued work, then joins them.
 
147 +
    */
 
148 +
    void shutdown() override;
 
149 +
};
 
150 +

 
151 +
inline void
 
152 +
thread_pool::worker_loop()
 
153 +
{
 
154 +
    for (;;)
 
155 +
    {
 
156 +
        pool_work_item* w;
 
157 +
        {
 
158 +
            std::unique_lock<std::mutex> lock(mutex_);
 
159 +
            cv_.wait(lock, [this] {
 
160 +
                return shutdown_ || !work_queue_.empty();
 
161 +
            });
 
162 +

 
163 +
            w = work_queue_.pop();
 
164 +
            if (!w)
 
165 +
            {
 
166 +
                if (shutdown_)
 
167 +
                    return;
 
168 +
                continue;
 
169 +
            }
 
170 +
        }
 
171 +
        w->func_(w);
 
172 +
    }
 
173 +
}
 
174 +

 
175 +
inline bool
 
176 +
thread_pool::post(pool_work_item* w) noexcept
 
177 +
{
 
178 +
    {
 
179 +
        std::lock_guard<std::mutex> lock(mutex_);
 
180 +
        if (shutdown_)
 
181 +
            return false;
 
182 +
        work_queue_.push(w);
 
183 +
    }
 
184 +
    cv_.notify_one();
 
185 +
    return true;
 
186 +
}
 
187 +

 
188 +
inline void
 
189 +
thread_pool::shutdown()
 
190 +
{
 
191 +
    {
 
192 +
        std::lock_guard<std::mutex> lock(mutex_);
 
193 +
        shutdown_ = true;
 
194 +
    }
 
195 +
    cv_.notify_all();
 
196 +

 
197 +
    for (auto& t : threads_)
 
198 +
    {
 
199 +
        if (t.joinable())
 
200 +
            t.join();
 
201 +
    }
 
202 +
    threads_.clear();
 
203 +

 
204 +
    {
 
205 +
        std::lock_guard<std::mutex> lock(mutex_);
 
206 +
        while (work_queue_.pop())
 
207 +
            ;
 
208 +
    }
 
209 +
}
 
} // namespace boost::corosio::detail

#endif // BOOST_COROSIO_DETAIL_THREAD_POOL_HPP