libs/capy/include/boost/capy/io/any_buffer_source.hpp

87.7% Lines (143/163) 90.7% Functions (39/43) 68.0% Branches (17/25)
libs/capy/include/boost/capy/io/any_buffer_source.hpp
Line Branch Hits Source Code
1 //
2 // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/capy
8 //
9
10 #ifndef BOOST_CAPY_IO_ANY_BUFFER_SOURCE_HPP
11 #define BOOST_CAPY_IO_ANY_BUFFER_SOURCE_HPP
12
13 #include <boost/capy/detail/config.hpp>
14 #include <boost/capy/detail/await_suspend_helper.hpp>
15 #include <boost/capy/buffers.hpp>
16 #include <boost/capy/buffers/buffer_copy.hpp>
17 #include <boost/capy/buffers/buffer_param.hpp>
18 #include <boost/capy/buffers/slice.hpp>
19 #include <boost/capy/concept/buffer_source.hpp>
20 #include <boost/capy/concept/io_awaitable.hpp>
21 #include <boost/capy/concept/read_source.hpp>
22 #include <boost/capy/error.hpp>
23 #include <boost/capy/ex/io_env.hpp>
24 #include <boost/capy/io_result.hpp>
25 #include <boost/capy/io_task.hpp>
26
27 #include <concepts>
28 #include <coroutine>
29 #include <cstddef>
30 #include <exception>
31 #include <new>
32 #include <span>
33 #include <stop_token>
34 #include <system_error>
35 #include <utility>
36
37 namespace boost {
38 namespace capy {
39
40 /** Type-erased wrapper for any BufferSource.
41
42 This class provides type erasure for any type satisfying the
43 @ref BufferSource concept, enabling runtime polymorphism for
44 buffer pull operations. It uses cached awaitable storage to achieve
45 zero steady-state allocation after construction.
46
47 The wrapper also satisfies @ref ReadSource. When the wrapped type
48 satisfies only @ref BufferSource, the read operations are
49 synthesized using @ref pull and @ref consume with an extra
50 buffer copy. When the wrapped type satisfies both @ref BufferSource
51 and @ref ReadSource, the native read operations are forwarded
52 directly across the virtual boundary, avoiding the copy.
53
54 The wrapper supports two construction modes:
55 - **Owning**: Pass by value to transfer ownership. The wrapper
56 allocates storage and owns the source.
57 - **Reference**: Pass a pointer to wrap without ownership. The
58 pointed-to source must outlive this wrapper.
59
60 Within each mode, the vtable is populated at compile time based
61 on whether the wrapped type also satisfies @ref ReadSource:
62 - **BufferSource only**: @ref read_some and @ref read are
63 synthesized from @ref pull and @ref consume, incurring one
64 buffer copy per operation.
65 - **BufferSource + ReadSource**: All read operations are
66 forwarded natively through the type-erased boundary with
67 no extra copy.
68
69 @par Awaitable Preallocation
70 The constructor preallocates storage for the type-erased awaitable.
71 This reserves all virtual address space at server startup
72 so memory usage can be measured up front, rather than
73 allocating piecemeal as traffic arrives.
74
75 @par Thread Safety
76 Not thread-safe. Concurrent operations on the same wrapper
77 are undefined behavior.
78
79 @par Example
80 @code
81 // Owning - takes ownership of the source
82 any_buffer_source abs(some_buffer_source{args...});
83
84 // Reference - wraps without ownership
85 some_buffer_source src;
86 any_buffer_source abs(&src);
87
88 const_buffer arr[16];
89 auto [ec, bufs] = co_await abs.pull(arr);
90
91 // ReadSource interface also available
92 char buf[64];
93 auto [ec2, n] = co_await abs.read_some(mutable_buffer(buf, 64));
94 @endcode
95
96 @see any_buffer_sink, BufferSource, ReadSource
97 */
98 class any_buffer_source
99 {
100 struct vtable;
101 struct awaitable_ops;
102 struct read_awaitable_ops;
103
104 template<BufferSource S>
105 struct vtable_for_impl;
106
107 // hot-path members first for cache locality
108 void* source_ = nullptr;
109 vtable const* vt_ = nullptr;
110 void* cached_awaitable_ = nullptr;
111 awaitable_ops const* active_ops_ = nullptr;
112 read_awaitable_ops const* active_read_ops_ = nullptr;
113 void* storage_ = nullptr;
114
115 public:
116 /** Destructor.
117
118 Destroys the owned source (if any) and releases the cached
119 awaitable storage.
120 */
121 ~any_buffer_source();
122
123 /** Default constructor.
124
125 Constructs an empty wrapper. Operations on a default-constructed
126 wrapper result in undefined behavior.
127 */
128 any_buffer_source() = default;
129
130 /** Non-copyable.
131
132 The awaitable cache is per-instance and cannot be shared.
133 */
134 any_buffer_source(any_buffer_source const&) = delete;
135 any_buffer_source& operator=(any_buffer_source const&) = delete;
136
137 /** Move constructor.
138
139 Transfers ownership of the wrapped source (if owned) and
140 cached awaitable storage from `other`. After the move, `other` is
141 in a default-constructed state.
142
143 @param other The wrapper to move from.
144 */
145 2 any_buffer_source(any_buffer_source&& other) noexcept
146 2 : source_(std::exchange(other.source_, nullptr))
147 2 , vt_(std::exchange(other.vt_, nullptr))
148 2 , cached_awaitable_(std::exchange(other.cached_awaitable_, nullptr))
149 2 , active_ops_(std::exchange(other.active_ops_, nullptr))
150 2 , active_read_ops_(std::exchange(other.active_read_ops_, nullptr))
151 2 , storage_(std::exchange(other.storage_, nullptr))
152 {
153 2 }
154
155 /** Move assignment operator.
156
157 Destroys any owned source and releases existing resources,
158 then transfers ownership from `other`.
159
160 @param other The wrapper to move from.
161 @return Reference to this wrapper.
162 */
163 any_buffer_source&
164 operator=(any_buffer_source&& other) noexcept;
165
166 /** Construct by taking ownership of a BufferSource.
167
168 Allocates storage and moves the source into this wrapper.
169 The wrapper owns the source and will destroy it. If `S` also
170 satisfies @ref ReadSource, native read operations are
171 forwarded through the virtual boundary.
172
173 @param s The source to take ownership of.
174 */
175 template<BufferSource S>
176 requires (!std::same_as<std::decay_t<S>, any_buffer_source>)
177 any_buffer_source(S s);
178
179 /** Construct by wrapping a BufferSource without ownership.
180
181 Wraps the given source by pointer. The source must remain
182 valid for the lifetime of this wrapper. If `S` also
183 satisfies @ref ReadSource, native read operations are
184 forwarded through the virtual boundary.
185
186 @param s Pointer to the source to wrap.
187 */
188 template<BufferSource S>
189 any_buffer_source(S* s);
190
191 /** Check if the wrapper contains a valid source.
192
193 @return `true` if wrapping a source, `false` if default-constructed
194 or moved-from.
195 */
196 bool
197 16 has_value() const noexcept
198 {
199 16 return source_ != nullptr;
200 }
201
202 /** Check if the wrapper contains a valid source.
203
204 @return `true` if wrapping a source, `false` if default-constructed
205 or moved-from.
206 */
207 explicit
208 2 operator bool() const noexcept
209 {
210 2 return has_value();
211 }
212
213 /** Consume bytes from the source.
214
215 Advances the internal read position of the underlying source
216 by the specified number of bytes. The next call to @ref pull
217 returns data starting after the consumed bytes.
218
219 @param n The number of bytes to consume. Must not exceed the
220 total size of buffers returned by the previous @ref pull.
221
222 @par Preconditions
223 The wrapper must contain a valid source (`has_value() == true`).
224 */
225 void
226 consume(std::size_t n) noexcept;
227
228 /** Pull buffer data from the source.
229
230 Fills the provided span with buffer descriptors from the
231 underlying source. The operation completes when data is
232 available, the source is exhausted, or an error occurs.
233
234 @param dest Span of const_buffer to fill.
235
236 @return An awaitable yielding `(error_code,std::span<const_buffer>)`.
237 On success with data, a non-empty span of filled buffers.
238 On EOF, `ec == cond::eof` and span is empty.
239
240 @par Preconditions
241 The wrapper must contain a valid source (`has_value() == true`).
242 The caller must not call this function again after a prior
243 call returned an error.
244 */
245 auto
246 pull(std::span<const_buffer> dest);
247
248 /** Read some data into a mutable buffer sequence.
249
250 Reads one or more bytes into the caller's buffers. May fill
251 less than the full sequence.
252
253 When the wrapped type provides native @ref ReadSource support,
254 the operation forwards directly. Otherwise it is synthesized
255 from @ref pull, @ref buffer_copy, and @ref consume.
256
257 @param buffers The buffer sequence to fill.
258
259 @return An awaitable yielding `(error_code,std::size_t)`.
260
261 @par Preconditions
262 The wrapper must contain a valid source (`has_value() == true`).
263 The caller must not call this function again after a prior
264 call returned an error (including EOF).
265
266 @see pull, consume
267 */
268 template<MutableBufferSequence MB>
269 io_task<std::size_t>
270 read_some(MB buffers);
271
272 /** Read data into a mutable buffer sequence.
273
274 Fills the provided buffer sequence completely. When the
275 wrapped type provides native @ref ReadSource support, each
276 window is forwarded directly. Otherwise the data is
277 synthesized from @ref pull, @ref buffer_copy, and @ref consume.
278
279 @param buffers The buffer sequence to fill.
280
281 @return An awaitable yielding `(error_code,std::size_t)`.
282 On success, `n == buffer_size(buffers)`.
283 On EOF, `ec == error::eof` and `n` is bytes transferred.
284
285 @par Preconditions
286 The wrapper must contain a valid source (`has_value() == true`).
287 The caller must not call this function again after a prior
288 call returned an error (including EOF).
289
290 @see pull, consume
291 */
292 template<MutableBufferSequence MB>
293 io_task<std::size_t>
294 read(MB buffers);
295
296 protected:
297 /** Rebind to a new source after move.
298
299 Updates the internal pointer to reference a new source object.
300 Used by owning wrappers after move assignment when the owned
301 object has moved to a new location.
302
303 @param new_source The new source to bind to. Must be the same
304 type as the original source.
305
306 @note Terminates if called with a source of different type
307 than the original.
308 */
309 template<BufferSource S>
310 void
311 rebind(S& new_source) noexcept
312 {
313 if(vt_ != &vtable_for_impl<S>::value)
314 std::terminate();
315 source_ = &new_source;
316 }
317
318 private:
319 /** Forward a partial read through the vtable.
320
321 Constructs the underlying `read_some` awaitable in
322 cached storage and returns a type-erased awaitable.
323 */
324 auto
325 read_some_(std::span<mutable_buffer const> buffers);
326
327 /** Forward a complete read through the vtable.
328
329 Constructs the underlying `read` awaitable in
330 cached storage and returns a type-erased awaitable.
331 */
332 auto
333 read_(std::span<mutable_buffer const> buffers);
334 };
335
336 //----------------------------------------------------------
337
338 /** Type-erased ops for awaitables yielding `io_result<std::span<const_buffer>>`. */
339 struct any_buffer_source::awaitable_ops
340 {
341 bool (*await_ready)(void*);
342 std::coroutine_handle<> (*await_suspend)(void*, std::coroutine_handle<>, io_env const*);
343 io_result<std::span<const_buffer>> (*await_resume)(void*);
344 void (*destroy)(void*) noexcept;
345 };
346
347 /** Type-erased ops for awaitables yielding `io_result<std::size_t>`. */
348 struct any_buffer_source::read_awaitable_ops
349 {
350 bool (*await_ready)(void*);
351 std::coroutine_handle<> (*await_suspend)(void*, std::coroutine_handle<>, io_env const*);
352 io_result<std::size_t> (*await_resume)(void*);
353 void (*destroy)(void*) noexcept;
354 };
355
356 struct any_buffer_source::vtable
357 {
358 // BufferSource ops (always populated)
359 void (*destroy)(void*) noexcept;
360 void (*do_consume)(void* source, std::size_t n) noexcept;
361 std::size_t awaitable_size;
362 std::size_t awaitable_align;
363 awaitable_ops const* (*construct_awaitable)(
364 void* source,
365 void* storage,
366 std::span<const_buffer> dest);
367
368 // ReadSource forwarding (null when wrapped type is BufferSource-only)
369 read_awaitable_ops const* (*construct_read_some_awaitable)(
370 void* source,
371 void* storage,
372 std::span<mutable_buffer const> buffers);
373 read_awaitable_ops const* (*construct_read_awaitable)(
374 void* source,
375 void* storage,
376 std::span<mutable_buffer const> buffers);
377 };
378
379 template<BufferSource S>
380 struct any_buffer_source::vtable_for_impl
381 {
382 using PullAwaitable = decltype(std::declval<S&>().pull(
383 std::declval<std::span<const_buffer>>()));
384
385 static void
386 7 do_destroy_impl(void* source) noexcept
387 {
388 7 static_cast<S*>(source)->~S();
389 7 }
390
391 static void
392 45 do_consume_impl(void* source, std::size_t n) noexcept
393 {
394 45 static_cast<S*>(source)->consume(n);
395 45 }
396
397 static awaitable_ops const*
398 110 construct_awaitable_impl(
399 void* source,
400 void* storage,
401 std::span<const_buffer> dest)
402 {
403 110 auto& s = *static_cast<S*>(source);
404 110 ::new(storage) PullAwaitable(s.pull(dest));
405
406 static constexpr awaitable_ops ops = {
407 +[](void* p) {
408 return static_cast<PullAwaitable*>(p)->await_ready();
409 },
410 +[](void* p, std::coroutine_handle<> h, io_env const* env) {
411 return detail::call_await_suspend(
412 static_cast<PullAwaitable*>(p), h, env);
413 },
414 +[](void* p) {
415 return static_cast<PullAwaitable*>(p)->await_resume();
416 },
417 +[](void* p) noexcept {
418 static_cast<PullAwaitable*>(p)->~PullAwaitable();
419 }
420 };
421 110 return &ops;
422 }
423
424 //------------------------------------------------------
425 // ReadSource forwarding (only instantiated when ReadSource<S>)
426
427 static read_awaitable_ops const*
428 48 construct_read_some_awaitable_impl(
429 void* source,
430 void* storage,
431 std::span<mutable_buffer const> buffers)
432 requires ReadSource<S>
433 {
434 using Aw = decltype(std::declval<S&>().read_some(
435 std::span<mutable_buffer const>{}));
436 48 auto& s = *static_cast<S*>(source);
437 48 ::new(storage) Aw(s.read_some(buffers));
438
439 static constexpr read_awaitable_ops ops = {
440 48 +[](void* p) {
441 48 return static_cast<Aw*>(p)->await_ready();
442 },
443 +[](void* p, std::coroutine_handle<> h, io_env const* env) {
444 return detail::call_await_suspend(
445 static_cast<Aw*>(p), h, env);
446 },
447 48 +[](void* p) {
448 48 return static_cast<Aw*>(p)->await_resume();
449 },
450 48 +[](void* p) noexcept {
451 48 static_cast<Aw*>(p)->~Aw();
452 }
453 };
454 48 return &ops;
455 }
456
457 static read_awaitable_ops const*
458 18 construct_read_awaitable_impl(
459 void* source,
460 void* storage,
461 std::span<mutable_buffer const> buffers)
462 requires ReadSource<S>
463 {
464 using Aw = decltype(std::declval<S&>().read(
465 std::span<mutable_buffer const>{}));
466 18 auto& s = *static_cast<S*>(source);
467 18 ::new(storage) Aw(s.read(buffers));
468
469 static constexpr read_awaitable_ops ops = {
470 18 +[](void* p) {
471 18 return static_cast<Aw*>(p)->await_ready();
472 },
473 +[](void* p, std::coroutine_handle<> h, io_env const* env) {
474 return detail::call_await_suspend(
475 static_cast<Aw*>(p), h, env);
476 },
477 18 +[](void* p) {
478 18 return static_cast<Aw*>(p)->await_resume();
479 },
480 18 +[](void* p) noexcept {
481 18 static_cast<Aw*>(p)->~Aw();
482 }
483 };
484 18 return &ops;
485 }
486
487 //------------------------------------------------------
488
489 static consteval std::size_t
490 compute_max_size() noexcept
491 {
492 std::size_t s = sizeof(PullAwaitable);
493 if constexpr (ReadSource<S>)
494 {
495 using RS = decltype(std::declval<S&>().read_some(
496 std::span<mutable_buffer const>{}));
497 using R = decltype(std::declval<S&>().read(
498 std::span<mutable_buffer const>{}));
499
500 if(sizeof(RS) > s) s = sizeof(RS);
501 if(sizeof(R) > s) s = sizeof(R);
502 }
503 return s;
504 }
505
506 static consteval std::size_t
507 compute_max_align() noexcept
508 {
509 std::size_t a = alignof(PullAwaitable);
510 if constexpr (ReadSource<S>)
511 {
512 using RS = decltype(std::declval<S&>().read_some(
513 std::span<mutable_buffer const>{}));
514 using R = decltype(std::declval<S&>().read(
515 std::span<mutable_buffer const>{}));
516
517 if(alignof(RS) > a) a = alignof(RS);
518 if(alignof(R) > a) a = alignof(R);
519 }
520 return a;
521 }
522
523 static consteval vtable
524 make_vtable() noexcept
525 {
526 vtable v{};
527 v.destroy = &do_destroy_impl;
528 v.do_consume = &do_consume_impl;
529 v.awaitable_size = compute_max_size();
530 v.awaitable_align = compute_max_align();
531 v.construct_awaitable = &construct_awaitable_impl;
532 v.construct_read_some_awaitable = nullptr;
533 v.construct_read_awaitable = nullptr;
534
535 if constexpr (ReadSource<S>)
536 {
537 v.construct_read_some_awaitable =
538 &construct_read_some_awaitable_impl;
539 v.construct_read_awaitable =
540 &construct_read_awaitable_impl;
541 }
542 return v;
543 }
544
545 static constexpr vtable value = make_vtable();
546 };
547
548 //----------------------------------------------------------
549
550 inline
551 124 any_buffer_source::~any_buffer_source()
552 {
553
2/2
✓ Branch 0 taken 7 times.
✓ Branch 1 taken 117 times.
124 if(storage_)
554 {
555 7 vt_->destroy(source_);
556 7 ::operator delete(storage_);
557 }
558
2/2
✓ Branch 0 taken 119 times.
✓ Branch 1 taken 5 times.
124 if(cached_awaitable_)
559 119 ::operator delete(cached_awaitable_);
560 124 }
561
562 inline any_buffer_source&
563 2 any_buffer_source::operator=(any_buffer_source&& other) noexcept
564 {
565
1/2
✓ Branch 0 taken 2 times.
✗ Branch 1 not taken.
2 if(this != &other)
566 {
567
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2 times.
2 if(storage_)
568 {
569 vt_->destroy(source_);
570 ::operator delete(storage_);
571 }
572
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2 times.
2 if(cached_awaitable_)
573 ::operator delete(cached_awaitable_);
574 2 source_ = std::exchange(other.source_, nullptr);
575 2 vt_ = std::exchange(other.vt_, nullptr);
576 2 cached_awaitable_ = std::exchange(other.cached_awaitable_, nullptr);
577 2 storage_ = std::exchange(other.storage_, nullptr);
578 2 active_ops_ = std::exchange(other.active_ops_, nullptr);
579 2 active_read_ops_ = std::exchange(other.active_read_ops_, nullptr);
580 }
581 2 return *this;
582 }
583
584 template<BufferSource S>
585 requires (!std::same_as<std::decay_t<S>, any_buffer_source>)
586 7 any_buffer_source::any_buffer_source(S s)
587 7 : vt_(&vtable_for_impl<S>::value)
588 {
589 struct guard {
590 any_buffer_source* self;
591 bool committed = false;
592 7 ~guard() {
593
1/4
✗ Branch 0 not taken.
✓ Branch 1 taken 7 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
7 if(!committed && self->storage_) {
594 self->vt_->destroy(self->source_);
595 ::operator delete(self->storage_);
596 self->storage_ = nullptr;
597 self->source_ = nullptr;
598 }
599 7 }
600 7 } g{this};
601
602
1/1
✓ Branch 1 taken 7 times.
7 storage_ = ::operator new(sizeof(S));
603 7 source_ = ::new(storage_) S(std::move(s));
604
605
1/1
✓ Branch 1 taken 7 times.
7 cached_awaitable_ = ::operator new(vt_->awaitable_size);
606
607 7 g.committed = true;
608 7 }
609
610 template<BufferSource S>
611 112 any_buffer_source::any_buffer_source(S* s)
612 112 : source_(s)
613 112 , vt_(&vtable_for_impl<S>::value)
614 {
615 112 cached_awaitable_ = ::operator new(vt_->awaitable_size);
616 112 }
617
618 //----------------------------------------------------------
619
620 inline void
621 45 any_buffer_source::consume(std::size_t n) noexcept
622 {
623 45 vt_->do_consume(source_, n);
624 45 }
625
626 inline auto
627 110 any_buffer_source::pull(std::span<const_buffer> dest)
628 {
629 struct awaitable
630 {
631 any_buffer_source* self_;
632 std::span<const_buffer> dest_;
633
634 bool
635 110 await_ready()
636 {
637 220 self_->active_ops_ = self_->vt_->construct_awaitable(
638 110 self_->source_,
639 110 self_->cached_awaitable_,
640 dest_);
641 110 return self_->active_ops_->await_ready(self_->cached_awaitable_);
642 }
643
644 std::coroutine_handle<>
645 await_suspend(std::coroutine_handle<> h, io_env const* env)
646 {
647 return self_->active_ops_->await_suspend(
648 self_->cached_awaitable_, h, env);
649 }
650
651 io_result<std::span<const_buffer>>
652 110 await_resume()
653 {
654 struct guard {
655 any_buffer_source* self;
656 110 ~guard() {
657 110 self->active_ops_->destroy(self->cached_awaitable_);
658 110 self->active_ops_ = nullptr;
659 110 }
660 110 } g{self_};
661 110 return self_->active_ops_->await_resume(
662
1/1
✓ Branch 1 taken 85 times.
195 self_->cached_awaitable_);
663 110 }
664 };
665 110 return awaitable{this, dest};
666 }
667
668 //----------------------------------------------------------
669 // Private helpers for native ReadSource forwarding
670
671 inline auto
672 48 any_buffer_source::read_some_(
673 std::span<mutable_buffer const> buffers)
674 {
675 struct awaitable
676 {
677 any_buffer_source* self_;
678 std::span<mutable_buffer const> buffers_;
679
680 bool
681 48 await_ready() const noexcept
682 {
683 48 return false;
684 }
685
686 std::coroutine_handle<>
687 48 await_suspend(std::coroutine_handle<> h, io_env const* env)
688 {
689 96 self_->active_read_ops_ =
690 96 self_->vt_->construct_read_some_awaitable(
691 48 self_->source_,
692 48 self_->cached_awaitable_,
693 buffers_);
694
695
1/2
✓ Branch 0 taken 48 times.
✗ Branch 1 not taken.
48 if(self_->active_read_ops_->await_ready(
696 48 self_->cached_awaitable_))
697 48 return h;
698
699 return self_->active_read_ops_->await_suspend(
700 self_->cached_awaitable_, h, env);
701 }
702
703 io_result<std::size_t>
704 48 await_resume()
705 {
706 struct guard {
707 any_buffer_source* self;
708 48 ~guard() {
709 48 self->active_read_ops_->destroy(
710 48 self->cached_awaitable_);
711 48 self->active_read_ops_ = nullptr;
712 48 }
713 48 } g{self_};
714 48 return self_->active_read_ops_->await_resume(
715
1/1
✓ Branch 1 taken 40 times.
88 self_->cached_awaitable_);
716 48 }
717 };
718 48 return awaitable{this, buffers};
719 }
720
721 inline auto
722 18 any_buffer_source::read_(
723 std::span<mutable_buffer const> buffers)
724 {
725 struct awaitable
726 {
727 any_buffer_source* self_;
728 std::span<mutable_buffer const> buffers_;
729
730 bool
731 18 await_ready() const noexcept
732 {
733 18 return false;
734 }
735
736 std::coroutine_handle<>
737 18 await_suspend(std::coroutine_handle<> h, io_env const* env)
738 {
739 36 self_->active_read_ops_ =
740 36 self_->vt_->construct_read_awaitable(
741 18 self_->source_,
742 18 self_->cached_awaitable_,
743 buffers_);
744
745
1/2
✓ Branch 0 taken 18 times.
✗ Branch 1 not taken.
18 if(self_->active_read_ops_->await_ready(
746 18 self_->cached_awaitable_))
747 18 return h;
748
749 return self_->active_read_ops_->await_suspend(
750 self_->cached_awaitable_, h, env);
751 }
752
753 io_result<std::size_t>
754 18 await_resume()
755 {
756 struct guard {
757 any_buffer_source* self;
758 18 ~guard() {
759 18 self->active_read_ops_->destroy(
760 18 self->cached_awaitable_);
761 18 self->active_read_ops_ = nullptr;
762 18 }
763 18 } g{self_};
764 18 return self_->active_read_ops_->await_resume(
765
1/1
✓ Branch 1 taken 12 times.
30 self_->cached_awaitable_);
766 18 }
767 };
768 18 return awaitable{this, buffers};
769 }
770
771 //----------------------------------------------------------
772 // Public ReadSource methods
773
774 template<MutableBufferSequence MB>
775 io_task<std::size_t>
776
1/1
✓ Branch 1 taken 58 times.
58 any_buffer_source::read_some(MB buffers)
777 {
778 buffer_param<MB> bp(buffers);
779 auto dest = bp.data();
780 if(dest.empty())
781 co_return {{}, 0};
782
783 // Native ReadSource path
784 if(vt_->construct_read_some_awaitable)
785 co_return co_await read_some_(dest);
786
787 // Synthesized path: pull + buffer_copy + consume
788 const_buffer arr[detail::max_iovec_];
789 auto [ec, bufs] = co_await pull(arr);
790 if(ec)
791 co_return {ec, 0};
792
793 auto n = buffer_copy(dest, bufs);
794 consume(n);
795 co_return {{}, n};
796 116 }
797
798 template<MutableBufferSequence MB>
799 io_task<std::size_t>
800
1/1
✓ Branch 1 taken 24 times.
24 any_buffer_source::read(MB buffers)
801 {
802 buffer_param<MB> bp(buffers);
803 std::size_t total = 0;
804
805 // Native ReadSource path
806 if(vt_->construct_read_awaitable)
807 {
808 for(;;)
809 {
810 auto dest = bp.data();
811 if(dest.empty())
812 break;
813
814 auto [ec, n] = co_await read_(dest);
815 total += n;
816 if(ec)
817 co_return {ec, total};
818 bp.consume(n);
819 }
820 co_return {{}, total};
821 }
822
823 // Synthesized path: pull + buffer_copy + consume
824 for(;;)
825 {
826 auto dest = bp.data();
827 if(dest.empty())
828 break;
829
830 const_buffer arr[detail::max_iovec_];
831 auto [ec, bufs] = co_await pull(arr);
832
833 if(ec)
834 co_return {ec, total};
835
836 auto n = buffer_copy(dest, bufs);
837 consume(n);
838 total += n;
839 bp.consume(n);
840 }
841
842 co_return {{}, total};
843 48 }
844
845 //----------------------------------------------------------
846
847 static_assert(BufferSource<any_buffer_source>);
848 static_assert(ReadSource<any_buffer_source>);
849
850 } // namespace capy
851 } // namespace boost
852
853 #endif
854