74 static constexpr std::chrono::milliseconds DEFAULT_WAIT_FOR_NEXT_ITEM_MS {1500};
77 simple_worker(
const simple_worker&) =
delete;
78 simple_worker& operator=(
const simple_worker&) =
delete;
83#if defined(DEBUG) || defined(_DEBUG)
84 std::cerr << std::format(
"{} - Waiting for queue to be empty: {}\n", __func__, items.toJson().dump(2));
88 items.waitUntilEmpty();
93 if (processor.request_stop() && processor.joinable()) processor.join();
95 catch (
const std::exception&) {
104 std::call_once(flag_forceCleanupTerminate, [&]() {
108 processor.request_stop();
109 std::this_thread::sleep_for(std::chrono::milliseconds(100));
110#if defined(_Linux_) || defined(__linux__) || defined(__linux) || (defined(__APPLE__) && defined(__MACH__))
111 auto nativeHandle = processor.native_handle();
112 std::cerr << std::format(
113 "forceCleanupTerminate - WARNING!! Calling native thread shutdown; only perform this when app is "
114 "ending! from: {}:{}",
118 pthread_cancel(nativeHandle);
120#elif defined(_WIN32) || defined(WIN32) || defined(_WIN64) || defined(WIN64)
121 auto nativeHandle = processor.native_handle();
122 std::cerr << std::format(
123 "forceCleanupTerminate - WARNING!! Calling native thread shutdown; only perform this when app is "
124 "ending! from: {}:{}",
128 TerminateThread(nativeHandle, 0);
132 catch (
const std::exception& ex) {
133 std::cerr << std::format(
"forceCleanupTerminate - Exception while shutting down worker: {}", ex.what());
140 simple_worker& operator=(simple_worker&&) =
delete;
154 items.emplace(std::move(item));
155 queueCounter.fetch_add(1, std::memory_order_release);
158#if defined(NLOHMANN_JSON_VERSION_MAJOR)
165 auto toJson() const -> nlohmann::json
167 auto itemsSize = items.size();
168 auto itemsQueued = items.addCounter();
169 auto itemsPopped = items.removeCounter();
170 auto itemsOutstanding = itemsQueued - itemsPopped;
172 return {{
"_typver",
"siddiqsoft.asynchrony-lib.simple_worker/0.10"},
173 {
"itemsSize", itemsSize},
174 {
"queueCounter", queueCounter.load(std::memory_order_acquire)},
175 {
"itemsQueued", itemsQueued},
176 {
"itemsPopped", itemsPopped},
177 {
"itemsOutstanding", itemsOutstanding},
178 {
"threadPriority", Pri},
179 {
"outstandingCallback", outstandingCallback.load()},
180 {
"waitInterval", DEFAULT_WAIT_FOR_NEXT_ITEM_MS.count()}};
185 std::once_flag flag_forceCleanupTerminate {};
188 std::atomic_uint outstandingCallback {0};
191 std::atomic_uint64_t queueCounter {0};
194 siddiqsoft::WaitableQueue<T> items {};
197 std::function<void(T&&)> callback;
205 std::jthread processor {[&](std::stop_token st) {
206#if defined(WIN64) || defined(_WIN64) || defined(WIN32) || defined(_WIN32)
208 if constexpr (Pri != 0) SetThreadPriority(GetCurrentThread(), Pri);
211 while (!st.stop_requested()) {
217 if (
auto item = items.tryWaitItem(DEFAULT_WAIT_FOR_NEXT_ITEM_MS); item && !st.stop_requested() && callback) {
221 callback(std::move(*item));
223 catch (
const std::exception& ex) {
225 std::cerr << std::format(
"Ignoring Exception in simple_worker callback: {} - inner\n", ex.what());
229 catch (
const std::exception& ex) {
231 std::cerr << std::format(
"Ignoring Exception in simple_worker callback: {} - outter\n", ex.what());
243 static void to_json(nlohmann::json& dest,
const siddiqsoft::simple_worker<T, Pri>& src)