1  
//
1  
//
2  
// Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
2  
// Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3  
//
3  
//
4  
// Distributed under the Boost Software License, Version 1.0. (See accompanying
4  
// Distributed under the Boost Software License, Version 1.0. (See accompanying
5  
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
5  
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6  
//
6  
//
7  
// Official repository: https://github.com/cppalliance/capy
7  
// Official repository: https://github.com/cppalliance/capy
8  
//
8  
//
9  

9  

10  
#ifndef BOOST_CAPY_IO_ANY_READ_SOURCE_HPP
10  
#ifndef BOOST_CAPY_IO_ANY_READ_SOURCE_HPP
11  
#define BOOST_CAPY_IO_ANY_READ_SOURCE_HPP
11  
#define BOOST_CAPY_IO_ANY_READ_SOURCE_HPP
12  

12  

13  
#include <boost/capy/detail/config.hpp>
13  
#include <boost/capy/detail/config.hpp>
14  
#include <boost/capy/detail/await_suspend_helper.hpp>
14  
#include <boost/capy/detail/await_suspend_helper.hpp>
15  
#include <boost/capy/buffers.hpp>
15  
#include <boost/capy/buffers.hpp>
16  
#include <boost/capy/buffers/buffer_array.hpp>
16  
#include <boost/capy/buffers/buffer_array.hpp>
17  
#include <boost/capy/buffers/buffer_param.hpp>
17  
#include <boost/capy/buffers/buffer_param.hpp>
18  
#include <boost/capy/concept/io_awaitable.hpp>
18  
#include <boost/capy/concept/io_awaitable.hpp>
19  
#include <boost/capy/concept/read_source.hpp>
19  
#include <boost/capy/concept/read_source.hpp>
20  
#include <boost/capy/ex/io_env.hpp>
20  
#include <boost/capy/ex/io_env.hpp>
21  
#include <boost/capy/io_result.hpp>
21  
#include <boost/capy/io_result.hpp>
22  
#include <boost/capy/io_task.hpp>
22  
#include <boost/capy/io_task.hpp>
23  

23  

24  
#include <concepts>
24  
#include <concepts>
25  
#include <coroutine>
25  
#include <coroutine>
26  
#include <cstddef>
26  
#include <cstddef>
27  
#include <new>
27  
#include <new>
28  
#include <span>
28  
#include <span>
29  
#include <stop_token>
29  
#include <stop_token>
30  
#include <system_error>
30  
#include <system_error>
31  
#include <utility>
31  
#include <utility>
32  

32  

