1  
//
1  
//
2  
// Copyright (c) 2026 Steve Gerbino
2  
// Copyright (c) 2026 Steve Gerbino
3  
//
3  
//
4  
// Distributed under the Boost Software License, Version 1.0. (See accompanying
4  
// Distributed under the Boost Software License, Version 1.0. (See accompanying
5  
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
5  
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6  
//
6  
//
7  
// Official repository: https://github.com/cppalliance/capy
7  
// Official repository: https://github.com/cppalliance/capy
8  
//
8  
//
9  

9  

10  
#ifndef BOOST_CAPY_WHEN_ALL_HPP
10  
#ifndef BOOST_CAPY_WHEN_ALL_HPP
11  
#define BOOST_CAPY_WHEN_ALL_HPP
11  
#define BOOST_CAPY_WHEN_ALL_HPP
12  

12  

13  
#include <boost/capy/detail/config.hpp>
13  
#include <boost/capy/detail/config.hpp>
14  
#include <boost/capy/concept/executor.hpp>
14  
#include <boost/capy/concept/executor.hpp>
15  
#include <boost/capy/concept/io_awaitable.hpp>
15  
#include <boost/capy/concept/io_awaitable.hpp>
16  
#include <coroutine>
16  
#include <coroutine>
17  
#include <boost/capy/ex/io_env.hpp>
17  
#include <boost/capy/ex/io_env.hpp>
18  
#include <boost/capy/ex/frame_allocator.hpp>
18  
#include <boost/capy/ex/frame_allocator.hpp>
19  
#include <boost/capy/task.hpp>
19  
#include <boost/capy/task.hpp>
20  

20  

21  
#include <array>
21  
#include <array>
22  
#include <atomic>
22  
#include <atomic>
23  
#include <exception>
23  
#include <exception>
24  
#include <optional>
24  
#include <optional>
25  
#include <stop_token>
25  
#include <stop_token>
26  
#include <tuple>
26  
#include <tuple>
27  
#include <type_traits>
27  
#include <type_traits>
28  
#include <utility>
28  
#include <utility>
29  

29  

