dius 0.1.0
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages Concepts
io_uring_context.h
Go to the documentation of this file.
1#pragma once
2
3#include "di/assert/prelude.h"
4#include "di/container/algorithm/copy.h"
5#include "di/container/algorithm/prelude.h"
6#include "di/container/intrusive/prelude.h"
7#include "di/container/queue/prelude.h"
8#include "di/execution/interface/connect.h"
9#include "di/execution/interface/run.h"
10#include "di/execution/io/async_net.h"
11#include "di/execution/meta/connect_result.h"
12#include "di/execution/prelude.h"
13#include "di/execution/query/make_env.h"
14#include "di/execution/receiver/prelude.h"
15#include "di/execution/sequence/sequence_sender.h"
16#include "di/execution/types/completion_signuatures.h"
17#include "di/function/make_deferred.h"
18#include "di/function/prelude.h"
19#include "di/meta/operations.h"
20#include "di/platform/compiler.h"
21#include "di/util/addressof.h"
22#include "di/vocab/optional/prelude.h"
23#include "di/vocab/variant/prelude.h"
24#include "dius/c_definitions.h"
25#include "dius/error.h"
26#include "dius/linux/io_uring.h"
27#include "dius/net/address.h"
28#include "dius/net/socket.h"
29#include "dius/sync_file.h"
31
32namespace dius::linux {
33struct IoUringContext;
34struct IoUringContextImpl;
36struct IoUringScheduler;
37struct ScheduleSender;
38struct OpenSender;
39struct AcceptSender;
40struct MakeSocketSender;
41
42template<di::concepts::Invocable<io_uring::SQE*> Fun>
43static void enqueue_io_operation(IoUringContext*, OperationStateBase* op, Fun&& function);
44
46
48
49struct OperationStateBase : di::IntrusiveForwardListNode<> {
50public:
51 virtual void execute() = 0;
52 virtual void did_complete(io_uring::CQE const*) {}
53};
54
56public:
57 static auto create() -> di::Result<IoUringContext>;
58
59 IoUringContext(IoUringContext&& other) : m_handle(di::move(other.m_handle)) {}
60
62
64
65 void run();
66 void finish() { m_done.store(true); }
67
68private:
69 IoUringContext(io_uring::IoUringHandle handle) : m_handle(di::move(handle)) {};
70
71public:
73 di::Queue<OperationStateBase, di::IntrusiveForwardList<OperationStateBase>> m_queue;
74 di::Atomic<bool> m_done { false };
75};
76
78public:
79 IoUringContext* parent { nullptr };
80
81private:
82 friend auto tag_invoke(di::Tag<di::execution::schedule>, IoUringScheduler const& self) -> ScheduleSender;
83
84 constexpr friend auto operator==(IoUringScheduler const&, IoUringScheduler const&) -> bool = default;
85};
86
87struct Env {
88 IoUringContext* parent { nullptr };
89
90 template<typename CPO>
91 constexpr friend auto tag_invoke(di::execution::GetCompletionScheduler<CPO>, Env const& self) {
92 return get_scheduler(self.parent);
93 }
94};
95
97public:
98 using is_sender = void;
99
101 di::CompletionSignatures<di::SetValue(size_t), di::SetError(di::Error), di::SetStopped()>;
102
103 IoUringContext* parent { nullptr };
104 int file_descriptor { -1 };
105 di::Span<di::Byte> buffer;
106 di::Optional<u64> offset;
107
108private:
109 template<typename Rec>
110 struct OperationStateT {
112 explicit Type(IoUringContext* parent, int file_descriptor, di::Span<di::Byte> buffer,
113 di::Optional<u64> offset, Rec receiver)
114 : m_parent(parent)
115 , m_file_descriptor(file_descriptor)
116 , m_buffer(buffer)
117 , m_offset(offset)
118 , m_receiver(di::move(receiver)) {}
119
120 void execute() override {
121 if (di::execution::get_stop_token(m_receiver).stop_requested()) {
122 di::execution::set_stopped(di::move(m_receiver));
123 } else {
124 // Enqueue io_uring sqe with the read request.
125 enqueue_io_operation(m_parent, this, [&](auto* sqe) {
126 sqe->opcode = IORING_OP_READ;
127 sqe->fd = m_file_descriptor;
128 sqe->off = m_offset.value_or((u64) -1);
129 sqe->addr = reinterpret_cast<u64>(m_buffer.data());
130 sqe->len = m_buffer.size();
131 });
132 }
133 }
134
135 void did_complete(io_uring::CQE const* cqe) override {
136 if (cqe->res < 0) {
137 di::execution::set_error(di::move(m_receiver), di::Error(PosixError(-cqe->res)));
138 } else {
139 di::execution::set_value(di::move(m_receiver), static_cast<size_t>(cqe->res));
140 }
141 }
142
143 private:
144 friend void tag_invoke(di::Tag<di::execution::start>, Type& self) {
145 enqueue_operation(self.m_parent, di::addressof(self));
146 }
147
148 IoUringContext* m_parent;
149 int m_file_descriptor;
150 di::Span<di::Byte> m_buffer;
151 di::Optional<u64> m_offset;
152 [[no_unique_address]] Rec m_receiver;
153 };
154 };
155
156 template<di::ReceiverOf<CompletionSignatures> Receiver>
157 using OperationState = di::meta::Type<OperationStateT<Receiver>>;
158
159 template<di::ReceiverOf<CompletionSignatures> Receiver>
160 friend auto tag_invoke(di::Tag<di::execution::connect>, ReadSomeSender self, Receiver receiver) {
161 return OperationState<Receiver> { self.parent, self.file_descriptor, self.buffer, self.offset,
162 di::move(receiver) };
163 }
164
165 constexpr friend auto tag_invoke(di::Tag<di::execution::get_env>, ReadSomeSender const& self) {
166 return Env(self.parent);
167 }
168};
169
171public:
172 using is_sender = void;
173
175 di::CompletionSignatures<di::SetValue(size_t), di::SetError(di::Error), di::SetStopped()>;
176
177 IoUringContext* parent { nullptr };
178 int file_descriptor { -1 };
179 di::Span<di::Byte const> buffer;
180 di::Optional<u64> offset;
181
182private:
183 template<typename Rec>
184 struct OperationStateT {
186 explicit Type(IoUringContext* parent, int file_descriptor, di::Span<di::Byte const> buffer,
187 di::Optional<u64> offset, Rec receiver)
188 : m_parent(parent)
189 , m_file_descriptor(file_descriptor)
190 , m_buffer(buffer)
191 , m_offset(offset)
192 , m_receiver(di::move(receiver)) {}
193
194 void execute() override {
195 if (di::execution::get_stop_token(m_receiver).stop_requested()) {
196 di::execution::set_stopped(di::move(m_receiver));
197 } else {
198 // Enqueue io_uring sqe with the write request.
199 enqueue_io_operation(m_parent, this, [&](auto* sqe) {
200 sqe->opcode = IORING_OP_WRITE;
201 sqe->fd = m_file_descriptor;
202 sqe->off = m_offset.value_or((u64) -1);
203 sqe->addr = reinterpret_cast<u64>(m_buffer.data());
204 sqe->len = m_buffer.size();
205 });
206 }
207 }
208
209 void did_complete(io_uring::CQE const* cqe) override {
210 if (cqe->res < 0) {
211 di::execution::set_error(di::move(m_receiver), di::Error(PosixError(-cqe->res)));
212 } else {
213 di::execution::set_value(di::move(m_receiver), static_cast<size_t>(cqe->res));
214 }
215 }
216
217 private:
218 friend void tag_invoke(di::Tag<di::execution::start>, Type& self) {
219 enqueue_operation(self.m_parent, di::addressof(self));
220 }
221
222 IoUringContext* m_parent;
223 int m_file_descriptor;
224 di::Span<di::Byte const> m_buffer;
225 di::Optional<u64> m_offset;
226 [[no_unique_address]] Rec m_receiver;
227 };
228 };
229
230 template<di::ReceiverOf<CompletionSignatures> Receiver>
231 using OperationState = di::meta::Type<OperationStateT<Receiver>>;
232
233 template<di::ReceiverOf<CompletionSignatures> Receiver>
234 friend auto tag_invoke(di::Tag<di::execution::connect>, WriteSomeSender self, Receiver receiver) {
235 return OperationState<Receiver> { self.parent, self.file_descriptor, self.buffer, self.offset,
236 di::move(receiver) };
237 }
238
239 constexpr friend auto tag_invoke(di::Tag<di::execution::get_env>, WriteSomeSender const& self) {
240 return Env(self.parent);
241 }
242};
243
245public:
246 using is_sender = void;
247
248 using CompletionSignatures = di::CompletionSignatures<di::SetValue(), di::SetError(di::Error), di::SetStopped()>;
249
250 IoUringContext* parent { nullptr };
251 int file_descriptor { -1 };
252
253private:
254 template<typename Rec>
255 struct OperationStateT {
257 explicit Type(IoUringContext* parent, int file_descriptor, Rec receiver)
258 : m_parent(parent), m_file_descriptor(file_descriptor), m_receiver(di::move(receiver)) {}
259
260 void execute() override {
261 if (di::execution::get_stop_token(m_receiver).stop_requested()) {
262 di::execution::set_stopped(di::move(m_receiver));
263 } else {
264 // Enqueue io_uring sqe with the close request.
265 enqueue_io_operation(m_parent, this, [&](auto* sqe) {
266 sqe->opcode = IORING_OP_CLOSE;
267 sqe->fd = m_file_descriptor;
268 });
269 }
270 }
271
272 void did_complete(io_uring::CQE const* cqe) override {
273 if (cqe->res < 0) {
274 di::execution::set_error(di::move(m_receiver), di::Error(PosixError(-cqe->res)));
275 } else {
276 di::execution::set_value(di::move(m_receiver));
277 }
278 }
279
280 private:
281 friend void tag_invoke(di::Tag<di::execution::start>, Type& self) {
282 enqueue_operation(self.m_parent, di::addressof(self));
283 }
284
285 IoUringContext* m_parent;
286 int m_file_descriptor;
287 [[no_unique_address]] Rec m_receiver;
288 };
289 };
290
291 template<di::ReceiverOf<CompletionSignatures> Receiver>
292 using OperationState = di::meta::Type<OperationStateT<Receiver>>;
293
294 template<di::ReceiverOf<CompletionSignatures> Receiver>
295 friend auto tag_invoke(di::Tag<di::execution::connect>, CloseSender self, Receiver receiver) {
296 return OperationState<Receiver> { self.parent, self.file_descriptor, di::move(receiver) };
297 }
298
299 constexpr friend auto tag_invoke(di::Tag<di::execution::get_env>, CloseSender const& self) {
300 return Env(self.parent);
301 }
302};
303
305public:
306 using is_sender = void;
307
308 using CompletionSignatures = di::CompletionSignatures<di::SetValue(), di::SetError(di::Error), di::SetStopped()>;
309
310 IoUringContext* parent { nullptr };
311 int file_descriptor { -1 };
313
314private:
315 template<typename Rec>
316 struct OperationStateT {
319 : m_parent(parent), m_file_descriptor(file_descriptor), m_receiver(di::move(receiver)) {
320 if (address.path().size() < 108 - 1) {
321 sockaddr_un addr = {};
322 addr.sun_family = 1;
323 di::copy(address.path(), addr.sun_path);
324 m_address = addr;
325 }
326 }
327
328 void execute() override {
329 if (di::execution::get_stop_token(m_receiver).stop_requested()) {
330 di::execution::set_stopped(di::move(m_receiver));
331 } else {
332 // If the path was too long, fail fast.
333 if (!m_address) {
334 di::execution::set_error(di::move(m_receiver), di::Error(dius::PosixError::FilenameTooLong));
335 return;
336 }
337
338 // Enqueue io_uring sqe with the close request.
339 enqueue_io_operation(m_parent, this, [&](auto* sqe) {
340 sqe->opcode = IORING_OP_CONNECT;
341 sqe->fd = m_file_descriptor;
342 sqe->addr = (u64) di::addressof(m_address.value());
343 sqe->off = sizeof(m_address.value());
344 });
345 }
346 }
347
348 void did_complete(io_uring::CQE const* cqe) override {
349 if (cqe->res < 0) {
350 di::execution::set_error(di::move(m_receiver), di::Error(PosixError(-cqe->res)));
351 } else {
352 di::execution::set_value(di::move(m_receiver));
353 }
354 }
355
356 private:
357 friend void tag_invoke(di::Tag<di::execution::start>, Type& self) {
358 enqueue_operation(self.m_parent, di::addressof(self));
359 }
360
361 IoUringContext* m_parent { nullptr };
362 int m_file_descriptor { -1 };
363 di::Optional<sockaddr_un> m_address;
364 [[no_unique_address]] Rec m_receiver;
365 };
366 };
367
368 template<di::ReceiverOf<CompletionSignatures> Receiver>
369 using OperationState = di::meta::Type<OperationStateT<Receiver>>;
370
371 template<di::ReceiverOf<CompletionSignatures> Receiver>
372 friend auto tag_invoke(di::Tag<di::execution::connect>, ConnectSender self, Receiver receiver) {
373 return OperationState<Receiver> { self.parent, self.file_descriptor, di::move(self.address),
374 di::move(receiver) };
375 }
376
377 constexpr friend auto tag_invoke(di::Tag<di::execution::get_env>, ConnectSender const& self) {
378 return Env(self.parent);
379 }
380};
381
383public:
384 using is_sender = void;
385
386 using CompletionSignatures = di::CompletionSignatures<di::SetValue(), di::SetError(di::Error), di::SetStopped()>;
387
388 IoUringContext* parent { nullptr };
389 int file_descriptor { -1 };
391
392private:
393 template<typename Rec>
394 struct OperationStateT {
397 : m_parent(parent), m_file_descriptor(file_descriptor), m_receiver(di::move(receiver)) {
398 if (address.path().size() < 108 - 1) {
399 sockaddr_un addr = {};
400 addr.sun_family = 1;
401 di::copy(address.path(), addr.sun_path);
402 m_address = addr;
403 }
404 }
405
406 void execute() override {
407 if (di::execution::get_stop_token(m_receiver).stop_requested()) {
408 di::execution::set_stopped(di::move(m_receiver));
409 } else {
410 // If the path was too long, fail fast.
411 if (!m_address) {
412 di::execution::set_error(di::move(m_receiver), di::Error(dius::PosixError::FilenameTooLong));
413 return;
414 }
415
416 // io_uring doesn't support bind() and listen()...
417 auto result = system::system_call<int>(system::Number::bind, m_file_descriptor,
418 di::addressof(m_address.value()), sizeof(m_address.value()));
419 if (!result.has_value()) {
420 di::execution::set_error(di::move(m_receiver), di::Error(di::move(result).error()));
421 } else {
422 di::execution::set_value(di::move(m_receiver));
423 }
424 }
425 }
426
427 private:
428 friend void tag_invoke(di::Tag<di::execution::start>, Type& self) {
429 enqueue_operation(self.m_parent, di::addressof(self));
430 }
431
432 IoUringContext* m_parent { nullptr };
433 int m_file_descriptor { -1 };
434 di::Optional<sockaddr_un> m_address;
435 [[no_unique_address]] Rec m_receiver;
436 };
437 };
438
439 template<di::ReceiverOf<CompletionSignatures> Receiver>
440 using OperationState = di::meta::Type<OperationStateT<Receiver>>;
441
442 template<di::ReceiverOf<CompletionSignatures> Receiver>
443 friend auto tag_invoke(di::Tag<di::execution::connect>, BindSender self, Receiver receiver) {
444 return OperationState<Receiver> { self.parent, self.file_descriptor, di::move(self.address),
445 di::move(receiver) };
446 }
447
448 constexpr friend auto tag_invoke(di::Tag<di::execution::get_env>, BindSender const& self) {
449 return Env(self.parent);
450 }
451};
452
454public:
455 using is_sender = void;
456
457 using CompletionSignatures = di::CompletionSignatures<di::SetValue(), di::SetError(di::Error), di::SetStopped()>;
458
459 IoUringContext* parent { nullptr };
460 int file_descriptor { -1 };
461 int count { 0 };
462
463private:
464 template<typename Rec>
465 struct OperationStateT {
467 explicit Type(IoUringContext* parent, int file_descriptor, int count, Rec receiver)
468 : m_parent(parent)
469 , m_file_descriptor(file_descriptor)
470 , m_count(count)
471 , m_receiver(di::move(receiver)) {}
472
473 void execute() override {
474 if (di::execution::get_stop_token(m_receiver).stop_requested()) {
475 di::execution::set_stopped(di::move(m_receiver));
476 } else {
477 // io_uring doesn't support bind() and listen()...
478 auto result = system::system_call<int>(system::Number::listen, m_file_descriptor, m_count);
479 if (!result.has_value()) {
480 di::execution::set_error(di::move(m_receiver), di::Error(di::move(result).error()));
481 } else {
482 di::execution::set_value(di::move(m_receiver));
483 }
484 }
485 }
486
487 private:
488 friend void tag_invoke(di::Tag<di::execution::start>, Type& self) {
489 enqueue_operation(self.m_parent, di::addressof(self));
490 }
491
492 IoUringContext* m_parent { nullptr };
493 int m_file_descriptor { -1 };
494 int m_count {};
495 [[no_unique_address]] Rec m_receiver;
496 };
497 };
498
499 template<di::ReceiverOf<CompletionSignatures> Receiver>
500 using OperationState = di::meta::Type<OperationStateT<Receiver>>;
501
502 template<di::ReceiverOf<CompletionSignatures> Receiver>
503 friend auto tag_invoke(di::Tag<di::execution::connect>, ListenSender self, Receiver receiver) {
504 return OperationState<Receiver> { self.parent, self.file_descriptor, self.count, di::move(receiver) };
505 }
506
507 constexpr friend auto tag_invoke(di::Tag<di::execution::get_env>, ListenSender const& self) {
508 return Env(self.parent);
509 }
510};
511
513public:
514 using is_sender = void;
515
516 using CompletionSignatures = di::CompletionSignatures<di::SetValue(), di::SetError(di::Error), di::SetStopped()>;
517
518 IoUringContext* parent { nullptr };
519 int file_descriptor { -1 };
521
522private:
523 template<typename Rec>
524 struct OperationStateT {
527 : m_parent(parent), m_file_descriptor(file_descriptor), m_how(how), m_receiver(di::move(receiver)) {}
528
529 void execute() override {
530 if (di::execution::get_stop_token(m_receiver).stop_requested()) {
531 di::execution::set_stopped(di::move(m_receiver));
532 } else {
533 // Enqueue io_uring sqe with the close request.
534 enqueue_io_operation(m_parent, this, [&](auto* sqe) {
535 sqe->opcode = IORING_OP_SHUTDOWN;
536 sqe->fd = m_file_descriptor;
537 sqe->len = (u32) m_how;
538 });
539 }
540 }
541
542 void did_complete(io_uring::CQE const* cqe) override {
543 if (cqe->res < 0) {
544 di::execution::set_error(di::move(m_receiver), di::Error(PosixError(-cqe->res)));
545 } else {
546 di::execution::set_value(di::move(m_receiver));
547 }
548 }
549
550 private:
551 friend void tag_invoke(di::Tag<di::execution::start>, Type& self) {
552 enqueue_operation(self.m_parent, di::addressof(self));
553 }
554
555 IoUringContext* m_parent { nullptr };
556 int m_file_descriptor { -1 };
557 net::Shutdown m_how {};
558 [[no_unique_address]] Rec m_receiver;
559 };
560 };
561
562 template<di::ReceiverOf<CompletionSignatures> Receiver>
563 using OperationState = di::meta::Type<OperationStateT<Receiver>>;
564
565 template<di::ReceiverOf<CompletionSignatures> Receiver>
566 friend auto tag_invoke(di::Tag<di::execution::connect>, ShutdownSender self, Receiver receiver) {
567 return OperationState<Receiver> { self.parent, self.file_descriptor, self.how, di::move(receiver) };
568 }
569
570 constexpr friend auto tag_invoke(di::Tag<di::execution::get_env>, ShutdownSender const& self) {
571 return Env(self.parent);
572 }
573};
574
576public:
577 using is_sender = void;
578
579 using CompletionSignatures = di::CompletionSignatures<di::SetValue(), di::SetStopped()>;
580
581 IoUringContext* parent { nullptr };
582
583private:
584 template<typename Rec>
585 struct OperationStateT {
587 public:
588 Type(IoUringContext* parent, Rec&& receiver) : m_parent(parent), m_receiver(di::move(receiver)) {}
589
590 void execute() override {
591 if (di::execution::get_stop_token(m_receiver).stop_requested()) {
592 di::execution::set_stopped(di::move(m_receiver));
593 } else {
594 di::execution::set_value(di::move(m_receiver));
595 }
596 }
597
598 private:
599 friend void tag_invoke(di::Tag<di::execution::start>, Type& self) {
600 enqueue_operation(self.m_parent, di::addressof(self));
601 }
602
603 IoUringContext* m_parent { nullptr };
604 [[no_unique_address]] Rec m_receiver;
605 };
606 };
607
608 template<di::Receiver Rec>
609 using OperationState = di::meta::Type<OperationStateT<Rec>>;
610
611 template<di::ReceiverOf<CompletionSignatures> Rec>
612 friend auto tag_invoke(di::Tag<di::execution::connect>, ScheduleSender self, Rec receiver) {
613 return OperationState<Rec> { self.parent, di::move(receiver) };
614 }
615
616 constexpr friend auto tag_invoke(di::Tag<di::execution::get_env>, ScheduleSender const& self) {
617 return Env { self.parent };
618 }
619};
620
621class AsyncFile : di::Immovable {
622public:
624 : m_parent(parent), m_path(di::move(path)), m_mode(mode), m_create_mode(create_mode) {}
625
626 auto parent() const -> IoUringContext* { return m_parent; }
627 auto path() const -> di::Path const& { return m_path; }
628 auto mode() const -> OpenMode { return m_mode; }
629 auto create_mode() const -> u16 { return m_create_mode; }
630
631 auto fd() const -> int { return m_fd; }
632 void set_fd(int fd) { m_fd = fd; }
633
634private:
635 friend auto tag_invoke(di::Tag<di::execution::async_read_some>, AsyncFile& self, di::Span<di::Byte> buffer,
636 di::Optional<u64> offset) {
637 return ReadSomeSender { self.m_parent, self.m_fd, buffer, offset };
638 }
639
640 friend auto tag_invoke(di::Tag<di::execution::async_write_some>, AsyncFile& self, di::Span<di::Byte const> buffer,
641 di::Optional<u64> offset) {
642 return WriteSomeSender { self.m_parent, self.m_fd, buffer, offset };
643 }
644
645 IoUringContext* m_parent;
646 di::Path m_path;
647 OpenMode m_mode;
648 u16 m_create_mode;
649 int m_fd { -1 };
650};
651
653public:
654 explicit AcceptSocket(int base_fd) : m_base_fd(base_fd) {}
655
656 auto base_fd() const -> int { return m_base_fd; }
657
658private:
659 int m_base_fd { -1 };
660};
661
662template<typename Base>
664 : public di::Immovable
665 , public Base {
666public:
667 template<typename... Args>
668 requires(di::concepts::ConstructibleFrom<Base, Args...>)
669 explicit AsyncSocket(IoUringContext* context, Args&&... args)
670 : Base(di::forward<Args>(args)...), m_parent(context) {}
671
672 auto parent() const -> IoUringContext* { return m_parent; }
673
674 auto fd() const -> int { return m_fd; }
675 void set_fd(int fd) { m_fd = fd; }
676
677private:
678 friend auto tag_invoke(di::Tag<di::execution::async_read_some>, AsyncSocket& self, di::Span<di::Byte> buffer,
679 di::Optional<u64>) {
680 return ReadSomeSender { self.m_parent, self.m_fd, buffer, {} };
681 }
682
683 friend auto tag_invoke(di::Tag<di::execution::async_write_some>, AsyncSocket& self, di::Span<di::Byte const> buffer,
684 di::Optional<u64>) {
685 return WriteSomeSender { self.m_parent, self.m_fd, buffer, {} };
686 }
687
688 friend auto tag_invoke(di::Tag<di::execution::async_bind>, AsyncSocket& self, net::UnixAddress address) {
689 return BindSender { self.m_parent, self.m_fd, di::move(address) };
690 }
691
692 friend auto tag_invoke(di::Tag<di::execution::async_connect>, AsyncSocket& self, net::UnixAddress address) {
693 return ConnectSender { self.m_parent, self.m_fd, di::move(address) };
694 }
695
696 friend auto tag_invoke(di::Tag<di::execution::async_listen>, AsyncSocket& self, int count) {
697 return ListenSender { self.m_parent, self.m_fd, count };
698 }
699
700 friend auto tag_invoke(di::Tag<di::execution::async_shutdown>, AsyncSocket& self, net::Shutdown how) {
701 return ShutdownSender { self.m_parent, self.m_fd, how };
702 }
703
704 friend auto tag_invoke(di::Tag<di::execution::async_accept>, AsyncSocket& self) {
705 return di::make_deferred<AsyncSocket<AcceptSocket>>(self.m_parent, self.m_fd);
706 }
707
708 IoUringContext* m_parent;
709 int m_fd { -1 };
710};
711
713public:
714 using is_sender = void;
715
716 using CompletionSignatures = di::CompletionSignatures<di::SetValue(di::ReferenceWrapper<AsyncFile>),
717 di::SetError(di::Error), di::SetStopped()>;
718
720 di::ReferenceWrapper<AsyncFile> file;
721
722private:
723 template<typename Rec>
724 struct OperationStateT {
726 explicit Type(IoUringContext* parent, di::ReferenceWrapper<AsyncFile> file, Rec&& receiver)
727 : m_parent(parent), m_file(file), m_receiver(di::move(receiver)) {}
728
729 void execute() override {
730 if (di::execution::get_stop_token(m_receiver).stop_requested()) {
731 di::execution::set_stopped(di::move(m_receiver));
732 } else {
733 auto open_mode_flags = [&] {
734 switch (m_file.get().mode()) {
736 return O_RDONLY;
738 return O_WRONLY | O_EXCL | O_CREAT;
740 return O_WRONLY | O_TRUNC | O_CREAT;
742 return O_RDWR;
744 return O_WRONLY | O_APPEND | O_CREAT;
745 default:
746 di::unreachable();
747 }
748 }();
749
750 // Enqueue io_uring sqe with the open request.
751 enqueue_io_operation(m_parent, this, [&](auto* sqe) {
752 sqe->opcode = IORING_OP_OPENAT;
753 sqe->fd = AT_FDCWD;
754 sqe->addr = reinterpret_cast<u64>(m_file.get().path().c_str());
755 sqe->len = m_file.get().create_mode();
756 sqe->open_flags = open_mode_flags;
757 });
758 }
759 }
760
761 void did_complete(io_uring::CQE const* cqe) override {
762 if (cqe->res < 0) {
763 di::execution::set_error(di::move(m_receiver), di::Error(PosixError(-cqe->res)));
764 } else {
765 m_file.get().set_fd(cqe->res);
766 di::execution::set_value(di::move(m_receiver), di::ref(m_file));
767 }
768 }
769
770 private:
771 friend void tag_invoke(di::Tag<di::execution::start>, Type& self) {
772 enqueue_operation(self.m_parent, di::addressof(self));
773 }
774
775 IoUringContext* m_parent;
776 di::ReferenceWrapper<AsyncFile> m_file;
777 [[no_unique_address]] Rec m_receiver;
778 };
779 };
780
781 template<di::ReceiverOf<CompletionSignatures> Receiver>
782 using OperationState = di::meta::Type<OperationStateT<Receiver>>;
783
784 template<di::ReceiverOf<CompletionSignatures> Receiver>
785 friend auto tag_invoke(di::Tag<di::execution::connect>, OpenSender self, Receiver receiver) {
786 return OperationState<Receiver> { self.parent, self.file, di::move(receiver) };
787 }
788
789 constexpr friend auto tag_invoke(di::Tag<di::execution::get_env>, OpenSender const& self) {
790 return Env(self.parent);
791 }
792};
793
795public:
796 using is_sender = void;
797
799
800 using CompletionSignatures = di::CompletionSignatures<di::SetValue(di::ReferenceWrapper<AsyncSocket>),
801 di::SetError(di::Error), di::SetStopped()>;
802
804 di::ReferenceWrapper<AsyncSocket> socket;
805
806private:
807 template<typename Rec>
808 struct OperationStateT {
810 explicit Type(IoUringContext* parent, di::ReferenceWrapper<AsyncSocket> socket, Rec&& receiver)
811 : m_parent(parent), m_socket(socket), m_receiver(di::move(receiver)) {}
812
813 void execute() override {
814 if (di::execution::get_stop_token(m_receiver).stop_requested()) {
815 di::execution::set_stopped(di::move(m_receiver));
816 } else {
817 // Enqueue io_uring sqe with the make socket request.
818 enqueue_io_operation(m_parent, this, [&](auto* sqe) {
819 sqe->opcode = IORING_OP_SOCKET;
820 sqe->fd = 1;
821 sqe->off = 1;
822 });
823 }
824 }
825
826 void did_complete(io_uring::CQE const* cqe) override {
827 if (cqe->res < 0) {
828 di::execution::set_error(di::move(m_receiver), di::Error(PosixError(-cqe->res)));
829 } else {
830 m_socket.get().set_fd(cqe->res);
831 di::execution::set_value(di::move(m_receiver), di::ref(m_socket));
832 }
833 }
834
835 private:
836 friend void tag_invoke(di::Tag<di::execution::start>, Type& self) {
837 enqueue_operation(self.m_parent, di::addressof(self));
838 }
839
840 IoUringContext* m_parent;
841 di::ReferenceWrapper<AsyncSocket> m_socket;
842 [[no_unique_address]] Rec m_receiver;
843 };
844 };
845
846 template<di::ReceiverOf<CompletionSignatures> Receiver>
847 using OperationState = di::meta::Type<OperationStateT<Receiver>>;
848
849 template<di::ReceiverOf<CompletionSignatures> Receiver>
850 friend auto tag_invoke(di::Tag<di::execution::connect>, MakeSocketSender self, Receiver receiver) {
851 return OperationState<Receiver> { self.parent, self.socket, di::move(receiver) };
852 }
853
854 constexpr friend auto tag_invoke(di::Tag<di::execution::get_env>, MakeSocketSender const& self) {
855 return Env(self.parent);
856 }
857};
858
860public:
861 using is_sender = void;
862
864
865 using CompletionSignatures = di::CompletionSignatures<di::SetValue(di::ReferenceWrapper<AsyncSocket>),
866 di::SetError(di::Error), di::SetStopped()>;
867
869 di::ReferenceWrapper<AsyncSocket> socket;
870
871private:
872 template<typename Rec>
873 struct OperationStateT {
875 explicit Type(IoUringContext* parent, di::ReferenceWrapper<AsyncSocket> socket, Rec&& receiver)
876 : m_parent(parent), m_socket(socket), m_receiver(di::move(receiver)) {}
877
878 void execute() override {
879 if (di::execution::get_stop_token(m_receiver).stop_requested()) {
880 di::execution::set_stopped(di::move(m_receiver));
881 } else {
882 // Enqueue io_uring sqe with the make socket request.
883 enqueue_io_operation(m_parent, this, [&](auto* sqe) {
884 sqe->opcode = IORING_OP_ACCEPT;
885 sqe->fd = m_socket.get().base_fd();
886 });
887 }
888 }
889
890 void did_complete(io_uring::CQE const* cqe) override {
891 if (cqe->res < 0) {
892 di::execution::set_error(di::move(m_receiver), di::Error(PosixError(-cqe->res)));
893 } else {
894 m_socket.get().set_fd(cqe->res);
895 di::execution::set_value(di::move(m_receiver), di::ref(m_socket));
896 }
897 }
898
899 private:
900 friend void tag_invoke(di::Tag<di::execution::start>, Type& self) {
901 enqueue_operation(self.m_parent, di::addressof(self));
902 }
903
904 IoUringContext* m_parent;
905 di::ReferenceWrapper<AsyncSocket> m_socket;
906 [[no_unique_address]] Rec m_receiver;
907 };
908 };
909
910 template<di::ReceiverOf<CompletionSignatures> Receiver>
911 using OperationState = di::meta::Type<OperationStateT<Receiver>>;
912
913 template<di::ReceiverOf<CompletionSignatures> Receiver>
914 friend auto tag_invoke(di::Tag<di::execution::connect>, AcceptSender self, Receiver receiver) {
915 return OperationState<Receiver> { self.parent, self.socket, di::move(receiver) };
916 }
917
918 constexpr friend auto tag_invoke(di::Tag<di::execution::get_env>, AcceptSender const& self) {
919 return Env(self.parent);
920 }
921};
922
923template<typename Object, typename CreateSend>
924struct RunSender {
925public:
926 using is_sender = di::SequenceTag;
927
929 di::CompletionSignatures<di::SetValue(di::ReferenceWrapper<Object>), di::SetError(di::Error), di::SetStopped()>;
930
932 di::ReferenceWrapper<Object> object;
933
934private:
935 template<typename Rec>
936 struct OperationStateT {
937 struct Type : di::Immovable {
938 struct Rec1 {
939 using is_receiver = void;
940
941 di::Function<void()> complete;
942
943 friend auto tag_invoke(di::Tag<di::execution::set_value>, Rec1&& self) { self.complete(); }
944 friend auto tag_invoke(di::Tag<di::execution::set_stopped>, Rec1&& self) { self.complete(); }
945 };
946
947 struct Rec2 : di::ReceiverAdaptor<Rec2> {
948 private:
949 using Base = di::ReceiverAdaptor<Rec2>;
950 friend Base;
951
952 public:
953 explicit Rec2(Rec* receiver) : m_receiver(receiver) {}
954
955 auto base() const& -> Rec const& { return *m_receiver; }
956 auto base() && -> Rec&& { return di::move(*m_receiver); }
957
958 private:
959 Rec* m_receiver;
960 };
961
962 explicit Type(IoUringContext* parent, di::ReferenceWrapper<Object> object, Rec&& receiver)
963 : m_parent(parent), m_object(object), m_receiver(di::move(receiver)) {}
964
965 private:
966 using NextSender = di::meta::NextSenderOf<Rec, CreateSend>;
967 using Op1 = di::meta::ConnectResult<NextSender, Rec1>;
968 using Op2 = di::meta::ConnectResult<CloseSender, Rec2>;
969
970 void finish_phase1() {
971 auto& op = m_op.template emplace<2>(di::DeferConstruct([&] {
972 return di::execution::connect(CloseSender(m_parent, m_object.get().fd()),
973 Rec2(di::addressof(m_receiver)));
974 }));
975 di::execution::start(op);
976 }
977
978 friend void tag_invoke(di::Tag<di::execution::start>, Type& self) {
979 auto& op = self.m_op.template emplace<1>(di::DeferConstruct([&] {
980 return di::execution::connect(
981 di::execution::set_next(self.m_receiver, CreateSend(self.m_parent, self.m_object)),
982 Rec1([&self] {
983 return self.finish_phase1();
984 }));
985 }));
986 di::execution::start(op);
987 }
988
989 IoUringContext* m_parent;
990 di::ReferenceWrapper<Object> m_object;
991 [[no_unique_address]] Rec m_receiver;
992 DI_IMMOVABLE_NO_UNIQUE_ADDRESS di::Variant<di::Void, Op1, Op2> m_op;
993 };
994 };
995
996 template<typename Receiver>
997 using OperationState = di::meta::Type<OperationStateT<Receiver>>;
998
999 template<
1000 di::ReceiverOf<di::CompletionSignatures<di::SetValue(), di::SetError(di::Error), di::SetStopped()>> Receiver>
1001 friend auto tag_invoke(di::Tag<di::execution::subscribe>, RunSender self, Receiver receiver) {
1002 return OperationState<Receiver> { self.parent, self.object, di::move(receiver) };
1003 }
1004
1005 constexpr friend auto tag_invoke(di::Tag<di::execution::get_env>, RunSender const& self) {
1006 return di::execution::make_env(Env(self.parent),
1007 di::execution::with(di::execution::get_sequence_cardinality, di::c_<1ZU>));
1008 }
1009};
1010
1011inline auto tag_invoke(di::Tag<di::execution::run>, AsyncFile& self) {
1012 return RunSender<AsyncFile, OpenSender> { self.parent(), self };
1013}
1014
1015inline auto tag_invoke(di::Tag<di::execution::run>, AsyncSocket<di::Void>& self) {
1017}
1018
1019inline auto tag_invoke(di::Tag<di::execution::run>, AsyncSocket<AcceptSocket>& self) {
1021}
1022
1023inline auto tag_invoke(di::Tag<di::execution::schedule>, IoUringScheduler const& self) -> ScheduleSender {
1024 return { self.parent };
1025}
1026
1027inline auto tag_invoke(di::Tag<di::execution::async_open>, IoUringScheduler const& self, di::Path path, OpenMode mode,
1028 u16 create_mode) {
1029 return di::make_deferred<AsyncFile>(self.parent, di::move(path), mode, create_mode);
1030}
1031
1032inline auto tag_invoke(di::Tag<di::execution::async_open>, IoUringScheduler const& self, di::Path path, OpenMode mode) {
1033 return di::make_deferred<AsyncFile>(self.parent, di::move(path), mode, 0666);
1034}
1035
1036inline auto tag_invoke(di::Tag<di::execution::async_make_socket>, IoUringScheduler const& self) {
1037 return di::make_deferred<AsyncSocket<di::Void>>(self.parent);
1038}
1039
1040inline auto IoUringContext::create() -> di::Result<IoUringContext> {
1042}
1043
1044template<di::concepts::Invocable<io_uring::SQE*> Fun>
1045inline void enqueue_io_operation(IoUringContext* context, OperationStateBase* op, Fun&& function) {
1046 auto sqe = context->m_handle.get_next_sqe();
1047 ASSERT(sqe);
1048 di::fill_n(reinterpret_cast<di::Byte*>(sqe.data()), sizeof(sqe), 0_b);
1049 di::invoke(di::forward<Fun>(function), sqe.data());
1050 sqe->user_data = reinterpret_cast<uintptr_t>(op);
1051}
1052
1054 context->m_queue.push(*op);
1055}
1056
1058 return context->get_scheduler();
1059}
1060
1062 return IoUringScheduler(this);
1063}
1064}
AcceptSocket(int base_fd)
Definition io_uring_context.h:654
auto base_fd() const -> int
Definition io_uring_context.h:656
Definition io_uring_context.h:621
auto mode() const -> OpenMode
Definition io_uring_context.h:628
friend auto tag_invoke(di::Tag< di::execution::async_read_some >, AsyncFile &self, di::Span< di::Byte > buffer, di::Optional< u64 > offset)
Definition io_uring_context.h:635
friend auto tag_invoke(di::Tag< di::execution::async_write_some >, AsyncFile &self, di::Span< di::Byte const > buffer, di::Optional< u64 > offset)
Definition io_uring_context.h:640
auto path() const -> di::Path const &
Definition io_uring_context.h:627
auto create_mode() const -> u16
Definition io_uring_context.h:629
auto parent() const -> IoUringContext *
Definition io_uring_context.h:626
auto fd() const -> int
Definition io_uring_context.h:631
AsyncFile(IoUringContext *parent, di::Path path, OpenMode mode, u16 create_mode)
Definition io_uring_context.h:623
void set_fd(int fd)
Definition io_uring_context.h:632
Definition io_uring_context.h:665
void set_fd(int fd)
Definition io_uring_context.h:675
AsyncSocket(IoUringContext *context, Args &&... args)
Definition io_uring_context.h:669
friend auto tag_invoke(di::Tag< di::execution::async_accept >, AsyncSocket &self)
Definition io_uring_context.h:704
friend auto tag_invoke(di::Tag< di::execution::async_write_some >, AsyncSocket &self, di::Span< di::Byte const > buffer, di::Optional< u64 >)
Definition io_uring_context.h:683
friend auto tag_invoke(di::Tag< di::execution::async_connect >, AsyncSocket &self, net::UnixAddress address)
Definition io_uring_context.h:692
friend auto tag_invoke(di::Tag< di::execution::async_bind >, AsyncSocket &self, net::UnixAddress address)
Definition io_uring_context.h:688
friend auto tag_invoke(di::Tag< di::execution::async_read_some >, AsyncSocket &self, di::Span< di::Byte > buffer, di::Optional< u64 >)
Definition io_uring_context.h:678
friend auto tag_invoke(di::Tag< di::execution::async_listen >, AsyncSocket &self, int count)
Definition io_uring_context.h:696
auto fd() const -> int
Definition io_uring_context.h:674
friend auto tag_invoke(di::Tag< di::execution::async_shutdown >, AsyncSocket &self, net::Shutdown how)
Definition io_uring_context.h:700
auto parent() const -> IoUringContext *
Definition io_uring_context.h:672
Definition io_uring.h:21
static auto create() -> di::Result< IoUringHandle >
auto get_next_sqe() -> di::Optional< SQE & >
Definition address.h:8
Definition error.h:7
struct io_uring_cqe CQE
Definition io_uring.h:13
Definition io_uring.h:10
void enqueue_operation(IoUringContext *, OperationStateBase *)
Definition io_uring_context.h:1053
auto get_scheduler(IoUringContext *) -> IoUringScheduler
Definition io_uring_context.h:1057
auto tag_invoke(di::Tag< di::execution::run >, AsyncFile &self)
Definition io_uring_context.h:1011
Shutdown
Definition socket.h:4
PosixError
Definition platform_error.h:7
@ bind
Definition system_call.h:36
@ listen
Definition system_call.h:37
auto system_call(Number number) -> di::Expected< R, di::BasicError >
Definition system_call.h:61
OpenMode
Definition sync_file.h:109
@ Readonly
Definition sync_file.h:109
@ WriteNew
Definition sync_file.h:109
@ AppendOnly
Definition sync_file.h:109
@ ReadWrite
Definition sync_file.h:109
@ WriteClobber
Definition sync_file.h:109
Type(IoUringContext *parent, di::ReferenceWrapper< AsyncSocket > socket, Rec &&receiver)
Definition io_uring_context.h:875
void did_complete(io_uring::CQE const *cqe) override
Definition io_uring_context.h:890
void execute() override
Definition io_uring_context.h:878
friend void tag_invoke(di::Tag< di::execution::start >, Type &self)
Definition io_uring_context.h:900
Definition io_uring_context.h:859
linux::AsyncSocket< AcceptSocket > AsyncSocket
Definition io_uring_context.h:863
constexpr friend auto tag_invoke(di::Tag< di::execution::get_env >, AcceptSender const &self)
Definition io_uring_context.h:918
di::ReferenceWrapper< AsyncSocket > socket
Definition io_uring_context.h:869
void is_sender
Definition io_uring_context.h:861
di::CompletionSignatures< di::SetValue(di::ReferenceWrapper< AsyncSocket >), di::SetError(di::Error), di::SetStopped()> CompletionSignatures
Definition io_uring_context.h:865
IoUringContext * parent
Definition io_uring_context.h:868
friend auto tag_invoke(di::Tag< di::execution::connect >, AcceptSender self, Receiver receiver)
Definition io_uring_context.h:914
void execute() override
Definition io_uring_context.h:406
Type(IoUringContext *parent, int file_descriptor, net::UnixAddress address, Rec receiver)
Definition io_uring_context.h:396
friend void tag_invoke(di::Tag< di::execution::start >, Type &self)
Definition io_uring_context.h:428
Definition io_uring_context.h:382
int file_descriptor
Definition io_uring_context.h:389
constexpr friend auto tag_invoke(di::Tag< di::execution::get_env >, BindSender const &self)
Definition io_uring_context.h:448
di::CompletionSignatures< di::SetValue(), di::SetError(di::Error), di::SetStopped()> CompletionSignatures
Definition io_uring_context.h:386
void is_sender
Definition io_uring_context.h:384
friend auto tag_invoke(di::Tag< di::execution::connect >, BindSender self, Receiver receiver)
Definition io_uring_context.h:443
IoUringContext * parent
Definition io_uring_context.h:388
net::UnixAddress address
Definition io_uring_context.h:390
Type(IoUringContext *parent, int file_descriptor, Rec receiver)
Definition io_uring_context.h:257
void did_complete(io_uring::CQE const *cqe) override
Definition io_uring_context.h:272
void execute() override
Definition io_uring_context.h:260
friend void tag_invoke(di::Tag< di::execution::start >, Type &self)
Definition io_uring_context.h:281
Definition io_uring_context.h:244
void is_sender
Definition io_uring_context.h:246
constexpr friend auto tag_invoke(di::Tag< di::execution::get_env >, CloseSender const &self)
Definition io_uring_context.h:299
int file_descriptor
Definition io_uring_context.h:251
friend auto tag_invoke(di::Tag< di::execution::connect >, CloseSender self, Receiver receiver)
Definition io_uring_context.h:295
di::CompletionSignatures< di::SetValue(), di::SetError(di::Error), di::SetStopped()> CompletionSignatures
Definition io_uring_context.h:248
IoUringContext * parent
Definition io_uring_context.h:250
void execute() override
Definition io_uring_context.h:328
Type(IoUringContext *parent, int file_descriptor, net::UnixAddress address, Rec receiver)
Definition io_uring_context.h:318
void did_complete(io_uring::CQE const *cqe) override
Definition io_uring_context.h:348
friend void tag_invoke(di::Tag< di::execution::start >, Type &self)
Definition io_uring_context.h:357
Definition io_uring_context.h:304
di::CompletionSignatures< di::SetValue(), di::SetError(di::Error), di::SetStopped()> CompletionSignatures
Definition io_uring_context.h:308
int file_descriptor
Definition io_uring_context.h:311
void is_sender
Definition io_uring_context.h:306
IoUringContext * parent
Definition io_uring_context.h:310
constexpr friend auto tag_invoke(di::Tag< di::execution::get_env >, ConnectSender const &self)
Definition io_uring_context.h:377
friend auto tag_invoke(di::Tag< di::execution::connect >, ConnectSender self, Receiver receiver)
Definition io_uring_context.h:372
net::UnixAddress address
Definition io_uring_context.h:312
Definition io_uring_context.h:87
constexpr friend auto tag_invoke(di::execution::GetCompletionScheduler< CPO >, Env const &self)
Definition io_uring_context.h:91
IoUringContext * parent
Definition io_uring_context.h:88
Definition io_uring_context.h:55
di::Queue< OperationStateBase, di::IntrusiveForwardList< OperationStateBase > > m_queue
Definition io_uring_context.h:73
void finish()
Definition io_uring_context.h:66
di::Atomic< bool > m_done
Definition io_uring_context.h:74
static auto create() -> di::Result< IoUringContext >
Definition io_uring_context.h:1040
io_uring::IoUringHandle m_handle
Definition io_uring_context.h:72
IoUringContext(IoUringContext &&other)
Definition io_uring_context.h:59
auto get_scheduler() -> IoUringScheduler
Definition io_uring_context.h:1061
Definition io_uring_context.h:77
friend auto tag_invoke(di::Tag< di::execution::schedule >, IoUringScheduler const &self) -> ScheduleSender
Definition io_uring_context.h:1023
IoUringContext * parent
Definition io_uring_context.h:79
constexpr friend auto operator==(IoUringScheduler const &, IoUringScheduler const &) -> bool=default
void execute() override
Definition io_uring_context.h:473
Type(IoUringContext *parent, int file_descriptor, int count, Rec receiver)
Definition io_uring_context.h:467
friend void tag_invoke(di::Tag< di::execution::start >, Type &self)
Definition io_uring_context.h:488
Definition io_uring_context.h:453
void is_sender
Definition io_uring_context.h:455
IoUringContext * parent
Definition io_uring_context.h:459
constexpr friend auto tag_invoke(di::Tag< di::execution::get_env >, ListenSender const &self)
Definition io_uring_context.h:507
int file_descriptor
Definition io_uring_context.h:460
int count
Definition io_uring_context.h:461
di::CompletionSignatures< di::SetValue(), di::SetError(di::Error), di::SetStopped()> CompletionSignatures
Definition io_uring_context.h:457
friend auto tag_invoke(di::Tag< di::execution::connect >, ListenSender self, Receiver receiver)
Definition io_uring_context.h:503
void did_complete(io_uring::CQE const *cqe) override
Definition io_uring_context.h:826
void execute() override
Definition io_uring_context.h:813
Type(IoUringContext *parent, di::ReferenceWrapper< AsyncSocket > socket, Rec &&receiver)
Definition io_uring_context.h:810
friend void tag_invoke(di::Tag< di::execution::start >, Type &self)
Definition io_uring_context.h:836
Definition io_uring_context.h:794
di::ReferenceWrapper< AsyncSocket > socket
Definition io_uring_context.h:804
constexpr friend auto tag_invoke(di::Tag< di::execution::get_env >, MakeSocketSender const &self)
Definition io_uring_context.h:854
friend auto tag_invoke(di::Tag< di::execution::connect >, MakeSocketSender self, Receiver receiver)
Definition io_uring_context.h:850
linux::AsyncSocket< di::Void > AsyncSocket
Definition io_uring_context.h:798
void is_sender
Definition io_uring_context.h:796
IoUringContext * parent
Definition io_uring_context.h:803
di::CompletionSignatures< di::SetValue(di::ReferenceWrapper< AsyncSocket >), di::SetError(di::Error), di::SetStopped()> CompletionSignatures
Definition io_uring_context.h:800
void did_complete(io_uring::CQE const *cqe) override
Definition io_uring_context.h:761
Type(IoUringContext *parent, di::ReferenceWrapper< AsyncFile > file, Rec &&receiver)
Definition io_uring_context.h:726
void execute() override
Definition io_uring_context.h:729
friend void tag_invoke(di::Tag< di::execution::start >, Type &self)
Definition io_uring_context.h:771
Definition io_uring_context.h:712
friend auto tag_invoke(di::Tag< di::execution::connect >, OpenSender self, Receiver receiver)
Definition io_uring_context.h:785
di::CompletionSignatures< di::SetValue(di::ReferenceWrapper< AsyncFile >), di::SetError(di::Error), di::SetStopped()> CompletionSignatures
Definition io_uring_context.h:716
di::ReferenceWrapper< AsyncFile > file
Definition io_uring_context.h:720
void is_sender
Definition io_uring_context.h:714
IoUringContext * parent
Definition io_uring_context.h:719
constexpr friend auto tag_invoke(di::Tag< di::execution::get_env >, OpenSender const &self)
Definition io_uring_context.h:789
Definition io_uring_context.h:49
virtual void did_complete(io_uring::CQE const *)
Definition io_uring_context.h:52
void did_complete(io_uring::CQE const *cqe) override
Definition io_uring_context.h:135
void execute() override
Definition io_uring_context.h:120
Type(IoUringContext *parent, int file_descriptor, di::Span< di::Byte > buffer, di::Optional< u64 > offset, Rec receiver)
Definition io_uring_context.h:112
friend void tag_invoke(di::Tag< di::execution::start >, Type &self)
Definition io_uring_context.h:144
Definition io_uring_context.h:96
void is_sender
Definition io_uring_context.h:98
friend auto tag_invoke(di::Tag< di::execution::connect >, ReadSomeSender self, Receiver receiver)
Definition io_uring_context.h:160
int file_descriptor
Definition io_uring_context.h:104
constexpr friend auto tag_invoke(di::Tag< di::execution::get_env >, ReadSomeSender const &self)
Definition io_uring_context.h:165
di::Span< di::Byte > buffer
Definition io_uring_context.h:105
IoUringContext * parent
Definition io_uring_context.h:103
di::Optional< u64 > offset
Definition io_uring_context.h:106
di::CompletionSignatures< di::SetValue(size_t), di::SetError(di::Error), di::SetStopped()> CompletionSignatures
Definition io_uring_context.h:100
friend auto tag_invoke(di::Tag< di::execution::set_value >, Rec1 &&self)
Definition io_uring_context.h:943
void is_receiver
Definition io_uring_context.h:939
friend auto tag_invoke(di::Tag< di::execution::set_stopped >, Rec1 &&self)
Definition io_uring_context.h:944
di::Function< void()> complete
Definition io_uring_context.h:941
auto base() const &-> Rec const &
Definition io_uring_context.h:955
Rec2(Rec *receiver)
Definition io_uring_context.h:953
auto base() &&-> Rec &&
Definition io_uring_context.h:956
Type(IoUringContext *parent, di::ReferenceWrapper< Object > object, Rec &&receiver)
Definition io_uring_context.h:962
friend void tag_invoke(di::Tag< di::execution::start >, Type &self)
Definition io_uring_context.h:978
Definition io_uring_context.h:924
di::SequenceTag is_sender
Definition io_uring_context.h:926
di::CompletionSignatures< di::SetValue(di::ReferenceWrapper< Object >), di::SetError(di::Error), di::SetStopped()> CompletionSignatures
Definition io_uring_context.h:928
friend auto tag_invoke(di::Tag< di::execution::subscribe >, RunSender self, Receiver receiver)
Definition io_uring_context.h:1001
constexpr friend auto tag_invoke(di::Tag< di::execution::get_env >, RunSender const &self)
Definition io_uring_context.h:1005
IoUringContext * parent
Definition io_uring_context.h:931
di::ReferenceWrapper< Object > object
Definition io_uring_context.h:932
void execute() override
Definition io_uring_context.h:590
Type(IoUringContext *parent, Rec &&receiver)
Definition io_uring_context.h:588
friend void tag_invoke(di::Tag< di::execution::start >, Type &self)
Definition io_uring_context.h:599
Definition io_uring_context.h:575
friend auto tag_invoke(di::Tag< di::execution::connect >, ScheduleSender self, Rec receiver)
Definition io_uring_context.h:612
IoUringContext * parent
Definition io_uring_context.h:581
constexpr friend auto tag_invoke(di::Tag< di::execution::get_env >, ScheduleSender const &self)
Definition io_uring_context.h:616
void is_sender
Definition io_uring_context.h:577
di::CompletionSignatures< di::SetValue(), di::SetStopped()> CompletionSignatures
Definition io_uring_context.h:579
void execute() override
Definition io_uring_context.h:529
Type(IoUringContext *parent, int file_descriptor, net::Shutdown how, Rec receiver)
Definition io_uring_context.h:526
void did_complete(io_uring::CQE const *cqe) override
Definition io_uring_context.h:542
friend void tag_invoke(di::Tag< di::execution::start >, Type &self)
Definition io_uring_context.h:551
Definition io_uring_context.h:512
constexpr friend auto tag_invoke(di::Tag< di::execution::get_env >, ShutdownSender const &self)
Definition io_uring_context.h:570
int file_descriptor
Definition io_uring_context.h:519
IoUringContext * parent
Definition io_uring_context.h:518
void is_sender
Definition io_uring_context.h:514
di::CompletionSignatures< di::SetValue(), di::SetError(di::Error), di::SetStopped()> CompletionSignatures
Definition io_uring_context.h:516
friend auto tag_invoke(di::Tag< di::execution::connect >, ShutdownSender self, Receiver receiver)
Definition io_uring_context.h:566
net::Shutdown how
Definition io_uring_context.h:520
void did_complete(io_uring::CQE const *cqe) override
Definition io_uring_context.h:209
void execute() override
Definition io_uring_context.h:194
Type(IoUringContext *parent, int file_descriptor, di::Span< di::Byte const > buffer, di::Optional< u64 > offset, Rec receiver)
Definition io_uring_context.h:186
friend void tag_invoke(di::Tag< di::execution::start >, Type &self)
Definition io_uring_context.h:218
Definition io_uring_context.h:170
di::Span< di::Byte const > buffer
Definition io_uring_context.h:179
di::Optional< u64 > offset
Definition io_uring_context.h:180
int file_descriptor
Definition io_uring_context.h:178
void is_sender
Definition io_uring_context.h:172
friend auto tag_invoke(di::Tag< di::execution::connect >, WriteSomeSender self, Receiver receiver)
Definition io_uring_context.h:234
constexpr friend auto tag_invoke(di::Tag< di::execution::get_env >, WriteSomeSender const &self)
Definition io_uring_context.h:239
IoUringContext * parent
Definition io_uring_context.h:177
di::CompletionSignatures< di::SetValue(size_t), di::SetError(di::Error), di::SetStopped()> CompletionSignatures
Definition io_uring_context.h:174