33  
namespace boost {
33  
namespace boost {
34  
namespace capy {
34  
namespace capy {
35  

35  

36  
/** Type-erased wrapper for any ReadSource.
36  
/** Type-erased wrapper for any ReadSource.
37  

37  

38  
    This class provides type erasure for any type satisfying the
38  
    This class provides type erasure for any type satisfying the
39  
    @ref ReadSource concept, enabling runtime polymorphism for
39  
    @ref ReadSource concept, enabling runtime polymorphism for
40  
    source read operations. It uses cached awaitable storage to achieve
40  
    source read operations. It uses cached awaitable storage to achieve
41  
    zero steady-state allocation after construction.
41  
    zero steady-state allocation after construction.
42  

42  

43  
    The wrapper supports two construction modes:
43  
    The wrapper supports two construction modes:
44  
    - **Owning**: Pass by value to transfer ownership. The wrapper
44  
    - **Owning**: Pass by value to transfer ownership. The wrapper
45  
      allocates storage and owns the source.
45  
      allocates storage and owns the source.
46  
    - **Reference**: Pass a pointer to wrap without ownership. The
46  
    - **Reference**: Pass a pointer to wrap without ownership. The
47  
      pointed-to source must outlive this wrapper.
47  
      pointed-to source must outlive this wrapper.
48  

48  

49  
    @par Awaitable Preallocation
49  
    @par Awaitable Preallocation
50  
    The constructor preallocates storage for the type-erased awaitable.
50  
    The constructor preallocates storage for the type-erased awaitable.
51  
    This reserves all virtual address space at server startup
51  
    This reserves all virtual address space at server startup
52  
    so memory usage can be measured up front, rather than
52  
    so memory usage can be measured up front, rather than
53  
    allocating piecemeal as traffic arrives.
53  
    allocating piecemeal as traffic arrives.
54  

54  

55  
    @par Immediate Completion
55  
    @par Immediate Completion
56  
    Operations complete immediately without suspending when the
56  
    Operations complete immediately without suspending when the
57  
    buffer sequence is empty, or when the underlying source's
57  
    buffer sequence is empty, or when the underlying source's
58  
    awaitable reports readiness via `await_ready`.
58  
    awaitable reports readiness via `await_ready`.
59  

59  

60  
    @par Thread Safety
60  
    @par Thread Safety
61  
    Not thread-safe. Concurrent operations on the same wrapper
61  
    Not thread-safe. Concurrent operations on the same wrapper
62  
    are undefined behavior.
62  
    are undefined behavior.
63  

63  

64  
    @par Example
64  
    @par Example
65  
    @code
65  
    @code
66  
    // Owning - takes ownership of the source
66  
    // Owning - takes ownership of the source
67  
    any_read_source rs(some_source{args...});
67  
    any_read_source rs(some_source{args...});
68  

68  

69  
    // Reference - wraps without ownership
69  
    // Reference - wraps without ownership
70  
    some_source source;
70  
    some_source source;
71  
    any_read_source rs(&source);
71  
    any_read_source rs(&source);
72  

72  

73  
    mutable_buffer buf(data, size);
73  
    mutable_buffer buf(data, size);
74  
    auto [ec, n] = co_await rs.read(std::span(&buf, 1));
74  
    auto [ec, n] = co_await rs.read(std::span(&buf, 1));
75  
    @endcode
75  
    @endcode
76  

76  

77  
    @see any_read_stream, ReadSource
77  
    @see any_read_stream, ReadSource
78  
*/
78  
*/
79  
class any_read_source
79  
class any_read_source
80  
{
80  
{
81  
    struct vtable;
81  
    struct vtable;
82  
    struct awaitable_ops;
82  
    struct awaitable_ops;
83  

83  

84  
    template<ReadSource S>
84  
    template<ReadSource S>
85  
    struct vtable_for_impl;
85  
    struct vtable_for_impl;
86  

86  

87  
    void* source_ = nullptr;
87  
    void* source_ = nullptr;
88  
    vtable const* vt_ = nullptr;
88  
    vtable const* vt_ = nullptr;
89  
    void* cached_awaitable_ = nullptr;
89  
    void* cached_awaitable_ = nullptr;
90  
    void* storage_ = nullptr;
90  
    void* storage_ = nullptr;
91  
    awaitable_ops const* active_ops_ = nullptr;
91  
    awaitable_ops const* active_ops_ = nullptr;
92  

92  

93  
public:
93  
public:
94  
    /** Destructor.
94  
    /** Destructor.
95  

95  

96  
        Destroys the owned source (if any) and releases the cached
96  
        Destroys the owned source (if any) and releases the cached
97  
        awaitable storage.
97  
        awaitable storage.
98  
    */
98  
    */
99  
    ~any_read_source();
99  
    ~any_read_source();
100  

100  

101  
    /** Default constructor.
101  
    /** Default constructor.
102  

102  

103  
        Constructs an empty wrapper. Operations on a default-constructed
103  
        Constructs an empty wrapper. Operations on a default-constructed
104  
        wrapper result in undefined behavior.
104  
        wrapper result in undefined behavior.
105  
    */
105  
    */
106  
    any_read_source() = default;
106  
    any_read_source() = default;
107  

107  

108  
    /** Non-copyable.
108  
    /** Non-copyable.
109  

109  

110  
        The awaitable cache is per-instance and cannot be shared.
110  
        The awaitable cache is per-instance and cannot be shared.
111  
    */
111  
    */
112  
    any_read_source(any_read_source const&) = delete;
112  
    any_read_source(any_read_source const&) = delete;
113  
    any_read_source& operator=(any_read_source const&) = delete;
113  
    any_read_source& operator=(any_read_source const&) = delete;
114  

114  

115  
    /** Move constructor.
115  
    /** Move constructor.
116  

116  

117  
        Transfers ownership of the wrapped source (if owned) and
117  
        Transfers ownership of the wrapped source (if owned) and
118  
        cached awaitable storage from `other`. After the move, `other` is
118  
        cached awaitable storage from `other`. After the move, `other` is
119  
        in a default-constructed state.
119  
        in a default-constructed state.
120  

120  

121  
        @param other The wrapper to move from.
121  
        @param other The wrapper to move from.
122  
    */
122  
    */
123  
    any_read_source(any_read_source&& other) noexcept
123  
    any_read_source(any_read_source&& other) noexcept
124  
        : source_(std::exchange(other.source_, nullptr))
124  
        : source_(std::exchange(other.source_, nullptr))
125  
        , vt_(std::exchange(other.vt_, nullptr))
125  
        , vt_(std::exchange(other.vt_, nullptr))
126  
        , cached_awaitable_(std::exchange(other.cached_awaitable_, nullptr))
126  
        , cached_awaitable_(std::exchange(other.cached_awaitable_, nullptr))
127  
        , storage_(std::exchange(other.storage_, nullptr))
127  
        , storage_(std::exchange(other.storage_, nullptr))
128  
        , active_ops_(std::exchange(other.active_ops_, nullptr))
128  
        , active_ops_(std::exchange(other.active_ops_, nullptr))
129  
    {
129  
    {
130  
    }
130  
    }
131  

131  

132  
    /** Move assignment operator.
132  
    /** Move assignment operator.
133  

133  

134  
        Destroys any owned source and releases existing resources,
134  
        Destroys any owned source and releases existing resources,
135  
        then transfers ownership from `other`.
135  
        then transfers ownership from `other`.
136  

136  

137  
        @param other The wrapper to move from.
137  
        @param other The wrapper to move from.
138  
        @return Reference to this wrapper.
138  
        @return Reference to this wrapper.
139  
    */
139  
    */
140  
    any_read_source&
140  
    any_read_source&
141  
    operator=(any_read_source&& other) noexcept;
141  
    operator=(any_read_source&& other) noexcept;
142  

142  

143  
    /** Construct by taking ownership of a ReadSource.
143  
    /** Construct by taking ownership of a ReadSource.
144  

144  

145  
        Allocates storage and moves the source into this wrapper.
145  
        Allocates storage and moves the source into this wrapper.
146  
        The wrapper owns the source and will destroy it.
146  
        The wrapper owns the source and will destroy it.
147  

147  

148  
        @param s The source to take ownership of.
148  
        @param s The source to take ownership of.
149  
    */
149  
    */
150  
    template<ReadSource S>
150  
    template<ReadSource S>
151  
        requires (!std::same_as<std::decay_t<S>, any_read_source>)
151  
        requires (!std::same_as<std::decay_t<S>, any_read_source>)
152  
    any_read_source(S s);
152  
    any_read_source(S s);
153  

153  

154  
    /** Construct by wrapping a ReadSource without ownership.
154  
    /** Construct by wrapping a ReadSource without ownership.
155  

155  

156  
        Wraps the given source by pointer. The source must remain
156  
        Wraps the given source by pointer. The source must remain
157  
        valid for the lifetime of this wrapper.
157  
        valid for the lifetime of this wrapper.
158  

158  

159  
        @param s Pointer to the source to wrap.
159  
        @param s Pointer to the source to wrap.
160  
    */
160  
    */
161  
    template<ReadSource S>
161  
    template<ReadSource S>
162  
    any_read_source(S* s);
162  
    any_read_source(S* s);
163  

163  

164  
    /** Check if the wrapper contains a valid source.
164  
    /** Check if the wrapper contains a valid source.
165  

165  

166  
        @return `true` if wrapping a source, `false` if default-constructed
166  
        @return `true` if wrapping a source, `false` if default-constructed
167  
            or moved-from.
167  
            or moved-from.
168  
    */
168  
    */
169  
    bool
169  
    bool
170  
    has_value() const noexcept
170  
    has_value() const noexcept
171  
    {
171  
    {
172  
        return source_ != nullptr;
172  
        return source_ != nullptr;
173  
    }
173  
    }
174  

174  

175  
    /** Check if the wrapper contains a valid source.
175  
    /** Check if the wrapper contains a valid source.
176  

176  

177  
        @return `true` if wrapping a source, `false` if default-constructed
177  
        @return `true` if wrapping a source, `false` if default-constructed
178  
            or moved-from.
178  
            or moved-from.
179  
    */
179  
    */
180  
    explicit
180  
    explicit
181  
    operator bool() const noexcept
181  
    operator bool() const noexcept
182  
    {
182  
    {
183  
        return has_value();
183  
        return has_value();
184  
    }
184  
    }
185  

185  

186  
    /** Initiate a partial read operation.
186  
    /** Initiate a partial read operation.
187  

187  

188  
        Reads one or more bytes into the provided buffer sequence.
188  
        Reads one or more bytes into the provided buffer sequence.
189  
        May fill less than the full sequence.
189  
        May fill less than the full sequence.
190  

190  

191  
        @param buffers The buffer sequence to read into.
191  
        @param buffers The buffer sequence to read into.
192  

192  

193  
        @return An awaitable yielding `(error_code,std::size_t)`.
193  
        @return An awaitable yielding `(error_code,std::size_t)`.
194  

194  

195  
        @par Immediate Completion
195  
        @par Immediate Completion
196  
        The operation completes immediately without suspending
196  
        The operation completes immediately without suspending
197  
        the calling coroutine when:
197  
        the calling coroutine when:
198  
        @li The buffer sequence is empty, returning `{error_code{}, 0}`.
198  
        @li The buffer sequence is empty, returning `{error_code{}, 0}`.
199  
        @li The underlying source's awaitable reports immediate
199  
        @li The underlying source's awaitable reports immediate
200  
            readiness via `await_ready`.
200  
            readiness via `await_ready`.
201  

201  

202  
        @note This is a partial operation and may not process the
202  
        @note This is a partial operation and may not process the
203  
        entire buffer sequence. Use @ref read for guaranteed
203  
        entire buffer sequence. Use @ref read for guaranteed
204  
        complete transfer.
204  
        complete transfer.
205  

205  

206  
        @par Preconditions
206  
        @par Preconditions
207  
        The wrapper must contain a valid source (`has_value() == true`).
207  
        The wrapper must contain a valid source (`has_value() == true`).
208  
        The caller must not call this function again after a prior
208  
        The caller must not call this function again after a prior
209  
        call returned an error (including EOF).
209  
        call returned an error (including EOF).
210  
    */
210  
    */
211  
    template<MutableBufferSequence MB>
211  
    template<MutableBufferSequence MB>
212  
    auto
212  
    auto
213  
    read_some(MB buffers);
213  
    read_some(MB buffers);
214  

214  

215  
    /** Initiate a complete read operation.
215  
    /** Initiate a complete read operation.
216  

216  

217  
        Reads data into the provided buffer sequence by forwarding
217  
        Reads data into the provided buffer sequence by forwarding
218  
        to the underlying source's `read` operation. Large buffer
218  
        to the underlying source's `read` operation. Large buffer
219  
        sequences are processed in windows, with each window
219  
        sequences are processed in windows, with each window
220  
        forwarded as a separate `read` call to the underlying source.
220  
        forwarded as a separate `read` call to the underlying source.
221  
        The operation completes when the entire buffer sequence is
221  
        The operation completes when the entire buffer sequence is
222  
        filled, end-of-file is reached, or an error occurs.
222  
        filled, end-of-file is reached, or an error occurs.
223  

223  

224  
        @param buffers The buffer sequence to read into.
224  
        @param buffers The buffer sequence to read into.
225  

225  

226  
        @return An awaitable yielding `(error_code,std::size_t)`.
226  
        @return An awaitable yielding `(error_code,std::size_t)`.
227  

227  

228  
        @par Immediate Completion
228  
        @par Immediate Completion
229  
        The operation completes immediately without suspending
229  
        The operation completes immediately without suspending
230  
        the calling coroutine when:
230  
        the calling coroutine when:
231  
        @li The buffer sequence is empty, returning `{error_code{}, 0}`.
231  
        @li The buffer sequence is empty, returning `{error_code{}, 0}`.
232  
        @li The underlying source's `read` awaitable reports
232  
        @li The underlying source's `read` awaitable reports
233  
            immediate readiness via `await_ready`.
233  
            immediate readiness via `await_ready`.
234  

234  

235  
        @par Postconditions
235  
        @par Postconditions
236  
        Exactly one of the following is true on return:
236  
        Exactly one of the following is true on return:
237  
        @li **Success**: `!ec` and `n == buffer_size(buffers)`.
237  
        @li **Success**: `!ec` and `n == buffer_size(buffers)`.
238  
            The entire buffer was filled.
238  
            The entire buffer was filled.
239  
        @li **End-of-stream or Error**: `ec` and `n` indicates
239  
        @li **End-of-stream or Error**: `ec` and `n` indicates
240  
            the number of bytes transferred before the failure.
240  
            the number of bytes transferred before the failure.
241  

241  

242  
        @par Preconditions
242  
        @par Preconditions
243  
        The wrapper must contain a valid source (`has_value() == true`).
243  
        The wrapper must contain a valid source (`has_value() == true`).
244  
        The caller must not call this function again after a prior
244  
        The caller must not call this function again after a prior
245  
        call returned an error (including EOF).
245  
        call returned an error (including EOF).
246  
    */
246  
    */
247  
    template<MutableBufferSequence MB>
247  
    template<MutableBufferSequence MB>
248  
    io_task<std::size_t>
248  
    io_task<std::size_t>
249  
    read(MB buffers);
249  
    read(MB buffers);
250  

250  

251  
protected:
251  
protected:
252  
    /** Rebind to a new source after move.
252  
    /** Rebind to a new source after move.
253  

253  

254  
        Updates the internal pointer to reference a new source object.
254  
        Updates the internal pointer to reference a new source object.
255  
        Used by owning wrappers after move assignment when the owned
255  
        Used by owning wrappers after move assignment when the owned
256  
        object has moved to a new location.
256  
        object has moved to a new location.
257  

257  

258  
        @param new_source The new source to bind to. Must be the same
258  
        @param new_source The new source to bind to. Must be the same
259  
            type as the original source.
259  
            type as the original source.
260  

260  

261  
        @note Terminates if called with a source of different type
261  
        @note Terminates if called with a source of different type
262  
            than the original.
262  
            than the original.
263  
    */
263  
    */
264  
    template<ReadSource S>
264  
    template<ReadSource S>
265  
    void
265  
    void
266  
    rebind(S& new_source) noexcept
266  
    rebind(S& new_source) noexcept
267  
    {
267  
    {
268  
        if(vt_ != &vtable_for_impl<S>::value)
268  
        if(vt_ != &vtable_for_impl<S>::value)
269  
            std::terminate();
269  
            std::terminate();
270  
        source_ = &new_source;
270  
        source_ = &new_source;
271  
    }
271  
    }
272  

272  

273  
private:
273  
private:
274  
    auto
274  
    auto
275  
    read_(std::span<mutable_buffer const> buffers);
275  
    read_(std::span<mutable_buffer const> buffers);
276  
};
276  
};
277  

277  

278  
//----------------------------------------------------------
278  
//----------------------------------------------------------
279  

279  

280  
// ordered by call sequence for cache line coherence
280  
// ordered by call sequence for cache line coherence
281  
struct any_read_source::awaitable_ops
281  
struct any_read_source::awaitable_ops
282  
{
282  
{
283  
    bool (*await_ready)(void*);
283  
    bool (*await_ready)(void*);
284  
    std::coroutine_handle<> (*await_suspend)(void*, std::coroutine_handle<>, io_env const*);
284  
    std::coroutine_handle<> (*await_suspend)(void*, std::coroutine_handle<>, io_env const*);
285  
    io_result<std::size_t> (*await_resume)(void*);
285  
    io_result<std::size_t> (*await_resume)(void*);
286  
    void (*destroy)(void*) noexcept;
286  
    void (*destroy)(void*) noexcept;
287  
};
287  
};
288  

288  

289  
// ordered by call frequency for cache line coherence
289  
// ordered by call frequency for cache line coherence
290  
struct any_read_source::vtable
290  
struct any_read_source::vtable
291  
{
291  
{
292  
    awaitable_ops const* (*construct_read_some_awaitable)(
292  
    awaitable_ops const* (*construct_read_some_awaitable)(
293  
        void* source,
293  
        void* source,
294  
        void* storage,
294  
        void* storage,
295  
        std::span<mutable_buffer const> buffers);
295  
        std::span<mutable_buffer const> buffers);
296  
    awaitable_ops const* (*construct_read_awaitable)(
296  
    awaitable_ops const* (*construct_read_awaitable)(
297  
        void* source,
297  
        void* source,
298  
        void* storage,
298  
        void* storage,
299  
        std::span<mutable_buffer const> buffers);
299  
        std::span<mutable_buffer const> buffers);
300  
    std::size_t awaitable_size;
300  
    std::size_t awaitable_size;
301  
    std::size_t awaitable_align;
301  
    std::size_t awaitable_align;
302  
    void (*destroy)(void*) noexcept;
302  
    void (*destroy)(void*) noexcept;
303  
};
303  
};
304  

304  

305  
template<ReadSource S>
305  
template<ReadSource S>
306  
struct any_read_source::vtable_for_impl
306  
struct any_read_source::vtable_for_impl
307  
{
307  
{
308  
    using ReadSomeAwaitable = decltype(std::declval<S&>().read_some(
308  
    using ReadSomeAwaitable = decltype(std::declval<S&>().read_some(
309  
        std::span<mutable_buffer const>{}));
309  
        std::span<mutable_buffer const>{}));
310  
    using ReadAwaitable = decltype(std::declval<S&>().read(
310  
    using ReadAwaitable = decltype(std::declval<S&>().read(
311  
        std::span<mutable_buffer const>{}));
311  
        std::span<mutable_buffer const>{}));
312  

312  

313  
    static void
313  
    static void
314  
    do_destroy_impl(void* source) noexcept
314  
    do_destroy_impl(void* source) noexcept
315  
    {
315  
    {
316  
        static_cast<S*>(source)->~S();
316  
        static_cast<S*>(source)->~S();
317  
    }
317  
    }
318  

318  

319  
    static awaitable_ops const*
319  
    static awaitable_ops const*
320  
    construct_read_some_awaitable_impl(
320  
    construct_read_some_awaitable_impl(
321  
        void* source,
321  
        void* source,
322  
        void* storage,
322  
        void* storage,
323  
        std::span<mutable_buffer const> buffers)
323  
        std::span<mutable_buffer const> buffers)
324  
    {
324  
    {
325  
        auto& s = *static_cast<S*>(source);
325  
        auto& s = *static_cast<S*>(source);
326  
        ::new(storage) ReadSomeAwaitable(s.read_some(buffers));
326  
        ::new(storage) ReadSomeAwaitable(s.read_some(buffers));
327  

327  

328  
        static constexpr awaitable_ops ops = {
328  
        static constexpr awaitable_ops ops = {
329  
            +[](void* p) {
329  
            +[](void* p) {
330  
                return static_cast<ReadSomeAwaitable*>(p)->await_ready();
330  
                return static_cast<ReadSomeAwaitable*>(p)->await_ready();
331  
            },
331  
            },
332  
            +[](void* p, std::coroutine_handle<> h, io_env const* env) {
332  
            +[](void* p, std::coroutine_handle<> h, io_env const* env) {
333  
                return detail::call_await_suspend(
333  
                return detail::call_await_suspend(
334  
                    static_cast<ReadSomeAwaitable*>(p), h, env);
334  
                    static_cast<ReadSomeAwaitable*>(p), h, env);
335  
            },
335  
            },
336  
            +[](void* p) {
336  
            +[](void* p) {
337  
                return static_cast<ReadSomeAwaitable*>(p)->await_resume();
337  
                return static_cast<ReadSomeAwaitable*>(p)->await_resume();
338  
            },
338  
            },
339  
            +[](void* p) noexcept {
339  
            +[](void* p) noexcept {
340  
                static_cast<ReadSomeAwaitable*>(p)->~ReadSomeAwaitable();
340  
                static_cast<ReadSomeAwaitable*>(p)->~ReadSomeAwaitable();
341  
            }
341  
            }
342  
        };
342  
        };
343  
        return &ops;
343  
        return &ops;
344  
    }
344  
    }
345  

345  

346  
    static awaitable_ops const*
346  
    static awaitable_ops const*
347  
    construct_read_awaitable_impl(
347  
    construct_read_awaitable_impl(
348  
        void* source,
348  
        void* source,
349  
        void* storage,
349  
        void* storage,
350  
        std::span<mutable_buffer const> buffers)
350  
        std::span<mutable_buffer const> buffers)
351  
    {
351  
    {
352  
        auto& s = *static_cast<S*>(source);
352  
        auto& s = *static_cast<S*>(source);
353  
        ::new(storage) ReadAwaitable(s.read(buffers));
353  
        ::new(storage) ReadAwaitable(s.read(buffers));
354  

354  

355  
        static constexpr awaitable_ops ops = {
355  
        static constexpr awaitable_ops ops = {
356  
            +[](void* p) {
356  
            +[](void* p) {
357  
                return static_cast<ReadAwaitable*>(p)->await_ready();
357  
                return static_cast<ReadAwaitable*>(p)->await_ready();
358  
            },
358  
            },
359  
            +[](void* p, std::coroutine_handle<> h, io_env const* env) {
359  
            +[](void* p, std::coroutine_handle<> h, io_env const* env) {
360  
                return detail::call_await_suspend(
360  
                return detail::call_await_suspend(
361  
                    static_cast<ReadAwaitable*>(p), h, env);
361  
                    static_cast<ReadAwaitable*>(p), h, env);
362  
            },
362  
            },
363  
            +[](void* p) {
363  
            +[](void* p) {
364  
                return static_cast<ReadAwaitable*>(p)->await_resume();
364  
                return static_cast<ReadAwaitable*>(p)->await_resume();
365  
            },
365  
            },
366  
            +[](void* p) noexcept {
366  
            +[](void* p) noexcept {
367  
                static_cast<ReadAwaitable*>(p)->~ReadAwaitable();
367  
                static_cast<ReadAwaitable*>(p)->~ReadAwaitable();
368  
            }
368  
            }
369  
        };
369  
        };
370  
        return &ops;
370  
        return &ops;
371  
    }
371  
    }
372  

372  

373  
    static constexpr std::size_t max_awaitable_size =
373  
    static constexpr std::size_t max_awaitable_size =
374  
        sizeof(ReadSomeAwaitable) > sizeof(ReadAwaitable)
374  
        sizeof(ReadSomeAwaitable) > sizeof(ReadAwaitable)
375  
            ? sizeof(ReadSomeAwaitable)
375  
            ? sizeof(ReadSomeAwaitable)
376  
            : sizeof(ReadAwaitable);
376  
            : sizeof(ReadAwaitable);
377  
    static constexpr std::size_t max_awaitable_align =
377  
    static constexpr std::size_t max_awaitable_align =
378  
        alignof(ReadSomeAwaitable) > alignof(ReadAwaitable)
378  
        alignof(ReadSomeAwaitable) > alignof(ReadAwaitable)
379  
            ? alignof(ReadSomeAwaitable)
379  
            ? alignof(ReadSomeAwaitable)
380  
            : alignof(ReadAwaitable);
380  
            : alignof(ReadAwaitable);
381  

381  

382  
    static constexpr vtable value = {
382  
    static constexpr vtable value = {
383  
        &construct_read_some_awaitable_impl,
383  
        &construct_read_some_awaitable_impl,
384  
        &construct_read_awaitable_impl,
384  
        &construct_read_awaitable_impl,
385  
        max_awaitable_size,
385  
        max_awaitable_size,
386  
        max_awaitable_align,
386  
        max_awaitable_align,
387  
        &do_destroy_impl
387  
        &do_destroy_impl
388  
    };
388  
    };
389  
};
389  
};
390  

390  

391  
//----------------------------------------------------------
391  
//----------------------------------------------------------
392  

392  

393  
inline
393  
inline
394  
any_read_source::~any_read_source()
394  
any_read_source::~any_read_source()
395  
{
395  
{
396  
    if(storage_)
396  
    if(storage_)
397  
    {
397  
    {
398  
        vt_->destroy(source_);
398  
        vt_->destroy(source_);
399  
        ::operator delete(storage_);
399  
        ::operator delete(storage_);
400  
    }
400  
    }
401  
    if(cached_awaitable_)
401  
    if(cached_awaitable_)
402  
    {
402  
    {
403  
        if(active_ops_)
403  
        if(active_ops_)
404  
            active_ops_->destroy(cached_awaitable_);
404  
            active_ops_->destroy(cached_awaitable_);
405  
        ::operator delete(cached_awaitable_);
405  
        ::operator delete(cached_awaitable_);
406  
    }
406  
    }
407  
}
407  
}
408  

408  

409  
// Move assignment: drops everything this wrapper holds, then
// takes over the state of `other`, leaving `other` empty.
inline any_read_source&
any_read_source::operator=(any_read_source&& other) noexcept
{
    // Self-assignment leaves the wrapper untouched.
    if(&other == this)
        return *this;

    // Release the owned source, if there is one.
    if(storage_ != nullptr)
    {
        vt_->destroy(source_);
        ::operator delete(storage_);
    }

    // Release the cached awaitable buffer, destroying any
    // awaitable that is still constructed inside it.
    if(cached_awaitable_ != nullptr)
    {
        if(active_ops_ != nullptr)
            active_ops_->destroy(cached_awaitable_);
        ::operator delete(cached_awaitable_);
    }

    // Steal every field, nulling it in `other`.
    source_ = std::exchange(other.source_, nullptr);
    vt_ = std::exchange(other.vt_, nullptr);
    cached_awaitable_ = std::exchange(other.cached_awaitable_, nullptr);
    storage_ = std::exchange(other.storage_, nullptr);
    active_ops_ = std::exchange(other.active_ops_, nullptr);
    return *this;
}
433  

433  

434  
template<ReadSource S>
434  
template<ReadSource S>
435  
    requires (!std::same_as<std::decay_t<S>, any_read_source>)
435  
    requires (!std::same_as<std::decay_t<S>, any_read_source>)
436  
any_read_source::any_read_source(S s)
436  
any_read_source::any_read_source(S s)
437  
    : vt_(&vtable_for_impl<S>::value)
437  
    : vt_(&vtable_for_impl<S>::value)
438  
{
438  
{
439  
    struct guard {
439  
    struct guard {
440  
        any_read_source* self;
440  
        any_read_source* self;
441  
        bool committed = false;
441  
        bool committed = false;
442  
        ~guard() {
442  
        ~guard() {
443  
            if(!committed && self->storage_) {
443  
            if(!committed && self->storage_) {
444  
                self->vt_->destroy(self->source_);
444  
                self->vt_->destroy(self->source_);
445  
                ::operator delete(self->storage_);
445  
                ::operator delete(self->storage_);
446  
                self->storage_ = nullptr;
446  
                self->storage_ = nullptr;
447  
                self->source_ = nullptr;
447  
                self->source_ = nullptr;
448  
            }
448  
            }
449  
        }
449  
        }
450  
    } g{this};
450  
    } g{this};
451  

451  

452  
    storage_ = ::operator new(sizeof(S));
452  
    storage_ = ::operator new(sizeof(S));
453  
    source_ = ::new(storage_) S(std::move(s));
453  
    source_ = ::new(storage_) S(std::move(s));
454  

454  

455  
    // Preallocate the awaitable storage
455  
    // Preallocate the awaitable storage
456  
    cached_awaitable_ = ::operator new(vt_->awaitable_size);
456  
    cached_awaitable_ = ::operator new(vt_->awaitable_size);
457  

457  

458  
    g.committed = true;
458  
    g.committed = true;
459  
}
459  
}
460  

460  

461  
template<ReadSource S>
461  
template<ReadSource S>
462  
any_read_source::any_read_source(S* s)
462  
any_read_source::any_read_source(S* s)
463  
    : source_(s)
463  
    : source_(s)
464  
    , vt_(&vtable_for_impl<S>::value)
464  
    , vt_(&vtable_for_impl<S>::value)
465  
{
465  
{
466  
    // Preallocate the awaitable storage
466  
    // Preallocate the awaitable storage
467  
    cached_awaitable_ = ::operator new(vt_->awaitable_size);
467  
    cached_awaitable_ = ::operator new(vt_->awaitable_size);
468  
}
468  
}
469  

469  

470  
//----------------------------------------------------------
470  
//----------------------------------------------------------
471  

471  

472  
template<MutableBufferSequence MB>
472  
template<MutableBufferSequence MB>
473  
auto
473  
auto
474  
any_read_source::read_some(MB buffers)
474  
any_read_source::read_some(MB buffers)
475  
{
475  
{
476  
    struct awaitable
476  
    struct awaitable
477  
    {
477  
    {
478  
        any_read_source* self_;
478  
        any_read_source* self_;
479  
        mutable_buffer_array<detail::max_iovec_> ba_;
479  
        mutable_buffer_array<detail::max_iovec_> ba_;
480  

480  

481  
        awaitable(any_read_source* self, MB const& buffers)
481  
        awaitable(any_read_source* self, MB const& buffers)
482  
            : self_(self)
482  
            : self_(self)
483  
            , ba_(buffers)
483  
            , ba_(buffers)
484  
        {
484  
        {
485  
        }
485  
        }
486  

486  

487  
        bool
487  
        bool
488  
        await_ready() const noexcept
488  
        await_ready() const noexcept
489  
        {
489  
        {
490  
            return ba_.to_span().empty();
490  
            return ba_.to_span().empty();
491  
        }
491  
        }
492  

492  

493  
        std::coroutine_handle<>
493  
        std::coroutine_handle<>
494  
        await_suspend(std::coroutine_handle<> h, io_env const* env)
494  
        await_suspend(std::coroutine_handle<> h, io_env const* env)
495  
        {
495  
        {
496  
            self_->active_ops_ = self_->vt_->construct_read_some_awaitable(
496  
            self_->active_ops_ = self_->vt_->construct_read_some_awaitable(
497  
                self_->source_,
497  
                self_->source_,
498  
                self_->cached_awaitable_,
498  
                self_->cached_awaitable_,
499  
                ba_.to_span());
499  
                ba_.to_span());
500  

500  

501  
            if(self_->active_ops_->await_ready(self_->cached_awaitable_))
501  
            if(self_->active_ops_->await_ready(self_->cached_awaitable_))
502  
                return h;
502  
                return h;
503  

503  

504  
            return self_->active_ops_->await_suspend(
504  
            return self_->active_ops_->await_suspend(
505  
                self_->cached_awaitable_, h, env);
505  
                self_->cached_awaitable_, h, env);
506  
        }
506  
        }
507  

507  

508  
        io_result<std::size_t>
508  
        io_result<std::size_t>
509  
        await_resume()
509  
        await_resume()
510  
        {
510  
        {
511  
            if(ba_.to_span().empty())
511  
            if(ba_.to_span().empty())
512  
                return {{}, 0};
512  
                return {{}, 0};
513  

513  

514  
            struct guard {
514  
            struct guard {
515  
                any_read_source* self;
515  
                any_read_source* self;
516  
                ~guard() {
516  
                ~guard() {
517  
                    self->active_ops_->destroy(self->cached_awaitable_);
517  
                    self->active_ops_->destroy(self->cached_awaitable_);
518  
                    self->active_ops_ = nullptr;
518  
                    self->active_ops_ = nullptr;
519  
                }
519  
                }
520  
            } g{self_};
520  
            } g{self_};
521  
            return self_->active_ops_->await_resume(
521  
            return self_->active_ops_->await_resume(
522  
                self_->cached_awaitable_);
522  
                self_->cached_awaitable_);
523  
        }
523  
        }
524  
    };
524  
    };
525  
    return awaitable(this, buffers);
525  
    return awaitable(this, buffers);
526  
}
526  
}
527  

527  

528  
inline auto
528  
inline auto
529  
any_read_source::read_(std::span<mutable_buffer const> buffers)
529  
any_read_source::read_(std::span<mutable_buffer const> buffers)
530  
{
530  
{
531  
    struct awaitable
531  
    struct awaitable
532  
    {
532  
    {
533  
        any_read_source* self_;
533  
        any_read_source* self_;
534  
        std::span<mutable_buffer const> buffers_;
534  
        std::span<mutable_buffer const> buffers_;
535  

535  

536  
        bool
536  
        bool
537  
        await_ready() const noexcept
537  
        await_ready() const noexcept
538  
        {
538  
        {
539  
            return false;
539  
            return false;
540  
        }
540  
        }
541  

541  

542  
        std::coroutine_handle<>
542  
        std::coroutine_handle<>
543  
        await_suspend(std::coroutine_handle<> h, io_env const* env)
543  
        await_suspend(std::coroutine_handle<> h, io_env const* env)
544  
        {
544  
        {
545  
            self_->active_ops_ = self_->vt_->construct_read_awaitable(
545  
            self_->active_ops_ = self_->vt_->construct_read_awaitable(
546  
                self_->source_,
546  
                self_->source_,
547  
                self_->cached_awaitable_,
547  
                self_->cached_awaitable_,
548  
                buffers_);
548  
                buffers_);
549  

549  

550  
            if(self_->active_ops_->await_ready(self_->cached_awaitable_))
550  
            if(self_->active_ops_->await_ready(self_->cached_awaitable_))
551  
                return h;
551  
                return h;
552  

552  

553  
            return self_->active_ops_->await_suspend(
553  
            return self_->active_ops_->await_suspend(
554  
                self_->cached_awaitable_, h, env);
554  
                self_->cached_awaitable_, h, env);
555  
        }
555  
        }
556  

556  

557  
        io_result<std::size_t>
557  
        io_result<std::size_t>
558  
        await_resume()
558  
        await_resume()
559  
        {
559  
        {
560  
            struct guard {
560  
            struct guard {
561  
                any_read_source* self;
561  
                any_read_source* self;
562  
                ~guard() {
562  
                ~guard() {
563  
                    self->active_ops_->destroy(self->cached_awaitable_);
563  
                    self->active_ops_->destroy(self->cached_awaitable_);
564  
                    self->active_ops_ = nullptr;
564  
                    self->active_ops_ = nullptr;
565  
                }
565  
                }
566  
            } g{self_};
566  
            } g{self_};
567  
            return self_->active_ops_->await_resume(
567  
            return self_->active_ops_->await_resume(
568  
                self_->cached_awaitable_);
568  
                self_->cached_awaitable_);
569  
        }
569  
        }
570  
    };
570  
    };
571  
    return awaitable{this, buffers};
571  
    return awaitable{this, buffers};
572  
}
572  
}
573  

573  

574  
template<MutableBufferSequence MB>
574  
template<MutableBufferSequence MB>
575  
io_task<std::size_t>
575  
io_task<std::size_t>
576  
any_read_source::read(MB buffers)
576  
any_read_source::read(MB buffers)
577  
{
577  
{
578  
    buffer_param bp(buffers);
578  
    buffer_param bp(buffers);
579  
    std::size_t total = 0;
579  
    std::size_t total = 0;
580  

580  

581  
    for(;;)
581  
    for(;;)
582  
    {
582  
    {
583  
        auto bufs = bp.data();
583  
        auto bufs = bp.data();
584  
        if(bufs.empty())
584  
        if(bufs.empty())
585  
            break;
585  
            break;
586  

586  

587  
        auto [ec, n] = co_await read_(bufs);
587  
        auto [ec, n] = co_await read_(bufs);
588  
        total += n;
588  
        total += n;
589  
        if(ec)
589  
        if(ec)
590  
            co_return {ec, total};
590  
            co_return {ec, total};
591  
        bp.consume(n);
591  
        bp.consume(n);
592  
    }
592  
    }
593  

593  

594  
    co_return {{}, total};
594  
    co_return {{}, total};
595  
}
595  
}
596  

596  

597  
} // namespace capy
597  
} // namespace capy
598  
} // namespace boost
598  
} // namespace boost
599  

599  

600  
#endif
600  
#endif