Guitarix
Loading...
Searching...
No Matches
ParallelThread.h
Go to the documentation of this file.
1/*
2 * ParallelThread.h
3 *
4 * SPDX-License-Identifier: BSD-3-Clause
5 *
6 * Copyright (C) 2024 brummer <brummer@web.de>
7 */
8
9/****************************************************************
10 ** ParallelThread - class to run a processes in a parallel thread
11 * requires minimum c++17
12 * works best with c++20 (std::atomic::wait)
13 *
14 * ParallelThread aims to be suitable in real-time processes
15 * to provide a parallel processor.
16 *
17 * But ParallelThread could be used in trivial environments,
18 * as worker thread, as well.
19 *
20 * usage:
21 * //Create a instance for ParallelThread
22 * ParallelThread proc;
23 * // start the thread
24 * proc.start();
25 * // optional set a name for the thread, default name is "anonymous"
26 * that may be helpful for diagnostics.
27 * proc.setThreadName("YourName");
28 * // optional set the scheduling class and the priority (as int32_t)
29 * proc.setPriority(priority, scheduling_class)
30 * // optional set the timeout value for the waiting functions
31 * in microseconds. Default is 400 micro seconds.
32 * This is a safety guard to avoid dead looks.
33 * When overrun this time, the process will break and data may be lost.
34 * A reasonable value for usage in real-time could be calculated by
35 * proc.setTimeOut(std::max(100,static_cast<int>((bufferSize/(sampleRate*0.000001))*0.1)));
36 * // set the function to run in the parallel thread
37 * function should be defined in YourClass as void YourFunction();
38 * proc.set<YourClass, &YourClass::YourFunction>(*this);
39 * // now anything is setup to run the thread,
40 * so try to get the processing pointer by getProcess()
41 * getProcess() check if the thread is in waiting state, if not,
42 * it waits as max two times the time set by setTimeOut()
43 * so be prepared to run the function in the main process
44 * in case the parallel process is not ready to run.
45 * That is the worst case, and shouldn't happen under normal
46 * circumstances.
47 * if getProcess() return true, runProcess(), otherwise
48 * run the function in the main thread.
49 * if (proc.getProcess()) proc.runProcess() else functionToRun();
50 * // optional at the point were processed data needs to be merged,
51 * wait for the data. In case there is no data processed,
52 * or the data is already ready, processWait() returns directly
53 * proc.processWait();
54 * processWait() waits maximal 5 times the timeout. If the
55 * process isn't ready in that time, the data is lost and
56 * processWait() break to avoid Xruns or dead looks.
57 * That is the worst case and shouldn't happen
58 * under normal circumstances.
59 * // Finally stop the thread before exit.
60 * proc.stop();
61 */
62
63#if defined(_WIN32)
64#define MINGW_STDTHREAD_REDUNDANCY_WARNING
65#endif
66
67#include <atomic>
68#include <cstdint>
69#include <unistd.h>
70#include <mutex>
71#include <thread>
72#include <cstring>
73#include <ctime>
74#include <condition_variable>
75
76#include <pthread.h>
77
78#pragma once
79
80#ifndef PARALLEL_THREAD_H_
81#define PARALLEL_THREAD_H_
82
84{
85public:
91
92 template <class C, void (C::*Function)()>
93 void set(C* instance) {
94 instPtr[i] = instance;
96 }
97
98 template <uint32_t s, class C, void (C::*Function)()>
99 void set(C* instance) {
100 instPtr[s] = instance;
102 }
103
104 void setProcessor(uint32_t i_) {
105 i = i_;
106 }
107
108 void process() const {
109 return memberFunc[i](instPtr[i]);
110 }
111
112 void dummyFunc() {}
113
114private:
115 typedef void* InstancePtr;
116 typedef void (*MemberFunc)(InstancePtr);
117
118 template <class C, void (C::*Function)()>
119 static inline void wrap(InstancePtr instance) {
120 return (static_cast<C*>(instance)->*Function)();
121 }
122
123
126 uint32_t i;
127};
128
130{
131public:
132 //Constructor
134 : pRun(false)
135 ,pWait(false)
136 ,isWaiting(false)
137 #if __cplusplus > 201703L
138 ,pWorkCond(false)
139 #endif
140 {
141 timeoutPeriod = 400;
142 threadName = "anonymous";
143 init();
144 }
145
146 //Destructor
148 if( pRun.load(std::memory_order_acquire) ) {
149 stop();
150 };
151 }
152
153 // start the new thread
154 void start() noexcept {
155 if (!isRunning()) run();
156 }
157
158 // helper function: check if thread is running
159 inline bool isRunning() const noexcept {
160 return (pRun.load(std::memory_order_acquire) &&
161 pThd.joinable());
162 }
163
164 // set a name for the thread (may help on diagnostics)
165 void setThreadName(std::string name) noexcept {
166 threadName = name;
167 }
168
169 // set thread policy and priority class, this may fail silent
170 void setPriority(int32_t rt_prio, int32_t rt_policy) noexcept {
171 if (isRunning())
172 setThreadPolicy(rt_prio, rt_policy);
173 }
174
175 // set the time out for the thread waiting functions in milliseconds
176 void setTimeOut(uint32_t timeout) noexcept {
177 timeoutPeriod = timeout;
178 }
179
180 // try to get the process pointer, return false when thread is busy
181 inline bool getProcess() noexcept {
182 if (isRunning() && !getState()) {
183 int maxDuration = 0;
184 while (!getState()) {
185 pthread_mutex_lock(&pWaitProc);
186 if (pthread_cond_timedwait(&pProcCond, &pWaitProc, getTimeOut()) == ETIMEDOUT) {
187 pthread_mutex_unlock(&pWaitProc);
188 maxDuration +=1;
189 //fprintf(stderr, "%s wait for process %i\n", threadName.c_str(), maxDuration);
190 if (maxDuration > 2) {
191 //fprintf(stderr, "%s break waitForProcess\n", threadName.c_str());
192 break;
193 }
194 } else {
195 pthread_mutex_unlock(&pWaitProc);;
196 }
197 }
198 }
199 if (getState()) pWait.store(true, std::memory_order_release);
200 return getState();
201 }
202
203 // notify the thread that work is to be done
204 inline void runProcess() noexcept {
205 #if __cplusplus > 201703L
206 pWorkCond.store(true);
207 #endif
208 pWorkCond.notify_one();
209 }
210
211 // wait for the processed data from the thread,
212 // in worst case this may fail silent
213 // when to much time expires (5 * timeOut time)
214 // to avoid Xruns or dead looks.
215 inline void processWait() noexcept {
216 if (isRunning()) {
217 int maxDuration = 0;
218 while (pWait.load(std::memory_order_acquire)) {
219 pthread_mutex_lock(&pWaitProc);
220 if (pthread_cond_timedwait(&pProcCond, &pWaitProc, getTimeOut()) == ETIMEDOUT) {
221 pthread_mutex_unlock(&pWaitProc);
222 maxDuration +=1;
223 //fprintf(stderr, "%s wait for data %i\n", threadName.c_str(), maxDuration);
224 if (maxDuration > 5) {
225 pWait.store(false, std::memory_order_release);
226 //fprintf(stderr, "%s break processWait\n", threadName.c_str());
227 }
228 } else {
229 pthread_mutex_unlock(&pWaitProc);;
230 }
231 }
232 //fprintf(stderr, "%s processed data %i\n", threadName.c_str(), maxDuration);
233 }
234 }
235
236 // stop the thread (at least on Destruction)
237 void stop() noexcept {
238 if (isRunning()) {
239 pRun.store(false, std::memory_order_release);
240 if (pThd.joinable()) {
242 #if __cplusplus > 201703L
243 pWorkCond.store(true);
244 #endif
245 pWorkCond.notify_one();
246 pThd.join();
247 }
248 }
249 }
250
251
252private:
253 std::atomic<bool> pRun;
254 std::atomic<bool> pWait;
255 std::atomic<bool> isWaiting;
256
257 #if __cplusplus > 201703L
258 std::atomic<bool> pWorkCond;
259 #else
260 std::mutex pWaitWork;
261 std::condition_variable pWorkCond;
262 #endif
263
264 std::thread pThd;
265 std::string threadName;
267
268 pthread_mutex_t pWaitProc;
269 pthread_cond_t pProcCond;
270 struct timespec timeOut;
271
272 // init pthread_cond_t and pthread_mutex_t
273 inline void init() noexcept {
274 pthread_condattr_t cond_attr;
275 pthread_condattr_init(&cond_attr);
276 pthread_condattr_setclock(&cond_attr, CLOCK_MONOTONIC);
277 pthread_cond_init(&pProcCond, &cond_attr);
278 pthread_condattr_destroy(&cond_attr);
279 pWaitProc = PTHREAD_MUTEX_INITIALIZER;
280 }
281
282 // run the thread, wait for signal and process the given function
283 inline void run() noexcept {
284 if( pRun.load(std::memory_order_acquire) ) {
285 stop();
286 };
287 pRun.store(true, std::memory_order_release);
288 pThd = std::thread([this]() {
289 #if __cplusplus <= 201703L
290 std::unique_lock<std::mutex> lk(pWaitWork);
291 #endif
292 while (pRun.load(std::memory_order_acquire)) {
293 isWaiting.store(true, std::memory_order_release);
294 pthread_cond_broadcast(&pProcCond);
295 // wait for signal from parent thread that work is to do
296 #if __cplusplus > 201703L
297 pWorkCond.wait(false);
298 pWorkCond.store(false);
299 #else
300 pWorkCond.wait(lk);
301 #endif
302 isWaiting.store(false, std::memory_order_release);
303 pWait.store(true, std::memory_order_release);
304 process();
305 pWait.store(false, std::memory_order_release);
306 }
307 // when done
308 });
309 }
310
311 // check if thread is busy, return true when not
312 inline bool getState() const noexcept {
313 return isWaiting.load(std::memory_order_acquire);
314 }
315
316 // set thread scheduling class and priority level
317 inline void setThreadPolicy(int32_t rt_prio, int32_t rt_policy) noexcept {
318 #if defined(__linux__) || defined(_UNIX) || defined(__APPLE__) || defined(_OS_UNIX_)
319 sched_param sch_params;
320 if (rt_prio <= 0) {
321 rt_prio = sched_get_priority_max(rt_policy);
322 }
323 if ((rt_prio/5) > 0) rt_prio = rt_prio/5;
324 sch_params.sched_priority = rt_prio;
325 if (pthread_setschedparam(pThd.native_handle(), rt_policy, &sch_params)) {
326 fprintf(stderr, "ParallelThread:%s fail to set priority %i shed %i\n", threadName.c_str(), rt_prio, rt_policy);
327 }
328 #elif defined(_WIN32)
329 // REALTIME_PRIORITY_CLASS, THREAD_PRIORITY_NORMAL
330 if (SetThreadPriority(pThd.native_handle(), 24)) {
331 fprintf(stderr, "ParallelThread:%s fail to set priority\n", threadName.c_str());
332 }
333 #else
334 //system does not supports thread priority!
335 #endif
336 }
337
338 // calculate the timeout for the thread wait functions
339 inline struct timespec *getTimeOut() noexcept {
340 clock_gettime (CLOCK_MONOTONIC, &timeOut);
341 long int at = (timeoutPeriod * 1000);
342 if (timeOut.tv_nsec + at > 1000000000) {
343 timeOut.tv_sec +=1;
344 at -= 1000000000;
345 }
346 timeOut.tv_nsec += at;
347 return &timeOut;
348 }
349
350 // simple implement clock_gettime for windows (8)
351 #if defined(_WIN32)
352 int clock_gettime(int, struct timespec *spec) {
353 int64_t wTime;
354 GetSystemTimePreciseAsFileTime((FILETIME*)&wTime);
355 wTime -=116444736000000000LL; //1jan1601 to 1jan1970
356 spec->tv_sec =wTime / 10000000LL; //seconds
357 spec->tv_nsec =wTime % 10000000LL *100; //nano-seconds
358 return 0;
359 }
360 #endif
361
362};
363
364#endif
std::atomic< bool > pWait
void init() noexcept
bool getProcess() noexcept
std::atomic< bool > pRun
uint32_t timeoutPeriod
void setPriority(int32_t rt_prio, int32_t rt_policy) noexcept
pthread_cond_t pProcCond
void setThreadName(std::string name) noexcept
std::thread pThd
void processWait() noexcept
void setTimeOut(uint32_t timeout) noexcept
bool getState() const noexcept
struct timespec timeOut
std::atomic< bool > isWaiting
pthread_mutex_t pWaitProc
void stop() noexcept
std::condition_variable pWorkCond
void setThreadPolicy(int32_t rt_prio, int32_t rt_policy) noexcept
void run() noexcept
void runProcess() noexcept
bool isRunning() const noexcept
std::mutex pWaitWork
struct timespec * getTimeOut() noexcept
std::string threadName
void start() noexcept
MemberFunc memberFunc[2]
InstancePtr instPtr[2]
void set(C *instance)
static void wrap(InstancePtr instance)
void set(C *instance)
void process() const
void(* MemberFunc)(InstancePtr)
void * InstancePtr
void setProcessor(uint32_t i_)