1  
//
1  
//
2  
// Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
2  
// Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3  
//
3  
//
4  
// Distributed under the Boost Software License, Version 1.0. (See accompanying
4  
// Distributed under the Boost Software License, Version 1.0. (See accompanying
5  
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
5  
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6  
//
6  
//
7  
// Official repository: https://github.com/cppalliance/capy
7  
// Official repository: https://github.com/cppalliance/capy
8  
//
8  
//
9  

9  

10  
#ifndef BOOST_CAPY_IO_ANY_BUFFER_SINK_HPP
10  
#ifndef BOOST_CAPY_IO_ANY_BUFFER_SINK_HPP
11  
#define BOOST_CAPY_IO_ANY_BUFFER_SINK_HPP
11  
#define BOOST_CAPY_IO_ANY_BUFFER_SINK_HPP
12  

12  

13  
#include <boost/capy/detail/config.hpp>
13  
#include <boost/capy/detail/config.hpp>
14  
#include <boost/capy/detail/await_suspend_helper.hpp>
14  
#include <boost/capy/detail/await_suspend_helper.hpp>
15  
#include <boost/capy/buffers.hpp>
15  
#include <boost/capy/buffers.hpp>
16  
#include <boost/capy/buffers/buffer_copy.hpp>
16  
#include <boost/capy/buffers/buffer_copy.hpp>
17  
#include <boost/capy/buffers/buffer_param.hpp>
17  
#include <boost/capy/buffers/buffer_param.hpp>
18  
#include <boost/capy/concept/buffer_sink.hpp>
18  
#include <boost/capy/concept/buffer_sink.hpp>
19  
#include <boost/capy/concept/io_awaitable.hpp>
19  
#include <boost/capy/concept/io_awaitable.hpp>
20  
#include <boost/capy/concept/write_sink.hpp>
20  
#include <boost/capy/concept/write_sink.hpp>
21  
#include <boost/capy/ex/io_env.hpp>
21  
#include <boost/capy/ex/io_env.hpp>
22  
#include <boost/capy/io_result.hpp>
22  
#include <boost/capy/io_result.hpp>
23  
#include <boost/capy/io_task.hpp>
23  
#include <boost/capy/io_task.hpp>
24  

24  

25  
#include <concepts>
25  
#include <concepts>
26  
#include <coroutine>
26  
#include <coroutine>
27  
#include <cstddef>
27  
#include <cstddef>
28  
#include <exception>
28  
#include <exception>
29  
#include <new>
29  
#include <new>
30  
#include <span>
30  
#include <span>
31  
#include <stop_token>
31  
#include <stop_token>
32  
#include <system_error>
32  
#include <system_error>
33  
#include <utility>
33  
#include <utility>
34  

34  

