Iros
 
Loading...
Searching...
No Matches
io_context.h
Go to the documentation of this file.
1#pragma once
2
16#include "di/function/prelude.h"
18#include "di/sync/prelude.h"
19#include "di/util/prelude.h"
21#include "dius/config.h"
22#include "dius/sync_file.h"
23
24#ifdef DIUS_PLATFORM_LINUX
26#endif
27
28namespace dius {
29#ifdef DIUS_PLATFORM_LINUX
31#else
32namespace execution = di::execution;
33
34class IoContext {
35private:
36 struct OperationStateBase : di::IntrusiveForwardListNode<> {
37 public:
38 OperationStateBase(IoContext* parent_) : parent(parent_) {}
39
40 virtual void execute() = 0;
41
42 IoContext* parent { nullptr };
43 };
44
45 struct Env {
46 IoContext* parent;
47
48 template<typename CPO>
49 constexpr friend auto tag_invoke(execution::GetCompletionScheduler<CPO>, Env const& self) {
50 return self.parent->get_scheduler();
51 }
52 };
53
54 template<typename Receiver>
55 struct OperationStateT {
56 struct Type : OperationStateBase {
57 public:
58 Type(IoContext* parent, Receiver&& receiver) : OperationStateBase(parent), m_receiver(di::move(receiver)) {}
59
60 void execute() override {
61 if (execution::get_stop_token(m_receiver).stop_requested()) {
62 execution::set_stopped(di::move(m_receiver));
63 } else {
64 execution::set_value(di::move(m_receiver));
65 }
66 }
67
68 private:
69 void do_start() { this->parent->push_back(this); }
70
71 friend void tag_invoke(di::Tag<execution::start>, Type& self) { self.do_start(); }
72
73 [[no_unique_address]] Receiver m_receiver;
74 };
75 };
76
77 template<typename Receiver>
78 using OperationState = di::meta::Type<OperationStateT<Receiver>>;
79
80 struct Scheduler {
81 private:
82 struct Sender {
83 using is_sender = void;
84
85 using CompletionSignatures = di::CompletionSignatures<di::SetValue(), di::SetStopped()>;
86
87 IoContext* parent;
88
89 private:
90 template<di::ReceiverOf<CompletionSignatures> Receiver>
91 friend auto tag_invoke(di::Tag<execution::connect>, Sender self, Receiver receiver) {
92 return OperationState<Receiver> { self.parent, di::move(receiver) };
93 }
94
95 constexpr friend auto tag_invoke(di::Tag<execution::get_env>, Sender const& self) {
96 return Env { self.parent };
97 }
98 };
99
100 struct AsyncFile : di::Immovable {
101 public:
102 explicit AsyncFile(IoContext* parent, di::PathView path, OpenMode mode, u16 create_mode)
103 : m_parent(parent), m_path(path), m_mode(mode), m_create_mode(create_mode) {}
104
105 private:
106 struct ReadSomeSender {
107 public:
108 using is_sender = void;
109
110 using CompletionSignatures = di::CompletionSignatures<di::SetValue(size_t), di::SetError(di::Error)>;
111
112 IoContext* parent;
113 int file_descriptor;
114 di::Span<di::Byte> buffer;
115 di::Optional<u64> offset;
116
117 private:
118 template<typename Rec>
119 struct OperationStateT {
120 struct Type {
125 [[no_unique_address]] Rec receiver;
126
127 private:
129 auto sync_file = SyncFile(SyncFile::Owned::No, self.file_descriptor);
130 auto result = self.offset ? sync_file.read_some(self.offset.value(), self.buffer)
131 : sync_file.read_some(self.buffer);
132 if (!result.has_value()) {
133 execution::set_error(di::move(self.receiver), di::Error(di::move(result).error()));
134 } else {
135 execution::set_value(di::move(self.receiver), di::move(result).value());
136 }
137 }
138 };
139 };
140
141 template<di::ReceiverOf<CompletionSignatures> Receiver>
142 using OperationState = di::meta::Type<OperationStateT<Receiver>>;
143
144 template<di::ReceiverOf<CompletionSignatures> Receiver>
145 friend auto tag_invoke(di::Tag<execution::connect>, ReadSomeSender self, Receiver receiver) {
146 return OperationState<Receiver> { self.parent, self.file_descriptor, self.buffer, self.offset,
147 di::move(receiver) };
148 }
149
150 constexpr friend auto tag_invoke(di::Tag<execution::get_env>, ReadSomeSender const& self) {
151 return Env { self.parent };
152 }
153 };
154
155 struct WriteSomeSender {
156 public:
157 using is_sender = void;
158
159 using CompletionSignatures = di::CompletionSignatures<di::SetValue(size_t), di::SetError(di::Error)>;
160
161 IoContext* parent;
162 int file_descriptor;
163 di::Span<di::Byte const> buffer;
164 di::Optional<u64> offset;
165
166 private:
167 template<typename Rec>
168 struct OperationStateT {
169 struct Type {
174 [[no_unique_address]] Rec receiver;
175
176 private:
178 auto sync_file = SyncFile(SyncFile::Owned::No, self.file_descriptor);
179 auto result = self.offset ? sync_file.write_some(self.offset.value(), self.buffer)
180 : sync_file.write_some(self.buffer);
181 if (!result) {
182 execution::set_error(di::move(self.receiver), di::Error(di::move(result).error()));
183 } else {
184 execution::set_value(di::move(self.receiver), di::move(result).value());
185 }
186 }
187 };
188 };
189
190 template<di::ReceiverOf<CompletionSignatures> Receiver>
191 using OperationState = di::meta::Type<OperationStateT<Receiver>>;
192
193 template<di::ReceiverOf<CompletionSignatures> Receiver>
194 friend auto tag_invoke(di::Tag<execution::connect>, WriteSomeSender self, Receiver receiver) {
195 return OperationState<Receiver> { self.parent, self.file_descriptor, self.buffer, self.offset,
196 di::move(receiver) };
197 }
198
199 constexpr friend auto tag_invoke(di::Tag<execution::get_env>, WriteSomeSender const& self) {
200 return Env { self.parent };
201 }
202 };
203
204 struct OpenSender {
205 public:
206 using is_sender = void;
207
208 using CompletionSignatures =
209 di::CompletionSignatures<di::SetValue(di::ReferenceWrapper<AsyncFile>), di::SetError(di::Error)>;
210
211 IoContext* parent;
212 di::ReferenceWrapper<AsyncFile> file;
213
214 private:
215 template<typename Rec>
216 struct OperationStateT {
217 struct Type {
220 [[no_unique_address]] Rec receiver;
221
222 private:
224 auto result = open_sync(self.file.get().m_path, self.file.get().m_mode,
225 self.file.get().m_create_mode);
226 if (!result) {
227 execution::set_error(di::move(self.receiver), di::Error(di::move(result).error()));
228 } else {
229 self.file.get().m_file = di::move(result).value();
230 execution::set_value(di::move(self.receiver), self.file);
231 }
232 }
233 };
234 };
235
236 template<di::ReceiverOf<CompletionSignatures> Receiver>
237 using OperationState = di::meta::Type<OperationStateT<Receiver>>;
238
239 template<di::ReceiverOf<CompletionSignatures> Receiver>
240 friend auto tag_invoke(di::Tag<execution::connect>, OpenSender self, Receiver receiver) {
241 return OperationState<Receiver> { self.parent, self.file, di::move(receiver) };
242 }
243
244 constexpr friend auto tag_invoke(di::Tag<execution::get_env>, OpenSender const& self) {
245 return Env { self.parent };
246 }
247 };
248
249 struct RunSender {
250 public:
251 using is_sender = di::SequenceTag;
252
253 using CompletionSignatures =
254 di::CompletionSignatures<di::SetValue(di::ReferenceWrapper<AsyncFile>), di::SetError(di::Error)>;
255
256 IoContext* parent;
257 di::ReferenceWrapper<AsyncFile> file;
258
259 private:
260 template<typename Rec>
261 struct OperationStateT {
262 struct Type {
263 struct Receiver {
264 using is_receiver = void;
265
267
269 self.parent->close();
270 }
272 self.parent->close();
273 }
274 };
275
278
281 [[no_unique_address]] Rec receiver;
283
284 void close() {
285 auto result = file.get().m_file.close();
286 if (!result) {
287 execution::set_error(di::move(receiver), di::Error(di::move(result).error()));
288 } else {
290 }
291 }
292
293 private:
295 self.op.emplace(di::DeferConstruct([&] {
296 return execution::connect(
297 execution::set_next(self.receiver, OpenSender(self.parent, self.file)),
298 Receiver(&self));
299 }));
300 execution::start(*self.op);
301 }
302 };
303 };
304
305 template<typename Receiver>
306 using OperationState = di::meta::Type<OperationStateT<Receiver>>;
307
308 template<di::concepts::Receiver Receiver>
309 friend auto tag_invoke(di::Tag<execution::subscribe>, RunSender self, Receiver receiver) {
310 return OperationState<Receiver> { self.parent, self.file, di::move(receiver), di::nullopt };
311 }
312
313 friend auto tag_invoke(di::Tag<execution::get_env>, RunSender const& self) {
314 return execution::make_env(Env(self.parent),
316 }
317 };
318
319 friend auto tag_invoke(di::Tag<di::execution::async_read_some>, AsyncFile& self, di::Span<di::Byte> buffer,
320 di::Optional<u64> offset) {
321 return ReadSomeSender { self.m_parent, self.m_file.file_descriptor(), buffer, offset };
322 }
323
324 friend auto tag_invoke(di::Tag<di::execution::async_write_some>, AsyncFile& self,
325 di::Span<di::Byte const> buffer, di::Optional<u64> offset) {
326 return WriteSomeSender { self.m_parent, self.m_file.file_descriptor(), buffer, offset };
327 }
328
329 friend auto tag_invoke(di::Tag<di::execution::run>, AsyncFile& self) {
330 return RunSender(self.m_parent, di::ref(self));
331 }
332
333 IoContext* m_parent;
334 di::PathView m_path;
335 OpenMode m_mode;
336 u16 m_create_mode;
337 SyncFile m_file;
338 };
339
340 public:
341 IoContext* parent { nullptr };
342
343 private:
344 friend auto tag_invoke(di::Tag<execution::schedule>, Scheduler const& self) { return Sender { self.parent }; }
345
346 friend auto tag_invoke(di::Tag<execution::async_open>, Scheduler const& self, di::PathView path, OpenMode mode,
347 u16 create_mode = 0666) {
348 return di::make_deferred<AsyncFile>(self.parent, path, mode, create_mode);
349 }
350
351 constexpr friend auto operator==(Scheduler const&, Scheduler const&) -> bool = default;
352 };
353
354 struct State {
355 State() {} // NOLINT(modernize-use-equals-default)
356
357 di::Queue<OperationStateBase, di::IntrusiveForwardList<OperationStateBase>> queue;
358 bool stopped { false };
359 };
360
361 auto state() -> di::Synchronized<State>& { return m_state.value(); }
362
363public:
364 static auto create() -> di::Result<IoContext> { return IoContext {}; }
365
366 IoContext(IoContext&&) = default;
367
368 auto get_scheduler() -> Scheduler { return Scheduler { this }; }
369
370 void run() {
371 while (auto* operation = pop_front()) {
372 operation->execute();
373 }
374 }
375
376 void finish() {
377 state().with_lock([](State& state) {
378 state.stopped = true;
379 });
380 }
381
382private:
383 IoContext() = default;
384
385 auto pop_front() -> OperationStateBase* {
386 // FIXME: block instead of busy polling the queue when it is empty.
387 for (;;) {
388 auto [operation, is_stopped] = state().with_lock([](State& state) -> di::Tuple<OperationStateBase*, bool> {
389 if (!state.queue.empty()) {
390 return di::make_tuple(di::addressof(*state.queue.pop()), false);
391 }
392 if (state.stopped) {
393 return di::make_tuple(nullptr, true);
394 }
395 return di::make_tuple(nullptr, false);
396 });
397
398 if (is_stopped) {
399 return nullptr;
400 }
401 if (operation) {
402 return operation;
403 }
404 }
405 }
406
407 void push_back(OperationStateBase* operation) {
408 state().with_lock([&](State& state) {
409 state.queue.push(*operation);
410 });
411 }
412
413 di::MovableBox<di::Synchronized<State>> m_state;
414};
415#endif
416}
Definition forward_list_node.h:9
Definition defer_construct.h:8
Definition reference_wrapper.h:14
Definition optional_forward_declaration.h:5
Definition span_forward_declaration.h:10
Definition tuple_forward_declaration.h:5
Definition io_context.h:34
void run()
Definition io_context.h:370
void finish()
Definition io_context.h:376
static auto create() -> di::Result< IoContext >
Definition io_context.h:364
IoContext(IoContext &&)=default
auto get_scheduler() -> Scheduler
Definition io_context.h:368
Definition sync_file.h:40
@ No
Definition sync_file.h:42
#define DI_IMMOVABLE_NO_UNIQUE_ADDRESS
Definition compiler.h:15
PathViewImpl< string::TransparentEncoding > PathView
Definition path_view.h:11
MakeEnv< EmptyEnv, With< Tag< get_stop_token >, StopToken > > Env
Definition ensure_started.h:52
Definition bulk.h:30
constexpr auto set_error
Definition set_error.h:14
constexpr auto set_next
Set the next sender of a sequence.
Definition sequence_sender.h:77
constexpr auto start
Definition start.h:20
constexpr auto make_env
Create an environment with overrides for queries.
Definition make_env.h:147
constexpr auto set_stopped
Definition set_stopped.h:14
constexpr auto get_sequence_cardinality
A query that returns the cardinality of a sequence.
Definition get_sequence_cardinality.h:45
constexpr auto with
Specify an override for an environment query.
Definition make_env.h:112
constexpr auto get_stop_token
Definition get_stop_token.h:25
constexpr auto connect
Definition connect.h:42
constexpr auto set_value
Definition set_value.h:14
T::Type Type
Definition core.h:26
decltype(execution::set_next(util::declval< meta::RemoveCVRef< Rec > & >(), util::declval< Send >())) NextSenderOf
Definition sequence_sender.h:82
decltype(execution::connect(util::declval< Send >(), util::declval< Rec >())) ConnectResult
Definition connect_result.h:7
di::meta::Decay< decltype(T)> Tag
Definition tag_invoke.h:28
__UINT16_TYPE__ u16
Definition integers.h:10
Expected< T, Error > Result
Definition result.h:8
StatusCode< Erased< long > > Error
Definition error.h:8
Definition zstring_parser.h:9
constexpr auto make_deferred
Creates a deferred function object.
Definition make_deferred.h:75
constexpr tag_invoke_detail::TagInvokeFn tag_invoke
Definition tag_invoke.h:22
constexpr auto ref
Definition reference_wrapper.h:98
constexpr auto nullopt
Definition nullopt.h:15
constexpr auto make_tuple(Args &&... args)
Definition make_tuple.h:9
constexpr auto c_
A value of type Constexpr<val>.
Definition constexpr.h:252
Definition directory_entry.h:11
auto open_sync(di::PathView path, OpenMode open_mode, u16 create_mode=0666) -> di::Expected< SyncFile, di::GenericCode >
Definition sync_file.cpp:76
OpenMode
Definition sync_file.h:109
Defines the sequence sender concepts and related CPOs.
Definition get_completion_scheduler.h:10
Definition set_stopped.h:6
Definition set_value.h:6
Definition completion_signuatures.h:7
Type(IoContext *parent, Receiver &&receiver)
Definition io_context.h:58
friend void tag_invoke(di::Tag< execution::start >, Type &self)
Definition io_context.h:71
void execute() override
Definition io_context.h:60
friend void tag_invoke(di::Tag< execution::start >, Type &self)
Definition io_context.h:223
di::ReferenceWrapper< AsyncFile > file
Definition io_context.h:219
friend void tag_invoke(di::Tag< execution::start >, Type &self)
Definition io_context.h:128
friend void tag_invoke(di::Tag< execution::set_stopped >, Receiver &&self)
Definition io_context.h:271
friend void tag_invoke(di::Tag< execution::set_value >, Receiver &&self)
Definition io_context.h:268
DI_IMMOVABLE_NO_UNIQUE_ADDRESS di::Optional< Op > op
Definition io_context.h:282
friend void tag_invoke(di::Tag< execution::start >, Type &self)
Definition io_context.h:294
di::ReferenceWrapper< AsyncFile > file
Definition io_context.h:280
di::meta::NextSenderOf< Rec, OpenSender > Next
Definition io_context.h:276
di::meta::ConnectResult< Next, Receiver > Op
Definition io_context.h:277
friend void tag_invoke(di::Tag< execution::start >, Type &self)
Definition io_context.h:177
di::Span< di::Byte const > buffer
Definition io_context.h:172
Definition io_uring_context.h:63