libs/capy/include/boost/capy/io/any_read_stream.hpp

87.4% Lines (83/95) 81.6% Functions (31/38) 77.3% Branches (17/22)
libs/capy/include/boost/capy/io/any_read_stream.hpp
Line Branch Hits Source Code
1 //
2 // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/capy
8 //
9
10 #ifndef BOOST_CAPY_IO_ANY_READ_STREAM_HPP
11 #define BOOST_CAPY_IO_ANY_READ_STREAM_HPP
12
13 #include <boost/capy/detail/config.hpp>
14 #include <boost/capy/detail/await_suspend_helper.hpp>
15 #include <boost/capy/buffers.hpp>
16 #include <boost/capy/buffers/buffer_array.hpp>
17 #include <boost/capy/concept/io_awaitable.hpp>
18 #include <boost/capy/concept/read_stream.hpp>
19 #include <boost/capy/ex/io_env.hpp>
20 #include <boost/capy/io_result.hpp>
21
22 #include <concepts>
23 #include <coroutine>
24 #include <cstddef>
25 #include <new>
26 #include <span>
27 #include <stop_token>
28 #include <system_error>
29 #include <utility>
30
31 namespace boost {
32 namespace capy {
33
34 /** Type-erased wrapper for any ReadStream.
35
36 This class provides type erasure for any type satisfying the
37 @ref ReadStream concept, enabling runtime polymorphism for
38 read operations. It uses cached awaitable storage to achieve
39 zero steady-state allocation after construction.
40
41 The wrapper supports two construction modes:
42 - **Owning**: Pass by value to transfer ownership. The wrapper
43 allocates storage and owns the stream.
44 - **Reference**: Pass a pointer to wrap without ownership. The
45 pointed-to stream must outlive this wrapper.
46
47 @par Awaitable Preallocation
48 The constructor preallocates storage for the type-erased awaitable.
49 This reserves all virtual address space at server startup
50 so memory usage can be measured up front, rather than
51 allocating piecemeal as traffic arrives.
52
53 @par Immediate Completion
54 When the underlying stream's awaitable reports ready immediately
55 (e.g. buffered data already available), the wrapper skips
56 coroutine suspension entirely and returns the result inline.
57
58 @par Thread Safety
59 Not thread-safe. Concurrent operations on the same wrapper
60 are undefined behavior.
61
62 @par Example
63 @code
64 // Owning - takes ownership of the stream
65 any_read_stream stream(socket{ioc});
66
67 // Reference - wraps without ownership
68 socket sock(ioc);
69 any_read_stream stream(&sock);
70
71 mutable_buffer buf(data, size);
72 auto [ec, n] = co_await stream.read_some(buf);
73 @endcode
74
75 @see any_write_stream, any_stream, ReadStream
76 */
77 class any_read_stream
78 {
79 struct vtable;
80
81 template<ReadStream S>
82 struct vtable_for_impl;
83
84 // ordered for cache line coherence
85 void* stream_ = nullptr;
86 vtable const* vt_ = nullptr;
87 void* cached_awaitable_ = nullptr;
88 void* storage_ = nullptr;
89 bool awaitable_active_ = false;
90
91 public:
92 /** Destructor.
93
94 Destroys the owned stream (if any) and releases the cached
95 awaitable storage.
96 */
97 ~any_read_stream();
98
99 /** Default constructor.
100
101 Constructs an empty wrapper. Operations on a default-constructed
102 wrapper result in undefined behavior.
103 */
104 1 any_read_stream() = default;
105
106 /** Non-copyable.
107
108 The awaitable cache is per-instance and cannot be shared.
109 */
110 any_read_stream(any_read_stream const&) = delete;
111 any_read_stream& operator=(any_read_stream const&) = delete;
112
113 /** Move constructor.
114
115 Transfers ownership of the wrapped stream (if owned) and
116 cached awaitable storage from `other`. After the move, `other` is
117 in a default-constructed state.
118
119 @param other The wrapper to move from.
120 */
121 2 any_read_stream(any_read_stream&& other) noexcept
122 2 : stream_(std::exchange(other.stream_, nullptr))
123 2 , vt_(std::exchange(other.vt_, nullptr))
124 2 , cached_awaitable_(std::exchange(other.cached_awaitable_, nullptr))
125 2 , storage_(std::exchange(other.storage_, nullptr))
126 2 , awaitable_active_(std::exchange(other.awaitable_active_, false))
127 {
128 2 }
129
130 /** Move assignment operator.
131
132 Destroys any owned stream and releases existing resources,
133 then transfers ownership from `other`.
134
135 @param other The wrapper to move from.
136 @return Reference to this wrapper.
137 */
138 any_read_stream&
139 operator=(any_read_stream&& other) noexcept;
140
141 /** Construct by taking ownership of a ReadStream.
142
143 Allocates storage and moves the stream into this wrapper.
144 The wrapper owns the stream and will destroy it.
145
146 @param s The stream to take ownership of.
147 */
148 template<ReadStream S>
149 requires (!std::same_as<std::decay_t<S>, any_read_stream>)
150 any_read_stream(S s);
151
152 /** Construct by wrapping a ReadStream without ownership.
153
154 Wraps the given stream by pointer. The stream must remain
155 valid for the lifetime of this wrapper.
156
157 @param s Pointer to the stream to wrap.
158 */
159 template<ReadStream S>
160 any_read_stream(S* s);
161
162 /** Check if the wrapper contains a valid stream.
163
164 @return `true` if wrapping a stream, `false` if default-constructed
165 or moved-from.
166 */
167 bool
168 25 has_value() const noexcept
169 {
170 25 return stream_ != nullptr;
171 }
172
173 /** Check if the wrapper contains a valid stream.
174
175 @return `true` if wrapping a stream, `false` if default-constructed
176 or moved-from.
177 */
178 explicit
179 3 operator bool() const noexcept
180 {
181 3 return has_value();
182 }
183
184 /** Initiate an asynchronous read operation.
185
186 Reads data into the provided buffer sequence. The operation
187 completes when at least one byte has been read, or an error
188 occurs.
189
190 @param buffers The buffer sequence to read into. Passed by
191 value to ensure the sequence lives in the coroutine frame
192 across suspension points.
193
194 @return An awaitable yielding `(error_code,std::size_t)`.
195
196 @par Immediate Completion
197 The operation completes immediately without suspending
198 the calling coroutine when the underlying stream's
199 awaitable reports immediate readiness via `await_ready`.
200
201 @note This is a partial operation and may not process the
202 entire buffer sequence. Use the composed @ref read algorithm
203 for guaranteed complete transfer.
204
205 @par Preconditions
206 The wrapper must contain a valid stream (`has_value() == true`).
207 The caller must not call this function again after a prior
208 call returned an error (including EOF).
209 */
210 template<MutableBufferSequence MB>
211 auto
212 read_some(MB buffers);
213
214 protected:
215 /** Rebind to a new stream after move.
216
217 Updates the internal pointer to reference a new stream object.
218 Used by owning wrappers after move assignment when the owned
219 object has moved to a new location.
220
221 @param new_stream The new stream to bind to. Must be the same
222 type as the original stream.
223
224 @note Terminates if called with a stream of different type
225 than the original.
226 */
227 template<ReadStream S>
228 void
229 rebind(S& new_stream) noexcept
230 {
231 if(vt_ != &vtable_for_impl<S>::value)
232 std::terminate();
233 stream_ = &new_stream;
234 }
235 };
236
237 //----------------------------------------------------------
238
239 struct any_read_stream::vtable
240 {
241 // ordered by call frequency for cache line coherence
242 void (*construct_awaitable)(
243 void* stream,
244 void* storage,
245 std::span<mutable_buffer const> buffers);
246 bool (*await_ready)(void*);
247 std::coroutine_handle<> (*await_suspend)(void*, std::coroutine_handle<>, io_env const*);
248 io_result<std::size_t> (*await_resume)(void*);
249 void (*destroy_awaitable)(void*) noexcept;
250 std::size_t awaitable_size;
251 std::size_t awaitable_align;
252 void (*destroy)(void*) noexcept;
253 };
254
255 template<ReadStream S>
256 struct any_read_stream::vtable_for_impl
257 {
258 using Awaitable = decltype(std::declval<S&>().read_some(
259 std::span<mutable_buffer const>{}));
260
261 static void
262 1 do_destroy_impl(void* stream) noexcept
263 {
264 1 static_cast<S*>(stream)->~S();
265 1 }
266
267 static void
268 91 construct_awaitable_impl(
269 void* stream,
270 void* storage,
271 std::span<mutable_buffer const> buffers)
272 {
273 91 auto& s = *static_cast<S*>(stream);
274 91 ::new(storage) Awaitable(s.read_some(buffers));
275 91 }
276
277 static constexpr vtable value = {
278 &construct_awaitable_impl,
279 91 +[](void* p) {
280 91 return static_cast<Awaitable*>(p)->await_ready();
281 },
282 +[](void* p, std::coroutine_handle<> h, io_env const* env) {
283 return detail::call_await_suspend(
284 static_cast<Awaitable*>(p), h, env);
285 },
286 89 +[](void* p) {
287 89 return static_cast<Awaitable*>(p)->await_resume();
288 },
289 93 +[](void* p) noexcept {
290 16 static_cast<Awaitable*>(p)->~Awaitable();
291 },
292 sizeof(Awaitable),
293 alignof(Awaitable),
294 &do_destroy_impl
295 };
296 };
297
298 //----------------------------------------------------------
299
300 inline
301 101 any_read_stream::~any_read_stream()
302 {
303
2/2
✓ Branch 0 taken 1 time.
✓ Branch 1 taken 100 times.
101 if(storage_)
304 {
305 1 vt_->destroy(stream_);
306 1 ::operator delete(storage_);
307 }
308
2/2
✓ Branch 0 taken 91 times.
✓ Branch 1 taken 10 times.
101 if(cached_awaitable_)
309 {
310
2/2
✓ Branch 0 taken 1 time.
✓ Branch 1 taken 90 times.
91 if(awaitable_active_)
311 1 vt_->destroy_awaitable(cached_awaitable_);
312 91 ::operator delete(cached_awaitable_);
313 }
314 101 }
315
316 inline any_read_stream&
317 5 any_read_stream::operator=(any_read_stream&& other) noexcept
318 {
319
1/2
✓ Branch 0 taken 5 times.
✗ Branch 1 not taken.
5 if(this != &other)
320 {
321
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 5 times.
5 if(storage_)
322 {
323 vt_->destroy(stream_);
324 ::operator delete(storage_);
325 }
326
2/2
✓ Branch 0 taken 2 times.
✓ Branch 1 taken 3 times.
5 if(cached_awaitable_)
327 {
328
2/2
✓ Branch 0 taken 1 time.
✓ Branch 1 taken 1 time.
2 if(awaitable_active_)
329 1 vt_->destroy_awaitable(cached_awaitable_);
330 2 ::operator delete(cached_awaitable_);
331 }
332 5 stream_ = std::exchange(other.stream_, nullptr);
333 5 vt_ = std::exchange(other.vt_, nullptr);
334 5 cached_awaitable_ = std::exchange(other.cached_awaitable_, nullptr);
335 5 storage_ = std::exchange(other.storage_, nullptr);
336 5 awaitable_active_ = std::exchange(other.awaitable_active_, false);
337 }
338 5 return *this;
339 }
340
341 template<ReadStream S>
342 requires (!std::same_as<std::decay_t<S>, any_read_stream>)
343 1 any_read_stream::any_read_stream(S s)
344 1 : vt_(&vtable_for_impl<S>::value)
345 {
346 struct guard {
347 any_read_stream* self;
348 bool committed = false;
349 1 ~guard() {
350
1/4
✗ Branch 0 not taken.
✓ Branch 1 taken 1 time.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
1 if(!committed && self->storage_) {
351 self->vt_->destroy(self->stream_);
352 ::operator delete(self->storage_);
353 self->storage_ = nullptr;
354 self->stream_ = nullptr;
355 }
356 1 }
357 1 } g{this};
358
359
1/1
✓ Branch 1 taken 1 time.
1 storage_ = ::operator new(sizeof(S));
360 1 stream_ = ::new(storage_) S(std::move(s));
361
362 // Preallocate the awaitable storage
363
1/1
✓ Branch 1 taken 1 time.
1 cached_awaitable_ = ::operator new(vt_->awaitable_size);
364
365 1 g.committed = true;
366 1 }
367
368 template<ReadStream S>
369 92 any_read_stream::any_read_stream(S* s)
370 92 : stream_(s)
371 92 , vt_(&vtable_for_impl<S>::value)
372 {
373 // Preallocate the awaitable storage
374 92 cached_awaitable_ = ::operator new(vt_->awaitable_size);
375 92 }
376
377 //----------------------------------------------------------
378
379 template<MutableBufferSequence MB>
380 auto
381 91 any_read_stream::read_some(MB buffers)
382 {
383 // VFALCO in theory, we could use if constexpr to detect a
384 // span and then pass that through to read_some without the array
385 struct awaitable
386 {
387 any_read_stream* self_;
388 mutable_buffer_array<detail::max_iovec_> ba_;
389
390 bool
391 14 await_ready()
392 {
393 14 self_->vt_->construct_awaitable(
394 14 self_->stream_,
395
1/1
✓ Branch 1 taken 14 times.
14 self_->cached_awaitable_,
396 14 ba_.to_span());
397 14 self_->awaitable_active_ = true;
398
399 28 return self_->vt_->await_ready(
400 14 self_->cached_awaitable_);
401 }
402
403 std::coroutine_handle<>
404 await_suspend(std::coroutine_handle<> h, io_env const* env)
405 {
406 return self_->vt_->await_suspend(
407 self_->cached_awaitable_, h, env);
408 }
409
410 io_result<std::size_t>
411 14 await_resume()
412 {
413 struct guard {
414 any_read_stream* self;
415 14 ~guard() {
416 14 self->vt_->destroy_awaitable(self->cached_awaitable_);
417 14 self->awaitable_active_ = false;
418 14 }
419 14 } g{self_};
420 14 return self_->vt_->await_resume(
421
1/1
✓ Branch 1 taken 10 times.
24 self_->cached_awaitable_);
422 14 }
423 };
424 return awaitable{this,
425 91 mutable_buffer_array<detail::max_iovec_>(buffers)};
426 91 }
427
428 } // namespace capy
429 } // namespace boost
430
431 #endif
432