TLA Line data Source code
1 : //
2 : // Copyright (c) 2026 Steve Gerbino
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/corosio
8 : //
9 :
10 : #ifndef BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_OP_HPP
11 : #define BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_OP_HPP
12 :
13 : #include <boost/corosio/detail/platform.hpp>
14 :
15 : #if BOOST_COROSIO_HAS_SELECT
16 :
17 : #include <boost/corosio/detail/config.hpp>
18 : #include <boost/corosio/io/io_object.hpp>
19 : #include <boost/corosio/endpoint.hpp>
20 : #include <boost/capy/ex/executor_ref.hpp>
21 : #include <coroutine>
22 : #include <boost/capy/error.hpp>
23 : #include <system_error>
24 :
25 : #include <boost/corosio/native/detail/make_err.hpp>
26 : #include <boost/corosio/detail/dispatch_coro.hpp>
27 : #include <boost/corosio/detail/scheduler_op.hpp>
28 : #include <boost/corosio/native/detail/endpoint_convert.hpp>
29 :
30 : #include <unistd.h>
31 : #include <errno.h>
32 : #include <fcntl.h>
33 :
#include <atomic>
#include <cstddef>
#include <cstdint>
#include <memory>
#include <optional>
#include <stop_token>
39 :
40 : #include <netinet/in.h>
41 : #include <sys/select.h>
42 : #include <sys/socket.h>
43 : #include <sys/uio.h>
44 :
45 : /*
46 : select Operation State
47 : ======================
48 :
49 : Each async I/O operation has a corresponding select_op-derived struct that
50 : holds the operation's state while it's in flight. The socket impl owns
51 : fixed slots for each operation type (conn_, rd_, wr_), so only one
52 : operation of each type can be pending per socket at a time.
53 :
54 : This mirrors the epoll_op design for consistency across backends.
55 :
56 : Completion vs Cancellation Race
57 : -------------------------------
58 : The `registered` atomic uses a tri-state (unregistered, registering,
59 : registered) to handle two races: (1) between register_fd() and the
60 : reactor seeing an event, and (2) between reactor completion and cancel().
61 :
62 : The registering state closes the window where an event could arrive
63 : after register_fd() but before the op was marked registered. The reactor
64 : and cancel() both treat registering the same as registered when claiming.
65 :
66 : Whoever atomically exchanges to unregistered "claims" the operation
67 : and is responsible for completing it. The loser sees unregistered and
68 : does nothing. The initiating thread uses compare_exchange to transition
69 : from registering to registered; if this fails, the reactor or cancel
70 : already claimed the op.
71 :
72 : Impl Lifetime Management
73 : ------------------------
74 : When cancel() posts an op to the scheduler's ready queue, the socket impl
75 : might be destroyed before the scheduler processes the op. The `impl_ptr`
76 : member holds a shared_ptr to the impl, keeping it alive until the op
77 : completes.
78 :
79 : EOF Detection
80 : -------------
81 : For reads, 0 bytes with no error means EOF. But an empty user buffer also
82 : returns 0 bytes. The `empty_buffer_read` flag distinguishes these cases.
83 :
84 : SIGPIPE Prevention
85 : ------------------
86 : Writes use sendmsg() with MSG_NOSIGNAL instead of writev() to prevent
87 : SIGPIPE when the peer has closed.
88 : */
89 :
90 : namespace boost::corosio::detail {
91 :
92 : // Forward declarations for cancellation support
93 : class select_socket;
94 : class select_acceptor;
95 :
/** Registration state for async operations.

    Tri-state enum to handle the race between register_fd() and
    run_reactor() seeing an event. Storing `registering` before
    calling register_fd() ensures events delivered during the
    registration window are not dropped.

    The underlying type is std::uint8_t, which requires <cstdint>.
*/
enum class select_registration_state : std::uint8_t
{
    unregistered, ///< Not registered with reactor
    registering,  ///< register_fd() called, not yet confirmed
    registered    ///< Fully registered, ready for events
};
109 :
110 : struct select_op : scheduler_op
111 : {
112 : struct canceller
113 : {
114 : select_op* op;
115 : void operator()() const noexcept;
116 : };
117 :
118 : std::coroutine_handle<> h;
119 : capy::executor_ref ex;
120 : std::error_code* ec_out = nullptr;
121 : std::size_t* bytes_out = nullptr;
122 :
123 : int fd = -1;
124 : int errn = 0;
125 : std::size_t bytes_transferred = 0;
126 :
127 : std::atomic<bool> cancelled{false};
128 : std::atomic<select_registration_state> registered{
129 : select_registration_state::unregistered};
130 : std::optional<std::stop_callback<canceller>> stop_cb;
131 :
132 : // Prevents use-after-free when socket is closed with pending ops.
133 : std::shared_ptr<void> impl_ptr;
134 :
135 : // For stop_token cancellation - pointer to owning socket/acceptor impl.
136 : select_socket* socket_impl_ = nullptr;
137 : select_acceptor* acceptor_impl_ = nullptr;
138 :
139 HIT 28735 : select_op() = default;
140 :
141 168588 : void reset() noexcept
142 : {
143 168588 : fd = -1;
144 168588 : errn = 0;
145 168588 : bytes_transferred = 0;
146 168588 : cancelled.store(false, std::memory_order_relaxed);
147 168588 : registered.store(
148 : select_registration_state::unregistered, std::memory_order_relaxed);
149 168588 : impl_ptr.reset();
150 168588 : socket_impl_ = nullptr;
151 168588 : acceptor_impl_ = nullptr;
152 168588 : }
153 :
154 162228 : void operator()() override
155 : {
156 162228 : stop_cb.reset();
157 :
158 162228 : if (ec_out)
159 : {
160 162228 : if (cancelled.load(std::memory_order_acquire))
161 194 : *ec_out = capy::error::canceled;
162 162034 : else if (errn != 0)
163 1 : *ec_out = make_err(errn);
164 162033 : else if (is_read_operation() && bytes_transferred == 0)
165 5 : *ec_out = capy::error::eof;
166 : else
167 162028 : *ec_out = {};
168 : }
169 :
170 162228 : if (bytes_out)
171 162228 : *bytes_out = bytes_transferred;
172 :
173 : // Move to stack before destroying the frame
174 162228 : capy::executor_ref saved_ex(ex);
175 162228 : std::coroutine_handle<> saved_h(h);
176 162228 : impl_ptr.reset();
177 162228 : dispatch_coro(saved_ex, saved_h).resume();
178 162228 : }
179 :
180 81030 : virtual bool is_read_operation() const noexcept
181 : {
182 81030 : return false;
183 : }
184 : virtual void cancel() noexcept = 0;
185 :
186 MIS 0 : void destroy() override
187 : {
188 0 : stop_cb.reset();
189 0 : impl_ptr.reset();
190 0 : }
191 :
192 HIT 86895 : void request_cancel() noexcept
193 : {
194 86895 : cancelled.store(true, std::memory_order_release);
195 86895 : }
196 :
197 : void start(std::stop_token const& token)
198 : {
199 : cancelled.store(false, std::memory_order_release);
200 : stop_cb.reset();
201 : socket_impl_ = nullptr;
202 : acceptor_impl_ = nullptr;
203 :
204 : if (token.stop_possible())
205 : stop_cb.emplace(token, canceller{this});
206 : }
207 :
208 165408 : void start(std::stop_token const& token, select_socket* impl)
209 : {
210 165408 : cancelled.store(false, std::memory_order_release);
211 165408 : stop_cb.reset();
212 165408 : socket_impl_ = impl;
213 165408 : acceptor_impl_ = nullptr;
214 :
215 165408 : if (token.stop_possible())
216 98 : stop_cb.emplace(token, canceller{this});
217 165408 : }
218 :
219 3180 : void start(std::stop_token const& token, select_acceptor* impl)
220 : {
221 3180 : cancelled.store(false, std::memory_order_release);
222 3180 : stop_cb.reset();
223 3180 : socket_impl_ = nullptr;
224 3180 : acceptor_impl_ = impl;
225 :
226 3180 : if (token.stop_possible())
227 MIS 0 : stop_cb.emplace(token, canceller{this});
228 HIT 3180 : }
229 :
230 168431 : void complete(int err, std::size_t bytes) noexcept
231 : {
232 168431 : errn = err;
233 168431 : bytes_transferred = bytes;
234 168431 : }
235 :
236 MIS 0 : virtual void perform_io() noexcept {}
237 : };
238 :
239 : struct select_connect_op final : select_op
240 : {
241 : endpoint target_endpoint;
242 :
243 HIT 3180 : void reset() noexcept
244 : {
245 3180 : select_op::reset();
246 3180 : target_endpoint = endpoint{};
247 3180 : }
248 :
249 3180 : void perform_io() noexcept override
250 : {
251 : // connect() completion status is retrieved via SO_ERROR, not return value
252 3180 : int err = 0;
253 3180 : socklen_t len = sizeof(err);
254 3180 : if (::getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &len) < 0)
255 MIS 0 : err = errno;
256 HIT 3180 : complete(err, 0);
257 3180 : }
258 :
259 : // Defined in sockets.cpp where select_socket is complete
260 : void operator()() override;
261 : void cancel() noexcept override;
262 : };
263 :
264 : struct select_read_op final : select_op
265 : {
266 : static constexpr std::size_t max_buffers = 16;
267 : iovec iovecs[max_buffers];
268 : int iovec_count = 0;
269 : bool empty_buffer_read = false;
270 :
271 81003 : bool is_read_operation() const noexcept override
272 : {
273 81003 : return !empty_buffer_read;
274 : }
275 :
276 81193 : void reset() noexcept
277 : {
278 81193 : select_op::reset();
279 81193 : iovec_count = 0;
280 81193 : empty_buffer_read = false;
281 81193 : }
282 :
283 119 : void perform_io() noexcept override
284 : {
285 119 : ssize_t n = ::readv(fd, iovecs, iovec_count);
286 119 : if (n >= 0)
287 119 : complete(0, static_cast<std::size_t>(n));
288 : else
289 MIS 0 : complete(errno, 0);
290 HIT 119 : }
291 :
292 : void cancel() noexcept override;
293 : };
294 :
295 : struct select_write_op final : select_op
296 : {
297 : static constexpr std::size_t max_buffers = 16;
298 : iovec iovecs[max_buffers];
299 : int iovec_count = 0;
300 :
301 81035 : void reset() noexcept
302 : {
303 81035 : select_op::reset();
304 81035 : iovec_count = 0;
305 81035 : }
306 :
307 MIS 0 : void perform_io() noexcept override
308 : {
309 0 : msghdr msg{};
310 0 : msg.msg_iov = iovecs;
311 0 : msg.msg_iovlen = static_cast<std::size_t>(iovec_count);
312 :
313 0 : ssize_t n = ::sendmsg(fd, &msg, MSG_NOSIGNAL);
314 0 : if (n >= 0)
315 0 : complete(0, static_cast<std::size_t>(n));
316 : else
317 0 : complete(errno, 0);
318 0 : }
319 :
320 : void cancel() noexcept override;
321 : };
322 :
323 : struct select_accept_op final : select_op
324 : {
325 : int accepted_fd = -1;
326 : io_object::implementation* peer_impl = nullptr;
327 : io_object::implementation** impl_out = nullptr;
328 :
329 HIT 3180 : void reset() noexcept
330 : {
331 3180 : select_op::reset();
332 3180 : accepted_fd = -1;
333 3180 : peer_impl = nullptr;
334 3180 : impl_out = nullptr;
335 3180 : }
336 :
337 3175 : void perform_io() noexcept override
338 : {
339 3175 : sockaddr_storage addr_storage{};
340 3175 : socklen_t addrlen = sizeof(addr_storage);
341 :
342 : // Note: select backend uses accept() + fcntl instead of accept4()
343 : // for broader POSIX compatibility
344 : int new_fd =
345 3175 : ::accept(fd, reinterpret_cast<sockaddr*>(&addr_storage), &addrlen);
346 :
347 3175 : if (new_fd >= 0)
348 : {
349 : // Reject fds that exceed select()'s FD_SETSIZE limit.
350 : // Better to fail now than during later async operations.
351 3175 : if (new_fd >= FD_SETSIZE)
352 : {
353 MIS 0 : ::close(new_fd);
354 0 : complete(EINVAL, 0);
355 0 : return;
356 : }
357 :
358 : // Set non-blocking and close-on-exec flags.
359 : // A non-blocking socket is essential for the async reactor;
360 : // if we can't configure it, fail rather than risk blocking.
361 HIT 3175 : int flags = ::fcntl(new_fd, F_GETFL, 0);
362 3175 : if (flags == -1)
363 : {
364 MIS 0 : int err = errno;
365 0 : ::close(new_fd);
366 0 : complete(err, 0);
367 0 : return;
368 : }
369 :
370 HIT 3175 : if (::fcntl(new_fd, F_SETFL, flags | O_NONBLOCK) == -1)
371 : {
372 MIS 0 : int err = errno;
373 0 : ::close(new_fd);
374 0 : complete(err, 0);
375 0 : return;
376 : }
377 :
378 HIT 3175 : if (::fcntl(new_fd, F_SETFD, FD_CLOEXEC) == -1)
379 : {
380 MIS 0 : int err = errno;
381 0 : ::close(new_fd);
382 0 : complete(err, 0);
383 0 : return;
384 : }
385 :
386 HIT 3175 : accepted_fd = new_fd;
387 3175 : complete(0, 0);
388 : }
389 : else
390 : {
391 MIS 0 : complete(errno, 0);
392 : }
393 : }
394 :
395 : // Defined in acceptors.cpp where select_acceptor is complete
396 : void operator()() override;
397 : void cancel() noexcept override;
398 : };
399 :
400 : } // namespace boost::corosio::detail
401 :
402 : #endif // BOOST_COROSIO_HAS_SELECT
403 :
404 : #endif // BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_OP_HPP
|