Line data Source code
1 : //
2 : // Copyright (c) 2026 Michael Vandeberg
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/capy
8 : //
9 :
10 : #ifndef BOOST_CAPY_WHEN_ANY_HPP
11 : #define BOOST_CAPY_WHEN_ANY_HPP
12 :
13 : #include <boost/capy/detail/config.hpp>
14 : #include <boost/capy/concept/executor.hpp>
15 : #include <boost/capy/concept/io_awaitable.hpp>
16 : #include <coroutine>
17 : #include <boost/capy/ex/executor_ref.hpp>
18 : #include <boost/capy/ex/frame_allocator.hpp>
19 : #include <boost/capy/ex/io_env.hpp>
20 : #include <boost/capy/task.hpp>
21 :
22 : #include <array>
23 : #include <atomic>
24 : #include <exception>
25 : #include <optional>
26 : #include <ranges>
27 : #include <stdexcept>
28 : #include <stop_token>
29 : #include <tuple>
30 : #include <type_traits>
31 : #include <utility>
32 : #include <variant>
33 : #include <vector>
34 :
35 : /*
36 : when_any - Race multiple tasks, return first completion
37 : ========================================================
38 :
39 : OVERVIEW:
40 : ---------
41 : when_any launches N tasks concurrently and completes when the FIRST task
42 : finishes (success or failure). It then requests stop for all siblings and
43 : waits for them to acknowledge before returning.
44 :
45 : ARCHITECTURE:
46 : -------------
47 : The design mirrors when_all but with inverted completion semantics:
48 :
49 : when_all: complete when remaining_count reaches 0 (all done)
50 : when_any: complete when has_winner becomes true (first done)
51 : BUT still wait for remaining_count to reach 0 for cleanup
52 :
53 : Key components:
54 : - when_any_state: Shared state tracking winner and completion
55 : - when_any_runner: Wrapper coroutine for each child task
56 : - when_any_launcher: Awaitable that starts all runners concurrently
57 :
58 : CRITICAL INVARIANTS:
59 : --------------------
60 : 1. Exactly one task becomes the winner (via atomic compare_exchange)
61 : 2. All tasks must complete before parent resumes (cleanup safety)
62 : 3. Stop is requested immediately when winner is determined
63 : 4. Only the winner's result/exception is stored
64 :
65 : TYPE DEDUPLICATION:
66 : -------------------
67 : std::variant requires unique alternative types. Since when_any can race
68 : tasks with identical return types (e.g., three task<int>), we must
69 : deduplicate types before constructing the variant.
70 :
71 : Example: when_any(task<int>, task<string>, task<int>)
72 : - Raw types after void->monostate: int, string, int
73 : - Deduplicated variant: std::variant<int, string>
74 : - Return: pair<size_t, variant<int, string>>
75 :
76 : The winner_index tells you which task won (0, 1, or 2), while the variant
77 : holds the result. Use the index to determine how to interpret the variant.
78 :
79 : VOID HANDLING:
80 : --------------
81 : void tasks contribute std::monostate to the variant (then deduplicated).
82 : All-void tasks result in: pair<size_t, variant<monostate>>
83 :
84 : MEMORY MODEL:
85 : -------------
86 : Synchronization chain from winner's write to parent's read:
87 :
88 : 1. Winner thread writes result_/winner_exception_ (non-atomic)
89 : 2. Winner thread calls signal_completion() → fetch_sub(acq_rel) on remaining_count_
90 : 3. Last task thread (may be winner or non-winner) calls signal_completion()
91 : → fetch_sub(acq_rel) on remaining_count_, observing count becomes 0
92 : 4. Last task returns caller_ex_.dispatch(continuation_) via symmetric transfer
93 : 5. Parent coroutine resumes and reads result_/winner_exception_
94 :
95 : Synchronization analysis:
96 : - All fetch_sub operations on remaining_count_ form a release sequence
97 : - Winner's fetch_sub releases; subsequent fetch_sub operations participate
98 : in the modification order of remaining_count_
99 : - Last task's fetch_sub(acq_rel) synchronizes-with prior releases in the
100 : modification order, establishing happens-before from winner's writes
101 : - Executor dispatch() is expected to provide queue-based synchronization
102 : (release-on-post, acquire-on-execute) completing the chain to parent
103 : - Even inline executors work (same thread = sequenced-before)
104 :
105 : Alternative considered: Adding winner_ready_ atomic (set with release after
106 : storing winner data, acquired before reading) would make synchronization
107 : self-contained and not rely on executor implementation details. Current
108 : approach is correct but requires careful reasoning about release sequences
109 : and executor behavior.
110 :
111 : EXCEPTION SEMANTICS:
112 : --------------------
113 : Unlike when_all (which captures first exception, discards others), when_any
114 : treats exceptions as valid completions. If the winning task threw, that
115 : exception is rethrown. Exceptions from non-winners are silently discarded.
116 : */
117 :
118 : namespace boost {
119 : namespace capy {
120 :
121 : namespace detail {
122 :
123 : /** Convert void to monostate for variant storage.
124 :
125 : std::variant<void, ...> is ill-formed, so void tasks contribute
126 : std::monostate to the result variant instead. Non-void types
127 : pass through unchanged.
128 :
129 : @tparam T The type to potentially convert (void becomes monostate).
130 : */
131 : template<typename T>
132 : using void_to_monostate_t = std::conditional_t<std::is_void_v<T>, std::monostate, T>;
133 :
134 : // Type deduplication: std::variant requires unique alternative types.
135 : // Fold left over the type list, appending each type only if not already present.
136 : template<typename Variant, typename T>
137 : struct variant_append_if_unique;
138 :
139 : template<typename... Vs, typename T>
140 : struct variant_append_if_unique<std::variant<Vs...>, T>
141 : {
142 : using type = std::conditional_t<
143 : (std::is_same_v<T, Vs> || ...),
144 : std::variant<Vs...>,
145 : std::variant<Vs..., T>>;
146 : };
147 :
148 : template<typename Accumulated, typename... Remaining>
149 : struct deduplicate_impl;
150 :
151 : template<typename Accumulated>
152 : struct deduplicate_impl<Accumulated>
153 : {
154 : using type = Accumulated;
155 : };
156 :
157 : template<typename Accumulated, typename T, typename... Rest>
158 : struct deduplicate_impl<Accumulated, T, Rest...>
159 : {
160 : using next = typename variant_append_if_unique<Accumulated, T>::type;
161 : using type = typename deduplicate_impl<next, Rest...>::type;
162 : };
163 :
164 : // Deduplicated variant; void types become monostate before deduplication
165 : template<typename T0, typename... Ts>
166 : using unique_variant_t = typename deduplicate_impl<
167 : std::variant<void_to_monostate_t<T0>>,
168 : void_to_monostate_t<Ts>...>::type;
169 :
170 : // Result: (winner_index, deduplicated_variant). Use index to disambiguate
171 : // when multiple tasks share the same return type.
172 : template<typename T0, typename... Ts>
173 : using when_any_result_t = std::pair<std::size_t, unique_variant_t<T0, Ts...>>;
174 :
175 : /** Core shared state for when_any operations.
176 :
177 : Contains all members and methods common to both heterogeneous (variadic)
178 : and homogeneous (range) when_any implementations. State classes embed
179 : this via composition to avoid CRTP destructor ordering issues.
180 :
181 : @par Thread Safety
182 : Atomic operations protect winner selection and completion count.
183 : */
184 : struct when_any_core
185 : {
186 : std::atomic<std::size_t> remaining_count_;
187 : std::size_t winner_index_{0};
188 : std::exception_ptr winner_exception_;
189 : std::stop_source stop_source_;
190 :
191 : // Bridges parent's stop token to our stop_source
192 : struct stop_callback_fn
193 : {
194 : std::stop_source* source_;
195 9 : void operator()() const noexcept { source_->request_stop(); }
196 : };
197 : using stop_callback_t = std::stop_callback<stop_callback_fn>;
198 : std::optional<stop_callback_t> parent_stop_callback_;
199 :
200 : std::coroutine_handle<> continuation_;
201 : io_env const* caller_env_ = nullptr;
202 :
203 : // Placed last to avoid padding (1-byte atomic followed by 8-byte aligned members)
204 : std::atomic<bool> has_winner_{false};
205 :
206 65 : explicit when_any_core(std::size_t count) noexcept
207 65 : : remaining_count_(count)
208 : {
209 65 : }
210 :
211 : /** Atomically claim winner status; exactly one task succeeds. */
212 190 : bool try_win(std::size_t index) noexcept
213 : {
214 190 : bool expected = false;
215 190 : if(has_winner_.compare_exchange_strong(
216 : expected, true, std::memory_order_acq_rel))
217 : {
218 65 : winner_index_ = index;
219 65 : stop_source_.request_stop();
220 65 : return true;
221 : }
222 125 : return false;
223 : }
224 :
225 : /** @pre try_win() returned true. */
226 8 : void set_winner_exception(std::exception_ptr ep) noexcept
227 : {
228 8 : winner_exception_ = ep;
229 8 : }
230 :
231 : // Runners signal completion directly via final_suspend; no member function needed.
232 : };
233 :
234 : /** Shared state for heterogeneous when_any operation.
235 :
236 : Coordinates winner selection, result storage, and completion tracking
237 : for all child tasks in a when_any operation. Uses composition with
238 : when_any_core for shared functionality.
239 :
240 : @par Lifetime
241 : Allocated on the parent coroutine's frame, outlives all runners.
242 :
243 : @tparam T0 First task's result type.
244 : @tparam Ts Remaining tasks' result types.
245 : */
246 : template<typename T0, typename... Ts>
247 : struct when_any_state
248 : {
249 : static constexpr std::size_t task_count = 1 + sizeof...(Ts);
250 : using variant_type = unique_variant_t<T0, Ts...>;
251 :
252 : when_any_core core_;
253 : std::optional<variant_type> result_;
254 : std::array<std::coroutine_handle<>, task_count> runner_handles_{};
255 :
256 43 : when_any_state()
257 43 : : core_(task_count)
258 : {
259 43 : }
260 :
261 : // Runners self-destruct in final_suspend. No destruction needed here.
262 :
263 : /** @pre core_.try_win() returned true.
264 : @note Uses in_place_type (not index) because variant is deduplicated.
265 : */
266 : template<typename T>
267 35 : void set_winner_result(T value)
268 : noexcept(std::is_nothrow_move_constructible_v<T>)
269 : {
270 35 : result_.emplace(std::in_place_type<T>, std::move(value));
271 35 : }
272 :
273 : /** @pre core_.try_win() returned true. */
274 3 : void set_winner_void() noexcept
275 : {
276 3 : result_.emplace(std::in_place_type<std::monostate>, std::monostate{});
277 3 : }
278 : };
279 :
280 : /** Wrapper coroutine that runs a single child task for when_any.
281 :
282 : Propagates executor/stop_token to the child, attempts to claim winner
283 : status on completion, and signals completion for cleanup coordination.
284 :
285 : @tparam StateType The state type (when_any_state or when_any_homogeneous_state).
286 : */
287 : template<typename StateType>
288 : struct when_any_runner
289 : {
290 : struct promise_type // : frame_allocating_base // DISABLED FOR TESTING
291 : {
292 : StateType* state_ = nullptr;
293 : std::size_t index_ = 0;
294 : io_env env_;
295 :
296 190 : when_any_runner get_return_object() noexcept
297 : {
298 190 : return when_any_runner(std::coroutine_handle<promise_type>::from_promise(*this));
299 : }
300 :
301 : // Starts suspended; launcher sets up state/ex/token then resumes
302 190 : std::suspend_always initial_suspend() noexcept
303 : {
304 190 : return {};
305 : }
306 :
307 190 : auto final_suspend() noexcept
308 : {
309 : struct awaiter
310 : {
311 : promise_type* p_;
312 190 : bool await_ready() const noexcept { return false; }
313 190 : std::coroutine_handle<> await_suspend(std::coroutine_handle<> h) noexcept
314 : {
315 : // Extract everything needed before self-destruction.
316 190 : auto& core = p_->state_->core_;
317 190 : auto* counter = &core.remaining_count_;
318 190 : auto* caller_env = core.caller_env_;
319 190 : auto cont = core.continuation_;
320 :
321 190 : h.destroy();
322 :
323 : // If last runner, dispatch parent for symmetric transfer.
324 190 : auto remaining = counter->fetch_sub(1, std::memory_order_acq_rel);
325 190 : if(remaining == 1)
326 65 : return caller_env->executor.dispatch(cont);
327 125 : return std::noop_coroutine();
328 : }
329 0 : void await_resume() const noexcept {}
330 : };
331 190 : return awaiter{this};
332 : }
333 :
334 178 : void return_void() noexcept {}
335 :
336 : // Exceptions are valid completions in when_any (unlike when_all)
337 12 : void unhandled_exception()
338 : {
339 12 : if(state_->core_.try_win(index_))
340 8 : state_->core_.set_winner_exception(std::current_exception());
341 12 : }
342 :
343 : /** Injects executor and stop token into child awaitables. */
344 : template<class Awaitable>
345 : struct transform_awaiter
346 : {
347 : std::decay_t<Awaitable> a_;
348 : promise_type* p_;
349 :
350 190 : bool await_ready() { return a_.await_ready(); }
351 190 : auto await_resume() { return a_.await_resume(); }
352 :
353 : template<class Promise>
354 185 : auto await_suspend(std::coroutine_handle<Promise> h)
355 : {
356 185 : return a_.await_suspend(h, &p_->env_);
357 : }
358 : };
359 :
360 : template<class Awaitable>
361 190 : auto await_transform(Awaitable&& a)
362 : {
363 : using A = std::decay_t<Awaitable>;
364 : if constexpr (IoAwaitable<A>)
365 : {
366 : return transform_awaiter<Awaitable>{
367 380 : std::forward<Awaitable>(a), this};
368 : }
369 : else
370 : {
371 : static_assert(sizeof(A) == 0, "requires IoAwaitable");
372 : }
373 190 : }
374 : };
375 :
376 : std::coroutine_handle<promise_type> h_;
377 :
378 190 : explicit when_any_runner(std::coroutine_handle<promise_type> h) noexcept
379 190 : : h_(h)
380 : {
381 190 : }
382 :
383 : // Enable move for all clang versions - some versions need it
384 : when_any_runner(when_any_runner&& other) noexcept : h_(std::exchange(other.h_, nullptr)) {}
385 :
386 : // Non-copyable
387 : when_any_runner(when_any_runner const&) = delete;
388 : when_any_runner& operator=(when_any_runner const&) = delete;
389 : when_any_runner& operator=(when_any_runner&&) = delete;
390 :
391 190 : auto release() noexcept
392 : {
393 190 : return std::exchange(h_, nullptr);
394 : }
395 : };
396 :
397 : /** Wraps a child awaitable, attempts to claim winner on completion.
398 :
399 : Uses requires-expressions to detect state capabilities:
400 : - set_winner_void(): for heterogeneous void tasks (stores monostate)
401 : - set_winner_result(): for non-void tasks
402 : - Neither: for homogeneous void tasks (no result storage)
403 : */
404 : template<IoAwaitable Awaitable, typename StateType>
405 : when_any_runner<StateType>
406 190 : make_when_any_runner(Awaitable inner, StateType* state, std::size_t index)
407 : {
408 : using T = awaitable_result_t<Awaitable>;
409 : if constexpr (std::is_void_v<T>)
410 : {
411 : co_await std::move(inner);
412 : if(state->core_.try_win(index))
413 : {
414 : // Heterogeneous void tasks store monostate in the variant
415 : if constexpr (requires { state->set_winner_void(); })
416 : state->set_winner_void();
417 : // Homogeneous void tasks have no result to store
418 : }
419 : }
420 : else
421 : {
422 : auto result = co_await std::move(inner);
423 : if(state->core_.try_win(index))
424 : {
425 : // Defensive: move should not throw (already moved once), but we
426 : // catch just in case since an uncaught exception would be devastating.
427 : try
428 : {
429 : state->set_winner_result(std::move(result));
430 : }
431 : catch(...)
432 : {
433 : state->core_.set_winner_exception(std::current_exception());
434 : }
435 : }
436 : }
437 380 : }
438 :
439 : /** Launches all runners concurrently; see await_suspend for lifetime concerns. */
440 : template<IoAwaitable... Awaitables>
441 : class when_any_launcher
442 : {
443 : using state_type = when_any_state<awaitable_result_t<Awaitables>...>;
444 :
445 : std::tuple<Awaitables...>* tasks_;
446 : state_type* state_;
447 :
448 : public:
449 43 : when_any_launcher(
450 : std::tuple<Awaitables...>* tasks,
451 : state_type* state)
452 43 : : tasks_(tasks)
453 43 : , state_(state)
454 : {
455 43 : }
456 :
457 43 : bool await_ready() const noexcept
458 : {
459 43 : return sizeof...(Awaitables) == 0;
460 : }
461 :
462 : /** CRITICAL: If the last task finishes synchronously, parent resumes and
463 : destroys this object before await_suspend returns. Must not reference
464 : `this` after the final launch_one call.
465 : */
466 43 : std::coroutine_handle<> await_suspend(std::coroutine_handle<> continuation, io_env const* caller_env)
467 : {
468 43 : state_->core_.continuation_ = continuation;
469 43 : state_->core_.caller_env_ = caller_env;
470 :
471 43 : if(caller_env->stop_token.stop_possible())
472 : {
473 18 : state_->core_.parent_stop_callback_.emplace(
474 9 : caller_env->stop_token,
475 9 : when_any_core::stop_callback_fn{&state_->core_.stop_source_});
476 :
477 9 : if(caller_env->stop_token.stop_requested())
478 3 : state_->core_.stop_source_.request_stop();
479 : }
480 :
481 43 : auto token = state_->core_.stop_source_.get_token();
482 86 : [&]<std::size_t... Is>(std::index_sequence<Is...>) {
483 43 : (..., launch_one<Is>(caller_env->executor, token));
484 43 : }(std::index_sequence_for<Awaitables...>{});
485 :
486 86 : return std::noop_coroutine();
487 43 : }
488 :
489 43 : void await_resume() const noexcept
490 : {
491 43 : }
492 :
493 : private:
494 : /** @pre Ex::dispatch() and std::coroutine_handle<>::resume() must not throw (handle may leak). */
495 : template<std::size_t I>
496 105 : void launch_one(executor_ref caller_ex, std::stop_token token)
497 : {
498 105 : auto runner = make_when_any_runner(
499 105 : std::move(std::get<I>(*tasks_)), state_, I);
500 :
501 105 : auto h = runner.release();
502 105 : h.promise().state_ = state_;
503 105 : h.promise().index_ = I;
504 105 : h.promise().env_ = io_env{caller_ex, token, state_->core_.caller_env_->allocator};
505 :
506 105 : std::coroutine_handle<> ch{h};
507 105 : state_->runner_handles_[I] = ch;
508 105 : caller_ex.post(ch);
509 210 : }
510 : };
511 :
512 : } // namespace detail
513 :
514 : /** Wait for the first awaitable to complete.
515 :
516 : Races multiple heterogeneous awaitables concurrently and returns when the
517 : first one completes. The result includes the winner's index and a
518 : deduplicated variant containing the result value.
519 :
520 : @par Suspends
521 : The calling coroutine suspends when co_await is invoked. All awaitables
522 : are launched concurrently and execute in parallel. The coroutine resumes
523 : only after all awaitables have completed, even though the winner is
524 : determined by the first to finish.
525 :
526 : @par Completion Conditions
527 : @li Winner is determined when the first awaitable completes (success or exception)
528 : @li Only one task can claim winner status via atomic compare-exchange
529 : @li Once a winner exists, stop is requested for all remaining siblings
530 : @li Parent coroutine resumes only after all siblings acknowledge completion
531 : @li The winner's result is returned; if the winner threw, the exception is rethrown
532 :
533 : @par Cancellation Semantics
534 : Cancellation is supported via stop_token propagated through the
535 : IoAwaitable protocol:
536 : @li Each child awaitable receives a stop_token derived from a shared stop_source
537 : @li When the parent's stop token is activated, the stop is forwarded to all children
538 : @li When a winner is determined, stop_source_.request_stop() is called immediately
539 : @li Siblings must handle cancellation gracefully and complete before parent resumes
540 : @li Stop requests are cooperative; tasks must check and respond to them
541 :
542 : @par Concurrency/Overlap
543 : All awaitables are launched concurrently before any can complete.
544 : The launcher iterates through the arguments, starting each task on the
545 : caller's executor. Tasks may execute in parallel on multi-threaded
546 : executors or interleave on single-threaded executors. There is no
547 : guaranteed ordering of task completion.
548 :
549 : @par Notable Error Conditions
550 : @li Winner exception: if the winning task threw, that exception is rethrown
551 : @li Non-winner exceptions: silently discarded (only winner's result matters)
552 : @li Cancellation: tasks may complete via cancellation without throwing
553 :
554 : @par Example
555 : @code
556 : task<void> example() {
557 : auto [index, result] = co_await when_any(
558 : fetch_from_primary(), // task<Response>
559 : fetch_from_backup() // task<Response>
560 : );
561 : // index is 0 or 1, result holds the winner's Response
562 : auto response = std::get<Response>(result);
563 : }
564 : @endcode
565 :
566 : @par Example with Heterogeneous Types
567 : @code
568 : task<void> mixed_types() {
569 : auto [index, result] = co_await when_any(
570 : fetch_int(), // task<int>
571 : fetch_string() // task<std::string>
572 : );
573 : if (index == 0)
574 : std::cout << "Got int: " << std::get<int>(result) << "\n";
575 : else
576 : std::cout << "Got string: " << std::get<std::string>(result) << "\n";
577 : }
578 : @endcode
579 :
580 : @tparam A0 First awaitable type (must satisfy IoAwaitable).
581 : @tparam As Remaining awaitable types (must satisfy IoAwaitable).
582 : @param a0 The first awaitable to race.
583 : @param as Additional awaitables to race concurrently.
584 : @return A task yielding a pair of (winner_index, result_variant).
585 :
586 : @throws Rethrows the winner's exception if the winning task threw an exception.
587 :
588 : @par Remarks
589 : Awaitables are moved into the coroutine frame; original objects become
590 : empty after the call. When multiple awaitables share the same return type,
591 : the variant is deduplicated to contain only unique types. Use the winner
592 : index to determine which awaitable completed first. Void awaitables
593 : contribute std::monostate to the variant.
594 :
595 : @see when_all, IoAwaitable
596 : */
597 : template<IoAwaitable A0, IoAwaitable... As>
598 43 : [[nodiscard]] auto when_any(A0 a0, As... as)
599 : -> task<detail::when_any_result_t<
600 : detail::awaitable_result_t<A0>,
601 : detail::awaitable_result_t<As>...>>
602 : {
603 : using result_type = detail::when_any_result_t<
604 : detail::awaitable_result_t<A0>,
605 : detail::awaitable_result_t<As>...>;
606 :
607 : detail::when_any_state<
608 : detail::awaitable_result_t<A0>,
609 : detail::awaitable_result_t<As>...> state;
610 : std::tuple<A0, As...> awaitable_tuple(std::move(a0), std::move(as)...);
611 :
612 : co_await detail::when_any_launcher<A0, As...>(&awaitable_tuple, &state);
613 :
614 : if(state.core_.winner_exception_)
615 : std::rethrow_exception(state.core_.winner_exception_);
616 :
617 : co_return result_type{state.core_.winner_index_, std::move(*state.result_)};
618 86 : }
619 :
620 : /** Concept for ranges of full I/O awaitables.
621 :
622 : A range satisfies `IoAwaitableRange` if it is a sized input range
623 : whose value type satisfies @ref IoAwaitable. This enables when_any
624 : to accept any container or view of awaitables, not just std::vector.
625 :
626 : @tparam R The range type.
627 :
628 : @par Requirements
629 : @li `R` must satisfy `std::ranges::input_range`
630 : @li `R` must satisfy `std::ranges::sized_range`
631 : @li `std::ranges::range_value_t<R>` must satisfy @ref IoAwaitable
632 :
633 : @par Syntactic Requirements
634 : Given `r` of type `R`:
635 : @li `std::ranges::begin(r)` is valid
636 : @li `std::ranges::end(r)` is valid
637 : @li `std::ranges::size(r)` returns `std::ranges::range_size_t<R>`
638 : @li `*std::ranges::begin(r)` satisfies @ref IoAwaitable
639 :
640 : @par Example
641 : @code
642 : template<IoAwaitableRange R>
643 : task<void> race_all(R&& awaitables) {
644 : auto winner = co_await when_any(std::forward<R>(awaitables));
645 : // Process winner...
646 : }
647 : @endcode
648 :
649 : @see when_any, IoAwaitable
650 : */
651 : template<typename R>
652 : concept IoAwaitableRange =
653 : std::ranges::input_range<R> &&
654 : std::ranges::sized_range<R> &&
655 : IoAwaitable<std::ranges::range_value_t<R>>;
656 :
657 : namespace detail {
658 :
659 : /** Shared state for homogeneous when_any (range overload).
660 :
661 : Uses composition with when_any_core for shared functionality.
662 : Simpler than heterogeneous: optional<T> instead of variant, vector
663 : instead of array for runner handles.
664 : */
665 : template<typename T>
666 : struct when_any_homogeneous_state
667 : {
668 : when_any_core core_;
669 : std::optional<T> result_;
670 : std::vector<std::coroutine_handle<>> runner_handles_;
671 :
672 19 : explicit when_any_homogeneous_state(std::size_t count)
673 19 : : core_(count)
674 38 : , runner_handles_(count)
675 : {
676 19 : }
677 :
678 : // Runners self-destruct in final_suspend. No destruction needed here.
679 :
680 : /** @pre core_.try_win() returned true. */
681 17 : void set_winner_result(T value)
682 : noexcept(std::is_nothrow_move_constructible_v<T>)
683 : {
684 17 : result_.emplace(std::move(value));
685 17 : }
686 : };
687 :
688 : /** Specialization for void tasks (no result storage needed). */
689 : template<>
690 : struct when_any_homogeneous_state<void>
691 : {
692 : when_any_core core_;
693 : std::vector<std::coroutine_handle<>> runner_handles_;
694 :
695 3 : explicit when_any_homogeneous_state(std::size_t count)
696 3 : : core_(count)
697 6 : , runner_handles_(count)
698 : {
699 3 : }
700 :
701 : // Runners self-destruct in final_suspend. No destruction needed here.
702 :
703 : // No set_winner_result - void tasks have no result to store
704 : };
705 :
706 : /** Launches all runners concurrently; see await_suspend for lifetime concerns. */
707 : template<IoAwaitableRange Range>
708 : class when_any_homogeneous_launcher
709 : {
710 : using Awaitable = std::ranges::range_value_t<Range>;
711 : using T = awaitable_result_t<Awaitable>;
712 :
713 : Range* range_;
714 : when_any_homogeneous_state<T>* state_;
715 :
716 : public:
717 22 : when_any_homogeneous_launcher(
718 : Range* range,
719 : when_any_homogeneous_state<T>* state)
720 22 : : range_(range)
721 22 : , state_(state)
722 : {
723 22 : }
724 :
725 22 : bool await_ready() const noexcept
726 : {
727 22 : return std::ranges::empty(*range_);
728 : }
729 :
730 : /** CRITICAL: If the last task finishes synchronously, parent resumes and
731 : destroys this object before await_suspend returns. Must not reference
732 : `this` after dispatching begins.
733 :
734 : Two-phase approach:
735 : 1. Create all runners (safe - no dispatch yet)
736 : 2. Dispatch all runners (any may complete synchronously)
737 : */
738 22 : std::coroutine_handle<> await_suspend(std::coroutine_handle<> continuation, io_env const* caller_env)
739 : {
740 22 : state_->core_.continuation_ = continuation;
741 22 : state_->core_.caller_env_ = caller_env;
742 :
743 22 : if(caller_env->stop_token.stop_possible())
744 : {
745 14 : state_->core_.parent_stop_callback_.emplace(
746 7 : caller_env->stop_token,
747 7 : when_any_core::stop_callback_fn{&state_->core_.stop_source_});
748 :
749 7 : if(caller_env->stop_token.stop_requested())
750 4 : state_->core_.stop_source_.request_stop();
751 : }
752 :
753 22 : auto token = state_->core_.stop_source_.get_token();
754 :
755 : // Phase 1: Create all runners without dispatching.
756 : // This iterates over *range_ safely because no runners execute yet.
757 22 : std::size_t index = 0;
758 107 : for(auto&& a : *range_)
759 : {
760 85 : auto runner = make_when_any_runner(
761 85 : std::move(a), state_, index);
762 :
763 85 : auto h = runner.release();
764 85 : h.promise().state_ = state_;
765 85 : h.promise().index_ = index;
766 85 : h.promise().env_ = io_env{caller_env->executor, token, caller_env->allocator};
767 :
768 85 : state_->runner_handles_[index] = std::coroutine_handle<>{h};
769 85 : ++index;
770 : }
771 :
772 : // Phase 2: Post all runners. Any may complete synchronously.
773 : // After last post, state_ and this may be destroyed.
774 : // Use raw pointer/count captured before posting.
775 22 : std::coroutine_handle<>* handles = state_->runner_handles_.data();
776 22 : std::size_t count = state_->runner_handles_.size();
777 107 : for(std::size_t i = 0; i < count; ++i)
778 85 : caller_env->executor.post(handles[i]);
779 :
780 44 : return std::noop_coroutine();
781 107 : }
782 :
783 22 : void await_resume() const noexcept
784 : {
785 22 : }
786 : };
787 :
788 : } // namespace detail
789 :
790 : /** Wait for the first awaitable to complete (range overload).
791 :
792 : Races a range of awaitables with the same result type. Accepts any
793 : sized input range of IoAwaitable types, enabling use with arrays,
794 : spans, or custom containers.
795 :
796 : @par Suspends
797 : The calling coroutine suspends when co_await is invoked. All awaitables
798 : in the range are launched concurrently and execute in parallel. The
799 : coroutine resumes only after all awaitables have completed, even though
800 : the winner is determined by the first to finish.
801 :
802 : @par Completion Conditions
803 : @li Winner is determined when the first awaitable completes (success or exception)
804 : @li Only one task can claim winner status via atomic compare-exchange
805 : @li Once a winner exists, stop is requested for all remaining siblings
806 : @li Parent coroutine resumes only after all siblings acknowledge completion
807 : @li The winner's index and result are returned; if the winner threw, the exception is rethrown
808 :
809 : @par Cancellation Semantics
810 : Cancellation is supported via stop_token propagated through the
811 : IoAwaitable protocol:
812 : @li Each child awaitable receives a stop_token derived from a shared stop_source
813 : @li When the parent's stop token is activated, the stop is forwarded to all children
814 : @li When a winner is determined, stop_source_.request_stop() is called immediately
815 : @li Siblings must handle cancellation gracefully and complete before parent resumes
816 : @li Stop requests are cooperative; tasks must check and respond to them
817 :
818 : @par Concurrency/Overlap
819 : All awaitables are launched concurrently before any can complete.
820 : The launcher iterates through the range, starting each task on the
821 : caller's executor. Tasks may execute in parallel on multi-threaded
822 : executors or interleave on single-threaded executors. There is no
823 : guaranteed ordering of task completion.
824 :
825 : @par Notable Error Conditions
826 : @li Empty range: throws std::invalid_argument immediately (not via co_return)
827 : @li Winner exception: if the winning task threw, that exception is rethrown
828 : @li Non-winner exceptions: silently discarded (only winner's result matters)
829 : @li Cancellation: tasks may complete via cancellation without throwing
830 :
831 : @par Example
832 : @code
833 : task<void> example() {
834 : std::array<task<Response>, 3> requests = {
835 : fetch_from_server(0),
836 : fetch_from_server(1),
837 : fetch_from_server(2)
838 : };
839 :
840 : auto [index, response] = co_await when_any(std::move(requests));
841 : }
842 : @endcode
843 :
844 : @par Example with Vector
845 : @code
846 : task<Response> fetch_fastest(std::vector<Server> const& servers) {
847 : std::vector<task<Response>> requests;
848 : for (auto const& server : servers)
849 : requests.push_back(fetch_from(server));
850 :
851 : auto [index, response] = co_await when_any(std::move(requests));
852 : co_return response;
853 : }
854 : @endcode
855 :
856 : @tparam R Range type satisfying IoAwaitableRange.
857 : @param awaitables Range of awaitables to race concurrently (must not be empty).
858 : @return A task yielding a pair of (winner_index, result).
859 :
860 : @throws std::invalid_argument if range is empty (thrown before coroutine suspends).
861 : @throws Rethrows the winner's exception if the winning task threw an exception.
862 :
863 : @par Remarks
864 : Elements are moved from the range; for lvalue ranges, the original
865 : container will have moved-from elements after this call. The range
866 : is moved onto the coroutine frame to ensure lifetime safety. Unlike
867 : the variadic overload, no variant wrapper is needed since all tasks
868 : share the same return type.
869 :
870 : @see when_any, IoAwaitableRange
871 : */
872 : template<IoAwaitableRange R>
873 : requires (!std::is_void_v<detail::awaitable_result_t<std::ranges::range_value_t<R>>>)
874 21 : [[nodiscard]] auto when_any(R&& awaitables)
875 : -> task<std::pair<std::size_t, detail::awaitable_result_t<std::ranges::range_value_t<R>>>>
876 : {
877 : using Awaitable = std::ranges::range_value_t<R>;
878 : using T = detail::awaitable_result_t<Awaitable>;
879 : using result_type = std::pair<std::size_t, T>;
880 : using OwnedRange = std::remove_cvref_t<R>;
881 :
882 : auto count = std::ranges::size(awaitables);
883 : if(count == 0)
884 : throw std::invalid_argument("when_any requires at least one awaitable");
885 :
886 : // Move/copy range onto coroutine frame to ensure lifetime
887 : OwnedRange owned_awaitables = std::forward<R>(awaitables);
888 :
889 : detail::when_any_homogeneous_state<T> state(count);
890 :
891 : co_await detail::when_any_homogeneous_launcher<OwnedRange>(&owned_awaitables, &state);
892 :
893 : if(state.core_.winner_exception_)
894 : std::rethrow_exception(state.core_.winner_exception_);
895 :
896 : co_return result_type{state.core_.winner_index_, std::move(*state.result_)};
897 42 : }
898 :
899 : /** Wait for the first awaitable to complete (void range overload).
900 :
901 : Races a range of void-returning awaitables. Since void awaitables have
902 : no result value, only the winner's index is returned.
903 :
904 : @par Suspends
905 : The calling coroutine suspends when co_await is invoked. All awaitables
906 : in the range are launched concurrently and execute in parallel. The
907 : coroutine resumes only after all awaitables have completed, even though
908 : the winner is determined by the first to finish.
909 :
910 : @par Completion Conditions
911 : @li Winner is determined when the first awaitable completes (success or exception)
912 : @li Only one task can claim winner status via atomic compare-exchange
913 : @li Once a winner exists, stop is requested for all remaining siblings
914 : @li Parent coroutine resumes only after all siblings acknowledge completion
915 : @li The winner's index is returned; if the winner threw, the exception is rethrown
916 :
917 : @par Cancellation Semantics
918 : Cancellation is supported via stop_token propagated through the
919 : IoAwaitable protocol:
920 : @li Each child awaitable receives a stop_token derived from a shared stop_source
921 : @li When the parent's stop token is activated, the stop is forwarded to all children
922 : @li When a winner is determined, stop_source_.request_stop() is called immediately
923 : @li Siblings must handle cancellation gracefully and complete before parent resumes
924 : @li Stop requests are cooperative; tasks must check and respond to them
925 :
926 : @par Concurrency/Overlap
927 : All awaitables are launched concurrently before any can complete.
928 : The launcher iterates through the range, starting each task on the
929 : caller's executor. Tasks may execute in parallel on multi-threaded
930 : executors or interleave on single-threaded executors. There is no
931 : guaranteed ordering of task completion.
932 :
933 : @par Notable Error Conditions
934 : @li Empty range: throws std::invalid_argument immediately (not via co_return)
935 : @li Winner exception: if the winning task threw, that exception is rethrown
936 : @li Non-winner exceptions: silently discarded (only winner's result matters)
937 : @li Cancellation: tasks may complete via cancellation without throwing
938 :
939 : @par Example
940 : @code
941 : task<void> example() {
942 : std::vector<task<void>> tasks;
943 : for (int i = 0; i < 5; ++i)
944 : tasks.push_back(background_work(i));
945 :
946 : std::size_t winner = co_await when_any(std::move(tasks));
947 : // winner is the index of the first task to complete
948 : }
949 : @endcode
950 :
951 : @par Example with Timeout
952 : @code
953 : task<void> with_timeout() {
954 : std::vector<task<void>> tasks;
955 : tasks.push_back(long_running_operation());
956 : tasks.push_back(delay(std::chrono::seconds(5)));
957 :
958 : std::size_t winner = co_await when_any(std::move(tasks));
959 : if (winner == 1) {
960 : // Timeout occurred
961 : }
962 : }
963 : @endcode
964 :
965 : @tparam R Range type satisfying IoAwaitableRange with void result.
966 : @param awaitables Range of void awaitables to race concurrently (must not be empty).
967 : @return A task yielding the winner's index (zero-based).
968 :
969 : @throws std::invalid_argument if range is empty (thrown before coroutine suspends).
970 : @throws Rethrows the winner's exception if the winning task threw an exception.
971 :
972 : @par Remarks
973 : Elements are moved from the range; for lvalue ranges, the original
974 : container will have moved-from elements after this call. The range
975 : is moved onto the coroutine frame to ensure lifetime safety. Unlike
976 : the non-void overload, no result storage is needed since void tasks
977 : produce no value.
978 :
979 : @see when_any, IoAwaitableRange
980 : */
981 : template<IoAwaitableRange R>
982 : requires std::is_void_v<detail::awaitable_result_t<std::ranges::range_value_t<R>>>
983 3 : [[nodiscard]] auto when_any(R&& awaitables) -> task<std::size_t>
984 : {
985 : using OwnedRange = std::remove_cvref_t<R>;
986 :
987 : auto count = std::ranges::size(awaitables);
988 : if(count == 0)
989 : throw std::invalid_argument("when_any requires at least one awaitable");
990 :
991 : // Move/copy range onto coroutine frame to ensure lifetime
992 : OwnedRange owned_awaitables = std::forward<R>(awaitables);
993 :
994 : detail::when_any_homogeneous_state<void> state(count);
995 :
996 : co_await detail::when_any_homogeneous_launcher<OwnedRange>(&owned_awaitables, &state);
997 :
998 : if(state.core_.winner_exception_)
999 : std::rethrow_exception(state.core_.winner_exception_);
1000 :
1001 : co_return state.core_.winner_index_;
1002 6 : }
1003 :
1004 : } // namespace capy
1005 : } // namespace boost
1006 :
1007 : #endif
|