// Default wait, in milliseconds, for the next queued item.
// Serves as the default `delta` argument of getNextItem() and is reported
// as "waitInterval" by toJson().
61 static constexpr std::chrono::milliseconds DEFAULT_WAIT_FOR_NEXT_ITEM_MS {1500};
// The pool can be neither moved nor copied: move construction, move
// assignment, copy construction and copy assignment are all deleted.
// (The worker lambdas capture by reference, so presumably the object must
// stay at a fixed address for its whole lifetime.)
// NOTE(review): the copy forms below take a non-const `simple_pool&`; the
// conventional signature is `const simple_pool&` - confirm this is intended.
63 simple_pool(simple_pool&&) =
delete;
64 simple_pool& operator=(simple_pool&&) =
delete;
65 simple_pool(simple_pool&) =
delete;
66 simple_pool& operator=(simple_pool&) =
delete;
// Shut the workers down one at a time, in container order: ask each thread
// to stop, then wait for it to finish before moving on to the next one.
76 for (
auto& t : workers) {
// join() is only reached when request_stop() returns true AND the thread is
// joinable; a thread whose stop request returns false is not joined here.
80 if (t.request_stop() && t.joinable()) t.join();
// Take ownership of the caller's callable by moving `c` into `callback`.
87 : callback(std::move(c))
// Pre-size the worker vector: N threads when N > 0, otherwise the value of
// std::thread::hardware_concurrency().
// NOTE(review): hardware_concurrency() is allowed to return 0, which would
// create no workers - confirm whether a fallback is needed.
91 workers.reserve((N > 0) ? N : std::thread::hardware_concurrency());
// Start the workers; the count is computed with the same expression as the
// reserve() above and re-evaluated on every loop iteration.
94 for (
unsigned i = 0; i < ((N > 0) ? N : std::thread::hardware_concurrency()); i++) {
// Each worker is a std::jthread running this lambda. The lambda receives the
// thread's stop_token and captures its surroundings by reference.
96 workers.emplace_back([&](std::stop_token st) {
// Keep polling for work until a stop has been requested.
100 while (!st.stop_requested()) {
// Fetch the next item using getNextItem()'s default wait. The callback runs
// only if an item was obtained, no stop has been requested in the meantime,
// and `callback` holds a callable.
// NOTE(review): an item obtained after a stop request is not passed to the
// callback; it is simply discarded when `item` goes out of scope.
105 if (
auto item = getNextItem(); item.has_value() && !st.stop_requested() && callback) {
// Hand the item to the callback as an rvalue.
107 callback(std::move(*item));
// Exceptions derived from std::exception are reported on std::cerr and not
// rethrown. The message is written without a trailing newline.
110 catch (
const std::exception& ex) {
112 std::cerr << std::format(
"Ignoring Exception in simple_worker callback: {}", ex.what());
// Place the item on the `items` queue, forwarding it as T.
125 items.emplace(std::forward<T>(item));
// Count the enqueued item. The counter is only ever incremented in the
// visible code; toJson() reads it with acquire ordering.
128 queueCounter.fetch_add(1, std::memory_order_release);
// Only compiled when NLOHMANN_JSON_VERSION_MAJOR is defined (presumably by
// including nlohmann/json before this header).
132#if defined(NLOHMANN_JSON_VERSION_MAJOR)
// Returns a JSON object describing the pool: a type/version tag, the number
// of worker threads, the queue counter and the default wait interval in
// milliseconds.
139 auto toJson() const -> nlohmann::json
// NOTE(review): `sz` is not referenced in the lines visible here.
141 const auto sz = items.size();
142 return nlohmann::json {{
"_typver",
"siddiqsoft.asynchrony-lib.simple_pool/0.10"},
143 {
"workersSize", workers.size()},
145 {
"queueCounter", queueCounter.load(std::memory_order_acquire)},
146 {
"waitInterval", DEFAULT_WAIT_FOR_NEXT_ITEM_MS.count()}};
// Number of items enqueued so far: incremented by one next to each
// items.emplace() and reported by toJson().
// NOTE(review): the declaration appears on two visible lines; presumably each
// sits in a different preprocessor/access-specifier branch that is not shown
// here - confirm.
152 std::atomic_uint64_t queueCounter {0};
155 std::atomic_uint64_t queueCounter {0};
// Worker threads. std::jthread provides the request_stop()/stop_token
// mechanism used to shut them down.
159 std::vector<std::jthread> workers {};
// User-supplied callable invoked by the workers with each dequeued item.
160 std::function<void(T&&)> callback;
// Counting semaphore initialised to 0. Not referenced in the visible code.
161 std::counting_semaphore<> signal {0};
// Pending items: filled via emplace() and drained via tryWaitItem().
163 siddiqsoft::WaitableQueue<T> items {};
// Ask the `items` queue for its next element, passing `delta` as the wait
// duration (DEFAULT_WAIT_FOR_NEXT_ITEM_MS when omitted).
// Returns the result of items.tryWaitItem(delta) unchanged; callers check
// has_value() - presumably the optional is empty when nothing arrived within
// `delta` (confirm against WaitableQueue).
169 std::optional<T> getNextItem(
const std::chrono::milliseconds& delta = DEFAULT_WAIT_FOR_NEXT_ITEM_MS)
171 return items.tryWaitItem(delta);
181 static auto to_json(nlohmann::json& dest,
const siddiqsoft::simple_pool<T, N>& src) ->
void const