C++ Library Extensions 2022.12.09
To help learn modern C++ programming
023-wakeup.cpp
Go to the documentation of this file.
1#include <tpf_output.hpp>
2#include <future>
3#include <thread>
4#include <windows.h>
5#include <atomic>
6#include <map>
7
8struct threads
9{
// Key type used to identify a worker in the registry map below.
10 using id_t = int;
// Win32 handle type stored per registered worker.
11 using handle_t = HANDLE;
12
// (id, handle) pair and the id -> handle map type used for the registry.
13 using id_handle_t = std::pair<id_t, handle_t>;
14 using active_threads_t = std::map<id_t, handle_t>;
15
// NOTE(review): listing line 16 (the mutex_t alias) is absent from this view.
17 using recursive_mutex_t = std::recursive_mutex;
18
// Single mutex shared by all static members of this struct.
19 inline static mutex_t mutex;
20
// Lock wrappers over mutex_t used throughout this file.
21 using lock_guard_t = std::lock_guard<mutex_t>;
22 using unique_lock_t = std::unique_lock<mutex_t>;
23
// NOTE(review): listing lines 24-25 and 28 (cv_register, cv_unregister, active_threads)
// are absent from this view.
26
// Id compared against each worker's own id in worker()'s loop condition; starts at -1.
// NOTE(review): nothing visible in this listing ever stores to it - confirm it is still needed.
27 inline static std::atomic<int> kill_id{-1};
29
// APC routine handed to ::QueueUserAPC by the main thread; it only prints a message.
// dwParam is the value passed as QueueUserAPC's third argument and is not used here.
// NOTE(review): listing line 32 is absent from this view; presumably it declares the
// local `stream` object used below - confirm.
30 static void WINAPI apc_wakeup(ULONG_PTR dwParam)
31 {
33 stream << "APC called " << tpf::endl;
34 }
35
36 static void register_me(int worker_id)
37 {
38 active_threads.insert(std::make_pair(worker_id, GetCurrentThread()));
39 }
40
41 static void unregister_me(int worker_id)
42 {
43 auto itr = active_threads.find(worker_id);
44
45 if(itr != active_threads.end())
46 active_threads.erase(itr);
47 }
48
// Helper type holding a worker id; its constructor and the block that follows both
// take threads::mutex and then notify cv_register.
// NOTE(review): the declaration header (listing line 49) is absent from this view.
50 {
51 private:
// Worker id captured at construction.
52 id_t m_id;
53 public:
// Locks threads::mutex, then wakes every thread waiting on cv_register.
// NOTE(review): listing line 57 is absent; presumably it calls register_me(worker_id) - confirm.
54 register_thread(int worker_id): m_id{worker_id}
55 {
56 unique_lock_t lock(mutex);
58 threads::cv_register.notify_all();
59 }
// NOTE(review): the header on listing line 60 and the statement on line 63 are absent;
// this looks like the destructor undoing the registration - confirm.
// It notifies cv_register, although the struct also has a cv_unregister member - confirm
// which one is intended.
61 {
62 unique_lock_t lock(mutex);
64 threads::cv_register.notify_all();
65 }
66 };
67
// Thread body for one worker, identified by my_id.
// Repeats "print a message, then sleep" until either kill_id equals my_id or the
// sleep reports WAIT_IO_COMPLETION, then prints an exit message and returns.
// NOTE(review): listing lines 70, 74, 81, 88, 94 and 101 are absent from this view, so
// the declaration of `stream`, any registration object, and the statements inside the
// two locked scopes cannot be seen here.
68 static void WINAPI worker(int my_id)
69 {
71
// kill_id is an atomic; the loop ends once it holds this worker's id.
72 while(kill_id != my_id)
73 {
75 stream << "I am a worker thread ["
76 << std::this_thread::get_id() <<"],";
77 stream << " falling into deep sleep."<<tpf::nl;
78
// Scope holding threads::mutex; the statement it protected is not visible.
79 {
80 unique_lock_t unique_lock(mutex);
82 }
83
84 // std::this_thread::sleep_for(std::chrono::seconds(1));
85
// SleepEx with bAlertable == TRUE sleeps up to 100 ms but returns early with
// WAIT_IO_COMPLETION when a queued APC (see apc_wakeup) has run on this thread.
86 if(WAIT_IO_COMPLETION == ::SleepEx(100, TRUE))
87 {
89 stream <<"Wakeup --- " << tpf::endl;
90 break;
91 }
92 }
93
95 stream << "I am a worker thread ["
96 << std::this_thread::get_id() <<"],";
97 stream << " now exiting."<<tpf::nl;
98
// Scope holding threads::mutex; the statement it protected is not visible.
99 {
100 unique_lock_t unique_lock(mutex);
102 }
103 }
104};
105
// Body of example_for_wakeup() (its header, listing line 106, is absent from this view).
// Starts three workers with std::async, waits after each launch until the size of
// threads::active_threads changes, then runs a ten-step countdown on the main thread
// and queues apc_wakeup to one worker at counts 7, 5 and 3.
// Any std::exception is caught and its message printed to std::cerr.
// NOTE(review): listing lines 114, 125, 134, 146, 151-152, 165, 170, 180, 185, 196 and
// 201 are absent; presumably they declare `lock` and `stream` - confirm.
107{
108 try
109 {
// Last observed size of the registry; used to detect that a new worker registered.
110 size_t size = 0;
111
112 auto f0 = std::async(std::launch::async, &threads::worker, 0);
113 {
115
// Block until the registry size differs from the value captured in `size`.
116 threads::cv_register.wait(lock, [size](){ return size !=
117 threads::active_threads.size(); });
118
119 size = threads::active_threads.size();
120
121
122 }
123 auto f1 = std::async(std::launch::async, &threads::worker, 1);
124 {
126
127 threads::cv_register.wait(lock, [size](){ return size !=
128 threads::active_threads.size(); });
129
130 size = threads::active_threads.size();
131 }
132 auto f2 = std::async(std::launch::async, &threads::worker, 2);
133 {
135
136 threads::cv_register.wait(lock, [size](){ return size !=
137 threads::active_threads.size(); });
138
139 size = threads::active_threads.size();
140 }
141
// Countdown: count is decremented before the checks below, so the branches fire in
// the order count == 7 (worker 2), count == 5 (worker 1), count == 3 (worker 0).
142 int count = 10;
143
144 while(count > 0)
145 {
147 stream <<"I am the main thread [" << std::this_thread::get_id()
148 << "], doing my own work." << tpf::nl;
149
150 {
153 }
154
// NOTE(review): hHandle is never used in the visible code.
155 auto hHandle = GetCurrentThread();
156
157 std::this_thread::sleep_for(std::chrono::seconds(1));
158
159 --count;
160
161 if(count == 3)
162 {
// NOTE(review): std::map::operator[] inserts a default entry when the key is absent, and
// no lock on threads::mutex is visible around this read - confirm both are intended.
163 if(::QueueUserAPC(threads::apc_wakeup, threads::active_threads[0], 0))
164 {
166 stream <<count <<" - "<< 3 <<"Okay queued" << tpf::endl;
167 }
168 else
169 {
171 stream <<"Sorry failed to queue" << tpf::endl;
172 }
173
// Blocks until worker 0 has returned.
174 f0.wait();
175 }
176 else if(count == 5)
177 {
178 if(::QueueUserAPC(threads::apc_wakeup, threads::active_threads[1], 0))
179 {
181 stream <<count <<" - "<< 5 <<"Okay queued" << tpf::endl;
182 }
183 else
184 {
186 stream <<"Sorry failed to queue" << tpf::endl;
187 }
188
// Blocks until worker 1 has returned.
189 f1.wait();
190 }
191
192 else if(count == 7)
193 {
194 if(::QueueUserAPC(threads::apc_wakeup, threads::active_threads[2], 0))
195 {
// NOTE(review): prints the literal 2 although count is 7 here; the other two branches
// print a literal equal to count - confirm whether 7 was intended.
197 stream <<count <<" - "<< 2 <<"Okay queued" << tpf::endl;
198 }
199 else
200 {
202 stream <<"Sorry failed to queue" << tpf::endl;
203 }
204
// Blocks until worker 2 has returned.
205 f2.wait();
206 }
207 }
208 }
209 catch(const std::exception& e)
210 {
211 std::cerr << e.what() << '\n';
212 }
213}
214
// Program entry point.
// NOTE(review): listing line 217 (the only body line) is absent from this view;
// presumably it calls example_for_wakeup() - confirm.
215int main()
216{
218}
std::mutex mutex
Definition: 022-mutex.cpp:12
std::atomic< int > count
Definition: 022-mutex.cpp:10
std::condition_variable condition_variable
Definition: 022-mutex.cpp:13
void example_for_wakeup()
Definition: 023-wakeup.cpp:106
int main()
Definition: 023-wakeup.cpp:215
tpf::sstream stream
constexpr auto flush
Definition: tpf_output.hpp:970
constexpr auto endl
Definition: tpf_output.hpp:973
constexpr auto nl
Definition: tpf_output.hpp:971
register_thread(int worker_id)
Definition: 023-wakeup.cpp:54
static mutex_t mutex
Definition: 023-wakeup.cpp:19
HANDLE handle_t
Definition: 023-wakeup.cpp:11
static void register_me(int worker_id)
Definition: 023-wakeup.cpp:36
std::unique_lock< mutex_t > unique_lock_t
Definition: 023-wakeup.cpp:22
static void WINAPI worker(int my_id)
Definition: 023-wakeup.cpp:68
int id_t
Definition: 023-wakeup.cpp:10
std::mutex mutex_t
Definition: 023-wakeup.cpp:16
std::recursive_mutex recursive_mutex_t
Definition: 023-wakeup.cpp:17
static active_threads_t active_threads
Definition: 023-wakeup.cpp:28
static std::atomic< int > kill_id
Definition: 023-wakeup.cpp:27
static std::condition_variable cv_register
Definition: 023-wakeup.cpp:24
static std::condition_variable cv_unregister
Definition: 023-wakeup.cpp:25
static void unregister_me(int worker_id)
Definition: 023-wakeup.cpp:41
std::map< id_t, handle_t > active_threads_t
Definition: 023-wakeup.cpp:14
static void WINAPI apc_wakeup(ULONG_PTR dwParam)
Definition: 023-wakeup.cpp:30
std::lock_guard< mutex_t > lock_guard_t
Definition: 023-wakeup.cpp:21
std::pair< id_t, handle_t > id_handle_t
Definition: 023-wakeup.cpp:13
Stream output operators << are implemented.