asynchrony 0.0.0
Add asynchrony to your C++ applications using standard C++20
Loading...
Searching...
No Matches
simple_pool.hpp
1/*
2 basic-pool : Add asynchrony to your apps
3
4 BSD 3-Clause License
5
6 Copyright (c) 2021, Siddiq Software LLC
7 All rights reserved.
8
9 Redistribution and use in source and binary forms, with or without
10 modification, are permitted provided that the following conditions are met:
11
12 1. Redistributions of source code must retain the above copyright notice, this
13 list of conditions and the following disclaimer.
14
15 2. Redistributions in binary form must reproduce the above copyright notice,
16 this list of conditions and the following disclaimer in the documentation
17 and/or other materials provided with the distribution.
18
19 3. Neither the name of the copyright holder nor the names of its
20 contributors may be used to endorse or promote products derived from
21 this software without specific prior written permission.
22
23 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
24 AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
25 IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
26 DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
27 FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
28 DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
29 SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
30 CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
31 OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
32 OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
33 */
34
35#pragma once
36#ifndef SIMPLE_POOL_HPP
37#define SIMPLE_POOL_HPP
38
39#include "simple_worker.hpp"
40#include <optional>
41#include <latch>
42#include <exception>
43#include <atomic>
44#include <utility>
45
46#include "siddiqsoft/WaitableQueue.hpp"
47#include "siddiqsoft/RunOnEnd.hpp"
48
49namespace siddiqsoft
50{
57 template <typename T, uint16_t N = 0>
58 requires std::is_move_constructible_v<T>
59 struct simple_pool
60 {
61 static constexpr std::chrono::milliseconds DEFAULT_WAIT_FOR_NEXT_ITEM_MS {1500};
62
63 simple_pool(simple_pool&&) = delete;
64 simple_pool& operator=(simple_pool&&) = delete;
65 simple_pool(simple_pool&) = delete;
66 simple_pool& operator=(simple_pool&) = delete;
67
68
73 {
74 // Compared to skipping the following code, we save at least about 100ms
75 // of idle time waiting for the threads to be signalled by default.
76 for (auto& t : workers) {
77 // Release the signal to indicate to the threads to abandon.
78 signal.release();
79 // Signal the threads to stop
80 if (t.request_stop() && t.joinable()) t.join();
81 }
82 }
83
86 simple_pool(std::function<void(T&&)> c)
87 : callback(std::move(c))
88 {
89 // *CRITICAL*
90 // This is step is *critical* otherwise we will end up moving threads as we add elements to the vector.
91 workers.reserve((N > 0) ? N : std::thread::hardware_concurrency());
92
93 // Create as many threads as reported by the system..
94 for (unsigned i = 0; i < ((N > 0) ? N : std::thread::hardware_concurrency()); i++) {
95 // Add the thread with the main driver
96 workers.emplace_back([&](std::stop_token st) {
97 // The driver runs forever until signalled to stop
98 // Tries to get next item ready in the queue (for max 1s cycle)
99 // If we have an item, invoke the callback with the item
100 while (!st.stop_requested()) {
101 try {
102 // The getNextItem performs the wait on the signal and if it expires, returns empty.
103 // If there is an item, it will get that item (minimizing move) and performs the pop
104 // and returns the item so we can invoke the callback outside the lock.
105 if (auto item = getNextItem(); item.has_value() && !st.stop_requested() && callback) {
106 // Delegate to the callback outside the lock
107 callback(std::move(*item));
108 }
109 }
110 catch (const std::exception& ex) {
111 // We swallow exceptions from the callback to avoid thread termination and log it if needed.
112 std::cerr << std::format("Ignoring Exception in simple_worker callback: {}", ex.what());
113 }
114 } // while ..continue until we're asked to stop
115 });
116 }
117 }
118
121 void queue(T&& item)
122 {
123 // With this interface, we can peform a perfect forward of the r-value from the caller into the
124 // items internal container without the complexity of lambda capture forwards.
125 items.emplace(std::forward<T>(item));
126
127 // Use atomic fetch_add with release semantics to ensure thread-safe updates
128 queueCounter.fetch_add(1, std::memory_order_release);
129 signal.release();
130 }
131
132#if defined(NLOHMANN_JSON_VERSION_MAJOR)
139 auto toJson() const -> nlohmann::json
140 {
141 const auto sz = items.size();
142 return nlohmann::json {{"_typver", "siddiqsoft.asynchrony-lib.simple_pool/0.10"},
143 {"workersSize", workers.size()},
144 {"dequeSize", sz},
145 {"queueCounter", queueCounter.load(std::memory_order_acquire)},
146 {"waitInterval", DEFAULT_WAIT_FOR_NEXT_ITEM_MS.count()}};
147 }
148#endif
149
150#ifdef _DEBUG
151 public:
152 std::atomic_uint64_t queueCounter {0};
153#else
154 private:
155 std::atomic_uint64_t queueCounter {0};
156#endif
157
158 private:
159 std::vector<std::jthread> workers {};
160 std::function<void(T&&)> callback;
161 std::counting_semaphore<> signal {0};
162 // siddiqsoft::RWLEnvelope<std::deque<T>> items {};
163 siddiqsoft::WaitableQueue<T> items {};
164
169 std::optional<T> getNextItem(const std::chrono::milliseconds& delta = DEFAULT_WAIT_FOR_NEXT_ITEM_MS)
170 {
171 return items.tryWaitItem(delta);
172 }
173 };
174
175#if defined(NLOHMANN_JSON_VERSION_MAJOR)
180 template <typename T, uint16_t N = 0>
181 static auto to_json(nlohmann::json& dest, const siddiqsoft::simple_pool<T, N>& src) -> void const
182 {
183 dest = src.toJson();
184 }
185#endif
186
187} // namespace siddiqsoft
188#endif // !SIMPLE_POOL_HPP
void queue(T &&item)
Queue item into the deque (takes "ownership" of the item).
simple_pool(std::function< void(T &&)> c)
Contructs a threadpool with N threads with the given callback/worker function.