libs/capy/include/boost/capy/io/any_buffer_sink.hpp

//
// Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
// Official repository: https://github.com/cppalliance/capy
//

#ifndef BOOST_CAPY_IO_ANY_BUFFER_SINK_HPP
#define BOOST_CAPY_IO_ANY_BUFFER_SINK_HPP

#include <boost/capy/detail/config.hpp>
#include <boost/capy/detail/await_suspend_helper.hpp>
#include <boost/capy/buffers.hpp>
#include <boost/capy/buffers/buffer_copy.hpp>
#include <boost/capy/buffers/buffer_param.hpp>
#include <boost/capy/concept/buffer_sink.hpp>
#include <boost/capy/concept/io_awaitable.hpp>
#include <boost/capy/concept/write_sink.hpp>
#include <boost/capy/ex/io_env.hpp>
#include <boost/capy/io_result.hpp>
#include <boost/capy/io_task.hpp>

#include <concepts>
#include <coroutine>
#include <cstddef>
#include <exception>
#include <new>
#include <span>
#include <stop_token>
#include <system_error>
#include <utility>

namespace boost {
namespace capy {

/** Type-erased wrapper for any BufferSink.

    This class provides type erasure for any type satisfying the
    @ref BufferSink concept, enabling runtime polymorphism for
    buffer sink operations. It uses cached awaitable storage to achieve
    zero steady-state allocation after construction.

    The wrapper exposes two interfaces for producing data:
    the @ref BufferSink interface (`prepare`, `commit`, `commit_eof`)
    and the @ref WriteSink interface (`write_some`, `write`,
    `write_eof`). Choose the interface that matches how your data
    is produced:

    @par Choosing an Interface

    Use the **BufferSink** interface when you generate data directly
    into buffers owned by the sink. Call @ref prepare to obtain
    writable buffers, fill them, then call @ref commit or
    @ref commit_eof.

    Use the **WriteSink** interface when you already have buffers
    containing the data to write:
    - If the entire body is available up front, call
      @ref write_eof(buffers) to send everything atomically.
    - If data arrives incrementally, call @ref write or
      @ref write_some in a loop, then @ref write_eof() when done.
      Prefer `write` (complete) unless your streaming pattern
      benefits from partial writes via `write_some`.

    If the wrapped type only satisfies @ref BufferSink, the
    @ref WriteSink operations are synthesized automatically.

    @par Construction Modes

    - **Owning**: Pass by value to transfer ownership. The wrapper
      allocates storage and owns the sink.
    - **Reference**: Pass a pointer to wrap without ownership. The
      pointed-to sink must outlive this wrapper.

    @par Awaitable Preallocation
    The constructor preallocates storage large enough for any of the
    type-erased awaitables, so steady-state operations perform no
    allocations. The memory cost is paid up front, where it can be
    measured (for example at server startup), rather than accruing
    piecemeal as traffic arrives.

    @par Thread Safety
    Not thread-safe. Concurrent operations on the same wrapper
    are undefined behavior.

    @par Example
    @code
    // Owning - takes ownership of the sink
    any_buffer_sink abs(some_buffer_sink{args...});

    // Reference - wraps without ownership
    some_buffer_sink sink;
    any_buffer_sink abs(&sink);

    // BufferSink interface: generate into sink-owned buffers
    mutable_buffer arr[16];
    auto bufs = abs.prepare(arr);
    // Write data into bufs[0..bufs.size())
    auto [ec] = co_await abs.commit(bytes_written);
    auto [ec2] = co_await abs.commit_eof(0);

    // WriteSink interface: send caller-owned buffers
    auto [ec3, n] = co_await abs.write(make_buffer("hello", 5));
    auto [ec4] = co_await abs.write_eof();

    // Or send everything at once
    auto [ec5, n2] = co_await abs.write_eof(
        make_buffer(body_data));
    @endcode

    @see any_buffer_source, BufferSink, WriteSink
*/
class any_buffer_sink
{
    struct vtable;
    struct awaitable_ops;
    struct write_awaitable_ops;

    template<BufferSink S>
    struct vtable_for_impl;

    // hot-path members first for cache locality
    void* sink_ = nullptr;
    vtable const* vt_ = nullptr;
    void* cached_awaitable_ = nullptr;
    awaitable_ops const* active_ops_ = nullptr;
    write_awaitable_ops const* active_write_ops_ = nullptr;
    void* storage_ = nullptr;

public:
    /** Destructor.

        Destroys the owned sink (if any) and releases the cached
        awaitable storage.
    */
    ~any_buffer_sink();

    /** Default constructor.

        Constructs an empty wrapper. Operations on a default-constructed
        wrapper result in undefined behavior.
    */
    any_buffer_sink() = default;

    /** Non-copyable.

        The awaitable cache is per-instance and cannot be shared.
    */
    any_buffer_sink(any_buffer_sink const&) = delete;
    any_buffer_sink& operator=(any_buffer_sink const&) = delete;

    /** Move constructor.

        Transfers ownership of the wrapped sink (if owned) and
        cached awaitable storage from `other`. After the move, `other` is
        in a default-constructed state.

        @param other The wrapper to move from.
    */
    any_buffer_sink(any_buffer_sink&& other) noexcept
        : sink_(std::exchange(other.sink_, nullptr))
        , vt_(std::exchange(other.vt_, nullptr))
        , cached_awaitable_(std::exchange(other.cached_awaitable_, nullptr))
        , active_ops_(std::exchange(other.active_ops_, nullptr))
        , active_write_ops_(std::exchange(other.active_write_ops_, nullptr))
        , storage_(std::exchange(other.storage_, nullptr))
    {
    }

    /** Move assignment operator.

        Destroys any owned sink and releases existing resources,
        then transfers ownership from `other`.

        @param other The wrapper to move from.
        @return Reference to this wrapper.
    */
    any_buffer_sink&
    operator=(any_buffer_sink&& other) noexcept;

    /** Construct by taking ownership of a BufferSink.

        Allocates storage and moves the sink into this wrapper.
        The wrapper owns the sink and will destroy it. If `S` also
        satisfies @ref WriteSink, native write operations are
        forwarded through the virtual boundary.

        @param s The sink to take ownership of.
    */
    template<BufferSink S>
    requires (!std::same_as<std::decay_t<S>, any_buffer_sink>)
    any_buffer_sink(S s);

    /** Construct by wrapping a BufferSink without ownership.

        Wraps the given sink by pointer. The sink must remain
        valid for the lifetime of this wrapper. If `S` also
        satisfies @ref WriteSink, native write operations are
        forwarded through the virtual boundary.

        @param s Pointer to the sink to wrap.
    */
    template<BufferSink S>
    any_buffer_sink(S* s);

    /** Check if the wrapper contains a valid sink.

        @return `true` if wrapping a sink, `false` if default-constructed
        or moved-from.
    */
    bool
    has_value() const noexcept
    {
        return sink_ != nullptr;
    }

    /** Check if the wrapper contains a valid sink.

        @return `true` if wrapping a sink, `false` if default-constructed
        or moved-from.
    */
    explicit
    operator bool() const noexcept
    {
        return has_value();
    }

    /** Prepare writable buffers.

        Fills the provided span with mutable buffer descriptors
        pointing to the underlying sink's internal storage. This
        operation is synchronous.

        @param dest Span of mutable_buffer to fill.

        @return A span of filled buffers.

        @par Preconditions
        The wrapper must contain a valid sink (`has_value() == true`).
    */
    std::span<mutable_buffer>
    prepare(std::span<mutable_buffer> dest);

    /** Commit bytes written to the prepared buffers.

        Commits `n` bytes written to the buffers returned by the
        most recent call to @ref prepare. The operation may trigger
        underlying I/O.

        @param n The number of bytes to commit.

        @return An awaitable yielding `(error_code)`.

        @par Preconditions
        The wrapper must contain a valid sink (`has_value() == true`).
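
        @par Example
        A sketch of a typical produce loop; `have_more` and `fill`
        are hypothetical helpers supplied by the caller:
        @code
        mutable_buffer arr[16];
        while(have_more())
        {
            auto bufs = abs.prepare(arr);
            auto n = fill(bufs); // write into bufs, return byte count
            auto [ec] = co_await abs.commit(n);
            if(ec)
                break; // handle or propagate the error
        }
        auto [ec] = co_await abs.commit_eof(0);
        @endcode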
    */
    auto
    commit(std::size_t n);

    /** Commit final bytes and signal end-of-stream.

        Commits `n` bytes written to the buffers returned by the
        most recent call to @ref prepare and finalizes the sink.
        After success, no further operations are permitted.

        @param n The number of bytes to commit.

        @return An awaitable yielding `(error_code)`.

        @par Preconditions
        The wrapper must contain a valid sink (`has_value() == true`).
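
        @par Example
        A sketch, assuming `n` bytes were just written into buffers
        obtained from @ref prepare:
        @code
        auto [ec] = co_await abs.commit_eof(n);
        @endcode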
    */
    auto
    commit_eof(std::size_t n);

    /** Write some data from a buffer sequence.

        Writes one or more bytes from the buffer sequence to the
        underlying sink. May consume less than the full sequence.

        When the wrapped type provides native @ref WriteSink support,
        the operation forwards directly. Otherwise it is synthesized
        from @ref prepare and @ref commit with a buffer copy.

        @param buffers The buffer sequence to write.

        @return An awaitable yielding `(error_code,std::size_t)`.

        @par Preconditions
        The wrapper must contain a valid sink (`has_value() == true`).
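
        @par Example
        A sketch of a partial write; `data` and `size` are assumed
        to describe bytes owned by the caller:
        @code
        auto [ec, n] = co_await abs.write_some(make_buffer(data, size));
        // n bytes were consumed; the caller retries with the rest
        @endcode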
    */
    template<ConstBufferSequence CB>
    io_task<std::size_t>
    write_some(CB buffers);

    /** Write all data from a buffer sequence.

        Writes all data from the buffer sequence to the underlying
        sink. This method satisfies the @ref WriteSink concept.

        When the wrapped type provides native @ref WriteSink support,
        each window is forwarded directly. Otherwise the data is
        copied into the sink via @ref prepare and @ref commit.

        @param buffers The buffer sequence to write.

        @return An awaitable yielding `(error_code,std::size_t)`.

        @par Preconditions
        The wrapper must contain a valid sink (`has_value() == true`).
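
        @par Example
        A sketch of incremental writing followed by end-of-stream:
        @code
        auto [ec1, n1] = co_await abs.write(make_buffer("head", 4));
        auto [ec2, n2] = co_await abs.write(make_buffer("tail", 4));
        auto [ec3] = co_await abs.write_eof();
        @endcode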
    */
    template<ConstBufferSequence CB>
    io_task<std::size_t>
    write(CB buffers);

    /** Atomically write data and signal end-of-stream.

        Writes all data from the buffer sequence to the underlying
        sink and then signals end-of-stream.

        When the wrapped type provides native @ref WriteSink support,
        the final window is sent atomically via the underlying
        `write_eof(buffers)`. Otherwise the data is synthesized
        through @ref prepare, @ref commit, and @ref commit_eof.

        @param buffers The buffer sequence to write.

        @return An awaitable yielding `(error_code,std::size_t)`.

        @par Preconditions
        The wrapper must contain a valid sink (`has_value() == true`).
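
        @par Example
        A sketch, assuming the complete body is already in
        `body_data`:
        @code
        auto [ec, n] = co_await abs.write_eof(make_buffer(body_data));
        @endcode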
    */
    template<ConstBufferSequence CB>
    io_task<std::size_t>
    write_eof(CB buffers);

    /** Signal end-of-stream.

        Indicates that no more data will be written to the sink.
        This method satisfies the @ref WriteSink concept.

        When the wrapped type provides native @ref WriteSink support,
        the underlying `write_eof()` is called. Otherwise the
        operation is implemented as `commit_eof(0)`.

        @return An awaitable yielding `(error_code)`.

        @par Preconditions
        The wrapper must contain a valid sink (`has_value() == true`).
    */
    auto
    write_eof();

protected:
    /** Rebind to a new sink after move.

        Updates the internal pointer to reference a new sink object.
        Used by owning wrappers after move assignment when the owned
        object has moved to a new location.

        @param new_sink The new sink to bind to. Must be the same
        type as the original sink.

        @note Terminates if called with a sink of different type
        than the original.
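
        @par Example
        A sketch of a hypothetical derived owning wrapper; `my_sink`
        and the member `sink_` are assumptions for illustration:
        @code
        struct owning_wrapper : any_buffer_sink
        {
            my_sink sink_;

            owning_wrapper(owning_wrapper&& other) noexcept
                : any_buffer_sink(std::move(other))
                , sink_(std::move(other.sink_))
            {
                rebind(sink_); // the member now lives at a new address
            }
        };
        @endcode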
    */
    template<BufferSink S>
    void
    rebind(S& new_sink) noexcept
    {
        if(vt_ != &vtable_for_impl<S>::value)
            std::terminate();
        sink_ = &new_sink;
    }

private:
    /** Forward a partial write through the vtable.

        Constructs the underlying `write_some` awaitable in
        cached storage and returns a type-erased awaitable.
    */
    auto
    write_some_(std::span<const_buffer const> buffers);

    /** Forward a complete write through the vtable.

        Constructs the underlying `write` awaitable in
        cached storage and returns a type-erased awaitable.
    */
    auto
    write_(std::span<const_buffer const> buffers);

    /** Forward an atomic write-with-EOF through the vtable.

        Constructs the underlying `write_eof(buffers)` awaitable
        in cached storage and returns a type-erased awaitable.
    */
    auto
    write_eof_buffers_(std::span<const_buffer const> buffers);
};

//----------------------------------------------------------

/** Type-erased ops for awaitables yielding `io_result<>`. */
struct any_buffer_sink::awaitable_ops
{
    bool (*await_ready)(void*);
    std::coroutine_handle<> (*await_suspend)(void*, std::coroutine_handle<>, io_env const*);
    io_result<> (*await_resume)(void*);
    void (*destroy)(void*) noexcept;
};

/** Type-erased ops for awaitables yielding `io_result<std::size_t>`. */
struct any_buffer_sink::write_awaitable_ops
{
    bool (*await_ready)(void*);
    std::coroutine_handle<> (*await_suspend)(void*, std::coroutine_handle<>, io_env const*);
    io_result<std::size_t> (*await_resume)(void*);
    void (*destroy)(void*) noexcept;
};
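
// For orientation: the public awaitables below drive these ops in a
// fixed sequence. A sketch of the lifecycle (the real code wraps the
// destroy call in an RAII guard inside await_resume):
//
//   ops = vt_->construct_commit_awaitable(sink_, cached_awaitable_, n);
//   if(! ops->await_ready(cached_awaitable_))
//       ops->await_suspend(cached_awaitable_, h, env); // suspend
//   result = ops->await_resume(cached_awaitable_);
//   ops->destroy(cached_awaitable_);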

struct any_buffer_sink::vtable
{
    void (*destroy)(void*) noexcept;
    std::span<mutable_buffer> (*do_prepare)(
        void* sink,
        std::span<mutable_buffer> dest);
    std::size_t awaitable_size;
    std::size_t awaitable_align;
    awaitable_ops const* (*construct_commit_awaitable)(
        void* sink,
        void* storage,
        std::size_t n);
    awaitable_ops const* (*construct_commit_eof_awaitable)(
        void* sink,
        void* storage,
        std::size_t n);

    // WriteSink forwarding (null when wrapped type is BufferSink-only)
    write_awaitable_ops const* (*construct_write_some_awaitable)(
        void* sink,
        void* storage,
        std::span<const_buffer const> buffers);
    write_awaitable_ops const* (*construct_write_awaitable)(
        void* sink,
        void* storage,
        std::span<const_buffer const> buffers);
    write_awaitable_ops const* (*construct_write_eof_buffers_awaitable)(
        void* sink,
        void* storage,
        std::span<const_buffer const> buffers);
    awaitable_ops const* (*construct_write_eof_awaitable)(
        void* sink,
        void* storage);
};

template<BufferSink S>
struct any_buffer_sink::vtable_for_impl
{
    using CommitAwaitable = decltype(std::declval<S&>().commit(
        std::size_t{}));
    using CommitEofAwaitable = decltype(std::declval<S&>().commit_eof(
        std::size_t{}));

    static void
    do_destroy_impl(void* sink) noexcept
    {
        static_cast<S*>(sink)->~S();
    }

    static std::span<mutable_buffer>
    do_prepare_impl(
        void* sink,
        std::span<mutable_buffer> dest)
    {
        auto& s = *static_cast<S*>(sink);
        return s.prepare(dest);
    }

    static awaitable_ops const*
    construct_commit_awaitable_impl(
        void* sink,
        void* storage,
        std::size_t n)
    {
        auto& s = *static_cast<S*>(sink);
        ::new(storage) CommitAwaitable(s.commit(n));

        static constexpr awaitable_ops ops = {
            +[](void* p) {
                return static_cast<CommitAwaitable*>(p)->await_ready();
            },
            +[](void* p, std::coroutine_handle<> h, io_env const* env) {
                return detail::call_await_suspend(
                    static_cast<CommitAwaitable*>(p), h, env);
            },
            +[](void* p) {
                return static_cast<CommitAwaitable*>(p)->await_resume();
            },
            +[](void* p) noexcept {
                static_cast<CommitAwaitable*>(p)->~CommitAwaitable();
            }
        };
        return &ops;
    }

    static awaitable_ops const*
    construct_commit_eof_awaitable_impl(
        void* sink,
        void* storage,
        std::size_t n)
    {
        auto& s = *static_cast<S*>(sink);
        ::new(storage) CommitEofAwaitable(s.commit_eof(n));

        static constexpr awaitable_ops ops = {
            +[](void* p) {
                return static_cast<CommitEofAwaitable*>(p)->await_ready();
            },
            +[](void* p, std::coroutine_handle<> h, io_env const* env) {
                return detail::call_await_suspend(
                    static_cast<CommitEofAwaitable*>(p), h, env);
            },
            +[](void* p) {
                return static_cast<CommitEofAwaitable*>(p)->await_resume();
            },
            +[](void* p) noexcept {
                static_cast<CommitEofAwaitable*>(p)->~CommitEofAwaitable();
            }
        };
        return &ops;
    }

    //------------------------------------------------------
    // WriteSink forwarding (only instantiated when WriteSink<S>)

    static write_awaitable_ops const*
    construct_write_some_awaitable_impl(
        void* sink,
        void* storage,
        std::span<const_buffer const> buffers)
        requires WriteSink<S>
    {
        using Aw = decltype(std::declval<S&>().write_some(
            std::span<const_buffer const>{}));
        auto& s = *static_cast<S*>(sink);
        ::new(storage) Aw(s.write_some(buffers));

        static constexpr write_awaitable_ops ops = {
            +[](void* p) {
                return static_cast<Aw*>(p)->await_ready();
            },
            +[](void* p, std::coroutine_handle<> h, io_env const* env) {
                return detail::call_await_suspend(
                    static_cast<Aw*>(p), h, env);
            },
            +[](void* p) {
                return static_cast<Aw*>(p)->await_resume();
            },
            +[](void* p) noexcept {
                static_cast<Aw*>(p)->~Aw();
            }
        };
        return &ops;
    }

    static write_awaitable_ops const*
    construct_write_awaitable_impl(
        void* sink,
        void* storage,
        std::span<const_buffer const> buffers)
        requires WriteSink<S>
    {
        using Aw = decltype(std::declval<S&>().write(
            std::span<const_buffer const>{}));
        auto& s = *static_cast<S*>(sink);
        ::new(storage) Aw(s.write(buffers));

        static constexpr write_awaitable_ops ops = {
            +[](void* p) {
                return static_cast<Aw*>(p)->await_ready();
            },
            +[](void* p, std::coroutine_handle<> h, io_env const* env) {
                return detail::call_await_suspend(
                    static_cast<Aw*>(p), h, env);
            },
            +[](void* p) {
                return static_cast<Aw*>(p)->await_resume();
            },
            +[](void* p) noexcept {
                static_cast<Aw*>(p)->~Aw();
            }
        };
        return &ops;
    }

    static write_awaitable_ops const*
    construct_write_eof_buffers_awaitable_impl(
        void* sink,
        void* storage,
        std::span<const_buffer const> buffers)
        requires WriteSink<S>
    {
        using Aw = decltype(std::declval<S&>().write_eof(
            std::span<const_buffer const>{}));
        auto& s = *static_cast<S*>(sink);
        ::new(storage) Aw(s.write_eof(buffers));

        static constexpr write_awaitable_ops ops = {
            +[](void* p) {
                return static_cast<Aw*>(p)->await_ready();
            },
            +[](void* p, std::coroutine_handle<> h, io_env const* env) {
                return detail::call_await_suspend(
                    static_cast<Aw*>(p), h, env);
            },
            +[](void* p) {
                return static_cast<Aw*>(p)->await_resume();
            },
            +[](void* p) noexcept {
                static_cast<Aw*>(p)->~Aw();
            }
        };
        return &ops;
    }

    static awaitable_ops const*
    construct_write_eof_awaitable_impl(
        void* sink,
        void* storage)
        requires WriteSink<S>
    {
        using Aw = decltype(std::declval<S&>().write_eof());
        auto& s = *static_cast<S*>(sink);
        ::new(storage) Aw(s.write_eof());

        static constexpr awaitable_ops ops = {
            +[](void* p) {
                return static_cast<Aw*>(p)->await_ready();
            },
            +[](void* p, std::coroutine_handle<> h, io_env const* env) {
                return detail::call_await_suspend(
                    static_cast<Aw*>(p), h, env);
            },
            +[](void* p) {
                return static_cast<Aw*>(p)->await_resume();
            },
            +[](void* p) noexcept {
                static_cast<Aw*>(p)->~Aw();
            }
        };
        return &ops;
    }

    //------------------------------------------------------

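    // The cached awaitable storage must be able to hold (and align)
    // whichever of the sink's awaitable types is largest, so the
    // vtable records the maxima over every operation S supports.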
    static consteval std::size_t
    compute_max_size() noexcept
    {
        std::size_t s = sizeof(CommitAwaitable) > sizeof(CommitEofAwaitable)
            ? sizeof(CommitAwaitable)
            : sizeof(CommitEofAwaitable);
        if constexpr (WriteSink<S>)
        {
            using WS = decltype(std::declval<S&>().write_some(
                std::span<const_buffer const>{}));
            using W = decltype(std::declval<S&>().write(
                std::span<const_buffer const>{}));
            using WEB = decltype(std::declval<S&>().write_eof(
                std::span<const_buffer const>{}));
            using WE = decltype(std::declval<S&>().write_eof());

            if(sizeof(WS) > s) s = sizeof(WS);
            if(sizeof(W) > s) s = sizeof(W);
            if(sizeof(WEB) > s) s = sizeof(WEB);
            if(sizeof(WE) > s) s = sizeof(WE);
        }
        return s;
    }

    static consteval std::size_t
    compute_max_align() noexcept
    {
        std::size_t a = alignof(CommitAwaitable) > alignof(CommitEofAwaitable)
            ? alignof(CommitAwaitable)
            : alignof(CommitEofAwaitable);
        if constexpr (WriteSink<S>)
        {
            using WS = decltype(std::declval<S&>().write_some(
                std::span<const_buffer const>{}));
            using W = decltype(std::declval<S&>().write(
                std::span<const_buffer const>{}));
            using WEB = decltype(std::declval<S&>().write_eof(
                std::span<const_buffer const>{}));
            using WE = decltype(std::declval<S&>().write_eof());

            if(alignof(WS) > a) a = alignof(WS);
            if(alignof(W) > a) a = alignof(W);
            if(alignof(WEB) > a) a = alignof(WEB);
            if(alignof(WE) > a) a = alignof(WE);
        }
        return a;
    }

    static consteval vtable
    make_vtable() noexcept
    {
        vtable v{};
        v.destroy = &do_destroy_impl;
        v.do_prepare = &do_prepare_impl;
        v.awaitable_size = compute_max_size();
        v.awaitable_align = compute_max_align();
        v.construct_commit_awaitable = &construct_commit_awaitable_impl;
        v.construct_commit_eof_awaitable = &construct_commit_eof_awaitable_impl;
        v.construct_write_some_awaitable = nullptr;
        v.construct_write_awaitable = nullptr;
        v.construct_write_eof_buffers_awaitable = nullptr;
        v.construct_write_eof_awaitable = nullptr;

        if constexpr (WriteSink<S>)
        {
            v.construct_write_some_awaitable =
                &construct_write_some_awaitable_impl;
            v.construct_write_awaitable =
                &construct_write_awaitable_impl;
            v.construct_write_eof_buffers_awaitable =
                &construct_write_eof_buffers_awaitable_impl;
            v.construct_write_eof_awaitable =
                &construct_write_eof_awaitable_impl;
        }
        return v;
    }

    static constexpr vtable value = make_vtable();
};

//----------------------------------------------------------

inline
any_buffer_sink::~any_buffer_sink()
{
    if(storage_)
    {
        vt_->destroy(sink_);
        ::operator delete(storage_);
    }
    if(cached_awaitable_)
        ::operator delete(cached_awaitable_);
}

inline any_buffer_sink&
any_buffer_sink::operator=(any_buffer_sink&& other) noexcept
{
    if(this != &other)
    {
        if(storage_)
        {
            vt_->destroy(sink_);
            ::operator delete(storage_);
        }
        if(cached_awaitable_)
            ::operator delete(cached_awaitable_);
        sink_ = std::exchange(other.sink_, nullptr);
        vt_ = std::exchange(other.vt_, nullptr);
        cached_awaitable_ = std::exchange(other.cached_awaitable_, nullptr);
        storage_ = std::exchange(other.storage_, nullptr);
        active_ops_ = std::exchange(other.active_ops_, nullptr);
        active_write_ops_ = std::exchange(other.active_write_ops_, nullptr);
    }
    return *this;
}

template<BufferSink S>
requires (!std::same_as<std::decay_t<S>, any_buffer_sink>)
any_buffer_sink::any_buffer_sink(S s)
    : vt_(&vtable_for_impl<S>::value)
{
    // If an allocation below throws, the guard destroys the
    // partially constructed sink and frees its storage.
    struct guard {
        any_buffer_sink* self;
        bool committed = false;
        ~guard() {
            if(!committed && self->storage_) {
                self->vt_->destroy(self->sink_);
                ::operator delete(self->storage_);
                self->storage_ = nullptr;
                self->sink_ = nullptr;
            }
        }
    } g{this};

    storage_ = ::operator new(sizeof(S));
    sink_ = ::new(storage_) S(std::move(s));

    cached_awaitable_ = ::operator new(vt_->awaitable_size);

    g.committed = true;
}

template<BufferSink S>
any_buffer_sink::any_buffer_sink(S* s)
    : sink_(s)
    , vt_(&vtable_for_impl<S>::value)
{
    cached_awaitable_ = ::operator new(vt_->awaitable_size);
}

//----------------------------------------------------------

inline std::span<mutable_buffer>
any_buffer_sink::prepare(std::span<mutable_buffer> dest)
{
    return vt_->do_prepare(sink_, dest);
}

inline auto
any_buffer_sink::commit(std::size_t n)
{
    struct awaitable
    {
        any_buffer_sink* self_;
        std::size_t n_;

        bool
        await_ready()
        {
            self_->active_ops_ = self_->vt_->construct_commit_awaitable(
                self_->sink_,
                self_->cached_awaitable_,
                n_);
            return self_->active_ops_->await_ready(self_->cached_awaitable_);
        }

        std::coroutine_handle<>
        await_suspend(std::coroutine_handle<> h, io_env const* env)
        {
            return self_->active_ops_->await_suspend(
                self_->cached_awaitable_, h, env);
        }

        io_result<>
        await_resume()
        {
            struct guard {
                any_buffer_sink* self;
                ~guard() {
                    self->active_ops_->destroy(self->cached_awaitable_);
                    self->active_ops_ = nullptr;
                }
            } g{self_};
            return self_->active_ops_->await_resume(
                self_->cached_awaitable_);
        }
    };
    return awaitable{this, n};
}

inline auto
any_buffer_sink::commit_eof(std::size_t n)
{
    struct awaitable
    {
        any_buffer_sink* self_;
        std::size_t n_;

        bool
        await_ready()
        {
            self_->active_ops_ = self_->vt_->construct_commit_eof_awaitable(
                self_->sink_,
                self_->cached_awaitable_,
                n_);
            return self_->active_ops_->await_ready(self_->cached_awaitable_);
        }

        std::coroutine_handle<>
        await_suspend(std::coroutine_handle<> h, io_env const* env)
        {
            return self_->active_ops_->await_suspend(
                self_->cached_awaitable_, h, env);
        }

        io_result<>
        await_resume()
        {
            struct guard {
                any_buffer_sink* self;
                ~guard() {
                    self->active_ops_->destroy(self->cached_awaitable_);
                    self->active_ops_ = nullptr;
                }
            } g{self_};
            return self_->active_ops_->await_resume(
                self_->cached_awaitable_);
        }
    };
    return awaitable{this, n};
}

//----------------------------------------------------------
// Private helpers for native WriteSink forwarding

inline auto
any_buffer_sink::write_some_(
    std::span<const_buffer const> buffers)
{
    struct awaitable
    {
        any_buffer_sink* self_;
        std::span<const_buffer const> buffers_;

        bool
        await_ready() const noexcept
        {
            return false;
        }

        std::coroutine_handle<>
        await_suspend(std::coroutine_handle<> h, io_env const* env)
        {
            self_->active_write_ops_ =
                self_->vt_->construct_write_some_awaitable(
                    self_->sink_,
                    self_->cached_awaitable_,
                    buffers_);

            if(self_->active_write_ops_->await_ready(
                    self_->cached_awaitable_))
                return h;

            return self_->active_write_ops_->await_suspend(
                self_->cached_awaitable_, h, env);
        }

        io_result<std::size_t>
        await_resume()
        {
            struct guard {
                any_buffer_sink* self;
                ~guard() {
                    self->active_write_ops_->destroy(
                        self->cached_awaitable_);
                    self->active_write_ops_ = nullptr;
                }
            } g{self_};
            return self_->active_write_ops_->await_resume(
                self_->cached_awaitable_);
        }
    };
    return awaitable{this, buffers};
}

inline auto
any_buffer_sink::write_(
    std::span<const_buffer const> buffers)
{
    struct awaitable
    {
        any_buffer_sink* self_;
        std::span<const_buffer const> buffers_;

        bool
        await_ready() const noexcept
        {
            return false;
        }

        std::coroutine_handle<>
        await_suspend(std::coroutine_handle<> h, io_env const* env)
        {
            self_->active_write_ops_ =
                self_->vt_->construct_write_awaitable(
                    self_->sink_,
                    self_->cached_awaitable_,
                    buffers_);

            if(self_->active_write_ops_->await_ready(
                    self_->cached_awaitable_))
                return h;

            return self_->active_write_ops_->await_suspend(
                self_->cached_awaitable_, h, env);
        }

        io_result<std::size_t>
        await_resume()
        {
            struct guard {
                any_buffer_sink* self;
                ~guard() {
                    self->active_write_ops_->destroy(
                        self->cached_awaitable_);
                    self->active_write_ops_ = nullptr;
                }
            } g{self_};
            return self_->active_write_ops_->await_resume(
                self_->cached_awaitable_);
        }
    };
    return awaitable{this, buffers};
}

inline auto
any_buffer_sink::write_eof_buffers_(
    std::span<const_buffer const> buffers)
{
    struct awaitable
    {
        any_buffer_sink* self_;
        std::span<const_buffer const> buffers_;

        bool
        await_ready() const noexcept
        {
            return false;
        }

        std::coroutine_handle<>
        await_suspend(std::coroutine_handle<> h, io_env const* env)
        {
            self_->active_write_ops_ =
                self_->vt_->construct_write_eof_buffers_awaitable(
                    self_->sink_,
                    self_->cached_awaitable_,
                    buffers_);

            if(self_->active_write_ops_->await_ready(
                    self_->cached_awaitable_))
                return h;

            return self_->active_write_ops_->await_suspend(
                self_->cached_awaitable_, h, env);
        }

        io_result<std::size_t>
        await_resume()
        {
            struct guard {
                any_buffer_sink* self;
                ~guard() {
                    self->active_write_ops_->destroy(
                        self->cached_awaitable_);
                    self->active_write_ops_ = nullptr;
                }
            } g{self_};
            return self_->active_write_ops_->await_resume(
                self_->cached_awaitable_);
        }
    };
    return awaitable{this, buffers};
}

//----------------------------------------------------------
// Public WriteSink methods

template<ConstBufferSequence CB>
io_task<std::size_t>
any_buffer_sink::write_some(CB buffers)
{
    buffer_param<CB> bp(buffers);
    auto src = bp.data();
    if(src.empty())
        co_return {{}, 0};

    // Native WriteSink path
    if(vt_->construct_write_some_awaitable)
        co_return co_await write_some_(src);

    // Synthesized path: prepare + buffer_copy + commit
    mutable_buffer arr[detail::max_iovec_];
    auto dst_bufs = prepare(arr);
    if(dst_bufs.empty())
    {
        auto [ec] = co_await commit(0);
        if(ec)
            co_return {ec, 0};
        dst_bufs = prepare(arr);
        if(dst_bufs.empty())
            co_return {{}, 0};
    }

    auto n = buffer_copy(dst_bufs, src);
    auto [ec] = co_await commit(n);
    if(ec)
        co_return {ec, 0};
    co_return {{}, n};
}

template<ConstBufferSequence CB>
io_task<std::size_t>
any_buffer_sink::write(CB buffers)
{
    buffer_param<CB> bp(buffers);
    std::size_t total = 0;

    // Native WriteSink path
    if(vt_->construct_write_awaitable)
    {
        for(;;)
        {
            auto bufs = bp.data();
            if(bufs.empty())
                break;

            auto [ec, n] = co_await write_(bufs);
            total += n;
            if(ec)
                co_return {ec, total};
            bp.consume(n);
        }
        co_return {{}, total};
    }

    // Synthesized path: prepare + buffer_copy + commit
    for(;;)
    {
        auto src = bp.data();
        if(src.empty())
            break;

        mutable_buffer arr[detail::max_iovec_];
        auto dst_bufs = prepare(arr);
        if(dst_bufs.empty())
        {
            auto [ec] = co_await commit(0);
            if(ec)
                co_return {ec, total};
            continue;
        }

        auto n = buffer_copy(dst_bufs, src);
        auto [ec] = co_await commit(n);
        if(ec)
            co_return {ec, total};
        bp.consume(n);
        total += n;
    }

    co_return {{}, total};
}

inline auto
any_buffer_sink::write_eof()
{
    struct awaitable
    {
        any_buffer_sink* self_;

        bool
        await_ready()
        {
            if(self_->vt_->construct_write_eof_awaitable)
            {
                // Native WriteSink: forward to underlying write_eof()
                self_->active_ops_ =
                    self_->vt_->construct_write_eof_awaitable(
                        self_->sink_,
                        self_->cached_awaitable_);
            }
            else
            {
                // Synthesized: commit_eof(0)
                self_->active_ops_ =
                    self_->vt_->construct_commit_eof_awaitable(
                        self_->sink_,
                        self_->cached_awaitable_,
                        0);
            }
            return self_->active_ops_->await_ready(
                self_->cached_awaitable_);
        }

        std::coroutine_handle<>
        await_suspend(std::coroutine_handle<> h, io_env const* env)
        {
            return self_->active_ops_->await_suspend(
                self_->cached_awaitable_, h, env);
        }

        io_result<>
        await_resume()
        {
            struct guard {
                any_buffer_sink* self;
                ~guard() {
                    self->active_ops_->destroy(self->cached_awaitable_);
                    self->active_ops_ = nullptr;
                }
            } g{self_};
            return self_->active_ops_->await_resume(
                self_->cached_awaitable_);
        }
    };
    return awaitable{this};
}

template<ConstBufferSequence CB>
io_task<std::size_t>
any_buffer_sink::write_eof(CB buffers)
{
    // Native WriteSink path
    if(vt_->construct_write_eof_buffers_awaitable)
    {
        const_buffer_param<CB> bp(buffers);
        std::size_t total = 0;

        for(;;)
        {
            auto bufs = bp.data();
            if(bufs.empty())
            {
                auto [ec] = co_await write_eof();
                co_return {ec, total};
            }

            if(!bp.more())
            {
                // Last window: send atomically with EOF
                auto [ec, n] = co_await write_eof_buffers_(bufs);
                total += n;
                co_return {ec, total};
            }

            auto [ec, n] = co_await write_(bufs);
            total += n;
            if(ec)
                co_return {ec, total};
            bp.consume(n);
        }
    }

    // Synthesized path: prepare + buffer_copy + commit + commit_eof
    buffer_param<CB> bp(buffers);
    std::size_t total = 0;

    for(;;)
    {
        auto src = bp.data();
        if(src.empty())
            break;

        mutable_buffer arr[detail::max_iovec_];
        auto dst_bufs = prepare(arr);
        if(dst_bufs.empty())
        {
            auto [ec] = co_await commit(0);
            if(ec)
                co_return {ec, total};
            continue;
        }

        auto n = buffer_copy(dst_bufs, src);
        auto [ec] = co_await commit(n);
        if(ec)
            co_return {ec, total};
        bp.consume(n);
        total += n;
    }

    auto [ec] = co_await commit_eof(0);
    if(ec)
        co_return {ec, total};

    co_return {{}, total};
}

//----------------------------------------------------------

static_assert(BufferSink<any_buffer_sink>);
static_assert(WriteSink<any_buffer_sink>);

} // namespace capy
} // namespace boost

#endif