35  
namespace boost {
35  
namespace boost {
36  
namespace capy {
36  
namespace capy {
37  

37  

38  
/** Type-erased wrapper for any BufferSink.
38  
/** Type-erased wrapper for any BufferSink.
39  

39  

40  
    This class provides type erasure for any type satisfying the
40  
    This class provides type erasure for any type satisfying the
41  
    @ref BufferSink concept, enabling runtime polymorphism for
41  
    @ref BufferSink concept, enabling runtime polymorphism for
42  
    buffer sink operations. It uses cached awaitable storage to achieve
42  
    buffer sink operations. It uses cached awaitable storage to achieve
43  
    zero steady-state allocation after construction.
43  
    zero steady-state allocation after construction.
44  

44  

45  
    The wrapper exposes two interfaces for producing data:
45  
    The wrapper exposes two interfaces for producing data:
46  
    the @ref BufferSink interface (`prepare`, `commit`, `commit_eof`)
46  
    the @ref BufferSink interface (`prepare`, `commit`, `commit_eof`)
47  
    and the @ref WriteSink interface (`write_some`, `write`,
47  
    and the @ref WriteSink interface (`write_some`, `write`,
48  
    `write_eof`). Choose the interface that matches how your data
48  
    `write_eof`). Choose the interface that matches how your data
49  
    is produced:
49  
    is produced:
50  

50  

51  
    @par Choosing an Interface
51  
    @par Choosing an Interface
52  

52  

53  
    Use the **BufferSink** interface when you are a generator that
53  
    Use the **BufferSink** interface when you are a generator that
54  
    produces data into externally-provided buffers. The sink owns
54  
    produces data into externally-provided buffers. The sink owns
55  
    the memory; you call @ref prepare to obtain writable buffers,
55  
    the memory; you call @ref prepare to obtain writable buffers,
56  
    fill them, then call @ref commit or @ref commit_eof.
56  
    fill them, then call @ref commit or @ref commit_eof.
57  

57  

58  
    Use the **WriteSink** interface when you already have buffers
58  
    Use the **WriteSink** interface when you already have buffers
59  
    containing the data to write:
59  
    containing the data to write:
60  
    - If the entire body is available up front, call
60  
    - If the entire body is available up front, call
61  
      @ref write_eof(buffers) to send everything atomically.
61  
      @ref write_eof(buffers) to send everything atomically.
62  
    - If data arrives incrementally, call @ref write or
62  
    - If data arrives incrementally, call @ref write or
63  
      @ref write_some in a loop, then @ref write_eof() when done.
63  
      @ref write_some in a loop, then @ref write_eof() when done.
64  
      Prefer `write` (complete) unless your streaming pattern
64  
      Prefer `write` (complete) unless your streaming pattern
65  
      benefits from partial writes via `write_some`.
65  
      benefits from partial writes via `write_some`.
66  

66  

67  
    If the wrapped type only satisfies @ref BufferSink, the
67  
    If the wrapped type only satisfies @ref BufferSink, the
68  
    @ref WriteSink operations are provided automatically.
68  
    @ref WriteSink operations are provided automatically.
69  

69  

70  
    @par Construction Modes
70  
    @par Construction Modes
71  

71  

72  
    - **Owning**: Pass by value to transfer ownership. The wrapper
72  
    - **Owning**: Pass by value to transfer ownership. The wrapper
73  
      allocates storage and owns the sink.
73  
      allocates storage and owns the sink.
74  
    - **Reference**: Pass a pointer to wrap without ownership. The
74  
    - **Reference**: Pass a pointer to wrap without ownership. The
75  
      pointed-to sink must outlive this wrapper.
75  
      pointed-to sink must outlive this wrapper.
76  

76  

77  
    @par Awaitable Preallocation
77  
    @par Awaitable Preallocation
78  
    The constructor preallocates storage for the type-erased awaitable.
78  
    The constructor preallocates storage for the type-erased awaitable.
79  
    This reserves all virtual address space at server startup
79  
    This reserves all virtual address space at server startup
80  
    so memory usage can be measured up front, rather than
80  
    so memory usage can be measured up front, rather than
81  
    allocating piecemeal as traffic arrives.
81  
    allocating piecemeal as traffic arrives.
82  

82  

83  
    @par Thread Safety
83  
    @par Thread Safety
84  
    Not thread-safe. Concurrent operations on the same wrapper
84  
    Not thread-safe. Concurrent operations on the same wrapper
85  
    are undefined behavior.
85  
    are undefined behavior.
86  

86  

87  
    @par Example
87  
    @par Example
88  
    @code
88  
    @code
89  
    // Owning - takes ownership of the sink
89  
    // Owning - takes ownership of the sink
90  
    any_buffer_sink abs(some_buffer_sink{args...});
90  
    any_buffer_sink abs(some_buffer_sink{args...});
91  

91  

92  
    // Reference - wraps without ownership
92  
    // Reference - wraps without ownership
93  
    some_buffer_sink sink;
93  
    some_buffer_sink sink;
94  
    any_buffer_sink abs(&sink);
94  
    any_buffer_sink abs(&sink);
95  

95  

96  
    // BufferSink interface: generate into callee-owned buffers
96  
    // BufferSink interface: generate into callee-owned buffers
97  
    mutable_buffer arr[16];
97  
    mutable_buffer arr[16];
98  
    auto bufs = abs.prepare(arr);
98  
    auto bufs = abs.prepare(arr);
99  
    // Write data into bufs[0..bufs.size())
99  
    // Write data into bufs[0..bufs.size())
100  
    auto [ec] = co_await abs.commit(bytes_written);
100  
    auto [ec] = co_await abs.commit(bytes_written);
101  
    auto [ec2] = co_await abs.commit_eof(0);
101  
    auto [ec2] = co_await abs.commit_eof(0);
102  

102  

103  
    // WriteSink interface: send caller-owned buffers
103  
    // WriteSink interface: send caller-owned buffers
104  
    auto [ec3, n] = co_await abs.write(make_buffer("hello", 5));
104  
    auto [ec3, n] = co_await abs.write(make_buffer("hello", 5));
105  
    auto [ec4] = co_await abs.write_eof();
105  
    auto [ec4] = co_await abs.write_eof();
106  

106  

107  
    // Or send everything at once
107  
    // Or send everything at once
108  
    auto [ec5, n2] = co_await abs.write_eof(
108  
    auto [ec5, n2] = co_await abs.write_eof(
109  
        make_buffer(body_data));
109  
        make_buffer(body_data));
110  
    @endcode
110  
    @endcode
111  

111  

112  
    @see any_buffer_source, BufferSink, WriteSink
112  
    @see any_buffer_source, BufferSink, WriteSink
113  
*/
113  
*/
114  
class any_buffer_sink
114  
class any_buffer_sink
115  
{
115  
{
116  
    struct vtable;
116  
    struct vtable;
117  
    struct awaitable_ops;
117  
    struct awaitable_ops;
118  
    struct write_awaitable_ops;
118  
    struct write_awaitable_ops;
119  

119  

120  
    template<BufferSink S>
120  
    template<BufferSink S>
121  
    struct vtable_for_impl;
121  
    struct vtable_for_impl;
122  

122  

123  
    // hot-path members first for cache locality
123  
    // hot-path members first for cache locality
124  
    void* sink_ = nullptr;
124  
    void* sink_ = nullptr;
125  
    vtable const* vt_ = nullptr;
125  
    vtable const* vt_ = nullptr;
126  
    void* cached_awaitable_ = nullptr;
126  
    void* cached_awaitable_ = nullptr;
127  
    awaitable_ops const* active_ops_ = nullptr;
127  
    awaitable_ops const* active_ops_ = nullptr;
128  
    write_awaitable_ops const* active_write_ops_ = nullptr;
128  
    write_awaitable_ops const* active_write_ops_ = nullptr;
129  
    void* storage_ = nullptr;
129  
    void* storage_ = nullptr;
130  

130  

131  
public:
131  
public:
132  
    /** Destructor.
132  
    /** Destructor.
133  

133  

134  
        Destroys the owned sink (if any) and releases the cached
134  
        Destroys the owned sink (if any) and releases the cached
135  
        awaitable storage.
135  
        awaitable storage.
136  
    */
136  
    */
137  
    ~any_buffer_sink();
137  
    ~any_buffer_sink();
138  

138  

139  
    /** Default constructor.
139  
    /** Default constructor.
140  

140  

141  
        Constructs an empty wrapper. Operations on a default-constructed
141  
        Constructs an empty wrapper. Operations on a default-constructed
142  
        wrapper result in undefined behavior.
142  
        wrapper result in undefined behavior.
143  
    */
143  
    */
144  
    any_buffer_sink() = default;
144  
    any_buffer_sink() = default;
145  

145  

146  
    /** Non-copyable.
146  
    /** Non-copyable.
147  

147  

148  
        The awaitable cache is per-instance and cannot be shared.
148  
        The awaitable cache is per-instance and cannot be shared.
149  
    */
149  
    */
150  
    any_buffer_sink(any_buffer_sink const&) = delete;
150  
    any_buffer_sink(any_buffer_sink const&) = delete;
151  
    any_buffer_sink& operator=(any_buffer_sink const&) = delete;
151  
    any_buffer_sink& operator=(any_buffer_sink const&) = delete;
152  

152  

153  
    /** Move constructor.
153  
    /** Move constructor.
154  

154  

155  
        Transfers ownership of the wrapped sink (if owned) and
155  
        Transfers ownership of the wrapped sink (if owned) and
156  
        cached awaitable storage from `other`. After the move, `other` is
156  
        cached awaitable storage from `other`. After the move, `other` is
157  
        in a default-constructed state.
157  
        in a default-constructed state.
158  

158  

159  
        @param other The wrapper to move from.
159  
        @param other The wrapper to move from.
160  
    */
160  
    */
161  
    any_buffer_sink(any_buffer_sink&& other) noexcept
161  
    any_buffer_sink(any_buffer_sink&& other) noexcept
162  
        : sink_(std::exchange(other.sink_, nullptr))
162  
        : sink_(std::exchange(other.sink_, nullptr))
163  
        , vt_(std::exchange(other.vt_, nullptr))
163  
        , vt_(std::exchange(other.vt_, nullptr))
164  
        , cached_awaitable_(std::exchange(other.cached_awaitable_, nullptr))
164  
        , cached_awaitable_(std::exchange(other.cached_awaitable_, nullptr))
165  
        , active_ops_(std::exchange(other.active_ops_, nullptr))
165  
        , active_ops_(std::exchange(other.active_ops_, nullptr))
166  
        , active_write_ops_(std::exchange(other.active_write_ops_, nullptr))
166  
        , active_write_ops_(std::exchange(other.active_write_ops_, nullptr))
167  
        , storage_(std::exchange(other.storage_, nullptr))
167  
        , storage_(std::exchange(other.storage_, nullptr))
168  
    {
168  
    {
169  
    }
169  
    }
170  

170  

171  
    /** Move assignment operator.
171  
    /** Move assignment operator.
172  

172  

173  
        Destroys any owned sink and releases existing resources,
173  
        Destroys any owned sink and releases existing resources,
174  
        then transfers ownership from `other`.
174  
        then transfers ownership from `other`.
175  

175  

176  
        @param other The wrapper to move from.
176  
        @param other The wrapper to move from.
177  
        @return Reference to this wrapper.
177  
        @return Reference to this wrapper.
178  
    */
178  
    */
179  
    any_buffer_sink&
179  
    any_buffer_sink&
180  
    operator=(any_buffer_sink&& other) noexcept;
180  
    operator=(any_buffer_sink&& other) noexcept;
181  

181  

182  
    /** Construct by taking ownership of a BufferSink.
182  
    /** Construct by taking ownership of a BufferSink.
183  

183  

184  
        Allocates storage and moves the sink into this wrapper.
184  
        Allocates storage and moves the sink into this wrapper.
185  
        The wrapper owns the sink and will destroy it. If `S` also
185  
        The wrapper owns the sink and will destroy it. If `S` also
186  
        satisfies @ref WriteSink, native write operations are
186  
        satisfies @ref WriteSink, native write operations are
187  
        forwarded through the virtual boundary.
187  
        forwarded through the virtual boundary.
188  

188  

189  
        @param s The sink to take ownership of.
189  
        @param s The sink to take ownership of.
190  
    */
190  
    */
191  
    template<BufferSink S>
191  
    template<BufferSink S>
192  
        requires (!std::same_as<std::decay_t<S>, any_buffer_sink>)
192  
        requires (!std::same_as<std::decay_t<S>, any_buffer_sink>)
193  
    any_buffer_sink(S s);
193  
    any_buffer_sink(S s);
194  

194  

195  
    /** Construct by wrapping a BufferSink without ownership.
195  
    /** Construct by wrapping a BufferSink without ownership.
196  

196  

197  
        Wraps the given sink by pointer. The sink must remain
197  
        Wraps the given sink by pointer. The sink must remain
198  
        valid for the lifetime of this wrapper. If `S` also
198  
        valid for the lifetime of this wrapper. If `S` also
199  
        satisfies @ref WriteSink, native write operations are
199  
        satisfies @ref WriteSink, native write operations are
200  
        forwarded through the virtual boundary.
200  
        forwarded through the virtual boundary.
201  

201  

202  
        @param s Pointer to the sink to wrap.
202  
        @param s Pointer to the sink to wrap.
203  
    */
203  
    */
204  
    template<BufferSink S>
204  
    template<BufferSink S>
205  
    any_buffer_sink(S* s);
205  
    any_buffer_sink(S* s);
206  

206  

207  
    /** Check if the wrapper contains a valid sink.
207  
    /** Check if the wrapper contains a valid sink.
208  

208  

209  
        @return `true` if wrapping a sink, `false` if default-constructed
209  
        @return `true` if wrapping a sink, `false` if default-constructed
210  
            or moved-from.
210  
            or moved-from.
211  
    */
211  
    */
212  
    bool
212  
    bool
213  
    has_value() const noexcept
213  
    has_value() const noexcept
214  
    {
214  
    {
215  
        return sink_ != nullptr;
215  
        return sink_ != nullptr;
216  
    }
216  
    }
217  

217  

218  
    /** Check if the wrapper contains a valid sink.
218  
    /** Check if the wrapper contains a valid sink.
219  

219  

220  
        @return `true` if wrapping a sink, `false` if default-constructed
220  
        @return `true` if wrapping a sink, `false` if default-constructed
221  
            or moved-from.
221  
            or moved-from.
222  
    */
222  
    */
223  
    explicit
223  
    explicit
224  
    operator bool() const noexcept
224  
    operator bool() const noexcept
225  
    {
225  
    {
226  
        return has_value();
226  
        return has_value();
227  
    }
227  
    }
228  

228  

229  
    /** Prepare writable buffers.
229  
    /** Prepare writable buffers.
230  

230  

231  
        Fills the provided span with mutable buffer descriptors
231  
        Fills the provided span with mutable buffer descriptors
232  
        pointing to the underlying sink's internal storage. This
232  
        pointing to the underlying sink's internal storage. This
233  
        operation is synchronous.
233  
        operation is synchronous.
234  

234  

235  
        @param dest Span of mutable_buffer to fill.
235  
        @param dest Span of mutable_buffer to fill.
236  

236  

237  
        @return A span of filled buffers.
237  
        @return A span of filled buffers.
238  

238  

239  
        @par Preconditions
239  
        @par Preconditions
240  
        The wrapper must contain a valid sink (`has_value() == true`).
240  
        The wrapper must contain a valid sink (`has_value() == true`).
241  
    */
241  
    */
242  
    std::span<mutable_buffer>
242  
    std::span<mutable_buffer>
243  
    prepare(std::span<mutable_buffer> dest);
243  
    prepare(std::span<mutable_buffer> dest);
244  

244  

245  
    /** Commit bytes written to the prepared buffers.
245  
    /** Commit bytes written to the prepared buffers.
246  

246  

247  
        Commits `n` bytes written to the buffers returned by the
247  
        Commits `n` bytes written to the buffers returned by the
248  
        most recent call to @ref prepare. The operation may trigger
248  
        most recent call to @ref prepare. The operation may trigger
249  
        underlying I/O.
249  
        underlying I/O.
250  

250  

251  
        @param n The number of bytes to commit.
251  
        @param n The number of bytes to commit.
252  

252  

253  
        @return An awaitable yielding `(error_code)`.
253  
        @return An awaitable yielding `(error_code)`.
254  

254  

255  
        @par Preconditions
255  
        @par Preconditions
256  
        The wrapper must contain a valid sink (`has_value() == true`).
256  
        The wrapper must contain a valid sink (`has_value() == true`).
257  
    */
257  
    */
258  
    auto
258  
    auto
259  
    commit(std::size_t n);
259  
    commit(std::size_t n);
260  

260  

261  
    /** Commit final bytes and signal end-of-stream.
261  
    /** Commit final bytes and signal end-of-stream.
262  

262  

263  
        Commits `n` bytes written to the buffers returned by the
263  
        Commits `n` bytes written to the buffers returned by the
264  
        most recent call to @ref prepare and finalizes the sink.
264  
        most recent call to @ref prepare and finalizes the sink.
265  
        After success, no further operations are permitted.
265  
        After success, no further operations are permitted.
266  

266  

267  
        @param n The number of bytes to commit.
267  
        @param n The number of bytes to commit.
268  

268  

269  
        @return An awaitable yielding `(error_code)`.
269  
        @return An awaitable yielding `(error_code)`.
270  

270  

271  
        @par Preconditions
271  
        @par Preconditions
272  
        The wrapper must contain a valid sink (`has_value() == true`).
272  
        The wrapper must contain a valid sink (`has_value() == true`).
273  
    */
273  
    */
274  
    auto
274  
    auto
275  
    commit_eof(std::size_t n);
275  
    commit_eof(std::size_t n);
276  

276  

277  
    /** Write some data from a buffer sequence.
277  
    /** Write some data from a buffer sequence.
278  

278  

279  
        Writes one or more bytes from the buffer sequence to the
279  
        Writes one or more bytes from the buffer sequence to the
280  
        underlying sink. May consume less than the full sequence.
280  
        underlying sink. May consume less than the full sequence.
281  

281  

282  
        When the wrapped type provides native @ref WriteSink support,
282  
        When the wrapped type provides native @ref WriteSink support,
283  
        the operation forwards directly. Otherwise it is synthesized
283  
        the operation forwards directly. Otherwise it is synthesized
284  
        from @ref prepare and @ref commit with a buffer copy.
284  
        from @ref prepare and @ref commit with a buffer copy.
285  

285  

286  
        @param buffers The buffer sequence to write.
286  
        @param buffers The buffer sequence to write.
287  

287  

288  
        @return An awaitable yielding `(error_code,std::size_t)`.
288  
        @return An awaitable yielding `(error_code,std::size_t)`.
289  

289  

290  
        @par Preconditions
290  
        @par Preconditions
291  
        The wrapper must contain a valid sink (`has_value() == true`).
291  
        The wrapper must contain a valid sink (`has_value() == true`).
292  
    */
292  
    */
293  
    template<ConstBufferSequence CB>
293  
    template<ConstBufferSequence CB>
294  
    io_task<std::size_t>
294  
    io_task<std::size_t>
295  
    write_some(CB buffers);
295  
    write_some(CB buffers);
296  

296  

297  
    /** Write all data from a buffer sequence.
297  
    /** Write all data from a buffer sequence.
298  

298  

299  
        Writes all data from the buffer sequence to the underlying
299  
        Writes all data from the buffer sequence to the underlying
300  
        sink. This method satisfies the @ref WriteSink concept.
300  
        sink. This method satisfies the @ref WriteSink concept.
301  

301  

302  
        When the wrapped type provides native @ref WriteSink support,
302  
        When the wrapped type provides native @ref WriteSink support,
303  
        each window is forwarded directly. Otherwise the data is
303  
        each window is forwarded directly. Otherwise the data is
304  
        copied into the sink via @ref prepare and @ref commit.
304  
        copied into the sink via @ref prepare and @ref commit.
305  

305  

306  
        @param buffers The buffer sequence to write.
306  
        @param buffers The buffer sequence to write.
307  

307  

308  
        @return An awaitable yielding `(error_code,std::size_t)`.
308  
        @return An awaitable yielding `(error_code,std::size_t)`.
309  

309  

310  
        @par Preconditions
310  
        @par Preconditions
311  
        The wrapper must contain a valid sink (`has_value() == true`).
311  
        The wrapper must contain a valid sink (`has_value() == true`).
312  
    */
312  
    */
313  
    template<ConstBufferSequence CB>
313  
    template<ConstBufferSequence CB>
314  
    io_task<std::size_t>
314  
    io_task<std::size_t>
315  
    write(CB buffers);
315  
    write(CB buffers);
316  

316  

317  
    /** Atomically write data and signal end-of-stream.
317  
    /** Atomically write data and signal end-of-stream.
318  

318  

319  
        Writes all data from the buffer sequence to the underlying
319  
        Writes all data from the buffer sequence to the underlying
320  
        sink and then signals end-of-stream.
320  
        sink and then signals end-of-stream.
321  

321  

322  
        When the wrapped type provides native @ref WriteSink support,
322  
        When the wrapped type provides native @ref WriteSink support,
323  
        the final window is sent atomically via the underlying
323  
        the final window is sent atomically via the underlying
324  
        `write_eof(buffers)`. Otherwise the data is synthesized
324  
        `write_eof(buffers)`. Otherwise the data is synthesized
325  
        through @ref prepare, @ref commit, and @ref commit_eof.
325  
        through @ref prepare, @ref commit, and @ref commit_eof.
326  

326  

327  
        @param buffers The buffer sequence to write.
327  
        @param buffers The buffer sequence to write.
328  

328  

329  
        @return An awaitable yielding `(error_code,std::size_t)`.
329  
        @return An awaitable yielding `(error_code,std::size_t)`.
330  

330  

331  
        @par Preconditions
331  
        @par Preconditions
332  
        The wrapper must contain a valid sink (`has_value() == true`).
332  
        The wrapper must contain a valid sink (`has_value() == true`).
333  
    */
333  
    */
334  
    template<ConstBufferSequence CB>
334  
    template<ConstBufferSequence CB>
335  
    io_task<std::size_t>
335  
    io_task<std::size_t>
336  
    write_eof(CB buffers);
336  
    write_eof(CB buffers);
337  

337  

338  
    /** Signal end-of-stream.
338  
    /** Signal end-of-stream.
339  

339  

340  
        Indicates that no more data will be written to the sink.
340  
        Indicates that no more data will be written to the sink.
341  
        This method satisfies the @ref WriteSink concept.
341  
        This method satisfies the @ref WriteSink concept.
342  

342  

343  
        When the wrapped type provides native @ref WriteSink support,
343  
        When the wrapped type provides native @ref WriteSink support,
344  
        the underlying `write_eof()` is called. Otherwise the
344  
        the underlying `write_eof()` is called. Otherwise the
345  
        operation is implemented as `commit_eof(0)`.
345  
        operation is implemented as `commit_eof(0)`.
346  

346  

347  
        @return An awaitable yielding `(error_code)`.
347  
        @return An awaitable yielding `(error_code)`.
348  

348  

349  
        @par Preconditions
349  
        @par Preconditions
350  
        The wrapper must contain a valid sink (`has_value() == true`).
350  
        The wrapper must contain a valid sink (`has_value() == true`).
351  
    */
351  
    */
352  
    auto
352  
    auto
353  
    write_eof();
353  
    write_eof();
354  

354  

355  
protected:
355  
protected:
356  
    /** Rebind to a new sink after move.
356  
    /** Rebind to a new sink after move.
357  

357  

358  
        Updates the internal pointer to reference a new sink object.
358  
        Updates the internal pointer to reference a new sink object.
359  
        Used by owning wrappers after move assignment when the owned
359  
        Used by owning wrappers after move assignment when the owned
360  
        object has moved to a new location.
360  
        object has moved to a new location.
361  

361  

362  
        @param new_sink The new sink to bind to. Must be the same
362  
        @param new_sink The new sink to bind to. Must be the same
363  
            type as the original sink.
363  
            type as the original sink.
364  

364  

365  
        @note Terminates if called with a sink of different type
365  
        @note Terminates if called with a sink of different type
366  
            than the original.
366  
            than the original.
367  
    */
367  
    */
368  
    template<BufferSink S>
368  
    template<BufferSink S>
369  
    void
369  
    void
370  
    rebind(S& new_sink) noexcept
370  
    rebind(S& new_sink) noexcept
371  
    {
371  
    {
372  
        if(vt_ != &vtable_for_impl<S>::value)
372  
        if(vt_ != &vtable_for_impl<S>::value)
373  
            std::terminate();
373  
            std::terminate();
374  
        sink_ = &new_sink;
374  
        sink_ = &new_sink;
375  
    }
375  
    }
376  

376  

377  
private:
377  
private:
378  
    /** Forward a partial write through the vtable.
378  
    /** Forward a partial write through the vtable.
379  

379  

380  
        Constructs the underlying `write_some` awaitable in
380  
        Constructs the underlying `write_some` awaitable in
381  
        cached storage and returns a type-erased awaitable.
381  
        cached storage and returns a type-erased awaitable.
382  
    */
382  
    */
383  
    auto
383  
    auto
384  
    write_some_(std::span<const_buffer const> buffers);
384  
    write_some_(std::span<const_buffer const> buffers);
385  

385  

386  
    /** Forward a complete write through the vtable.
386  
    /** Forward a complete write through the vtable.
387  

387  

388  
        Constructs the underlying `write` awaitable in
388  
        Constructs the underlying `write` awaitable in
389  
        cached storage and returns a type-erased awaitable.
389  
        cached storage and returns a type-erased awaitable.
390  
    */
390  
    */
391  
    auto
391  
    auto
392  
    write_(std::span<const_buffer const> buffers);
392  
    write_(std::span<const_buffer const> buffers);
393  

393  

394  
    /** Forward an atomic write-with-EOF through the vtable.
394  
    /** Forward an atomic write-with-EOF through the vtable.
395  

395  

396  
        Constructs the underlying `write_eof(buffers)` awaitable
396  
        Constructs the underlying `write_eof(buffers)` awaitable
397  
        in cached storage and returns a type-erased awaitable.
397  
        in cached storage and returns a type-erased awaitable.
398  
    */
398  
    */
399  
    auto
399  
    auto
400  
    write_eof_buffers_(std::span<const_buffer const> buffers);
400  
    write_eof_buffers_(std::span<const_buffer const> buffers);
401  
};
401  
};
402  

402  

403  
//----------------------------------------------------------
403  
//----------------------------------------------------------
404  

404  

405  
/** Type-erased ops for awaitables yielding `io_result<>`. */
405  
/** Type-erased ops for awaitables yielding `io_result<>`. */
406  
struct any_buffer_sink::awaitable_ops
406  
struct any_buffer_sink::awaitable_ops
407  
{
407  
{
408  
    bool (*await_ready)(void*);
408  
    bool (*await_ready)(void*);
409  
    std::coroutine_handle<> (*await_suspend)(void*, std::coroutine_handle<>, io_env const*);
409  
    std::coroutine_handle<> (*await_suspend)(void*, std::coroutine_handle<>, io_env const*);
410  
    io_result<> (*await_resume)(void*);
410  
    io_result<> (*await_resume)(void*);
411  
    void (*destroy)(void*) noexcept;
411  
    void (*destroy)(void*) noexcept;
412  
};
412  
};
413  

413  

414  
/** Type-erased ops for awaitables yielding `io_result<std::size_t>`. */
414  
/** Type-erased ops for awaitables yielding `io_result<std::size_t>`. */
415  
struct any_buffer_sink::write_awaitable_ops
415  
struct any_buffer_sink::write_awaitable_ops
416  
{
416  
{
417  
    bool (*await_ready)(void*);
417  
    bool (*await_ready)(void*);
418  
    std::coroutine_handle<> (*await_suspend)(void*, std::coroutine_handle<>, io_env const*);
418  
    std::coroutine_handle<> (*await_suspend)(void*, std::coroutine_handle<>, io_env const*);
419  
    io_result<std::size_t> (*await_resume)(void*);
419  
    io_result<std::size_t> (*await_resume)(void*);
420  
    void (*destroy)(void*) noexcept;
420  
    void (*destroy)(void*) noexcept;
421  
};
421  
};
422  

422  

423  
struct any_buffer_sink::vtable
423  
struct any_buffer_sink::vtable
424  
{
424  
{
425  
    void (*destroy)(void*) noexcept;
425  
    void (*destroy)(void*) noexcept;
426  
    std::span<mutable_buffer> (*do_prepare)(
426  
    std::span<mutable_buffer> (*do_prepare)(
427  
        void* sink,
427  
        void* sink,
428  
        std::span<mutable_buffer> dest);
428  
        std::span<mutable_buffer> dest);
429  
    std::size_t awaitable_size;
429  
    std::size_t awaitable_size;
430  
    std::size_t awaitable_align;
430  
    std::size_t awaitable_align;
431  
    awaitable_ops const* (*construct_commit_awaitable)(
431  
    awaitable_ops const* (*construct_commit_awaitable)(
432  
        void* sink,
432  
        void* sink,
433  
        void* storage,
433  
        void* storage,
434  
        std::size_t n);
434  
        std::size_t n);
435  
    awaitable_ops const* (*construct_commit_eof_awaitable)(
435  
    awaitable_ops const* (*construct_commit_eof_awaitable)(
436  
        void* sink,
436  
        void* sink,
437  
        void* storage,
437  
        void* storage,
438  
        std::size_t n);
438  
        std::size_t n);
439  

439  

440  
    // WriteSink forwarding (null when wrapped type is BufferSink-only)
440  
    // WriteSink forwarding (null when wrapped type is BufferSink-only)
441  
    write_awaitable_ops const* (*construct_write_some_awaitable)(
441  
    write_awaitable_ops const* (*construct_write_some_awaitable)(
442  
        void* sink,
442  
        void* sink,
443  
        void* storage,
443  
        void* storage,
444  
        std::span<const_buffer const> buffers);
444  
        std::span<const_buffer const> buffers);
445  
    write_awaitable_ops const* (*construct_write_awaitable)(
445  
    write_awaitable_ops const* (*construct_write_awaitable)(
446  
        void* sink,
446  
        void* sink,
447  
        void* storage,
447  
        void* storage,
448  
        std::span<const_buffer const> buffers);
448  
        std::span<const_buffer const> buffers);
449  
    write_awaitable_ops const* (*construct_write_eof_buffers_awaitable)(
449  
    write_awaitable_ops const* (*construct_write_eof_buffers_awaitable)(
450  
        void* sink,
450  
        void* sink,
451  
        void* storage,
451  
        void* storage,
452  
        std::span<const_buffer const> buffers);
452  
        std::span<const_buffer const> buffers);
453  
    awaitable_ops const* (*construct_write_eof_awaitable)(
453  
    awaitable_ops const* (*construct_write_eof_awaitable)(
454  
        void* sink,
454  
        void* sink,
455  
        void* storage);
455  
        void* storage);
456  
};
456  
};
457  

457  

458  
template<BufferSink S>
458  
template<BufferSink S>
459  
struct any_buffer_sink::vtable_for_impl
459  
struct any_buffer_sink::vtable_for_impl
460  
{
460  
{
461  
    using CommitAwaitable = decltype(std::declval<S&>().commit(
461  
    using CommitAwaitable = decltype(std::declval<S&>().commit(
462  
        std::size_t{}));
462  
        std::size_t{}));
463  
    using CommitEofAwaitable = decltype(std::declval<S&>().commit_eof(
463  
    using CommitEofAwaitable = decltype(std::declval<S&>().commit_eof(
464  
        std::size_t{}));
464  
        std::size_t{}));
465  

465  

466  
    static void
466  
    static void
467  
    do_destroy_impl(void* sink) noexcept
467  
    do_destroy_impl(void* sink) noexcept
468  
    {
468  
    {
469  
        static_cast<S*>(sink)->~S();
469  
        static_cast<S*>(sink)->~S();
470  
    }
470  
    }
471  

471  

472  
    static std::span<mutable_buffer>
472  
    static std::span<mutable_buffer>
473  
    do_prepare_impl(
473  
    do_prepare_impl(
474  
        void* sink,
474  
        void* sink,
475  
        std::span<mutable_buffer> dest)
475  
        std::span<mutable_buffer> dest)
476  
    {
476  
    {
477  
        auto& s = *static_cast<S*>(sink);
477  
        auto& s = *static_cast<S*>(sink);
478  
        return s.prepare(dest);
478  
        return s.prepare(dest);
479  
    }
479  
    }
480  

480  

481  
    static awaitable_ops const*
481  
    static awaitable_ops const*
482  
    construct_commit_awaitable_impl(
482  
    construct_commit_awaitable_impl(
483  
        void* sink,
483  
        void* sink,
484  
        void* storage,
484  
        void* storage,
485  
        std::size_t n)
485  
        std::size_t n)
486  
    {
486  
    {
487  
        auto& s = *static_cast<S*>(sink);
487  
        auto& s = *static_cast<S*>(sink);
488  
        ::new(storage) CommitAwaitable(s.commit(n));
488  
        ::new(storage) CommitAwaitable(s.commit(n));
489  

489  

490  
        static constexpr awaitable_ops ops = {
490  
        static constexpr awaitable_ops ops = {
491  
            +[](void* p) {
491  
            +[](void* p) {
492  
                return static_cast<CommitAwaitable*>(p)->await_ready();
492  
                return static_cast<CommitAwaitable*>(p)->await_ready();
493  
            },
493  
            },
494  
            +[](void* p, std::coroutine_handle<> h, io_env const* env) {
494  
            +[](void* p, std::coroutine_handle<> h, io_env const* env) {
495  
                return detail::call_await_suspend(
495  
                return detail::call_await_suspend(
496  
                    static_cast<CommitAwaitable*>(p), h, env);
496  
                    static_cast<CommitAwaitable*>(p), h, env);
497  
            },
497  
            },
498  
            +[](void* p) {
498  
            +[](void* p) {
499  
                return static_cast<CommitAwaitable*>(p)->await_resume();
499  
                return static_cast<CommitAwaitable*>(p)->await_resume();
500  
            },
500  
            },
501  
            +[](void* p) noexcept {
501  
            +[](void* p) noexcept {
502  
                static_cast<CommitAwaitable*>(p)->~CommitAwaitable();
502  
                static_cast<CommitAwaitable*>(p)->~CommitAwaitable();
503  
            }
503  
            }
504  
        };
504  
        };
505  
        return &ops;
505  
        return &ops;
506  
    }
506  
    }
507  

507  

508  
    static awaitable_ops const*
508  
    static awaitable_ops const*
509  
    construct_commit_eof_awaitable_impl(
509  
    construct_commit_eof_awaitable_impl(
510  
        void* sink,
510  
        void* sink,
511  
        void* storage,
511  
        void* storage,
512  
        std::size_t n)
512  
        std::size_t n)
513  
    {
513  
    {
514  
        auto& s = *static_cast<S*>(sink);
514  
        auto& s = *static_cast<S*>(sink);
515  
        ::new(storage) CommitEofAwaitable(s.commit_eof(n));
515  
        ::new(storage) CommitEofAwaitable(s.commit_eof(n));
516  

516  

517  
        static constexpr awaitable_ops ops = {
517  
        static constexpr awaitable_ops ops = {
518  
            +[](void* p) {
518  
            +[](void* p) {
519  
                return static_cast<CommitEofAwaitable*>(p)->await_ready();
519  
                return static_cast<CommitEofAwaitable*>(p)->await_ready();
520  
            },
520  
            },
521  
            +[](void* p, std::coroutine_handle<> h, io_env const* env) {
521  
            +[](void* p, std::coroutine_handle<> h, io_env const* env) {
522  
                return detail::call_await_suspend(
522  
                return detail::call_await_suspend(
523  
                    static_cast<CommitEofAwaitable*>(p), h, env);
523  
                    static_cast<CommitEofAwaitable*>(p), h, env);
524  
            },
524  
            },
525  
            +[](void* p) {
525  
            +[](void* p) {
526  
                return static_cast<CommitEofAwaitable*>(p)->await_resume();
526  
                return static_cast<CommitEofAwaitable*>(p)->await_resume();
527  
            },
527  
            },
528  
            +[](void* p) noexcept {
528  
            +[](void* p) noexcept {
529  
                static_cast<CommitEofAwaitable*>(p)->~CommitEofAwaitable();
529  
                static_cast<CommitEofAwaitable*>(p)->~CommitEofAwaitable();
530  
            }
530  
            }
531  
        };
531  
        };
532  
        return &ops;
532  
        return &ops;
533  
    }
533  
    }
534  

534  

535  
    //------------------------------------------------------
535  
    //------------------------------------------------------
536  
    // WriteSink forwarding (only instantiated when WriteSink<S>)
536  
    // WriteSink forwarding (only instantiated when WriteSink<S>)
537  

537  

538  
    static write_awaitable_ops const*
538  
    static write_awaitable_ops const*
539  
    construct_write_some_awaitable_impl(
539  
    construct_write_some_awaitable_impl(
540  
        void* sink,
540  
        void* sink,
541  
        void* storage,
541  
        void* storage,
542  
        std::span<const_buffer const> buffers)
542  
        std::span<const_buffer const> buffers)
543  
        requires WriteSink<S>
543  
        requires WriteSink<S>
544  
    {
544  
    {
545  
        using Aw = decltype(std::declval<S&>().write_some(
545  
        using Aw = decltype(std::declval<S&>().write_some(
546  
            std::span<const_buffer const>{}));
546  
            std::span<const_buffer const>{}));
547  
        auto& s = *static_cast<S*>(sink);
547  
        auto& s = *static_cast<S*>(sink);
548  
        ::new(storage) Aw(s.write_some(buffers));
548  
        ::new(storage) Aw(s.write_some(buffers));
549  

549  

550  
        static constexpr write_awaitable_ops ops = {
550  
        static constexpr write_awaitable_ops ops = {
551  
            +[](void* p) {
551  
            +[](void* p) {
552  
                return static_cast<Aw*>(p)->await_ready();
552  
                return static_cast<Aw*>(p)->await_ready();
553  
            },
553  
            },
554  
            +[](void* p, std::coroutine_handle<> h, io_env const* env) {
554  
            +[](void* p, std::coroutine_handle<> h, io_env const* env) {
555  
                return detail::call_await_suspend(
555  
                return detail::call_await_suspend(
556  
                    static_cast<Aw*>(p), h, env);
556  
                    static_cast<Aw*>(p), h, env);
557  
            },
557  
            },
558  
            +[](void* p) {
558  
            +[](void* p) {
559  
                return static_cast<Aw*>(p)->await_resume();
559  
                return static_cast<Aw*>(p)->await_resume();
560  
            },
560  
            },
561  
            +[](void* p) noexcept {
561  
            +[](void* p) noexcept {
562  
                static_cast<Aw*>(p)->~Aw();
562  
                static_cast<Aw*>(p)->~Aw();
563  
            }
563  
            }
564  
        };
564  
        };
565  
        return &ops;
565  
        return &ops;
566  
    }
566  
    }
567  

567  

568  
    static write_awaitable_ops const*
568  
    static write_awaitable_ops const*
569  
    construct_write_awaitable_impl(
569  
    construct_write_awaitable_impl(
570  
        void* sink,
570  
        void* sink,
571  
        void* storage,
571  
        void* storage,
572  
        std::span<const_buffer const> buffers)
572  
        std::span<const_buffer const> buffers)
573  
        requires WriteSink<S>
573  
        requires WriteSink<S>
574  
    {
574  
    {
575  
        using Aw = decltype(std::declval<S&>().write(
575  
        using Aw = decltype(std::declval<S&>().write(
576  
            std::span<const_buffer const>{}));
576  
            std::span<const_buffer const>{}));
577  
        auto& s = *static_cast<S*>(sink);
577  
        auto& s = *static_cast<S*>(sink);
578  
        ::new(storage) Aw(s.write(buffers));
578  
        ::new(storage) Aw(s.write(buffers));
579  

579  

580  
        static constexpr write_awaitable_ops ops = {
580  
        static constexpr write_awaitable_ops ops = {
581  
            +[](void* p) {
581  
            +[](void* p) {
582  
                return static_cast<Aw*>(p)->await_ready();
582  
                return static_cast<Aw*>(p)->await_ready();
583  
            },
583  
            },
584  
            +[](void* p, std::coroutine_handle<> h, io_env const* env) {
584  
            +[](void* p, std::coroutine_handle<> h, io_env const* env) {
585  
                return detail::call_await_suspend(
585  
                return detail::call_await_suspend(
586  
                    static_cast<Aw*>(p), h, env);
586  
                    static_cast<Aw*>(p), h, env);
587  
            },
587  
            },
588  
            +[](void* p) {
588  
            +[](void* p) {
589  
                return static_cast<Aw*>(p)->await_resume();
589  
                return static_cast<Aw*>(p)->await_resume();
590  
            },
590  
            },
591  
            +[](void* p) noexcept {
591  
            +[](void* p) noexcept {
592  
                static_cast<Aw*>(p)->~Aw();
592  
                static_cast<Aw*>(p)->~Aw();
593  
            }
593  
            }
594  
        };
594  
        };
595  
        return &ops;
595  
        return &ops;
596  
    }
596  
    }
597  

597  

598  
    static write_awaitable_ops const*
598  
    static write_awaitable_ops const*
599  
    construct_write_eof_buffers_awaitable_impl(
599  
    construct_write_eof_buffers_awaitable_impl(
600  
        void* sink,
600  
        void* sink,
601  
        void* storage,
601  
        void* storage,
602  
        std::span<const_buffer const> buffers)
602  
        std::span<const_buffer const> buffers)
603  
        requires WriteSink<S>
603  
        requires WriteSink<S>
604  
    {
604  
    {
605  
        using Aw = decltype(std::declval<S&>().write_eof(
605  
        using Aw = decltype(std::declval<S&>().write_eof(
606  
            std::span<const_buffer const>{}));
606  
            std::span<const_buffer const>{}));
607  
        auto& s = *static_cast<S*>(sink);
607  
        auto& s = *static_cast<S*>(sink);
608  
        ::new(storage) Aw(s.write_eof(buffers));
608  
        ::new(storage) Aw(s.write_eof(buffers));
609  

609  

610  
        static constexpr write_awaitable_ops ops = {
610  
        static constexpr write_awaitable_ops ops = {
611  
            +[](void* p) {
611  
            +[](void* p) {
612  
                return static_cast<Aw*>(p)->await_ready();
612  
                return static_cast<Aw*>(p)->await_ready();
613  
            },
613  
            },
614  
            +[](void* p, std::coroutine_handle<> h, io_env const* env) {
614  
            +[](void* p, std::coroutine_handle<> h, io_env const* env) {
615  
                return detail::call_await_suspend(
615  
                return detail::call_await_suspend(
616  
                    static_cast<Aw*>(p), h, env);
616  
                    static_cast<Aw*>(p), h, env);
617  
            },
617  
            },
618  
            +[](void* p) {
618  
            +[](void* p) {
619  
                return static_cast<Aw*>(p)->await_resume();
619  
                return static_cast<Aw*>(p)->await_resume();
620  
            },
620  
            },
621  
            +[](void* p) noexcept {
621  
            +[](void* p) noexcept {
622  
                static_cast<Aw*>(p)->~Aw();
622  
                static_cast<Aw*>(p)->~Aw();
623  
            }
623  
            }
624  
        };
624  
        };
625  
        return &ops;
625  
        return &ops;
626  
    }
626  
    }
627  

627  

628  
    static awaitable_ops const*
628  
    static awaitable_ops const*
629  
    construct_write_eof_awaitable_impl(
629  
    construct_write_eof_awaitable_impl(
630  
        void* sink,
630  
        void* sink,
631  
        void* storage)
631  
        void* storage)
632  
        requires WriteSink<S>
632  
        requires WriteSink<S>
633  
    {
633  
    {
634  
        using Aw = decltype(std::declval<S&>().write_eof());
634  
        using Aw = decltype(std::declval<S&>().write_eof());
635  
        auto& s = *static_cast<S*>(sink);
635  
        auto& s = *static_cast<S*>(sink);
636  
        ::new(storage) Aw(s.write_eof());
636  
        ::new(storage) Aw(s.write_eof());
637  

637  

638  
        static constexpr awaitable_ops ops = {
638  
        static constexpr awaitable_ops ops = {
639  
            +[](void* p) {
639  
            +[](void* p) {
640  
                return static_cast<Aw*>(p)->await_ready();
640  
                return static_cast<Aw*>(p)->await_ready();
641  
            },
641  
            },
642  
            +[](void* p, std::coroutine_handle<> h, io_env const* env) {
642  
            +[](void* p, std::coroutine_handle<> h, io_env const* env) {
643  
                return detail::call_await_suspend(
643  
                return detail::call_await_suspend(
644  
                    static_cast<Aw*>(p), h, env);
644  
                    static_cast<Aw*>(p), h, env);
645  
            },
645  
            },
646  
            +[](void* p) {
646  
            +[](void* p) {
647  
                return static_cast<Aw*>(p)->await_resume();
647  
                return static_cast<Aw*>(p)->await_resume();
648  
            },
648  
            },
649  
            +[](void* p) noexcept {
649  
            +[](void* p) noexcept {
650  
                static_cast<Aw*>(p)->~Aw();
650  
                static_cast<Aw*>(p)->~Aw();
651  
            }
651  
            }
652  
        };
652  
        };
653  
        return &ops;
653  
        return &ops;
654  
    }
654  
    }
655  

655  

656  
    //------------------------------------------------------
656  
    //------------------------------------------------------
657  

657  

658  
    static consteval std::size_t
658  
    static consteval std::size_t
659  
    compute_max_size() noexcept
659  
    compute_max_size() noexcept
660  
    {
660  
    {
661  
        std::size_t s = sizeof(CommitAwaitable) > sizeof(CommitEofAwaitable)
661  
        std::size_t s = sizeof(CommitAwaitable) > sizeof(CommitEofAwaitable)
662  
            ? sizeof(CommitAwaitable)
662  
            ? sizeof(CommitAwaitable)
663  
            : sizeof(CommitEofAwaitable);
663  
            : sizeof(CommitEofAwaitable);
664  
        if constexpr (WriteSink<S>)
664  
        if constexpr (WriteSink<S>)
665  
        {
665  
        {
666  
            using WS = decltype(std::declval<S&>().write_some(
666  
            using WS = decltype(std::declval<S&>().write_some(
667  
                std::span<const_buffer const>{}));
667  
                std::span<const_buffer const>{}));
668  
            using W = decltype(std::declval<S&>().write(
668  
            using W = decltype(std::declval<S&>().write(
669  
                std::span<const_buffer const>{}));
669  
                std::span<const_buffer const>{}));
670  
            using WEB = decltype(std::declval<S&>().write_eof(
670  
            using WEB = decltype(std::declval<S&>().write_eof(
671  
                std::span<const_buffer const>{}));
671  
                std::span<const_buffer const>{}));
672  
            using WE = decltype(std::declval<S&>().write_eof());
672  
            using WE = decltype(std::declval<S&>().write_eof());
673  

673  

674  
            if(sizeof(WS) > s) s = sizeof(WS);
674  
            if(sizeof(WS) > s) s = sizeof(WS);
675  
            if(sizeof(W) > s) s = sizeof(W);
675  
            if(sizeof(W) > s) s = sizeof(W);
676  
            if(sizeof(WEB) > s) s = sizeof(WEB);
676  
            if(sizeof(WEB) > s) s = sizeof(WEB);
677  
            if(sizeof(WE) > s) s = sizeof(WE);
677  
            if(sizeof(WE) > s) s = sizeof(WE);
678  
        }
678  
        }
679  
        return s;
679  
        return s;
680  
    }
680  
    }
681  

681  

682  
    static consteval std::size_t
682  
    static consteval std::size_t
683  
    compute_max_align() noexcept
683  
    compute_max_align() noexcept
684  
    {
684  
    {
685  
        std::size_t a = alignof(CommitAwaitable) > alignof(CommitEofAwaitable)
685  
        std::size_t a = alignof(CommitAwaitable) > alignof(CommitEofAwaitable)
686  
            ? alignof(CommitAwaitable)
686  
            ? alignof(CommitAwaitable)
687  
            : alignof(CommitEofAwaitable);
687  
            : alignof(CommitEofAwaitable);
688  
        if constexpr (WriteSink<S>)
688  
        if constexpr (WriteSink<S>)
689  
        {
689  
        {
690  
            using WS = decltype(std::declval<S&>().write_some(
690  
            using WS = decltype(std::declval<S&>().write_some(
691  
                std::span<const_buffer const>{}));
691  
                std::span<const_buffer const>{}));
692  
            using W = decltype(std::declval<S&>().write(
692  
            using W = decltype(std::declval<S&>().write(
693  
                std::span<const_buffer const>{}));
693  
                std::span<const_buffer const>{}));
694  
            using WEB = decltype(std::declval<S&>().write_eof(
694  
            using WEB = decltype(std::declval<S&>().write_eof(
695  
                std::span<const_buffer const>{}));
695  
                std::span<const_buffer const>{}));
696  
            using WE = decltype(std::declval<S&>().write_eof());
696  
            using WE = decltype(std::declval<S&>().write_eof());
697  

697  

698  
            if(alignof(WS) > a) a = alignof(WS);
698  
            if(alignof(WS) > a) a = alignof(WS);
699  
            if(alignof(W) > a) a = alignof(W);
699  
            if(alignof(W) > a) a = alignof(W);
700  
            if(alignof(WEB) > a) a = alignof(WEB);
700  
            if(alignof(WEB) > a) a = alignof(WEB);
701  
            if(alignof(WE) > a) a = alignof(WE);
701  
            if(alignof(WE) > a) a = alignof(WE);
702  
        }
702  
        }
703  
        return a;
703  
        return a;
704  
    }
704  
    }
705  

705  

706  
    static consteval vtable
706  
    static consteval vtable
707  
    make_vtable() noexcept
707  
    make_vtable() noexcept
708  
    {
708  
    {
709  
        vtable v{};
709  
        vtable v{};
710  
        v.destroy = &do_destroy_impl;
710  
        v.destroy = &do_destroy_impl;
711  
        v.do_prepare = &do_prepare_impl;
711  
        v.do_prepare = &do_prepare_impl;
712  
        v.awaitable_size = compute_max_size();
712  
        v.awaitable_size = compute_max_size();
713  
        v.awaitable_align = compute_max_align();
713  
        v.awaitable_align = compute_max_align();
714  
        v.construct_commit_awaitable = &construct_commit_awaitable_impl;
714  
        v.construct_commit_awaitable = &construct_commit_awaitable_impl;
715  
        v.construct_commit_eof_awaitable = &construct_commit_eof_awaitable_impl;
715  
        v.construct_commit_eof_awaitable = &construct_commit_eof_awaitable_impl;
716  
        v.construct_write_some_awaitable = nullptr;
716  
        v.construct_write_some_awaitable = nullptr;
717  
        v.construct_write_awaitable = nullptr;
717  
        v.construct_write_awaitable = nullptr;
718  
        v.construct_write_eof_buffers_awaitable = nullptr;
718  
        v.construct_write_eof_buffers_awaitable = nullptr;
719  
        v.construct_write_eof_awaitable = nullptr;
719  
        v.construct_write_eof_awaitable = nullptr;
720  

720  

721  
        if constexpr (WriteSink<S>)
721  
        if constexpr (WriteSink<S>)
722  
        {
722  
        {
723  
            v.construct_write_some_awaitable =
723  
            v.construct_write_some_awaitable =
724  
                &construct_write_some_awaitable_impl;
724  
                &construct_write_some_awaitable_impl;
725  
            v.construct_write_awaitable =
725  
            v.construct_write_awaitable =
726  
                &construct_write_awaitable_impl;
726  
                &construct_write_awaitable_impl;
727  
            v.construct_write_eof_buffers_awaitable =
727  
            v.construct_write_eof_buffers_awaitable =
728  
                &construct_write_eof_buffers_awaitable_impl;
728  
                &construct_write_eof_buffers_awaitable_impl;
729  
            v.construct_write_eof_awaitable =
729  
            v.construct_write_eof_awaitable =
730  
                &construct_write_eof_awaitable_impl;
730  
                &construct_write_eof_awaitable_impl;
731  
        }
731  
        }
732  
        return v;
732  
        return v;
733  
    }
733  
    }
734  

734  

735  
    static constexpr vtable value = make_vtable();
735  
    static constexpr vtable value = make_vtable();
736  
};
736  
};
737  

737  

738  
//----------------------------------------------------------
738  
//----------------------------------------------------------
739  

739  

740  
inline
740  
inline
741  
any_buffer_sink::~any_buffer_sink()
741  
any_buffer_sink::~any_buffer_sink()
742  
{
742  
{
743  
    if(storage_)
743  
    if(storage_)
744  
    {
744  
    {
745  
        vt_->destroy(sink_);
745  
        vt_->destroy(sink_);
746  
        ::operator delete(storage_);
746  
        ::operator delete(storage_);
747  
    }
747  
    }
748  
    if(cached_awaitable_)
748  
    if(cached_awaitable_)
749  
        ::operator delete(cached_awaitable_);
749  
        ::operator delete(cached_awaitable_);
750  
}
750  
}
751  

751  

752  
inline any_buffer_sink&
752  
inline any_buffer_sink&
753  
any_buffer_sink::operator=(any_buffer_sink&& other) noexcept
753  
any_buffer_sink::operator=(any_buffer_sink&& other) noexcept
754  
{
754  
{
755  
    if(this != &other)
755  
    if(this != &other)
756  
    {
756  
    {
757  
        if(storage_)
757  
        if(storage_)
758  
        {
758  
        {
759  
            vt_->destroy(sink_);
759  
            vt_->destroy(sink_);
760  
            ::operator delete(storage_);
760  
            ::operator delete(storage_);
761  
        }
761  
        }
762  
        if(cached_awaitable_)
762  
        if(cached_awaitable_)
763  
            ::operator delete(cached_awaitable_);
763  
            ::operator delete(cached_awaitable_);
764  
        sink_ = std::exchange(other.sink_, nullptr);
764  
        sink_ = std::exchange(other.sink_, nullptr);
765  
        vt_ = std::exchange(other.vt_, nullptr);
765  
        vt_ = std::exchange(other.vt_, nullptr);
766  
        cached_awaitable_ = std::exchange(other.cached_awaitable_, nullptr);
766  
        cached_awaitable_ = std::exchange(other.cached_awaitable_, nullptr);
767  
        storage_ = std::exchange(other.storage_, nullptr);
767  
        storage_ = std::exchange(other.storage_, nullptr);
768  
        active_ops_ = std::exchange(other.active_ops_, nullptr);
768  
        active_ops_ = std::exchange(other.active_ops_, nullptr);
769  
        active_write_ops_ = std::exchange(other.active_write_ops_, nullptr);
769  
        active_write_ops_ = std::exchange(other.active_write_ops_, nullptr);
770  
    }
770  
    }
771  
    return *this;
771  
    return *this;
772  
}
772  
}
773  

773  

774  
template<BufferSink S>
774  
template<BufferSink S>
775  
    requires (!std::same_as<std::decay_t<S>, any_buffer_sink>)
775  
    requires (!std::same_as<std::decay_t<S>, any_buffer_sink>)
776  
any_buffer_sink::any_buffer_sink(S s)
776  
any_buffer_sink::any_buffer_sink(S s)
777  
    : vt_(&vtable_for_impl<S>::value)
777  
    : vt_(&vtable_for_impl<S>::value)
778  
{
778  
{
779  
    struct guard {
779  
    struct guard {
780  
        any_buffer_sink* self;
780  
        any_buffer_sink* self;
781  
        bool committed = false;
781  
        bool committed = false;
782  
        ~guard() {
782  
        ~guard() {
783  
            if(!committed && self->storage_) {
783  
            if(!committed && self->storage_) {
784  
                self->vt_->destroy(self->sink_);
784  
                self->vt_->destroy(self->sink_);
785  
                ::operator delete(self->storage_);
785  
                ::operator delete(self->storage_);
786  
                self->storage_ = nullptr;
786  
                self->storage_ = nullptr;
787  
                self->sink_ = nullptr;
787  
                self->sink_ = nullptr;
788  
            }
788  
            }
789  
        }
789  
        }
790  
    } g{this};
790  
    } g{this};
791  

791  

792  
    storage_ = ::operator new(sizeof(S));
792  
    storage_ = ::operator new(sizeof(S));
793  
    sink_ = ::new(storage_) S(std::move(s));
793  
    sink_ = ::new(storage_) S(std::move(s));
794  

794  

795  
    cached_awaitable_ = ::operator new(vt_->awaitable_size);
795  
    cached_awaitable_ = ::operator new(vt_->awaitable_size);
796  

796  

797  
    g.committed = true;
797  
    g.committed = true;
798  
}
798  
}
799  

799  

800  
template<BufferSink S>
800  
template<BufferSink S>
801  
any_buffer_sink::any_buffer_sink(S* s)
801  
any_buffer_sink::any_buffer_sink(S* s)
802  
    : sink_(s)
802  
    : sink_(s)
803  
    , vt_(&vtable_for_impl<S>::value)
803  
    , vt_(&vtable_for_impl<S>::value)
804  
{
804  
{
805  
    cached_awaitable_ = ::operator new(vt_->awaitable_size);
805  
    cached_awaitable_ = ::operator new(vt_->awaitable_size);
806  
}
806  
}
807  

807  

808  
//----------------------------------------------------------
808  
//----------------------------------------------------------
809  

809  

810  
inline std::span<mutable_buffer>
810  
inline std::span<mutable_buffer>
811  
any_buffer_sink::prepare(std::span<mutable_buffer> dest)
811  
any_buffer_sink::prepare(std::span<mutable_buffer> dest)
812  
{
812  
{
813  
    return vt_->do_prepare(sink_, dest);
813  
    return vt_->do_prepare(sink_, dest);
814  
}
814  
}
815  

815  

816  
inline auto
816  
inline auto
817  
any_buffer_sink::commit(std::size_t n)
817  
any_buffer_sink::commit(std::size_t n)
818  
{
818  
{
819  
    struct awaitable
819  
    struct awaitable
820  
    {
820  
    {
821  
        any_buffer_sink* self_;
821  
        any_buffer_sink* self_;
822  
        std::size_t n_;
822  
        std::size_t n_;
823  

823  

824  
        bool
824  
        bool
825  
        await_ready()
825  
        await_ready()
826  
        {
826  
        {
827  
            self_->active_ops_ = self_->vt_->construct_commit_awaitable(
827  
            self_->active_ops_ = self_->vt_->construct_commit_awaitable(
828  
                self_->sink_,
828  
                self_->sink_,
829  
                self_->cached_awaitable_,
829  
                self_->cached_awaitable_,
830  
                n_);
830  
                n_);
831  
            return self_->active_ops_->await_ready(self_->cached_awaitable_);
831  
            return self_->active_ops_->await_ready(self_->cached_awaitable_);
832  
        }
832  
        }
833  

833  

834  
        std::coroutine_handle<>
834  
        std::coroutine_handle<>
835  
        await_suspend(std::coroutine_handle<> h, io_env const* env)
835  
        await_suspend(std::coroutine_handle<> h, io_env const* env)
836  
        {
836  
        {
837  
            return self_->active_ops_->await_suspend(
837  
            return self_->active_ops_->await_suspend(
838  
                self_->cached_awaitable_, h, env);
838  
                self_->cached_awaitable_, h, env);
839  
        }
839  
        }
840  

840  

841  
        io_result<>
841  
        io_result<>
842  
        await_resume()
842  
        await_resume()
843  
        {
843  
        {
844  
            struct guard {
844  
            struct guard {
845  
                any_buffer_sink* self;
845  
                any_buffer_sink* self;
846  
                ~guard() {
846  
                ~guard() {
847  
                    self->active_ops_->destroy(self->cached_awaitable_);
847  
                    self->active_ops_->destroy(self->cached_awaitable_);
848  
                    self->active_ops_ = nullptr;
848  
                    self->active_ops_ = nullptr;
849  
                }
849  
                }
850  
            } g{self_};
850  
            } g{self_};
851  
            return self_->active_ops_->await_resume(
851  
            return self_->active_ops_->await_resume(
852  
                self_->cached_awaitable_);
852  
                self_->cached_awaitable_);
853  
        }
853  
        }
854  
    };
854  
    };
855  
    return awaitable{this, n};
855  
    return awaitable{this, n};
856  
}
856  
}
857  

857  

858  
inline auto
858  
inline auto
859  
any_buffer_sink::commit_eof(std::size_t n)
859  
any_buffer_sink::commit_eof(std::size_t n)
860  
{
860  
{
861  
    struct awaitable
861  
    struct awaitable
862  
    {
862  
    {
863  
        any_buffer_sink* self_;
863  
        any_buffer_sink* self_;
864  
        std::size_t n_;
864  
        std::size_t n_;
865  

865  

866  
        bool
866  
        bool
867  
        await_ready()
867  
        await_ready()
868  
        {
868  
        {
869  
            self_->active_ops_ = self_->vt_->construct_commit_eof_awaitable(
869  
            self_->active_ops_ = self_->vt_->construct_commit_eof_awaitable(
870  
                self_->sink_,
870  
                self_->sink_,
871  
                self_->cached_awaitable_,
871  
                self_->cached_awaitable_,
872  
                n_);
872  
                n_);
873  
            return self_->active_ops_->await_ready(self_->cached_awaitable_);
873  
            return self_->active_ops_->await_ready(self_->cached_awaitable_);
874  
        }
874  
        }
875  

875  

876  
        std::coroutine_handle<>
876  
        std::coroutine_handle<>
877  
        await_suspend(std::coroutine_handle<> h, io_env const* env)
877  
        await_suspend(std::coroutine_handle<> h, io_env const* env)
878  
        {
878  
        {
879  
            return self_->active_ops_->await_suspend(
879  
            return self_->active_ops_->await_suspend(
880  
                self_->cached_awaitable_, h, env);
880  
                self_->cached_awaitable_, h, env);
881  
        }
881  
        }
882  

882  

883  
        io_result<>
883  
        io_result<>
884  
        await_resume()
884  
        await_resume()
885  
        {
885  
        {
886  
            struct guard {
886  
            struct guard {
887  
                any_buffer_sink* self;
887  
                any_buffer_sink* self;
888  
                ~guard() {
888  
                ~guard() {
889  
                    self->active_ops_->destroy(self->cached_awaitable_);
889  
                    self->active_ops_->destroy(self->cached_awaitable_);
890  
                    self->active_ops_ = nullptr;
890  
                    self->active_ops_ = nullptr;
891  
                }
891  
                }
892  
            } g{self_};
892  
            } g{self_};
893  
            return self_->active_ops_->await_resume(
893  
            return self_->active_ops_->await_resume(
894  
                self_->cached_awaitable_);
894  
                self_->cached_awaitable_);
895  
        }
895  
        }
896  
    };
896  
    };
897  
    return awaitable{this, n};
897  
    return awaitable{this, n};
898  
}
898  
}
899  

899  

900  
//----------------------------------------------------------
900  
//----------------------------------------------------------
901  
// Private helpers for native WriteSink forwarding
901  
// Private helpers for native WriteSink forwarding
902  

902  

903  
inline auto
903  
inline auto
904  
any_buffer_sink::write_some_(
904  
any_buffer_sink::write_some_(
905  
    std::span<const_buffer const> buffers)
905  
    std::span<const_buffer const> buffers)
906  
{
906  
{
907  
    struct awaitable
907  
    struct awaitable
908  
    {
908  
    {
909  
        any_buffer_sink* self_;
909  
        any_buffer_sink* self_;
910  
        std::span<const_buffer const> buffers_;
910  
        std::span<const_buffer const> buffers_;
911  

911  

912  
        bool
912  
        bool
913  
        await_ready() const noexcept
913  
        await_ready() const noexcept
914  
        {
914  
        {
915  
            return false;
915  
            return false;
916  
        }
916  
        }
917  

917  

918  
        std::coroutine_handle<>
918  
        std::coroutine_handle<>
919  
        await_suspend(std::coroutine_handle<> h, io_env const* env)
919  
        await_suspend(std::coroutine_handle<> h, io_env const* env)
920  
        {
920  
        {
921  
            self_->active_write_ops_ =
921  
            self_->active_write_ops_ =
922  
                self_->vt_->construct_write_some_awaitable(
922  
                self_->vt_->construct_write_some_awaitable(
923  
                    self_->sink_,
923  
                    self_->sink_,
924  
                    self_->cached_awaitable_,
924  
                    self_->cached_awaitable_,
925  
                    buffers_);
925  
                    buffers_);
926  

926  

927  
            if(self_->active_write_ops_->await_ready(
927  
            if(self_->active_write_ops_->await_ready(
928  
                self_->cached_awaitable_))
928  
                self_->cached_awaitable_))
929  
                return h;
929  
                return h;
930  

930  

931  
            return self_->active_write_ops_->await_suspend(
931  
            return self_->active_write_ops_->await_suspend(
932  
                self_->cached_awaitable_, h, env);
932  
                self_->cached_awaitable_, h, env);
933  
        }
933  
        }
934  

934  

935  
        io_result<std::size_t>
935  
        io_result<std::size_t>
936  
        await_resume()
936  
        await_resume()
937  
        {
937  
        {
938  
            struct guard {
938  
            struct guard {
939  
                any_buffer_sink* self;
939  
                any_buffer_sink* self;
940  
                ~guard() {
940  
                ~guard() {
941  
                    self->active_write_ops_->destroy(
941  
                    self->active_write_ops_->destroy(
942  
                        self->cached_awaitable_);
942  
                        self->cached_awaitable_);
943  
                    self->active_write_ops_ = nullptr;
943  
                    self->active_write_ops_ = nullptr;
944  
                }
944  
                }
945  
            } g{self_};
945  
            } g{self_};
946  
            return self_->active_write_ops_->await_resume(
946  
            return self_->active_write_ops_->await_resume(
947  
                self_->cached_awaitable_);
947  
                self_->cached_awaitable_);
948  
        }
948  
        }
949  
    };
949  
    };
950  
    return awaitable{this, buffers};
950  
    return awaitable{this, buffers};
951  
}
951  
}
952  

952  

953  
inline auto
953  
inline auto
954  
any_buffer_sink::write_(
954  
any_buffer_sink::write_(
955  
    std::span<const_buffer const> buffers)
955  
    std::span<const_buffer const> buffers)
956  
{
956  
{
957  
    struct awaitable
957  
    struct awaitable
958  
    {
958  
    {
959  
        any_buffer_sink* self_;
959  
        any_buffer_sink* self_;
960  
        std::span<const_buffer const> buffers_;
960  
        std::span<const_buffer const> buffers_;
961  

961  

962  
        bool
962  
        bool
963  
        await_ready() const noexcept
963  
        await_ready() const noexcept
964  
        {
964  
        {
965  
            return false;
965  
            return false;
966  
        }
966  
        }
967  

967  

968  
        std::coroutine_handle<>
968  
        std::coroutine_handle<>
969  
        await_suspend(std::coroutine_handle<> h, io_env const* env)
969  
        await_suspend(std::coroutine_handle<> h, io_env const* env)
970  
        {
970  
        {
971  
            self_->active_write_ops_ =
971  
            self_->active_write_ops_ =
972  
                self_->vt_->construct_write_awaitable(
972  
                self_->vt_->construct_write_awaitable(
973  
                    self_->sink_,
973  
                    self_->sink_,
974  
                    self_->cached_awaitable_,
974  
                    self_->cached_awaitable_,
975  
                    buffers_);
975  
                    buffers_);
976  

976  

977  
            if(self_->active_write_ops_->await_ready(
977  
            if(self_->active_write_ops_->await_ready(
978  
                self_->cached_awaitable_))
978  
                self_->cached_awaitable_))
979  
                return h;
979  
                return h;
980  

980  

981  
            return self_->active_write_ops_->await_suspend(
981  
            return self_->active_write_ops_->await_suspend(
982  
                self_->cached_awaitable_, h, env);
982  
                self_->cached_awaitable_, h, env);
983  
        }
983  
        }
984  

984  

985  
        io_result<std::size_t>
985  
        io_result<std::size_t>
986  
        await_resume()
986  
        await_resume()
987  
        {
987  
        {
988  
            struct guard {
988  
            struct guard {
989  
                any_buffer_sink* self;
989  
                any_buffer_sink* self;
990  
                ~guard() {
990  
                ~guard() {
991  
                    self->active_write_ops_->destroy(
991  
                    self->active_write_ops_->destroy(
992  
                        self->cached_awaitable_);
992  
                        self->cached_awaitable_);
993  
                    self->active_write_ops_ = nullptr;
993  
                    self->active_write_ops_ = nullptr;
994  
                }
994  
                }
995  
            } g{self_};
995  
            } g{self_};
996  
            return self_->active_write_ops_->await_resume(
996  
            return self_->active_write_ops_->await_resume(
997  
                self_->cached_awaitable_);
997  
                self_->cached_awaitable_);
998  
        }
998  
        }
999  
    };
999  
    };
1000  
    return awaitable{this, buffers};
1000  
    return awaitable{this, buffers};
1001  
}
1001  
}
1002  

1002  

1003  
inline auto
1003  
inline auto
1004  
any_buffer_sink::write_eof_buffers_(
1004  
any_buffer_sink::write_eof_buffers_(
1005  
    std::span<const_buffer const> buffers)
1005  
    std::span<const_buffer const> buffers)
1006  
{
1006  
{
1007  
    struct awaitable
1007  
    struct awaitable
1008  
    {
1008  
    {
1009  
        any_buffer_sink* self_;
1009  
        any_buffer_sink* self_;
1010  
        std::span<const_buffer const> buffers_;
1010  
        std::span<const_buffer const> buffers_;
1011  

1011  

1012  
        bool
1012  
        bool
1013  
        await_ready() const noexcept
1013  
        await_ready() const noexcept
1014  
        {
1014  
        {
1015  
            return false;
1015  
            return false;
1016  
        }
1016  
        }
1017  

1017  

1018  
        std::coroutine_handle<>
1018  
        std::coroutine_handle<>
1019  
        await_suspend(std::coroutine_handle<> h, io_env const* env)
1019  
        await_suspend(std::coroutine_handle<> h, io_env const* env)
1020  
        {
1020  
        {
1021  
            self_->active_write_ops_ =
1021  
            self_->active_write_ops_ =
1022  
                self_->vt_->construct_write_eof_buffers_awaitable(
1022  
                self_->vt_->construct_write_eof_buffers_awaitable(
1023  
                    self_->sink_,
1023  
                    self_->sink_,
1024  
                    self_->cached_awaitable_,
1024  
                    self_->cached_awaitable_,
1025  
                    buffers_);
1025  
                    buffers_);
1026  

1026  

1027  
            if(self_->active_write_ops_->await_ready(
1027  
            if(self_->active_write_ops_->await_ready(
1028  
                self_->cached_awaitable_))
1028  
                self_->cached_awaitable_))
1029  
                return h;
1029  
                return h;
1030  

1030  

1031  
            return self_->active_write_ops_->await_suspend(
1031  
            return self_->active_write_ops_->await_suspend(
1032  
                self_->cached_awaitable_, h, env);
1032  
                self_->cached_awaitable_, h, env);
1033  
        }
1033  
        }
1034  

1034  

1035  
        io_result<std::size_t>
1035  
        io_result<std::size_t>
1036  
        await_resume()
1036  
        await_resume()
1037  
        {
1037  
        {
1038  
            struct guard {
1038  
            struct guard {
1039  
                any_buffer_sink* self;
1039  
                any_buffer_sink* self;
1040  
                ~guard() {
1040  
                ~guard() {
1041  
                    self->active_write_ops_->destroy(
1041  
                    self->active_write_ops_->destroy(
1042  
                        self->cached_awaitable_);
1042  
                        self->cached_awaitable_);
1043  
                    self->active_write_ops_ = nullptr;
1043  
                    self->active_write_ops_ = nullptr;
1044  
                }
1044  
                }
1045  
            } g{self_};
1045  
            } g{self_};
1046  
            return self_->active_write_ops_->await_resume(
1046  
            return self_->active_write_ops_->await_resume(
1047  
                self_->cached_awaitable_);
1047  
                self_->cached_awaitable_);
1048  
        }
1048  
        }
1049  
    };
1049  
    };
1050  
    return awaitable{this, buffers};
1050  
    return awaitable{this, buffers};
1051  
}
1051  
}
1052  

1052  

1053  
//----------------------------------------------------------
1053  
//----------------------------------------------------------
1054  
// Public WriteSink methods
1054  
// Public WriteSink methods
1055  

1055  

1056  
template<ConstBufferSequence CB>
1056  
template<ConstBufferSequence CB>
1057  
io_task<std::size_t>
1057  
io_task<std::size_t>
1058  
any_buffer_sink::write_some(CB buffers)
1058  
any_buffer_sink::write_some(CB buffers)
1059  
{
1059  
{
1060  
    buffer_param<CB> bp(buffers);
1060  
    buffer_param<CB> bp(buffers);
1061  
    auto src = bp.data();
1061  
    auto src = bp.data();
1062  
    if(src.empty())
1062  
    if(src.empty())
1063  
        co_return {{}, 0};
1063  
        co_return {{}, 0};
1064  

1064  

1065  
    // Native WriteSink path
1065  
    // Native WriteSink path
1066  
    if(vt_->construct_write_some_awaitable)
1066  
    if(vt_->construct_write_some_awaitable)
1067  
        co_return co_await write_some_(src);
1067  
        co_return co_await write_some_(src);
1068  

1068  

1069  
    // Synthesized path: prepare + buffer_copy + commit
1069  
    // Synthesized path: prepare + buffer_copy + commit
1070  
    mutable_buffer arr[detail::max_iovec_];
1070  
    mutable_buffer arr[detail::max_iovec_];
1071  
    auto dst_bufs = prepare(arr);
1071  
    auto dst_bufs = prepare(arr);
1072  
    if(dst_bufs.empty())
1072  
    if(dst_bufs.empty())
1073  
    {
1073  
    {
1074  
        auto [ec] = co_await commit(0);
1074  
        auto [ec] = co_await commit(0);
1075  
        if(ec)
1075  
        if(ec)
1076  
            co_return {ec, 0};
1076  
            co_return {ec, 0};
1077  
        dst_bufs = prepare(arr);
1077  
        dst_bufs = prepare(arr);
1078  
        if(dst_bufs.empty())
1078  
        if(dst_bufs.empty())
1079  
            co_return {{}, 0};
1079  
            co_return {{}, 0};
1080  
    }
1080  
    }
1081  

1081  

1082  
    auto n = buffer_copy(dst_bufs, src);
1082  
    auto n = buffer_copy(dst_bufs, src);
1083  
    auto [ec] = co_await commit(n);
1083  
    auto [ec] = co_await commit(n);
1084  
    if(ec)
1084  
    if(ec)
1085  
        co_return {ec, 0};
1085  
        co_return {ec, 0};
1086  
    co_return {{}, n};
1086  
    co_return {{}, n};
1087  
}
1087  
}
1088  

1088  

1089  
template<ConstBufferSequence CB>
1089  
template<ConstBufferSequence CB>
1090  
io_task<std::size_t>
1090  
io_task<std::size_t>
1091  
any_buffer_sink::write(CB buffers)
1091  
any_buffer_sink::write(CB buffers)
1092  
{
1092  
{
1093  
    buffer_param<CB> bp(buffers);
1093  
    buffer_param<CB> bp(buffers);
1094  
    std::size_t total = 0;
1094  
    std::size_t total = 0;
1095  

1095  

1096  
    // Native WriteSink path
1096  
    // Native WriteSink path
1097  
    if(vt_->construct_write_awaitable)
1097  
    if(vt_->construct_write_awaitable)
1098  
    {
1098  
    {
1099  
        for(;;)
1099  
        for(;;)
1100  
        {
1100  
        {
1101  
            auto bufs = bp.data();
1101  
            auto bufs = bp.data();
1102  
            if(bufs.empty())
1102  
            if(bufs.empty())
1103  
                break;
1103  
                break;
1104  

1104  

1105  
            auto [ec, n] = co_await write_(bufs);
1105  
            auto [ec, n] = co_await write_(bufs);
1106  
            total += n;
1106  
            total += n;
1107  
            if(ec)
1107  
            if(ec)
1108  
                co_return {ec, total};
1108  
                co_return {ec, total};
1109  
            bp.consume(n);
1109  
            bp.consume(n);
1110  
        }
1110  
        }
1111  
        co_return {{}, total};
1111  
        co_return {{}, total};
1112  
    }
1112  
    }
1113  

1113  

1114  
    // Synthesized path: prepare + buffer_copy + commit
1114  
    // Synthesized path: prepare + buffer_copy + commit
1115  
    for(;;)
1115  
    for(;;)
1116  
    {
1116  
    {
1117  
        auto src = bp.data();
1117  
        auto src = bp.data();
1118  
        if(src.empty())
1118  
        if(src.empty())
1119  
            break;
1119  
            break;
1120  

1120  

1121  
        mutable_buffer arr[detail::max_iovec_];
1121  
        mutable_buffer arr[detail::max_iovec_];
1122  
        auto dst_bufs = prepare(arr);
1122  
        auto dst_bufs = prepare(arr);
1123  
        if(dst_bufs.empty())
1123  
        if(dst_bufs.empty())
1124  
        {
1124  
        {
1125  
            auto [ec] = co_await commit(0);
1125  
            auto [ec] = co_await commit(0);
1126  
            if(ec)
1126  
            if(ec)
1127  
                co_return {ec, total};
1127  
                co_return {ec, total};
1128  
            continue;
1128  
            continue;
1129  
        }
1129  
        }
1130  

1130  

1131  
        auto n = buffer_copy(dst_bufs, src);
1131  
        auto n = buffer_copy(dst_bufs, src);
1132  
        auto [ec] = co_await commit(n);
1132  
        auto [ec] = co_await commit(n);
1133  
        if(ec)
1133  
        if(ec)
1134  
            co_return {ec, total};
1134  
            co_return {ec, total};
1135  
        bp.consume(n);
1135  
        bp.consume(n);
1136  
        total += n;
1136  
        total += n;
1137  
    }
1137  
    }
1138  

1138  

1139  
    co_return {{}, total};
1139  
    co_return {{}, total};
1140  
}
1140  
}
1141  

1141  

1142  
inline auto
1142  
inline auto
1143  
any_buffer_sink::write_eof()
1143  
any_buffer_sink::write_eof()
1144  
{
1144  
{
1145  
    struct awaitable
1145  
    struct awaitable
1146  
    {
1146  
    {
1147  
        any_buffer_sink* self_;
1147  
        any_buffer_sink* self_;
1148  

1148  

1149  
        bool
1149  
        bool
1150  
        await_ready()
1150  
        await_ready()
1151  
        {
1151  
        {
1152  
            if(self_->vt_->construct_write_eof_awaitable)
1152  
            if(self_->vt_->construct_write_eof_awaitable)
1153  
            {
1153  
            {
1154  
                // Native WriteSink: forward to underlying write_eof()
1154  
                // Native WriteSink: forward to underlying write_eof()
1155  
                self_->active_ops_ =
1155  
                self_->active_ops_ =
1156  
                    self_->vt_->construct_write_eof_awaitable(
1156  
                    self_->vt_->construct_write_eof_awaitable(
1157  
                        self_->sink_,
1157  
                        self_->sink_,
1158  
                        self_->cached_awaitable_);
1158  
                        self_->cached_awaitable_);
1159  
            }
1159  
            }
1160  
            else
1160  
            else
1161  
            {
1161  
            {
1162  
                // Synthesized: commit_eof(0)
1162  
                // Synthesized: commit_eof(0)
1163  
                self_->active_ops_ =
1163  
                self_->active_ops_ =
1164  
                    self_->vt_->construct_commit_eof_awaitable(
1164  
                    self_->vt_->construct_commit_eof_awaitable(
1165  
                        self_->sink_,
1165  
                        self_->sink_,
1166  
                        self_->cached_awaitable_,
1166  
                        self_->cached_awaitable_,
1167  
                        0);
1167  
                        0);
1168  
            }
1168  
            }
1169  
            return self_->active_ops_->await_ready(
1169  
            return self_->active_ops_->await_ready(
1170  
                self_->cached_awaitable_);
1170  
                self_->cached_awaitable_);
1171  
        }
1171  
        }
1172  

1172  

1173  
        std::coroutine_handle<>
1173  
        std::coroutine_handle<>
1174  
        await_suspend(std::coroutine_handle<> h, io_env const* env)
1174  
        await_suspend(std::coroutine_handle<> h, io_env const* env)
1175  
        {
1175  
        {
1176  
            return self_->active_ops_->await_suspend(
1176  
            return self_->active_ops_->await_suspend(
1177  
                self_->cached_awaitable_, h, env);
1177  
                self_->cached_awaitable_, h, env);
1178  
        }
1178  
        }
1179  

1179  

1180  
        io_result<>
1180  
        io_result<>
1181  
        await_resume()
1181  
        await_resume()
1182  
        {
1182  
        {
1183  
            struct guard {
1183  
            struct guard {
1184  
                any_buffer_sink* self;
1184  
                any_buffer_sink* self;
1185  
                ~guard() {
1185  
                ~guard() {
1186  
                    self->active_ops_->destroy(self->cached_awaitable_);
1186  
                    self->active_ops_->destroy(self->cached_awaitable_);
1187  
                    self->active_ops_ = nullptr;
1187  
                    self->active_ops_ = nullptr;
1188  
                }
1188  
                }
1189  
            } g{self_};
1189  
            } g{self_};
1190  
            return self_->active_ops_->await_resume(
1190  
            return self_->active_ops_->await_resume(
1191  
                self_->cached_awaitable_);
1191  
                self_->cached_awaitable_);
1192  
        }
1192  
        }
1193  
    };
1193  
    };
1194  
    return awaitable{this};
1194  
    return awaitable{this};
1195  
}
1195  
}
1196  

1196  

1197  
template<ConstBufferSequence CB>
1197  
template<ConstBufferSequence CB>
1198  
io_task<std::size_t>
1198  
io_task<std::size_t>
1199  
any_buffer_sink::write_eof(CB buffers)
1199  
any_buffer_sink::write_eof(CB buffers)
1200  
{
1200  
{
1201  
    // Native WriteSink path
1201  
    // Native WriteSink path
1202  
    if(vt_->construct_write_eof_buffers_awaitable)
1202  
    if(vt_->construct_write_eof_buffers_awaitable)
1203  
    {
1203  
    {
1204  
        const_buffer_param<CB> bp(buffers);
1204  
        const_buffer_param<CB> bp(buffers);
1205  
        std::size_t total = 0;
1205  
        std::size_t total = 0;
1206  

1206  

1207  
        for(;;)
1207  
        for(;;)
1208  
        {
1208  
        {
1209  
            auto bufs = bp.data();
1209  
            auto bufs = bp.data();
1210  
            if(bufs.empty())
1210  
            if(bufs.empty())
1211  
            {
1211  
            {
1212  
                auto [ec] = co_await write_eof();
1212  
                auto [ec] = co_await write_eof();
1213  
                co_return {ec, total};
1213  
                co_return {ec, total};
1214  
            }
1214  
            }
1215  

1215  

1216  
            if(!bp.more())
1216  
            if(!bp.more())
1217  
            {
1217  
            {
1218  
                // Last window: send atomically with EOF
1218  
                // Last window: send atomically with EOF
1219  
                auto [ec, n] = co_await write_eof_buffers_(bufs);
1219  
                auto [ec, n] = co_await write_eof_buffers_(bufs);
1220  
                total += n;
1220  
                total += n;
1221  
                co_return {ec, total};
1221  
                co_return {ec, total};
1222  
            }
1222  
            }
1223  

1223  

1224  
            auto [ec, n] = co_await write_(bufs);
1224  
            auto [ec, n] = co_await write_(bufs);
1225  
            total += n;
1225  
            total += n;
1226  
            if(ec)
1226  
            if(ec)
1227  
                co_return {ec, total};
1227  
                co_return {ec, total};
1228  
            bp.consume(n);
1228  
            bp.consume(n);
1229  
        }
1229  
        }
1230  
    }
1230  
    }
1231  

1231  

1232  
    // Synthesized path: prepare + buffer_copy + commit + commit_eof
1232  
    // Synthesized path: prepare + buffer_copy + commit + commit_eof
1233  
    buffer_param<CB> bp(buffers);
1233  
    buffer_param<CB> bp(buffers);
1234  
    std::size_t total = 0;
1234  
    std::size_t total = 0;
1235  

1235  

1236  
    for(;;)
1236  
    for(;;)
1237  
    {
1237  
    {
1238  
        auto src = bp.data();
1238  
        auto src = bp.data();
1239  
        if(src.empty())
1239  
        if(src.empty())
1240  
            break;
1240  
            break;
1241  

1241  

1242  
        mutable_buffer arr[detail::max_iovec_];
1242  
        mutable_buffer arr[detail::max_iovec_];
1243  
        auto dst_bufs = prepare(arr);
1243  
        auto dst_bufs = prepare(arr);
1244  
        if(dst_bufs.empty())
1244  
        if(dst_bufs.empty())
1245  
        {
1245  
        {
1246  
            auto [ec] = co_await commit(0);
1246  
            auto [ec] = co_await commit(0);
1247  
            if(ec)
1247  
            if(ec)
1248  
                co_return {ec, total};
1248  
                co_return {ec, total};
1249  
            continue;
1249  
            continue;
1250  
        }
1250  
        }
1251  

1251  

1252  
        auto n = buffer_copy(dst_bufs, src);
1252  
        auto n = buffer_copy(dst_bufs, src);
1253  
        auto [ec] = co_await commit(n);
1253  
        auto [ec] = co_await commit(n);
1254  
        if(ec)
1254  
        if(ec)
1255  
            co_return {ec, total};
1255  
            co_return {ec, total};
1256  
        bp.consume(n);
1256  
        bp.consume(n);
1257  
        total += n;
1257  
        total += n;
1258  
    }
1258  
    }
1259  

1259  

1260  
    auto [ec] = co_await commit_eof(0);
1260  
    auto [ec] = co_await commit_eof(0);
1261  
    if(ec)
1261  
    if(ec)
1262  
        co_return {ec, total};
1262  
        co_return {ec, total};
1263  

1263  

1264  
    co_return {{}, total};
1264  
    co_return {{}, total};
1265  
}
1265  
}
1266  

1266  

1267  
//----------------------------------------------------------
1267  
//----------------------------------------------------------
1268  

1268  

1269  
static_assert(BufferSink<any_buffer_sink>);
1269  
static_assert(BufferSink<any_buffer_sink>);
1270  
static_assert(WriteSink<any_buffer_sink>);
1270  
static_assert(WriteSink<any_buffer_sink>);
1271  

1271  

1272  
} // namespace capy
1272  
} // namespace capy
1273  
} // namespace boost
1273  
} // namespace boost
1274  

1274  

1275  
#endif
1275  
#endif