1  
//
1  
//
2  
// Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
2  
// Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3  
//
3  
//
4  
// Distributed under the Boost Software License, Version 1.0. (See accompanying
4  
// Distributed under the Boost Software License, Version 1.0. (See accompanying
5  
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
5  
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6  
//
6  
//
7  
// Official repository: https://github.com/cppalliance/capy
7  
// Official repository: https://github.com/cppalliance/capy
8  
//
8  
//
9  

9  

10  
#ifndef BOOST_CAPY_TEST_WRITE_SINK_HPP
10  
#ifndef BOOST_CAPY_TEST_WRITE_SINK_HPP
11  
#define BOOST_CAPY_TEST_WRITE_SINK_HPP
11  
#define BOOST_CAPY_TEST_WRITE_SINK_HPP
12  

12  

13  
#include <boost/capy/detail/config.hpp>
13  
#include <boost/capy/detail/config.hpp>
14  
#include <boost/capy/buffers.hpp>
14  
#include <boost/capy/buffers.hpp>
15  
#include <boost/capy/buffers/buffer_copy.hpp>
15  
#include <boost/capy/buffers/buffer_copy.hpp>
16  
#include <boost/capy/buffers/make_buffer.hpp>
16  
#include <boost/capy/buffers/make_buffer.hpp>
17  
#include <coroutine>
17  
#include <coroutine>
18  
#include <boost/capy/ex/io_env.hpp>
18  
#include <boost/capy/ex/io_env.hpp>
19  
#include <boost/capy/io_result.hpp>
19  
#include <boost/capy/io_result.hpp>
20  
#include <boost/capy/error.hpp>
20  
#include <boost/capy/error.hpp>
21  
#include <boost/capy/test/fuse.hpp>
21  
#include <boost/capy/test/fuse.hpp>
22  

22  

23  
#include <algorithm>
23  
#include <algorithm>
24  
#include <stop_token>
24  
#include <stop_token>
25  
#include <string>
25  
#include <string>
26  
#include <string_view>
26  
#include <string_view>
27  

27  

