asynchrony 0.0.0
Add asynchrony to your C++ applications using standard C++20
Loading...
Searching...
No Matches
periodic_worker.hpp
1/*
2 asynchrony-lib : Add asynchrony to your apps
3
4 BSD 3-Clause License
5
6 Copyright (c) 2021, Siddiq Software LLC
7 All rights reserved.
8
9 Redistribution and use in source and binary forms, with or without
10 modification, are permitted provided that the following conditions are met:
11
12 1. Redistributions of source code must retain the above copyright notice, this
13 list of conditions and the following disclaimer.
14
15 2. Redistributions in binary form must reproduce the above copyright notice,
16 this list of conditions and the following disclaimer in the documentation
17 and/or other materials provided with the distribution.
18
19 3. Neither the name of the copyright holder nor the names of its
20 contributors may be used to endorse or promote products derived from
21 this software without specific prior written permission.
22
23 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
24 AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
25 IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
26 DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
27 FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
28 DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
29 SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
30 CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
31 OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
32 OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
33 */
34
35#pragma once
36#include <siddiqsoft/RunOnEnd.hpp>
37#ifndef PERIODIC_WORKER_HPP
38#define PERIODIC_WORKER_HPP
39
40
41#include <iostream>
42#include <functional>
43#include <memory>
44#include <thread>
45#include <mutex>
46#include <shared_mutex>
47#include <deque>
48#include <semaphore>
49#include <stop_token>
50#include <utility>
51#include <exception>
52#include <source_location>
53#include <atomic>
54
55#if defined(_Linux_) || defined(__linux__) || defined(__linux) || (defined(__APPLE__) && defined(__MACH__))
56#include <pthread.h>
57#elif defined(_WIN32) || defined(WIN32) || defined(_WIN64) || defined(WIN64)
58#include <windows.h>
59#include <processthreadsapi.h>
60#endif
61
62#include "private/common.hpp"
63
64namespace siddiqsoft
65{
/// @brief Invokes a user-supplied callback on a dedicated thread, once per interval, until the object is destroyed.
///
/// The worker thread is started during construction (it is the last data member, so every other member is
/// initialized before the thread body runs). Each iteration first waits up to the configured interval on an
/// internal semaphore and then invokes the callback, so the first invocation happens after one interval.
///
/// @tparam Pri Thread priority hint in the range [-10, 10]. It is only applied on Windows builds, via
///             SetThreadPriority, and only when it is non-zero.
template <int Pri = 0>
    requires((Pri >= -10) && (Pri <= 10))
struct periodic_worker
{
    /// Default wait between two invocations of the callback.
    static constexpr std::chrono::milliseconds DEFAULT_WAIT_FOR_NEXT_ITEM_MS {1500};

public:
    // Not copy-able
    periodic_worker(const periodic_worker&)            = delete;
    periodic_worker& operator=(const periodic_worker&) = delete;
    // Not move-able
    periodic_worker(periodic_worker&&)            = delete;
    periodic_worker& operator=(periodic_worker&&) = delete;

    /// @brief Stops the worker thread and waits for it to finish.
    ///
    /// The stop is requested before the semaphore is signalled so that the woken thread observes the stop
    /// request and leaves its loop instead of invoking the callback one more time. If the thread was detached
    /// by forceCleanupTerminate() there is nothing to join and the destructor returns without waiting.
    ~periodic_worker()
    {
#if defined(DEBUG) || defined(_DEBUG)
        std::cerr << std::format(
                "Shutting down periodic worker [{}] with outstanding callbacks [{}] and total invoke count [{}]\n",
                threadName,
                outstandingCallback.load(std::memory_order_acquire),
                invokeCounter.load(std::memory_order_acquire));
#endif

        try {
            // Order matters: request the stop first. If the semaphore were released first, the thread could
            // wake up, see no stop request and run the callback again (and, with a zero period, keep spinning
            // on the callback until the stop request finally arrives).
            processor.request_stop();

            // The thread may be suspended on the semaphore for the full period; shrinking the period to zero
            // makes any wait that starts from now on return immediately so the shutdown is not delayed.
            invokePeriod.store(std::chrono::microseconds(0), std::memory_order_release);
            // Empty signal to get our thread to wake up from a wait that is already in progress.
            signal.release();

#if defined(DEBUG) || defined(_DEBUG)
            std::cerr << std::format("Signaled shutdown for periodic worker [{}], waiting for thread to join...\n", threadName);
#endif

            // Join explicitly rather than sleeping for a fixed time: the jthread destructor would join anyway,
            // so a fixed sleep only adds latency to every shutdown. Not joinable after forceCleanupTerminate().
            if (processor.joinable()) processor.join();
        }
        catch (const std::exception& ex) {
            std::cerr << std::format("Exception while shutting down periodic worker [{}]: {}\n", threadName, ex.what());
        }

#if defined(DEBUG) || defined(_DEBUG)
        std::cerr << std::format("End of destructor for periodic worker [{}]; thread joined.\n", threadName);
#endif
    }

    /// @brief Forcibly ends the worker thread through the native thread API. Runs at most once per object.
    ///
    /// Requests a cooperative stop, waits 100ms, then cancels (pthread_cancel) or terminates (TerminateThread)
    /// the native thread and detaches it. As the emitted warning states, only perform this when the
    /// application is ending.
    ///
    /// @param sl Call site; its file name and line are included in the warning written to std::cerr.
    void forceCleanupTerminate(const std::source_location& sl = std::source_location::current())
    {
        std::call_once(flag_forceCleanupTerminate, [&]() {
            try {
                // Ask the thread to stop and give it a short grace period before the native shutdown.
                processor.request_stop();
                std::this_thread::sleep_for(std::chrono::milliseconds(100));
#if defined(_Linux_) || defined(__linux__) || defined(__linux) || (defined(__APPLE__) && defined(__MACH__))
                // A non-joinable jthread has no native thread to cancel or detach.
                if (processor.joinable()) {
                    auto nativeHandle = processor.native_handle();
                    std::cerr << std::format(
                            "forceCleanupTerminate - WARNING!! Calling native thread shutdown; only perform this when app is "
                            "ending! from: {}:{}\n",
                            sl.file_name(),
                            sl.line());
                    pthread_cancel(nativeHandle);
                    processor.detach();
                }
#elif defined(_WIN32) || defined(WIN32) || defined(_WIN64) || defined(WIN64)
                // A non-joinable jthread has no native thread to terminate or detach.
                if (processor.joinable()) {
                    auto nativeHandle = processor.native_handle();
                    std::cerr << std::format(
                            "forceCleanupTerminate - WARNING!! Calling native thread shutdown; only perform this when app is "
                            "ending! from: {}:{}\n",
                            sl.file_name(),
                            sl.line());
                    TerminateThread(nativeHandle, 0);
                    processor.detach();
                }
#endif
            }
            catch (const std::exception& ex) {
                std::cerr << std::format("forceCleanupTerminate - Exception while shutting down worker: {}\n", ex.what());
            }
        });
    }

    /// @brief Constructor requires the callback for the thread.
    ///
    /// @param c        Callback invoked by the worker thread after each wait. An empty function is tolerated:
    ///                 nothing is called but the invoke counter still advances.
    /// @param interval Maximum time the worker waits between two invocations.
    /// @param name     Name reported in diagnostics and in toJson().
    periodic_worker(std::function<void()>     c,
                    std::chrono::microseconds interval,
                    std::string               name = {"anonymous-periodic-worker"})
        // Listed in member declaration order, which is the order the initializers actually run in.
        : threadName(std::move(name))
        , invokePeriod(interval)
        , callback(std::move(c))
    {
    }


#if defined(NLOHMANN_JSON_VERSION_MAJOR)
    /// @brief Snapshot of the worker's diagnostic state.
    /// @return JSON object with the keys _typver, threadName, outstandingCallbacks, invokeCounter,
    ///         threadPriority (the Pri template argument) and waitInterval (current period in microseconds).
    nlohmann::json toJson() const
    {
        using namespace std;

        return {{"_typver"s, "siddiqsoft.asynchrony-lib.periodic_worker/0.10"s},
                {"threadName", threadName},
                {"outstandingCallbacks", outstandingCallback.load(std::memory_order_acquire)},
                {"invokeCounter"s, invokeCounter.load(std::memory_order_acquire)},
                {"threadPriority"s, Pri},
                {"waitInterval"s, invokePeriod.load(std::memory_order_acquire).count()}};
    }
#endif

private:
    /// Ensures the native shutdown in forceCleanupTerminate() is attempted only once.
    std::once_flag flag_forceCleanupTerminate {};

    /// Number of callbacks currently executing (incremented before, decremented after each invocation).
    std::atomic_uint outstandingCallback {0};
    /// Name used in diagnostics; not modified after construction.
    std::string threadName {"anonymous-periodic-worker"};
    /// Number of callback invocations that returned without throwing.
    std::atomic_uint64_t invokeCounter {0};
    /// Used purely as an interruptible timed wait; only the destructor releases it.
    std::counting_semaphore<1> signal {0};
    /// Wait between invocations; atomic because the destructor zeroes it while the worker thread reads it.
    std::atomic<std::chrono::microseconds> invokePeriod {DEFAULT_WAIT_FOR_NEXT_ITEM_MS};
    /// The user callback.
    std::function<void()> callback;
    /// The worker thread. Must remain the LAST data member: it starts running as soon as it is initialized
    /// and reads the members declared above it.
    std::jthread processor {[this](std::stop_token st) {
#if defined(WIN64) || defined(_WIN64) || defined(WIN32) || defined(_WIN32)
        // Set the thread priority if possible
        if constexpr (Pri != 0) SetThreadPriority(GetCurrentThread(), Pri);
#endif

        while (!st.stop_requested()) {
            try {
                // The semaphore is only an efficient, interruptible "sleep for one period"; whether the
                // wait timed out or was signalled does not matter.
                [[maybe_unused]] const bool signalled = signal.try_acquire_for(invokePeriod.load(std::memory_order_acquire));

                // Woken up for shutdown: leave without invoking the callback.
                if (st.stop_requested()) break;

                // Guard is created before the increment so that the decrement below always pairs with it.
                auto decrementOutstandingCallback = siddiqsoft::RunOnEnd {[this] {
                    // Decrement outstanding callback
                    outstandingCallback.fetch_sub(1, std::memory_order_release);
                }};

                outstandingCallback.fetch_add(1, std::memory_order_release);
                try {
                    if (callback) callback();
                    invokeCounter.fetch_add(1, std::memory_order_release);
                }
                // NOTE(review): deliberately no catch (...) here. forceCleanupTerminate() uses pthread_cancel,
                // which on some platforms is understood to unwind the stack with a special exception that must
                // not be swallowed - confirm before widening this handler.
                catch (const std::exception& ex) {
                    // We swallow exceptions from the callback to avoid thread termination and log it if needed.
                    std::cerr << std::format(
                            "Ignoring Exception (inner) in periodic_worker [{}] callback: {}\n", threadName, ex.what());
                }
            }
            catch (const std::exception& ex) {
                // Failures of the wait or of the bookkeeping around the callback; keep the thread alive.
                std::cerr << std::format(
                        "Ignoring Exception (outer) in periodic_worker [{}] callback: {}\n", threadName, ex.what());
            }
        } // while ..continue until we're asked to stop
    }};
};
262
263#if defined(NLOHMANN_JSON_VERSION_MAJOR)
268 template <int Pri = 0>
269 static void to_json(nlohmann::json& dest, const siddiqsoft::periodic_worker<Pri>& src)
270 {
271 dest = src.toJson();
272 }
273#endif
274
275} // namespace siddiqsoft
276#endif // PERIODIC_WORKER_HPP
periodic_worker(std::function< void()> c, std::chrono::microseconds interval, std::string name={"anonymous-periodic-worker"})
Constructor requires the callback for the thread.
void forceCleanupTerminate(const std::source_location &sl=std::source_location::current())
This method is to be used by the user when they shutdown their application. This is best used for cas...
~periodic_worker()
Destructor Cancel the semaphore by first resetting the interval to zero. Signal the semaphore follow ...