LCOV - code coverage report
Current view: top level - capy/io - write_now.hpp (source / functions) Coverage Total Hit
Test: coverage_remapped.info Lines: 90.4 % 73 66
Test Date: 2026-02-12 16:53:28 Functions: 88.6 % 35 31

            Line data    Source code
       1              : //
       2              : // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
       3              : //
       4              : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       5              : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       6              : //
       7              : // Official repository: https://github.com/cppalliance/capy
       8              : //
       9              : 
      10              : #ifndef BOOST_CAPY_IO_WRITE_NOW_HPP
      11              : #define BOOST_CAPY_IO_WRITE_NOW_HPP
      12              : 
      13              : #include <boost/capy/detail/config.hpp>
      14              : #include <boost/capy/detail/await_suspend_helper.hpp>
      15              : #include <boost/capy/buffers.hpp>
      16              : #include <boost/capy/buffers/consuming_buffers.hpp>
      17              : #include <boost/capy/concept/io_awaitable.hpp>
      18              : #include <boost/capy/concept/write_stream.hpp>
      19              : #include <coroutine>
      20              : #include <boost/capy/ex/executor_ref.hpp>
      21              : #include <boost/capy/ex/io_env.hpp>
      22              : #include <boost/capy/io_result.hpp>
      23              : 
      24              : #include <cstddef>
      25              : #include <exception>
      26              : #include <new>
      27              : #include <stop_token>
      28              : #include <utility>
      29              : 
      30              : #ifndef BOOST_CAPY_WRITE_NOW_WORKAROUND
      31              : # if defined(__GNUC__) && !defined(__clang__)
      32              : #  define BOOST_CAPY_WRITE_NOW_WORKAROUND 1
      33              : # else
      34              : #  define BOOST_CAPY_WRITE_NOW_WORKAROUND 0
      35              : # endif
      36              : #endif
      37              : 
      38              : namespace boost {
      39              : namespace capy {
      40              : 
      41              : /** Eagerly writes complete buffer sequences with frame caching.
      42              : 
      43              :     This class wraps a @ref WriteStream and provides an `operator()`
      44              :     that writes an entire buffer sequence, attempting to complete
      45              :     synchronously. If every `write_some` completes without suspending,
      46              :     the entire operation finishes in `await_ready` with no coroutine
      47              :     suspension.
      48              : 
      49              :     The class maintains a one-element coroutine frame cache. After
      50              :     the first call, subsequent calls reuse the cached frame memory,
      51              :     avoiding repeated allocation for the internal coroutine.
      52              : 
      53              :     @tparam Stream The stream type, must satisfy @ref WriteStream.
      54              : 
      55              :     @par Thread Safety
      56              :     Distinct objects: Safe.
      57              :     Shared objects: Unsafe.
      58              : 
      59              :     @par Preconditions
      60              :     Only one operation may be outstanding at a time. A new call to
      61              :     `operator()` must not be made until the previous operation has
      62              :     completed (i.e., the returned awaitable has been fully consumed).
      63              : 
      64              :     @par Example
      65              : 
      66              :     @code
      67              :     template< WriteStream Stream >
      68              :     task<> send_messages( Stream& stream )
      69              :     {
      70              :         write_now wn( stream );
      71              :         auto [ec1, n1] = co_await wn( make_buffer( "hello" ) );
      72              :         if( ec1 )
      73              :             detail::throw_system_error( ec1 );
      74              :         auto [ec2, n2] = co_await wn( make_buffer( "world" ) );
      75              :         if( ec2 )
      76              :             detail::throw_system_error( ec2 );
      77              :     }
      78              :     @endcode
      79              : 
      80              :     @see write, write_some, WriteStream, ConstBufferSequence
      81              : */
      82              : template<class Stream>
      83              :     requires WriteStream<Stream>
      84              : class write_now
      85              : {
      86              :     Stream& stream_;
      87              :     void* cached_frame_ = nullptr;
      88              :     std::size_t cached_size_ = 0;
      89              : 
      90              :     struct [[nodiscard]] BOOST_CAPY_CORO_AWAIT_ELIDABLE
      91              :         op_type
      92              :     {
      93              :         struct promise_type
      94              :         {
      95              :             io_result<std::size_t> result_;
      96              :             std::exception_ptr ep_;
      97              :             std::coroutine_handle<> cont_{nullptr};
      98              :             io_env const* env_ = nullptr;
      99              :             bool done_ = false;
     100              : 
     101           68 :             op_type get_return_object()
     102              :             {
     103              :                 return op_type{
     104              :                     std::coroutine_handle<
     105           68 :                         promise_type>::from_promise(*this)};
     106              :             }
     107              : 
     108           68 :             auto initial_suspend() noexcept
     109              :             {
     110              : #if BOOST_CAPY_WRITE_NOW_WORKAROUND
     111           68 :                 return std::suspend_always{};
     112              : #else
     113              :                 return std::suspend_never{};
     114              : #endif
     115              :             }
     116              : 
     117           68 :             auto final_suspend() noexcept
     118              :             {
     119              :                 struct awaiter
     120              :                 {
     121              :                     promise_type* p_;
     122              : 
     123           68 :                     bool await_ready() const noexcept
     124              :                     {
     125           68 :                         return false;
     126              :                     }
     127              : 
     128           68 :                     std::coroutine_handle<> await_suspend(std::coroutine_handle<>) const noexcept
     129              :                     {
     130           68 :                         p_->done_ = true;
     131           68 :                         if(!p_->cont_)
     132            0 :                             return std::noop_coroutine();
     133           68 :                         return p_->cont_;
     134              :                     }
     135              : 
     136            0 :                     void await_resume() const noexcept
     137              :                     {
     138            0 :                     }
     139              :                 };
     140           68 :                 return awaiter{this};
     141              :             }
     142              : 
     143           46 :             void return_value(
     144              :                 io_result<std::size_t> r) noexcept
     145              :             {
     146           46 :                 result_ = r;
     147           46 :             }
     148              : 
     149           22 :             void unhandled_exception()
     150              :             {
     151           22 :                 ep_ = std::current_exception();
     152           22 :             }
     153              : 
     154              :             std::suspend_always yield_value(int) noexcept
     155              :             {
     156              :                 return {};
     157              :             }
     158              : 
     159              :             template<class A>
     160           84 :             auto await_transform(A&& a)
     161              :             {
     162              :                 using decayed = std::decay_t<A>;
     163              :                 if constexpr (IoAwaitable<decayed>)
     164              :                 {
     165              :                     struct wrapper
     166              :                     {
     167              :                         decayed inner_;
     168              :                         promise_type* p_;
     169              : 
     170           84 :                         bool await_ready()
     171              :                         {
     172           84 :                             return inner_.await_ready();
     173              :                         }
     174              : 
     175            0 :                         std::coroutine_handle<> await_suspend(std::coroutine_handle<> h)
     176              :                         {
     177            0 :                             return detail::call_await_suspend(
     178              :                                 &inner_, h,
     179            0 :                                 p_->env_);
     180              :                         }
     181              : 
     182           84 :                         decltype(auto) await_resume()
     183              :                         {
     184           84 :                             return inner_.await_resume();
     185              :                         }
     186              :                     };
     187              :                     return wrapper{
     188           84 :                         std::forward<A>(a), this};
     189              :                 }
     190              :                 else
     191              :                 {
     192              :                     return std::forward<A>(a);
     193              :                 }
     194              :             }
     195              : 
     196              :             static void*
     197           68 :             operator new(
     198              :                 std::size_t size,
     199              :                 write_now& self,
     200              :                 auto&)
     201              :             {
     202           68 :                 if(self.cached_frame_ &&
     203            4 :                     self.cached_size_ >= size)
     204            4 :                     return self.cached_frame_;
     205           64 :                 void* p = ::operator new(size);
     206           64 :                 if(self.cached_frame_)
     207            0 :                     ::operator delete(self.cached_frame_);
     208           64 :                 self.cached_frame_ = p;
     209           64 :                 self.cached_size_ = size;
     210           64 :                 return p;
     211              :             }
     212              : 
     213              :             static void
     214           68 :             operator delete(void*, std::size_t) noexcept
     215              :             {
     216           68 :             }
     217              :         };
     218              : 
     219              :         std::coroutine_handle<promise_type> h_;
     220              : 
     221          136 :         ~op_type()
     222              :         {
     223          136 :             if(h_)
     224           68 :                 h_.destroy();
     225          136 :         }
     226              : 
     227              :         op_type(op_type const&) = delete;
     228              :         op_type& operator=(op_type const&) = delete;
     229              : 
     230           68 :         op_type(op_type&& other) noexcept
     231           68 :             : h_(std::exchange(other.h_, nullptr))
     232              :         {
     233           68 :         }
     234              : 
     235              :         op_type& operator=(op_type&&) = delete;
     236              : 
     237           68 :         bool await_ready() const noexcept
     238              :         {
     239           68 :             return h_.promise().done_;
     240              :         }
     241              : 
     242           68 :         std::coroutine_handle<> await_suspend(
     243              :             std::coroutine_handle<> cont,
     244              :             io_env const* env)
     245              :         {
     246           68 :             auto& p = h_.promise();
     247           68 :             p.cont_ = cont;
     248           68 :             p.env_ = env;
     249           68 :             return h_;
     250              :         }
     251              : 
     252           68 :         io_result<std::size_t> await_resume()
     253              :         {
     254           68 :             auto& p = h_.promise();
     255           68 :             if(p.ep_)
     256           22 :                 std::rethrow_exception(p.ep_);
     257           46 :             return p.result_;
     258              :         }
     259              : 
     260              :     private:
     261           68 :         explicit op_type(
     262              :             std::coroutine_handle<promise_type> h)
     263           68 :             : h_(h)
     264              :         {
     265           68 :         }
     266              :     };
     267              : 
     268              : public:
     269              :     /** Destructor. Frees the cached coroutine frame. */
     270           64 :     ~write_now()
     271              :     {
     272           64 :         if(cached_frame_)
     273           64 :             ::operator delete(cached_frame_);
     274           64 :     }
     275              : 
     276              :     /** Construct from a stream reference.
     277              : 
     278              :         @param s The stream to write to. Must outlive this object.
     279              :     */
     280              :     explicit
     281           64 :     write_now(Stream& s) noexcept
     282           64 :         : stream_(s)
     283              :     {
     284           64 :     }
     285              : 
     286              :     write_now(write_now const&) = delete;
     287              :     write_now& operator=(write_now const&) = delete;
     288              : 
     289              :     /** Eagerly write the entire buffer sequence.
     290              : 
     291              :         Writes data to the stream by calling `write_some` repeatedly
     292              :         until the entire buffer sequence is written or an error
     293              :         occurs. The operation attempts to complete synchronously:
     294              :         if every `write_some` completes without suspending, the
     295              :         entire operation finishes in `await_ready`.
     296              : 
     297              :         When the fast path cannot complete, the coroutine suspends
     298              :         and continues asynchronously. The internal coroutine frame
     299              :         is cached and reused across calls.
     300              : 
     301              :         @param buffers The buffer sequence to write. Passed by
     302              :             value to ensure the sequence lives in the coroutine
     303              :             frame across suspension points.
     304              : 
     305              :         @return An awaitable yielding `(error_code,std::size_t)`.
     306              :             On success, `n` equals `buffer_size(buffers)`. On
     307              :             error, `n` is the number of bytes written before the
     308              :             error. Compare error codes to conditions:
     309              :             @li `cond::canceled` - Operation was cancelled
     310              :             @li `std::errc::broken_pipe` - Peer closed connection
     311              : 
     312              :         @par Example
     313              : 
     314              :         @code
     315              :         write_now wn( stream );
     316              :         auto [ec, n] = co_await wn( make_buffer( body ) );
     317              :         if( ec )
     318              :             detail::throw_system_error( ec );
     319              :         @endcode
     320              : 
     321              :         @see write, write_some, WriteStream
     322              :     */
     323              : // GCC falsely warns that the coroutine promise's
     324              : // placement operator new(size_t, write_now&, auto&)
     325              : // mismatches operator delete(void*, size_t). Per the
     326              : // standard, coroutine deallocation lookup is separate.
     327              : #if defined(__GNUC__) && !defined(__clang__)
     328              : #pragma GCC diagnostic push
     329              : #pragma GCC diagnostic ignored "-Wmismatched-new-delete"
     330              : #endif
     331              : 
     332              : #if BOOST_CAPY_WRITE_NOW_WORKAROUND
     333              :     template<ConstBufferSequence Buffers>
     334              :     op_type
     335           68 :     operator()(Buffers buffers)
     336              :     {
     337              :         std::size_t const total_size = buffer_size(buffers);
     338              :         std::size_t total_written = 0;
     339              :         consuming_buffers cb(buffers);
     340              :         while(total_written < total_size)
     341              :         {
     342              :             auto r =
     343              :                 co_await stream_.write_some(cb);
     344              :             if(r.ec)
     345              :                 co_return io_result<std::size_t>{
     346              :                     r.ec, total_written};
     347              :             cb.consume(r.t1);
     348              :             total_written += r.t1;
     349              :         }
     350              :         co_return io_result<std::size_t>{
     351              :             {}, total_written};
     352          136 :     }
     353              : #else
     354              :     template<ConstBufferSequence Buffers>
     355              :     op_type
     356              :     operator()(Buffers buffers)
     357              :     {
     358              :         std::size_t const total_size = buffer_size(buffers);
     359              :         std::size_t total_written = 0;
     360              : 
     361              :         // GCC ICE in expand_expr_real_1 (expr.cc:11376)
     362              :         // when consuming_buffers spans a co_yield, so
     363              :         // the GCC path uses a separate simple coroutine.
     364              :         consuming_buffers cb(buffers);
     365              :         while(total_written < total_size)
     366              :         {
     367              :             auto inner = stream_.write_some(cb);
     368              :             if(!inner.await_ready())
     369              :                 break;
     370              :             auto r = inner.await_resume();
     371              :             if(r.ec)
     372              :                 co_return io_result<std::size_t>{
     373              :                     r.ec, total_written};
     374              :             cb.consume(r.t1);
     375              :             total_written += r.t1;
     376              :         }
     377              : 
     378              :         if(total_written >= total_size)
     379              :             co_return io_result<std::size_t>{
     380              :                 {}, total_written};
     381              : 
     382              :         co_yield 0;
     383              : 
     384              :         while(total_written < total_size)
     385              :         {
     386              :             auto r =
     387              :                 co_await stream_.write_some(cb);
     388              :             if(r.ec)
     389              :                 co_return io_result<std::size_t>{
     390              :                     r.ec, total_written};
     391              :             cb.consume(r.t1);
     392              :             total_written += r.t1;
     393              :         }
     394              :         co_return io_result<std::size_t>{
     395              :             {}, total_written};
     396              :     }
     397              : #endif
     398              : 
     399              : #if defined(__GNUC__) && !defined(__clang__)
     400              : #pragma GCC diagnostic pop
     401              : #endif
     402              : };
     403              : 
     404              : template<WriteStream S>
     405              : write_now(S&) -> write_now<S>;
     406              : 
     407              : } // namespace capy
     408              : } // namespace boost
     409              : 
     410              : #endif
        

Generated by: LCOV version 2.3