Iros
 
Loading...
Searching...
No Matches
connect_awaitable.h
Go to the documentation of this file.
1#pragma once
2
11#include "di/function/invoke.h"
12#include "di/platform/prelude.h"
13#include "di/util/addressof.h"
14#include "di/util/exchange.h"
15#include "di/util/immovable.h"
16#include "di/util/move.h"
17#include "di/util/std_new.h"
18#include "di/util/unreachable.h"
21
22namespace di::execution {
// Forward declaration only: the visible part of this header neither defines nor directly uses
// as_awaitable_ns::Function.
// NOTE(review): presumably required by the await-transform machinery (WithAwaitTransform) - confirm.
23namespace as_awaitable_ns {
24 struct Function;
25}
26
28 template<typename Receiver>
30 struct AllocFailed {};
31
33 private:
// Promise type of the coroutine that backs the operation state (see `using promise_type = Promise`).
// It holds a reference to the receiver and completes it when the awaited operation reports an
// error or stop; the value completion is driven by the coroutine body through yield_value().
34 struct Promise : WithAwaitTransform<Promise> {
// Receiver to complete. Bound in the constructor below; by the C++ coroutine rules the promise
// constructor is handed the coroutine's parameters, which here are impl(awaitable, receiver).
35 Receiver& receiver;
36
// The first argument (the awaitable) is ignored; only the receiver is remembered.
37 explicit Promise(auto&, Receiver& receiver_) : receiver(receiver_) {}
38
// Wraps the handle of this coroutine in the operation state that is returned to the caller.
39 auto get_return_object() { return Type { CoroutineHandle<Promise>::from_promise(*this) }; }
40
// Frame allocation uses the nothrow form of operator new, so failure yields a null pointer
// instead of an exception; the language then calls get_return_object_on_allocation_failure().
41 auto operator new(usize size) noexcept -> void* { return ::operator new(size, std::nothrow); }
42
// Sized deallocation matching the allocation above.
43 void operator delete(void* ptr, usize size) noexcept { ::operator delete(ptr, size); }
44
// Produces an operation state without a coroutine; start() reports the failure to the receiver.
45 static auto get_return_object_on_allocation_failure() { return Type { AllocFailed {} }; }
46
// `co_yield fn` suspends the coroutine unconditionally and invokes `fn` from await_suspend().
// The coroutine is expected never to be resumed afterwards, hence await_resume() is unreachable.
47 template<typename Fn>
48 auto yield_value(Fn&& function) noexcept {
49 struct Awaiter {
// Held by reference: the callable must outlive the suspension point that uses it.
50 Fn&& function;
51
52 auto await_ready() noexcept -> bool { return false; }
53 void await_suspend(CoroutineHandle<>) { function::invoke(util::forward<Fn>(function)); }
54 void await_resume() { util::unreachable(); }
55 };
56
57 return Awaiter { util::forward<Fn>(function) };
58 }
59
// Forwards an error to the receiver via set_error and hands back a no-op coroutine handle.
// NOTE(review): the caller is not visible here; presumably the awaitable-sender machinery invokes
// this when the awaited operation completes with an error - confirm.
60 auto unhandled_error(vocab::Error error) -> CoroutineHandle<> {
61 set_error(util::move(receiver), util::move(error));
62 return noop_coroutine();
63 }
64
// Same as unhandled_error(), but for the stopped completion.
65 auto unhandled_stopped() -> CoroutineHandle<> {
66 set_stopped(util::move(receiver));
67 return noop_coroutine();
68 }
69
// The coroutine starts suspended; it only runs once the operation state's start() resumes it.
70 auto initial_suspend() noexcept -> SuspendAlways { return {}; }
// The coroutine body always ends at a co_yield that never resumes, so reaching the end of the
// body, returning, or propagating an exception are all treated as impossible.
71 auto final_suspend() noexcept -> SuspendAlways { util::unreachable(); }
72 void return_void() noexcept { util::unreachable(); }
73 void unhandled_exception() noexcept { util::unreachable(); }
74
75 private:
// Environment queries on the promise are answered by the receiver's environment.
76 friend auto tag_invoke(types::Tag<get_env>, Promise const& self) -> decltype(auto) {
77 return get_env(self.receiver);
78 }
79 };
80
81 public:
82 using promise_type = Promise;
83
// Move constructor: takes over the coroutine handle and any receiver stored by set_receiver().
// `other` is left with an empty handle, so the `if (m_coroutine)` cleanup will not destroy the
// frame twice. The receiver has to travel as well: after an allocation failure it is the only
// state start() can act on, and `return result;` in the connect function object is not guaranteed
// to construct the result in place.
84 Type(Type&& other) : m_coroutine(util::exchange(other.m_coroutine, {})), m_receiver(util::move(other.m_receiver)) {}
85
87 if (m_coroutine) {
88 m_coroutine.destroy();
89 }
90 }
91
// Stores `receiver` inside the operation state. The only caller visible in this file does so
// after allocation_failed() returned true, so that start() has a receiver to complete with an error.
92 void set_receiver(Receiver&& receiver) { m_receiver = util::move(receiver); }
93
// True when this state holds no coroutine handle, e.g. because it was built through the
// AllocFailed constructor, which leaves m_coroutine default-initialized.
94 auto allocation_failed() const -> bool { return !m_coroutine; }
95
96 private:
// Private constructors, used by Promise::get_return_object() and
// Promise::get_return_object_on_allocation_failure() respectively.
// Success: adopt the handle of the freshly created coroutine.
97 explicit Type(CoroutineHandle<> coroutine) : m_coroutine(coroutine) {}
// Failure: no coroutine exists; m_coroutine stays empty and a receiver is supplied later.
98 explicit Type(AllocFailed) {}
99
// Hook for the `start` operation on this operation state.
// With a coroutine present, starting simply resumes it. Without one (frame allocation failed),
// the receiver stored through set_receiver() is completed with a NotEnoughMemory error.
100 friend void tag_invoke(types::Tag<start>, Type& self) {
101 if (self.m_coroutine) {
102 self.m_coroutine.resume();
103 return;
104 }
105 DI_ASSERT(self.m_receiver);
106 execution::set_error(util::move(*self.m_receiver), vocab::Error(BasicError::NotEnoughMemory));
107 }
108
109 CoroutineHandle<> m_coroutine;
110 vocab::Optional<Receiver> m_receiver;
111 };
112 };
113
114 template<typename Receiver>
116
117 template<typename Receiver>
119
120 template<typename Awaitable, typename Receiver, typename Result = meta::AwaitResult<Awaitable, Promise<Receiver>>>
122 : meta::TypeConstant<types::CompletionSignatures<SetValue(Result), SetError(vocab::Error), SetStopped()>> {};
123
124 template<typename Awaitable, typename Receiver, typename Result>
128
// Function object behind `connect_awaitable`: turns an awaitable plus a receiver into a
// coroutine-backed operation state.
// NOTE(review): the name looks like a typo for "Function"; it is referenced by the
// `connect_awaitable` definition, so a rename must touch both.
129 struct Funciton {
// Entry point. Creates the operation state by calling the coroutine `impl`; if the coroutine
// frame could not be allocated, hands the receiver to the (coroutine-less) state instead so that
// start() can report the failure.
130 template<concepts::Receiver Receiver, concepts::IsAwaitable<Promise<Receiver>> Awaitable>
132 auto operator()(Awaitable&& awaitable, Receiver receiver) const {
133 auto result = impl(util::forward<Awaitable>(awaitable), util::move(receiver));
134 if (result.allocation_failed()) {
135 // Since the allocation failed, the receiver was never move constructed into the operation state.
// NOTE(review): impl() takes `Receiver` by value, so its parameter is move-constructed from
// `receiver` before the frame allocation is attempted; confirm `receiver` is still usable here.
136 result.set_receiver(util::move(receiver)); // NOLINT(bugprone-use-after-move)
137 }
138 return result;
139 }
140
141 private:
// The diagnostic state is pushed unconditionally; -Wsubobject-linkage is silenced only when
// DI_GCC is defined. The matching pop follows impl().
142#pragma GCC diagnostic push
143#ifdef DI_GCC
144#pragma GCC diagnostic ignored "-Wsubobject-linkage"
145#endif
// The coroutine itself. Both parameters are taken by value so that they live in the coroutine
// frame; the promise binds its receiver reference to that copy.
146 template<typename Awaitable, typename Receiver>
147 static auto impl(Awaitable awaitable, Receiver receiver) -> OperationState<Receiver> {
149
150 // Connecting any awaitable with a receiver is a matter of returning an operation
151 // state, which, once started, enters coroutine context, calls co_await on the awaitable,
152 // suspends said coroutine, and finally calls the receiver's completion with the result
153 // of co_await.
154
155 // To do so, the OperationState is a coroutine (has a promise type), whose start operation
156 // resumes the coroutine. To forcefully suspend the coroutine, we co_yield a lambda expression,
157 // which both suspends the coroutine, but also executes the lambda once the suspension occurs.
158 if constexpr (concepts::LanguageVoid<Result>) {
159 co_await util::move(awaitable);
// Capturing by reference is fine here: the lambda runs while the frame is suspended and alive.
160 co_yield [&] {
161 set_value(util::move(receiver));
162 };
163 } else {
// `value` keeps the value category of the co_await result so it can be forwarded unchanged.
164 auto&& value = co_await util::move(awaitable);
165 co_yield [&] {
166 set_value(util::move(receiver), util::forward<decltype(value)>(value));
167 };
168 }
169 }
170#pragma GCC diagnostic pop
171 };
172
// Function object instance: connect_awaitable(awaitable, receiver) returns the operation state
// built by Funciton::operator().
173 constexpr inline auto connect_awaitable = Funciton {};
174}
175}
#define DI_ASSERT(...)
Definition assert_bool.h:7
Definition optional_forward_declaration.h:5
Definition core.h:128
Definition receiver_of.h:25
Definition awaitable_sender.h:7
Definition connect_awaitable.h:27
constexpr auto connect_awaitable
Definition connect_awaitable.h:173
OperationState< Receiver >::promise_type Promise
Definition connect_awaitable.h:118
meta::Type< OperationStateT< Receiver > > OperationState
Definition connect_awaitable.h:115
Definition bulk.h:30
constexpr auto set_error
Definition set_error.h:14
constexpr auto set_stopped
Definition set_stopped.h:14
constexpr auto get_env
Definition get_env.h:27
constexpr auto set_value
Definition set_value.h:14
Definition as_bool.h:8
constexpr auto invoke
Definition invoke.h:100
Definition merge_interfaces.h:6
T::Type Type
Definition core.h:26
decltype(concepts::detail::get_awaiter(util::declval< Awaitable >(), util::declval< Promise * >()).await_resume()) AwaitResult
Definition await_result.h:9
Definition method.h:5
size_t usize
Definition integers.h:33
di::meta::Decay< decltype(T)> Tag
Definition tag_invoke.h:28
Definition vocab.h:96
void unreachable()
Definition unreachable.h:4
Definition lazy.h:165
Expected< T, Error > Result
Definition result.h:8
StatusCode< Erased< long > > Error
Definition error.h:8
constexpr tag_invoke_detail::TagInvokeFn tag_invoke
Definition tag_invoke.h:22
constexpr auto exchange(T &object, U &&new_value) -> T
Definition exchange.h:8
std::coroutine_handle< Promise > CoroutineHandle
Definition coroutine.h:164
constexpr auto size
Definition size.h:54
std::suspend_always SuspendAlways
Definition coroutine.h:169
Definition set_error.h:6
Definition set_stopped.h:6
Definition set_value.h:6
Definition with_await_transform.h:14
Definition as_awaitable.h:86
Definition connect_awaitable.h:129
auto operator()(Awaitable &&awaitable, Receiver receiver) const
Definition connect_awaitable.h:132
friend void tag_invoke(types::Tag< start >, Type &self)
Definition connect_awaitable.h:100
auto allocation_failed() const -> bool
Definition connect_awaitable.h:94
Type(Type &&other)
Definition connect_awaitable.h:84
Promise promise_type
Definition connect_awaitable.h:82
void set_receiver(Receiver &&receiver)
Definition connect_awaitable.h:92
Definition core.h:18
Definition immovable.h:4