dius 0.1.0
Loading...
Searching...
No Matches
io_context.h
Go to the documentation of this file.
1#pragma once
2
3#include "di/container/intrusive/prelude.h"
4#include "di/execution/algorithm/just.h"
5#include "di/execution/concepts/receiver.h"
6#include "di/execution/interface/connect.h"
7#include "di/execution/interface/get_env.h"
8#include "di/execution/interface/run.h"
9#include "di/execution/meta/connect_result.h"
10#include "di/execution/prelude.h"
11#include "di/execution/query/get_sequence_cardinality.h"
12#include "di/execution/query/make_env.h"
13#include "di/execution/receiver/set_value.h"
14#include "di/execution/sequence/sequence_sender.h"
15#include "di/function/make_deferred.h"
16#include "di/function/prelude.h"
17#include "di/platform/compiler.h"
18#include "di/sync/prelude.h"
19#include "di/util/prelude.h"
20#include "di/vocab/optional/prelude.h"
21#include "dius/sync_file.h"
22
23namespace dius {
24namespace execution = di::execution;
25
26class IoContext {
27private:
28 struct OperationStateBase : di::IntrusiveForwardListNode<> {
29 public:
30 OperationStateBase(IoContext* parent_) : parent(parent_) {}
31
32 virtual void execute() = 0;
33
34 IoContext* parent { nullptr };
35 };
36
37 struct Env {
38 IoContext* parent;
39
40 template<typename CPO>
41 constexpr friend auto tag_invoke(execution::GetCompletionScheduler<CPO>, Env const& self) {
42 return self.parent->get_scheduler();
43 }
44 };
45
46 template<typename Receiver>
47 struct OperationStateT {
48 struct Type : OperationStateBase {
49 public:
50 Type(IoContext* parent, Receiver&& receiver) : OperationStateBase(parent), m_receiver(di::move(receiver)) {}
51
52 void execute() override {
53 if (execution::get_stop_token(m_receiver).stop_requested()) {
54 execution::set_stopped(di::move(m_receiver));
55 } else {
56 execution::set_value(di::move(m_receiver));
57 }
58 }
59
60 private:
61 void do_start() { this->parent->push_back(this); }
62
63 friend void tag_invoke(di::Tag<execution::start>, Type& self) { self.do_start(); }
64
65 [[no_unique_address]] Receiver m_receiver;
66 };
67 };
68
69 template<typename Receiver>
70 using OperationState = di::meta::Type<OperationStateT<Receiver>>;
71
72 struct Scheduler {
73 private:
74 struct Sender {
75 using is_sender = void;
76
77 using CompletionSignatures = di::CompletionSignatures<di::SetValue(), di::SetStopped()>;
78
79 IoContext* parent;
80
81 private:
82 template<di::ReceiverOf<CompletionSignatures> Receiver>
83 friend auto tag_invoke(di::Tag<execution::connect>, Sender self, Receiver receiver) {
84 return OperationState<Receiver> { self.parent, di::move(receiver) };
85 }
86
87 constexpr friend auto tag_invoke(di::Tag<execution::get_env>, Sender const& self) {
88 return Env { self.parent };
89 }
90 };
91
92 struct AsyncFile : di::Immovable {
93 public:
94 explicit AsyncFile(IoContext* parent, di::PathView path, OpenMode mode, u16 create_mode)
95 : m_parent(parent), m_path(path), m_mode(mode), m_create_mode(create_mode) {}
96
97 private:
98 struct ReadSomeSender {
99 public:
100 using is_sender = void;
101
102 using CompletionSignatures = di::CompletionSignatures<di::SetValue(size_t), di::SetError(di::Error)>;
103
104 IoContext* parent;
105 int file_descriptor;
106 di::Span<di::Byte> buffer;
107 di::Optional<u64> offset;
108
109 private:
110 template<typename Rec>
111 struct OperationStateT {
112 struct Type {
115 di::Span<di::Byte> buffer;
116 di::Optional<u64> offset;
117 [[no_unique_address]] Rec receiver;
118
119 private:
120 friend void tag_invoke(di::Tag<execution::start>, Type& self) {
121 auto sync_file = SyncFile(SyncFile::Owned::No, self.file_descriptor);
122 auto result = self.offset ? sync_file.read_some(self.offset.value(), self.buffer)
123 : sync_file.read_some(self.buffer);
124 if (!result.has_value()) {
125 execution::set_error(di::move(self.receiver), di::Error(di::move(result).error()));
126 } else {
127 execution::set_value(di::move(self.receiver), di::move(result).value());
128 }
129 }
130 };
131 };
132
133 template<di::ReceiverOf<CompletionSignatures> Receiver>
134 using OperationState = di::meta::Type<OperationStateT<Receiver>>;
135
136 template<di::ReceiverOf<CompletionSignatures> Receiver>
137 friend auto tag_invoke(di::Tag<execution::connect>, ReadSomeSender self, Receiver receiver) {
138 return OperationState<Receiver> { self.parent, self.file_descriptor, self.buffer, self.offset,
139 di::move(receiver) };
140 }
141
142 constexpr friend auto tag_invoke(di::Tag<execution::get_env>, ReadSomeSender const& self) {
143 return Env { self.parent };
144 }
145 };
146
147 struct WriteSomeSender {
148 public:
149 using is_sender = void;
150
151 using CompletionSignatures = di::CompletionSignatures<di::SetValue(size_t), di::SetError(di::Error)>;
152
153 IoContext* parent;
154 int file_descriptor;
155 di::Span<di::Byte const> buffer;
156 di::Optional<u64> offset;
157
158 private:
159 template<typename Rec>
160 struct OperationStateT {
161 struct Type {
164 di::Span<di::Byte const> buffer;
165 di::Optional<u64> offset;
166 [[no_unique_address]] Rec receiver;
167
168 private:
169 friend void tag_invoke(di::Tag<execution::start>, Type& self) {
170 auto sync_file = SyncFile(SyncFile::Owned::No, self.file_descriptor);
171 auto result = self.offset ? sync_file.write_some(self.offset.value(), self.buffer)
172 : sync_file.write_some(self.buffer);
173 if (!result) {
174 execution::set_error(di::move(self.receiver), di::Error(di::move(result).error()));
175 } else {
176 execution::set_value(di::move(self.receiver), di::move(result).value());
177 }
178 }
179 };
180 };
181
182 template<di::ReceiverOf<CompletionSignatures> Receiver>
183 using OperationState = di::meta::Type<OperationStateT<Receiver>>;
184
185 template<di::ReceiverOf<CompletionSignatures> Receiver>
186 friend auto tag_invoke(di::Tag<execution::connect>, WriteSomeSender self, Receiver receiver) {
187 return OperationState<Receiver> { self.parent, self.file_descriptor, self.buffer, self.offset,
188 di::move(receiver) };
189 }
190
191 constexpr friend auto tag_invoke(di::Tag<execution::get_env>, WriteSomeSender const& self) {
192 return Env { self.parent };
193 }
194 };
195
196 struct OpenSender {
197 public:
198 using is_sender = void;
199
200 using CompletionSignatures =
201 di::CompletionSignatures<di::SetValue(di::ReferenceWrapper<AsyncFile>), di::SetError(di::Error)>;
202
203 IoContext* parent;
204 di::ReferenceWrapper<AsyncFile> file;
205
206 private:
207 template<typename Rec>
208 struct OperationStateT {
209 struct Type {
211 di::ReferenceWrapper<AsyncFile> file;
212 [[no_unique_address]] Rec receiver;
213
214 private:
215 friend void tag_invoke(di::Tag<execution::start>, Type& self) {
216 auto result = open_sync(self.file.get().m_path, self.file.get().m_mode,
217 self.file.get().m_create_mode);
218 if (!result) {
219 execution::set_error(di::move(self.receiver), di::Error(di::move(result).error()));
220 } else {
221 self.file.get().m_file = di::move(result).value();
222 execution::set_value(di::move(self.receiver), self.file);
223 }
224 }
225 };
226 };
227
228 template<di::ReceiverOf<CompletionSignatures> Receiver>
229 using OperationState = di::meta::Type<OperationStateT<Receiver>>;
230
231 template<di::ReceiverOf<CompletionSignatures> Receiver>
232 friend auto tag_invoke(di::Tag<execution::connect>, OpenSender self, Receiver receiver) {
233 return OperationState<Receiver> { self.parent, self.file, di::move(receiver) };
234 }
235
236 constexpr friend auto tag_invoke(di::Tag<execution::get_env>, OpenSender const& self) {
237 return Env { self.parent };
238 }
239 };
240
241 struct RunSender {
242 public:
243 using is_sender = di::SequenceTag;
244
245 using CompletionSignatures =
246 di::CompletionSignatures<di::SetValue(di::ReferenceWrapper<AsyncFile>), di::SetError(di::Error)>;
247
248 IoContext* parent;
249 di::ReferenceWrapper<AsyncFile> file;
250
251 private:
252 template<typename Rec>
253 struct OperationStateT {
254 struct Type {
255 struct Receiver {
256 using is_receiver = void;
257
259
260 friend void tag_invoke(di::Tag<execution::set_value>, Receiver&& self) {
261 self.parent->close();
262 }
263 friend void tag_invoke(di::Tag<execution::set_stopped>, Receiver&& self) {
264 self.parent->close();
265 }
266 };
267
268 using Next = di::meta::NextSenderOf<Rec, OpenSender>;
269 using Op = di::meta::ConnectResult<Next, Receiver>;
270
272 di::ReferenceWrapper<AsyncFile> file;
273 [[no_unique_address]] Rec receiver;
274 DI_IMMOVABLE_NO_UNIQUE_ADDRESS di::Optional<Op> op;
275
276 void close() {
277 auto result = file.get().m_file.close();
278 if (!result) {
279 execution::set_error(di::move(receiver), di::Error(di::move(result).error()));
280 } else {
281 execution::set_value(di::move(receiver));
282 }
283 }
284
285 private:
286 friend void tag_invoke(di::Tag<execution::start>, Type& self) {
287 self.op.emplace(di::DeferConstruct([&] {
288 return execution::connect(
289 execution::set_next(self.receiver, OpenSender(self.parent, self.file)),
290 Receiver(&self));
291 }));
292 execution::start(*self.op);
293 }
294 };
295 };
296
297 template<typename Receiver>
298 using OperationState = di::meta::Type<OperationStateT<Receiver>>;
299
300 template<di::concepts::Receiver Receiver>
301 friend auto tag_invoke(di::Tag<execution::subscribe>, RunSender self, Receiver receiver) {
302 return OperationState<Receiver> { self.parent, self.file, di::move(receiver), di::nullopt };
303 }
304
305 friend auto tag_invoke(di::Tag<execution::get_env>, RunSender const& self) {
306 return execution::make_env(Env(self.parent),
307 execution::with(execution::get_sequence_cardinality, di::c_<1ZU>));
308 }
309 };
310
311 friend auto tag_invoke(di::Tag<di::execution::async_read_some>, AsyncFile& self, di::Span<di::Byte> buffer,
312 di::Optional<u64> offset) {
313 return ReadSomeSender { self.m_parent, self.m_file.file_descriptor(), buffer, offset };
314 }
315
316 friend auto tag_invoke(di::Tag<di::execution::async_write_some>, AsyncFile& self,
317 di::Span<di::Byte const> buffer, di::Optional<u64> offset) {
318 return WriteSomeSender { self.m_parent, self.m_file.file_descriptor(), buffer, offset };
319 }
320
321 friend auto tag_invoke(di::Tag<di::execution::run>, AsyncFile& self) {
322 return RunSender(self.m_parent, di::ref(self));
323 }
324
325 IoContext* m_parent;
326 di::PathView m_path;
327 [[maybe_unused]] OpenMode m_mode;
328 [[maybe_unused]] u16 m_create_mode;
329 SyncFile m_file;
330 };
331
332 public:
333 IoContext* parent { nullptr };
334
335 private:
336 friend auto tag_invoke(di::Tag<execution::schedule>, Scheduler const& self) { return Sender { self.parent }; }
337
338 friend auto tag_invoke(di::Tag<execution::async_open>, Scheduler const& self, di::PathView path, OpenMode mode,
339 u16 create_mode = 0666) {
340 return di::make_deferred<AsyncFile>(self.parent, path, mode, create_mode);
341 }
342
343 constexpr friend auto operator==(Scheduler const&, Scheduler const&) -> bool = default;
344 };
345
346 struct State {
347 State() {} // NOLINT(modernize-use-equals-default)
348
349 di::Queue<OperationStateBase, di::IntrusiveForwardList<OperationStateBase>> queue;
350 bool stopped { false };
351 };
352
353 auto state() -> di::Synchronized<State>& { return m_state.value(); }
354
355public:
356 static auto create() -> di::Result<IoContext> { return IoContext {}; }
357
358 IoContext(IoContext&&) = default;
359
360 auto get_scheduler() -> Scheduler { return Scheduler { this }; }
361
362 void run() {
363 while (auto* operation = pop_front()) {
364 operation->execute();
365 }
366 }
367
368 void finish() {
369 state().with_lock([](State& state) {
370 state.stopped = true;
371 });
372 }
373
374private:
375 IoContext() = default;
376
377 auto pop_front() -> OperationStateBase* {
378 // FIXME: block instead of busy polling the queue when it is empty.
379 for (;;) {
380 auto [operation, is_stopped] = state().with_lock([](State& state) -> di::Tuple<OperationStateBase*, bool> {
381 if (!state.queue.empty()) {
382 return di::make_tuple(di::addressof(*state.queue.pop()), false);
383 }
384 if (state.stopped) {
385 return di::make_tuple(nullptr, true);
386 }
387 return di::make_tuple(nullptr, false);
388 });
389
390 if (is_stopped) {
391 return nullptr;
392 }
393 if (operation) {
394 return operation;
395 }
396 }
397 }
398
399 void push_back(OperationStateBase* operation) {
400 state().with_lock([&](State& state) {
401 state.queue.push(*operation);
402 });
403 }
404
405 di::MovableBox<di::Synchronized<State>> m_state;
406};
407}
Definition io_context.h:26
void run()
Definition io_context.h:362
void finish()
Definition io_context.h:368
static auto create() -> di::Result< IoContext >
Definition io_context.h:356
IoContext(IoContext &&)=default
auto get_scheduler() -> Scheduler
Definition io_context.h:360
Definition sync_file.h:40
@ No
Definition sync_file.h:42
Definition error.h:7
Definition directory_entry.h:11
linux::IoUringContext IoContext
Definition io_context.h:6
auto open_sync(di::PathView path, OpenMode open_mode, u16 create_mode=0666, OpenFlags flags=OpenFlags::None) -> di::Expected< SyncFile, di::GenericCode >
OpenMode
Definition sync_file.h:109
Type(IoContext *parent, Receiver &&receiver)
Definition io_context.h:50
friend void tag_invoke(di::Tag< execution::start >, Type &self)
Definition io_context.h:63
void execute() override
Definition io_context.h:52
friend void tag_invoke(di::Tag< execution::start >, Type &self)
Definition io_context.h:215
di::ReferenceWrapper< AsyncFile > file
Definition io_context.h:211
friend void tag_invoke(di::Tag< execution::start >, Type &self)
Definition io_context.h:120
friend void tag_invoke(di::Tag< execution::set_stopped >, Receiver &&self)
Definition io_context.h:263
friend void tag_invoke(di::Tag< execution::set_value >, Receiver &&self)
Definition io_context.h:260
DI_IMMOVABLE_NO_UNIQUE_ADDRESS di::Optional< Op > op
Definition io_context.h:274
friend void tag_invoke(di::Tag< execution::start >, Type &self)
Definition io_context.h:286
di::ReferenceWrapper< AsyncFile > file
Definition io_context.h:272
di::meta::NextSenderOf< Rec, OpenSender > Next
Definition io_context.h:268
di::meta::ConnectResult< Next, Receiver > Op
Definition io_context.h:269
friend void tag_invoke(di::Tag< execution::start >, Type &self)
Definition io_context.h:169
di::Span< di::Byte const > buffer
Definition io_context.h:164