Compilation fixes with recent compiler versions (GCC 13, clang 16).
[far2l.git] / utils / src / ThreadedWorkQueue.cpp
blob9d1de2bcacb8777f73bb56f52df85bbf768c5950
1 #include "ThreadedWorkQueue.h"
2 #include "debug.h"
3 #include <unistd.h>
4 #include <stdio.h>
5 #include <stdexcept>
7 class ThreadedWorker : public Threaded
9 ThreadedWorkQueue *_twq;
11 virtual void *ThreadProc()
13 _twq->WorkerThreadProc();
14 return nullptr;
17 public:
18 ThreadedWorker(ThreadedWorkQueue *twq) : _twq(twq)
20 if (!StartThread()) {
21 throw std::runtime_error("StartThread failed");
25 virtual ~ThreadedWorker()
27 WaitThread();
31 ThreadedWorkQueue::OrderedItemsDestroyer::~OrderedItemsDestroyer()
33 for (auto *twi : *this) {
34 delete twi;
38 ThreadedWorkQueue::ThreadedWorkQueue(size_t threads_count)
40 _threads_count(threads_count ? threads_count : BestThreadsCount())
44 ThreadedWorkQueue::~ThreadedWorkQueue()
46 const size_t workers_count = _workers.size();
47 if (workers_count != 0) {
48 std::unique_lock<std::mutex> lock(_mtx);
49 _stopping = true;
50 _cond.notify_all();
52 _workers.clear(); // this also joins them
53 ASSERT(_working == 0);
55 fprintf(stderr,
56 "%s: threads=%lu/%lu done=%lu finalized=%lu unprocessed_backlog=%lu unprocessed_done=%lu _backlog_waits=%lu\n",
57 __FUNCTION__, (unsigned long)workers_count, (unsigned long)_threads_count,
58 (unsigned long)_done_counter, (unsigned long)_finalized_counter,
59 (unsigned long)_backlog.size(), (unsigned long)_done.size(), (unsigned long)_backlog_waits);
61 for (const auto &it : _done) {
62 delete it.second;
64 for (auto *twi: _backlog) {
65 delete twi;
69 // invoked by ThreadedWorker from its thread routine
70 void ThreadedWorkQueue::WorkerThreadProc()
72 size_t seq = 0;
73 IThreadedWorkItem *twi = nullptr;
74 for (;;) {
75 if (twi) try {
76 twi->WorkProc();
78 } catch (std::exception &e) {
79 fprintf(stderr, "%s/WorkProc: %s", __FUNCTION__, e.what());
82 std::unique_lock<std::mutex> lock(_mtx);
83 while (twi) try {
84 _done.emplace(seq, twi);
85 _working--;
86 if (_notify_on_done) {
87 _notify_on_done = false;
88 _cond.notify_all();
90 twi = nullptr;
92 } catch (std::exception &e) { // OOM? retry in one second til memory will appear
93 fprintf(stderr, "%s: %s", __FUNCTION__, e.what());
94 sleep(1);
97 if (_stopping) {
98 break;
101 if (_backlog.empty()) {
102 _cond.wait(lock);
104 } else {
105 seq = ++_done_counter;
106 twi = _backlog.front();
107 _backlog.pop_front();
108 _working++;
113 void ThreadedWorkQueue::Queue(IThreadedWorkItem *twi, size_t backlog_limit)
115 if (backlog_limit == (size_t)-1) {
116 backlog_limit = 2 * _threads_count;
119 OrderedItemsDestroyer oid;
122 std::unique_lock<std::mutex> lock(_mtx);
123 if (_workers.empty() || (!_backlog.empty() && _workers.size() < _threads_count)) {
124 try {
125 _workers.emplace_back(this);
126 } catch (std::exception &e) {
127 fprintf(stderr, "%s/WORKER: %s\n", __FUNCTION__, e.what());
130 if (!_workers.empty()) {
131 _backlog.emplace_back(twi);
132 twi = nullptr;
133 if (_backlog.size() == 1) {
134 _cond.notify_one();
135 } else {
136 _cond.notify_all();
138 if (_backlog.size() > backlog_limit) {
139 ++_backlog_waits;
140 do {
141 _notify_on_done = true;
142 _cond.wait(lock);
143 } while (_backlog.size() > backlog_limit);
146 FetchOrderedDoneItems(oid);
149 if (twi) {
150 // no workers? fallback to synchronous processing
151 try {
152 twi->WorkProc();
153 } catch (std::exception &e) {
154 fprintf(stderr, "%s/WorkProc: %s", __FUNCTION__, e.what());
156 delete twi;
160 void ThreadedWorkQueue::Finalize()
162 OrderedItemsDestroyer oid;
163 std::unique_lock<std::mutex> lock(_mtx);
164 for (;;) {
165 if (_backlog.empty() && _working == 0) {
166 FetchOrderedDoneItems(oid);
167 break;
169 _notify_on_done = true;
170 _cond.wait(lock);
174 // must be invoked under _mtx lock held
175 void ThreadedWorkQueue::FetchOrderedDoneItems(OrderedItemsDestroyer &oid)
177 while (!_done.empty() && _done.begin()->first == _finalized_counter + 1) {
178 ++_finalized_counter;
179 oid.emplace_back(_done.begin()->second);
180 _done.erase(_done.begin());
184 ////////////////////////////////////////
186 ThreadedWorkQueuePtrScope::ThreadedWorkQueuePtrScope(std::unique_ptr<ThreadedWorkQueue> &pWQ)
187 : _pWQ(pWQ)
189 if (!_pWQ) {
190 _inited = true;
191 _pWQ.reset(new ThreadedWorkQueue);
195 ThreadedWorkQueuePtrScope::~ThreadedWorkQueuePtrScope()
197 try {
198 _pWQ->Finalize();
199 } catch (std::exception &e) {
200 fprintf(stderr, "%s: %s\n", __FUNCTION__, e.what());
202 if (_inited) {
203 _pWQ.reset();