Line data Source code
1 : //
2 : // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/capy
8 : //
9 :
10 : #ifndef BOOST_CAPY_IO_ANY_BUFFER_SOURCE_HPP
11 : #define BOOST_CAPY_IO_ANY_BUFFER_SOURCE_HPP
12 :
13 : #include <boost/capy/detail/config.hpp>
14 : #include <boost/capy/detail/await_suspend_helper.hpp>
15 : #include <boost/capy/buffers.hpp>
16 : #include <boost/capy/buffers/buffer_copy.hpp>
17 : #include <boost/capy/buffers/buffer_param.hpp>
18 : #include <boost/capy/buffers/slice.hpp>
19 : #include <boost/capy/concept/buffer_source.hpp>
20 : #include <boost/capy/concept/io_awaitable.hpp>
21 : #include <boost/capy/concept/read_source.hpp>
22 : #include <boost/capy/error.hpp>
23 : #include <boost/capy/ex/io_env.hpp>
24 : #include <boost/capy/io_result.hpp>
25 : #include <boost/capy/io_task.hpp>
26 :
27 : #include <concepts>
28 : #include <coroutine>
29 : #include <cstddef>
30 : #include <exception>
31 : #include <new>
32 : #include <span>
33 : #include <stop_token>
34 : #include <system_error>
35 : #include <utility>
36 :
37 : namespace boost {
38 : namespace capy {
39 :
40 : /** Type-erased wrapper for any BufferSource.
41 :
42 : This class provides type erasure for any type satisfying the
43 : @ref BufferSource concept, enabling runtime polymorphism for
44 : buffer pull operations. It uses cached awaitable storage to achieve
45 : zero steady-state allocation after construction.
46 :
47 : The wrapper also satisfies @ref ReadSource. When the wrapped type
48 : satisfies only @ref BufferSource, the read operations are
49 : synthesized using @ref pull and @ref consume with an extra
50 : buffer copy. When the wrapped type satisfies both @ref BufferSource
51 : and @ref ReadSource, the native read operations are forwarded
52 : directly across the virtual boundary, avoiding the copy.
53 :
54 : The wrapper supports two construction modes:
55 : - **Owning**: Pass by value to transfer ownership. The wrapper
56 : allocates storage and owns the source.
57 : - **Reference**: Pass a pointer to wrap without ownership. The
58 : pointed-to source must outlive this wrapper.
59 :
60 : Within each mode, the vtable is populated at compile time based
61 : on whether the wrapped type also satisfies @ref ReadSource:
62 : - **BufferSource only**: @ref read_some and @ref read are
63 : synthesized from @ref pull and @ref consume, incurring one
64 : buffer copy per operation.
65 : - **BufferSource + ReadSource**: All read operations are
66 : forwarded natively through the type-erased boundary with
67 : no extra copy.
68 :
69 : @par Awaitable Preallocation
70 : The constructor preallocates storage for the type-erased awaitable.
71 : This reserves all virtual address space at server startup
72 : so memory usage can be measured up front, rather than
73 : allocating piecemeal as traffic arrives.
74 :
75 : @par Thread Safety
76 : Not thread-safe. Concurrent operations on the same wrapper
77 : are undefined behavior.
78 :
79 : @par Example
80 : @code
81 : // Owning - takes ownership of the source
82 : any_buffer_source abs(some_buffer_source{args...});
83 :
84 : // Reference - wraps without ownership
85 : some_buffer_source src;
86 : any_buffer_source abs(&src);
87 :
88 : const_buffer arr[16];
89 : auto [ec, bufs] = co_await abs.pull(arr);
90 :
91 : // ReadSource interface also available
92 : char buf[64];
93 : auto [ec2, n] = co_await abs.read_some(mutable_buffer(buf, 64));
94 : @endcode
95 :
96 : @see any_buffer_sink, BufferSource, ReadSource
97 : */
98 : class any_buffer_source
99 : {
100 : struct vtable;
101 : struct awaitable_ops;
102 : struct read_awaitable_ops;
103 :
104 : template<BufferSource S>
105 : struct vtable_for_impl;
106 :
107 : // hot-path members first for cache locality
108 : void* source_ = nullptr;
109 : vtable const* vt_ = nullptr;
110 : void* cached_awaitable_ = nullptr;
111 : awaitable_ops const* active_ops_ = nullptr;
112 : read_awaitable_ops const* active_read_ops_ = nullptr;
113 : void* storage_ = nullptr;
114 :
115 : public:
116 : /** Destructor.
117 :
118 : Destroys the owned source (if any) and releases the cached
119 : awaitable storage.
120 : */
121 : ~any_buffer_source();
122 :
123 : /** Default constructor.
124 :
125 : Constructs an empty wrapper. Operations on a default-constructed
126 : wrapper result in undefined behavior.
127 : */
128 : any_buffer_source() = default;
129 :
130 : /** Non-copyable.
131 :
132 : The awaitable cache is per-instance and cannot be shared.
133 : */
134 : any_buffer_source(any_buffer_source const&) = delete;
135 : any_buffer_source& operator=(any_buffer_source const&) = delete;
136 :
137 : /** Move constructor.
138 :
139 : Transfers ownership of the wrapped source (if owned) and
140 : cached awaitable storage from `other`. After the move, `other` is
141 : in a default-constructed state.
142 :
143 : @param other The wrapper to move from.
144 : */
145 2 : any_buffer_source(any_buffer_source&& other) noexcept
146 2 : : source_(std::exchange(other.source_, nullptr))
147 2 : , vt_(std::exchange(other.vt_, nullptr))
148 2 : , cached_awaitable_(std::exchange(other.cached_awaitable_, nullptr))
149 2 : , active_ops_(std::exchange(other.active_ops_, nullptr))
150 2 : , active_read_ops_(std::exchange(other.active_read_ops_, nullptr))
151 2 : , storage_(std::exchange(other.storage_, nullptr))
152 : {
153 2 : }
154 :
155 : /** Move assignment operator.
156 :
157 : Destroys any owned source and releases existing resources,
158 : then transfers ownership from `other`.
159 :
160 : @param other The wrapper to move from.
161 : @return Reference to this wrapper.
162 : */
163 : any_buffer_source&
164 : operator=(any_buffer_source&& other) noexcept;
165 :
166 : /** Construct by taking ownership of a BufferSource.
167 :
168 : Allocates storage and moves the source into this wrapper.
169 : The wrapper owns the source and will destroy it. If `S` also
170 : satisfies @ref ReadSource, native read operations are
171 : forwarded through the virtual boundary.
172 :
173 : @param s The source to take ownership of.
174 : */
175 : template<BufferSource S>
176 : requires (!std::same_as<std::decay_t<S>, any_buffer_source>)
177 : any_buffer_source(S s);
178 :
179 : /** Construct by wrapping a BufferSource without ownership.
180 :
181 : Wraps the given source by pointer. The source must remain
182 : valid for the lifetime of this wrapper. If `S` also
183 : satisfies @ref ReadSource, native read operations are
184 : forwarded through the virtual boundary.
185 :
186 : @param s Pointer to the source to wrap.
187 : */
188 : template<BufferSource S>
189 : any_buffer_source(S* s);
190 :
191 : /** Check if the wrapper contains a valid source.
192 :
193 : @return `true` if wrapping a source, `false` if default-constructed
194 : or moved-from.
195 : */
196 : bool
197 16 : has_value() const noexcept
198 : {
199 16 : return source_ != nullptr;
200 : }
201 :
202 : /** Check if the wrapper contains a valid source.
203 :
204 : @return `true` if wrapping a source, `false` if default-constructed
205 : or moved-from.
206 : */
207 : explicit
208 2 : operator bool() const noexcept
209 : {
210 2 : return has_value();
211 : }
212 :
213 : /** Consume bytes from the source.
214 :
215 : Advances the internal read position of the underlying source
216 : by the specified number of bytes. The next call to @ref pull
217 : returns data starting after the consumed bytes.
218 :
219 : @param n The number of bytes to consume. Must not exceed the
220 : total size of buffers returned by the previous @ref pull.
221 :
222 : @par Preconditions
223 : The wrapper must contain a valid source (`has_value() == true`).
224 : */
225 : void
226 : consume(std::size_t n) noexcept;
227 :
228 : /** Pull buffer data from the source.
229 :
230 : Fills the provided span with buffer descriptors from the
231 : underlying source. The operation completes when data is
232 : available, the source is exhausted, or an error occurs.
233 :
234 : @param dest Span of const_buffer to fill.
235 :
236 : @return An awaitable yielding `(error_code,std::span<const_buffer>)`.
237 : On success with data, a non-empty span of filled buffers.
238 : On EOF, `ec == cond::eof` and span is empty.
239 :
240 : @par Preconditions
241 : The wrapper must contain a valid source (`has_value() == true`).
242 : The caller must not call this function again after a prior
243 : call returned an error.
244 : */
245 : auto
246 : pull(std::span<const_buffer> dest);
247 :
248 : /** Read some data into a mutable buffer sequence.
249 :
250 : Reads one or more bytes into the caller's buffers. May fill
251 : less than the full sequence.
252 :
253 : When the wrapped type provides native @ref ReadSource support,
254 : the operation forwards directly. Otherwise it is synthesized
255 : from @ref pull, @ref buffer_copy, and @ref consume.
256 :
257 : @param buffers The buffer sequence to fill.
258 :
259 : @return An awaitable yielding `(error_code,std::size_t)`.
260 :
261 : @par Preconditions
262 : The wrapper must contain a valid source (`has_value() == true`).
263 : The caller must not call this function again after a prior
264 : call returned an error (including EOF).
265 :
266 : @see pull, consume
267 : */
268 : template<MutableBufferSequence MB>
269 : io_task<std::size_t>
270 : read_some(MB buffers);
271 :
272 : /** Read data into a mutable buffer sequence.
273 :
274 : Fills the provided buffer sequence completely. When the
275 : wrapped type provides native @ref ReadSource support, each
276 : window is forwarded directly. Otherwise the data is
277 : synthesized from @ref pull, @ref buffer_copy, and @ref consume.
278 :
279 : @param buffers The buffer sequence to fill.
280 :
281 : @return An awaitable yielding `(error_code,std::size_t)`.
282 : On success, `n == buffer_size(buffers)`.
283 : On EOF, `ec == error::eof` and `n` is bytes transferred.
284 :
285 : @par Preconditions
286 : The wrapper must contain a valid source (`has_value() == true`).
287 : The caller must not call this function again after a prior
288 : call returned an error (including EOF).
289 :
290 : @see pull, consume
291 : */
292 : template<MutableBufferSequence MB>
293 : io_task<std::size_t>
294 : read(MB buffers);
295 :
296 : protected:
297 : /** Rebind to a new source after move.
298 :
299 : Updates the internal pointer to reference a new source object.
300 : Used by owning wrappers after move assignment when the owned
301 : object has moved to a new location.
302 :
303 : @param new_source The new source to bind to. Must be the same
304 : type as the original source.
305 :
306 : @note Terminates if called with a source of different type
307 : than the original.
308 : */
309 : template<BufferSource S>
310 : void
311 : rebind(S& new_source) noexcept
312 : {
313 : if(vt_ != &vtable_for_impl<S>::value)
314 : std::terminate();
315 : source_ = &new_source;
316 : }
317 :
318 : private:
319 : /** Forward a partial read through the vtable.
320 :
321 : Constructs the underlying `read_some` awaitable in
322 : cached storage and returns a type-erased awaitable.
323 : */
324 : auto
325 : read_some_(std::span<mutable_buffer const> buffers);
326 :
327 : /** Forward a complete read through the vtable.
328 :
329 : Constructs the underlying `read` awaitable in
330 : cached storage and returns a type-erased awaitable.
331 : */
332 : auto
333 : read_(std::span<mutable_buffer const> buffers);
334 : };
335 :
336 : //----------------------------------------------------------
337 :
338 : /** Type-erased ops for awaitables yielding `io_result<std::span<const_buffer>>`. */
339 : struct any_buffer_source::awaitable_ops
340 : {
341 : bool (*await_ready)(void*);
342 : std::coroutine_handle<> (*await_suspend)(void*, std::coroutine_handle<>, io_env const*);
343 : io_result<std::span<const_buffer>> (*await_resume)(void*);
344 : void (*destroy)(void*) noexcept;
345 : };
346 :
347 : /** Type-erased ops for awaitables yielding `io_result<std::size_t>`. */
348 : struct any_buffer_source::read_awaitable_ops
349 : {
350 : bool (*await_ready)(void*);
351 : std::coroutine_handle<> (*await_suspend)(void*, std::coroutine_handle<>, io_env const*);
352 : io_result<std::size_t> (*await_resume)(void*);
353 : void (*destroy)(void*) noexcept;
354 : };
355 :
356 : struct any_buffer_source::vtable
357 : {
358 : // BufferSource ops (always populated)
359 : void (*destroy)(void*) noexcept;
360 : void (*do_consume)(void* source, std::size_t n) noexcept;
361 : std::size_t awaitable_size;
362 : std::size_t awaitable_align;
363 : awaitable_ops const* (*construct_awaitable)(
364 : void* source,
365 : void* storage,
366 : std::span<const_buffer> dest);
367 :
368 : // ReadSource forwarding (null when wrapped type is BufferSource-only)
369 : read_awaitable_ops const* (*construct_read_some_awaitable)(
370 : void* source,
371 : void* storage,
372 : std::span<mutable_buffer const> buffers);
373 : read_awaitable_ops const* (*construct_read_awaitable)(
374 : void* source,
375 : void* storage,
376 : std::span<mutable_buffer const> buffers);
377 : };
378 :
379 : template<BufferSource S>
380 : struct any_buffer_source::vtable_for_impl
381 : {
382 : using PullAwaitable = decltype(std::declval<S&>().pull(
383 : std::declval<std::span<const_buffer>>()));
384 :
385 : static void
386 7 : do_destroy_impl(void* source) noexcept
387 : {
388 7 : static_cast<S*>(source)->~S();
389 7 : }
390 :
391 : static void
392 45 : do_consume_impl(void* source, std::size_t n) noexcept
393 : {
394 45 : static_cast<S*>(source)->consume(n);
395 45 : }
396 :
397 : static awaitable_ops const*
398 110 : construct_awaitable_impl(
399 : void* source,
400 : void* storage,
401 : std::span<const_buffer> dest)
402 : {
403 110 : auto& s = *static_cast<S*>(source);
404 110 : ::new(storage) PullAwaitable(s.pull(dest));
405 :
406 : static constexpr awaitable_ops ops = {
407 110 : +[](void* p) {
408 110 : return static_cast<PullAwaitable*>(p)->await_ready();
409 : },
410 0 : +[](void* p, std::coroutine_handle<> h, io_env const* env) {
411 0 : return detail::call_await_suspend(
412 0 : static_cast<PullAwaitable*>(p), h, env);
413 : },
414 110 : +[](void* p) {
415 110 : return static_cast<PullAwaitable*>(p)->await_resume();
416 : },
417 110 : +[](void* p) noexcept {
418 110 : static_cast<PullAwaitable*>(p)->~PullAwaitable();
419 : }
420 : };
421 110 : return &ops;
422 : }
423 :
424 : //------------------------------------------------------
425 : // ReadSource forwarding (only instantiated when ReadSource<S>)
426 :
427 : static read_awaitable_ops const*
428 48 : construct_read_some_awaitable_impl(
429 : void* source,
430 : void* storage,
431 : std::span<mutable_buffer const> buffers)
432 : requires ReadSource<S>
433 : {
434 : using Aw = decltype(std::declval<S&>().read_some(
435 : std::span<mutable_buffer const>{}));
436 48 : auto& s = *static_cast<S*>(source);
437 48 : ::new(storage) Aw(s.read_some(buffers));
438 :
439 : static constexpr read_awaitable_ops ops = {
440 48 : +[](void* p) {
441 48 : return static_cast<Aw*>(p)->await_ready();
442 : },
443 0 : +[](void* p, std::coroutine_handle<> h, io_env const* env) {
444 0 : return detail::call_await_suspend(
445 0 : static_cast<Aw*>(p), h, env);
446 : },
447 48 : +[](void* p) {
448 48 : return static_cast<Aw*>(p)->await_resume();
449 : },
450 48 : +[](void* p) noexcept {
451 48 : static_cast<Aw*>(p)->~Aw();
452 : }
453 : };
454 48 : return &ops;
455 : }
456 :
457 : static read_awaitable_ops const*
458 18 : construct_read_awaitable_impl(
459 : void* source,
460 : void* storage,
461 : std::span<mutable_buffer const> buffers)
462 : requires ReadSource<S>
463 : {
464 : using Aw = decltype(std::declval<S&>().read(
465 : std::span<mutable_buffer const>{}));
466 18 : auto& s = *static_cast<S*>(source);
467 18 : ::new(storage) Aw(s.read(buffers));
468 :
469 : static constexpr read_awaitable_ops ops = {
470 18 : +[](void* p) {
471 18 : return static_cast<Aw*>(p)->await_ready();
472 : },
473 0 : +[](void* p, std::coroutine_handle<> h, io_env const* env) {
474 0 : return detail::call_await_suspend(
475 0 : static_cast<Aw*>(p), h, env);
476 : },
477 18 : +[](void* p) {
478 18 : return static_cast<Aw*>(p)->await_resume();
479 : },
480 18 : +[](void* p) noexcept {
481 18 : static_cast<Aw*>(p)->~Aw();
482 : }
483 : };
484 18 : return &ops;
485 : }
486 :
487 : //------------------------------------------------------
488 :
489 : static consteval std::size_t
490 : compute_max_size() noexcept
491 : {
492 : std::size_t s = sizeof(PullAwaitable);
493 : if constexpr (ReadSource<S>)
494 : {
495 : using RS = decltype(std::declval<S&>().read_some(
496 : std::span<mutable_buffer const>{}));
497 : using R = decltype(std::declval<S&>().read(
498 : std::span<mutable_buffer const>{}));
499 :
500 : if(sizeof(RS) > s) s = sizeof(RS);
501 : if(sizeof(R) > s) s = sizeof(R);
502 : }
503 : return s;
504 : }
505 :
506 : static consteval std::size_t
507 : compute_max_align() noexcept
508 : {
509 : std::size_t a = alignof(PullAwaitable);
510 : if constexpr (ReadSource<S>)
511 : {
512 : using RS = decltype(std::declval<S&>().read_some(
513 : std::span<mutable_buffer const>{}));
514 : using R = decltype(std::declval<S&>().read(
515 : std::span<mutable_buffer const>{}));
516 :
517 : if(alignof(RS) > a) a = alignof(RS);
518 : if(alignof(R) > a) a = alignof(R);
519 : }
520 : return a;
521 : }
522 :
523 : static consteval vtable
524 : make_vtable() noexcept
525 : {
526 : vtable v{};
527 : v.destroy = &do_destroy_impl;
528 : v.do_consume = &do_consume_impl;
529 : v.awaitable_size = compute_max_size();
530 : v.awaitable_align = compute_max_align();
531 : v.construct_awaitable = &construct_awaitable_impl;
532 : v.construct_read_some_awaitable = nullptr;
533 : v.construct_read_awaitable = nullptr;
534 :
535 : if constexpr (ReadSource<S>)
536 : {
537 : v.construct_read_some_awaitable =
538 : &construct_read_some_awaitable_impl;
539 : v.construct_read_awaitable =
540 : &construct_read_awaitable_impl;
541 : }
542 : return v;
543 : }
544 :
545 : static constexpr vtable value = make_vtable();
546 : };
547 :
548 : //----------------------------------------------------------
549 :
550 : inline
551 124 : any_buffer_source::~any_buffer_source()
552 : {
553 124 : if(storage_)
554 : {
555 7 : vt_->destroy(source_);
556 7 : ::operator delete(storage_);
557 : }
558 124 : if(cached_awaitable_)
559 119 : ::operator delete(cached_awaitable_);
560 124 : }
561 :
562 : inline any_buffer_source&
563 2 : any_buffer_source::operator=(any_buffer_source&& other) noexcept
564 : {
565 2 : if(this != &other)
566 : {
567 2 : if(storage_)
568 : {
569 0 : vt_->destroy(source_);
570 0 : ::operator delete(storage_);
571 : }
572 2 : if(cached_awaitable_)
573 0 : ::operator delete(cached_awaitable_);
574 2 : source_ = std::exchange(other.source_, nullptr);
575 2 : vt_ = std::exchange(other.vt_, nullptr);
576 2 : cached_awaitable_ = std::exchange(other.cached_awaitable_, nullptr);
577 2 : storage_ = std::exchange(other.storage_, nullptr);
578 2 : active_ops_ = std::exchange(other.active_ops_, nullptr);
579 2 : active_read_ops_ = std::exchange(other.active_read_ops_, nullptr);
580 : }
581 2 : return *this;
582 : }
583 :
584 : template<BufferSource S>
585 : requires (!std::same_as<std::decay_t<S>, any_buffer_source>)
586 7 : any_buffer_source::any_buffer_source(S s)
587 7 : : vt_(&vtable_for_impl<S>::value)
588 : {
589 : struct guard {
590 : any_buffer_source* self;
591 : bool committed = false;
592 7 : ~guard() {
593 7 : if(!committed && self->storage_) {
594 0 : self->vt_->destroy(self->source_);
595 0 : ::operator delete(self->storage_);
596 0 : self->storage_ = nullptr;
597 0 : self->source_ = nullptr;
598 : }
599 7 : }
600 7 : } g{this};
601 :
602 7 : storage_ = ::operator new(sizeof(S));
603 7 : source_ = ::new(storage_) S(std::move(s));
604 :
605 7 : cached_awaitable_ = ::operator new(vt_->awaitable_size);
606 :
607 7 : g.committed = true;
608 7 : }
609 :
610 : template<BufferSource S>
611 112 : any_buffer_source::any_buffer_source(S* s)
612 112 : : source_(s)
613 112 : , vt_(&vtable_for_impl<S>::value)
614 : {
615 112 : cached_awaitable_ = ::operator new(vt_->awaitable_size);
616 112 : }
617 :
618 : //----------------------------------------------------------
619 :
620 : inline void
621 45 : any_buffer_source::consume(std::size_t n) noexcept
622 : {
623 45 : vt_->do_consume(source_, n);
624 45 : }
625 :
626 : inline auto
627 110 : any_buffer_source::pull(std::span<const_buffer> dest)
628 : {
629 : struct awaitable
630 : {
631 : any_buffer_source* self_;
632 : std::span<const_buffer> dest_;
633 :
634 : bool
635 110 : await_ready()
636 : {
637 220 : self_->active_ops_ = self_->vt_->construct_awaitable(
638 110 : self_->source_,
639 110 : self_->cached_awaitable_,
640 : dest_);
641 110 : return self_->active_ops_->await_ready(self_->cached_awaitable_);
642 : }
643 :
644 : std::coroutine_handle<>
645 0 : await_suspend(std::coroutine_handle<> h, io_env const* env)
646 : {
647 0 : return self_->active_ops_->await_suspend(
648 0 : self_->cached_awaitable_, h, env);
649 : }
650 :
651 : io_result<std::span<const_buffer>>
652 110 : await_resume()
653 : {
654 : struct guard {
655 : any_buffer_source* self;
656 110 : ~guard() {
657 110 : self->active_ops_->destroy(self->cached_awaitable_);
658 110 : self->active_ops_ = nullptr;
659 110 : }
660 110 : } g{self_};
661 110 : return self_->active_ops_->await_resume(
662 195 : self_->cached_awaitable_);
663 110 : }
664 : };
665 110 : return awaitable{this, dest};
666 : }
667 :
668 : //----------------------------------------------------------
669 : // Private helpers for native ReadSource forwarding
670 :
671 : inline auto
672 48 : any_buffer_source::read_some_(
673 : std::span<mutable_buffer const> buffers)
674 : {
675 : struct awaitable
676 : {
677 : any_buffer_source* self_;
678 : std::span<mutable_buffer const> buffers_;
679 :
680 : bool
681 48 : await_ready() const noexcept
682 : {
683 48 : return false;
684 : }
685 :
686 : std::coroutine_handle<>
687 48 : await_suspend(std::coroutine_handle<> h, io_env const* env)
688 : {
689 96 : self_->active_read_ops_ =
690 96 : self_->vt_->construct_read_some_awaitable(
691 48 : self_->source_,
692 48 : self_->cached_awaitable_,
693 : buffers_);
694 :
695 48 : if(self_->active_read_ops_->await_ready(
696 48 : self_->cached_awaitable_))
697 48 : return h;
698 :
699 0 : return self_->active_read_ops_->await_suspend(
700 0 : self_->cached_awaitable_, h, env);
701 : }
702 :
703 : io_result<std::size_t>
704 48 : await_resume()
705 : {
706 : struct guard {
707 : any_buffer_source* self;
708 48 : ~guard() {
709 48 : self->active_read_ops_->destroy(
710 48 : self->cached_awaitable_);
711 48 : self->active_read_ops_ = nullptr;
712 48 : }
713 48 : } g{self_};
714 48 : return self_->active_read_ops_->await_resume(
715 88 : self_->cached_awaitable_);
716 48 : }
717 : };
718 48 : return awaitable{this, buffers};
719 : }
720 :
721 : inline auto
722 18 : any_buffer_source::read_(
723 : std::span<mutable_buffer const> buffers)
724 : {
725 : struct awaitable
726 : {
727 : any_buffer_source* self_;
728 : std::span<mutable_buffer const> buffers_;
729 :
730 : bool
731 18 : await_ready() const noexcept
732 : {
733 18 : return false;
734 : }
735 :
736 : std::coroutine_handle<>
737 18 : await_suspend(std::coroutine_handle<> h, io_env const* env)
738 : {
739 36 : self_->active_read_ops_ =
740 36 : self_->vt_->construct_read_awaitable(
741 18 : self_->source_,
742 18 : self_->cached_awaitable_,
743 : buffers_);
744 :
745 18 : if(self_->active_read_ops_->await_ready(
746 18 : self_->cached_awaitable_))
747 18 : return h;
748 :
749 0 : return self_->active_read_ops_->await_suspend(
750 0 : self_->cached_awaitable_, h, env);
751 : }
752 :
753 : io_result<std::size_t>
754 18 : await_resume()
755 : {
756 : struct guard {
757 : any_buffer_source* self;
758 18 : ~guard() {
759 18 : self->active_read_ops_->destroy(
760 18 : self->cached_awaitable_);
761 18 : self->active_read_ops_ = nullptr;
762 18 : }
763 18 : } g{self_};
764 18 : return self_->active_read_ops_->await_resume(
765 30 : self_->cached_awaitable_);
766 18 : }
767 : };
768 18 : return awaitable{this, buffers};
769 : }
770 :
771 : //----------------------------------------------------------
772 : // Public ReadSource methods
773 :
774 : template<MutableBufferSequence MB>
775 : io_task<std::size_t>
776 58 : any_buffer_source::read_some(MB buffers)
777 : {
778 : buffer_param<MB> bp(buffers);
779 : auto dest = bp.data();
780 : if(dest.empty())
781 : co_return {{}, 0};
782 :
783 : // Native ReadSource path
784 : if(vt_->construct_read_some_awaitable)
785 : co_return co_await read_some_(dest);
786 :
787 : // Synthesized path: pull + buffer_copy + consume
788 : const_buffer arr[detail::max_iovec_];
789 : auto [ec, bufs] = co_await pull(arr);
790 : if(ec)
791 : co_return {ec, 0};
792 :
793 : auto n = buffer_copy(dest, bufs);
794 : consume(n);
795 : co_return {{}, n};
796 116 : }
797 :
798 : template<MutableBufferSequence MB>
799 : io_task<std::size_t>
800 24 : any_buffer_source::read(MB buffers)
801 : {
802 : buffer_param<MB> bp(buffers);
803 : std::size_t total = 0;
804 :
805 : // Native ReadSource path
806 : if(vt_->construct_read_awaitable)
807 : {
808 : for(;;)
809 : {
810 : auto dest = bp.data();
811 : if(dest.empty())
812 : break;
813 :
814 : auto [ec, n] = co_await read_(dest);
815 : total += n;
816 : if(ec)
817 : co_return {ec, total};
818 : bp.consume(n);
819 : }
820 : co_return {{}, total};
821 : }
822 :
823 : // Synthesized path: pull + buffer_copy + consume
824 : for(;;)
825 : {
826 : auto dest = bp.data();
827 : if(dest.empty())
828 : break;
829 :
830 : const_buffer arr[detail::max_iovec_];
831 : auto [ec, bufs] = co_await pull(arr);
832 :
833 : if(ec)
834 : co_return {ec, total};
835 :
836 : auto n = buffer_copy(dest, bufs);
837 : consume(n);
838 : total += n;
839 : bp.consume(n);
840 : }
841 :
842 : co_return {{}, total};
843 48 : }
844 :
845 : //----------------------------------------------------------
846 :
847 : static_assert(BufferSource<any_buffer_source>);
848 : static_assert(ReadSource<any_buffer_source>);
849 :
850 : } // namespace capy
851 : } // namespace boost
852 :
853 : #endif
|