Iros
 
Loading...
Searching...
No Matches
run_loop.h
Go to the documentation of this file.
1#pragma once
2
12#include "di/platform/prelude.h"
15#include "di/util/immovable.h"
16
17namespace di::execution {
18template<concepts::Lock Lock = DefaultLock>
19class RunLoop {
20private:
21 struct OperationStateBase : IntrusiveForwardListNode<> {
22 public:
23 OperationStateBase(RunLoop* parent_) : parent(parent_) {}
24
25 virtual void execute() = 0;
26
27 RunLoop* parent { nullptr };
28 };
29
30 template<typename Receiver>
31 struct OperationStateT {
32 struct Type : OperationStateBase {
33 public:
34 Type(RunLoop* parent, Receiver&& receiver) : OperationStateBase(parent), m_receiver(util::move(receiver)) {}
35
36 void execute() override {
37 if (get_stop_token(m_receiver).stop_requested()) {
38 set_stopped(util::move(m_receiver));
39 } else {
40 set_value(util::move(m_receiver));
41 }
42 }
43
44 private:
45 void do_start() { this->parent->push_back(this); }
46
47 friend void tag_invoke(types::Tag<start>, Type& self) { self.do_start(); }
48
49 [[no_unique_address]] Receiver m_receiver;
50 };
51 };
52
53 template<typename Receiver>
54 using OperationState = meta::Type<OperationStateT<Receiver>>;
55
56 struct Scheduler {
57 private:
58 struct Sender {
59 using is_sender = void;
60
62
63 RunLoop* parent;
64
65 private:
66 template<concepts::ReceiverOf<CompletionSignatures> Receiver>
67 friend auto tag_invoke(types::Tag<connect>, Sender self, Receiver receiver) {
68 return OperationState<Receiver> { self.parent, util::move(receiver) };
69 }
70
71 struct Env {
72 RunLoop* parent;
73
74 template<typename CPO>
75 constexpr friend auto tag_invoke(GetCompletionScheduler<CPO>, Env const& self) {
76 return Scheduler { self.parent };
77 }
78 };
79
80 constexpr friend auto tag_invoke(types::Tag<get_env>, Sender const& self) { return Env { self.parent }; }
81 };
82
83 public:
84 RunLoop* parent { nullptr };
85
86 private:
87 friend auto tag_invoke(types::Tag<schedule>, Scheduler const& self) { return Sender { self.parent }; }
88
89 constexpr friend auto operator==(Scheduler const&, Scheduler const&) -> bool = default;
90 };
91
92 struct State {
93 Queue<OperationStateBase, IntrusiveForwardList<OperationStateBase>> queue;
94 bool stopped { false };
95 };
96
97public:
98 RunLoop() = default;
99 RunLoop(RunLoop&&) = delete;
100
101 auto get_scheduler() -> Scheduler { return Scheduler { this }; }
102
103 void run() {
104 while (auto* operation = pop_front()) {
105 operation->execute();
106 }
107 }
108
109 void finish() {
110 m_state.with_lock([](State& state) {
111 state.stopped = true;
112 });
113 }
114
115private:
116 auto pop_front() -> OperationStateBase* {
117 // FIXME: block instead of busy polling the queue when it is empty.
118 for (;;) {
119 auto [operation, is_stopped] = m_state.with_lock([](State& state) -> Tuple<OperationStateBase*, bool> {
120 // NOTE: even if a stop is requested, we must continue first empty the queue
121 // before returning stopping execution. Otherwise, the receiver contract
122 // will be violated (operation state will be destroyed without completion
123 // ever occuring).
124 if (!state.queue.empty()) {
125 return make_tuple(util::addressof(*state.queue.pop()), false);
126 }
127 if (state.stopped) {
128 return make_tuple(nullptr, true);
129 }
130 return make_tuple(nullptr, false);
131 });
132
133 if (is_stopped) {
134 return nullptr;
135 }
136 return operation;
137 }
138 }
139
140 void push_back(OperationStateBase* operation) {
141 m_state.with_lock([&](State& state) {
142 state.queue.push(*operation);
143 });
144 }
145
147};
148}
149
150namespace di {
152}
Definition forward_list_node.h:9
Definition run_loop.h:19
void run()
Definition run_loop.h:103
auto get_scheduler() -> Scheduler
Definition run_loop.h:101
RunLoop(RunLoop &&)=delete
void finish()
Definition run_loop.h:109
Definition tuple_forward_declaration.h:5
Definition bulk.h:30
constexpr auto execute
Executes a function on a scheduler.
Definition execute.h:45
constexpr auto set_stopped
Definition set_stopped.h:14
constexpr auto get_stop_token
Definition get_stop_token.h:25
constexpr auto set_value
Definition set_value.h:14
T::Type Type
Definition core.h:26
Synchronized(T &&) -> Synchronized< T >
di::meta::Decay< decltype(T)> Tag
Definition tag_invoke.h:28
Definition vocab.h:96
Definition zstring_parser.h:9
constexpr tag_invoke_detail::TagInvokeFn tag_invoke
Definition tag_invoke.h:22
constexpr auto make_tuple(Args &&... args)
Definition make_tuple.h:9
friend void tag_invoke(types::Tag< start >, Type &self)
Definition run_loop.h:47
void execute() override
Definition run_loop.h:36
Type(RunLoop *parent, Receiver &&receiver)
Definition run_loop.h:34
Definition set_stopped.h:6
Definition set_value.h:6
Definition completion_signuatures.h:7