LCOV - code coverage report
Current view: top level - capy/io - any_read_source.hpp (source / functions) Coverage Total Hit
Test: coverage_remapped.info Lines: 91.1 % 135 123
Test Date: 2026-02-12 16:53:28 Functions: 82.4 % 51 42

            Line data    Source code
       1              : //
       2              : // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
       3              : //
       4              : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       5              : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       6              : //
       7              : // Official repository: https://github.com/cppalliance/capy
       8              : //
       9              : 
      10              : #ifndef BOOST_CAPY_IO_ANY_READ_SOURCE_HPP
      11              : #define BOOST_CAPY_IO_ANY_READ_SOURCE_HPP
      12              : 
      13              : #include <boost/capy/detail/config.hpp>
      14              : #include <boost/capy/detail/await_suspend_helper.hpp>
      15              : #include <boost/capy/buffers.hpp>
      16              : #include <boost/capy/buffers/buffer_array.hpp>
      17              : #include <boost/capy/buffers/buffer_param.hpp>
      18              : #include <boost/capy/concept/io_awaitable.hpp>
      19              : #include <boost/capy/concept/read_source.hpp>
      20              : #include <boost/capy/ex/io_env.hpp>
      21              : #include <boost/capy/io_result.hpp>
      22              : #include <boost/capy/io_task.hpp>
      23              : 
      24              : #include <concepts>
      25              : #include <coroutine>
      26              : #include <cstddef>
      27              : #include <new>
      28              : #include <span>
      29              : #include <stop_token>
      30              : #include <system_error>
      31              : #include <utility>
      32              : 
      33              : namespace boost {
      34              : namespace capy {
      35              : 
      36              : /** Type-erased wrapper for any ReadSource.
      37              : 
      38              :     This class provides type erasure for any type satisfying the
      39              :     @ref ReadSource concept, enabling runtime polymorphism for
      40              :     source read operations. It uses cached awaitable storage to achieve
      41              :     zero steady-state allocation after construction.
      42              : 
      43              :     The wrapper supports two construction modes:
      44              :     - **Owning**: Pass by value to transfer ownership. The wrapper
      45              :       allocates storage and owns the source.
      46              :     - **Reference**: Pass a pointer to wrap without ownership. The
      47              :       pointed-to source must outlive this wrapper.
      48              : 
      49              :     @par Awaitable Preallocation
      50              :     The constructor preallocates storage for the type-erased awaitable.
      51              :     Wrappers created at server startup thus reserve their memory
      52              :     up front, where usage can be measured, rather than
      53              :     allocating piecemeal as traffic arrives.
      54              : 
      55              :     @par Immediate Completion
      56              :     Operations complete immediately without suspending when the
      57              :     buffer sequence is empty, or when the underlying source's
      58              :     awaitable reports readiness via `await_ready`.
      59              : 
      60              :     @par Thread Safety
      61              :     Not thread-safe. Concurrent operations on the same wrapper
      62              :     are undefined behavior.
      63              : 
      64              :     @par Example
      65              :     @code
      66              :     // Owning - takes ownership of the source
      67              :     any_read_source rs(some_source{args...});
      68              : 
      69              :     // Reference - wraps without ownership
      70              :     some_source source;
      71              :     any_read_source rs(&source);
      72              : 
      73              :     mutable_buffer buf(data, size);
      74              :     auto [ec, n] = co_await rs.read(std::span(&buf, 1));
      75              :     @endcode
      76              : 
      77              :     @see any_read_stream, ReadSource
      78              : */
      79              : class any_read_source
      80              : {
      81              :     struct vtable;
      82              :     struct awaitable_ops;
      83              : 
      84              :     template<ReadSource S>
      85              :     struct vtable_for_impl;
      86              : 
      87              :     void* source_ = nullptr;
      88              :     vtable const* vt_ = nullptr;
      89              :     void* cached_awaitable_ = nullptr;
      90              :     void* storage_ = nullptr;
      91              :     awaitable_ops const* active_ops_ = nullptr;
      92              : 
      93              : public:
      94              :     /** Destructor.
      95              : 
      96              :         Destroys the owned source (if any) and releases the cached
      97              :         awaitable storage.
      98              :     */
      99              :     ~any_read_source();
     100              : 
     101              :     /** Default constructor.
     102              : 
     103              :         Constructs an empty wrapper. Operations on a default-constructed
     104              :         wrapper result in undefined behavior.
     105              :     */
     106              :     any_read_source() = default;
     107              : 
     108              :     /** Non-copyable.
     109              : 
     110              :         The awaitable cache is per-instance and cannot be shared.
     111              :     */
     112              :     any_read_source(any_read_source const&) = delete;
     113              :     any_read_source& operator=(any_read_source const&) = delete;
     114              : 
     115              :     /** Move constructor.
     116              : 
     117              :         Transfers ownership of the wrapped source (if owned) and
     118              :         cached awaitable storage from `other`. After the move, `other` is
     119              :         in a default-constructed state.
     120              : 
     121              :         @param other The wrapper to move from.
     122              :     */
     123            1 :     any_read_source(any_read_source&& other) noexcept
     124            1 :         : source_(std::exchange(other.source_, nullptr))
     125            1 :         , vt_(std::exchange(other.vt_, nullptr))
     126            1 :         , cached_awaitable_(std::exchange(other.cached_awaitable_, nullptr))
     127            1 :         , storage_(std::exchange(other.storage_, nullptr))
     128            1 :         , active_ops_(std::exchange(other.active_ops_, nullptr))
     129              :     {
     130            1 :     }
     131              : 
     132              :     /** Move assignment operator.
     133              : 
     134              :         Destroys any owned source and releases existing resources,
     135              :         then transfers ownership from `other`.
     136              : 
     137              :         @param other The wrapper to move from.
     138              :         @return Reference to this wrapper.
     139              :     */
     140              :     any_read_source&
     141              :     operator=(any_read_source&& other) noexcept;
     142              : 
     143              :     /** Construct by taking ownership of a ReadSource.
     144              : 
     145              :         Allocates storage and moves the source into this wrapper.
     146              :         The wrapper owns the source and will destroy it.
     147              : 
     148              :         @param s The source to take ownership of.
     149              :     */
     150              :     template<ReadSource S>
     151              :         requires (!std::same_as<std::decay_t<S>, any_read_source>)
     152              :     any_read_source(S s);
     153              : 
     154              :     /** Construct by wrapping a ReadSource without ownership.
     155              : 
     156              :         Wraps the given source by pointer. The source must remain
     157              :         valid for the lifetime of this wrapper.
     158              : 
     159              :         @param s Pointer to the source to wrap.
     160              :     */
     161              :     template<ReadSource S>
     162              :     any_read_source(S* s);
     163              : 
     164              :     /** Check if the wrapper contains a valid source.
     165              : 
     166              :         @return `true` if wrapping a source, `false` if default-constructed
     167              :             or moved-from.
     168              :     */
     169              :     bool
     170           27 :     has_value() const noexcept
     171              :     {
     172           27 :         return source_ != nullptr;
     173              :     }
     174              : 
     175              :     /** Check if the wrapper contains a valid source.
     176              : 
     177              :         @return `true` if wrapping a source, `false` if default-constructed
     178              :             or moved-from.
     179              :     */
     180              :     explicit
     181            8 :     operator bool() const noexcept
     182              :     {
     183            8 :         return has_value();
     184              :     }
     185              : 
     186              :     /** Initiate a partial read operation.
     187              : 
     188              :         Reads one or more bytes into the provided buffer sequence.
     189              :         May fill less than the full sequence.
     190              : 
     191              :         @param buffers The buffer sequence to read into.
     192              : 
     193              :         @return An awaitable yielding `(error_code,std::size_t)`.
     194              : 
     195              :         @par Immediate Completion
     196              :         The operation completes immediately without suspending
     197              :         the calling coroutine when:
     198              :         @li The buffer sequence is empty, returning `{error_code{}, 0}`.
     199              :         @li The underlying source's awaitable reports immediate
     200              :             readiness via `await_ready`.
     201              : 
     202              :         @note This is a partial operation and may not fill the
     203              :         entire buffer sequence. Use @ref read to keep reading until
     204              :         the sequence is full, end-of-file is reached, or an error occurs.
     205              : 
     206              :         @par Preconditions
     207              :         The wrapper must contain a valid source (`has_value() == true`).
     208              :         The caller must not call this function again after a prior
     209              :         call returned an error (including EOF).
     210              :     */
     211              :     template<MutableBufferSequence MB>
     212              :     auto
     213              :     read_some(MB buffers);
     214              : 
     215              :     /** Initiate a complete read operation.
     216              : 
     217              :         Reads data into the provided buffer sequence by forwarding
     218              :         to the underlying source's `read` operation. Large buffer
     219              :         sequences are processed in windows, with each window
     220              :         forwarded as a separate `read` call to the underlying source.
     221              :         The operation completes when the entire buffer sequence is
     222              :         filled, end-of-file is reached, or an error occurs.
     223              : 
     224              :         @param buffers The buffer sequence to read into.
     225              : 
     226              :         @return An awaitable yielding `(error_code,std::size_t)`.
     227              : 
     228              :         @par Immediate Completion
     229              :         The operation completes immediately without suspending
     230              :         the calling coroutine when:
     231              :         @li The buffer sequence is empty, returning `{error_code{}, 0}`.
     232              :         @li The underlying source's `read` awaitable reports
     233              :             immediate readiness via `await_ready`.
     234              : 
     235              :         @par Postconditions
     236              :         Exactly one of the following is true on return:
     237              :         @li **Success**: `!ec` and `n == buffer_size(buffers)`.
     238              :             The entire buffer was filled.
     239              :         @li **End-of-stream or Error**: `ec` is set and `n` indicates
     240              :             the number of bytes transferred before the failure.
     241              : 
     242              :         @par Preconditions
     243              :         The wrapper must contain a valid source (`has_value() == true`).
     244              :         The caller must not call this function again after a prior
     245              :         call returned an error (including EOF).
     246              :     */
     247              :     template<MutableBufferSequence MB>
     248              :     io_task<std::size_t>
     249              :     read(MB buffers);
     250              : 
     251              : protected:
     252              :     /** Rebind to a new source after move.
     253              : 
     254              :         Updates the internal pointer to reference a new source object.
     255              :         Used by owning wrappers after move assignment when the owned
     256              :         object has moved to a new location.
     257              : 
     258              :         @param new_source The new source to bind to. Must be the same
     259              :             type as the original source.
     260              : 
     261              :         @note Terminates if called with a source of different type
     262              :             than the original.
     263              :     */
     264              :     template<ReadSource S>
     265              :     void
     266              :     rebind(S& new_source) noexcept
     267              :     {
     268              :         if(vt_ != &vtable_for_impl<S>::value)
     269              :             std::terminate();
     270              :         source_ = &new_source;
     271              :     }
     272              : 
     273              : private:
     274              :     auto
     275              :     read_(std::span<mutable_buffer const> buffers);
     276              : };
     277              : 
     278              : //----------------------------------------------------------
     279              : 
     280              : // ordered by call sequence for cache-line locality
     281              : struct any_read_source::awaitable_ops
     282              : {
     283              :     bool (*await_ready)(void*);
     284              :     std::coroutine_handle<> (*await_suspend)(void*, std::coroutine_handle<>, io_env const*);
     285              :     io_result<std::size_t> (*await_resume)(void*);
     286              :     void (*destroy)(void*) noexcept;
     287              : };
     288              : 
     289              : // ordered by call frequency for cache-line locality
     290              : struct any_read_source::vtable
     291              : {
     292              :     awaitable_ops const* (*construct_read_some_awaitable)(
     293              :         void* source,
     294              :         void* storage,
     295              :         std::span<mutable_buffer const> buffers);
     296              :     awaitable_ops const* (*construct_read_awaitable)(
     297              :         void* source,
     298              :         void* storage,
     299              :         std::span<mutable_buffer const> buffers);
     300              :     std::size_t awaitable_size;
     301              :     std::size_t awaitable_align;
     302              :     void (*destroy)(void*) noexcept;
     303              : };
     304              : 
     305              : template<ReadSource S>
     306              : struct any_read_source::vtable_for_impl
     307              : {
     308              :     using ReadSomeAwaitable = decltype(std::declval<S&>().read_some(
     309              :         std::span<mutable_buffer const>{}));
     310              :     using ReadAwaitable = decltype(std::declval<S&>().read(
     311              :         std::span<mutable_buffer const>{}));
     312              : 
     313              :     static void
     314            6 :     do_destroy_impl(void* source) noexcept
     315              :     {
     316            6 :         static_cast<S*>(source)->~S();
     317            6 :     }
     318              : 
     319              :     static awaitable_ops const*
     320           52 :     construct_read_some_awaitable_impl(
     321              :         void* source,
     322              :         void* storage,
     323              :         std::span<mutable_buffer const> buffers)
     324              :     {
     325           52 :         auto& s = *static_cast<S*>(source);
     326           52 :         ::new(storage) ReadSomeAwaitable(s.read_some(buffers));
     327              : 
     328              :         static constexpr awaitable_ops ops = {
     329           52 :             +[](void* p) {
     330           52 :                 return static_cast<ReadSomeAwaitable*>(p)->await_ready();
     331              :             },
     332            2 :             +[](void* p, std::coroutine_handle<> h, io_env const* env) {
     333            2 :                 return detail::call_await_suspend(
     334            2 :                     static_cast<ReadSomeAwaitable*>(p), h, env);
     335              :             },
     336           50 :             +[](void* p) {
     337           50 :                 return static_cast<ReadSomeAwaitable*>(p)->await_resume();
     338              :             },
     339           54 :             +[](void* p) noexcept {
     340            2 :                 static_cast<ReadSomeAwaitable*>(p)->~ReadSomeAwaitable();
     341              :             }
     342              :         };
     343           52 :         return &ops;
     344              :     }
     345              : 
     346              :     static awaitable_ops const*
     347          116 :     construct_read_awaitable_impl(
     348              :         void* source,
     349              :         void* storage,
     350              :         std::span<mutable_buffer const> buffers)
     351              :     {
     352          116 :         auto& s = *static_cast<S*>(source);
     353          116 :         ::new(storage) ReadAwaitable(s.read(buffers));
     354              : 
     355              :         static constexpr awaitable_ops ops = {
     356          116 :             +[](void* p) {
     357          116 :                 return static_cast<ReadAwaitable*>(p)->await_ready();
     358              :             },
     359            0 :             +[](void* p, std::coroutine_handle<> h, io_env const* env) {
     360            0 :                 return detail::call_await_suspend(
     361            0 :                     static_cast<ReadAwaitable*>(p), h, env);
     362              :             },
     363          116 :             +[](void* p) {
     364          116 :                 return static_cast<ReadAwaitable*>(p)->await_resume();
     365              :             },
     366          116 :             +[](void* p) noexcept {
     367            0 :                 static_cast<ReadAwaitable*>(p)->~ReadAwaitable();
     368              :             }
     369              :         };
     370          116 :         return &ops;
     371              :     }
     372              : 
     373              :     static constexpr std::size_t max_awaitable_size =
     374              :         sizeof(ReadSomeAwaitable) > sizeof(ReadAwaitable)
     375              :             ? sizeof(ReadSomeAwaitable)
     376              :             : sizeof(ReadAwaitable);
     377              :     static constexpr std::size_t max_awaitable_align =
     378              :         alignof(ReadSomeAwaitable) > alignof(ReadAwaitable)
     379              :             ? alignof(ReadSomeAwaitable)
     380              :             : alignof(ReadAwaitable);
     381              : 
     382              :     static constexpr vtable value = {
     383              :         &construct_read_some_awaitable_impl,
     384              :         &construct_read_awaitable_impl,
     385              :         max_awaitable_size,
     386              :         max_awaitable_align,
     387              :         &do_destroy_impl
     388              :     };
     389              : };
     390              : 
     391              : //----------------------------------------------------------
     392              : 
     393              : inline
     394          145 : any_read_source::~any_read_source()
     395              : {
     396          145 :     if(storage_)
     397              :     {
     398            6 :         vt_->destroy(source_);
     399            6 :         ::operator delete(storage_);
     400              :     }
     401          145 :     if(cached_awaitable_)
     402              :     {
     403          139 :         if(active_ops_)
     404            1 :             active_ops_->destroy(cached_awaitable_);
     405          139 :         ::operator delete(cached_awaitable_);
     406              :     }
     407          145 : }
     408              : 
     409              : inline any_read_source&
     410            4 : any_read_source::operator=(any_read_source&& other) noexcept
     411              : {
     412            4 :     if(this != &other)
     413              :     {
     414            3 :         if(storage_)
     415              :         {
     416            0 :             vt_->destroy(source_);
     417            0 :             ::operator delete(storage_);
     418              :         }
     419            3 :         if(cached_awaitable_)
     420              :         {
     421            2 :             if(active_ops_)
     422            1 :                 active_ops_->destroy(cached_awaitable_);
     423            2 :             ::operator delete(cached_awaitable_);
     424              :         }
     425            3 :         source_ = std::exchange(other.source_, nullptr);
     426            3 :         vt_ = std::exchange(other.vt_, nullptr);
     427            3 :         cached_awaitable_ = std::exchange(other.cached_awaitable_, nullptr);
     428            3 :         storage_ = std::exchange(other.storage_, nullptr);
     429            3 :         active_ops_ = std::exchange(other.active_ops_, nullptr);
     430              :     }
     431            4 :     return *this;
     432              : }
     433              : 
     434              : template<ReadSource S>
     435              :     requires (!std::same_as<std::decay_t<S>, any_read_source>)
     436            6 : any_read_source::any_read_source(S s)
     437            6 :     : vt_(&vtable_for_impl<S>::value)
     438              : {
     439              :     struct guard {
     440              :         any_read_source* self;
     441              :         bool committed = false;
     442            6 :         ~guard() {
     443            6 :             if(!committed && self->storage_) {
     444            0 :                 self->vt_->destroy(self->source_);
     445            0 :                 ::operator delete(self->storage_);
     446            0 :                 self->storage_ = nullptr;
     447            0 :                 self->source_ = nullptr;
     448              :             }
     449            6 :         }
     450            6 :     } g{this};
     451              : 
     452            6 :     storage_ = ::operator new(sizeof(S));
     453            6 :     source_ = ::new(storage_) S(std::move(s));
     454              : 
     455              :     // Preallocate the awaitable storage
     456            6 :     cached_awaitable_ = ::operator new(vt_->awaitable_size);
     457              : 
     458            6 :     g.committed = true;
     459            6 : }
     460              : 
     461              : template<ReadSource S>
     462          135 : any_read_source::any_read_source(S* s)
     463          135 :     : source_(s)
     464          135 :     , vt_(&vtable_for_impl<S>::value)
     465              : {
     466              :     // Preallocate the awaitable storage
     467          135 :     cached_awaitable_ = ::operator new(vt_->awaitable_size);
     468          135 : }
     469              : 
     470              : //----------------------------------------------------------
     471              : 
     472              : template<MutableBufferSequence MB>
     473              : auto
     474           54 : any_read_source::read_some(MB buffers)
     475              : {
     476              :     struct awaitable
     477              :     {
     478              :         any_read_source* self_;
     479              :         mutable_buffer_array<detail::max_iovec_> ba_;
     480              : 
     481           54 :         awaitable(any_read_source* self, MB const& buffers)
     482           54 :             : self_(self)
     483           54 :             , ba_(buffers)
     484              :         {
     485           54 :         }
     486              : 
     487              :         bool
     488           54 :         await_ready() const noexcept
     489              :         {
     490           54 :             return ba_.to_span().empty();
     491              :         }
     492              : 
     493              :         std::coroutine_handle<>
     494           52 :         await_suspend(std::coroutine_handle<> h, io_env const* env)
     495              :         {
     496           52 :             self_->active_ops_ = self_->vt_->construct_read_some_awaitable(
     497           52 :                 self_->source_,
     498           52 :                 self_->cached_awaitable_,
     499           52 :                 ba_.to_span());
     500              : 
     501           52 :             if(self_->active_ops_->await_ready(self_->cached_awaitable_))
     502           50 :                 return h;
     503              : 
     504            2 :             return self_->active_ops_->await_suspend(
     505            2 :                 self_->cached_awaitable_, h, env);
     506              :         }
     507              : 
     508              :         io_result<std::size_t>
     509           52 :         await_resume()
     510              :         {
     511           52 :             if(ba_.to_span().empty())
     512            2 :                 return {{}, 0};
     513              : 
     514              :             struct guard {
     515              :                 any_read_source* self;
     516           50 :                 ~guard() {
     517           50 :                     self->active_ops_->destroy(self->cached_awaitable_);
     518           50 :                     self->active_ops_ = nullptr;
     519           50 :                 }
     520           50 :             } g{self_};
     521           50 :             return self_->active_ops_->await_resume(
     522           50 :                 self_->cached_awaitable_);
     523           50 :         }
     524              :     };
     525           54 :     return awaitable(this, buffers);
     526              : }
     527              : 
     528              : inline auto
     529          116 : any_read_source::read_(std::span<mutable_buffer const> buffers)
     530              : {
     531              :     struct awaitable
     532              :     {
     533              :         any_read_source* self_;
     534              :         std::span<mutable_buffer const> buffers_;
     535              : 
     536              :         bool
     537          116 :         await_ready() const noexcept
     538              :         {
     539          116 :             return false;
     540              :         }
     541              : 
     542              :         std::coroutine_handle<>
     543          116 :         await_suspend(std::coroutine_handle<> h, io_env const* env)
     544              :         {
     545          232 :             self_->active_ops_ = self_->vt_->construct_read_awaitable(
     546          116 :                 self_->source_,
     547          116 :                 self_->cached_awaitable_,
     548              :                 buffers_);
     549              : 
     550          116 :             if(self_->active_ops_->await_ready(self_->cached_awaitable_))
     551          116 :                 return h;
     552              : 
     553            0 :             return self_->active_ops_->await_suspend(
     554            0 :                 self_->cached_awaitable_, h, env);
     555              :         }
     556              : 
     557              :         io_result<std::size_t>
     558          116 :         await_resume()
     559              :         {
     560              :             struct guard {
     561              :                 any_read_source* self;
     562          116 :                 ~guard() {
     563          116 :                     self->active_ops_->destroy(self->cached_awaitable_);
     564          116 :                     self->active_ops_ = nullptr;
     565          116 :                 }
     566          116 :             } g{self_};
     567          116 :             return self_->active_ops_->await_resume(
     568          200 :                 self_->cached_awaitable_);
     569          116 :         }
     570              :     };
     571          116 :     return awaitable{this, buffers};
     572              : }
     573              : 
     574              : template<MutableBufferSequence MB>
     575              : io_task<std::size_t>
     576          110 : any_read_source::read(MB buffers)
     577              : {
     578              :     buffer_param bp(buffers);
     579              :     std::size_t total = 0;
     580              : 
     581              :     for(;;)
     582              :     {
     583              :         auto bufs = bp.data();
     584              :         if(bufs.empty())
     585              :             break;
     586              : 
     587              :         auto [ec, n] = co_await read_(bufs);
     588              :         total += n;
     589              :         if(ec)
     590              :             co_return {ec, total};
     591              :         bp.consume(n);
     592              :     }
     593              : 
     594              :     co_return {{}, total};
     595          220 : }
     596              : 
     597              : } // namespace capy
     598              : } // namespace boost
     599              : 
     600              : #endif
        

Generated by: LCOV version 2.3