71 struct periodic_worker
73 static constexpr std::chrono::milliseconds DEFAULT_WAIT_FOR_NEXT_ITEM_MS {1500};
77 periodic_worker(periodic_worker&) =
delete;
78 auto& operator=(periodic_worker&) =
delete;
80 periodic_worker(periodic_worker&&) =
delete;
81 auto& operator=(periodic_worker&&) =
delete;
95#if defined(DEBUG) || defined(_DEBUG)
96 std::cerr << std::format(
97 "Shutting down periodic worker [{}] with outstanding callbacks [{}] and total invoke count [{}]\n",
99 outstandingCallback.load(std::memory_order_acquire),
100 invokeCounter.load(std::memory_order_acquire));
106 invokePeriod.store(std::chrono::microseconds(0), std::memory_order_release);
110#if defined(DEBUG) || defined(_DEBUG)
111 std::cerr << std::format(
"Signaled shutdown for periodic worker [{}], waiting for thread to join...\n", threadName);
117 processor.request_stop();
118 std::this_thread::sleep_for(std::chrono::milliseconds(100));
121 catch (
const std::exception& ex) {
122 std::cerr << std::format(
"Exception while shutting down periodic worker [{}]: {}", threadName, ex.what());
125#if defined(DEBUG) || defined(_DEBUG)
126 std::cerr << std::format(
"End of destructor for periodic worker [{}], waiting for thread to join...\n", threadName);
135 std::call_once(flag_forceCleanupTerminate, [&]() {
139 processor.request_stop();
140 std::this_thread::sleep_for(std::chrono::milliseconds(100));
141#if defined(_Linux_) || defined(__linux__) || defined(__linux) || (defined(__APPLE__) && defined(__MACH__))
142 auto nativeHandle = processor.native_handle();
143 std::cerr << std::format(
144 "forceCleanupTerminate - WARNING!! Calling native thread shutdown; only perform this when app is "
145 "ending! from: {}:{}",
148 pthread_cancel(nativeHandle);
150#elif defined(_WIN32) || defined(WIN32) || defined(_WIN64) || defined(WIN64)
151 auto nativeHandle = processor.native_handle();
152 std::cerr << std::format(
153 "forceCleanupTerminate - WARNING!! Calling native thread shutdown; only perform this when app is "
154 "ending! from: {}:{}",
157 TerminateThread(nativeHandle, 0);
161 catch (
const std::exception& ex) {
162 std::cerr << std::format(
"forceCleanupTerminate - Exception while shutting down worker: {}", ex.what());
171 std::chrono::microseconds interval,
172 std::string name = {
"anonymous-periodic-worker"})
173 : callback(std::move(c))
174 , invokePeriod(interval)
175 , threadName(std::move(name))
180#if defined(NLOHMANN_JSON_VERSION_MAJOR)
186 nlohmann::json toJson()
const
190 return {{
"_typver"s,
"siddiqsoft.asynchrony-lib.periodic_worker/0.10"s},
191 {
"threadName", threadName},
192 {
"outstandingCallbacks", outstandingCallback.load(std::memory_order_acquire)},
193 {
"invokeCounter"s, invokeCounter.load(std::memory_order_acquire)},
194 {
"threadPriority"s, Pri},
195 {
"waitInterval"s, invokePeriod.load(std::memory_order_acquire).count()}};
200 std::once_flag flag_forceCleanupTerminate {};
204 std::atomic_uint outstandingCallback {0};
206 std::string threadName {
"anonymous-periodic-worker"};
208 std::atomic_uint64_t invokeCounter {0};
210 std::counting_semaphore<1> signal {0};
213 std::atomic<std::chrono::microseconds> invokePeriod {std::chrono::milliseconds(1500)};
215 std::function<void()> callback;
222 std::jthread processor {[&](std::stop_token st) {
223#if defined(WIN64) || defined(_WIN64) || defined(WIN32) || defined(_WIN32)
225 if constexpr (Pri != 0) SetThreadPriority(GetCurrentThread(), Pri);
228 while (!st.stop_requested()) {
234 auto _ = signal.try_acquire_for(invokePeriod.load(std::memory_order_acquire));
236 if (!st.stop_requested()) {
237 auto decrementOutstandingCallback = siddiqsoft::RunOnEnd {[&] {
239 outstandingCallback.fetch_sub(1, std::memory_order_release);
243 outstandingCallback.fetch_add(1, std::memory_order_release);
246 if (callback) callback();
247 invokeCounter.fetch_add(1, std::memory_order_release);
249 catch (
const std::exception& ex) {
251 std::cerr << std::format(
"Ignoring Exception (inner) in simple_worker callback: {}", ex.what());
255 catch (
const std::exception& ex) {
257 std::cerr << std::format(
"Ignoring Exception (outer) in simple_worker callback: {}", ex.what());
269 static void to_json(nlohmann::json& dest,
const siddiqsoft::periodic_worker<Pri>& src)