30  
namespace boost {
30  
namespace boost {
31  
namespace capy {
31  
namespace capy {
32  

32  

33  
namespace detail {
33  
namespace detail {
34  

34  

35  
/** Type trait to filter void types from a tuple.
35  
/** Type trait to filter void types from a tuple.
36  

36  

37  
    Void-returning tasks do not contribute a value to the result tuple.
37  
    Void-returning tasks do not contribute a value to the result tuple.
38  
    This trait computes the filtered result type.
38  
    This trait computes the filtered result type.
39  

39  

40  
    Example: filter_void_tuple_t<int, void, string> = tuple<int, string>
40  
    Example: filter_void_tuple_t<int, void, string> = tuple<int, string>
41  
*/
41  
*/
42  
template<typename T>
42  
template<typename T>
43  
using wrap_non_void_t = std::conditional_t<std::is_void_v<T>, std::tuple<>, std::tuple<T>>;
43  
using wrap_non_void_t = std::conditional_t<std::is_void_v<T>, std::tuple<>, std::tuple<T>>;
44  

44  

45  
template<typename... Ts>
45  
template<typename... Ts>
46  
using filter_void_tuple_t = decltype(std::tuple_cat(std::declval<wrap_non_void_t<Ts>>()...));
46  
using filter_void_tuple_t = decltype(std::tuple_cat(std::declval<wrap_non_void_t<Ts>>()...));
47  

47  

48  
/** Holds the result of a single task within when_all.
48  
/** Holds the result of a single task within when_all.
49  
*/
49  
*/
50  
template<typename T>
50  
template<typename T>
51  
struct result_holder
51  
struct result_holder
52  
{
52  
{
53  
    std::optional<T> value_;
53  
    std::optional<T> value_;
54  

54  

55  
    void set(T v)
55  
    void set(T v)
56  
    {
56  
    {
57  
        value_ = std::move(v);
57  
        value_ = std::move(v);
58  
    }
58  
    }
59  

59  

60  
    T get() &&
60  
    T get() &&
61  
    {
61  
    {
62  
        return std::move(*value_);
62  
        return std::move(*value_);
63  
    }
63  
    }
64  
};
64  
};
65  

65  

66  
/** Specialization for void tasks - no value storage needed.
66  
/** Specialization for void tasks - no value storage needed.
67  
*/
67  
*/
68  
template<>
68  
template<>
69  
struct result_holder<void>
69  
struct result_holder<void>
70  
{
70  
{
71  
};
71  
};
72  

72  

73  
/** Shared state for when_all operation.
73  
/** Shared state for when_all operation.
74  

74  

75  
    @tparam Ts The result types of the tasks.
75  
    @tparam Ts The result types of the tasks.
76  
*/
76  
*/
77  
template<typename... Ts>
77  
template<typename... Ts>
78  
struct when_all_state
78  
struct when_all_state
79  
{
79  
{
80  
    static constexpr std::size_t task_count = sizeof...(Ts);
80  
    static constexpr std::size_t task_count = sizeof...(Ts);
81  

81  

82  
    // Completion tracking - when_all waits for all children
82  
    // Completion tracking - when_all waits for all children
83  
    std::atomic<std::size_t> remaining_count_;
83  
    std::atomic<std::size_t> remaining_count_;
84  

84  

85  
    // Result storage in input order
85  
    // Result storage in input order
86  
    std::tuple<result_holder<Ts>...> results_;
86  
    std::tuple<result_holder<Ts>...> results_;
87  

87  

88  
    // Runner handles - destroyed in await_resume while allocator is valid
88  
    // Runner handles - destroyed in await_resume while allocator is valid
89  
    std::array<std::coroutine_handle<>, task_count> runner_handles_{};
89  
    std::array<std::coroutine_handle<>, task_count> runner_handles_{};
90  

90  

91  
    // Exception storage - first error wins, others discarded
91  
    // Exception storage - first error wins, others discarded
92  
    std::atomic<bool> has_exception_{false};
92  
    std::atomic<bool> has_exception_{false};
93  
    std::exception_ptr first_exception_;
93  
    std::exception_ptr first_exception_;
94  

94  

95  
    // Stop propagation - on error, request stop for siblings
95  
    // Stop propagation - on error, request stop for siblings
96  
    std::stop_source stop_source_;
96  
    std::stop_source stop_source_;
97  

97  

98  
    // Connects parent's stop_token to our stop_source
98  
    // Connects parent's stop_token to our stop_source
99  
    struct stop_callback_fn
99  
    struct stop_callback_fn
100  
    {
100  
    {
101  
        std::stop_source* source_;
101  
        std::stop_source* source_;
102  
        void operator()() const { source_->request_stop(); }
102  
        void operator()() const { source_->request_stop(); }
103  
    };
103  
    };
104  
    using stop_callback_t = std::stop_callback<stop_callback_fn>;
104  
    using stop_callback_t = std::stop_callback<stop_callback_fn>;
105  
    std::optional<stop_callback_t> parent_stop_callback_;
105  
    std::optional<stop_callback_t> parent_stop_callback_;
106  

106  

107  
    // Parent resumption
107  
    // Parent resumption
108  
    std::coroutine_handle<> continuation_;
108  
    std::coroutine_handle<> continuation_;
109  
    io_env const* caller_env_ = nullptr;
109  
    io_env const* caller_env_ = nullptr;
110  

110  

111  
    when_all_state()
111  
    when_all_state()
112  
        : remaining_count_(task_count)
112  
        : remaining_count_(task_count)
113  
    {
113  
    {
114  
    }
114  
    }
115  

115  

116  
    // Runners self-destruct in final_suspend. No destruction needed here.
116  
    // Runners self-destruct in final_suspend. No destruction needed here.
117  

117  

118  
    /** Capture an exception (first one wins).
118  
    /** Capture an exception (first one wins).
119  
    */
119  
    */
120  
    void capture_exception(std::exception_ptr ep)
120  
    void capture_exception(std::exception_ptr ep)
121  
    {
121  
    {
122  
        bool expected = false;
122  
        bool expected = false;
123  
        if(has_exception_.compare_exchange_strong(
123  
        if(has_exception_.compare_exchange_strong(
124  
            expected, true, std::memory_order_relaxed))
124  
            expected, true, std::memory_order_relaxed))
125  
            first_exception_ = ep;
125  
            first_exception_ = ep;
126  
    }
126  
    }
127  

127  

128  
};
128  
};
129  

129  

130  
/** Wrapper coroutine that intercepts task completion.
130  
/** Wrapper coroutine that intercepts task completion.
131  

131  

132  
    This runner awaits its assigned task and stores the result in
132  
    This runner awaits its assigned task and stores the result in
133  
    the shared state, or captures the exception and requests stop.
133  
    the shared state, or captures the exception and requests stop.
134  
*/
134  
*/
135  
template<typename T, typename... Ts>
135  
template<typename T, typename... Ts>
136  
struct when_all_runner
136  
struct when_all_runner
137  
{
137  
{
138  
    struct promise_type // : frame_allocating_base  // DISABLED FOR TESTING
138  
    struct promise_type // : frame_allocating_base  // DISABLED FOR TESTING
139  
    {
139  
    {
140  
        when_all_state<Ts...>* state_ = nullptr;
140  
        when_all_state<Ts...>* state_ = nullptr;
141  
        io_env env_;
141  
        io_env env_;
142  

142  

143  
        when_all_runner get_return_object()
143  
        when_all_runner get_return_object()
144  
        {
144  
        {
145  
            return when_all_runner(std::coroutine_handle<promise_type>::from_promise(*this));
145  
            return when_all_runner(std::coroutine_handle<promise_type>::from_promise(*this));
146  
        }
146  
        }
147  

147  

148  
        std::suspend_always initial_suspend() noexcept
148  
        std::suspend_always initial_suspend() noexcept
149  
        {
149  
        {
150  
            return {};
150  
            return {};
151  
        }
151  
        }
152  

152  

153  
        auto final_suspend() noexcept
153  
        auto final_suspend() noexcept
154  
        {
154  
        {
155  
            struct awaiter
155  
            struct awaiter
156  
            {
156  
            {
157  
                promise_type* p_;
157  
                promise_type* p_;
158  

158  

159  
                bool await_ready() const noexcept
159  
                bool await_ready() const noexcept
160  
                {
160  
                {
161  
                    return false;
161  
                    return false;
162  
                }
162  
                }
163  

163  

164  
                std::coroutine_handle<> await_suspend(std::coroutine_handle<> h) noexcept
164  
                std::coroutine_handle<> await_suspend(std::coroutine_handle<> h) noexcept
165  
                {
165  
                {
166  
                    // Extract everything needed before self-destruction.
166  
                    // Extract everything needed before self-destruction.
167  
                    auto* state = p_->state_;
167  
                    auto* state = p_->state_;
168  
                    auto* counter = &state->remaining_count_;
168  
                    auto* counter = &state->remaining_count_;
169  
                    auto* caller_env = state->caller_env_;
169  
                    auto* caller_env = state->caller_env_;
170  
                    auto cont = state->continuation_;
170  
                    auto cont = state->continuation_;
171  

171  

172  
                    h.destroy();
172  
                    h.destroy();
173  

173  

174  
                    // If last runner, dispatch parent for symmetric transfer.
174  
                    // If last runner, dispatch parent for symmetric transfer.
175  
                    auto remaining = counter->fetch_sub(1, std::memory_order_acq_rel);
175  
                    auto remaining = counter->fetch_sub(1, std::memory_order_acq_rel);
176  
                    if(remaining == 1)
176  
                    if(remaining == 1)
177  
                        return caller_env->executor.dispatch(cont);
177  
                        return caller_env->executor.dispatch(cont);
178  
                    return std::noop_coroutine();
178  
                    return std::noop_coroutine();
179  
                }
179  
                }
180  

180  

181  
                void await_resume() const noexcept
181  
                void await_resume() const noexcept
182  
                {
182  
                {
183  
                }
183  
                }
184  
            };
184  
            };
185  
            return awaiter{this};
185  
            return awaiter{this};
186  
        }
186  
        }
187  

187  

188  
        void return_void()
188  
        void return_void()
189  
        {
189  
        {
190  
        }
190  
        }
191  

191  

192  
        void unhandled_exception()
192  
        void unhandled_exception()
193  
        {
193  
        {
194  
            state_->capture_exception(std::current_exception());
194  
            state_->capture_exception(std::current_exception());
195  
            // Request stop for sibling tasks
195  
            // Request stop for sibling tasks
196  
            state_->stop_source_.request_stop();
196  
            state_->stop_source_.request_stop();
197  
        }
197  
        }
198  

198  

199  
        template<class Awaitable>
199  
        template<class Awaitable>
200  
        struct transform_awaiter
200  
        struct transform_awaiter
201  
        {
201  
        {
202  
            std::decay_t<Awaitable> a_;
202  
            std::decay_t<Awaitable> a_;
203  
            promise_type* p_;
203  
            promise_type* p_;
204  

204  

205  
            bool await_ready()
205  
            bool await_ready()
206  
            {
206  
            {
207  
                return a_.await_ready();
207  
                return a_.await_ready();
208  
            }
208  
            }
209  

209  

210  
            decltype(auto) await_resume()
210  
            decltype(auto) await_resume()
211  
            {
211  
            {
212  
                return a_.await_resume();
212  
                return a_.await_resume();
213  
            }
213  
            }
214  

214  

215  
            template<class Promise>
215  
            template<class Promise>
216  
            auto await_suspend(std::coroutine_handle<Promise> h)
216  
            auto await_suspend(std::coroutine_handle<Promise> h)
217  
            {
217  
            {
218  
                return a_.await_suspend(h, &p_->env_);
218  
                return a_.await_suspend(h, &p_->env_);
219  
            }
219  
            }
220  
        };
220  
        };
221  

221  

222  
        template<class Awaitable>
222  
        template<class Awaitable>
223  
        auto await_transform(Awaitable&& a)
223  
        auto await_transform(Awaitable&& a)
224  
        {
224  
        {
225  
            using A = std::decay_t<Awaitable>;
225  
            using A = std::decay_t<Awaitable>;
226  
            if constexpr (IoAwaitable<A>)
226  
            if constexpr (IoAwaitable<A>)
227  
            {
227  
            {
228  
                return transform_awaiter<Awaitable>{
228  
                return transform_awaiter<Awaitable>{
229  
                    std::forward<Awaitable>(a), this};
229  
                    std::forward<Awaitable>(a), this};
230  
            }
230  
            }
231  
            else
231  
            else
232  
            {
232  
            {
233  
                static_assert(sizeof(A) == 0, "requires IoAwaitable");
233  
                static_assert(sizeof(A) == 0, "requires IoAwaitable");
234  
            }
234  
            }
235  
        }
235  
        }
236  
    };
236  
    };
237  

237  

238  
    std::coroutine_handle<promise_type> h_;
238  
    std::coroutine_handle<promise_type> h_;
239  

239  

240  
    explicit when_all_runner(std::coroutine_handle<promise_type> h)
240  
    explicit when_all_runner(std::coroutine_handle<promise_type> h)
241  
        : h_(h)
241  
        : h_(h)
242  
    {
242  
    {
243  
    }
243  
    }
244  

244  

245  
    // Enable move for all clang versions - some versions need it
245  
    // Enable move for all clang versions - some versions need it
246  
    when_all_runner(when_all_runner&& other) noexcept : h_(std::exchange(other.h_, nullptr)) {}
246  
    when_all_runner(when_all_runner&& other) noexcept : h_(std::exchange(other.h_, nullptr)) {}
247  

247  

248  
    // Non-copyable
248  
    // Non-copyable
249  
    when_all_runner(when_all_runner const&) = delete;
249  
    when_all_runner(when_all_runner const&) = delete;
250  
    when_all_runner& operator=(when_all_runner const&) = delete;
250  
    when_all_runner& operator=(when_all_runner const&) = delete;
251  
    when_all_runner& operator=(when_all_runner&&) = delete;
251  
    when_all_runner& operator=(when_all_runner&&) = delete;
252  

252  

253  
    auto release() noexcept
253  
    auto release() noexcept
254  
    {
254  
    {
255  
        return std::exchange(h_, nullptr);
255  
        return std::exchange(h_, nullptr);
256  
    }
256  
    }
257  
};
257  
};
258  

258  

259  
/** Create a runner coroutine for a single awaitable.
259  
/** Create a runner coroutine for a single awaitable.
260  

260  

261  
    Awaitable is passed directly to ensure proper coroutine frame storage.
261  
    Awaitable is passed directly to ensure proper coroutine frame storage.
262  
*/
262  
*/
263  
template<std::size_t Index, IoAwaitable Awaitable, typename... Ts>
263  
template<std::size_t Index, IoAwaitable Awaitable, typename... Ts>
264  
when_all_runner<awaitable_result_t<Awaitable>, Ts...>
264  
when_all_runner<awaitable_result_t<Awaitable>, Ts...>
265  
make_when_all_runner(Awaitable inner, when_all_state<Ts...>* state)
265  
make_when_all_runner(Awaitable inner, when_all_state<Ts...>* state)
266  
{
266  
{
267  
    using T = awaitable_result_t<Awaitable>;
267  
    using T = awaitable_result_t<Awaitable>;
268  
    if constexpr (std::is_void_v<T>)
268  
    if constexpr (std::is_void_v<T>)
269  
    {
269  
    {
270  
        co_await std::move(inner);
270  
        co_await std::move(inner);
271  
    }
271  
    }
272  
    else
272  
    else
273  
    {
273  
    {
274  
        std::get<Index>(state->results_).set(co_await std::move(inner));
274  
        std::get<Index>(state->results_).set(co_await std::move(inner));
275  
    }
275  
    }
276  
}
276  
}
277  

277  

278  
/** Internal awaitable that launches all runner coroutines and waits.
278  
/** Internal awaitable that launches all runner coroutines and waits.
279  

279  

280  
    This awaitable is used inside the when_all coroutine to handle
280  
    This awaitable is used inside the when_all coroutine to handle
281  
    the concurrent execution of child awaitables.
281  
    the concurrent execution of child awaitables.
282  
*/
282  
*/
283  
template<IoAwaitable... Awaitables>
283  
template<IoAwaitable... Awaitables>
284  
class when_all_launcher
284  
class when_all_launcher
285  
{
285  
{
286  
    using state_type = when_all_state<awaitable_result_t<Awaitables>...>;
286  
    using state_type = when_all_state<awaitable_result_t<Awaitables>...>;
287  

287  

288  
    std::tuple<Awaitables...>* awaitables_;
288  
    std::tuple<Awaitables...>* awaitables_;
289  
    state_type* state_;
289  
    state_type* state_;
290  

290  

291  
public:
291  
public:
292  
    when_all_launcher(
292  
    when_all_launcher(
293  
        std::tuple<Awaitables...>* awaitables,
293  
        std::tuple<Awaitables...>* awaitables,
294  
        state_type* state)
294  
        state_type* state)
295  
        : awaitables_(awaitables)
295  
        : awaitables_(awaitables)
296  
        , state_(state)
296  
        , state_(state)
297  
    {
297  
    {
298  
    }
298  
    }
299  

299  

300  
    bool await_ready() const noexcept
300  
    bool await_ready() const noexcept
301  
    {
301  
    {
302  
        return sizeof...(Awaitables) == 0;
302  
        return sizeof...(Awaitables) == 0;
303  
    }
303  
    }
304  

304  

305  
    std::coroutine_handle<> await_suspend(std::coroutine_handle<> continuation, io_env const* caller_env)
305  
    std::coroutine_handle<> await_suspend(std::coroutine_handle<> continuation, io_env const* caller_env)
306  
    {
306  
    {
307  
        state_->continuation_ = continuation;
307  
        state_->continuation_ = continuation;
308  
        state_->caller_env_ = caller_env;
308  
        state_->caller_env_ = caller_env;
309  

309  

310  
        // Forward parent's stop requests to children
310  
        // Forward parent's stop requests to children
311  
        if(caller_env->stop_token.stop_possible())
311  
        if(caller_env->stop_token.stop_possible())
312  
        {
312  
        {
313  
            state_->parent_stop_callback_.emplace(
313  
            state_->parent_stop_callback_.emplace(
314  
                caller_env->stop_token,
314  
                caller_env->stop_token,
315  
                typename state_type::stop_callback_fn{&state_->stop_source_});
315  
                typename state_type::stop_callback_fn{&state_->stop_source_});
316  

316  

317  
            if(caller_env->stop_token.stop_requested())
317  
            if(caller_env->stop_token.stop_requested())
318  
                state_->stop_source_.request_stop();
318  
                state_->stop_source_.request_stop();
319  
        }
319  
        }
320  

320  

321  
        // CRITICAL: If the last task finishes synchronously then the parent
321  
        // CRITICAL: If the last task finishes synchronously then the parent
322  
        // coroutine resumes, destroying its frame, and destroying this object
322  
        // coroutine resumes, destroying its frame, and destroying this object
323  
        // prior to the completion of await_suspend. Therefore, await_suspend
323  
        // prior to the completion of await_suspend. Therefore, await_suspend
324  
        // must ensure `this` cannot be referenced after calling `launch_one`
324  
        // must ensure `this` cannot be referenced after calling `launch_one`
325  
        // for the last time.
325  
        // for the last time.
326  
        auto token = state_->stop_source_.get_token();
326  
        auto token = state_->stop_source_.get_token();
327  
        [&]<std::size_t... Is>(std::index_sequence<Is...>) {
327  
        [&]<std::size_t... Is>(std::index_sequence<Is...>) {
328  
            (..., launch_one<Is>(caller_env->executor, token));
328  
            (..., launch_one<Is>(caller_env->executor, token));
329  
        }(std::index_sequence_for<Awaitables...>{});
329  
        }(std::index_sequence_for<Awaitables...>{});
330  

330  

331  
        // Let signal_completion() handle resumption
331  
        // Let signal_completion() handle resumption
332  
        return std::noop_coroutine();
332  
        return std::noop_coroutine();
333  
    }
333  
    }
334  

334  

335  
    void await_resume() const noexcept
335  
    void await_resume() const noexcept
336  
    {
336  
    {
337  
        // Results are extracted by the when_all coroutine from state
337  
        // Results are extracted by the when_all coroutine from state
338  
    }
338  
    }
339  

339  

340  
private:
340  
private:
341  
    template<std::size_t I>
341  
    template<std::size_t I>
342  
    void launch_one(executor_ref caller_ex, std::stop_token token)
342  
    void launch_one(executor_ref caller_ex, std::stop_token token)
343  
    {
343  
    {
344  
        auto runner = make_when_all_runner<I>(
344  
        auto runner = make_when_all_runner<I>(
345  
            std::move(std::get<I>(*awaitables_)), state_);
345  
            std::move(std::get<I>(*awaitables_)), state_);
346  

346  

347  
        auto h = runner.release();
347  
        auto h = runner.release();
348  
        h.promise().state_ = state_;
348  
        h.promise().state_ = state_;
349  
        h.promise().env_ = io_env{caller_ex, token, state_->caller_env_->allocator};
349  
        h.promise().env_ = io_env{caller_ex, token, state_->caller_env_->allocator};
350  

350  

351  
        std::coroutine_handle<> ch{h};
351  
        std::coroutine_handle<> ch{h};
352  
        state_->runner_handles_[I] = ch;
352  
        state_->runner_handles_[I] = ch;
353  
        state_->caller_env_->executor.post(ch);
353  
        state_->caller_env_->executor.post(ch);
354  
    }
354  
    }
355  
};
355  
};
356  

356  

357  
/** Compute the result type for when_all.
357  
/** Compute the result type for when_all.
358  

358  

359  
    Returns void when all tasks are void (P2300 aligned),
359  
    Returns void when all tasks are void (P2300 aligned),
360  
    otherwise returns a tuple with void types filtered out.
360  
    otherwise returns a tuple with void types filtered out.
361  
*/
361  
*/
362  
template<typename... Ts>
362  
template<typename... Ts>
363  
using when_all_result_t = std::conditional_t<
363  
using when_all_result_t = std::conditional_t<
364  
    std::is_same_v<filter_void_tuple_t<Ts...>, std::tuple<>>,
364  
    std::is_same_v<filter_void_tuple_t<Ts...>, std::tuple<>>,
365  
    void,
365  
    void,
366  
    filter_void_tuple_t<Ts...>>;
366  
    filter_void_tuple_t<Ts...>>;
367  

367  

368  
/** Helper to extract a single result, returning empty tuple for void.
368  
/** Helper to extract a single result, returning empty tuple for void.
369  
    This is a separate function to work around a GCC-11 ICE that occurs
369  
    This is a separate function to work around a GCC-11 ICE that occurs
370  
    when using nested immediately-invoked lambdas with pack expansion.
370  
    when using nested immediately-invoked lambdas with pack expansion.
371  
*/
371  
*/
372  
template<std::size_t I, typename... Ts>
372  
template<std::size_t I, typename... Ts>
373  
auto extract_single_result(when_all_state<Ts...>& state)
373  
auto extract_single_result(when_all_state<Ts...>& state)
374  
{
374  
{
375  
    using T = std::tuple_element_t<I, std::tuple<Ts...>>;
375  
    using T = std::tuple_element_t<I, std::tuple<Ts...>>;
376  
    if constexpr (std::is_void_v<T>)
376  
    if constexpr (std::is_void_v<T>)
377  
        return std::tuple<>();
377  
        return std::tuple<>();
378  
    else
378  
    else
379  
        return std::make_tuple(std::move(std::get<I>(state.results_)).get());
379  
        return std::make_tuple(std::move(std::get<I>(state.results_)).get());
380  
}
380  
}
381  

381  

382  
/** Extract results from state, filtering void types.
382  
/** Extract results from state, filtering void types.
383  
*/
383  
*/
384  
template<typename... Ts>
384  
template<typename... Ts>
385  
auto extract_results(when_all_state<Ts...>& state)
385  
auto extract_results(when_all_state<Ts...>& state)
386  
{
386  
{
387  
    return [&]<std::size_t... Is>(std::index_sequence<Is...>) {
387  
    return [&]<std::size_t... Is>(std::index_sequence<Is...>) {
388  
        return std::tuple_cat(extract_single_result<Is>(state)...);
388  
        return std::tuple_cat(extract_single_result<Is>(state)...);
389  
    }(std::index_sequence_for<Ts...>{});
389  
    }(std::index_sequence_for<Ts...>{});
390  
}
390  
}
391  

391  

392  
} // namespace detail
392  
} // namespace detail
393  

393  

394  
/** Execute multiple awaitables concurrently and collect their results.
394  
/** Execute multiple awaitables concurrently and collect their results.
395  

395  

396  
    Launches all awaitables simultaneously and waits for all to complete
396  
    Launches all awaitables simultaneously and waits for all to complete
397  
    before returning. Results are collected in input order. If any
397  
    before returning. Results are collected in input order. If any
398  
    awaitable throws, cancellation is requested for siblings and the first
398  
    awaitable throws, cancellation is requested for siblings and the first
399  
    exception is rethrown after all awaitables complete.
399  
    exception is rethrown after all awaitables complete.
400  

400  

401  
    @li All child awaitables run concurrently on the caller's executor
401  
    @li All child awaitables run concurrently on the caller's executor
402  
    @li Results are returned as a tuple in input order
402  
    @li Results are returned as a tuple in input order
403  
    @li Void-returning awaitables do not contribute to the result tuple
403  
    @li Void-returning awaitables do not contribute to the result tuple
404  
    @li If all awaitables return void, `when_all` returns `task<void>`
404  
    @li If all awaitables return void, `when_all` returns `task<void>`
405  
    @li First exception wins; subsequent exceptions are discarded
405  
    @li First exception wins; subsequent exceptions are discarded
406  
    @li Stop is requested for siblings on first error
406  
    @li Stop is requested for siblings on first error
407  
    @li Completes only after all children have finished
407  
    @li Completes only after all children have finished
408  

408  

409  
    @par Thread Safety
409  
    @par Thread Safety
410  
    The returned task must be awaited from a single execution context.
410  
    The returned task must be awaited from a single execution context.
411  
    Child awaitables execute concurrently but complete through the caller's
411  
    Child awaitables execute concurrently but complete through the caller's
412  
    executor.
412  
    executor.
413  

413  

414  
    @param awaitables The awaitables to execute concurrently. Each must
414  
    @param awaitables The awaitables to execute concurrently. Each must
415  
        satisfy @ref IoAwaitable and is consumed (moved-from) when
415  
        satisfy @ref IoAwaitable and is consumed (moved-from) when
416  
        `when_all` is awaited.
416  
        `when_all` is awaited.
417  

417  

418  
    @return A task yielding a tuple of non-void results. Returns
418  
    @return A task yielding a tuple of non-void results. Returns
419  
        `task<void>` when all input awaitables return void.
419  
        `task<void>` when all input awaitables return void.
420  

420  

421  
    @par Example
421  
    @par Example
422  

422  

423  
    @code
423  
    @code
424  
    task<> example()
424  
    task<> example()
425  
    {
425  
    {
426  
        // Concurrent fetch, results collected in order
426  
        // Concurrent fetch, results collected in order
427  
        auto [user, posts] = co_await when_all(
427  
        auto [user, posts] = co_await when_all(
428  
            fetch_user( id ),      // task<User>
428  
            fetch_user( id ),      // task<User>
429  
            fetch_posts( id )      // task<std::vector<Post>>
429  
            fetch_posts( id )      // task<std::vector<Post>>
430  
        );
430  
        );
431  

431  

432  
        // Void awaitables don't contribute to result
432  
        // Void awaitables don't contribute to result
433  
        co_await when_all(
433  
        co_await when_all(
434  
            log_event( "start" ),  // task<void>
434  
            log_event( "start" ),  // task<void>
435  
            notify_user( id )      // task<void>
435  
            notify_user( id )      // task<void>
436  
        );
436  
        );
437  
        // Returns task<void>, no result tuple
437  
        // Returns task<void>, no result tuple
438  
    }
438  
    }
439  
    @endcode
439  
    @endcode
440  

440  

441  
    @see IoAwaitable, task
441  
    @see IoAwaitable, task
442  
*/
442  
*/
443  
template<IoAwaitable... As>
443  
template<IoAwaitable... As>
444  
[[nodiscard]] auto when_all(As... awaitables)
444  
[[nodiscard]] auto when_all(As... awaitables)
445  
    -> task<detail::when_all_result_t<detail::awaitable_result_t<As>...>>
445  
    -> task<detail::when_all_result_t<detail::awaitable_result_t<As>...>>
446  
{
446  
{
447  
    using result_type = detail::when_all_result_t<detail::awaitable_result_t<As>...>;
447  
    using result_type = detail::when_all_result_t<detail::awaitable_result_t<As>...>;
448  

448  

449  
    // State is stored in the coroutine frame, using the frame allocator
449  
    // State is stored in the coroutine frame, using the frame allocator
450  
    detail::when_all_state<detail::awaitable_result_t<As>...> state;
450  
    detail::when_all_state<detail::awaitable_result_t<As>...> state;
451  

451  

452  
    // Store awaitables in the frame
452  
    // Store awaitables in the frame
453  
    std::tuple<As...> awaitable_tuple(std::move(awaitables)...);
453  
    std::tuple<As...> awaitable_tuple(std::move(awaitables)...);
454  

454  

455  
    // Launch all awaitables and wait for completion
455  
    // Launch all awaitables and wait for completion
456  
    co_await detail::when_all_launcher<As...>(&awaitable_tuple, &state);
456  
    co_await detail::when_all_launcher<As...>(&awaitable_tuple, &state);
457  

457  

458  
    // Propagate first exception if any.
458  
    // Propagate first exception if any.
459  
    // Safe without explicit acquire: capture_exception() is sequenced-before
459  
    // Safe without explicit acquire: capture_exception() is sequenced-before
460  
    // signal_completion()'s acq_rel fetch_sub, which synchronizes-with the
460  
    // signal_completion()'s acq_rel fetch_sub, which synchronizes-with the
461  
    // last task's decrement that resumes this coroutine.
461  
    // last task's decrement that resumes this coroutine.
462  
    if(state.first_exception_)
462  
    if(state.first_exception_)
463  
        std::rethrow_exception(state.first_exception_);
463  
        std::rethrow_exception(state.first_exception_);
464  

464  

465  
    // Extract and return results
465  
    // Extract and return results
466  
    if constexpr (std::is_void_v<result_type>)
466  
    if constexpr (std::is_void_v<result_type>)
467  
        co_return;
467  
        co_return;
468  
    else
468  
    else
469  
        co_return detail::extract_results(state);
469  
        co_return detail::extract_results(state);
470  
}
470  
}
471  

471  

472  
/// Compute the result type of `when_all` for the given task types.
472  
/// Compute the result type of `when_all` for the given task types.
473  
template<typename... Ts>
473  
template<typename... Ts>
474  
using when_all_result_type = detail::when_all_result_t<Ts...>;
474  
using when_all_result_type = detail::when_all_result_t<Ts...>;
475  

475  

476  
} // namespace capy
476  
} // namespace capy
477  
} // namespace boost
477  
} // namespace boost
478  

478  

479  
#endif
479  
#endif