28 struct OperationStateBase : di::IntrusiveForwardListNode<> {
// Constructs the base operation state and records the given IoContext pointer in `parent`.
// The pointer is stored as-is; no ownership transfer is visible here.
30 OperationStateBase(
IoContext* parent_) : parent(parent_) {}
32 virtual void execute() = 0;
40 template<
typename CPO>
41 constexpr friend auto tag_invoke(execution::GetCompletionScheduler<CPO>, Env
const& self) {
46 template<
typename Receiver>
47 struct OperationStateT {
48 struct Type : OperationStateBase {
// Builds the scheduled operation: passes `parent` to the OperationStateBase constructor
// and moves the receiver into m_receiver.
50 Type(
IoContext* parent, Receiver&& receiver) : OperationStateBase(parent), m_receiver(
di::move(receiver)) {}
53 if (execution::get_stop_token(m_receiver).stop_requested()) {
54 execution::set_stopped(di::move(m_receiver));
56 execution::set_value(di::move(m_receiver));
// Starts the operation by handing this operation state to the parent's push_back(),
// which pushes it onto the parent's queue.
61 void do_start() { this->parent->push_back(
this); }
// Customization of execution::start for this operation state: forwards to do_start().
63 friend void tag_invoke(di::Tag<execution::start>,
Type& self) { self.do_start(); }
65 [[no_unique_address]] Receiver m_receiver;
// Convenience alias for the operation state type associated with a given Receiver.
// NOTE(review): di::meta::Type presumably resolves to the nested `Type` of OperationStateT<Receiver> — confirm.
69 template<
typename Receiver>
70 using OperationState = di::meta::Type<OperationStateT<Receiver>>;
75 using is_sender = void;
77 using CompletionSignatures = di::CompletionSignatures<di::SetValue(), di::SetStopped()>;
82 template<di::ReceiverOf<CompletionSignatures> Receiver>
83 friend auto tag_invoke(di::Tag<execution::connect>, Sender self, Receiver receiver) {
84 return OperationState<Receiver> { self.parent, di::move(receiver) };
87 constexpr friend auto tag_invoke(di::Tag<execution::get_env>, Sender
const& self) {
88 return Env { self.parent };
92 struct AsyncFile : di::Immovable {
// Stores the owning IoContext pointer, the path, the open mode and the creation mode bits.
// Nothing is opened here; the constructor only initializes members.
// NOTE(review): `path` arrives as a di::PathView; whether m_path owns a copy or only
// views the caller's storage depends on m_path's declared type, which is not visible here.
94 explicit AsyncFile(IoContext* parent, di::PathView path, OpenMode mode, u16 create_mode)
95 : m_parent(parent), m_path(path), m_mode(mode), m_create_mode(create_mode) {}
98 struct ReadSomeSender {
100 using is_sender = void;
102 using CompletionSignatures = di::CompletionSignatures<di::SetValue(
size_t), di::SetError(di::Error)>;
106 di::Span<di::Byte> buffer;
107 di::Optional<u64> offset;
110 template<
typename Rec>
111 struct OperationStateT {
123 : sync_file.read_some(self.
buffer);
124 if (!result.has_value()) {
125 execution::set_error(di::move(self.
receiver), di::Error(di::move(result).error()));
127 execution::set_value(di::move(self.
receiver), di::move(result).value());
133 template<di::ReceiverOf<CompletionSignatures> Receiver>
134 using OperationState = di::meta::Type<OperationStateT<Receiver>>;
136 template<di::ReceiverOf<CompletionSignatures> Receiver>
137 friend auto tag_invoke(di::Tag<execution::connect>, ReadSomeSender self, Receiver receiver) {
138 return OperationState<Receiver> { self.parent, self.file_descriptor, self.buffer, self.offset,
139 di::move(receiver) };
142 constexpr friend auto tag_invoke(di::Tag<execution::get_env>, ReadSomeSender
const& self) {
143 return Env { self.parent };
147 struct WriteSomeSender {
149 using is_sender = void;
151 using CompletionSignatures = di::CompletionSignatures<di::SetValue(
size_t), di::SetError(di::Error)>;
155 di::Span<di::Byte const> buffer;
156 di::Optional<u64> offset;
159 template<
typename Rec>
160 struct OperationStateT {
172 : sync_file.write_some(self.
buffer);
174 execution::set_error(di::move(self.
receiver), di::Error(di::move(result).error()));
176 execution::set_value(di::move(self.
receiver), di::move(result).value());
182 template<di::ReceiverOf<CompletionSignatures> Receiver>
183 using OperationState = di::meta::Type<OperationStateT<Receiver>>;
185 template<di::ReceiverOf<CompletionSignatures> Receiver>
186 friend auto tag_invoke(di::Tag<execution::connect>, WriteSomeSender self, Receiver receiver) {
187 return OperationState<Receiver> { self.parent, self.file_descriptor, self.buffer, self.offset,
188 di::move(receiver) };
191 constexpr friend auto tag_invoke(di::Tag<execution::get_env>, WriteSomeSender
const& self) {
192 return Env { self.parent };
198 using is_sender = void;
200 using CompletionSignatures =
201 di::CompletionSignatures<di::SetValue(di::ReferenceWrapper<AsyncFile>), di::SetError(di::Error)>;
204 di::ReferenceWrapper<AsyncFile> file;
207 template<
typename Rec>
208 struct OperationStateT {
211 di::ReferenceWrapper<AsyncFile>
file;
217 self.
file.get().m_create_mode);
219 execution::set_error(di::move(self.
receiver), di::Error(di::move(result).error()));
221 self.
file.get().m_file = di::move(result).value();
222 execution::set_value(di::move(self.
receiver), self.
file);
228 template<di::ReceiverOf<CompletionSignatures> Receiver>
229 using OperationState = di::meta::Type<OperationStateT<Receiver>>;
231 template<di::ReceiverOf<CompletionSignatures> Receiver>
232 friend auto tag_invoke(di::Tag<execution::connect>, OpenSender self, Receiver receiver) {
233 return OperationState<Receiver> { self.parent, self.file, di::move(receiver) };
236 constexpr friend auto tag_invoke(di::Tag<execution::get_env>, OpenSender
const& self) {
237 return Env { self.parent };
243 using is_sender = di::SequenceTag;
245 using CompletionSignatures =
246 di::CompletionSignatures<di::SetValue(di::ReferenceWrapper<AsyncFile>), di::SetError(di::Error)>;
249 di::ReferenceWrapper<AsyncFile> file;
252 template<
typename Rec>
253 struct OperationStateT {
261 self.parent->close();
264 self.parent->close();
268 using Next = di::meta::NextSenderOf<Rec, OpenSender>;
269 using Op = di::meta::ConnectResult<Next, Receiver>;
272 di::ReferenceWrapper<AsyncFile>
file;
274 DI_IMMOVABLE_NO_UNIQUE_ADDRESS di::Optional<Op>
op;
277 auto result =
file.get().m_file.close();
279 execution::set_error(di::move(
receiver), di::Error(di::move(result).error()));
281 execution::set_value(di::move(
receiver));
287 self.
op.emplace(di::DeferConstruct([&] {
288 return execution::connect(
292 execution::start(*self.
op);
297 template<
typename Receiver>
298 using OperationState = di::meta::Type<OperationStateT<Receiver>>;
300 template<di::concepts::Receiver Receiver>
301 friend auto tag_invoke(di::Tag<execution::subscribe>, RunSender self, Receiver receiver) {
302 return OperationState<Receiver> { self.parent, self.file, di::move(receiver), di::nullopt };
305 friend auto tag_invoke(di::Tag<execution::get_env>, RunSender
const& self) {
306 return execution::make_env(Env(self.parent),
307 execution::with(execution::get_sequence_cardinality, di::c_<1ZU>));
311 friend auto tag_invoke(di::Tag<di::execution::async_read_some>, AsyncFile& self, di::Span<di::Byte> buffer,
312 di::Optional<u64> offset) {
313 return ReadSomeSender { self.m_parent, self.m_file.file_descriptor(), buffer, offset };
316 friend auto tag_invoke(di::Tag<di::execution::async_write_some>, AsyncFile& self,
317 di::Span<di::Byte const> buffer, di::Optional<u64> offset) {
318 return WriteSomeSender { self.m_parent, self.m_file.file_descriptor(), buffer, offset };
321 friend auto tag_invoke(di::Tag<di::execution::run>, AsyncFile& self) {
322 return RunSender(self.m_parent, di::ref(self));
328 [[maybe_unused]] u16 m_create_mode;
// Customization of execution::schedule for this scheduler: returns a Sender that
// carries the scheduler's `parent` pointer.
336 friend auto tag_invoke(di::Tag<execution::schedule>, Scheduler
const& self) {
return Sender { self.parent }; }
338 friend auto tag_invoke(di::Tag<execution::async_open>, Scheduler
const& self, di::PathView path, OpenMode mode,
339 u16 create_mode = 0666) {
340 return di::make_deferred<AsyncFile>(self.parent, path, mode, create_mode);
// Defaulted equality: the compiler generates a member-wise comparison of two schedulers.
343 constexpr friend auto operator==(Scheduler
const&, Scheduler
const&) ->
bool =
default;
349 di::Queue<OperationStateBase, di::IntrusiveForwardList<OperationStateBase>> queue;
350 bool stopped {
false };
// Accessor for the synchronized State: returns a reference to the value held by m_state.
// Callers in this file go through with_lock() on the returned object.
353 auto state() -> di::Synchronized<State>& {
return m_state.value(); }
363 while (
auto* operation = pop_front()) {
364 operation->execute();
369 state().with_lock([](State& state) {
370 state.stopped =
true;
377 auto pop_front() -> OperationStateBase* {
380 auto [operation, is_stopped] = state().with_lock([](State& state) -> di::Tuple<OperationStateBase*, bool> {
381 if (!state.queue.empty()) {
382 return di::make_tuple(di::addressof(*state.queue.pop()),
false);
385 return di::make_tuple(
nullptr,
true);
387 return di::make_tuple(
nullptr,
false);
399 void push_back(OperationStateBase* operation) {
400 state().with_lock([&](State& state) {
401 state.queue.push(*operation);
405 di::MovableBox<di::Synchronized<State>> m_state;
static auto create() -> di::Result< IoContext >
Definition io_context.h:356