1 /* Distributed under the OSI-approved BSD 3-Clause License. See accompanying
2 file Copyright.txt or https://cmake.org/licensing for details. */
3 #include "cmWorkerPool.h"
7 #include <condition_variable>
19 #include "cmStringAlgorithms.h"
20 #include "cmUVHandlePtr.h"
23 * @brief libuv pipe buffer class
28 using DataRange
= cmRange
<const char*>;
29 using DataFunction
= std::function
<void(DataRange
)>;
30 /// On error the ssize_t argument is a non zero libuv error code
31 using EndFunction
= std::function
<void(ssize_t
)>;
34 * Reset to construction state
39 * Initializes uv_pipe(), uv_stream() and uv_handle()
40 * @return true on success
42 bool init(uv_loop_t
* uv_loop
);
46 * @return true on success
48 bool startRead(DataFunction dataFunction
, EndFunction endFunction
);
51 uv_pipe_t
* uv_pipe() const { return this->UVPipe_
.get(); }
52 //! uv_pipe() casted to libuv stream
53 uv_stream_t
* uv_stream() const
55 return static_cast<uv_stream_t
*>(this->UVPipe_
);
57 //! uv_pipe() casted to libuv handle
58 uv_handle_t
* uv_handle() { return static_cast<uv_handle_t
*>(this->UVPipe_
); }
62 static void UVAlloc(uv_handle_t
* handle
, size_t suggestedSize
,
64 static void UVData(uv_stream_t
* stream
, ssize_t nread
, const uv_buf_t
* buf
);
66 cm::uv_pipe_ptr UVPipe_
;
67 std::vector
<char> Buffer_
;
68 DataFunction DataFunction_
;
69 EndFunction EndFunction_
;
72 void cmUVPipeBuffer::reset()
74 if (this->UVPipe_
.get() != nullptr) {
75 this->EndFunction_
= nullptr;
76 this->DataFunction_
= nullptr;
77 this->Buffer_
.clear();
78 this->Buffer_
.shrink_to_fit();
79 this->UVPipe_
.reset();
83 bool cmUVPipeBuffer::init(uv_loop_t
* uv_loop
)
86 if (uv_loop
== nullptr) {
89 int ret
= this->UVPipe_
.init(*uv_loop
, 0, this);
93 bool cmUVPipeBuffer::startRead(DataFunction dataFunction
,
94 EndFunction endFunction
)
96 if (this->UVPipe_
.get() == nullptr) {
99 if (!dataFunction
|| !endFunction
) {
102 this->DataFunction_
= std::move(dataFunction
);
103 this->EndFunction_
= std::move(endFunction
);
104 int ret
= uv_read_start(this->uv_stream(), &cmUVPipeBuffer::UVAlloc
,
105 &cmUVPipeBuffer::UVData
);
109 void cmUVPipeBuffer::UVAlloc(uv_handle_t
* handle
, size_t suggestedSize
,
112 auto& pipe
= *reinterpret_cast<cmUVPipeBuffer
*>(handle
->data
);
113 pipe
.Buffer_
.resize(suggestedSize
);
114 buf
->base
= pipe
.Buffer_
.data();
115 buf
->len
= static_cast<unsigned long>(pipe
.Buffer_
.size());
118 void cmUVPipeBuffer::UVData(uv_stream_t
* stream
, ssize_t nread
,
121 auto& pipe
= *reinterpret_cast<cmUVPipeBuffer
*>(stream
->data
);
123 if (buf
->base
!= nullptr) {
124 // Call data function
125 pipe
.DataFunction_(DataRange(buf
->base
, buf
->base
+ nread
));
127 } else if (nread
< 0) {
128 // Save the end function on the stack before resetting the pipe
130 efunc
.swap(pipe
.EndFunction_
);
131 // Reset pipe before calling the end function
134 efunc((nread
== UV_EOF
) ? 0 : nread
);
139 * @brief External process management class
141 class cmUVReadOnlyProcess
145 //! @brief Process settings
148 std::string WorkingDirectory
;
149 std::vector
<std::string
> Command
;
150 cmWorkerPool::ProcessResultT
* Result
= nullptr;
151 bool MergedOutput
= false;
154 // -- Const accessors
155 SetupT
const& Setup() const { return this->Setup_
; }
156 cmWorkerPool::ProcessResultT
* Result() const { return this->Setup_
.Result
; }
157 bool IsStarted() const { return this->IsStarted_
; }
158 bool IsFinished() const { return this->IsFinished_
; }
161 void setup(cmWorkerPool::ProcessResultT
* result
, bool mergedOutput
,
162 std::vector
<std::string
> const& command
,
163 std::string
const& workingDirectory
= std::string());
164 bool start(uv_loop_t
* uv_loop
, std::function
<void()> finishedCallback
);
167 // -- Libuv callbacks
168 static void UVExit(uv_process_t
* handle
, int64_t exitStatus
, int termSignal
);
169 void UVPipeOutData(cmUVPipeBuffer::DataRange data
) const;
170 void UVPipeOutEnd(ssize_t error
);
171 void UVPipeErrData(cmUVPipeBuffer::DataRange data
) const;
172 void UVPipeErrEnd(ssize_t error
);
178 bool IsStarted_
= false;
179 bool IsFinished_
= false;
180 std::function
<void()> FinishedCallback_
;
181 std::vector
<const char*> CommandPtr_
;
182 std::array
<uv_stdio_container_t
, 3> UVOptionsStdIO_
;
183 uv_process_options_t UVOptions_
;
184 cm::uv_process_ptr UVProcess_
;
185 cmUVPipeBuffer UVPipeOut_
;
186 cmUVPipeBuffer UVPipeErr_
;
189 void cmUVReadOnlyProcess::setup(cmWorkerPool::ProcessResultT
* result
,
191 std::vector
<std::string
> const& command
,
192 std::string
const& workingDirectory
)
194 this->Setup_
.WorkingDirectory
= workingDirectory
;
195 this->Setup_
.Command
= command
;
196 this->Setup_
.Result
= result
;
197 this->Setup_
.MergedOutput
= mergedOutput
;
200 bool cmUVReadOnlyProcess::start(uv_loop_t
* uv_loop
,
201 std::function
<void()> finishedCallback
)
203 if (this->IsStarted() || (this->Result() == nullptr)) {
207 // Reset result before the start
208 this->Result()->reset();
210 // Fill command string pointers
211 if (!this->Setup().Command
.empty()) {
212 this->CommandPtr_
.reserve(this->Setup().Command
.size() + 1);
213 for (std::string
const& arg
: this->Setup().Command
) {
214 this->CommandPtr_
.push_back(arg
.c_str());
216 this->CommandPtr_
.push_back(nullptr);
218 this->Result()->ErrorMessage
= "Empty command";
221 if (!this->Result()->error()) {
222 if (!this->UVPipeOut_
.init(uv_loop
)) {
223 this->Result()->ErrorMessage
= "libuv stdout pipe initialization failed";
226 if (!this->Result()->error()) {
227 if (!this->UVPipeErr_
.init(uv_loop
)) {
228 this->Result()->ErrorMessage
= "libuv stderr pipe initialization failed";
231 if (!this->Result()->error()) {
232 // -- Setup process stdio options
234 this->UVOptionsStdIO_
[0].flags
= UV_IGNORE
;
235 this->UVOptionsStdIO_
[0].data
.stream
= nullptr;
237 this->UVOptionsStdIO_
[1].flags
=
238 static_cast<uv_stdio_flags
>(UV_CREATE_PIPE
| UV_WRITABLE_PIPE
);
239 this->UVOptionsStdIO_
[1].data
.stream
= this->UVPipeOut_
.uv_stream();
241 this->UVOptionsStdIO_
[2].flags
=
242 static_cast<uv_stdio_flags
>(UV_CREATE_PIPE
| UV_WRITABLE_PIPE
);
243 this->UVOptionsStdIO_
[2].data
.stream
= this->UVPipeErr_
.uv_stream();
245 // -- Setup process options
246 std::fill_n(reinterpret_cast<char*>(&this->UVOptions_
),
247 sizeof(this->UVOptions_
), 0);
248 this->UVOptions_
.exit_cb
= &cmUVReadOnlyProcess::UVExit
;
249 this->UVOptions_
.file
= this->CommandPtr_
[0];
250 this->UVOptions_
.args
= const_cast<char**>(this->CommandPtr_
.data());
251 this->UVOptions_
.cwd
= this->Setup_
.WorkingDirectory
.c_str();
252 this->UVOptions_
.flags
= UV_PROCESS_WINDOWS_HIDE
;
253 this->UVOptions_
.stdio_count
=
254 static_cast<int>(this->UVOptionsStdIO_
.size());
255 this->UVOptions_
.stdio
= this->UVOptionsStdIO_
.data();
258 int uvErrorCode
= this->UVProcess_
.spawn(*uv_loop
, this->UVOptions_
, this);
259 if (uvErrorCode
!= 0) {
260 this->Result()->ErrorMessage
= "libuv process spawn failed";
261 if (const char* uvErr
= uv_strerror(uvErrorCode
)) {
262 this->Result()->ErrorMessage
+= ": ";
263 this->Result()->ErrorMessage
+= uvErr
;
267 // -- Start reading from stdio streams
268 if (!this->Result()->error()) {
269 if (!this->UVPipeOut_
.startRead(
270 [this](cmUVPipeBuffer::DataRange range
) {
271 this->UVPipeOutData(range
);
273 [this](ssize_t error
) { this->UVPipeOutEnd(error
); })) {
274 this->Result()->ErrorMessage
=
275 "libuv start reading from stdout pipe failed";
278 if (!this->Result()->error()) {
279 if (!this->UVPipeErr_
.startRead(
280 [this](cmUVPipeBuffer::DataRange range
) {
281 this->UVPipeErrData(range
);
283 [this](ssize_t error
) { this->UVPipeErrEnd(error
); })) {
284 this->Result()->ErrorMessage
=
285 "libuv start reading from stderr pipe failed";
289 if (!this->Result()->error()) {
290 this->IsStarted_
= true;
291 this->FinishedCallback_
= std::move(finishedCallback
);
293 // Clear libuv handles and finish
294 this->UVProcess_
.reset();
295 this->UVPipeOut_
.reset();
296 this->UVPipeErr_
.reset();
297 this->CommandPtr_
.clear();
300 return this->IsStarted();
303 void cmUVReadOnlyProcess::UVExit(uv_process_t
* handle
, int64_t exitStatus
,
306 auto& proc
= *reinterpret_cast<cmUVReadOnlyProcess
*>(handle
->data
);
307 if (proc
.IsStarted() && !proc
.IsFinished()) {
308 // Set error message on demand
309 proc
.Result()->ExitStatus
= exitStatus
;
310 proc
.Result()->TermSignal
= termSignal
;
311 if (proc
.Result()->ErrorMessage
.empty()) {
312 if (termSignal
!= 0) {
313 proc
.Result()->ErrorMessage
= cmStrCat(
314 "Process was terminated by signal ", proc
.Result()->TermSignal
);
315 } else if (exitStatus
!= 0) {
316 proc
.Result()->ErrorMessage
= cmStrCat(
317 "Process failed with return value ", proc
.Result()->ExitStatus
);
321 // Reset process handle
322 proc
.UVProcess_
.reset();
328 void cmUVReadOnlyProcess::UVPipeOutData(cmUVPipeBuffer::DataRange data
) const
330 this->Result()->StdOut
.append(data
.begin(), data
.end());
333 void cmUVReadOnlyProcess::UVPipeOutEnd(ssize_t error
)
335 // Process pipe error
336 if ((error
!= 0) && !this->Result()->error()) {
337 this->Result()->ErrorMessage
= cmStrCat(
338 "Reading from stdout pipe failed with libuv error code ", error
);
344 void cmUVReadOnlyProcess::UVPipeErrData(cmUVPipeBuffer::DataRange data
) const
346 std::string
* str
= this->Setup_
.MergedOutput
? &this->Result()->StdOut
347 : &this->Result()->StdErr
;
348 str
->append(data
.begin(), data
.end());
351 void cmUVReadOnlyProcess::UVPipeErrEnd(ssize_t error
)
353 // Process pipe error
354 if ((error
!= 0) && !this->Result()->error()) {
355 this->Result()->ErrorMessage
= cmStrCat(
356 "Reading from stderr pipe failed with libuv error code ", error
);
362 void cmUVReadOnlyProcess::UVTryFinish()
364 // There still might be data in the pipes after the process has finished.
365 // Therefore check if the process is finished AND all pipes are closed
366 // before signaling the worker thread to continue.
367 if ((this->UVProcess_
.get() != nullptr) ||
368 (this->UVPipeOut_
.uv_pipe() != nullptr) ||
369 (this->UVPipeErr_
.uv_pipe() != nullptr)) {
372 this->IsFinished_
= true;
373 this->FinishedCallback_();
377 * @brief Worker pool worker thread
379 class cmWorkerPoolWorker
382 cmWorkerPoolWorker(uv_loop_t
& uvLoop
);
383 ~cmWorkerPoolWorker();
385 cmWorkerPoolWorker(cmWorkerPoolWorker
const&) = delete;
386 cmWorkerPoolWorker
& operator=(cmWorkerPoolWorker
const&) = delete;
389 * Set the internal thread
391 void SetThread(std::thread
&& aThread
) { this->Thread_
= std::move(aThread
); }
394 * Run an external process
396 bool RunProcess(cmWorkerPool::ProcessResultT
& result
,
397 std::vector
<std::string
> const& command
,
398 std::string
const& workingDirectory
);
401 // -- Libuv callbacks
402 static void UVProcessStart(uv_async_t
* handle
);
403 void UVProcessFinished();
405 // -- Process management
409 cm::uv_async_ptr Request
;
410 std::condition_variable Condition
;
411 std::unique_ptr
<cmUVReadOnlyProcess
> ROP
;
417 cmWorkerPoolWorker::cmWorkerPoolWorker(uv_loop_t
& uvLoop
)
419 this->Proc_
.Request
.init(uvLoop
, &cmWorkerPoolWorker::UVProcessStart
, this);
422 cmWorkerPoolWorker::~cmWorkerPoolWorker()
424 if (this->Thread_
.joinable()) {
425 this->Thread_
.join();
429 bool cmWorkerPoolWorker::RunProcess(cmWorkerPool::ProcessResultT
& result
,
430 std::vector
<std::string
> const& command
,
431 std::string
const& workingDirectory
)
433 if (command
.empty()) {
436 // Create process instance
438 std::lock_guard
<std::mutex
> lock(this->Proc_
.Mutex
);
439 this->Proc_
.ROP
= cm::make_unique
<cmUVReadOnlyProcess
>();
440 this->Proc_
.ROP
->setup(&result
, true, command
, workingDirectory
);
442 // Send asynchronous process start request to libuv loop
443 this->Proc_
.Request
.send();
444 // Wait until the process has been finished and destroyed
446 std::unique_lock
<std::mutex
> ulock(this->Proc_
.Mutex
);
447 while (this->Proc_
.ROP
) {
448 this->Proc_
.Condition
.wait(ulock
);
451 return !result
.error();
454 void cmWorkerPoolWorker::UVProcessStart(uv_async_t
* handle
)
456 auto* wrk
= reinterpret_cast<cmWorkerPoolWorker
*>(handle
->data
);
457 bool startFailed
= false;
459 auto& Proc
= wrk
->Proc_
;
460 std::lock_guard
<std::mutex
> lock(Proc
.Mutex
);
461 if (Proc
.ROP
&& !Proc
.ROP
->IsStarted()) {
463 !Proc
.ROP
->start(handle
->loop
, [wrk
] { wrk
->UVProcessFinished(); });
466 // Clean up if starting of the process failed
468 wrk
->UVProcessFinished();
472 void cmWorkerPoolWorker::UVProcessFinished()
474 std::lock_guard
<std::mutex
> lock(this->Proc_
.Mutex
);
475 if (this->Proc_
.ROP
&&
476 (this->Proc_
.ROP
->IsFinished() || !this->Proc_
.ROP
->IsStarted())) {
477 this->Proc_
.ROP
.reset();
479 // Notify idling thread
480 this->Proc_
.Condition
.notify_one();
484 * @brief Private worker pool internals
486 class cmWorkerPoolInternal
490 cmWorkerPoolInternal(cmWorkerPool
* pool
);
491 ~cmWorkerPoolInternal();
494 * Runs the libuv loop.
499 * Clear queue and abort threads.
504 * Push a job to the queue and notify a worker.
506 bool PushJob(cmWorkerPool::JobHandleT
&& jobHandle
);
509 * Worker thread main loop method.
511 void Work(unsigned int workerIndex
);
514 static void UVSlotBegin(uv_async_t
* handle
);
515 static void UVSlotEnd(uv_async_t
* handle
);
518 std::unique_ptr
<uv_loop_t
> UVLoop
;
519 cm::uv_async_ptr UVRequestBegin
;
520 cm::uv_async_ptr UVRequestEnd
;
522 // -- Thread pool and job queue
524 bool Processing
= false;
525 bool Aborting
= false;
526 bool FenceProcessing
= false;
527 unsigned int WorkersRunning
= 0;
528 unsigned int WorkersIdle
= 0;
529 unsigned int JobsProcessing
= 0;
530 std::deque
<cmWorkerPool::JobHandleT
> Queue
;
531 std::condition_variable Condition
;
532 std::condition_variable ConditionFence
;
533 std::vector
<std::unique_ptr
<cmWorkerPoolWorker
>> Workers
;
536 cmWorkerPool
* Pool
= nullptr;
539 void cmWorkerPool::ProcessResultT::reset()
541 this->ExitStatus
= 0;
542 this->TermSignal
= 0;
543 if (!this->StdOut
.empty()) {
544 this->StdOut
.clear();
545 this->StdOut
.shrink_to_fit();
547 if (!this->StdErr
.empty()) {
548 this->StdErr
.clear();
549 this->StdErr
.shrink_to_fit();
551 if (!this->ErrorMessage
.empty()) {
552 this->ErrorMessage
.clear();
553 this->ErrorMessage
.shrink_to_fit();
557 cmWorkerPoolInternal::cmWorkerPoolInternal(cmWorkerPool
* pool
)
560 // Initialize libuv loop
561 uv_disable_stdio_inheritance();
562 this->UVLoop
= cm::make_unique
<uv_loop_t
>();
563 uv_loop_init(this->UVLoop
.get());
566 cmWorkerPoolInternal::~cmWorkerPoolInternal()
568 uv_loop_close(this->UVLoop
.get());
571 bool cmWorkerPoolInternal::Process()
574 this->Processing
= true;
575 this->Aborting
= false;
576 // Initialize libuv asynchronous request
577 this->UVRequestBegin
.init(*this->UVLoop
, &cmWorkerPoolInternal::UVSlotBegin
,
579 this->UVRequestEnd
.init(*this->UVLoop
, &cmWorkerPoolInternal::UVSlotEnd
,
581 // Send begin request
582 this->UVRequestBegin
.send();
584 bool success
= (uv_run(this->UVLoop
.get(), UV_RUN_DEFAULT
) == 0);
585 // Update state flags
586 this->Processing
= false;
587 this->Aborting
= false;
591 void cmWorkerPoolInternal::Abort()
593 // Clear all jobs and set abort flag
594 std::lock_guard
<std::mutex
> guard(this->Mutex
);
595 if (!this->Aborting
) {
596 // Register abort and clear queue
597 this->Aborting
= true;
599 this->Condition
.notify_all();
603 inline bool cmWorkerPoolInternal::PushJob(cmWorkerPool::JobHandleT
&& jobHandle
)
605 std::lock_guard
<std::mutex
> guard(this->Mutex
);
606 if (this->Aborting
) {
609 // Append the job to the queue
610 this->Queue
.emplace_back(std::move(jobHandle
));
611 // Notify an idle worker if there's one
612 if (this->WorkersIdle
!= 0) {
613 this->Condition
.notify_one();
619 void cmWorkerPoolInternal::UVSlotBegin(uv_async_t
* handle
)
621 auto& gint
= *reinterpret_cast<cmWorkerPoolInternal
*>(handle
->data
);
622 // Create worker threads
624 unsigned int const num
= gint
.Pool
->ThreadCount();
626 gint
.Workers
.reserve(num
);
627 for (unsigned int ii
= 0; ii
!= num
; ++ii
) {
628 gint
.Workers
.emplace_back(
629 cm::make_unique
<cmWorkerPoolWorker
>(*gint
.UVLoop
));
631 // Start worker threads
632 for (unsigned int ii
= 0; ii
!= num
; ++ii
) {
633 gint
.Workers
[ii
]->SetThread(
634 std::thread(&cmWorkerPoolInternal::Work
, &gint
, ii
));
637 // Destroy begin request
638 gint
.UVRequestBegin
.reset();
641 void cmWorkerPoolInternal::UVSlotEnd(uv_async_t
* handle
)
643 auto& gint
= *reinterpret_cast<cmWorkerPoolInternal
*>(handle
->data
);
644 // Join and destroy worker threads
645 gint
.Workers
.clear();
646 // Destroy end request
647 gint
.UVRequestEnd
.reset();
650 void cmWorkerPoolInternal::Work(unsigned int workerIndex
)
652 cmWorkerPool::JobHandleT jobHandle
;
653 std::unique_lock
<std::mutex
> uLock(this->Mutex
);
654 // Increment running workers count
655 ++this->WorkersRunning
;
656 // Enter worker main loop
659 if (this->Aborting
) {
662 // Wait for new jobs on the main CV
663 if (this->Queue
.empty()) {
665 this->Condition
.wait(uLock
);
670 // If there is a fence currently active or waiting,
671 // sleep on the main CV and try again.
672 if (this->FenceProcessing
) {
673 this->Condition
.wait(uLock
);
677 // Pop next job from queue
678 jobHandle
= std::move(this->Queue
.front());
679 this->Queue
.pop_front();
681 // Check for fence jobs
682 bool raisedFence
= false;
683 if (jobHandle
->IsFence()) {
684 this->FenceProcessing
= true;
686 // Wait on the Fence CV until all pending jobs are done.
687 while (this->JobsProcessing
!= 0 && !this->Aborting
) {
688 this->ConditionFence
.wait(uLock
);
690 // When aborting, explicitly kick all threads alive once more.
691 if (this->Aborting
) {
692 this->FenceProcessing
= false;
693 this->Condition
.notify_all();
698 // Unlocked scope for job processing
699 ++this->JobsProcessing
;
702 jobHandle
->Work(this->Pool
, workerIndex
); // Process job
703 jobHandle
.reset(); // Destroy job
706 --this->JobsProcessing
;
708 // If this was the thread that entered fence processing
709 // originally, notify all idling workers that the fence
712 this->FenceProcessing
= false;
713 this->Condition
.notify_all();
715 // If fence processing is still not done, notify the
716 // the fencing worker when all active jobs are done.
717 if (this->FenceProcessing
&& this->JobsProcessing
== 0) {
718 this->ConditionFence
.notify_all();
722 // Decrement running workers count
723 if (--this->WorkersRunning
== 0) {
724 // Last worker thread about to finish. Send libuv event.
725 this->UVRequestEnd
.send();
729 cmWorkerPool::JobT::~JobT() = default;
731 bool cmWorkerPool::JobT::RunProcess(ProcessResultT
& result
,
732 std::vector
<std::string
> const& command
,
733 std::string
const& workingDirectory
)
735 // Get worker by index
736 auto* wrk
= this->Pool_
->Int_
->Workers
.at(this->WorkerIndex_
).get();
737 return wrk
->RunProcess(result
, command
, workingDirectory
);
740 cmWorkerPool::cmWorkerPool()
741 : Int_(cm::make_unique
<cmWorkerPoolInternal
>(this))
745 cmWorkerPool::~cmWorkerPool() = default;
747 void cmWorkerPool::SetThreadCount(unsigned int threadCount
)
749 if (!this->Int_
->Processing
) {
750 this->ThreadCount_
= (threadCount
> 0) ? threadCount
: 1u;
754 bool cmWorkerPool::Process(void* userData
)
757 this->UserData_
= userData
;
759 bool success
= this->Int_
->Process();
761 this->UserData_
= nullptr;
766 bool cmWorkerPool::PushJob(JobHandleT
&& jobHandle
)
768 return this->Int_
->PushJob(std::move(jobHandle
));
771 void cmWorkerPool::Abort()