28  
namespace boost {
28  
namespace boost {
29  
namespace capy {
29  
namespace capy {
30  
namespace test {
30  
namespace test {
31  

31  

32  
/** A mock sink for testing write operations.
32  
/** A mock sink for testing write operations.
33  

33  

34  
    Use this to verify code that performs complete writes without needing
34  
    Use this to verify code that performs complete writes without needing
35  
    real I/O. Call @ref write to write data, then @ref data to retrieve
35  
    real I/O. Call @ref write to write data, then @ref data to retrieve
36  
    what was written. The associated @ref fuse enables error injection
36  
    what was written. The associated @ref fuse enables error injection
37  
    at controlled points.
37  
    at controlled points.
38  

38  

39  
    This class satisfies the @ref WriteSink concept by providing partial
39  
    This class satisfies the @ref WriteSink concept by providing partial
40  
    writes via `write_some` (satisfying @ref WriteStream), complete
40  
    writes via `write_some` (satisfying @ref WriteStream), complete
41  
    writes via `write`, and EOF signaling via `write_eof`.
41  
    writes via `write`, and EOF signaling via `write_eof`.
42  

42  

43  
    @par Thread Safety
43  
    @par Thread Safety
44  
    Not thread-safe.
44  
    Not thread-safe.
45  

45  

46  
    @par Example
46  
    @par Example
47  
    @code
47  
    @code
48  
    fuse f;
48  
    fuse f;
49  
    write_sink ws( f );
49  
    write_sink ws( f );
50  

50  

51  
    auto r = f.armed( [&]( fuse& ) -> task<void> {
51  
    auto r = f.armed( [&]( fuse& ) -> task<void> {
52  
        auto [ec, n] = co_await ws.write(
52  
        auto [ec, n] = co_await ws.write(
53  
            const_buffer( "Hello", 5 ) );
53  
            const_buffer( "Hello", 5 ) );
54  
        if( ec )
54  
        if( ec )
55  
            co_return;
55  
            co_return;
56  
        auto [ec2] = co_await ws.write_eof();
56  
        auto [ec2] = co_await ws.write_eof();
57  
        if( ec2 )
57  
        if( ec2 )
58  
            co_return;
58  
            co_return;
59  
        // ws.data() returns "Hello"
59  
        // ws.data() returns "Hello"
60  
    } );
60  
    } );
61  
    @endcode
61  
    @endcode
62  

62  

63  
    @see fuse, WriteSink
63  
    @see fuse, WriteSink
64  
*/
64  
*/
65  
class write_sink
65  
class write_sink
66  
{
66  
{
67  
    fuse f_;
67  
    fuse f_;
68  
    std::string data_;
68  
    std::string data_;
69  
    std::string expect_;
69  
    std::string expect_;
70  
    std::size_t max_write_size_;
70  
    std::size_t max_write_size_;
71  
    bool eof_called_ = false;
71  
    bool eof_called_ = false;
72  

72  

73  
    std::error_code
73  
    std::error_code
74  
    consume_match_() noexcept
74  
    consume_match_() noexcept
75  
    {
75  
    {
76  
        if(data_.empty() || expect_.empty())
76  
        if(data_.empty() || expect_.empty())
77  
            return {};
77  
            return {};
78  
        std::size_t const n = (std::min)(data_.size(), expect_.size());
78  
        std::size_t const n = (std::min)(data_.size(), expect_.size());
79  
        if(std::string_view(data_.data(), n) !=
79  
        if(std::string_view(data_.data(), n) !=
80  
            std::string_view(expect_.data(), n))
80  
            std::string_view(expect_.data(), n))
81  
            return error::test_failure;
81  
            return error::test_failure;
82  
        data_.erase(0, n);
82  
        data_.erase(0, n);
83  
        expect_.erase(0, n);
83  
        expect_.erase(0, n);
84  
        return {};
84  
        return {};
85  
    }
85  
    }
86  

86  

87  
public:
87  
public:
88  
    /** Construct a write sink.
88  
    /** Construct a write sink.
89  

89  

90  
        @param f The fuse used to inject errors during writes.
90  
        @param f The fuse used to inject errors during writes.
91  

91  

92  
        @param max_write_size Maximum bytes transferred per write.
92  
        @param max_write_size Maximum bytes transferred per write.
93  
        Use to simulate chunked delivery.
93  
        Use to simulate chunked delivery.
94  
    */
94  
    */
95  
    explicit write_sink(
95  
    explicit write_sink(
96  
        fuse f = {},
96  
        fuse f = {},
97  
        std::size_t max_write_size = std::size_t(-1)) noexcept
97  
        std::size_t max_write_size = std::size_t(-1)) noexcept
98  
        : f_(std::move(f))
98  
        : f_(std::move(f))
99  
        , max_write_size_(max_write_size)
99  
        , max_write_size_(max_write_size)
100  
    {
100  
    {
101  
    }
101  
    }
102  

102  

103  
    /// Return the written data as a string view.
103  
    /// Return the written data as a string view.
104  
    std::string_view
104  
    std::string_view
105  
    data() const noexcept
105  
    data() const noexcept
106  
    {
106  
    {
107  
        return data_;
107  
        return data_;
108  
    }
108  
    }
109  

109  

110  
    /** Set the expected data for subsequent writes.
110  
    /** Set the expected data for subsequent writes.
111  

111  

112  
        Stores the expected data and immediately tries to match
112  
        Stores the expected data and immediately tries to match
113  
        against any data already written. Matched data is consumed
113  
        against any data already written. Matched data is consumed
114  
        from both buffers.
114  
        from both buffers.
115  

115  

116  
        @param sv The expected data.
116  
        @param sv The expected data.
117  

117  

118  
        @return An error if existing data does not match.
118  
        @return An error if existing data does not match.
119  
    */
119  
    */
120  
    std::error_code
120  
    std::error_code
121  
    expect(std::string_view sv)
121  
    expect(std::string_view sv)
122  
    {
122  
    {
123  
        expect_.assign(sv);
123  
        expect_.assign(sv);
124  
        return consume_match_();
124  
        return consume_match_();
125  
    }
125  
    }
126  

126  

127  
    /// Return the number of bytes written.
127  
    /// Return the number of bytes written.
128  
    std::size_t
128  
    std::size_t
129  
    size() const noexcept
129  
    size() const noexcept
130  
    {
130  
    {
131  
        return data_.size();
131  
        return data_.size();
132  
    }
132  
    }
133  

133  

134  
    /// Return whether write_eof has been called.
134  
    /// Return whether write_eof has been called.
135  
    bool
135  
    bool
136  
    eof_called() const noexcept
136  
    eof_called() const noexcept
137  
    {
137  
    {
138  
        return eof_called_;
138  
        return eof_called_;
139  
    }
139  
    }
140  

140  

141  
    /// Clear all data and reset state.
141  
    /// Clear all data and reset state.
142  
    void
142  
    void
143  
    clear() noexcept
143  
    clear() noexcept
144  
    {
144  
    {
145  
        data_.clear();
145  
        data_.clear();
146  
        expect_.clear();
146  
        expect_.clear();
147  
        eof_called_ = false;
147  
        eof_called_ = false;
148  
    }
148  
    }
149  

149  

150  
    /** Asynchronously write some data to the sink.
150  
    /** Asynchronously write some data to the sink.
151  

151  

152  
        Transfers up to `buffer_size( buffers )` bytes from the provided
152  
        Transfers up to `buffer_size( buffers )` bytes from the provided
153  
        const buffer sequence to the internal buffer. Before every write,
153  
        const buffer sequence to the internal buffer. Before every write,
154  
        the attached @ref fuse is consulted to possibly inject an error.
154  
        the attached @ref fuse is consulted to possibly inject an error.
155  

155  

156  
        @param buffers The const buffer sequence containing data to write.
156  
        @param buffers The const buffer sequence containing data to write.
157  

157  

158  
        @return An awaitable yielding `(error_code,std::size_t)`.
158  
        @return An awaitable yielding `(error_code,std::size_t)`.
159  

159  

160  
        @see fuse
160  
        @see fuse
161  
    */
161  
    */
162  
    template<ConstBufferSequence CB>
162  
    template<ConstBufferSequence CB>
163  
    auto
163  
    auto
164  
    write_some(CB buffers)
164  
    write_some(CB buffers)
165  
    {
165  
    {
166  
        struct awaitable
166  
        struct awaitable
167  
        {
167  
        {
168  
            write_sink* self_;
168  
            write_sink* self_;
169  
            CB buffers_;
169  
            CB buffers_;
170  

170  

171  
            bool await_ready() const noexcept { return true; }
171  
            bool await_ready() const noexcept { return true; }
172  

172  

173  
            void await_suspend(
173  
            void await_suspend(
174  
                std::coroutine_handle<>,
174  
                std::coroutine_handle<>,
175  
                io_env const*) const noexcept
175  
                io_env const*) const noexcept
176  
            {
176  
            {
177  
            }
177  
            }
178  

178  

179  
            io_result<std::size_t>
179  
            io_result<std::size_t>
180  
            await_resume()
180  
            await_resume()
181  
            {
181  
            {
182  
                if(buffer_empty(buffers_))
182  
                if(buffer_empty(buffers_))
183  
                    return {{}, 0};
183  
                    return {{}, 0};
184  

184  

185  
                auto ec = self_->f_.maybe_fail();
185  
                auto ec = self_->f_.maybe_fail();
186  
                if(ec)
186  
                if(ec)
187  
                    return {ec, 0};
187  
                    return {ec, 0};
188  

188  

189  
                std::size_t n = buffer_size(buffers_);
189  
                std::size_t n = buffer_size(buffers_);
190  
                n = (std::min)(n, self_->max_write_size_);
190  
                n = (std::min)(n, self_->max_write_size_);
191  

191  

192  
                std::size_t const old_size = self_->data_.size();
192  
                std::size_t const old_size = self_->data_.size();
193  
                self_->data_.resize(old_size + n);
193  
                self_->data_.resize(old_size + n);
194  
                buffer_copy(make_buffer(
194  
                buffer_copy(make_buffer(
195  
                    self_->data_.data() + old_size, n), buffers_, n);
195  
                    self_->data_.data() + old_size, n), buffers_, n);
196  

196  

197  
                ec = self_->consume_match_();
197  
                ec = self_->consume_match_();
198  
                if(ec)
198  
                if(ec)
199  
                {
199  
                {
200  
                    self_->data_.resize(old_size);
200  
                    self_->data_.resize(old_size);
201  
                    return {ec, 0};
201  
                    return {ec, 0};
202  
                }
202  
                }
203  

203  

204  
                return {{}, n};
204  
                return {{}, n};
205  
            }
205  
            }
206  
        };
206  
        };
207  
        return awaitable{this, buffers};
207  
        return awaitable{this, buffers};
208  
    }
208  
    }
209  

209  

210  
    /** Asynchronously write data to the sink.
210  
    /** Asynchronously write data to the sink.
211  

211  

212  
        Transfers all bytes from the provided const buffer sequence
212  
        Transfers all bytes from the provided const buffer sequence
213  
        to the internal buffer. Unlike @ref write_some, this ignores
213  
        to the internal buffer. Unlike @ref write_some, this ignores
214  
        `max_write_size` and writes all available data, matching the
214  
        `max_write_size` and writes all available data, matching the
215  
        @ref WriteSink semantic contract.
215  
        @ref WriteSink semantic contract.
216  

216  

217  
        @param buffers The const buffer sequence containing data to write.
217  
        @param buffers The const buffer sequence containing data to write.
218  

218  

219  
        @return An awaitable yielding `(error_code,std::size_t)`.
219  
        @return An awaitable yielding `(error_code,std::size_t)`.
220  

220  

221  
        @see fuse
221  
        @see fuse
222  
    */
222  
    */
223  
    template<ConstBufferSequence CB>
223  
    template<ConstBufferSequence CB>
224  
    auto
224  
    auto
225  
    write(CB buffers)
225  
    write(CB buffers)
226  
    {
226  
    {
227  
        struct awaitable
227  
        struct awaitable
228  
        {
228  
        {
229  
            write_sink* self_;
229  
            write_sink* self_;
230  
            CB buffers_;
230  
            CB buffers_;
231  

231  

232  
            bool await_ready() const noexcept { return true; }
232  
            bool await_ready() const noexcept { return true; }
233  

233  

234  
            void await_suspend(
234  
            void await_suspend(
235  
                std::coroutine_handle<>,
235  
                std::coroutine_handle<>,
236  
                io_env const*) const noexcept
236  
                io_env const*) const noexcept
237  
            {
237  
            {
238  
            }
238  
            }
239  

239  

240  
            io_result<std::size_t>
240  
            io_result<std::size_t>
241  
            await_resume()
241  
            await_resume()
242  
            {
242  
            {
243  
                auto ec = self_->f_.maybe_fail();
243  
                auto ec = self_->f_.maybe_fail();
244  
                if(ec)
244  
                if(ec)
245  
                    return {ec, 0};
245  
                    return {ec, 0};
246  

246  

247  
                std::size_t n = buffer_size(buffers_);
247  
                std::size_t n = buffer_size(buffers_);
248  
                if(n == 0)
248  
                if(n == 0)
249  
                    return {{}, 0};
249  
                    return {{}, 0};
250  

250  

251  
                std::size_t const old_size = self_->data_.size();
251  
                std::size_t const old_size = self_->data_.size();
252  
                self_->data_.resize(old_size + n);
252  
                self_->data_.resize(old_size + n);
253  
                buffer_copy(make_buffer(
253  
                buffer_copy(make_buffer(
254  
                    self_->data_.data() + old_size, n), buffers_);
254  
                    self_->data_.data() + old_size, n), buffers_);
255  

255  

256  
                ec = self_->consume_match_();
256  
                ec = self_->consume_match_();
257  
                if(ec)
257  
                if(ec)
258  
                    return {ec, n};
258  
                    return {ec, n};
259  

259  

260  
                return {{}, n};
260  
                return {{}, n};
261  
            }
261  
            }
262  
        };
262  
        };
263  
        return awaitable{this, buffers};
263  
        return awaitable{this, buffers};
264  
    }
264  
    }
265  

265  

266  
    /** Atomically write data and signal end-of-stream.
266  
    /** Atomically write data and signal end-of-stream.
267  

267  

268  
        Transfers all bytes from the provided const buffer sequence to
268  
        Transfers all bytes from the provided const buffer sequence to
269  
        the internal buffer and signals end-of-stream. Before the write,
269  
        the internal buffer and signals end-of-stream. Before the write,
270  
        the attached @ref fuse is consulted to possibly inject an error
270  
        the attached @ref fuse is consulted to possibly inject an error
271  
        for testing fault scenarios.
271  
        for testing fault scenarios.
272  

272  

273  
        @par Effects
273  
        @par Effects
274  
        On success, appends the written bytes to the internal buffer
274  
        On success, appends the written bytes to the internal buffer
275  
        and marks the sink as finalized.
275  
        and marks the sink as finalized.
276  
        If an error is injected by the fuse, the internal buffer remains
276  
        If an error is injected by the fuse, the internal buffer remains
277  
        unchanged.
277  
        unchanged.
278  

278  

279  
        @par Exception Safety
279  
        @par Exception Safety
280  
        No-throw guarantee.
280  
        No-throw guarantee.
281  

281  

282  
        @param buffers The const buffer sequence containing data to write.
282  
        @param buffers The const buffer sequence containing data to write.
283  

283  

284  
        @return An awaitable yielding `(error_code,std::size_t)`.
284  
        @return An awaitable yielding `(error_code,std::size_t)`.
285  

285  

286  
        @see fuse
286  
        @see fuse
287  
    */
287  
    */
288  
    template<ConstBufferSequence CB>
288  
    template<ConstBufferSequence CB>
289  
    auto
289  
    auto
290  
    write_eof(CB buffers)
290  
    write_eof(CB buffers)
291  
    {
291  
    {
292  
        struct awaitable
292  
        struct awaitable
293  
        {
293  
        {
294  
            write_sink* self_;
294  
            write_sink* self_;
295  
            CB buffers_;
295  
            CB buffers_;
296  

296  

297  
            bool await_ready() const noexcept { return true; }
297  
            bool await_ready() const noexcept { return true; }
298  

298  

299  
            void await_suspend(
299  
            void await_suspend(
300  
                std::coroutine_handle<>,
300  
                std::coroutine_handle<>,
301  
                io_env const*) const noexcept
301  
                io_env const*) const noexcept
302  
            {
302  
            {
303  
            }
303  
            }
304  

304  

305  
            io_result<std::size_t>
305  
            io_result<std::size_t>
306  
            await_resume()
306  
            await_resume()
307  
            {
307  
            {
308  
                auto ec = self_->f_.maybe_fail();
308  
                auto ec = self_->f_.maybe_fail();
309  
                if(ec)
309  
                if(ec)
310  
                    return {ec, 0};
310  
                    return {ec, 0};
311  

311  

312  
                std::size_t n = buffer_size(buffers_);
312  
                std::size_t n = buffer_size(buffers_);
313  
                if(n > 0)
313  
                if(n > 0)
314  
                {
314  
                {
315  
                    std::size_t const old_size = self_->data_.size();
315  
                    std::size_t const old_size = self_->data_.size();
316  
                    self_->data_.resize(old_size + n);
316  
                    self_->data_.resize(old_size + n);
317  
                    buffer_copy(make_buffer(
317  
                    buffer_copy(make_buffer(
318  
                        self_->data_.data() + old_size, n), buffers_);
318  
                        self_->data_.data() + old_size, n), buffers_);
319  

319  

320  
                    ec = self_->consume_match_();
320  
                    ec = self_->consume_match_();
321  
                    if(ec)
321  
                    if(ec)
322  
                        return {ec, n};
322  
                        return {ec, n};
323  
                }
323  
                }
324  

324  

325  
                self_->eof_called_ = true;
325  
                self_->eof_called_ = true;
326  

326  

327  
                return {{}, n};
327  
                return {{}, n};
328  
            }
328  
            }
329  
        };
329  
        };
330  
        return awaitable{this, buffers};
330  
        return awaitable{this, buffers};
331  
    }
331  
    }
332  

332  

333  
    /** Signal end-of-stream.
333  
    /** Signal end-of-stream.
334  

334  

335  
        Marks the sink as finalized, indicating no more data will be
335  
        Marks the sink as finalized, indicating no more data will be
336  
        written. Before signaling, the attached @ref fuse is consulted
336  
        written. Before signaling, the attached @ref fuse is consulted
337  
        to possibly inject an error for testing fault scenarios.
337  
        to possibly inject an error for testing fault scenarios.
338  

338  

339  
        @par Effects
339  
        @par Effects
340  
        On success, marks the sink as finalized.
340  
        On success, marks the sink as finalized.
341  
        If an error is injected by the fuse, the state remains unchanged.
341  
        If an error is injected by the fuse, the state remains unchanged.
342  

342  

343  
        @par Exception Safety
343  
        @par Exception Safety
344  
        No-throw guarantee.
344  
        No-throw guarantee.
345  

345  

346  
        @return An awaitable yielding `(error_code)`.
346  
        @return An awaitable yielding `(error_code)`.
347  

347  

348  
        @see fuse
348  
        @see fuse
349  
    */
349  
    */
350  
    auto
350  
    auto
351  
    write_eof()
351  
    write_eof()
352  
    {
352  
    {
353  
        struct awaitable
353  
        struct awaitable
354  
        {
354  
        {
355  
            write_sink* self_;
355  
            write_sink* self_;
356  

356  

357  
            bool await_ready() const noexcept { return true; }
357  
            bool await_ready() const noexcept { return true; }
358  

358  

359  
            // This method is required to satisfy Capy's IoAwaitable concept,
359  
            // This method is required to satisfy Capy's IoAwaitable concept,
360  
            // but is never called because await_ready() returns true.
360  
            // but is never called because await_ready() returns true.
361  
            // See the comment on write(CB buffers) for a detailed explanation.
361  
            // See the comment on write(CB buffers) for a detailed explanation.
362  
            void await_suspend(
362  
            void await_suspend(
363  
                std::coroutine_handle<>,
363  
                std::coroutine_handle<>,
364  
                io_env const*) const noexcept
364  
                io_env const*) const noexcept
365  
            {
365  
            {
366  
            }
366  
            }
367  

367  

368  
            io_result<>
368  
            io_result<>
369  
            await_resume()
369  
            await_resume()
370  
            {
370  
            {
371  
                auto ec = self_->f_.maybe_fail();
371  
                auto ec = self_->f_.maybe_fail();
372  
                if(ec)
372  
                if(ec)
373  
                    return {ec};
373  
                    return {ec};
374  

374  

375  
                self_->eof_called_ = true;
375  
                self_->eof_called_ = true;
376  
                return {};
376  
                return {};
377  
            }
377  
            }
378  
        };
378  
        };
379  
        return awaitable{this};
379  
        return awaitable{this};
380  
    }
380  
    }
381  
};
381  
};
382  

382  

383  
} // test
383  
} // test
384  
} // capy
384  
} // capy
385  
} // boost
385  
} // boost
386  

386  

387  
#endif
387  
#endif