Iros
 
Loading...
Searching...
No Matches
in_place_stop_source.h
Go to the documentation of this file.
1#pragma once
2
5#include "di/sync/atomic.h"
9
10namespace di::sync {
private:
    // InPlaceStopCallback registers/deregisters itself via the private
    // try_add_callback()/remove_callback() methods below.
    template<typename>
    friend class InPlaceStopCallback;

    // Bit layout of m_state:
    //   stop_flag   - set once a stop has been requested (never cleared).
    //   locked_flag - spin-lock bit guarding the m_callbacks list.
    constexpr static u8 stop_flag = 1;
    constexpr static u8 locked_flag = 2;
20
public:
    InPlaceStopSource() = default;

    // NOTE(review): the Doxygen index shows InPlaceStopSource(InPlaceStopSource&&) = delete
    // declared here; that line was lost in extraction — confirm against the real header.

    // Every callback must have deregistered itself before the source dies.
    ~InPlaceStopSource() { DI_ASSERT(m_callbacks.empty()); }

    // Returns a token referring to this stop source (defined out-of-line in in_place_stop_token.h).
    [[nodiscard]] auto get_stop_token() const -> InPlaceStopToken;

    // True once request_stop() has been called (reads the stop bit with acquire ordering).
    [[nodiscard]] auto stop_requested() const -> bool { return m_state.load(MemoryOrder::Acquire) & stop_flag; }
30
    // Request a stop, running every registered callback exactly once.
    // Returns true only for the first successful request; false if a stop
    // was already requested. Callbacks run on this thread, without the
    // internal lock held.
    auto request_stop() -> bool {
        if (!lock_unless_stopped(true)) {
            // A stop was already requested; only the first caller returns true.
            return false;
        }

        // Remember the thread id which requested the stop, so remove_callback()
        // can detect a callback destroying itself from within its own execution.
        m_stopper_thread = get_current_thread_id();

        // With the lock now acquired, iterate through each stop callback.
        while (!m_callbacks.empty()) {
            auto& callback = *m_callbacks.front();

            // Point the callback at a stack-local flag so its destructor can
            // report "I was destroyed during my own execution". Relaxed memory
            // order suffices since this store is synchronized by the spin lock.
            bool did_destroy_itself = false;
            callback.m_did_destruct_in_same_thread.store(util::addressof(did_destroy_itself), MemoryOrder::Relaxed);

            // Remove the current callback from the list.
            m_callbacks.pop_front();

            // Unlock the list, allowing callback destructors the ability to
            // lock the list and remove themselves.
            unlock(true);

            // Execute the callback with no lock held.
            callback.execute();

            // Mark the callback as already done, if the object still exists;
            // a concurrent remove_callback() on another thread spins on this flag.
            if (!did_destroy_itself) {
                callback.m_already_executed.store(true, MemoryOrder::Release);
            }

            // Reacquire the lock before examining the list again.
            lock(true);
        }

        unlock(true);
        return true;
    }
71
72private:
73 auto try_add_callback(detail::InPlaceStopCallbackBase* callback) const -> bool {
74 if (!lock_unless_stopped(false)) {
75 return false;
76 }
77
78 m_callbacks.push_front(*callback);
79
80 unlock(false);
81 return true;
82 }
83
    // Deregister a callback, synchronizing with a concurrent request_stop().
    // Guarantees that when this returns, the callback's execution (if any)
    // has either completed or has been notified of the destruction.
    void remove_callback(detail::InPlaceStopCallbackBase* callback) const {
        // Simple case: no stop request has happened, so just unlink under the lock.
        if (lock_unless_stopped(false)) {
            m_callbacks.erase(*callback);

            unlock(false);
            return;
        }

        // If a stop request occurred, synchronize on the spin lock.
        lock(true);

        auto stopper_thread = m_stopper_thread;
        // Presumably non-null only when request_stop() has selected this callback
        // for execution (it points at a bool local to request_stop()) — the reset
        // path is not visible in this file, so verify against the callback base.
        auto* did_destruct_in_same_thread = callback->m_did_destruct_in_same_thread.load(MemoryOrder::Relaxed);
        bool going_to_be_executed = !!did_destruct_in_same_thread;

        // Remove ourselves from the list with the lock held.
        if (!going_to_be_executed) {
            m_callbacks.erase(*callback);
        }

        // Now unlock the spin lock.
        unlock(true);

        if (going_to_be_executed) {
            // If we are being executed by the current thread, notify the callback runner this object
            // has been destroyed. This is not synchronized because we must be running on the same thread.
            if (stopper_thread == get_current_thread_id()) {
                *did_destruct_in_same_thread = true;
            } else {
                // Otherwise, busy-wait until the callback's execution completes,
                // so the callback object never outlives its storage.
                while (!callback->m_already_executed.load(MemoryOrder::Acquire)) {
                    ;
                }
            }
        }
    }
121
122 auto lock_unless_stopped(bool set_stop) const -> bool {
123 u8 flags = set_stop ? (stop_flag | locked_flag) : locked_flag;
124
125 u8 expected = m_state.load(MemoryOrder::Relaxed);
126 for (;;) {
127 // If already locked, return false.
128 if (expected & stop_flag) {
129 return false;
130 }
131
132 if (m_state.compare_exchange_weak(expected, flags, MemoryOrder::AcquireRelease, MemoryOrder::Relaxed)) {
133 // Lock aquired, return true.
134 return true;
135 }
136 }
137 }
138
139 void lock(bool set_stop) const {
140 u8 flags = set_stop ? (stop_flag | locked_flag) : locked_flag;
141 while (!m_state.exchange(flags, MemoryOrder::Acquire)) {
142 ;
143 }
144 }
145
146 void unlock(bool set_stop) const {
147 u8 flags = set_stop ? stop_flag : 0;
148 m_state.store(flags, MemoryOrder::Release);
149 }
150
    // Registered callbacks; guarded by the locked_flag spin-lock bit in m_state.
    // mutable: try_add_callback()/remove_callback() are const.
    mutable container::IntrusiveList<detail::InPlaceStopCallbackBase> m_callbacks;
    // Combined stop flag + lock bit (see stop_flag / locked_flag above).
    mutable Atomic<u8> m_state { 0 };
    // Thread that called request_stop(); written only while the lock is held,
    // meaningful only after a stop has been requested.
    ThreadId m_stopper_thread;
154};
155}
#define DI_ASSERT(...)
Definition assert_bool.h:7
auto get_stop_token() const -> InPlaceStopToken
Definition in_place_stop_token.h:31
auto request_stop() -> bool
Definition in_place_stop_source.h:31
friend class InPlaceStopCallback
Definition in_place_stop_source.h:14
~InPlaceStopSource()
Definition in_place_stop_source.h:26
auto stop_requested() const -> bool
Definition in_place_stop_source.h:29
InPlaceStopSource(InPlaceStopSource &&)=delete
Definition in_place_stop_token.h:6
Definition in_place_stop_callback_base.h:8
std::thread::id ThreadId
Definition custom.h:31
Definition atomic.h:12
@ Relaxed
Definition memory_order.h:7
@ AcquireRelease
Definition memory_order.h:11
@ Acquire
Definition memory_order.h:9
@ Release
Definition memory_order.h:10
__UINT8_TYPE__ u8
Definition integers.h:9
auto get_current_thread_id() -> ThreadId
Definition custom.h:33