libs/capy/include/boost/capy/when_any.hpp

99.2% Lines (131/132) 96.4% Functions (432/448) 100.0% Branches (27/27)
libs/capy/include/boost/capy/when_any.hpp
Line Branch Hits Source Code
1 //
2 // Copyright (c) 2026 Michael Vandeberg
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/capy
8 //
9
10 #ifndef BOOST_CAPY_WHEN_ANY_HPP
11 #define BOOST_CAPY_WHEN_ANY_HPP
12
13 #include <boost/capy/detail/config.hpp>
14 #include <boost/capy/concept/executor.hpp>
15 #include <boost/capy/concept/io_awaitable.hpp>
16 #include <coroutine>
17 #include <boost/capy/ex/executor_ref.hpp>
18 #include <boost/capy/ex/frame_allocator.hpp>
19 #include <boost/capy/ex/io_env.hpp>
20 #include <boost/capy/task.hpp>
21
22 #include <array>
23 #include <atomic>
24 #include <exception>
25 #include <optional>
26 #include <ranges>
27 #include <stdexcept>
28 #include <stop_token>
29 #include <tuple>
30 #include <type_traits>
31 #include <utility>
32 #include <variant>
33 #include <vector>
34
35 /*
36 when_any - Race multiple tasks, return first completion
37 ========================================================
38
39 OVERVIEW:
40 ---------
41 when_any launches N tasks concurrently and completes when the FIRST task
42 finishes (success or failure). It then requests stop for all siblings and
43 waits for them to acknowledge before returning.
44
45 ARCHITECTURE:
46 -------------
47 The design mirrors when_all but with inverted completion semantics:
48
49 when_all: complete when remaining_count reaches 0 (all done)
50 when_any: complete when has_winner becomes true (first done)
51 BUT still wait for remaining_count to reach 0 for cleanup
52
53 Key components:
54 - when_any_state: Shared state tracking winner and completion
55 - when_any_runner: Wrapper coroutine for each child task
56 - when_any_launcher: Awaitable that starts all runners concurrently
57
58 CRITICAL INVARIANTS:
59 --------------------
60 1. Exactly one task becomes the winner (via atomic compare_exchange)
61 2. All tasks must complete before parent resumes (cleanup safety)
62 3. Stop is requested immediately when winner is determined
63 4. Only the winner's result/exception is stored
64
65 TYPE DEDUPLICATION:
66 -------------------
67 std::variant requires unique alternative types. Since when_any can race
68 tasks with identical return types (e.g., three task<int>), we must
69 deduplicate types before constructing the variant.
70
71 Example: when_any(task<int>, task<string>, task<int>)
72 - Raw types after void->monostate: int, string, int
73 - Deduplicated variant: std::variant<int, string>
74 - Return: pair<size_t, variant<int, string>>
75
76 The winner_index tells you which task won (0, 1, or 2), while the variant
77 holds the result. Use the index to determine how to interpret the variant.
78
79 VOID HANDLING:
80 --------------
81 void tasks contribute std::monostate to the variant (then deduplicated).
82 All-void tasks result in: pair<size_t, variant<monostate>>
83
84 MEMORY MODEL:
85 -------------
86 Synchronization chain from winner's write to parent's read:
87
88 1. Winner thread writes result_/winner_exception_ (non-atomic)
89 2. Winner's runner reaches final_suspend → fetch_sub(acq_rel) on remaining_count_
90 3. Last runner to finish (may be winner or non-winner) reaches final_suspend
91 → fetch_sub(acq_rel) on remaining_count_, observing count becomes 0
92 4. Last runner returns caller_env_->executor.dispatch(continuation_) via symmetric transfer
93 5. Parent coroutine resumes and reads result_/winner_exception_
94
95 Synchronization analysis:
96 - All fetch_sub operations on remaining_count_ form a release sequence
97 - Winner's fetch_sub releases; subsequent fetch_sub operations participate
98 in the modification order of remaining_count_
99 - Last task's fetch_sub(acq_rel) synchronizes-with prior releases in the
100 modification order, establishing happens-before from winner's writes
101 - Executor dispatch() is expected to provide queue-based synchronization
102 (release-on-post, acquire-on-execute) completing the chain to parent
103 - Even inline executors work (same thread = sequenced-before)
104
105 Alternative considered: Adding winner_ready_ atomic (set with release after
106 storing winner data, acquired before reading) would make synchronization
107 self-contained and not rely on executor implementation details. Current
108 approach is correct but requires careful reasoning about release sequences
109 and executor behavior.
110
111 EXCEPTION SEMANTICS:
112 --------------------
113 Unlike when_all (which captures first exception, discards others), when_any
114 treats exceptions as valid completions. If the winning task threw, that
115 exception is rethrown. Exceptions from non-winners are silently discarded.
116 */
117
118 namespace boost {
119 namespace capy {
120
121 namespace detail {
122
/** Map `void` to `std::monostate` so the type can be a variant alternative.

    `std::variant` cannot hold `void`, so a void-returning task is
    represented by `std::monostate` in the result variant. Every
    non-void type is passed through untouched.

    @tparam T The candidate type; any (possibly cv-qualified) `void`
              is replaced by `std::monostate`.
*/
template<typename T>
using void_to_monostate_t =
    std::conditional_t<!std::is_void_v<T>, T, std::monostate>;
133
// Type deduplication: std::variant needs every alternative type to be
// distinct. This is the single step of a left fold over the type list:
// the candidate is appended only when the variant does not list it yet.
template<typename Variant, typename T>
struct variant_append_if_unique;

template<typename... Alternatives, typename Candidate>
struct variant_append_if_unique<std::variant<Alternatives...>, Candidate>
{
    // True when Candidate is already one of the alternatives
    // (false for an empty alternative list).
    static constexpr bool already_present =
        std::disjunction_v<std::is_same<Candidate, Alternatives>...>;

    using type = std::conditional_t<
        already_present,
        std::variant<Alternatives...>,
        std::variant<Alternatives..., Candidate>>;
};
147
148 template<typename Accumulated, typename... Remaining>
149 struct deduplicate_impl;
150
151 template<typename Accumulated>
152 struct deduplicate_impl<Accumulated>
153 {
154 using type = Accumulated;
155 };
156
157 template<typename Accumulated, typename T, typename... Rest>
158 struct deduplicate_impl<Accumulated, T, Rest...>
159 {
160 using next = typename variant_append_if_unique<Accumulated, T>::type;
161 using type = typename deduplicate_impl<next, Rest...>::type;
162 };
163
164 // Deduplicated variant; void types become monostate before deduplication
165 template<typename T0, typename... Ts>
166 using unique_variant_t = typename deduplicate_impl<
167 std::variant<void_to_monostate_t<T0>>,
168 void_to_monostate_t<Ts>...>::type;
169
170 // Result: (winner_index, deduplicated_variant). Use index to disambiguate
171 // when multiple tasks share the same return type.
172 template<typename T0, typename... Ts>
173 using when_any_result_t = std::pair<std::size_t, unique_variant_t<T0, Ts...>>;
174
175 /** Core shared state for when_any operations.
176
177 Contains all members and methods common to both heterogeneous (variadic)
178 and homogeneous (range) when_any implementations. State classes embed
179 this via composition to avoid CRTP destructor ordering issues.
180
181 @par Thread Safety
182 Atomic operations protect winner selection and completion count.
183 */
184 struct when_any_core
185 {
186 std::atomic<std::size_t> remaining_count_;
187 std::size_t winner_index_{0};
188 std::exception_ptr winner_exception_;
189 std::stop_source stop_source_;
190
191 // Bridges parent's stop token to our stop_source
192 struct stop_callback_fn
193 {
194 std::stop_source* source_;
195 9 void operator()() const noexcept { source_->request_stop(); }
196 };
197 using stop_callback_t = std::stop_callback<stop_callback_fn>;
198 std::optional<stop_callback_t> parent_stop_callback_;
199
200 std::coroutine_handle<> continuation_;
201 io_env const* caller_env_ = nullptr;
202
203 // Placed last to avoid padding (1-byte atomic followed by 8-byte aligned members)
204 std::atomic<bool> has_winner_{false};
205
206 65 explicit when_any_core(std::size_t count) noexcept
207 65 : remaining_count_(count)
208 {
209 65 }
210
211 /** Atomically claim winner status; exactly one task succeeds. */
212 190 bool try_win(std::size_t index) noexcept
213 {
214 190 bool expected = false;
215
2/2
✓ Branch 1 taken 65 times.
✓ Branch 2 taken 125 times.
190 if(has_winner_.compare_exchange_strong(
216 expected, true, std::memory_order_acq_rel))
217 {
218 65 winner_index_ = index;
219 65 stop_source_.request_stop();
220 65 return true;
221 }
222 125 return false;
223 }
224
225 /** @pre try_win() returned true. */
226 8 void set_winner_exception(std::exception_ptr ep) noexcept
227 {
228 8 winner_exception_ = ep;
229 8 }
230
231 // Runners signal completion directly via final_suspend; no member function needed.
232 };
233
234 /** Shared state for heterogeneous when_any operation.
235
236 Coordinates winner selection, result storage, and completion tracking
237 for all child tasks in a when_any operation. Uses composition with
238 when_any_core for shared functionality.
239
240 @par Lifetime
241 Allocated on the parent coroutine's frame, outlives all runners.
242
243 @tparam T0 First task's result type.
244 @tparam Ts Remaining tasks' result types.
245 */
246 template<typename T0, typename... Ts>
247 struct when_any_state
248 {
249 static constexpr std::size_t task_count = 1 + sizeof...(Ts);
250 using variant_type = unique_variant_t<T0, Ts...>;
251
252 when_any_core core_;
253 std::optional<variant_type> result_;
254 std::array<std::coroutine_handle<>, task_count> runner_handles_{};
255
256 43 when_any_state()
257 43 : core_(task_count)
258 {
259 43 }
260
261 // Runners self-destruct in final_suspend. No destruction needed here.
262
263 /** @pre core_.try_win() returned true.
264 @note Uses in_place_type (not index) because variant is deduplicated.
265 */
266 template<typename T>
267 35 void set_winner_result(T value)
268 noexcept(std::is_nothrow_move_constructible_v<T>)
269 {
270 35 result_.emplace(std::in_place_type<T>, std::move(value));
271 35 }
272
273 /** @pre core_.try_win() returned true. */
274 3 void set_winner_void() noexcept
275 {
276 3 result_.emplace(std::in_place_type<std::monostate>, std::monostate{});
277 3 }
278 };
279
280 /** Wrapper coroutine that runs a single child task for when_any.
281
282 Propagates executor/stop_token to the child, attempts to claim winner
283 status on completion, and signals completion for cleanup coordination.
284
285 @tparam StateType The state type (when_any_state or when_any_homogeneous_state).
286 */
287 template<typename StateType>
288 struct when_any_runner
289 {
290 struct promise_type // : frame_allocating_base // DISABLED FOR TESTING
291 {
292 StateType* state_ = nullptr;
293 std::size_t index_ = 0;
294 io_env env_;
295
296 190 when_any_runner get_return_object() noexcept
297 {
298 190 return when_any_runner(std::coroutine_handle<promise_type>::from_promise(*this));
299 }
300
301 // Starts suspended; launcher sets up state/ex/token then resumes
302 190 std::suspend_always initial_suspend() noexcept
303 {
304 190 return {};
305 }
306
307 190 auto final_suspend() noexcept
308 {
309 struct awaiter
310 {
311 promise_type* p_;
312 bool await_ready() const noexcept { return false; }
313 std::coroutine_handle<> await_suspend(std::coroutine_handle<> h) noexcept
314 {
315 // Extract everything needed before self-destruction.
316 auto& core = p_->state_->core_;
317 auto* counter = &core.remaining_count_;
318 auto* caller_env = core.caller_env_;
319 auto cont = core.continuation_;
320
321 h.destroy();
322
323 // If last runner, dispatch parent for symmetric transfer.
324 auto remaining = counter->fetch_sub(1, std::memory_order_acq_rel);
325 if(remaining == 1)
326 return caller_env->executor.dispatch(cont);
327 return std::noop_coroutine();
328 }
329 void await_resume() const noexcept {}
330 };
331 190 return awaiter{this};
332 }
333
334 178 void return_void() noexcept {}
335
336 // Exceptions are valid completions in when_any (unlike when_all)
337 12 void unhandled_exception()
338 {
339
2/2
✓ Branch 1 taken 8 times.
✓ Branch 2 taken 4 times.
12 if(state_->core_.try_win(index_))
340 8 state_->core_.set_winner_exception(std::current_exception());
341 12 }
342
343 /** Injects executor and stop token into child awaitables. */
344 template<class Awaitable>
345 struct transform_awaiter
346 {
347 std::decay_t<Awaitable> a_;
348 promise_type* p_;
349
350 190 bool await_ready() { return a_.await_ready(); }
351 190 auto await_resume() { return a_.await_resume(); }
352
353 template<class Promise>
354 185 auto await_suspend(std::coroutine_handle<Promise> h)
355 {
356 185 return a_.await_suspend(h, &p_->env_);
357 }
358 };
359
360 template<class Awaitable>
361 190 auto await_transform(Awaitable&& a)
362 {
363 using A = std::decay_t<Awaitable>;
364 if constexpr (IoAwaitable<A>)
365 {
366 return transform_awaiter<Awaitable>{
367 380 std::forward<Awaitable>(a), this};
368 }
369 else
370 {
371 static_assert(sizeof(A) == 0, "requires IoAwaitable");
372 }
373 190 }
374 };
375
376 std::coroutine_handle<promise_type> h_;
377
378 190 explicit when_any_runner(std::coroutine_handle<promise_type> h) noexcept
379 190 : h_(h)
380 {
381 190 }
382
383 // Enable move for all clang versions - some versions need it
384 when_any_runner(when_any_runner&& other) noexcept : h_(std::exchange(other.h_, nullptr)) {}
385
386 // Non-copyable
387 when_any_runner(when_any_runner const&) = delete;
388 when_any_runner& operator=(when_any_runner const&) = delete;
389 when_any_runner& operator=(when_any_runner&&) = delete;
390
391 190 auto release() noexcept
392 {
393 190 return std::exchange(h_, nullptr);
394 }
395 };
396
397 /** Wraps a child awaitable, attempts to claim winner on completion.
398
399 Uses requires-expressions to detect state capabilities:
400 - set_winner_void(): for heterogeneous void tasks (stores monostate)
401 - set_winner_result(): for non-void tasks
402 - Neither: for homogeneous void tasks (no result storage)
403 */
404 template<IoAwaitable Awaitable, typename StateType>
405 when_any_runner<StateType>
406
1/1
✓ Branch 1 taken 190 times.
190 make_when_any_runner(Awaitable inner, StateType* state, std::size_t index)
407 {
408 using T = awaitable_result_t<Awaitable>;
409 if constexpr (std::is_void_v<T>)
410 {
411 co_await std::move(inner);
412 if(state->core_.try_win(index))
413 {
414 // Heterogeneous void tasks store monostate in the variant
415 if constexpr (requires { state->set_winner_void(); })
416 state->set_winner_void();
417 // Homogeneous void tasks have no result to store
418 }
419 }
420 else
421 {
422 auto result = co_await std::move(inner);
423 if(state->core_.try_win(index))
424 {
425 // Defensive: move should not throw (already moved once), but we
426 // catch just in case since an uncaught exception would be devastating.
427 try
428 {
429 state->set_winner_result(std::move(result));
430 }
431 catch(...)
432 {
433 state->core_.set_winner_exception(std::current_exception());
434 }
435 }
436 }
437 380 }
438
439 /** Launches all runners concurrently; see await_suspend for lifetime concerns. */
440 template<IoAwaitable... Awaitables>
441 class when_any_launcher
442 {
443 using state_type = when_any_state<awaitable_result_t<Awaitables>...>;
444
445 std::tuple<Awaitables...>* tasks_;
446 state_type* state_;
447
448 public:
449 43 when_any_launcher(
450 std::tuple<Awaitables...>* tasks,
451 state_type* state)
452 43 : tasks_(tasks)
453 43 , state_(state)
454 {
455 43 }
456
457 43 bool await_ready() const noexcept
458 {
459 43 return sizeof...(Awaitables) == 0;
460 }
461
462 /** CRITICAL: If the last task finishes synchronously, parent resumes and
463 destroys this object before await_suspend returns. Must not reference
464 `this` after the final launch_one call.
465 */
466 43 std::coroutine_handle<> await_suspend(std::coroutine_handle<> continuation, io_env const* caller_env)
467 {
468 43 state_->core_.continuation_ = continuation;
469 43 state_->core_.caller_env_ = caller_env;
470
471
2/2
✓ Branch 1 taken 9 times.
✓ Branch 2 taken 34 times.
43 if(caller_env->stop_token.stop_possible())
472 {
473 18 state_->core_.parent_stop_callback_.emplace(
474 9 caller_env->stop_token,
475 9 when_any_core::stop_callback_fn{&state_->core_.stop_source_});
476
477
2/2
✓ Branch 1 taken 3 times.
✓ Branch 2 taken 6 times.
9 if(caller_env->stop_token.stop_requested())
478 3 state_->core_.stop_source_.request_stop();
479 }
480
481 43 auto token = state_->core_.stop_source_.get_token();
482 [&]<std::size_t... Is>(std::index_sequence<Is...>) {
483 (..., launch_one<Is>(caller_env->executor, token));
484
1/1
✓ Branch 1 taken 43 times.
43 }(std::index_sequence_for<Awaitables...>{});
485
486 86 return std::noop_coroutine();
487 43 }
488
489 43 void await_resume() const noexcept
490 {
491 43 }
492
493 private:
494 /** @pre Ex::dispatch() and std::coroutine_handle<>::resume() must not throw (handle may leak). */
495 template<std::size_t I>
496 105 void launch_one(executor_ref caller_ex, std::stop_token token)
497 {
498
1/1
✓ Branch 2 taken 105 times.
105 auto runner = make_when_any_runner(
499 105 std::move(std::get<I>(*tasks_)), state_, I);
500
501 105 auto h = runner.release();
502 105 h.promise().state_ = state_;
503 105 h.promise().index_ = I;
504 105 h.promise().env_ = io_env{caller_ex, token, state_->core_.caller_env_->allocator};
505
506 105 std::coroutine_handle<> ch{h};
507 105 state_->runner_handles_[I] = ch;
508
1/1
✓ Branch 1 taken 105 times.
105 caller_ex.post(ch);
509 210 }
510 };
511
512 } // namespace detail
513
514 /** Wait for the first awaitable to complete.
515
516 Races multiple heterogeneous awaitables concurrently and returns when the
517 first one completes. The result includes the winner's index and a
518 deduplicated variant containing the result value.
519
520 @par Suspends
521 The calling coroutine suspends when co_await is invoked. All awaitables
522 are launched concurrently and execute in parallel. The coroutine resumes
523 only after all awaitables have completed, even though the winner is
524 determined by the first to finish.
525
526 @par Completion Conditions
527 @li Winner is determined when the first awaitable completes (success or exception)
528 @li Only one task can claim winner status via atomic compare-exchange
529 @li Once a winner exists, stop is requested for all remaining siblings
530 @li Parent coroutine resumes only after all siblings acknowledge completion
531 @li The winner's result is returned; if the winner threw, the exception is rethrown
532
533 @par Cancellation Semantics
534 Cancellation is supported via stop_token propagated through the
535 IoAwaitable protocol:
536 @li Each child awaitable receives a stop_token derived from a shared stop_source
537 @li When the parent's stop token is activated, the stop is forwarded to all children
538 @li When a winner is determined, stop_source_.request_stop() is called immediately
539 @li Siblings must handle cancellation gracefully and complete before parent resumes
540 @li Stop requests are cooperative; tasks must check and respond to them
541
542 @par Concurrency/Overlap
543 All awaitables are launched concurrently before any can complete.
544 The launcher iterates through the arguments, starting each task on the
545 caller's executor. Tasks may execute in parallel on multi-threaded
546 executors or interleave on single-threaded executors. There is no
547 guaranteed ordering of task completion.
548
549 @par Notable Error Conditions
550 @li Winner exception: if the winning task threw, that exception is rethrown
551 @li Non-winner exceptions: silently discarded (only winner's result matters)
552 @li Cancellation: tasks may complete via cancellation without throwing
553
554 @par Example
555 @code
556 task<void> example() {
557 auto [index, result] = co_await when_any(
558 fetch_from_primary(), // task<Response>
559 fetch_from_backup() // task<Response>
560 );
561 // index is 0 or 1, result holds the winner's Response
562 auto response = std::get<Response>(result);
563 }
564 @endcode
565
566 @par Example with Heterogeneous Types
567 @code
568 task<void> mixed_types() {
569 auto [index, result] = co_await when_any(
570 fetch_int(), // task<int>
571 fetch_string() // task<std::string>
572 );
573 if (index == 0)
574 std::cout << "Got int: " << std::get<int>(result) << "\n";
575 else
576 std::cout << "Got string: " << std::get<std::string>(result) << "\n";
577 }
578 @endcode
579
580 @tparam A0 First awaitable type (must satisfy IoAwaitable).
581 @tparam As Remaining awaitable types (must satisfy IoAwaitable).
582 @param a0 The first awaitable to race.
583 @param as Additional awaitables to race concurrently.
584 @return A task yielding a pair of (winner_index, result_variant).
585
586 @throws Rethrows the winner's exception if the winning task threw an exception.
587
588 @par Remarks
589 Awaitables are moved into the coroutine frame; original objects become
590 empty after the call. When multiple awaitables share the same return type,
591 the variant is deduplicated to contain only unique types. Use the winner
592 index to determine which awaitable completed first. Void awaitables
593 contribute std::monostate to the variant.
594
595 @see when_all, IoAwaitable
596 */
597 template<IoAwaitable A0, IoAwaitable... As>
598
1/1
✓ Branch 1 taken 43 times.
43 [[nodiscard]] auto when_any(A0 a0, As... as)
599 -> task<detail::when_any_result_t<
600 detail::awaitable_result_t<A0>,
601 detail::awaitable_result_t<As>...>>
602 {
603 using result_type = detail::when_any_result_t<
604 detail::awaitable_result_t<A0>,
605 detail::awaitable_result_t<As>...>;
606
607 detail::when_any_state<
608 detail::awaitable_result_t<A0>,
609 detail::awaitable_result_t<As>...> state;
610 std::tuple<A0, As...> awaitable_tuple(std::move(a0), std::move(as)...);
611
612 co_await detail::when_any_launcher<A0, As...>(&awaitable_tuple, &state);
613
614 if(state.core_.winner_exception_)
615 std::rethrow_exception(state.core_.winner_exception_);
616
617 co_return result_type{state.core_.winner_index_, std::move(*state.result_)};
618 86 }
619
620 /** Concept for ranges of full I/O awaitables.
621
622 A range satisfies `IoAwaitableRange` if it is a sized input range
623 whose value type satisfies @ref IoAwaitable. This enables when_any
624 to accept any container or view of awaitables, not just std::vector.
625
626 @tparam R The range type.
627
628 @par Requirements
629 @li `R` must satisfy `std::ranges::input_range`
630 @li `R` must satisfy `std::ranges::sized_range`
631 @li `std::ranges::range_value_t<R>` must satisfy @ref IoAwaitable
632
633 @par Syntactic Requirements
634 Given `r` of type `R`:
635 @li `std::ranges::begin(r)` is valid
636 @li `std::ranges::end(r)` is valid
637 @li `std::ranges::size(r)` returns `std::ranges::range_size_t<R>`
638 @li `*std::ranges::begin(r)` satisfies @ref IoAwaitable
639
640 @par Example
641 @code
642 template<IoAwaitableRange R>
643 task<void> race_all(R&& awaitables) {
644 auto winner = co_await when_any(std::forward<R>(awaitables));
645 // Process winner...
646 }
647 @endcode
648
649 @see when_any, IoAwaitable
650 */
651 template<typename R>
652 concept IoAwaitableRange =
653 std::ranges::input_range<R> &&
654 std::ranges::sized_range<R> &&
655 IoAwaitable<std::ranges::range_value_t<R>>;
656
657 namespace detail {
658
659 /** Shared state for homogeneous when_any (range overload).
660
661 Uses composition with when_any_core for shared functionality.
662 Simpler than heterogeneous: optional<T> instead of variant, vector
663 instead of array for runner handles.
664 */
665 template<typename T>
666 struct when_any_homogeneous_state
667 {
668 when_any_core core_;
669 std::optional<T> result_;
670 std::vector<std::coroutine_handle<>> runner_handles_;
671
672 19 explicit when_any_homogeneous_state(std::size_t count)
673 19 : core_(count)
674
1/1
✓ Branch 2 taken 19 times.
38 , runner_handles_(count)
675 {
676 19 }
677
678 // Runners self-destruct in final_suspend. No destruction needed here.
679
680 /** @pre core_.try_win() returned true. */
681 17 void set_winner_result(T value)
682 noexcept(std::is_nothrow_move_constructible_v<T>)
683 {
684 17 result_.emplace(std::move(value));
685 17 }
686 };
687
688 /** Specialization for void tasks (no result storage needed). */
689 template<>
690 struct when_any_homogeneous_state<void>
691 {
692 when_any_core core_;
693 std::vector<std::coroutine_handle<>> runner_handles_;
694
695 3 explicit when_any_homogeneous_state(std::size_t count)
696 3 : core_(count)
697
1/1
✓ Branch 1 taken 3 times.
6 , runner_handles_(count)
698 {
699 3 }
700
701 // Runners self-destruct in final_suspend. No destruction needed here.
702
703 // No set_winner_result - void tasks have no result to store
704 };
705
706 /** Launches all runners concurrently; see await_suspend for lifetime concerns. */
707 template<IoAwaitableRange Range>
708 class when_any_homogeneous_launcher
709 {
710 using Awaitable = std::ranges::range_value_t<Range>;
711 using T = awaitable_result_t<Awaitable>;
712
713 Range* range_;
714 when_any_homogeneous_state<T>* state_;
715
716 public:
717 22 when_any_homogeneous_launcher(
718 Range* range,
719 when_any_homogeneous_state<T>* state)
720 22 : range_(range)
721 22 , state_(state)
722 {
723 22 }
724
725 22 bool await_ready() const noexcept
726 {
727 22 return std::ranges::empty(*range_);
728 }
729
730 /** CRITICAL: If the last task finishes synchronously, parent resumes and
731 destroys this object before await_suspend returns. Must not reference
732 `this` after dispatching begins.
733
734 Two-phase approach:
735 1. Create all runners (safe - no dispatch yet)
736 2. Dispatch all runners (any may complete synchronously)
737 */
738 22 std::coroutine_handle<> await_suspend(std::coroutine_handle<> continuation, io_env const* caller_env)
739 {
740 22 state_->core_.continuation_ = continuation;
741 22 state_->core_.caller_env_ = caller_env;
742
743
2/2
✓ Branch 1 taken 7 times.
✓ Branch 2 taken 15 times.
22 if(caller_env->stop_token.stop_possible())
744 {
745 14 state_->core_.parent_stop_callback_.emplace(
746 7 caller_env->stop_token,
747 7 when_any_core::stop_callback_fn{&state_->core_.stop_source_});
748
749
2/2
✓ Branch 1 taken 4 times.
✓ Branch 2 taken 3 times.
7 if(caller_env->stop_token.stop_requested())
750 4 state_->core_.stop_source_.request_stop();
751 }
752
753 22 auto token = state_->core_.stop_source_.get_token();
754
755 // Phase 1: Create all runners without dispatching.
756 // This iterates over *range_ safely because no runners execute yet.
757 22 std::size_t index = 0;
758
2/2
✓ Branch 4 taken 85 times.
✓ Branch 5 taken 22 times.
107 for(auto&& a : *range_)
759 {
760
1/1
✓ Branch 2 taken 85 times.
85 auto runner = make_when_any_runner(
761 85 std::move(a), state_, index);
762
763 85 auto h = runner.release();
764 85 h.promise().state_ = state_;
765 85 h.promise().index_ = index;
766 85 h.promise().env_ = io_env{caller_env->executor, token, caller_env->allocator};
767
768 85 state_->runner_handles_[index] = std::coroutine_handle<>{h};
769 85 ++index;
770 }
771
772 // Phase 2: Post all runners. Any may complete synchronously.
773 // After last post, state_ and this may be destroyed.
774 // Use raw pointer/count captured before posting.
775 22 std::coroutine_handle<>* handles = state_->runner_handles_.data();
776 22 std::size_t count = state_->runner_handles_.size();
777
2/2
✓ Branch 0 taken 85 times.
✓ Branch 1 taken 22 times.
107 for(std::size_t i = 0; i < count; ++i)
778
1/1
✓ Branch 1 taken 85 times.
85 caller_env->executor.post(handles[i]);
779
780 44 return std::noop_coroutine();
781 107 }
782
783 22 void await_resume() const noexcept
784 {
785 22 }
786 };
787
788 } // namespace detail
789
790 /** Wait for the first awaitable to complete (range overload).
791
792 Races a range of awaitables with the same result type. Accepts any
793 sized input range of IoAwaitable types, enabling use with arrays,
794 spans, or custom containers.
795
796 @par Suspends
797 The calling coroutine suspends when co_await is invoked. All awaitables
798 in the range are launched concurrently and execute in parallel. The
799 coroutine resumes only after all awaitables have completed, even though
800 the winner is determined by the first to finish.
801
802 @par Completion Conditions
803 @li Winner is determined when the first awaitable completes (success or exception)
804 @li Only one task can claim winner status via atomic compare-exchange
805 @li Once a winner exists, stop is requested for all remaining siblings
806 @li Parent coroutine resumes only after all siblings acknowledge completion
807 @li The winner's index and result are returned; if the winner threw, the exception is rethrown
808
809 @par Cancellation Semantics
810 Cancellation is supported via stop_token propagated through the
811 IoAwaitable protocol:
812 @li Each child awaitable receives a stop_token derived from a shared stop_source
813 @li When the parent's stop token is activated, the stop is forwarded to all children
814 @li When a winner is determined, stop_source_.request_stop() is called immediately
815 @li Siblings must handle cancellation gracefully and complete before parent resumes
816 @li Stop requests are cooperative; tasks must check and respond to them
817
818 @par Concurrency/Overlap
819 All awaitables are launched concurrently before any can complete.
820 The launcher iterates through the range, starting each task on the
821 caller's executor. Tasks may execute in parallel on multi-threaded
822 executors or interleave on single-threaded executors. There is no
823 guaranteed ordering of task completion.
824
825 @par Notable Error Conditions
826 @li Empty range: throws std::invalid_argument immediately (not via co_return)
827 @li Winner exception: if the winning task threw, that exception is rethrown
828 @li Non-winner exceptions: silently discarded (only winner's result matters)
829 @li Cancellation: tasks may complete via cancellation without throwing
830
831 @par Example
832 @code
833 task<void> example() {
834 std::array<task<Response>, 3> requests = {
835 fetch_from_server(0),
836 fetch_from_server(1),
837 fetch_from_server(2)
838 };
839
840 auto [index, response] = co_await when_any(std::move(requests));
841 }
842 @endcode
843
844 @par Example with Vector
845 @code
846 task<Response> fetch_fastest(std::vector<Server> const& servers) {
847 std::vector<task<Response>> requests;
848 for (auto const& server : servers)
849 requests.push_back(fetch_from(server));
850
851 auto [index, response] = co_await when_any(std::move(requests));
852 co_return response;
853 }
854 @endcode
855
856 @tparam R Range type satisfying IoAwaitableRange.
857 @param awaitables Range of awaitables to race concurrently (must not be empty).
858 @return A task yielding a pair of (winner_index, result).
859
860 @throws std::invalid_argument if range is empty (thrown before coroutine suspends).
861 @throws Rethrows the winner's exception if the winning task threw an exception.
862
863 @par Remarks
864 Elements are moved from the range; for lvalue ranges, the original
865 container will have moved-from elements after this call. The range
866 is moved onto the coroutine frame to ensure lifetime safety. Unlike
867 the variadic overload, no variant wrapper is needed since all tasks
868 share the same return type.
869
870 @see when_any, IoAwaitableRange
871 */
872 template<IoAwaitableRange R>
873 requires (!std::is_void_v<detail::awaitable_result_t<std::ranges::range_value_t<R>>>)
874
1/1
✓ Branch 1 taken 21 times.
21 [[nodiscard]] auto when_any(R&& awaitables)
875 -> task<std::pair<std::size_t, detail::awaitable_result_t<std::ranges::range_value_t<R>>>>
876 {
877 using Awaitable = std::ranges::range_value_t<R>;
878 using T = detail::awaitable_result_t<Awaitable>;
879 using result_type = std::pair<std::size_t, T>;
880 using OwnedRange = std::remove_cvref_t<R>;
881
882 auto count = std::ranges::size(awaitables);
883 if(count == 0)
884 throw std::invalid_argument("when_any requires at least one awaitable");
885
886 // Move/copy range onto coroutine frame to ensure lifetime
887 OwnedRange owned_awaitables = std::forward<R>(awaitables);
888
889 detail::when_any_homogeneous_state<T> state(count);
890
891 co_await detail::when_any_homogeneous_launcher<OwnedRange>(&owned_awaitables, &state);
892
893 if(state.core_.winner_exception_)
894 std::rethrow_exception(state.core_.winner_exception_);
895
896 co_return result_type{state.core_.winner_index_, std::move(*state.result_)};
897 42 }
898
899 /** Wait for the first awaitable to complete (void range overload).
900
901 Races a range of void-returning awaitables. Since void awaitables have
902 no result value, only the winner's index is returned.
903
904 @par Suspends
905 The calling coroutine suspends when co_await is invoked. All awaitables
906 in the range are launched concurrently; whether they actually run in
907 parallel depends on the executor. The coroutine resumes only after all
908 awaitables have completed, even though the first to finish is the winner.
909
910 @par Completion Conditions
911 @li Winner is determined when the first awaitable completes (success or exception)
912 @li Only one task can claim winner status via atomic compare-exchange
913 @li Once a winner exists, stop is requested for all remaining siblings
914 @li Parent coroutine resumes only after all siblings acknowledge completion
915 @li The winner's index is returned; if the winner threw, the exception is rethrown
916
917 @par Cancellation Semantics
918 Cancellation is supported via stop_token propagated through the
919 IoAwaitable protocol:
920 @li Each child awaitable receives a stop_token derived from a shared stop_source
921 @li When the parent's stop token is activated, the stop is forwarded to all children
922 @li When a winner is determined, stop_source_.request_stop() is called immediately
923 @li Siblings must handle cancellation gracefully and complete before parent resumes
924 @li Stop requests are cooperative; tasks must check and respond to them
925
926 @par Concurrency/Overlap
927 All awaitables are launched concurrently before any can complete.
928 The launcher iterates through the range, starting each task on the
929 caller's executor. Tasks may execute in parallel on multi-threaded
930 executors or interleave on single-threaded executors. There is no
931 guaranteed ordering of task completion.
932
933 @par Notable Error Conditions
934 @li Empty range: throws std::invalid_argument immediately (not via co_return)
935 @li Winner exception: if the winning task threw, that exception is rethrown
936 @li Non-winner exceptions: silently discarded (only winner's result matters)
937 @li Cancellation: tasks may complete via cancellation without throwing
938
939 @par Example
940 @code
941 task<void> example() {
942 std::vector<task<void>> tasks;
943 for (int i = 0; i < 5; ++i)
944 tasks.push_back(background_work(i));
945
946 std::size_t winner = co_await when_any(std::move(tasks));
947 // winner is the index of the first task to complete
948 }
949 @endcode
950
951 @par Example with Timeout
952 @code
953 task<void> with_timeout() {
954 std::vector<task<void>> tasks;
955 tasks.push_back(long_running_operation());
956 tasks.push_back(delay(std::chrono::seconds(5)));
957
958 std::size_t winner = co_await when_any(std::move(tasks));
959 if (winner == 1) {
960 // Timeout occurred
961 }
962 }
963 @endcode
964
965 @tparam R Range type satisfying IoAwaitableRange with void result.
966 @param awaitables Range of void awaitables to race concurrently (must not be empty).
967 @return A task yielding the winner's index (zero-based).
968
969 @throws std::invalid_argument if range is empty (thrown before coroutine suspends).
970 @throws Rethrows the winner's exception if the winning task threw an exception.
971
972 @par Remarks
973 The range is moved (rvalue argument) or copied (lvalue argument) into
974 storage owned by the coroutine to ensure lifetime safety, and elements
975 are moved from that owned range. An lvalue range must therefore be
976 copyable, and the caller's container is left unmodified. Unlike the
977 non-void overload, no result storage is needed since void tasks produce no value.
978
979 @see when_any, IoAwaitableRange
980 */
981 template<IoAwaitableRange R>
982 requires std::is_void_v<detail::awaitable_result_t<std::ranges::range_value_t<R>>>
983
1/1
✓ Branch 1 taken 3 times.
3 [[nodiscard]] auto when_any(R&& awaitables) -> task<std::size_t>
984 {
985 using OwnedRange = std::remove_cvref_t<R>;
986
987 auto count = std::ranges::size(awaitables);
988 if(count == 0)
989 throw std::invalid_argument("when_any requires at least one awaitable");
990
991 // Move/copy range onto coroutine frame to ensure lifetime
992 OwnedRange owned_awaitables = std::forward<R>(awaitables);
993
994 detail::when_any_homogeneous_state<void> state(count);
995
996 co_await detail::when_any_homogeneous_launcher<OwnedRange>(&owned_awaitables, &state);
997
998 if(state.core_.winner_exception_)
999 std::rethrow_exception(state.core_.winner_exception_);
1000
1001 co_return state.core_.winner_index_;
1002 6 }
1003
1004 } // namespace capy
1005 } // namespace boost
1006
1007 #endif
1008