asynchrony 0.0.0
Add asynchrony to your C++ applications using standard C++20
Loading...
Searching...
No Matches
simple_worker.hpp
1/*
2 asynchrony : Add asynchrony to your apps
3
4 BSD 3-Clause License
5
6 Copyright (c) 2021, Siddiq Software LLC
7 All rights reserved.
8
9 Redistribution and use in source and binary forms, with or without
10 modification, are permitted provided that the following conditions are met:
11
12 1. Redistributions of source code must retain the above copyright notice, this
13 list of conditions and the following disclaimer.
14
15 2. Redistributions in binary form must reproduce the above copyright notice,
16 this list of conditions and the following disclaimer in the documentation
17 and/or other materials provided with the distribution.
18
19 3. Neither the name of the copyright holder nor the names of its
20 contributors may be used to endorse or promote products derived from
21 this software without specific prior written permission.
22
23 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
24 AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
25 IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
26 DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
27 FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
28 DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
29 SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
30 CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
31 OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
32 OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
33 */
34
35#pragma once
36#include <chrono>
37#ifndef SIMPLE_WORKER_HPP
38#define SIMPLE_WORKER_HPP
39
40
41#include <iostream>
42#include <functional>
43#include <memory>
44#include <thread>
45#include <mutex>
46#include <shared_mutex>
47#include <deque>
48#include <semaphore>
49#include <stop_token>
50#include <exception>
51#include <source_location>
52#include <atomic>
53
54#if defined(_Linux_) || defined(__linux__) || defined(__linux) || (defined(__APPLE__) && defined(__MACH__))
55#include <pthread.h>
56#elif defined(_WIN32) || defined(WIN32) || defined(_WIN64) || defined(WIN64)
57#include <windows.h>
58#include <processthreadsapi.h>
59#endif
60
61#include "siddiqsoft/WaitableQueue.hpp"
62#include "siddiqsoft/RunOnEnd.hpp"
63#include "private/common.hpp"
64
65namespace siddiqsoft
66{
70 template <typename T, int Pri = 0>
71 requires((Pri >= -10) && (Pri <= 10)) && std::move_constructible<T>
72 struct simple_worker
73 {
74 static constexpr std::chrono::milliseconds DEFAULT_WAIT_FOR_NEXT_ITEM_MS {1500};
75
76 public:
77 simple_worker(const simple_worker&) = delete;
78 simple_worker& operator=(const simple_worker&) = delete;
79
80
81 ~simple_worker()
82 {
83#if defined(DEBUG) || defined(_DEBUG)
84 std::cerr << std::format("{} - Waiting for queue to be empty: {}\n", __func__, items.toJson().dump(2));
85#endif
86
87 // Drain the existing items..
88 items.waitUntilEmpty();
89
90 // Signal the threads to shutdown..
91 try {
92 // Ask thread to shutdown
93 if (processor.request_stop() && processor.joinable()) processor.join();
94 }
95 catch (const std::exception&) {
96 }
97 }
98
102 void forceCleanupTerminate(const std::source_location& sl = std::source_location::current())
103 {
104 std::call_once(flag_forceCleanupTerminate, [&]() {
105 try {
106 // Notify the thread to stop.. and wait for a bit.. and then instead of joining we should just let the jthread
107 // destroy. Ask thread to shutdown and if joinable.. join.
108 processor.request_stop();
109 std::this_thread::sleep_for(std::chrono::milliseconds(100));
110#if defined(_Linux_) || defined(__linux__) || defined(__linux) || (defined(__APPLE__) && defined(__MACH__))
111 auto nativeHandle = processor.native_handle();
112 std::cerr << std::format(
113 "forceCleanupTerminate - WARNING!! Calling native thread shutdown; only perform this when app is "
114 "ending! from: {}:{}",
115
116 sl.file_name(),
117 sl.line());
118 pthread_cancel(nativeHandle);
119 processor.detach();
120#elif defined(_WIN32) || defined(WIN32) || defined(_WIN64) || defined(WIN64)
121 auto nativeHandle = processor.native_handle();
122 std::cerr << std::format(
123 "forceCleanupTerminate - WARNING!! Calling native thread shutdown; only perform this when app is "
124 "ending! from: {}:{}",
125
126 sl.file_name(),
127 sl.line());
128 TerminateThread(nativeHandle, 0);
129 processor.detach();
130#endif
131 }
132 catch (const std::exception& ex) {
133 std::cerr << std::format("forceCleanupTerminate - Exception while shutting down worker: {}", ex.what());
134 }
135 });
136 }
137
139 simple_worker(simple_worker&&) = delete;
140 simple_worker& operator=(simple_worker&&) = delete;
141
144 simple_worker(std::function<void(T&&)> c)
145 : callback(c)
146 {
147 }
148
149
152 void queue(T&& item)
153 {
154 items.emplace(std::move(item));
155 queueCounter.fetch_add(1, std::memory_order_release);
156 }
157
158#if defined(NLOHMANN_JSON_VERSION_MAJOR)
165 auto toJson() const -> nlohmann::json
166 {
167 auto itemsSize = items.size();
168 auto itemsQueued = items.addCounter();
169 auto itemsPopped = items.removeCounter();
170 auto itemsOutstanding = itemsQueued - itemsPopped;
171
172 return {{"_typver", "siddiqsoft.asynchrony-lib.simple_worker/0.10"},
173 {"itemsSize", itemsSize},
174 {"queueCounter", queueCounter.load(std::memory_order_acquire)},
175 {"itemsQueued", itemsQueued},
176 {"itemsPopped", itemsPopped},
177 {"itemsOutstanding", itemsOutstanding},
178 {"threadPriority", Pri},
179 {"outstandingCallback", outstandingCallback.load()},
180 {"waitInterval", DEFAULT_WAIT_FOR_NEXT_ITEM_MS.count()}};
181 }
182#endif
183
184 private:
185 std::once_flag flag_forceCleanupTerminate {};
186
188 std::atomic_uint outstandingCallback {0};
189
191 std::atomic_uint64_t queueCounter {0};
192
194 siddiqsoft::WaitableQueue<T> items {};
195
197 std::function<void(T&&)> callback;
198
205 std::jthread processor {[&](std::stop_token st) {
206#if defined(WIN64) || defined(_WIN64) || defined(WIN32) || defined(_WIN32)
207 // Set the thread priority if possible
208 if constexpr (Pri != 0) SetThreadPriority(GetCurrentThread(), Pri);
209#endif
210
211 while (!st.stop_requested()) {
212 try {
213 // The getNextItem performs the wait on the signal and if it expires, returns empty.
214 // If there is an item, it will get that item (minimizing move) and performs the pop
215 // and returns the item so we can invoke the callback outside the lock.
216 // We must ensure that the callback is nonempty!
217 if (auto item = items.tryWaitItem(DEFAULT_WAIT_FOR_NEXT_ITEM_MS); item && !st.stop_requested() && callback) {
218 // Delegate to the callback outside the lock
219 try {
220 // We get an optional<> and thus the use of the * to get the value if present..
221 callback(std::move(*item));
222 }
223 catch (const std::exception& ex) {
224 // We swallow exceptions from the callback to avoid thread termination and log it if needed.
225 std::cerr << std::format("Ignoring Exception in simple_worker callback: {} - inner\n", ex.what());
226 }
227 }
228 }
229 catch (const std::exception& ex) {
230 // We swallow exceptions from the callback to avoid thread termination and log it if needed.
231 std::cerr << std::format("Ignoring Exception in simple_worker callback: {} - outter\n", ex.what());
232 }
233 } // while ..continue until we're asked to stop
234 }};
235 };
236
237#if defined(NLOHMANN_JSON_VERSION_MAJOR)
242 template <typename T, int Pri = 0>
243 static void to_json(nlohmann::json& dest, const siddiqsoft::simple_worker<T, Pri>& src)
244 {
245 dest = src.toJson();
246 }
247#endif
248
249} // namespace siddiqsoft
250#endif // !SIMPLE_WORKER_HPP
void forceCleanupTerminate(const std::source_location &sl=std::source_location::current())
This method is to be used by the user when they shutdown their application. This is best used for cas...
simple_worker(simple_worker &&)=delete
Move constructor and assignment are disallowed to avoid transferring thread ownership.
simple_worker(std::function< void(T &&)> c)
Constructor requires the callback for the thread.
void queue(T &&item)
Queue item into this worker thread's deque.