Iros
 
Loading...
Searching...
No Matches
when_all.h
Go to the documentation of this file.
1#pragma once
2
29#include "di/function/invoke.h"
32#include "di/meta/algorithm.h"
33#include "di/meta/constexpr.h"
34#include "di/meta/core.h"
35#include "di/meta/util.h"
36#include "di/sync/atomic.h"
40#include "di/types/integers.h"
41#include "di/types/prelude.h"
42#include "di/util/addressof.h"
43#include "di/util/declval.h"
45#include "di/util/get.h"
46#include "di/util/immovable.h"
47#include "di/util/move.h"
48#include "di/util/unreachable.h"
51#include "di/vocab/tuple/tie.h"
58
59namespace di::execution {
60namespace when_all_ns {
61 template<typename T>
63
64 template<typename Env, concepts::SenderIn<Env> Send>
66
67 template<typename Env, typename... Sends>
69
/// True when at least one sender in `Sends` has a `set_value_count` of zero for `Env`.
/// The fold over `||` means a single such sender is enough to make the whole expression true.
70 template<typename Env, typename... Sends>
71 constexpr inline bool never_sends_value =
72 ((set_value_count<Env, Sends> == 0) || ...); // NOLINT(misc-redundant-expression)
73
74 template<typename Env, typename... Sends>
79 SetStopped()>>;
80
81 template<typename Env, typename... Sends>
82 requires(ValidSenders<Env, Sends...> && (!never_sends_value<Env, Sends...>) )
86
87 template<typename E>
89
90 template<typename E, typename... Sends>
93 meta::AsTemplate<types::CompletionSignatures, meta::PushFront<NonValueCompletions<Env<E>, Sends...>,
94 ValueCompletion<Env<E>, Sends...>>>> {};
95
96 template<typename E, typename... Sends>
97 requires(never_sends_value<E, Sends...>)
99 : meta::TypeConstant<meta::AsTemplate<types::CompletionSignatures, NonValueCompletions<Env<E>, Sends...>>> {};
100
/// Shorthand that unwraps `CompletionSignaturesT<E, Sends...>` through `meta::Type`.
101 template<typename E, typename... Sends>
102 using Sigs = meta::Type<CompletionSignaturesT<E, Sends...>>;
103
104 template<typename... Types>
106
107 template<typename Env, typename... Sends>
109
110 template<typename Env, typename... Sends>
111 requires(ValidSenders<Env, Sends...> && (!never_sends_value<Env, Sends...>) )
113 : meta::TypeConstant<
114 vocab::Tuple<meta::ValueTypesOf<Sends, Env, DecayedOptionalTuple, meta::TypeIdentity>...>> {};
115
116 template<typename Env, typename... Sends>
118
119 template<typename... Types>
121
/// Empty tag types. `Stopped` is the alternative `report_stop()` emplaces into the error storage.
/// NOTE(review): `NotError` presumably marks the "no failure recorded" state of that storage — confirm
/// against the ErrorStorage definition.
122 struct NotError {};
123 struct Stopped {};
124
125 template<typename Env, typename... Sends>
129
132
133 void operator()() const noexcept { stop_source.request_stop(); }
134 };
135
136 template<typename Rec, typename... Sends>
137 struct DataT {
138 struct Type {
139 public:
141 using Value = ValueStorage<Env, Sends...>;
142 using Error = ErrorStorage<Env, Sends...>;
143 using Count = usize;
145 using StopCallback = StopToken::template CallbackType<StopCallbackFunction>;
146
/// Moves the outer receiver into the `out_r` member.
147 explicit Type(Rec out_r_) : out_r(util::move(out_r_)) {}
148
/// Records the values completed by the child at position `index`, then counts that child as finished.
/// When `never_sends_value` holds for this sender set, nothing is stored and the values are discarded.
149 template<usize index, typename... Types>
150 void report_value(Constexpr<index>, Types&&... values) {
151 if constexpr (never_sends_value<Env, Sends...>) {
    // Touch every value so none is reported as unused.
152 ((void) values, ...);
153 } else {
    // Construct slot `index` of the value storage in place from the forwarded values.
154 auto& optional = util::get<index>(value);
155 optional.emplace(util::forward<Types>(values)...);
156 }
157 finish_one();
158 }
/// Reports an error completion from a child, then counts that child as finished.
/// Only the caller that flips `failed` from false to true requests stop and stores its (decayed) error;
/// callers that find `failed` already set skip both steps.
160 template<typename E>
161 void report_error(E&& error) {
162 auto old = failed.exchange(true, sync::MemoryOrder::AcquireRelease);
163 if (!old) {
164 stop_source.request_stop();
    // `this->` selects the member, since the parameter is also named `error`.
165 this->error.template emplace<meta::Decay<E>>(util::forward<E>(error));
166 }
167 finish_one();
168 }
169
/// Reports a stopped completion from a child, then counts that child as finished.
/// Only the caller that flips `failed` from false to true requests stop and stores `Stopped` in `error`.
170 void report_stop() {
171 auto old = failed.exchange(true, sync::MemoryOrder::AcquireRelease);
172 if (!old) {
173 stop_source.request_stop();
174 error.template emplace<Stopped>();
175 }
176 finish_one();
177 }
178
/// Counts one child operation as finished by decrementing `remaining`.
/// The caller that observes a previous count of 1 is the last one: it resets the stop callback and then
/// completes `out_r` with set_error or set_stopped on the failure path, or set_value otherwise.
/// NOTE(review): this listing omits several lines of the function (the failure check ahead of the
/// visitor, the guard inside the visitor, and the completion used when no values are sent); consult
/// the real header before relying on the exact control flow.
179 void finish_one() {
180 auto old_value = remaining.fetch_sub(1, sync::MemoryOrder::AcquireRelease);
181 if (old_value == 1) {
182 // Reset the stop callback.
183 stop_callback.reset();
184
185 // Check if an error occurred. This does not need to be atomic because we're done and just used
186 // acquire release ordering.
190 [&]<typename E>(E&& e) {
192 execution::set_error(util::move(out_r), util::forward<E>(e));
193 }
194 },
195 util::move(error));
196 } else {
197 execution::set_stopped(util::move(out_r));
198 }
199 } else {
200 if constexpr (!never_sends_value<Env, Sends...>) {
201 // Value is a Tuple<Optional<Vs...>, ...>, and we need to return all Vs's. Do this by
202 // concatenating all the tuples and then unpacking them.
203 auto lvalues = vocab::apply(
204 [](auto&... optionals) {
205 return vocab::tuple_cat(vocab::apply(vocab::tie, *optionals)...);
206 },
207 value);
    // Move every collected value into the outer receiver's set_value.
209 [&](auto&... values) {
210 execution::set_value(util::move(out_r), util::move(values)...);
211 },
212 lvalues);
213 } else {
215 }
216 }
217 }
218 }
219
220 [[no_unique_address]] Value value;
221 [[no_unique_address]] Error error;
222 [[no_unique_address]] Rec out_r;
224 sync::Atomic<Count> remaining { sizeof...(Sends) };
227 };
228 };
229
/// Shorthand for the nested `Type` of `DataT<Rec, Sends...>`, constrained to a receiver and senders.
230 template<concepts::Receiver Rec, concepts::Sender... Sends>
231 using Data = meta::Type<DataT<Rec, Sends...>>;
232
233 template<usize index, typename Send, typename Data>
234 struct ReceiverT {
235 struct Type {
236 using is_receiver = void;
237
239
/// set_value customization: forwards the child's values to the shared data, tagged with this receiver's `index`.
240 template<typename... Types>
241 friend void tag_invoke(types::Tag<execution::set_value>, Type&& self, Types&&... values) {
242 self.data->report_value(c_<index>, util::forward<Types>(values)...);
243 }
244
/// set_error customization: forwards the child's error to the shared data's `report_error`.
245 template<typename E>
246 friend void tag_invoke(types::Tag<execution::set_error>, Type&& self, E&& error) {
247 self.data->report_error(util::forward<E>(error));
248 }
249
/// set_stopped customization: notifies the shared data through `report_stop`.
250 friend void tag_invoke(types::Tag<execution::set_stopped>, Type&& self) { self.data->report_stop(); }
251
253 return make_env(get_env(self.data->out_r),
254 with(get_stop_token, self.data->stop_source.get_stop_token()));
255 }
256 };
257 };
258
259 template<usize index, concepts::Sender Send, typename Data>
261
262 template<typename Rec, typename Indices, typename Sends>
264
265 template<typename Rec, usize... indices, typename... Sends>
266 struct OperationStateT<Rec, meta::ListV<indices...>, meta::List<Sends...>> {
268 public:
269 using Data = when_all_ns::Data<Rec, Sends...>;
271
/// Moves the outer receiver into `m_data`, then connects each sender to a `Receiver` holding the address
/// of `m_data`, expanding once per (index, sender) pair.
/// NOTE(review): the lambda wrapped in util::DeferConstruct presumably lets each connect result be
/// constructed directly inside `m_op_states` — confirm against defer_construct.h.
272 explicit Type(Rec out_r, Sends&&... senders)
273 : m_data(util::move(out_r))
274 , m_op_states(util::DeferConstruct([&] -> meta::ConnectResult<Sends, Receiver<indices, Sends, Data>> {
275 return execution::connect(util::forward<Sends>(senders),
276 Receiver<indices, Sends, Data> { util::addressof(m_data) });
277 })...) {}
278
279 private:
281 // Emplace construct stop callback.
282 self.m_data.stop_callback.emplace(execution::get_stop_token(execution::get_env(self.m_data.out_r)),
283 StopCallbackFunction { self.m_data.stop_source });
284
285 // Check if stop requested:
286 if (self.m_data.stop_source.stop_requested()) {
287 execution::set_stopped(util::move(self.m_data.out_r));
288 return;
289 }
290
291 // Call start on all operations.
292 vocab::tuple_for_each(execution::start, self.m_op_states);
293 }
294
295 [[no_unique_address]] Data m_data;
296 [[no_unique_address]] OpStates m_op_states;
297 };
298 };
299
300 template<concepts::Receiver Rec, typename Indices, concepts::TypeList Sends>
302
303 template<typename... Senders>
304 struct SenderT {
305 struct Type {
306 public:
307 using is_sender = void;
308
/// Builds the stored sender tuple by forwarding `ts`; the `InPlace` parameter is an unnamed tag.
309 template<typename... Ts>
310 explicit Type(InPlace, Ts&&... ts) : m_senders(util::forward<Ts>(ts)...) {}
311
312 private:
313 template<concepts::RemoveCVRefSameAs<Type> Self, typename E>
317 return {};
318 }
319
320 template<concepts::RemoveCVRefSameAs<Type> Self, concepts::Receiver Rec>
322 friend auto tag_invoke(types::Tag<execution::connect>, Self&& self, Rec out_r) {
323 return vocab::apply(
324 [&](auto&&... senders) {
325 return OperationState<Rec, meta::MakeIndexSequence<sizeof...(Senders)>,
327 util::move(out_r), util::forward<decltype(senders)>(senders)...
328 };
329 },
330 util::forward<Self>(self).m_senders);
331 }
332
333 vocab::Tuple<Senders...> m_senders;
334 };
335 };
336
/// Shorthand for the nested `Type` of `SenderT<Senders...>`, constrained to senders.
337 template<concepts::Sender... Senders>
338 using Sender = meta::Type<SenderT<Senders...>>;
339
/// Function object type behind `when_all`. Requires at least one sender.
/// A `tag_invoke` customization for (Function, Senders...) takes priority; otherwise the call returns a
/// `Sender` over the cv/ref-stripped sender types, constructed in place from the forwarded arguments.
340 struct Function {
341 template<concepts::Sender... Senders>
342 requires(sizeof...(Senders) > 0)
343 auto operator()(Senders&&... senders) const -> concepts::Sender auto {
344 if constexpr (concepts::TagInvocable<Function, Senders...>) {
345 return function::tag_invoke(*this, util::forward<Senders>(senders)...);
346 } else {
347 return Sender<meta::RemoveCVRef<Senders>...> { in_place, util::forward<Senders>(senders)... };
348 }
349 }
350 };
351
/// Call operator of the with-variant flavour. Requires at least one sender.
/// A `tag_invoke` customization for (VariantFunction, Senders...) takes priority; otherwise each sender is
/// wrapped with `execution::into_variant` and the results are handed to `Function {}`.
353 template<concepts::Sender... Senders>
354 requires(sizeof...(Senders) > 0)
355 auto operator()(Senders&&... senders) const -> concepts::Sender auto {
356 if constexpr (concepts::TagInvocable<VariantFunction, Senders...>) {
357 return function::tag_invoke(*this, util::forward<Senders>(senders)...);
358 } else {
359 return Function {}(execution::into_variant(util::forward<Senders>(senders))...);
360 }
361 }
362 };
363
/// Call operator of the transfer flavour. Requires a scheduler and at least one sender.
/// A `tag_invoke` customization for (TransferFunction, Sched, Senders...) takes priority; otherwise the
/// senders are combined with `Function {}` and the result is passed to `execution::transfer` with `sched`.
365 template<concepts::Scheduler Sched, concepts::Sender... Senders>
366 requires(sizeof...(Senders) > 0)
367 auto operator()(Sched&& sched, Senders&&... senders) const -> concepts::Sender auto {
368 if constexpr (concepts::TagInvocable<TransferFunction, Sched, Senders...>) {
369 return function::tag_invoke(*this, util::forward<Sched>(sched), util::forward<Senders>(senders)...);
370 } else {
    // Forward with the deduced pack `Senders`; `Sender` names the when_all sender alias template,
    // which is not a type and cannot be used as the argument of util::forward.
371 return execution::transfer(Function {}(util::forward<Senders>(senders)...), util::forward<Sched>(sched));
372 }
373 }
374 };
375
/// Call operator of the transfer-with-variant flavour. Requires a scheduler and at least one sender.
/// A `tag_invoke` customization for (TransferVariantFunction, Sched, Senders...) takes priority; otherwise
/// each sender is wrapped with `execution::into_variant` and passed, with `sched`, to `TransferFunction {}`.
377 template<concepts::Scheduler Sched, concepts::Sender... Senders>
378 requires(sizeof...(Senders) > 0)
379 auto operator()(Sched&& sched, Senders&&... senders) const -> concepts::Sender auto {
380 if constexpr (concepts::TagInvocable<TransferVariantFunction, Sched, Senders...>) {
381 return function::tag_invoke(*this, util::forward<Sched>(sched), util::forward<Senders>(senders)...);
382 } else {
383 return TransferFunction {}(util::forward<Sched>(sched),
384 execution::into_variant(util::forward<Senders>(senders))...);
385 }
386 }
387 };
388}
389
/// The `when_all` function object: a constexpr instance of `when_all_ns::Function`.
390constexpr inline auto when_all = when_all_ns::Function {};
394}
Definition atomic.h:15
Definition in_place_stop_source.h:11
Definition in_place_stop_token.h:6
Definition defer_construct.h:8
Definition optional_forward_declaration.h:5
Definition tuple_forward_declaration.h:5
Definition core.h:117
Definition receiver_of.h:25
Definition receiver.h:10
Definition scheduler.h:21
Definition sender_in.h:10
Definition sender.h:11
Definition tag_invoke.h:33
Definition when_all.h:60
meta::AsTemplate< vocab::Variant, meta::Unique< meta::List< meta::Decay< Types >... > > > DecayedVariant
Definition when_all.h:120
meta::AsLanguageFunction< SetValue, meta::Transform< meta::Concat< meta::ValueTypesOf< Sends, Env, meta::List, meta::TypeIdentity >... >, meta::Quote< DecayedRValue > > > ValueCompletion
Definition when_all.h:83
vocab::Optional< vocab::Tuple< meta::Decay< Types >... > > DecayedOptionalTuple
Definition when_all.h:105
meta::Type< DataT< Rec, Sends... > > Data
Definition when_all.h:231
meta::Type< OperationStateT< Rec, Indices, Sends > > OperationState
Definition when_all.h:301
meta::Unique< meta::PushBack< meta::Transform< meta::Concat< meta::ErrorTypesOf< Sends, Env, meta::List >... >, meta::Compose< meta::BindFront< meta::Quote< meta::AsLanguageFunction >, SetError >, meta::Quote< meta::List >, meta::Quote< DecayedRValue > > >, SetStopped()> > NonValueCompletions
Definition when_all.h:75
meta::Type< CompletionSignaturesT< E, Sends... > > Sigs
Definition when_all.h:102
meta::Type< SenderT< Senders... > > Sender
Definition when_all.h:338
MakeEnv< E, With< Tag< get_stop_token >, sync::InPlaceStopToken > > Env
Definition when_all.h:88
meta::Type< ValueStorageT< Env, Sends... > > ValueStorage
Definition when_all.h:117
meta::Type< ReceiverT< index, Send, Data > > Receiver
Definition when_all.h:260
meta::AsTemplate< DecayedVariant, meta::Concat< meta::List< NotError, Stopped >, meta::ErrorTypesOf< Sends, Env, meta::List >... > > ErrorStorage
Definition when_all.h:126
meta::Decay< T > && DecayedRValue
Definition when_all.h:62
constexpr bool never_sends_value
Definition when_all.h:71
constexpr usize set_value_count
Definition when_all.h:65
Definition bulk.h:30
constexpr auto set_error
Definition set_error.h:14
constexpr auto transfer
Definition transfer.h:30
constexpr auto start
Definition start.h:20
constexpr auto into_variant
Definition into_variant.h:86
constexpr auto make_env
Create an environment with overrides for queries.
Definition make_env.h:147
constexpr auto when_all
Definition when_all.h:390
constexpr auto set_stopped
Definition set_stopped.h:14
constexpr auto with
Specify an override for an environment query.
Definition make_env.h:112
constexpr auto transfer_when_all_with_variant
Definition when_all.h:393
constexpr auto get_stop_token
Definition get_stop_token.h:25
decltype(make_env(util::declval< BaseEnv >(), util::declval< Withs >()...)) MakeEnv
Represent an environment with overrides for queries.
Definition make_env.h:189
constexpr auto get_env
Definition get_env.h:27
constexpr auto transfer_when_all
Definition when_all.h:392
constexpr auto connect
Definition connect.h:42
constexpr auto set_value
Definition set_value.h:14
constexpr auto when_all_with_variant
Definition when_all.h:391
constexpr tag_invoke_detail::TagInvokeFn tag_invoke
Definition tag_invoke.h:22
Definition merge_interfaces.h:6
meta::RemoveCVRef< decltype(execution::get_stop_token(util::declval< T >()))> StopTokenOf
Definition stop_token_of.h:7
T::Type Type
Definition core.h:26
detail::ComposeHelper< Funs... > Compose
Definition function.h:99
Type< detail::AsTemplateHelper< Template, T > > AsTemplate
Definition algorithm.h:60
Conditional< concepts::LanguageArray< RemoveReference< T > >, RemoveExtent< RemoveReference< T > > *, Conditional< concepts::LanguageFunction< RemoveReference< T > >, AddPointer< RemoveReference< T > >, RemoveCVRef< T > > > Decay
Definition language.h:574
constexpr usize Size
Definition list.h:106
Type< detail::LikeHelper< T, U > > Like
Definition language.h:468
MakeIntegerSequence< usize, count > MakeIndexSequence
Definition algorithm.h:285
Fold< Lst, List<>, detail::PushBackIfUnique > Unique
Definition algorithm.h:203
GatherSignatures< execution::SetValue, Sender, Env, Tup, Var > ValueTypesOf
Definition value_types_of.h:14
Concat< L, List< T > > PushBack
Definition algorithm.h:86
GatherSignatures< execution::SetError, Sender, Env, meta::TypeIdentity, Var > ErrorTypesOf
Definition error_types_of.h:13
Type< detail::ConcatHelper< Lists... > > Concat
Definition algorithm.h:80
detail::TransformHelper< List, Function >::Type Transform
Definition algorithm.h:186
Type< detail::AsLanguageFunction< R, T > > AsLanguageFunction
Definition algorithm.h:31
decltype(execution::get_env(util::declval< T >())) EnvOf
Definition env_of.h:8
@ AcquireRelease
Definition memory_order.h:11
Definition method.h:5
size_t usize
Definition integers.h:33
di::meta::Decay< decltype(T)> Tag
Definition tag_invoke.h:28
Definition vocab.h:96
constexpr auto get(T &&value) -> decltype(auto)
Definition get.h:8
void unreachable()
Definition unreachable.h:4
Definition lazy.h:165
constexpr auto tuple_cat(Tups &&... tuples)
Definition tuple_cat.h:11
constexpr auto tie
Definition tie.h:15
constexpr auto apply(F &&f, Tup &&tuple) -> decltype(detail::apply_impl(meta::MakeIndexSequence< meta::TupleSize< Tup > > {}, util::forward< F >(f), util::forward< Tup >(tuple)))
Definition apply.h:22
constexpr auto visit(Vis &&visitor, Vars &&... variants) -> R
Definition visit.h:39
constexpr void tuple_for_each(F &&function, Tup &&tuple)
Definition tuple_for_each.h:22
constexpr auto holds_alternative
Definition holds_alternative.h:21
constexpr auto c_
A value of type Constexpr<val>.
Definition constexpr.h:252
constexpr auto in_place
Definition in_place.h:8
Definition set_error.h:6
Definition set_stopped.h:6
Definition set_value.h:6
StopToken::template CallbackType< StopCallbackFunction > StopCallback
Definition when_all.h:145
Type(Rec out_r_)
Definition when_all.h:147
ErrorStorage< Env, Sends... > Error
Definition when_all.h:142
void report_value(Constexpr< index >, Types &&... values)
Definition when_all.h:150
usize Count
Definition when_all.h:143
ValueStorage< Env, Sends... > Value
Definition when_all.h:141
sync::Atomic< bool > failed
Definition when_all.h:225
sync::Atomic< Count > remaining
Definition when_all.h:224
meta::EnvOf< Rec > Env
Definition when_all.h:140
Error error
Definition when_all.h:221
void finish_one()
Definition when_all.h:179
void report_error(E &&error)
Definition when_all.h:161
vocab::Optional< StopCallback > stop_callback
Definition when_all.h:226
void report_stop()
Definition when_all.h:170
sync::InPlaceStopSource stop_source
Definition when_all.h:223
Rec out_r
Definition when_all.h:222
meta::StopTokenOf< Env > StopToken
Definition when_all.h:144
Value value
Definition when_all.h:220
Definition when_all.h:137
Definition when_all.h:340
Definition when_all.h:122
vocab::Tuple< meta::ConnectResult< Sends, Receiver< indices, Sends, Data > >... > OpStates
Definition when_all.h:270
friend void tag_invoke(types::Tag< execution::start >, Type &self)
Definition when_all.h:280
Data * data
Definition when_all.h:238
friend void tag_invoke(types::Tag< execution::set_stopped >, Type &&self)
Definition when_all.h:250
friend auto tag_invoke(types::Tag< execution::get_env >, Type const &self)
Definition when_all.h:252
friend void tag_invoke(types::Tag< execution::set_error >, Type &&self, E &&error)
Definition when_all.h:246
void is_receiver
Definition when_all.h:236
friend void tag_invoke(types::Tag< execution::set_value >, Type &&self, Types &&... values)
Definition when_all.h:241
Definition when_all.h:234
Type(InPlace, Ts &&... ts)
Definition when_all.h:310
void is_sender
Definition when_all.h:307
friend auto tag_invoke(types::Tag< execution::get_completion_signatures >, Self &&, E &&) -> Sigs< E, meta::Like< Self, Senders >... >
Definition when_all.h:315
friend auto tag_invoke(types::Tag< execution::connect >, Self &&self, Rec out_r)
Definition when_all.h:322
Definition when_all.h:304
sync::InPlaceStopSource & stop_source
Definition when_all.h:131
void operator()() const noexcept
Definition when_all.h:133
Definition when_all.h:123
A wrapper for a constexpr value.
Definition core.h:77
Definition core.h:5
Definition function.h:30
Definition core.h:18
Definition completion_signuatures.h:7
Definition in_place.h:4
Definition immovable.h:4