24#ifdef DIUS_PLATFORM_LINUX
29#ifdef DIUS_PLATFORM_LINUX
38 OperationStateBase(
IoContext* parent_) : parent(parent_) {}
40 virtual void execute() = 0;
48 template<
typename CPO>
54 template<
typename Receiver>
55 struct OperationStateT {
56 struct Type : OperationStateBase {
58 Type(
IoContext* parent, Receiver&& receiver) : OperationStateBase(parent), m_receiver(
di::move(receiver)) {}
69 void do_start() { this->parent->push_back(
this); }
73 [[no_unique_address]] Receiver m_receiver;
77 template<
typename Receiver>
83 using is_sender = void;
90 template<di::ReceiverOf<CompletionSignatures> Receiver>
92 return OperationState<Receiver> { self.parent, di::move(receiver) };
96 return Env { self.parent };
100 struct AsyncFile : di::Immovable {
103 : m_parent(parent), m_path(path), m_mode(mode), m_create_mode(create_mode) {}
106 struct ReadSomeSender {
108 using is_sender = void;
110 using CompletionSignatures = di::CompletionSignatures<di::SetValue(
size_t), di::SetError(
di::Error)>;
114 di::Span<di::Byte> buffer;
115 di::Optional<u64> offset;
118 template<
typename Rec>
119 struct OperationStateT {
131 : sync_file.read_some(self.
buffer);
132 if (!result.has_value()) {
141 template<di::ReceiverOf<CompletionSignatures> Receiver>
144 template<di::ReceiverOf<CompletionSignatures> Receiver>
146 return OperationState<Receiver> { self.parent, self.file_descriptor, self.buffer, self.offset,
147 di::move(receiver) };
151 return Env { self.parent };
155 struct WriteSomeSender {
157 using is_sender = void;
159 using CompletionSignatures = di::CompletionSignatures<di::SetValue(
size_t), di::SetError(
di::Error)>;
163 di::Span<di::Byte const> buffer;
164 di::Optional<u64> offset;
167 template<
typename Rec>
168 struct OperationStateT {
180 : sync_file.write_some(self.
buffer);
190 template<di::ReceiverOf<CompletionSignatures> Receiver>
193 template<di::ReceiverOf<CompletionSignatures> Receiver>
195 return OperationState<Receiver> { self.parent, self.file_descriptor, self.buffer, self.offset,
196 di::move(receiver) };
200 return Env { self.parent };
206 using is_sender = void;
208 using CompletionSignatures =
209 di::CompletionSignatures<di::SetValue(di::ReferenceWrapper<AsyncFile>), di::SetError(
di::Error)>;
212 di::ReferenceWrapper<AsyncFile> file;
215 template<
typename Rec>
216 struct OperationStateT {
225 self.
file.get().m_create_mode);
229 self.
file.get().m_file = di::move(result).value();
236 template<di::ReceiverOf<CompletionSignatures> Receiver>
239 template<di::ReceiverOf<CompletionSignatures> Receiver>
241 return OperationState<Receiver> { self.parent, self.file, di::move(receiver) };
245 return Env { self.parent };
251 using is_sender = di::SequenceTag;
253 using CompletionSignatures =
254 di::CompletionSignatures<di::SetValue(di::ReferenceWrapper<AsyncFile>), di::SetError(
di::Error)>;
257 di::ReferenceWrapper<AsyncFile> file;
260 template<
typename Rec>
261 struct OperationStateT {
269 self.parent->close();
272 self.parent->close();
285 auto result =
file.get().m_file.close();
305 template<
typename Receiver>
308 template<di::concepts::Receiver Receiver>
310 return OperationState<Receiver> { self.parent, self.file, di::move(receiver),
di::nullopt };
321 return ReadSomeSender { self.m_parent, self.m_file.file_descriptor(), buffer, offset };
325 di::Span<di::Byte const> buffer, di::Optional<u64> offset) {
326 return WriteSomeSender { self.m_parent, self.m_file.file_descriptor(), buffer, offset };
330 return RunSender(self.m_parent,
di::ref(self));
341 IoContext* parent {
nullptr };
347 u16 create_mode = 0666) {
351 constexpr friend auto operator==(Scheduler
const&, Scheduler
const&) ->
bool =
default;
357 di::Queue<OperationStateBase, di::IntrusiveForwardList<OperationStateBase>> queue;
358 bool stopped {
false };
361 auto state() -> di::Synchronized<State>& {
return m_state.value(); }
371 while (
auto* operation = pop_front()) {
372 operation->execute();
377 state().with_lock([](State& state) {
378 state.stopped =
true;
385 auto pop_front() -> OperationStateBase* {
389 if (!state.queue.empty()) {
407 void push_back(OperationStateBase* operation) {
408 state().with_lock([&](State& state) {
409 state.queue.push(*operation);
413 di::MovableBox<di::Synchronized<State>> m_state;
Definition forward_list_node.h:9
Definition defer_construct.h:8
Definition reference_wrapper.h:14
Definition optional_forward_declaration.h:5
Definition span_forward_declaration.h:10
Definition tuple_forward_declaration.h:5
Definition io_context.h:34
void run()
Definition io_context.h:370
void finish()
Definition io_context.h:376
static auto create() -> di::Result< IoContext >
Definition io_context.h:364
IoContext(IoContext &&)=default
auto get_scheduler() -> Scheduler
Definition io_context.h:368
Definition sync_file.h:40
@ No
Definition sync_file.h:42
#define DI_IMMOVABLE_NO_UNIQUE_ADDRESS
Definition compiler.h:15
PathViewImpl< string::TransparentEncoding > PathView
Definition path_view.h:11
MakeEnv< EmptyEnv, With< Tag< get_stop_token >, StopToken > > Env
Definition ensure_started.h:52
constexpr auto set_error
Definition set_error.h:14
constexpr auto set_next
Set the next sender of a sequence.
Definition sequence_sender.h:77
constexpr auto start
Definition start.h:20
constexpr auto make_env
Create an environment with overrides for queries.
Definition make_env.h:147
constexpr auto set_stopped
Definition set_stopped.h:14
constexpr auto get_sequence_cardinality
A query that returns the cardinality of a sequence.
Definition get_sequence_cardinality.h:45
constexpr auto with
Specify an override for an environment query.
Definition make_env.h:112
constexpr auto get_stop_token
Definition get_stop_token.h:25
constexpr auto connect
Definition connect.h:42
constexpr auto set_value
Definition set_value.h:14
di::meta::Decay< decltype(T)> Tag
Definition tag_invoke.h:28
__UINT16_TYPE__ u16
Definition integers.h:10
Expected< T, Error > Result
Definition result.h:8
StatusCode< Erased< long > > Error
Definition error.h:8
Definition zstring_parser.h:9
constexpr auto make_deferred
Creates a deferred function object.
Definition make_deferred.h:75
constexpr tag_invoke_detail::TagInvokeFn tag_invoke
Definition tag_invoke.h:22
constexpr auto ref
Definition reference_wrapper.h:98
constexpr auto nullopt
Definition nullopt.h:15
constexpr auto make_tuple(Args &&... args)
Definition make_tuple.h:9
constexpr auto c_
A value of type Constexpr<val>.
Definition constexpr.h:252
Definition directory_entry.h:11
auto open_sync(di::PathView path, OpenMode open_mode, u16 create_mode=0666) -> di::Expected< SyncFile, di::GenericCode >
Definition sync_file.cpp:76
OpenMode
Definition sync_file.h:109
Defines the sequence sender concepts and related CPOs.
Definition get_completion_scheduler.h:10
Definition set_stopped.h:6
Definition completion_signuatures.h:7
Type(IoContext *parent, Receiver &&receiver)
Definition io_context.h:58
friend void tag_invoke(di::Tag< execution::start >, Type &self)
Definition io_context.h:71
void execute() override
Definition io_context.h:60
Definition io_context.h:217
friend void tag_invoke(di::Tag< execution::start >, Type &self)
Definition io_context.h:223
Rec receiver
Definition io_context.h:220
di::ReferenceWrapper< AsyncFile > file
Definition io_context.h:219
IoContext * parent
Definition io_context.h:218
Definition io_context.h:120
Rec receiver
Definition io_context.h:125
int file_descriptor
Definition io_context.h:122
friend void tag_invoke(di::Tag< execution::start >, Type &self)
Definition io_context.h:128
di::Span< di::Byte > buffer
Definition io_context.h:123
IoContext * parent
Definition io_context.h:121
di::Optional< u64 > offset
Definition io_context.h:124
Definition io_context.h:263
void is_receiver
Definition io_context.h:264
friend void tag_invoke(di::Tag< execution::set_stopped >, Receiver &&self)
Definition io_context.h:271
Type * parent
Definition io_context.h:266
friend void tag_invoke(di::Tag< execution::set_value >, Receiver &&self)
Definition io_context.h:268
Definition io_context.h:262
IoContext * parent
Definition io_context.h:279
Rec receiver
Definition io_context.h:281
DI_IMMOVABLE_NO_UNIQUE_ADDRESS di::Optional< Op > op
Definition io_context.h:282
friend void tag_invoke(di::Tag< execution::start >, Type &self)
Definition io_context.h:294
void close()
Definition io_context.h:284
di::ReferenceWrapper< AsyncFile > file
Definition io_context.h:280
di::meta::NextSenderOf< Rec, OpenSender > Next
Definition io_context.h:276
di::meta::ConnectResult< Next, Receiver > Op
Definition io_context.h:277
Definition io_context.h:169
di::Optional< u64 > offset
Definition io_context.h:173
Rec receiver
Definition io_context.h:174
friend void tag_invoke(di::Tag< execution::start >, Type &self)
Definition io_context.h:177
int file_descriptor
Definition io_context.h:171
di::Span< di::Byte const > buffer
Definition io_context.h:172
IoContext * parent
Definition io_context.h:170
Definition io_uring_context.h:63