Iros
 
Loading...
Searching...
No Matches
split.h
Go to the documentation of this file.
1#pragma once
2
20#include "di/meta/algorithm.h"
22#include "di/sync/atomic.h"
25#include "di/util/addressof.h"
26#include "di/util/immovable.h"
28
29namespace di::execution {
30namespace split_ns {
31 // NOTE: The official specification uses std::stop_source, which we do not have. An InPlaceStopSource should be fine
32 // however, since we manually manage its lifetime.
35
36 template<typename T>
38
40
41 template<typename... Values>
43
44 template<typename E>
46
47 template<typename Send>
48 using Sigs =
50
53
54 void operator()() const noexcept { stop_source.request_stop(); }
55 };
56
59 explicit Type(void (*did_complete_)(Type*)) : did_complete(did_complete_) {}
60
61 void (*did_complete)(Type*);
62 Type* next { nullptr };
63 };
64 };
65
67
68 template<typename SharedState>
70 struct Type {
71 using is_receiver = void;
72
74
75 template<typename... Args>
76 friend void tag_invoke(Tag<set_value>, Type&& self, Args&&... values) {
77 self.state->complete_with(set_value, util::forward<Args>(values)...);
78 }
79
80 template<typename E>
81 friend void tag_invoke(Tag<set_error>, Type&& self, E&& error) {
82 self.state->complete_with(set_error, util::forward<E>(error));
83 }
84
85 friend void tag_invoke(Tag<set_stopped>, Type&& self) { self.state->complete_with(set_stopped); }
86
87 friend auto tag_invoke(Tag<get_env>, Type const& self) -> Env {
88 return make_env(empty_env, with(get_stop_token, self.state->stop_source.get_stop_token()));
89 }
90 };
91 };
92
93 template<typename SharedState>
95
96 template<typename Send, typename SenderAttr, typename Alloc>
97 struct SharedStateT {
100 using Tags =
104 using Storage =
111
113
// Constructs the shared state from the sender being split and the allocator used to create it.
//
// - sender_attr is initialised from get_env(sender).
// - allocator is stored by forwarding allocator_.
// - operation is the result of connecting the forwarded sender to a SharedReceiver that holds
//   a pointer to this object.
//
// NOTE(review): get_env(sender) reads the sender that the operation initialiser then forwards
// into connect, so this relies on sender_attr being declared before operation — confirm the
// member order before reordering anything here.
114 template<typename A>
115 explicit Type(Send&& sender, A&& allocator_)
116 : sender_attr(get_env(sender))
117 , allocator(util::forward<A>(allocator_))
118 , operation(connect(util::forward<Send>(sender), SharedReceiver<Type>(this))) {}
119
120 template<typename... Args>
121 void complete_with(Args&&... args) {
122 storage.template emplace<meta::DecayedTuple<Args...>>(util::forward<Args>(args)...);
123
124 did_complete();
125 }
126
128 // Notify all pending operations that the operation has completed. We use ourselves as the sentinel
129 // value which indicates that the operation has completed, since we know that we are a valid pointer
130 // but are not an operation state. This could instead be done using a bitflag in the pointer, but this
131 // is simpler.
132 auto* operations = waiting.exchange(static_cast<void*>(this), sync::MemoryOrder::AcquireRelease);
133
134 // Now walk the linked list of pending operations and notify them that the operation has completed.
135 // There cannot be races here because we just stole the list of pending operations, so it is not visible
136 // to other threads.
137 while (operations) {
138 auto* operation = static_cast<OperationStateBase*>(operations);
139 operations = operation->next;
140 operation->did_complete(operation);
141 }
142 }
143
146 if (ref_count.fetch_sub(1, sync::MemoryOrder::Release) == 1) {
147 // Destroy the shared state when the reference count reaches 0. Since we store the allocator
148 // ourselves, we must first move it out of the operation state before destroying ourselves.
149 auto allocator = util::move(this->allocator);
150 auto* pointer = this;
151 util::destroy_at(pointer);
153 }
154 }
155
158 SenderAttr sender_attr;
161 [[no_unique_address]] Alloc allocator;
163 };
164 };
165
166 template<typename Send, typename SenderAttr, typename Alloc>
168
169 template<typename Send, typename Rec, typename SenderAttr, typename Alloc>
172 public:
174
175 explicit Type(State* state, Rec receiver)
176 : OperationStateBase([](OperationStateBase* void_self) {
177 auto& self = *static_cast<Type*>(void_self);
178
179 // Reset the stop callback.
180 self.m_stop_callback.reset();
181
182 // Forward the completion to the receiver.
183 auto& state = *self.m_state;
185 [&](auto const& tuple) {
187 [&](auto tag, auto const&... values) {
188 if constexpr (concepts::SameAs<SetValue, decltype(tag)>) {
189 set_value(util::move(self.m_receiver), values...);
190 } else if constexpr (concepts::SameAs<SetError, decltype(tag)>) {
191 set_error(util::move(self.m_receiver), values...);
192 } else if constexpr (concepts::SameAs<SetStopped, decltype(tag)>) {
193 set_stopped(util::move(self.m_receiver));
194 }
195 },
196 util::forward<decltype(tuple)>(tuple));
197 },
198 util::as_const(state.storage));
199 })
200 , m_state(state)
201 , m_receiver(util::forward<Rec>(receiver)) {}
202
203 ~Type() { m_state->drop_ref_count(); }
204
205 private:
206 friend void tag_invoke(Tag<start>, Type& self) {
207 auto& state = *self.m_state;
208
209 // Add ourselves to the list of pending operations. The basic idea is to fetch the old head of the list
210 // and set our next pointer to it. Then we try to CAS the old head to point to us. If this fails, we try
211 // again. Additionally, we stop trying if the old head is the sentinel value, which indicates that the
212 // operation has already completed, and just return after forwarding the values. We need to load the old
213 // value with acquire semantics since we write the sentinel value with acquire-release semantics.
214 auto* old_head = state.waiting.load(sync::MemoryOrder::Acquire);
215 auto* sentinel = static_cast<void*>(self.m_state);
216 do {
217 if (old_head == sentinel) {
218 // The operation has already completed, so just return after forwarding the values.
219 self.did_complete(util::addressof(self));
220 return;
221 }
222
223 self.next = static_cast<OperationStateBase*>(old_head);
224 } while (!state.waiting.compare_exchange_weak(old_head, static_cast<void*>(util::addressof(self)),
226
227 // Emplace the stop callback.
228 self.m_stop_callback.emplace(execution::get_stop_token(execution::get_env(self.m_receiver)),
229 StopCallbackFunction { state.stop_source });
230
231 if (old_head == nullptr) {
232 // We were the first operation to be added to the list, so we must start the operation.
233 // If stop was requested, complete all operations with set stopped.
234 if (state.stop_source.stop_requested()) {
235 state.did_complete();
236 } else {
237 // Start the operation.
238 start(state.operation);
239 }
240 }
241 }
242
243 State* m_state;
244 Rec m_receiver;
245 vocab::Optional<typename meta::StopTokenOf<meta::EnvOf<Rec>>::template CallbackType<StopCallbackFunction>>
246 m_stop_callback;
247 };
248 };
249
250 template<typename Send, typename Rec, typename SenderAttr, typename Alloc>
252
253 template<typename Send, typename Alloc>
254 struct SenderT {
255 struct Type {
256 public:
257 using is_sender = void;
258
260
262
263 explicit Type(State* state) : m_state(state) { DI_ASSERT(state); }
264
265 Type(Type const& other) : m_state(other.m_state) {
266 if (m_state) {
267 m_state->bump_ref_count();
268 }
269 }
270 Type(Type&& other) : m_state(util::exchange(other.m_state, nullptr)) {}
271
273 auto state = util::exchange(m_state, nullptr);
274 if (state) {
275 state->drop_ref_count();
276 }
277 }
278
279 // NOLINTNEXTLINE(bugprone-unhandled-self-assignment,cert-oop54-cpp)
280 auto operator=(Type const& other) -> Type& {
281 if (m_state != other.m_state) {
282 auto state = util::exchange(m_state, other.m_state);
283 if (state) {
284 state->drop_ref_count();
285 }
286 if (m_state) {
287 m_state->bump_ref_count();
288 }
289 }
290 return *this;
291 }
292 auto operator=(Type&& other) -> Type& {
293 this->m_state = util::exchange(other.m_state, nullptr);
294 return *this;
295 }
296
297 private:
298 template<concepts::ReceiverOf<CompletionSignatures> Rec>
299 friend auto tag_invoke(Tag<connect>, Type const& self, Rec receiver) {
300 DI_ASSERT(self.m_state);
301 self.m_state->bump_ref_count();
302 return OperationState<Send, Rec, meta::EnvOf<Send>, Alloc>(self.m_state, util::move(receiver));
303 }
304
305 template<concepts::ReceiverOf<CompletionSignatures> Rec>
306 friend auto tag_invoke(Tag<connect>, Type&& self, Rec receiver) {
307 DI_ASSERT(self.m_state);
308 return OperationState<Send, Rec, meta::EnvOf<Send>, Alloc>(util::exchange(self.m_state, nullptr),
309 util::move(receiver));
310 }
311
312 friend auto tag_invoke(Tag<get_env>, Type const& self) -> meta::Decay<meta::EnvOf<Send>> const& {
313 DI_ASSERT(self.m_state);
314 return self.m_state->sender_attr;
315 }
316
317 State* m_state { nullptr };
318 };
319 };
320
321 template<typename Send, typename Alloc>
323
324 struct Function {
325 template<concepts::SenderIn<Env> Send, concepts::Allocator Alloc = platform::DefaultAllocator>
327 auto operator()(Send&& sender, Alloc&& allocator = {}) const {
328 if constexpr (requires {
330 util::forward<Send>(sender), util::forward<Alloc>(allocator));
331 }) {
333 util::forward<Send>(sender), util::forward<Alloc>(allocator));
334 } else if constexpr (requires {
335 tag_invoke(*this, util::forward<Send>(sender), util::forward<Alloc>(allocator));
336 }) {
337 return tag_invoke(*this, util::forward<Send>(sender), util::forward<Alloc>(allocator));
338 } else {
339 using State = SharedState<Send, meta::EnvOf<Send>, Alloc>;
340 return vocab::as_fallible(container::allocate_one<State>(allocator)) % [&](State* state) {
341 util::construct_at(state, util::forward<Send>(sender), util::forward<Alloc>(allocator));
342 return Sender<Send, Alloc>(state);
344 }
345 }
346 };
347}
348
362}
#define DI_ASSERT(...)
Definition assert_bool.h:7
Definition atomic.h:15
Definition in_place_stop_source.h:11
Definition in_place_stop_token.h:6
Definition optional_forward_declaration.h:5
Definition tuple_forward_declaration.h:5
Definition variant_forward_declaration.h:6
#define DI_IMMOVABLE_NO_UNIQUE_ADDRESS
Definition compiler.h:15
Definition core.h:114
constexpr auto allocate_one
Definition allocate_one.h:29
constexpr auto deallocate_one
Definition deallocate_one.h:27
Definition split.h:30
meta::Type< OperationStateT< Send, Rec, SenderAttr, Alloc > > OperationState
Definition split.h:251
MakeEnv< EmptyEnv, With< Tag< get_stop_token >, StopToken > > Env
Definition split.h:39
CompletionSignatures< SetError(meta::Decay< DecayedCLValue< E > >)> SigSetError
Definition split.h:45
meta::Type< SharedStateT< Send, meta::Decay< SenderAttr >, meta::Decay< Alloc > > > SharedState
Definition split.h:167
sync::InPlaceStopToken StopToken
Definition split.h:34
meta::Type< OperationStateBaseT > OperationStateBase
Definition split.h:66
meta::MakeCompletionSignatures< Send, Env, CompletionSignatures< SetStopped()>, SigSetValue, SigSetError > Sigs
Definition split.h:48
meta::Type< SenderT< Send, meta::Decay< Alloc > > > Sender
Definition split.h:322
CompletionSignatures< SetValue(meta::Decay< DecayedCLValue< Values > >...)> SigSetValue
Definition split.h:42
meta::Decay< T > const & DecayedCLValue
Definition split.h:37
meta::Type< SharedReceiverT< SharedState > > SharedReceiver
Definition split.h:94
sync::InPlaceStopSource StopSource
Definition split.h:33
Definition bulk.h:30
constexpr auto set_error
Definition set_error.h:14
constexpr auto start
Definition start.h:20
constexpr auto make_env
Create an environment with overrides for queries.
Definition make_env.h:147
constexpr auto set_stopped
Definition set_stopped.h:14
constexpr auto with
Specify an override for an environment query.
Definition make_env.h:112
constexpr auto split
Split a sender into a sender which sends the same value to multiple receivers.
Definition split.h:361
constexpr auto get_completion_scheduler
Definition get_completion_scheduler.h:19
constexpr auto get_stop_token
Definition get_stop_token.h:25
decltype(make_env(util::declval< BaseEnv >(), util::declval< Withs >()...)) MakeEnv
Represent an environment with overrides for queries.
Definition make_env.h:189
constexpr auto get_env
Definition get_env.h:27
constexpr auto connect
Definition connect.h:42
constexpr auto set_value
Definition set_value.h:14
constexpr auto curry_back
Definition curry_back.h:141
T::Type Type
Definition core.h:26
detail::ComposeHelper< Funs... > Compose
Definition function.h:99
Type< detail::AsTemplateHelper< Template, T > > AsTemplate
Definition algorithm.h:60
Conditional< concepts::LanguageArray< RemoveReference< T > >, RemoveExtent< RemoveReference< T > > *, Conditional< concepts::LanguageFunction< RemoveReference< T > >, AddPointer< RemoveReference< T > >, RemoveCVRef< T > > > Decay
Definition language.h:574
Concat< List< T >, L > PushFront
Definition algorithm.h:83
Type< detail::AsListHelper< T > > AsList
Definition algorithm.h:48
Fold< Lst, List<>, detail::PushBackIfUnique > Unique
Definition algorithm.h:203
vocab::Tuple< meta::Decay< Types >... > DecayedTuple
Definition decayed_tuple.h:8
Type< detail::MakeCompletionSignaturesHelper< ExtraSigs, meta::ValueTypesOf< Send, Env, SetValue, meta::List >, meta::Transform< meta::ErrorTypesOf< Send, Env, meta::List >, meta::Quote< SetError > >, meta::Conditional< meta::sends_stopped< Send, Env >, SetStopped, types::CompletionSignatures<> > > > MakeCompletionSignatures
Definition make_completion_signatures.h:36
detail::TransformHelper< List, Function >::Type Transform
Definition algorithm.h:186
decltype(execution::connect(util::declval< Send >(), util::declval< Rec >())) ConnectResult
Definition connect_result.h:7
@ Relaxed
Definition memory_order.h:7
@ AcquireRelease
Definition memory_order.h:11
@ Acquire
Definition memory_order.h:9
@ Release
Definition memory_order.h:10
di::meta::Decay< decltype(T)> Tag
Definition tag_invoke.h:28
Definition vocab.h:96
constexpr auto exchange(T &object, U &&new_value) -> T
Definition exchange.h:8
constexpr auto destroy_at
Definition destroy_at.h:24
constexpr auto construct_at
Definition construct_at.h:27
constexpr auto apply(F &&f, Tup &&tuple) -> decltype(detail::apply_impl(meta::MakeIndexSequence< meta::TupleSize< Tup > > {}, util::forward< F >(f), util::forward< Tup >(tuple)))
Definition apply.h:22
constexpr auto as_fallible
Definition as_fallible.h:26
constexpr auto try_infallible
Definition try_infallible.h:31
constexpr auto visit(Vis &&visitor, Vars &&... variants) -> R
Definition visit.h:39
constexpr tag_invoke_detail::TagInvokeFn tag_invoke
Definition tag_invoke.h:22
constexpr auto exchange(T &object, U &&new_value) -> T
Definition exchange.h:8
constexpr auto c_
A value of type Constexpr<val>.
Definition constexpr.h:252
constexpr auto empty_env
Definition empty_env.h:6
Definition set_error.h:6
Definition set_stopped.h:6
Definition set_value.h:6
Definition split.h:324
void(* did_complete)(Type *)
Definition split.h:61
Type(void(*did_complete_)(Type *))
Definition split.h:59
Type(State *state, Rec receiver)
Definition split.h:175
friend void tag_invoke(Tag< start >, Type &self)
Definition split.h:206
SharedState< Send, SenderAttr, Alloc > State
Definition split.h:173
Type(Type &&other)
Definition split.h:270
SharedState< Send, meta::EnvOf< Send >, Alloc > State
Definition split.h:261
void is_sender
Definition split.h:257
Type(Type const &other)
Definition split.h:265
auto operator=(Type &&other) -> Type &
Definition split.h:292
auto operator=(Type const &other) -> Type &
Definition split.h:280
Sigs< Send > CompletionSignatures
Definition split.h:259
friend auto tag_invoke(Tag< connect >, Type &&self, Rec receiver)
Definition split.h:306
friend auto tag_invoke(Tag< get_env >, Type const &self) -> meta::Decay< meta::EnvOf< Send > > const &
Definition split.h:312
friend auto tag_invoke(Tag< connect >, Type const &self, Rec receiver)
Definition split.h:299
Type(State *state)
Definition split.h:263
Definition split.h:254
friend void tag_invoke(Tag< set_error >, Type &&self, E &&error)
Definition split.h:81
friend void tag_invoke(Tag< set_stopped >, Type &&self)
Definition split.h:85
SharedState * state
Definition split.h:73
friend auto tag_invoke(Tag< get_env >, Type const &self) -> Env
Definition split.h:87
friend void tag_invoke(Tag< set_value >, Type &&self, Args &&... values)
Definition split.h:76
Alloc allocator
Definition split.h:161
void drop_ref_count()
Definition split.h:145
StopSource stop_source
Definition split.h:157
SenderAttr sender_attr
Definition split.h:158
void did_complete()
Definition split.h:127
void bump_ref_count()
Definition split.h:144
meta::Transform< Completions, meta::Quote< meta::AsList > > DecayedArgs
Definition split.h:103
meta::AsTemplate< vocab::Variant, meta::Unique< meta::PushFront< meta::Transform< meta::Zip< Tags, DecayedArgs >, meta::Compose< meta::Uncurry< meta::Quote< meta::DecayedTuple > >, meta::Quote< meta::Join > > >, vocab::Tuple< SetStopped > > > > Storage
Definition split.h:104
meta::Transform< Completions, meta::Compose< meta::Quote< meta::List >, meta::Quote< meta::LanguageFunctionReturn > > > Tags
Definition split.h:100
void complete_with(Args &&... args)
Definition split.h:121
DI_IMMOVABLE_NO_UNIQUE_ADDRESS Op operation
Definition split.h:162
meta::AsList< meta::CompletionSignaturesOf< Send, Env > > Completions
Definition split.h:99
Storage storage
Definition split.h:156
sync::Atomic< void * > waiting
Definition split.h:160
sync::Atomic< usize > ref_count
Definition split.h:159
meta::ConnectResult< Send, SharedReceiver< Type > > Op
Definition split.h:112
Type(Send &&sender, A &&allocator_)
Definition split.h:115
StopSource & stop_source
Definition split.h:52
void operator()() const noexcept
Definition split.h:54
Definition function.h:30
Definition completion_signuatures.h:7
Definition immovable.h:4