Iros
 
Loading...
Searching...
No Matches
ensure_started.h
Go to the documentation of this file.
1#pragma once
2
25#include "di/meta/algorithm.h"
26#include "di/meta/core.h"
27#include "di/meta/util.h"
29#include "di/sync/atomic.h"
33#include "di/types/in_place.h"
34#include "di/types/integers.h"
35#include "di/types/prelude.h"
36#include "di/util/addressof.h"
37#include "di/util/exchange.h"
38#include "di/util/immovable.h"
41
42namespace di::execution {
44 // NOTE: The offical specification uses std::stop_source, which we do not have. An InPlaceStopSource should be fine
45 // however, since we manually manage its lifetime.
48
49 template<typename T>
51
53
54 template<typename... Values>
56
57 template<typename E>
59
60 template<typename Seq>
62
65
66 void operator()() const noexcept { stop_source.request_stop(); }
67 };
68
69 template<typename SharedState>
71 struct Type {
72 using is_receiver = void;
73
75
76 template<typename... Args>
77 friend void tag_invoke(Tag<set_value>, Type&& self, Args&&... values) {
78 self.state->complete_with(set_value, util::forward<Args>(values)...);
79 }
80
81 template<typename E>
82 friend void tag_invoke(Tag<set_error>, Type&& self, E&& error) {
83 self.state->complete_with(set_error, util::forward<E>(error));
84 }
85
86 friend void tag_invoke(Tag<set_stopped>, Type&& self) { self.state->complete_with(set_stopped); }
87
88 friend auto tag_invoke(Tag<get_env>, Type const& self) -> Env {
89 return make_env(empty_env, with(get_stop_token, self.state->stop_source.get_stop_token()));
90 }
91 };
92 };
93
94 template<typename SharedState>
96
97 template<typename Send, typename SenderAttr, typename Alloc>
98 struct SharedStateT {
101 using Tags =
112
114
115 template<typename A>
116 explicit Type(Send&& sender, A&& allocator_)
117 : did_complete([this] {
118 // Destroy the shared state when the sender completes. Since we store the allocator
119 // ourselves, we must first move it out of the operation state before destroying ourselves.
120 auto allocator = util::move(this->allocator);
121 auto* pointer = this;
122 util::destroy_at(pointer);
124 })
125 , sender_attr(get_env(sender))
126 , allocator(util::forward<A>(allocator_))
127 , operation(connect(util::forward<Send>(sender), SharedReceiver<Type>(this))) {
129 }
130
131 template<typename... Args>
132 void complete_with(Args&&... args) {
133 storage.template emplace<meta::DecayedTuple<Args...>>(util::forward<Args>(args)...);
134 finish_one();
135 }
136
137 void finish_one(bool request_stop = false) {
138 auto old = ref_count.fetch_sub(1, sync::MemoryOrder::AcquireRelease);
139 if (old == 1) {
140 did_complete();
141 } else if (request_stop) {
142 stop_source.request_stop();
143 }
144 }
145
149 SenderAttr sender_attr;
151 [[no_unique_address]] Alloc allocator;
153 };
154 };
155
156 template<typename Send, typename SenderAttr, typename Alloc>
158
159 template<typename Send, typename Rec, typename SenderAttr, typename Alloc>
162 public:
164
165 explicit Type(State* state, Rec receiver) : m_state(state), m_receiver(util::forward<Rec>(receiver)) {}
166
168 auto started = util::exchange(m_started, true);
169 if (!started) {
170 m_state->finish_one(true);
171 }
172 }
173
174 private:
175 friend void tag_invoke(Tag<start>, Type& self) {
176 self.m_started = true;
177
178 auto completion_callback = [&self] {
179 // Reset the stop callback.
180 self.m_stop_callback.reset();
181
182 // Forward the completion to the receiver.
183 auto& state = *self.m_state;
185 [&](auto&& tuple) {
187 [&](auto tag, auto&&... values) {
188 if constexpr (concepts::SameAs<SetValue, decltype(tag)>) {
189 set_value(util::move(self.m_receiver),
190 util::forward<decltype(values)>(values)...);
191 } else if constexpr (concepts::SameAs<SetError, decltype(tag)>) {
192 set_error(util::move(self.m_receiver),
193 util::forward<decltype(values)>(values)...);
194 } else if constexpr (concepts::SameAs<SetStopped, decltype(tag)>) {
195 set_stopped(util::move(self.m_receiver));
196 }
197 },
198 util::forward<decltype(tuple)>(tuple));
199 },
200 util::move(state.storage));
201
202 // Destroy the shared state when the sender completes. Since we store the allocator
203 // ourselves, we must first move it out of the operation state before destroying ourselves.
204 auto allocator = util::move(state.allocator);
205 auto* pointer = util::addressof(state);
206 util::destroy_at(pointer);
207 container::deallocate_one<State>(allocator, pointer);
208 };
209
210 // If the original sender has finished, report the completions:
211 auto& state = *self.m_state;
212 if (state.ref_count.load(sync::MemoryOrder::Relaxed) == 1) {
213 state.did_complete = completion_callback;
214 return state.finish_one();
215 }
216
217 // Emplace the stop callback.
218 self.m_stop_callback.emplace(execution::get_stop_token(execution::get_env(self.m_receiver)),
219 StopCallbackFunction { state.stop_source });
220
221 // If stop was requested, just complete with set_stopped().
222 if (state.stop_source.stop_requested()) {
223 set_stopped(util::move(self.m_receiver));
224 return state.finish_one();
225 }
226
227 // Register the completion callback.
228 state.did_complete = completion_callback;
229 state.finish_one();
230 }
231
232 State* m_state;
233 Rec m_receiver;
234 vocab::Optional<typename meta::StopTokenOf<meta::EnvOf<Rec>>::template CallbackType<StopCallbackFunction>>
235 m_stop_callback;
236 bool m_started { false };
237 };
238 };
239
240 template<typename Send, typename Rec, typename SenderAttr, typename Alloc>
242
243 template<typename Send, typename Alloc>
244 struct SenderT {
245 struct Type {
246 public:
247 using is_sender = void;
248
250
252
253 explicit Type(State* state) : m_state(state) { DI_ASSERT(state); }
254
255 Type(Type&& other) : m_state(util::exchange(other.m_state, nullptr)) {}
256
258 auto state = util::exchange(m_state, nullptr);
259 if (state) {
260 state->finish_one(true);
261 }
262 }
263
264 auto operator=(Type&& other) -> Type& {
265 this->m_state = util::exchange(other.m_state, nullptr);
266 return *this;
267 }
268
269 private:
270 template<concepts::ReceiverOf<CompletionSignatures> Rec>
271 friend auto tag_invoke(Tag<connect>, Type&& self, Rec receiver) {
272 DI_ASSERT(self.m_state);
273 return OperationState<Send, Rec, meta::EnvOf<Send>, Alloc>(util::exchange(self.m_state, nullptr),
274 util::move(receiver));
275 }
276
277 friend auto tag_invoke(Tag<get_env>, Type const& self) -> meta::Decay<meta::EnvOf<Send>> const& {
278 DI_ASSERT(self.m_state);
279 return self.m_state->sender_attr;
280 }
281
282 State* m_state { nullptr };
283 };
284 };
285
286 template<typename Send, typename Alloc>
288
289 struct Function {
290 template<concepts::SenderIn<Env> Send, concepts::Allocator Alloc = platform::DefaultAllocator>
292 auto operator()(Send&& sender, Alloc&& allocator = {}) const {
293 if constexpr (requires {
295 util::forward<Send>(sender), util::forward<Alloc>(allocator));
296 }) {
298 util::forward<Send>(sender), util::forward<Alloc>(allocator));
299 } else if constexpr (requires {
300 tag_invoke(*this, util::forward<Send>(sender), util::forward<Alloc>(allocator));
301 }) {
302 return tag_invoke(*this, util::forward<Send>(sender), util::forward<Alloc>(allocator));
303 } else {
304 using State = SharedState<Send, meta::EnvOf<Send>, Alloc>;
305 return vocab::as_fallible(container::allocate_one<State>(allocator)) % [&](State* state) {
306 util::construct_at(state, util::forward<Send>(sender), util::forward<Alloc>(allocator));
307 return Sender<Send, Alloc>(state);
309 }
310 }
311 };
312}
313
334}
#define DI_ASSERT(...)
Definition assert_bool.h:7
Definition atomic.h:15
Definition in_place_stop_source.h:11
Definition in_place_stop_token.h:6
Definition optional_forward_declaration.h:5
Definition tuple_forward_declaration.h:5
Definition variant_forward_declaration.h:6
#define DI_IMMOVABLE_NO_UNIQUE_ADDRESS
Definition compiler.h:15
Definition core.h:114
constexpr auto allocate_one
Definition allocate_one.h:29
constexpr auto deallocate_one
Definition deallocate_one.h:27
Definition ensure_started.h:43
meta::Decay< T > && DecayedRValue
Definition ensure_started.h:50
meta::Type< SenderT< Send, meta::Decay< Alloc > > > Sender
Definition ensure_started.h:287
meta::Type< SharedReceiverT< SharedState > > SharedReceiver
Definition ensure_started.h:95
meta::Type< OperationStateT< Send, Rec, meta::Decay< SenderAttr >, Alloc > > OperationState
Definition ensure_started.h:241
MakeEnv< EmptyEnv, With< Tag< get_stop_token >, StopToken > > Env
Definition ensure_started.h:52
CompletionSignatures< SetError(meta::Decay< E > &&)> SigSetError
Definition ensure_started.h:58
meta::Type< SharedStateT< Send, meta::Decay< SenderAttr >, meta::Decay< Alloc > > > SharedState
Definition ensure_started.h:157
CompletionSignatures< SetValue(meta::Decay< Values > &&...)> SigSetValue
Definition ensure_started.h:55
meta::MakeCompletionSignatures< Seq, Env, CompletionSignatures<>, SigSetValue, SigSetError > Sigs
Definition ensure_started.h:61
sync::InPlaceStopSource StopSource
Definition ensure_started.h:46
sync::InPlaceStopToken StopToken
Definition ensure_started.h:47
Definition bulk.h:30
constexpr auto set_error
Definition set_error.h:14
constexpr auto ensure_started
Eagerly start a sender.
Definition ensure_started.h:333
constexpr auto start
Definition start.h:20
constexpr auto make_env
Create an environment with overrides for queries.
Definition make_env.h:147
constexpr auto request_stop
Request that a scope stop.
Definition scope.h:48
constexpr auto set_stopped
Definition set_stopped.h:14
constexpr auto with
Specify an override for an environment query.
Definition make_env.h:112
constexpr auto get_completion_scheduler
Definition get_completion_scheduler.h:19
constexpr auto get_stop_token
Definition get_stop_token.h:25
decltype(make_env(util::declval< BaseEnv >(), util::declval< Withs >()...)) MakeEnv
Represent an environment with overrides for queries.
Definition make_env.h:189
constexpr auto get_env
Definition get_env.h:27
constexpr auto connect
Definition connect.h:42
constexpr auto set_value
Definition set_value.h:14
T::Type Type
Definition core.h:26
detail::ComposeHelper< Funs... > Compose
Definition function.h:99
Type< detail::AsTemplateHelper< Template, T > > AsTemplate
Definition algorithm.h:60
Conditional< concepts::LanguageArray< RemoveReference< T > >, RemoveExtent< RemoveReference< T > > *, Conditional< concepts::LanguageFunction< RemoveReference< T > >, AddPointer< RemoveReference< T > >, RemoveCVRef< T > > > Decay
Definition language.h:574
Concat< List< T >, L > PushFront
Definition algorithm.h:83
Type< detail::AsListHelper< T > > AsList
Definition algorithm.h:48
Type< detail::ZipHelper< T, U > > Zip
Definition algorithm.h:216
Fold< Lst, List<>, detail::PushBackIfUnique > Unique
Definition algorithm.h:203
vocab::Tuple< meta::Decay< Types >... > DecayedTuple
Definition decayed_tuple.h:8
Type< detail::MakeCompletionSignaturesHelper< ExtraSigs, meta::ValueTypesOf< Send, Env, SetValue, meta::List >, meta::Transform< meta::ErrorTypesOf< Send, Env, meta::List >, meta::Quote< SetError > >, meta::Conditional< meta::sends_stopped< Send, Env >, SetStopped, types::CompletionSignatures<> > > > MakeCompletionSignatures
Definition make_completion_signatures.h:36
detail::TransformHelper< List, Function >::Type Transform
Definition algorithm.h:186
decltype(execution::connect(util::declval< Send >(), util::declval< Rec >())) ConnectResult
Definition connect_result.h:7
@ Relaxed
Definition memory_order.h:7
@ AcquireRelease
Definition memory_order.h:11
di::meta::Decay< decltype(T)> Tag
Definition tag_invoke.h:28
Definition vocab.h:96
constexpr auto exchange(T &object, U &&new_value) -> T
Definition exchange.h:8
constexpr auto destroy_at
Definition destroy_at.h:24
constexpr auto construct_at
Definition construct_at.h:27
constexpr auto apply(F &&f, Tup &&tuple) -> decltype(detail::apply_impl(meta::MakeIndexSequence< meta::TupleSize< Tup > > {}, util::forward< F >(f), util::forward< Tup >(tuple)))
Definition apply.h:22
constexpr auto as_fallible
Definition as_fallible.h:26
constexpr auto try_infallible
Definition try_infallible.h:31
constexpr auto visit(Vis &&visitor, Vars &&... variants) -> R
Definition visit.h:39
constexpr tag_invoke_detail::TagInvokeFn tag_invoke
Definition tag_invoke.h:22
constexpr auto exchange(T &object, U &&new_value) -> T
Definition exchange.h:8
constexpr auto empty_env
Definition empty_env.h:6
Definition set_error.h:6
Definition set_stopped.h:6
Definition set_value.h:6
Definition ensure_started.h:289
Type(State *state, Rec receiver)
Definition ensure_started.h:165
friend void tag_invoke(Tag< start >, Type &self)
Definition ensure_started.h:175
SharedState< Send, SenderAttr, Alloc > State
Definition ensure_started.h:163
void is_sender
Definition ensure_started.h:247
~Type()
Definition ensure_started.h:257
auto operator=(Type &&other) -> Type &
Definition ensure_started.h:264
Sigs< Send > CompletionSignatures
Definition ensure_started.h:249
Type(State *state)
Definition ensure_started.h:253
Type(Type &&other)
Definition ensure_started.h:255
SharedState< Send, meta::EnvOf< Send >, Alloc > State
Definition ensure_started.h:251
friend auto tag_invoke(Tag< connect >, Type &&self, Rec receiver)
Definition ensure_started.h:271
friend auto tag_invoke(Tag< get_env >, Type const &self) -> meta::Decay< meta::EnvOf< Send > > const &
Definition ensure_started.h:277
Definition ensure_started.h:244
friend void tag_invoke(Tag< set_error >, Type &&self, E &&error)
Definition ensure_started.h:82
friend void tag_invoke(Tag< set_stopped >, Type &&self)
Definition ensure_started.h:86
friend auto tag_invoke(Tag< get_env >, Type const &self) -> Env
Definition ensure_started.h:88
void is_receiver
Definition ensure_started.h:72
friend void tag_invoke(Tag< set_value >, Type &&self, Args &&... values)
Definition ensure_started.h:77
SharedState * state
Definition ensure_started.h:74
sync::Atomic< usize > ref_count
Definition ensure_started.h:150
meta::Transform< Completions, meta::Compose< meta::Quote< meta::List >, meta::Quote< meta::LanguageFunctionReturn > > > Tags
Definition ensure_started.h:101
Alloc allocator
Definition ensure_started.h:151
Type(Send &&sender, A &&allocator_)
Definition ensure_started.h:116
Storage storage
Definition ensure_started.h:147
DI_IMMOVABLE_NO_UNIQUE_ADDRESS Op operation
Definition ensure_started.h:152
void complete_with(Args &&... args)
Definition ensure_started.h:132
Function< void()> did_complete
Definition ensure_started.h:146
meta::AsTemplate< vocab::Variant, meta::PushFront< meta::Unique< meta::Transform< meta::Zip< Tags, DecayedArgs >, meta::Compose< meta::Uncurry< meta::Quote< meta::DecayedTuple > >, meta::Quote< meta::Join > > > >, vocab::Tuple< Void > > > Storage
Definition ensure_started.h:105
void finish_one(bool request_stop=false)
Definition ensure_started.h:137
meta::AsList< meta::CompletionSignaturesOf< Send, Env > > Completions
Definition ensure_started.h:100
SenderAttr sender_attr
Definition ensure_started.h:149
meta::Transform< Completions, meta::Quote< meta::AsList > > DecayedArgs
Definition ensure_started.h:104
StopSource stop_source
Definition ensure_started.h:148
meta::ConnectResult< Send, SharedReceiver< Type > > Op
Definition ensure_started.h:113
void operator()() const noexcept
Definition ensure_started.h:66
StopSource & stop_source
Definition ensure_started.h:64
Definition function.h:30
Definition completion_signuatures.h:7
Definition immovable.h:4