user32/tests: Fix monitor test failures on some systems.
[wine.git] / dlls / ntdll / threadpool.c
bloba7ad321a8bef72e7880c65846f77830841688479
1 /*
2 * Thread pooling
4 * Copyright (c) 2006 Robert Shearman
5 * Copyright (c) 2014-2016 Sebastian Lackner
7 * This library is free software; you can redistribute it and/or
8 * modify it under the terms of the GNU Lesser General Public
9 * License as published by the Free Software Foundation; either
10 * version 2.1 of the License, or (at your option) any later version.
12 * This library is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Lesser General Public License for more details.
17 * You should have received a copy of the GNU Lesser General Public
18 * License along with this library; if not, write to the Free Software
19 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
22 #include "config.h"
23 #include "wine/port.h"
25 #include <assert.h>
26 #include <stdarg.h>
27 #include <limits.h>
29 #define NONAMELESSUNION
30 #include "ntstatus.h"
31 #define WIN32_NO_STATUS
32 #include "winternl.h"
34 #include "wine/debug.h"
35 #include "wine/list.h"
37 #include "ntdll_misc.h"
39 WINE_DEFAULT_DEBUG_CHANNEL(threadpool);
42 * Old thread pooling API
45 struct rtl_work_item
47 PRTL_WORK_ITEM_ROUTINE function;
48 PVOID context;
51 #define EXPIRE_NEVER (~(ULONGLONG)0)
52 #define TIMER_QUEUE_MAGIC 0x516d6954 /* TimQ */
54 static RTL_CRITICAL_SECTION_DEBUG critsect_compl_debug;
56 static struct
58 HANDLE compl_port;
59 RTL_CRITICAL_SECTION threadpool_compl_cs;
61 old_threadpool =
63 NULL, /* compl_port */
64 { &critsect_compl_debug, -1, 0, 0, 0, 0 }, /* threadpool_compl_cs */
67 static RTL_CRITICAL_SECTION_DEBUG critsect_compl_debug =
69 0, 0, &old_threadpool.threadpool_compl_cs,
70 { &critsect_compl_debug.ProcessLocksList, &critsect_compl_debug.ProcessLocksList },
71 0, 0, { (DWORD_PTR)(__FILE__ ": threadpool_compl_cs") }
74 struct wait_work_item
76 HANDLE Object;
77 HANDLE CancelEvent;
78 WAITORTIMERCALLBACK Callback;
79 PVOID Context;
80 ULONG Milliseconds;
81 ULONG Flags;
82 HANDLE CompletionEvent;
83 LONG DeleteCount;
84 int CallbackInProgress;
87 struct timer_queue;
88 struct queue_timer
90 struct timer_queue *q;
91 struct list entry;
92 ULONG runcount; /* number of callbacks pending execution */
93 RTL_WAITORTIMERCALLBACKFUNC callback;
94 PVOID param;
95 DWORD period;
96 ULONG flags;
97 ULONGLONG expire;
98 BOOL destroy; /* timer should be deleted; once set, never unset */
99 HANDLE event; /* removal event */
102 struct timer_queue
104 DWORD magic;
105 RTL_CRITICAL_SECTION cs;
106 struct list timers; /* sorted by expiration time */
107 BOOL quit; /* queue should be deleted; once set, never unset */
108 HANDLE event;
109 HANDLE thread;
113 * Object-oriented thread pooling API
116 #define THREADPOOL_WORKER_TIMEOUT 5000
117 #define MAXIMUM_WAITQUEUE_OBJECTS (MAXIMUM_WAIT_OBJECTS - 1)
119 /* internal threadpool representation */
120 struct threadpool
122 LONG refcount;
123 LONG objcount;
124 BOOL shutdown;
125 CRITICAL_SECTION cs;
126 /* Pools of work items, locked via .cs, order matches TP_CALLBACK_PRIORITY - high, normal, low. */
127 struct list pools[3];
128 RTL_CONDITION_VARIABLE update_event;
129 /* information about worker threads, locked via .cs */
130 int max_workers;
131 int min_workers;
132 int num_workers;
133 int num_busy_workers;
136 enum threadpool_objtype
138 TP_OBJECT_TYPE_SIMPLE,
139 TP_OBJECT_TYPE_WORK,
140 TP_OBJECT_TYPE_TIMER,
141 TP_OBJECT_TYPE_WAIT
144 /* internal threadpool object representation */
145 struct threadpool_object
147 LONG refcount;
148 BOOL shutdown;
149 /* read-only information */
150 enum threadpool_objtype type;
151 struct threadpool *pool;
152 struct threadpool_group *group;
153 PVOID userdata;
154 PTP_CLEANUP_GROUP_CANCEL_CALLBACK group_cancel_callback;
155 PTP_SIMPLE_CALLBACK finalization_callback;
156 BOOL may_run_long;
157 HMODULE race_dll;
158 TP_CALLBACK_PRIORITY priority;
159 /* information about the group, locked via .group->cs */
160 struct list group_entry;
161 BOOL is_group_member;
162 /* information about the pool, locked via .pool->cs */
163 struct list pool_entry;
164 RTL_CONDITION_VARIABLE finished_event;
165 RTL_CONDITION_VARIABLE group_finished_event;
166 LONG num_pending_callbacks;
167 LONG num_running_callbacks;
168 LONG num_associated_callbacks;
169 /* arguments for callback */
170 union
172 struct
174 PTP_SIMPLE_CALLBACK callback;
175 } simple;
176 struct
178 PTP_WORK_CALLBACK callback;
179 } work;
180 struct
182 PTP_TIMER_CALLBACK callback;
183 /* information about the timer, locked via timerqueue.cs */
184 BOOL timer_initialized;
185 BOOL timer_pending;
186 struct list timer_entry;
187 BOOL timer_set;
188 ULONGLONG timeout;
189 LONG period;
190 LONG window_length;
191 } timer;
192 struct
194 PTP_WAIT_CALLBACK callback;
195 LONG signaled;
196 /* information about the wait object, locked via waitqueue.cs */
197 struct waitqueue_bucket *bucket;
198 BOOL wait_pending;
199 struct list wait_entry;
200 ULONGLONG timeout;
201 HANDLE handle;
202 } wait;
203 } u;
206 /* internal threadpool instance representation */
207 struct threadpool_instance
209 struct threadpool_object *object;
210 DWORD threadid;
211 BOOL associated;
212 BOOL may_run_long;
213 struct
215 CRITICAL_SECTION *critical_section;
216 HANDLE mutex;
217 HANDLE semaphore;
218 LONG semaphore_count;
219 HANDLE event;
220 HMODULE library;
221 } cleanup;
224 /* internal threadpool group representation */
225 struct threadpool_group
227 LONG refcount;
228 BOOL shutdown;
229 CRITICAL_SECTION cs;
230 /* list of group members, locked via .cs */
231 struct list members;
234 /* global timerqueue object */
235 static RTL_CRITICAL_SECTION_DEBUG timerqueue_debug;
237 static struct
239 CRITICAL_SECTION cs;
240 LONG objcount;
241 BOOL thread_running;
242 struct list pending_timers;
243 RTL_CONDITION_VARIABLE update_event;
245 timerqueue =
247 { &timerqueue_debug, -1, 0, 0, 0, 0 }, /* cs */
248 0, /* objcount */
249 FALSE, /* thread_running */
250 LIST_INIT( timerqueue.pending_timers ), /* pending_timers */
251 RTL_CONDITION_VARIABLE_INIT /* update_event */
254 static RTL_CRITICAL_SECTION_DEBUG timerqueue_debug =
256 0, 0, &timerqueue.cs,
257 { &timerqueue_debug.ProcessLocksList, &timerqueue_debug.ProcessLocksList },
258 0, 0, { (DWORD_PTR)(__FILE__ ": timerqueue.cs") }
261 /* global waitqueue object */
262 static RTL_CRITICAL_SECTION_DEBUG waitqueue_debug;
264 static struct
266 CRITICAL_SECTION cs;
267 LONG num_buckets;
268 struct list buckets;
270 waitqueue =
272 { &waitqueue_debug, -1, 0, 0, 0, 0 }, /* cs */
273 0, /* num_buckets */
274 LIST_INIT( waitqueue.buckets ) /* buckets */
277 static RTL_CRITICAL_SECTION_DEBUG waitqueue_debug =
279 0, 0, &waitqueue.cs,
280 { &waitqueue_debug.ProcessLocksList, &waitqueue_debug.ProcessLocksList },
281 0, 0, { (DWORD_PTR)(__FILE__ ": waitqueue.cs") }
284 struct waitqueue_bucket
286 struct list bucket_entry;
287 LONG objcount;
288 struct list reserved;
289 struct list waiting;
290 HANDLE update_event;
293 static inline struct threadpool *impl_from_TP_POOL( TP_POOL *pool )
295 return (struct threadpool *)pool;
298 static inline struct threadpool_object *impl_from_TP_WORK( TP_WORK *work )
300 struct threadpool_object *object = (struct threadpool_object *)work;
301 assert( object->type == TP_OBJECT_TYPE_WORK );
302 return object;
305 static inline struct threadpool_object *impl_from_TP_TIMER( TP_TIMER *timer )
307 struct threadpool_object *object = (struct threadpool_object *)timer;
308 assert( object->type == TP_OBJECT_TYPE_TIMER );
309 return object;
312 static inline struct threadpool_object *impl_from_TP_WAIT( TP_WAIT *wait )
314 struct threadpool_object *object = (struct threadpool_object *)wait;
315 assert( object->type == TP_OBJECT_TYPE_WAIT );
316 return object;
319 static inline struct threadpool_group *impl_from_TP_CLEANUP_GROUP( TP_CLEANUP_GROUP *group )
321 return (struct threadpool_group *)group;
324 static inline struct threadpool_instance *impl_from_TP_CALLBACK_INSTANCE( TP_CALLBACK_INSTANCE *instance )
326 return (struct threadpool_instance *)instance;
329 static void CALLBACK threadpool_worker_proc( void *param );
330 static void tp_object_submit( struct threadpool_object *object, BOOL signaled );
331 static void tp_object_prepare_shutdown( struct threadpool_object *object );
332 static BOOL tp_object_release( struct threadpool_object *object );
333 static struct threadpool *default_threadpool = NULL;
335 static inline LONG interlocked_inc( PLONG dest )
337 return interlocked_xchg_add( dest, 1 ) + 1;
340 static inline LONG interlocked_dec( PLONG dest )
342 return interlocked_xchg_add( dest, -1 ) - 1;
345 static void CALLBACK process_rtl_work_item( TP_CALLBACK_INSTANCE *instance, void *userdata )
347 struct rtl_work_item *item = userdata;
349 TRACE("executing %p(%p)\n", item->function, item->context);
350 item->function( item->context );
352 RtlFreeHeap( GetProcessHeap(), 0, item );
355 /***********************************************************************
356 * RtlQueueWorkItem (NTDLL.@)
358 * Queues a work item into a thread in the thread pool.
360 * PARAMS
361 * function [I] Work function to execute.
362 * context [I] Context to pass to the work function when it is executed.
363 * flags [I] Flags. See notes.
365 * RETURNS
366 * Success: STATUS_SUCCESS.
367 * Failure: Any NTSTATUS code.
369 * NOTES
370 * Flags can be one or more of the following:
371 *|WT_EXECUTEDEFAULT - Executes the work item in a non-I/O worker thread.
372 *|WT_EXECUTEINIOTHREAD - Executes the work item in an I/O worker thread.
373 *|WT_EXECUTEINPERSISTENTTHREAD - Executes the work item in a thread that is persistent.
374 *|WT_EXECUTELONGFUNCTION - Hints that the execution can take a long time.
375 *|WT_TRANSFER_IMPERSONATION - Executes the function with the current access token.
377 NTSTATUS WINAPI RtlQueueWorkItem( PRTL_WORK_ITEM_ROUTINE function, PVOID context, ULONG flags )
379 TP_CALLBACK_ENVIRON environment;
380 struct rtl_work_item *item;
381 NTSTATUS status;
383 TRACE( "%p %p %u\n", function, context, flags );
385 item = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*item) );
386 if (!item)
387 return STATUS_NO_MEMORY;
389 memset( &environment, 0, sizeof(environment) );
390 environment.Version = 1;
391 environment.u.s.LongFunction = (flags & WT_EXECUTELONGFUNCTION) != 0;
392 environment.u.s.Persistent = (flags & WT_EXECUTEINPERSISTENTTHREAD) != 0;
394 item->function = function;
395 item->context = context;
397 status = TpSimpleTryPost( process_rtl_work_item, item, &environment );
398 if (status) RtlFreeHeap( GetProcessHeap(), 0, item );
399 return status;
402 /***********************************************************************
403 * iocp_poller - get completion events and run callbacks
405 static DWORD CALLBACK iocp_poller(LPVOID Arg)
407 HANDLE cport = Arg;
409 while( TRUE )
411 PRTL_OVERLAPPED_COMPLETION_ROUTINE callback;
412 LPVOID overlapped;
413 IO_STATUS_BLOCK iosb;
414 NTSTATUS res = NtRemoveIoCompletion( cport, (PULONG_PTR)&callback, (PULONG_PTR)&overlapped, &iosb, NULL );
415 if (res)
417 ERR("NtRemoveIoCompletion failed: 0x%x\n", res);
419 else
421 DWORD transferred = 0;
422 DWORD err = 0;
424 if (iosb.u.Status == STATUS_SUCCESS)
425 transferred = iosb.Information;
426 else
427 err = RtlNtStatusToDosError(iosb.u.Status);
429 callback( err, transferred, overlapped );
432 return 0;
435 /***********************************************************************
436 * RtlSetIoCompletionCallback (NTDLL.@)
438 * Binds a handle to a thread pool's completion port, and possibly
439 * starts a non-I/O thread to monitor this port and call functions back.
441 * PARAMS
442 * FileHandle [I] Handle to bind to a completion port.
443 * Function [I] Callback function to call on I/O completions.
444 * Flags [I] Not used.
446 * RETURNS
447 * Success: STATUS_SUCCESS.
448 * Failure: Any NTSTATUS code.
451 NTSTATUS WINAPI RtlSetIoCompletionCallback(HANDLE FileHandle, PRTL_OVERLAPPED_COMPLETION_ROUTINE Function, ULONG Flags)
453 IO_STATUS_BLOCK iosb;
454 FILE_COMPLETION_INFORMATION info;
456 if (Flags) FIXME("Unknown value Flags=0x%x\n", Flags);
458 if (!old_threadpool.compl_port)
460 NTSTATUS res = STATUS_SUCCESS;
462 RtlEnterCriticalSection(&old_threadpool.threadpool_compl_cs);
463 if (!old_threadpool.compl_port)
465 HANDLE cport;
467 res = NtCreateIoCompletion( &cport, IO_COMPLETION_ALL_ACCESS, NULL, 0 );
468 if (!res)
470 /* FIXME native can start additional threads in case of e.g. hung callback function. */
471 res = RtlQueueWorkItem( iocp_poller, cport, WT_EXECUTEDEFAULT );
472 if (!res)
473 old_threadpool.compl_port = cport;
474 else
475 NtClose( cport );
478 RtlLeaveCriticalSection(&old_threadpool.threadpool_compl_cs);
479 if (res) return res;
482 info.CompletionPort = old_threadpool.compl_port;
483 info.CompletionKey = (ULONG_PTR)Function;
485 return NtSetInformationFile( FileHandle, &iosb, &info, sizeof(info), FileCompletionInformation );
488 static inline PLARGE_INTEGER get_nt_timeout( PLARGE_INTEGER pTime, ULONG timeout )
490 if (timeout == INFINITE) return NULL;
491 pTime->QuadPart = (ULONGLONG)timeout * -10000;
492 return pTime;
495 static void delete_wait_work_item(struct wait_work_item *wait_work_item)
497 NtClose( wait_work_item->CancelEvent );
498 RtlFreeHeap( GetProcessHeap(), 0, wait_work_item );
501 static DWORD CALLBACK wait_thread_proc(LPVOID Arg)
503 struct wait_work_item *wait_work_item = Arg;
504 NTSTATUS status;
505 BOOLEAN alertable = (wait_work_item->Flags & WT_EXECUTEINIOTHREAD) != 0;
506 HANDLE handles[2] = { wait_work_item->Object, wait_work_item->CancelEvent };
507 LARGE_INTEGER timeout;
508 HANDLE completion_event;
510 TRACE("\n");
512 while (TRUE)
514 status = NtWaitForMultipleObjects( 2, handles, TRUE, alertable,
515 get_nt_timeout( &timeout, wait_work_item->Milliseconds ) );
516 if (status == STATUS_WAIT_0 || status == STATUS_TIMEOUT)
518 BOOLEAN TimerOrWaitFired;
520 if (status == STATUS_WAIT_0)
522 TRACE( "object %p signaled, calling callback %p with context %p\n",
523 wait_work_item->Object, wait_work_item->Callback,
524 wait_work_item->Context );
525 TimerOrWaitFired = FALSE;
527 else
529 TRACE( "wait for object %p timed out, calling callback %p with context %p\n",
530 wait_work_item->Object, wait_work_item->Callback,
531 wait_work_item->Context );
532 TimerOrWaitFired = TRUE;
534 interlocked_xchg( &wait_work_item->CallbackInProgress, TRUE );
535 if (wait_work_item->CompletionEvent)
537 TRACE( "Work has been canceled.\n" );
538 break;
540 wait_work_item->Callback( wait_work_item->Context, TimerOrWaitFired );
541 interlocked_xchg( &wait_work_item->CallbackInProgress, FALSE );
543 if (wait_work_item->Flags & WT_EXECUTEONLYONCE)
544 break;
546 else if (status != STATUS_USER_APC)
547 break;
551 if (interlocked_inc( &wait_work_item->DeleteCount ) == 2 )
553 completion_event = wait_work_item->CompletionEvent;
554 delete_wait_work_item( wait_work_item );
555 if (completion_event && completion_event != INVALID_HANDLE_VALUE)
556 NtSetEvent( completion_event, NULL );
559 return 0;
562 /***********************************************************************
563 * RtlRegisterWait (NTDLL.@)
565 * Registers a wait for a handle to become signaled.
567 * PARAMS
568 * NewWaitObject [I] Handle to the new wait object. Use RtlDeregisterWait() to free it.
569 * Object [I] Object to wait to become signaled.
570 * Callback [I] Callback function to execute when the wait times out or the handle is signaled.
571 * Context [I] Context to pass to the callback function when it is executed.
572 * Milliseconds [I] Number of milliseconds to wait before timing out.
573 * Flags [I] Flags. See notes.
575 * RETURNS
576 * Success: STATUS_SUCCESS.
577 * Failure: Any NTSTATUS code.
579 * NOTES
580 * Flags can be one or more of the following:
581 *|WT_EXECUTEDEFAULT - Executes the work item in a non-I/O worker thread.
582 *|WT_EXECUTEINIOTHREAD - Executes the work item in an I/O worker thread.
583 *|WT_EXECUTEINPERSISTENTTHREAD - Executes the work item in a thread that is persistent.
584 *|WT_EXECUTELONGFUNCTION - Hints that the execution can take a long time.
585 *|WT_TRANSFER_IMPERSONATION - Executes the function with the current access token.
587 NTSTATUS WINAPI RtlRegisterWait(PHANDLE NewWaitObject, HANDLE Object,
588 RTL_WAITORTIMERCALLBACKFUNC Callback,
589 PVOID Context, ULONG Milliseconds, ULONG Flags)
591 struct wait_work_item *wait_work_item;
592 NTSTATUS status;
594 TRACE( "(%p, %p, %p, %p, %d, 0x%x)\n", NewWaitObject, Object, Callback, Context, Milliseconds, Flags );
596 wait_work_item = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*wait_work_item) );
597 if (!wait_work_item)
598 return STATUS_NO_MEMORY;
600 wait_work_item->Object = Object;
601 wait_work_item->Callback = Callback;
602 wait_work_item->Context = Context;
603 wait_work_item->Milliseconds = Milliseconds;
604 wait_work_item->Flags = Flags;
605 wait_work_item->CallbackInProgress = FALSE;
606 wait_work_item->DeleteCount = 0;
607 wait_work_item->CompletionEvent = NULL;
609 status = NtCreateEvent( &wait_work_item->CancelEvent, EVENT_ALL_ACCESS, NULL, NotificationEvent, FALSE );
610 if (status != STATUS_SUCCESS)
612 RtlFreeHeap( GetProcessHeap(), 0, wait_work_item );
613 return status;
616 Flags = Flags & (WT_EXECUTEINIOTHREAD | WT_EXECUTEINPERSISTENTTHREAD |
617 WT_EXECUTELONGFUNCTION | WT_TRANSFER_IMPERSONATION);
618 status = RtlQueueWorkItem( wait_thread_proc, wait_work_item, Flags );
619 if (status != STATUS_SUCCESS)
621 delete_wait_work_item( wait_work_item );
622 return status;
625 *NewWaitObject = wait_work_item;
626 return status;
629 /***********************************************************************
630 * RtlDeregisterWaitEx (NTDLL.@)
632 * Cancels a wait operation and frees the resources associated with calling
633 * RtlRegisterWait().
635 * PARAMS
636 * WaitObject [I] Handle to the wait object to free.
638 * RETURNS
639 * Success: STATUS_SUCCESS.
640 * Failure: Any NTSTATUS code.
642 NTSTATUS WINAPI RtlDeregisterWaitEx(HANDLE WaitHandle, HANDLE CompletionEvent)
644 struct wait_work_item *wait_work_item = WaitHandle;
645 NTSTATUS status;
646 HANDLE LocalEvent = NULL;
647 int CallbackInProgress;
649 TRACE( "(%p %p)\n", WaitHandle, CompletionEvent );
651 if (WaitHandle == NULL)
652 return STATUS_INVALID_HANDLE;
654 interlocked_xchg_ptr( &wait_work_item->CompletionEvent, INVALID_HANDLE_VALUE );
655 CallbackInProgress = wait_work_item->CallbackInProgress;
656 TRACE( "callback in progress %u\n", CallbackInProgress );
657 if (CompletionEvent == INVALID_HANDLE_VALUE || !CallbackInProgress)
659 status = NtCreateEvent( &LocalEvent, EVENT_ALL_ACCESS, NULL, NotificationEvent, FALSE );
660 if (status != STATUS_SUCCESS)
661 return status;
662 interlocked_xchg_ptr( &wait_work_item->CompletionEvent, LocalEvent );
664 else if (CompletionEvent != NULL)
666 interlocked_xchg_ptr( &wait_work_item->CompletionEvent, CompletionEvent );
669 NtSetEvent( wait_work_item->CancelEvent, NULL );
671 if (interlocked_inc( &wait_work_item->DeleteCount ) == 2 )
673 status = STATUS_SUCCESS;
674 delete_wait_work_item( wait_work_item );
676 else if (LocalEvent)
678 TRACE( "Waiting for completion event\n" );
679 NtWaitForSingleObject( LocalEvent, FALSE, NULL );
680 status = STATUS_SUCCESS;
682 else
684 status = STATUS_PENDING;
687 if (LocalEvent)
688 NtClose( LocalEvent );
690 return status;
693 /***********************************************************************
694 * RtlDeregisterWait (NTDLL.@)
696 * Cancels a wait operation and frees the resources associated with calling
697 * RtlRegisterWait().
699 * PARAMS
700 * WaitObject [I] Handle to the wait object to free.
702 * RETURNS
703 * Success: STATUS_SUCCESS.
704 * Failure: Any NTSTATUS code.
706 NTSTATUS WINAPI RtlDeregisterWait(HANDLE WaitHandle)
708 return RtlDeregisterWaitEx(WaitHandle, NULL);
712 /************************** Timer Queue Impl **************************/
714 static void queue_remove_timer(struct queue_timer *t)
716 /* We MUST hold the queue cs while calling this function. This ensures
717 that we cannot queue another callback for this timer. The runcount
718 being zero makes sure we don't have any already queued. */
719 struct timer_queue *q = t->q;
721 assert(t->runcount == 0);
722 assert(t->destroy);
724 list_remove(&t->entry);
725 if (t->event)
726 NtSetEvent(t->event, NULL);
727 RtlFreeHeap(GetProcessHeap(), 0, t);
729 if (q->quit && list_empty(&q->timers))
730 NtSetEvent(q->event, NULL);
733 static void timer_cleanup_callback(struct queue_timer *t)
735 struct timer_queue *q = t->q;
736 RtlEnterCriticalSection(&q->cs);
738 assert(0 < t->runcount);
739 --t->runcount;
741 if (t->destroy && t->runcount == 0)
742 queue_remove_timer(t);
744 RtlLeaveCriticalSection(&q->cs);
747 static DWORD WINAPI timer_callback_wrapper(LPVOID p)
749 struct queue_timer *t = p;
750 t->callback(t->param, TRUE);
751 timer_cleanup_callback(t);
752 return 0;
755 static inline ULONGLONG queue_current_time(void)
757 LARGE_INTEGER now, freq;
758 NtQueryPerformanceCounter(&now, &freq);
759 return now.QuadPart * 1000 / freq.QuadPart;
762 static void queue_add_timer(struct queue_timer *t, ULONGLONG time,
763 BOOL set_event)
765 /* We MUST hold the queue cs while calling this function. */
766 struct timer_queue *q = t->q;
767 struct list *ptr = &q->timers;
769 assert(!q->quit || (t->destroy && time == EXPIRE_NEVER));
771 if (time != EXPIRE_NEVER)
772 LIST_FOR_EACH(ptr, &q->timers)
774 struct queue_timer *cur = LIST_ENTRY(ptr, struct queue_timer, entry);
775 if (time < cur->expire)
776 break;
778 list_add_before(ptr, &t->entry);
780 t->expire = time;
782 /* If we insert at the head of the list, we need to expire sooner
783 than expected. */
784 if (set_event && &t->entry == list_head(&q->timers))
785 NtSetEvent(q->event, NULL);
788 static inline void queue_move_timer(struct queue_timer *t, ULONGLONG time,
789 BOOL set_event)
791 /* We MUST hold the queue cs while calling this function. */
792 list_remove(&t->entry);
793 queue_add_timer(t, time, set_event);
796 static void queue_timer_expire(struct timer_queue *q)
798 struct queue_timer *t = NULL;
800 RtlEnterCriticalSection(&q->cs);
801 if (list_head(&q->timers))
803 ULONGLONG now, next;
804 t = LIST_ENTRY(list_head(&q->timers), struct queue_timer, entry);
805 if (!t->destroy && t->expire <= ((now = queue_current_time())))
807 ++t->runcount;
808 if (t->period)
810 next = t->expire + t->period;
811 /* avoid trigger cascade if overloaded / hibernated */
812 if (next < now)
813 next = now + t->period;
815 else
816 next = EXPIRE_NEVER;
817 queue_move_timer(t, next, FALSE);
819 else
820 t = NULL;
822 RtlLeaveCriticalSection(&q->cs);
824 if (t)
826 if (t->flags & WT_EXECUTEINTIMERTHREAD)
827 timer_callback_wrapper(t);
828 else
830 ULONG flags
831 = (t->flags
832 & (WT_EXECUTEINIOTHREAD | WT_EXECUTEINPERSISTENTTHREAD
833 | WT_EXECUTELONGFUNCTION | WT_TRANSFER_IMPERSONATION));
834 NTSTATUS status = RtlQueueWorkItem(timer_callback_wrapper, t, flags);
835 if (status != STATUS_SUCCESS)
836 timer_cleanup_callback(t);
841 static ULONG queue_get_timeout(struct timer_queue *q)
843 struct queue_timer *t;
844 ULONG timeout = INFINITE;
846 RtlEnterCriticalSection(&q->cs);
847 if (list_head(&q->timers))
849 t = LIST_ENTRY(list_head(&q->timers), struct queue_timer, entry);
850 assert(!t->destroy || t->expire == EXPIRE_NEVER);
852 if (t->expire != EXPIRE_NEVER)
854 ULONGLONG time = queue_current_time();
855 timeout = t->expire < time ? 0 : t->expire - time;
858 RtlLeaveCriticalSection(&q->cs);
860 return timeout;
863 static void WINAPI timer_queue_thread_proc(LPVOID p)
865 struct timer_queue *q = p;
866 ULONG timeout_ms;
868 timeout_ms = INFINITE;
869 for (;;)
871 LARGE_INTEGER timeout;
872 NTSTATUS status;
873 BOOL done = FALSE;
875 status = NtWaitForSingleObject(
876 q->event, FALSE, get_nt_timeout(&timeout, timeout_ms));
878 if (status == STATUS_WAIT_0)
880 /* There are two possible ways to trigger the event. Either
881 we are quitting and the last timer got removed, or a new
882 timer got put at the head of the list so we need to adjust
883 our timeout. */
884 RtlEnterCriticalSection(&q->cs);
885 if (q->quit && list_empty(&q->timers))
886 done = TRUE;
887 RtlLeaveCriticalSection(&q->cs);
889 else if (status == STATUS_TIMEOUT)
890 queue_timer_expire(q);
892 if (done)
893 break;
895 timeout_ms = queue_get_timeout(q);
898 NtClose(q->event);
899 RtlDeleteCriticalSection(&q->cs);
900 q->magic = 0;
901 RtlFreeHeap(GetProcessHeap(), 0, q);
902 RtlExitUserThread( 0 );
905 static void queue_destroy_timer(struct queue_timer *t)
907 /* We MUST hold the queue cs while calling this function. */
908 t->destroy = TRUE;
909 if (t->runcount == 0)
910 /* Ensure a timer is promptly removed. If callbacks are pending,
911 it will be removed after the last one finishes by the callback
912 cleanup wrapper. */
913 queue_remove_timer(t);
914 else
915 /* Make sure no destroyed timer masks an active timer at the head
916 of the sorted list. */
917 queue_move_timer(t, EXPIRE_NEVER, FALSE);
920 /***********************************************************************
921 * RtlCreateTimerQueue (NTDLL.@)
923 * Creates a timer queue object and returns a handle to it.
925 * PARAMS
926 * NewTimerQueue [O] The newly created queue.
928 * RETURNS
929 * Success: STATUS_SUCCESS.
930 * Failure: Any NTSTATUS code.
932 NTSTATUS WINAPI RtlCreateTimerQueue(PHANDLE NewTimerQueue)
934 NTSTATUS status;
935 struct timer_queue *q = RtlAllocateHeap(GetProcessHeap(), 0, sizeof *q);
936 if (!q)
937 return STATUS_NO_MEMORY;
939 RtlInitializeCriticalSection(&q->cs);
940 list_init(&q->timers);
941 q->quit = FALSE;
942 q->magic = TIMER_QUEUE_MAGIC;
943 status = NtCreateEvent(&q->event, EVENT_ALL_ACCESS, NULL, SynchronizationEvent, FALSE);
944 if (status != STATUS_SUCCESS)
946 RtlFreeHeap(GetProcessHeap(), 0, q);
947 return status;
949 status = RtlCreateUserThread(GetCurrentProcess(), NULL, FALSE, NULL, 0, 0,
950 timer_queue_thread_proc, q, &q->thread, NULL);
951 if (status != STATUS_SUCCESS)
953 NtClose(q->event);
954 RtlFreeHeap(GetProcessHeap(), 0, q);
955 return status;
958 *NewTimerQueue = q;
959 return STATUS_SUCCESS;
962 /***********************************************************************
963 * RtlDeleteTimerQueueEx (NTDLL.@)
965 * Deletes a timer queue object.
967 * PARAMS
968 * TimerQueue [I] The timer queue to destroy.
969 * CompletionEvent [I] If NULL, return immediately. If INVALID_HANDLE_VALUE,
970 * wait until all timers are finished firing before
971 * returning. Otherwise, return immediately and set the
972 * event when all timers are done.
974 * RETURNS
975 * Success: STATUS_SUCCESS if synchronous, STATUS_PENDING if not.
976 * Failure: Any NTSTATUS code.
978 NTSTATUS WINAPI RtlDeleteTimerQueueEx(HANDLE TimerQueue, HANDLE CompletionEvent)
980 struct timer_queue *q = TimerQueue;
981 struct queue_timer *t, *temp;
982 HANDLE thread;
983 NTSTATUS status;
985 if (!q || q->magic != TIMER_QUEUE_MAGIC)
986 return STATUS_INVALID_HANDLE;
988 thread = q->thread;
990 RtlEnterCriticalSection(&q->cs);
991 q->quit = TRUE;
992 if (list_head(&q->timers))
993 /* When the last timer is removed, it will signal the timer thread to
994 exit... */
995 LIST_FOR_EACH_ENTRY_SAFE(t, temp, &q->timers, struct queue_timer, entry)
996 queue_destroy_timer(t);
997 else
998 /* However if we have none, we must do it ourselves. */
999 NtSetEvent(q->event, NULL);
1000 RtlLeaveCriticalSection(&q->cs);
1002 if (CompletionEvent == INVALID_HANDLE_VALUE)
1004 NtWaitForSingleObject(thread, FALSE, NULL);
1005 status = STATUS_SUCCESS;
1007 else
1009 if (CompletionEvent)
1011 FIXME("asynchronous return on completion event unimplemented\n");
1012 NtWaitForSingleObject(thread, FALSE, NULL);
1013 NtSetEvent(CompletionEvent, NULL);
1015 status = STATUS_PENDING;
1018 NtClose(thread);
1019 return status;
1022 static struct timer_queue *get_timer_queue(HANDLE TimerQueue)
1024 static struct timer_queue *default_timer_queue;
1026 if (TimerQueue)
1027 return TimerQueue;
1028 else
1030 if (!default_timer_queue)
1032 HANDLE q;
1033 NTSTATUS status = RtlCreateTimerQueue(&q);
1034 if (status == STATUS_SUCCESS)
1036 PVOID p = interlocked_cmpxchg_ptr(
1037 (void **) &default_timer_queue, q, NULL);
1038 if (p)
1039 /* Got beat to the punch. */
1040 RtlDeleteTimerQueueEx(q, NULL);
1043 return default_timer_queue;
1047 /***********************************************************************
1048 * RtlCreateTimer (NTDLL.@)
1050 * Creates a new timer associated with the given queue.
1052 * PARAMS
1053 * NewTimer [O] The newly created timer.
1054 * TimerQueue [I] The queue to hold the timer.
1055 * Callback [I] The callback to fire.
1056 * Parameter [I] The argument for the callback.
1057 * DueTime [I] The delay, in milliseconds, before first firing the
1058 * timer.
1059 * Period [I] The period, in milliseconds, at which to fire the timer
1060 * after the first callback. If zero, the timer will only
1061 * fire once. It still needs to be deleted with
1062 * RtlDeleteTimer.
1063 * Flags [I] Flags controlling the execution of the callback. In
1064 * addition to the WT_* thread pool flags (see
1065 * RtlQueueWorkItem), WT_EXECUTEINTIMERTHREAD and
1066 * WT_EXECUTEONLYONCE are supported.
1068 * RETURNS
1069 * Success: STATUS_SUCCESS.
1070 * Failure: Any NTSTATUS code.
1072 NTSTATUS WINAPI RtlCreateTimer(PHANDLE NewTimer, HANDLE TimerQueue,
1073 RTL_WAITORTIMERCALLBACKFUNC Callback,
1074 PVOID Parameter, DWORD DueTime, DWORD Period,
1075 ULONG Flags)
1077 NTSTATUS status;
1078 struct queue_timer *t;
1079 struct timer_queue *q = get_timer_queue(TimerQueue);
1081 if (!q) return STATUS_NO_MEMORY;
1082 if (q->magic != TIMER_QUEUE_MAGIC) return STATUS_INVALID_HANDLE;
1084 t = RtlAllocateHeap(GetProcessHeap(), 0, sizeof *t);
1085 if (!t)
1086 return STATUS_NO_MEMORY;
1088 t->q = q;
1089 t->runcount = 0;
1090 t->callback = Callback;
1091 t->param = Parameter;
1092 t->period = Period;
1093 t->flags = Flags;
1094 t->destroy = FALSE;
1095 t->event = NULL;
1097 status = STATUS_SUCCESS;
1098 RtlEnterCriticalSection(&q->cs);
1099 if (q->quit)
1100 status = STATUS_INVALID_HANDLE;
1101 else
1102 queue_add_timer(t, queue_current_time() + DueTime, TRUE);
1103 RtlLeaveCriticalSection(&q->cs);
1105 if (status == STATUS_SUCCESS)
1106 *NewTimer = t;
1107 else
1108 RtlFreeHeap(GetProcessHeap(), 0, t);
1110 return status;
1113 /***********************************************************************
1114 * RtlUpdateTimer (NTDLL.@)
1116 * Changes the time at which a timer expires.
1118 * PARAMS
1119 * TimerQueue [I] The queue that holds the timer.
1120 * Timer [I] The timer to update.
1121 * DueTime [I] The delay, in milliseconds, before next firing the timer.
1122 * Period [I] The period, in milliseconds, at which to fire the timer
1123 * after the first callback. If zero, the timer will not
1124 * refire once. It still needs to be deleted with
1125 * RtlDeleteTimer.
1127 * RETURNS
1128 * Success: STATUS_SUCCESS.
1129 * Failure: Any NTSTATUS code.
1131 NTSTATUS WINAPI RtlUpdateTimer(HANDLE TimerQueue, HANDLE Timer,
1132 DWORD DueTime, DWORD Period)
1134 struct queue_timer *t = Timer;
1135 struct timer_queue *q = t->q;
1137 RtlEnterCriticalSection(&q->cs);
1138 /* Can't change a timer if it was once-only or destroyed. */
1139 if (t->expire != EXPIRE_NEVER)
1141 t->period = Period;
1142 queue_move_timer(t, queue_current_time() + DueTime, TRUE);
1144 RtlLeaveCriticalSection(&q->cs);
1146 return STATUS_SUCCESS;
1149 /***********************************************************************
1150 * RtlDeleteTimer (NTDLL.@)
1152 * Cancels a timer-queue timer.
1154 * PARAMS
1155 * TimerQueue [I] The queue that holds the timer.
1156 * Timer [I] The timer to update.
1157 * CompletionEvent [I] If NULL, return immediately. If INVALID_HANDLE_VALUE,
1158 * wait until the timer is finished firing all pending
1159 * callbacks before returning. Otherwise, return
1160 * immediately and set the timer is done.
1162 * RETURNS
1163 * Success: STATUS_SUCCESS if the timer is done, STATUS_PENDING if not,
1164 or if the completion event is NULL.
1165 * Failure: Any NTSTATUS code.
1167 NTSTATUS WINAPI RtlDeleteTimer(HANDLE TimerQueue, HANDLE Timer,
1168 HANDLE CompletionEvent)
1170 struct queue_timer *t = Timer;
1171 struct timer_queue *q;
1172 NTSTATUS status = STATUS_PENDING;
1173 HANDLE event = NULL;
1175 if (!Timer)
1176 return STATUS_INVALID_PARAMETER_1;
1177 q = t->q;
1178 if (CompletionEvent == INVALID_HANDLE_VALUE)
1180 status = NtCreateEvent(&event, EVENT_ALL_ACCESS, NULL, SynchronizationEvent, FALSE);
1181 if (status == STATUS_SUCCESS)
1182 status = STATUS_PENDING;
1184 else if (CompletionEvent)
1185 event = CompletionEvent;
1187 RtlEnterCriticalSection(&q->cs);
1188 t->event = event;
1189 if (t->runcount == 0 && event)
1190 status = STATUS_SUCCESS;
1191 queue_destroy_timer(t);
1192 RtlLeaveCriticalSection(&q->cs);
1194 if (CompletionEvent == INVALID_HANDLE_VALUE && event)
1196 if (status == STATUS_PENDING)
1198 NtWaitForSingleObject(event, FALSE, NULL);
1199 status = STATUS_SUCCESS;
1201 NtClose(event);
1204 return status;
1207 /***********************************************************************
1208 * timerqueue_thread_proc (internal)
1210 static void CALLBACK timerqueue_thread_proc( void *param )
1212 ULONGLONG timeout_lower, timeout_upper, new_timeout;
1213 struct threadpool_object *other_timer;
1214 LARGE_INTEGER now, timeout;
1215 struct list *ptr;
1217 TRACE( "starting timer queue thread\n" );
1219 RtlEnterCriticalSection( &timerqueue.cs );
1220 for (;;)
1222 NtQuerySystemTime( &now );
1224 /* Check for expired timers. */
1225 while ((ptr = list_head( &timerqueue.pending_timers )))
1227 struct threadpool_object *timer = LIST_ENTRY( ptr, struct threadpool_object, u.timer.timer_entry );
1228 assert( timer->type == TP_OBJECT_TYPE_TIMER );
1229 assert( timer->u.timer.timer_pending );
1230 if (timer->u.timer.timeout > now.QuadPart)
1231 break;
1233 /* Queue a new callback in one of the worker threads. */
1234 list_remove( &timer->u.timer.timer_entry );
1235 timer->u.timer.timer_pending = FALSE;
1236 tp_object_submit( timer, FALSE );
1238 /* Insert the timer back into the queue, except it's marked for shutdown. */
1239 if (timer->u.timer.period && !timer->shutdown)
1241 timer->u.timer.timeout += (ULONGLONG)timer->u.timer.period * 10000;
1242 if (timer->u.timer.timeout <= now.QuadPart)
1243 timer->u.timer.timeout = now.QuadPart + 1;
1245 LIST_FOR_EACH_ENTRY( other_timer, &timerqueue.pending_timers,
1246 struct threadpool_object, u.timer.timer_entry )
1248 assert( other_timer->type == TP_OBJECT_TYPE_TIMER );
1249 if (timer->u.timer.timeout < other_timer->u.timer.timeout)
1250 break;
1252 list_add_before( &other_timer->u.timer.timer_entry, &timer->u.timer.timer_entry );
1253 timer->u.timer.timer_pending = TRUE;
1257 timeout_lower = TIMEOUT_INFINITE;
1258 timeout_upper = TIMEOUT_INFINITE;
1260 /* Determine next timeout and use the window length to optimize wakeup times. */
1261 LIST_FOR_EACH_ENTRY( other_timer, &timerqueue.pending_timers,
1262 struct threadpool_object, u.timer.timer_entry )
1264 assert( other_timer->type == TP_OBJECT_TYPE_TIMER );
1265 if (other_timer->u.timer.timeout >= timeout_upper)
1266 break;
1268 timeout_lower = other_timer->u.timer.timeout;
1269 new_timeout = timeout_lower + (ULONGLONG)other_timer->u.timer.window_length * 10000;
1270 if (new_timeout < timeout_upper)
1271 timeout_upper = new_timeout;
1274 /* Wait for timer update events or until the next timer expires. */
1275 if (timerqueue.objcount)
1277 timeout.QuadPart = timeout_lower;
1278 RtlSleepConditionVariableCS( &timerqueue.update_event, &timerqueue.cs, &timeout );
1279 continue;
1282 /* All timers have been destroyed, if no new timers are created
1283 * within some amount of time, then we can shutdown this thread. */
1284 timeout.QuadPart = (ULONGLONG)THREADPOOL_WORKER_TIMEOUT * -10000;
1285 if (RtlSleepConditionVariableCS( &timerqueue.update_event, &timerqueue.cs,
1286 &timeout ) == STATUS_TIMEOUT && !timerqueue.objcount)
1288 break;
1292 timerqueue.thread_running = FALSE;
1293 RtlLeaveCriticalSection( &timerqueue.cs );
1295 TRACE( "terminating timer queue thread\n" );
1296 RtlExitUserThread( 0 );
1299 /***********************************************************************
1300 * tp_new_worker_thread (internal)
1302 * Create and account a new worker thread for the desired pool.
1304 static NTSTATUS tp_new_worker_thread( struct threadpool *pool )
1306 HANDLE thread;
1307 NTSTATUS status;
1309 status = RtlCreateUserThread( GetCurrentProcess(), NULL, FALSE, NULL, 0, 0,
1310 threadpool_worker_proc, pool, &thread, NULL );
1311 if (status == STATUS_SUCCESS)
1313 interlocked_inc( &pool->refcount );
1314 pool->num_workers++;
1315 pool->num_busy_workers++;
1316 NtClose( thread );
1318 return status;
1321 /***********************************************************************
1322 * tp_timerqueue_lock (internal)
1324 * Acquires a lock on the global timerqueue. When the lock is acquired
1325 * successfully, it is guaranteed that the timer thread is running.
1327 static NTSTATUS tp_timerqueue_lock( struct threadpool_object *timer )
1329 NTSTATUS status = STATUS_SUCCESS;
1330 assert( timer->type == TP_OBJECT_TYPE_TIMER );
1332 timer->u.timer.timer_initialized = FALSE;
1333 timer->u.timer.timer_pending = FALSE;
1334 timer->u.timer.timer_set = FALSE;
1335 timer->u.timer.timeout = 0;
1336 timer->u.timer.period = 0;
1337 timer->u.timer.window_length = 0;
1339 RtlEnterCriticalSection( &timerqueue.cs );
1341 /* Make sure that the timerqueue thread is running. */
1342 if (!timerqueue.thread_running)
1344 HANDLE thread;
1345 status = RtlCreateUserThread( GetCurrentProcess(), NULL, FALSE, NULL, 0, 0,
1346 timerqueue_thread_proc, NULL, &thread, NULL );
1347 if (status == STATUS_SUCCESS)
1349 timerqueue.thread_running = TRUE;
1350 NtClose( thread );
1354 if (status == STATUS_SUCCESS)
1356 timer->u.timer.timer_initialized = TRUE;
1357 timerqueue.objcount++;
1360 RtlLeaveCriticalSection( &timerqueue.cs );
1361 return status;
1364 /***********************************************************************
1365 * tp_timerqueue_unlock (internal)
1367 * Releases a lock on the global timerqueue.
1369 static void tp_timerqueue_unlock( struct threadpool_object *timer )
1371 assert( timer->type == TP_OBJECT_TYPE_TIMER );
1373 RtlEnterCriticalSection( &timerqueue.cs );
1374 if (timer->u.timer.timer_initialized)
1376 /* If timer was pending, remove it. */
1377 if (timer->u.timer.timer_pending)
1379 list_remove( &timer->u.timer.timer_entry );
1380 timer->u.timer.timer_pending = FALSE;
1383 /* If the last timer object was destroyed, then wake up the thread. */
1384 if (!--timerqueue.objcount)
1386 assert( list_empty( &timerqueue.pending_timers ) );
1387 RtlWakeAllConditionVariable( &timerqueue.update_event );
1390 timer->u.timer.timer_initialized = FALSE;
1392 RtlLeaveCriticalSection( &timerqueue.cs );
1395 /***********************************************************************
1396 * waitqueue_thread_proc (internal)
1398 static void CALLBACK waitqueue_thread_proc( void *param )
1400 struct threadpool_object *objects[MAXIMUM_WAITQUEUE_OBJECTS];
1401 HANDLE handles[MAXIMUM_WAITQUEUE_OBJECTS + 1];
1402 struct waitqueue_bucket *bucket = param;
1403 struct threadpool_object *wait, *next;
1404 LARGE_INTEGER now, timeout;
1405 DWORD num_handles;
1406 NTSTATUS status;
1408 TRACE( "starting wait queue thread\n" );
1410 RtlEnterCriticalSection( &waitqueue.cs );
1412 for (;;)
1414 NtQuerySystemTime( &now );
1415 timeout.QuadPart = TIMEOUT_INFINITE;
1416 num_handles = 0;
1418 LIST_FOR_EACH_ENTRY_SAFE( wait, next, &bucket->waiting, struct threadpool_object,
1419 u.wait.wait_entry )
1421 assert( wait->type == TP_OBJECT_TYPE_WAIT );
1422 if (wait->u.wait.timeout <= now.QuadPart)
1424 /* Wait object timed out. */
1425 list_remove( &wait->u.wait.wait_entry );
1426 list_add_tail( &bucket->reserved, &wait->u.wait.wait_entry );
1427 tp_object_submit( wait, FALSE );
1429 else
1431 if (wait->u.wait.timeout < timeout.QuadPart)
1432 timeout.QuadPart = wait->u.wait.timeout;
1434 assert( num_handles < MAXIMUM_WAITQUEUE_OBJECTS );
1435 interlocked_inc( &wait->refcount );
1436 objects[num_handles] = wait;
1437 handles[num_handles] = wait->u.wait.handle;
1438 num_handles++;
1442 if (!bucket->objcount)
1444 /* All wait objects have been destroyed, if no new wait objects are created
1445 * within some amount of time, then we can shutdown this thread. */
1446 assert( num_handles == 0 );
1447 RtlLeaveCriticalSection( &waitqueue.cs );
1448 timeout.QuadPart = (ULONGLONG)THREADPOOL_WORKER_TIMEOUT * -10000;
1449 status = NtWaitForMultipleObjects( 1, &bucket->update_event, TRUE, FALSE, &timeout );
1450 RtlEnterCriticalSection( &waitqueue.cs );
1452 if (status == STATUS_TIMEOUT && !bucket->objcount)
1453 break;
1455 else
1457 handles[num_handles] = bucket->update_event;
1458 RtlLeaveCriticalSection( &waitqueue.cs );
1459 status = NtWaitForMultipleObjects( num_handles + 1, handles, TRUE, FALSE, &timeout );
1460 RtlEnterCriticalSection( &waitqueue.cs );
1462 if (status >= STATUS_WAIT_0 && status < STATUS_WAIT_0 + num_handles)
1464 wait = objects[status - STATUS_WAIT_0];
1465 assert( wait->type == TP_OBJECT_TYPE_WAIT );
1466 if (wait->u.wait.bucket)
1468 /* Wait object signaled. */
1469 assert( wait->u.wait.bucket == bucket );
1470 list_remove( &wait->u.wait.wait_entry );
1471 list_add_tail( &bucket->reserved, &wait->u.wait.wait_entry );
1472 tp_object_submit( wait, TRUE );
1474 else
1475 WARN("wait object %p triggered while object was destroyed\n", wait);
1478 /* Release temporary references to wait objects. */
1479 while (num_handles)
1481 wait = objects[--num_handles];
1482 assert( wait->type == TP_OBJECT_TYPE_WAIT );
1483 tp_object_release( wait );
1487 /* Try to merge bucket with other threads. */
1488 if (waitqueue.num_buckets > 1 && bucket->objcount &&
1489 bucket->objcount <= MAXIMUM_WAITQUEUE_OBJECTS * 1 / 3)
1491 struct waitqueue_bucket *other_bucket;
1492 LIST_FOR_EACH_ENTRY( other_bucket, &waitqueue.buckets, struct waitqueue_bucket, bucket_entry )
1494 if (other_bucket != bucket && other_bucket->objcount &&
1495 other_bucket->objcount + bucket->objcount <= MAXIMUM_WAITQUEUE_OBJECTS * 2 / 3)
1497 other_bucket->objcount += bucket->objcount;
1498 bucket->objcount = 0;
1500 /* Update reserved list. */
1501 LIST_FOR_EACH_ENTRY( wait, &bucket->reserved, struct threadpool_object, u.wait.wait_entry )
1503 assert( wait->type == TP_OBJECT_TYPE_WAIT );
1504 wait->u.wait.bucket = other_bucket;
1506 list_move_tail( &other_bucket->reserved, &bucket->reserved );
1508 /* Update waiting list. */
1509 LIST_FOR_EACH_ENTRY( wait, &bucket->waiting, struct threadpool_object, u.wait.wait_entry )
1511 assert( wait->type == TP_OBJECT_TYPE_WAIT );
1512 wait->u.wait.bucket = other_bucket;
1514 list_move_tail( &other_bucket->waiting, &bucket->waiting );
1516 /* Move bucket to the end, to keep the probability of
1517 * newly added wait objects as small as possible. */
1518 list_remove( &bucket->bucket_entry );
1519 list_add_tail( &waitqueue.buckets, &bucket->bucket_entry );
1521 NtSetEvent( other_bucket->update_event, NULL );
1522 break;
1528 /* Remove this bucket from the list. */
1529 list_remove( &bucket->bucket_entry );
1530 if (!--waitqueue.num_buckets)
1531 assert( list_empty( &waitqueue.buckets ) );
1533 RtlLeaveCriticalSection( &waitqueue.cs );
1535 TRACE( "terminating wait queue thread\n" );
1537 assert( bucket->objcount == 0 );
1538 assert( list_empty( &bucket->reserved ) );
1539 assert( list_empty( &bucket->waiting ) );
1540 NtClose( bucket->update_event );
1542 RtlFreeHeap( GetProcessHeap(), 0, bucket );
1543 RtlExitUserThread( 0 );
1546 /***********************************************************************
1547 * tp_waitqueue_lock (internal)
1549 static NTSTATUS tp_waitqueue_lock( struct threadpool_object *wait )
1551 struct waitqueue_bucket *bucket;
1552 NTSTATUS status;
1553 HANDLE thread;
1554 assert( wait->type == TP_OBJECT_TYPE_WAIT );
1556 wait->u.wait.signaled = 0;
1557 wait->u.wait.bucket = NULL;
1558 wait->u.wait.wait_pending = FALSE;
1559 wait->u.wait.timeout = 0;
1560 wait->u.wait.handle = INVALID_HANDLE_VALUE;
1562 RtlEnterCriticalSection( &waitqueue.cs );
1564 /* Try to assign to existing bucket if possible. */
1565 LIST_FOR_EACH_ENTRY( bucket, &waitqueue.buckets, struct waitqueue_bucket, bucket_entry )
1567 if (bucket->objcount < MAXIMUM_WAITQUEUE_OBJECTS)
1569 list_add_tail( &bucket->reserved, &wait->u.wait.wait_entry );
1570 wait->u.wait.bucket = bucket;
1571 bucket->objcount++;
1573 status = STATUS_SUCCESS;
1574 goto out;
1578 /* Create a new bucket and corresponding worker thread. */
1579 bucket = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*bucket) );
1580 if (!bucket)
1582 status = STATUS_NO_MEMORY;
1583 goto out;
1586 bucket->objcount = 0;
1587 list_init( &bucket->reserved );
1588 list_init( &bucket->waiting );
1590 status = NtCreateEvent( &bucket->update_event, EVENT_ALL_ACCESS,
1591 NULL, SynchronizationEvent, FALSE );
1592 if (status)
1594 RtlFreeHeap( GetProcessHeap(), 0, bucket );
1595 goto out;
1598 status = RtlCreateUserThread( GetCurrentProcess(), NULL, FALSE, NULL, 0, 0,
1599 waitqueue_thread_proc, bucket, &thread, NULL );
1600 if (status == STATUS_SUCCESS)
1602 list_add_tail( &waitqueue.buckets, &bucket->bucket_entry );
1603 waitqueue.num_buckets++;
1605 list_add_tail( &bucket->reserved, &wait->u.wait.wait_entry );
1606 wait->u.wait.bucket = bucket;
1607 bucket->objcount++;
1609 NtClose( thread );
1611 else
1613 NtClose( bucket->update_event );
1614 RtlFreeHeap( GetProcessHeap(), 0, bucket );
1617 out:
1618 RtlLeaveCriticalSection( &waitqueue.cs );
1619 return status;
1622 /***********************************************************************
1623 * tp_waitqueue_unlock (internal)
1625 static void tp_waitqueue_unlock( struct threadpool_object *wait )
1627 assert( wait->type == TP_OBJECT_TYPE_WAIT );
1629 RtlEnterCriticalSection( &waitqueue.cs );
1630 if (wait->u.wait.bucket)
1632 struct waitqueue_bucket *bucket = wait->u.wait.bucket;
1633 assert( bucket->objcount > 0 );
1635 list_remove( &wait->u.wait.wait_entry );
1636 wait->u.wait.bucket = NULL;
1637 bucket->objcount--;
1639 NtSetEvent( bucket->update_event, NULL );
1641 RtlLeaveCriticalSection( &waitqueue.cs );
1644 /***********************************************************************
1645 * tp_threadpool_alloc (internal)
1647 * Allocates a new threadpool object.
1649 static NTSTATUS tp_threadpool_alloc( struct threadpool **out )
1651 struct threadpool *pool;
1652 unsigned int i;
1654 pool = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*pool) );
1655 if (!pool)
1656 return STATUS_NO_MEMORY;
1658 pool->refcount = 1;
1659 pool->objcount = 0;
1660 pool->shutdown = FALSE;
1662 RtlInitializeCriticalSection( &pool->cs );
1663 pool->cs.DebugInfo->Spare[0] = (DWORD_PTR)(__FILE__ ": threadpool.cs");
1665 for (i = 0; i < ARRAY_SIZE(pool->pools); ++i)
1666 list_init( &pool->pools[i] );
1667 RtlInitializeConditionVariable( &pool->update_event );
1669 pool->max_workers = 500;
1670 pool->min_workers = 0;
1671 pool->num_workers = 0;
1672 pool->num_busy_workers = 0;
1674 TRACE( "allocated threadpool %p\n", pool );
1676 *out = pool;
1677 return STATUS_SUCCESS;
1680 /***********************************************************************
1681 * tp_threadpool_shutdown (internal)
1683 * Prepares the shutdown of a threadpool object and notifies all worker
1684 * threads to terminate (after all remaining work items have been
1685 * processed).
1687 static void tp_threadpool_shutdown( struct threadpool *pool )
1689 assert( pool != default_threadpool );
1691 pool->shutdown = TRUE;
1692 RtlWakeAllConditionVariable( &pool->update_event );
1695 /***********************************************************************
1696 * tp_threadpool_release (internal)
1698 * Releases a reference to a threadpool object.
1700 static BOOL tp_threadpool_release( struct threadpool *pool )
1702 unsigned int i;
1704 if (interlocked_dec( &pool->refcount ))
1705 return FALSE;
1707 TRACE( "destroying threadpool %p\n", pool );
1709 assert( pool->shutdown );
1710 assert( !pool->objcount );
1711 for (i = 0; i < ARRAY_SIZE(pool->pools); ++i)
1712 assert( list_empty( &pool->pools[i] ) );
1714 pool->cs.DebugInfo->Spare[0] = 0;
1715 RtlDeleteCriticalSection( &pool->cs );
1717 RtlFreeHeap( GetProcessHeap(), 0, pool );
1718 return TRUE;
1721 /***********************************************************************
1722 * tp_threadpool_lock (internal)
1724 * Acquires a lock on a threadpool, specified with an TP_CALLBACK_ENVIRON
1725 * block. When the lock is acquired successfully, it is guaranteed that
1726 * there is at least one worker thread to process tasks.
1728 static NTSTATUS tp_threadpool_lock( struct threadpool **out, TP_CALLBACK_ENVIRON *environment )
1730 struct threadpool *pool = NULL;
1731 NTSTATUS status = STATUS_SUCCESS;
1733 if (environment)
1735 /* Validate environment parameters. */
1736 if (environment->Version == 3)
1738 TP_CALLBACK_ENVIRON_V3 *environment3 = (TP_CALLBACK_ENVIRON_V3 *)environment;
1740 switch (environment3->CallbackPriority)
1742 case TP_CALLBACK_PRIORITY_HIGH:
1743 case TP_CALLBACK_PRIORITY_NORMAL:
1744 case TP_CALLBACK_PRIORITY_LOW:
1745 break;
1746 default:
1747 return STATUS_INVALID_PARAMETER;
1751 pool = (struct threadpool *)environment->Pool;
1754 if (!pool)
1756 if (!default_threadpool)
1758 status = tp_threadpool_alloc( &pool );
1759 if (status != STATUS_SUCCESS)
1760 return status;
1762 if (interlocked_cmpxchg_ptr( (void *)&default_threadpool, pool, NULL ) != NULL)
1764 tp_threadpool_shutdown( pool );
1765 tp_threadpool_release( pool );
1769 pool = default_threadpool;
1772 RtlEnterCriticalSection( &pool->cs );
1774 /* Make sure that the threadpool has at least one thread. */
1775 if (!pool->num_workers)
1776 status = tp_new_worker_thread( pool );
1778 /* Keep a reference, and increment objcount to ensure that the
1779 * last thread doesn't terminate. */
1780 if (status == STATUS_SUCCESS)
1782 interlocked_inc( &pool->refcount );
1783 pool->objcount++;
1786 RtlLeaveCriticalSection( &pool->cs );
1788 if (status != STATUS_SUCCESS)
1789 return status;
1791 *out = pool;
1792 return STATUS_SUCCESS;
1795 /***********************************************************************
1796 * tp_threadpool_unlock (internal)
1798 * Releases a lock on a threadpool.
1800 static void tp_threadpool_unlock( struct threadpool *pool )
1802 RtlEnterCriticalSection( &pool->cs );
1803 pool->objcount--;
1804 RtlLeaveCriticalSection( &pool->cs );
1805 tp_threadpool_release( pool );
1808 /***********************************************************************
1809 * tp_group_alloc (internal)
1811 * Allocates a new threadpool group object.
1813 static NTSTATUS tp_group_alloc( struct threadpool_group **out )
1815 struct threadpool_group *group;
1817 group = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*group) );
1818 if (!group)
1819 return STATUS_NO_MEMORY;
1821 group->refcount = 1;
1822 group->shutdown = FALSE;
1824 RtlInitializeCriticalSection( &group->cs );
1825 group->cs.DebugInfo->Spare[0] = (DWORD_PTR)(__FILE__ ": threadpool_group.cs");
1827 list_init( &group->members );
1829 TRACE( "allocated group %p\n", group );
1831 *out = group;
1832 return STATUS_SUCCESS;
1835 /***********************************************************************
1836 * tp_group_shutdown (internal)
1838 * Marks the group object for shutdown.
1840 static void tp_group_shutdown( struct threadpool_group *group )
1842 group->shutdown = TRUE;
1845 /***********************************************************************
1846 * tp_group_release (internal)
1848 * Releases a reference to a group object.
1850 static BOOL tp_group_release( struct threadpool_group *group )
1852 if (interlocked_dec( &group->refcount ))
1853 return FALSE;
1855 TRACE( "destroying group %p\n", group );
1857 assert( group->shutdown );
1858 assert( list_empty( &group->members ) );
1860 group->cs.DebugInfo->Spare[0] = 0;
1861 RtlDeleteCriticalSection( &group->cs );
1863 RtlFreeHeap( GetProcessHeap(), 0, group );
1864 return TRUE;
1867 /***********************************************************************
1868 * tp_object_initialize (internal)
1870 * Initializes members of a threadpool object.
1872 static void tp_object_initialize( struct threadpool_object *object, struct threadpool *pool,
1873 PVOID userdata, TP_CALLBACK_ENVIRON *environment )
1875 BOOL is_simple_callback = (object->type == TP_OBJECT_TYPE_SIMPLE);
1877 object->refcount = 1;
1878 object->shutdown = FALSE;
1880 object->pool = pool;
1881 object->group = NULL;
1882 object->userdata = userdata;
1883 object->group_cancel_callback = NULL;
1884 object->finalization_callback = NULL;
1885 object->may_run_long = 0;
1886 object->race_dll = NULL;
1887 object->priority = TP_CALLBACK_PRIORITY_NORMAL;
1889 memset( &object->group_entry, 0, sizeof(object->group_entry) );
1890 object->is_group_member = FALSE;
1892 memset( &object->pool_entry, 0, sizeof(object->pool_entry) );
1893 RtlInitializeConditionVariable( &object->finished_event );
1894 RtlInitializeConditionVariable( &object->group_finished_event );
1895 object->num_pending_callbacks = 0;
1896 object->num_running_callbacks = 0;
1897 object->num_associated_callbacks = 0;
1899 if (environment)
1901 if (environment->Version != 1 && environment->Version != 3)
1902 FIXME( "unsupported environment version %u\n", environment->Version );
1904 object->group = impl_from_TP_CLEANUP_GROUP( environment->CleanupGroup );
1905 object->group_cancel_callback = environment->CleanupGroupCancelCallback;
1906 object->finalization_callback = environment->FinalizationCallback;
1907 object->may_run_long = environment->u.s.LongFunction != 0;
1908 object->race_dll = environment->RaceDll;
1909 if (environment->Version == 3)
1911 TP_CALLBACK_ENVIRON_V3 *environment_v3 = (TP_CALLBACK_ENVIRON_V3 *)environment;
1913 object->priority = environment_v3->CallbackPriority;
1914 assert( object->priority < ARRAY_SIZE(pool->pools) );
1917 if (environment->ActivationContext)
1918 FIXME( "activation context not supported yet\n" );
1920 if (environment->u.s.Persistent)
1921 FIXME( "persistent threads not supported yet\n" );
1924 if (object->race_dll)
1925 LdrAddRefDll( 0, object->race_dll );
1927 TRACE( "allocated object %p of type %u\n", object, object->type );
1929 /* For simple callbacks we have to run tp_object_submit before adding this object
1930 * to the cleanup group. As soon as the cleanup group members are released ->shutdown
1931 * will be set, and tp_object_submit would fail with an assertion. */
1933 if (is_simple_callback)
1934 tp_object_submit( object, FALSE );
1936 if (object->group)
1938 struct threadpool_group *group = object->group;
1939 interlocked_inc( &group->refcount );
1941 RtlEnterCriticalSection( &group->cs );
1942 list_add_tail( &group->members, &object->group_entry );
1943 object->is_group_member = TRUE;
1944 RtlLeaveCriticalSection( &group->cs );
1947 if (is_simple_callback)
1948 tp_object_release( object );
1951 static void tp_object_prio_queue( struct threadpool_object *object )
1953 list_add_tail( &object->pool->pools[object->priority], &object->pool_entry );
1956 /***********************************************************************
1957 * tp_object_submit (internal)
1959 * Submits a threadpool object to the associated threadpool. This
1960 * function has to be VOID because TpPostWork can never fail on Windows.
1962 static void tp_object_submit( struct threadpool_object *object, BOOL signaled )
1964 struct threadpool *pool = object->pool;
1965 NTSTATUS status = STATUS_UNSUCCESSFUL;
1967 assert( !object->shutdown );
1968 assert( !pool->shutdown );
1970 RtlEnterCriticalSection( &pool->cs );
1972 /* Start new worker threads if required. */
1973 if (pool->num_busy_workers >= pool->num_workers &&
1974 pool->num_workers < pool->max_workers)
1975 status = tp_new_worker_thread( pool );
1977 /* Queue work item and increment refcount. */
1978 interlocked_inc( &object->refcount );
1979 if (!object->num_pending_callbacks++)
1980 tp_object_prio_queue( object );
1982 /* Count how often the object was signaled. */
1983 if (object->type == TP_OBJECT_TYPE_WAIT && signaled)
1984 object->u.wait.signaled++;
1986 /* No new thread started - wake up one existing thread. */
1987 if (status != STATUS_SUCCESS)
1989 assert( pool->num_workers > 0 );
1990 RtlWakeConditionVariable( &pool->update_event );
1993 RtlLeaveCriticalSection( &pool->cs );
1996 /***********************************************************************
1997 * tp_object_cancel (internal)
1999 * Cancels all currently pending callbacks for a specific object.
2001 static void tp_object_cancel( struct threadpool_object *object )
2003 struct threadpool *pool = object->pool;
2004 LONG pending_callbacks = 0;
2006 RtlEnterCriticalSection( &pool->cs );
2007 if (object->num_pending_callbacks)
2009 pending_callbacks = object->num_pending_callbacks;
2010 object->num_pending_callbacks = 0;
2011 list_remove( &object->pool_entry );
2013 if (object->type == TP_OBJECT_TYPE_WAIT)
2014 object->u.wait.signaled = 0;
2016 RtlLeaveCriticalSection( &pool->cs );
2018 while (pending_callbacks--)
2019 tp_object_release( object );
2022 /***********************************************************************
2023 * tp_object_wait (internal)
2025 * Waits until all pending and running callbacks of a specific object
2026 * have been processed.
2028 static void tp_object_wait( struct threadpool_object *object, BOOL group_wait )
2030 struct threadpool *pool = object->pool;
2032 RtlEnterCriticalSection( &pool->cs );
2033 if (group_wait)
2035 while (object->num_pending_callbacks || object->num_running_callbacks)
2036 RtlSleepConditionVariableCS( &object->group_finished_event, &pool->cs, NULL );
2038 else
2040 while (object->num_pending_callbacks || object->num_associated_callbacks)
2041 RtlSleepConditionVariableCS( &object->finished_event, &pool->cs, NULL );
2043 RtlLeaveCriticalSection( &pool->cs );
2046 /***********************************************************************
2047 * tp_object_prepare_shutdown (internal)
2049 * Prepares a threadpool object for shutdown.
2051 static void tp_object_prepare_shutdown( struct threadpool_object *object )
2053 if (object->type == TP_OBJECT_TYPE_TIMER)
2054 tp_timerqueue_unlock( object );
2055 else if (object->type == TP_OBJECT_TYPE_WAIT)
2056 tp_waitqueue_unlock( object );
2059 /***********************************************************************
2060 * tp_object_release (internal)
2062 * Releases a reference to a threadpool object.
2064 static BOOL tp_object_release( struct threadpool_object *object )
2066 if (interlocked_dec( &object->refcount ))
2067 return FALSE;
2069 TRACE( "destroying object %p of type %u\n", object, object->type );
2071 assert( object->shutdown );
2072 assert( !object->num_pending_callbacks );
2073 assert( !object->num_running_callbacks );
2074 assert( !object->num_associated_callbacks );
2076 /* release reference to the group */
2077 if (object->group)
2079 struct threadpool_group *group = object->group;
2081 RtlEnterCriticalSection( &group->cs );
2082 if (object->is_group_member)
2084 list_remove( &object->group_entry );
2085 object->is_group_member = FALSE;
2087 RtlLeaveCriticalSection( &group->cs );
2089 tp_group_release( group );
2092 tp_threadpool_unlock( object->pool );
2094 if (object->race_dll)
2095 LdrUnloadDll( object->race_dll );
2097 RtlFreeHeap( GetProcessHeap(), 0, object );
2098 return TRUE;
2101 static struct list *threadpool_get_next_item( const struct threadpool *pool )
2103 struct list *ptr;
2104 unsigned int i;
2106 for (i = 0; i < ARRAY_SIZE(pool->pools); ++i)
2108 if ((ptr = list_head( &pool->pools[i] )))
2109 break;
2112 return ptr;
2115 /***********************************************************************
2116 * threadpool_worker_proc (internal)
2118 static void CALLBACK threadpool_worker_proc( void *param )
2120 TP_CALLBACK_INSTANCE *callback_instance;
2121 struct threadpool_instance instance;
2122 struct threadpool *pool = param;
2123 TP_WAIT_RESULT wait_result = 0;
2124 LARGE_INTEGER timeout;
2125 struct list *ptr;
2126 NTSTATUS status;
2128 TRACE( "starting worker thread for pool %p\n", pool );
2130 RtlEnterCriticalSection( &pool->cs );
2131 pool->num_busy_workers--;
2132 for (;;)
2134 while ((ptr = threadpool_get_next_item( pool )))
2136 struct threadpool_object *object = LIST_ENTRY( ptr, struct threadpool_object, pool_entry );
2137 assert( object->num_pending_callbacks > 0 );
2139 /* If further pending callbacks are queued, move the work item to
2140 * the end of the pool list. Otherwise remove it from the pool. */
2141 list_remove( &object->pool_entry );
2142 if (--object->num_pending_callbacks)
2143 tp_object_prio_queue( object );
2145 /* For wait objects check if they were signaled or have timed out. */
2146 if (object->type == TP_OBJECT_TYPE_WAIT)
2148 wait_result = object->u.wait.signaled ? WAIT_OBJECT_0 : WAIT_TIMEOUT;
2149 if (wait_result == WAIT_OBJECT_0) object->u.wait.signaled--;
2152 /* Leave critical section and do the actual callback. */
2153 object->num_associated_callbacks++;
2154 object->num_running_callbacks++;
2155 pool->num_busy_workers++;
2156 RtlLeaveCriticalSection( &pool->cs );
2158 /* Initialize threadpool instance struct. */
2159 callback_instance = (TP_CALLBACK_INSTANCE *)&instance;
2160 instance.object = object;
2161 instance.threadid = GetCurrentThreadId();
2162 instance.associated = TRUE;
2163 instance.may_run_long = object->may_run_long;
2164 instance.cleanup.critical_section = NULL;
2165 instance.cleanup.mutex = NULL;
2166 instance.cleanup.semaphore = NULL;
2167 instance.cleanup.semaphore_count = 0;
2168 instance.cleanup.event = NULL;
2169 instance.cleanup.library = NULL;
2171 switch (object->type)
2173 case TP_OBJECT_TYPE_SIMPLE:
2175 TRACE( "executing simple callback %p(%p, %p)\n",
2176 object->u.simple.callback, callback_instance, object->userdata );
2177 object->u.simple.callback( callback_instance, object->userdata );
2178 TRACE( "callback %p returned\n", object->u.simple.callback );
2179 break;
2182 case TP_OBJECT_TYPE_WORK:
2184 TRACE( "executing work callback %p(%p, %p, %p)\n",
2185 object->u.work.callback, callback_instance, object->userdata, object );
2186 object->u.work.callback( callback_instance, object->userdata, (TP_WORK *)object );
2187 TRACE( "callback %p returned\n", object->u.work.callback );
2188 break;
2191 case TP_OBJECT_TYPE_TIMER:
2193 TRACE( "executing timer callback %p(%p, %p, %p)\n",
2194 object->u.timer.callback, callback_instance, object->userdata, object );
2195 object->u.timer.callback( callback_instance, object->userdata, (TP_TIMER *)object );
2196 TRACE( "callback %p returned\n", object->u.timer.callback );
2197 break;
2200 case TP_OBJECT_TYPE_WAIT:
2202 TRACE( "executing wait callback %p(%p, %p, %p, %u)\n",
2203 object->u.wait.callback, callback_instance, object->userdata, object, wait_result );
2204 object->u.wait.callback( callback_instance, object->userdata, (TP_WAIT *)object, wait_result );
2205 TRACE( "callback %p returned\n", object->u.wait.callback );
2206 break;
2209 default:
2210 assert(0);
2211 break;
2214 /* Execute finalization callback. */
2215 if (object->finalization_callback)
2217 TRACE( "executing finalization callback %p(%p, %p)\n",
2218 object->finalization_callback, callback_instance, object->userdata );
2219 object->finalization_callback( callback_instance, object->userdata );
2220 TRACE( "callback %p returned\n", object->finalization_callback );
2223 /* Execute cleanup tasks. */
2224 if (instance.cleanup.critical_section)
2226 RtlLeaveCriticalSection( instance.cleanup.critical_section );
2228 if (instance.cleanup.mutex)
2230 status = NtReleaseMutant( instance.cleanup.mutex, NULL );
2231 if (status != STATUS_SUCCESS) goto skip_cleanup;
2233 if (instance.cleanup.semaphore)
2235 status = NtReleaseSemaphore( instance.cleanup.semaphore, instance.cleanup.semaphore_count, NULL );
2236 if (status != STATUS_SUCCESS) goto skip_cleanup;
2238 if (instance.cleanup.event)
2240 status = NtSetEvent( instance.cleanup.event, NULL );
2241 if (status != STATUS_SUCCESS) goto skip_cleanup;
2243 if (instance.cleanup.library)
2245 LdrUnloadDll( instance.cleanup.library );
2248 skip_cleanup:
2249 RtlEnterCriticalSection( &pool->cs );
2250 pool->num_busy_workers--;
2252 /* Simple callbacks are automatically shutdown after execution. */
2253 if (object->type == TP_OBJECT_TYPE_SIMPLE)
2255 tp_object_prepare_shutdown( object );
2256 object->shutdown = TRUE;
2259 object->num_running_callbacks--;
2260 if (!object->num_pending_callbacks && !object->num_running_callbacks)
2261 RtlWakeAllConditionVariable( &object->group_finished_event );
2263 if (instance.associated)
2265 object->num_associated_callbacks--;
2266 if (!object->num_pending_callbacks && !object->num_associated_callbacks)
2267 RtlWakeAllConditionVariable( &object->finished_event );
2270 tp_object_release( object );
2273 /* Shutdown worker thread if requested. */
2274 if (pool->shutdown)
2275 break;
2277 /* Wait for new tasks or until the timeout expires. A thread only terminates
2278 * when no new tasks are available, and the number of threads can be
2279 * decreased without violating the min_workers limit. An exception is when
2280 * min_workers == 0, then objcount is used to detect if the last thread
2281 * can be terminated. */
2282 timeout.QuadPart = (ULONGLONG)THREADPOOL_WORKER_TIMEOUT * -10000;
2283 if (RtlSleepConditionVariableCS( &pool->update_event, &pool->cs, &timeout ) == STATUS_TIMEOUT &&
2284 !threadpool_get_next_item( pool ) && (pool->num_workers > max( pool->min_workers, 1 ) ||
2285 (!pool->min_workers && !pool->objcount)))
2287 break;
2290 pool->num_workers--;
2291 RtlLeaveCriticalSection( &pool->cs );
2293 TRACE( "terminating worker thread for pool %p\n", pool );
2294 tp_threadpool_release( pool );
2295 RtlExitUserThread( 0 );
2298 /***********************************************************************
2299 * TpAllocCleanupGroup (NTDLL.@)
2301 NTSTATUS WINAPI TpAllocCleanupGroup( TP_CLEANUP_GROUP **out )
2303 TRACE( "%p\n", out );
2305 return tp_group_alloc( (struct threadpool_group **)out );
2308 /***********************************************************************
2309 * TpAllocPool (NTDLL.@)
2311 NTSTATUS WINAPI TpAllocPool( TP_POOL **out, PVOID reserved )
2313 TRACE( "%p %p\n", out, reserved );
2315 if (reserved)
2316 FIXME( "reserved argument is nonzero (%p)\n", reserved );
2318 return tp_threadpool_alloc( (struct threadpool **)out );
2321 /***********************************************************************
2322 * TpAllocTimer (NTDLL.@)
2324 NTSTATUS WINAPI TpAllocTimer( TP_TIMER **out, PTP_TIMER_CALLBACK callback, PVOID userdata,
2325 TP_CALLBACK_ENVIRON *environment )
2327 struct threadpool_object *object;
2328 struct threadpool *pool;
2329 NTSTATUS status;
2331 TRACE( "%p %p %p %p\n", out, callback, userdata, environment );
2333 object = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*object) );
2334 if (!object)
2335 return STATUS_NO_MEMORY;
2337 status = tp_threadpool_lock( &pool, environment );
2338 if (status)
2340 RtlFreeHeap( GetProcessHeap(), 0, object );
2341 return status;
2344 object->type = TP_OBJECT_TYPE_TIMER;
2345 object->u.timer.callback = callback;
2347 status = tp_timerqueue_lock( object );
2348 if (status)
2350 tp_threadpool_unlock( pool );
2351 RtlFreeHeap( GetProcessHeap(), 0, object );
2352 return status;
2355 tp_object_initialize( object, pool, userdata, environment );
2357 *out = (TP_TIMER *)object;
2358 return STATUS_SUCCESS;
2361 /***********************************************************************
2362 * TpAllocWait (NTDLL.@)
2364 NTSTATUS WINAPI TpAllocWait( TP_WAIT **out, PTP_WAIT_CALLBACK callback, PVOID userdata,
2365 TP_CALLBACK_ENVIRON *environment )
2367 struct threadpool_object *object;
2368 struct threadpool *pool;
2369 NTSTATUS status;
2371 TRACE( "%p %p %p %p\n", out, callback, userdata, environment );
2373 object = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*object) );
2374 if (!object)
2375 return STATUS_NO_MEMORY;
2377 status = tp_threadpool_lock( &pool, environment );
2378 if (status)
2380 RtlFreeHeap( GetProcessHeap(), 0, object );
2381 return status;
2384 object->type = TP_OBJECT_TYPE_WAIT;
2385 object->u.wait.callback = callback;
2387 status = tp_waitqueue_lock( object );
2388 if (status)
2390 tp_threadpool_unlock( pool );
2391 RtlFreeHeap( GetProcessHeap(), 0, object );
2392 return status;
2395 tp_object_initialize( object, pool, userdata, environment );
2397 *out = (TP_WAIT *)object;
2398 return STATUS_SUCCESS;
2401 /***********************************************************************
2402 * TpAllocWork (NTDLL.@)
2404 NTSTATUS WINAPI TpAllocWork( TP_WORK **out, PTP_WORK_CALLBACK callback, PVOID userdata,
2405 TP_CALLBACK_ENVIRON *environment )
2407 struct threadpool_object *object;
2408 struct threadpool *pool;
2409 NTSTATUS status;
2411 TRACE( "%p %p %p %p\n", out, callback, userdata, environment );
2413 object = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*object) );
2414 if (!object)
2415 return STATUS_NO_MEMORY;
2417 status = tp_threadpool_lock( &pool, environment );
2418 if (status)
2420 RtlFreeHeap( GetProcessHeap(), 0, object );
2421 return status;
2424 object->type = TP_OBJECT_TYPE_WORK;
2425 object->u.work.callback = callback;
2426 tp_object_initialize( object, pool, userdata, environment );
2428 *out = (TP_WORK *)object;
2429 return STATUS_SUCCESS;
2432 /***********************************************************************
2433 * TpCallbackLeaveCriticalSectionOnCompletion (NTDLL.@)
2435 VOID WINAPI TpCallbackLeaveCriticalSectionOnCompletion( TP_CALLBACK_INSTANCE *instance, CRITICAL_SECTION *crit )
2437 struct threadpool_instance *this = impl_from_TP_CALLBACK_INSTANCE( instance );
2439 TRACE( "%p %p\n", instance, crit );
2441 if (!this->cleanup.critical_section)
2442 this->cleanup.critical_section = crit;
2445 /***********************************************************************
2446 * TpCallbackMayRunLong (NTDLL.@)
2448 NTSTATUS WINAPI TpCallbackMayRunLong( TP_CALLBACK_INSTANCE *instance )
2450 struct threadpool_instance *this = impl_from_TP_CALLBACK_INSTANCE( instance );
2451 struct threadpool_object *object = this->object;
2452 struct threadpool *pool;
2453 NTSTATUS status = STATUS_SUCCESS;
2455 TRACE( "%p\n", instance );
2457 if (this->threadid != GetCurrentThreadId())
2459 ERR("called from wrong thread, ignoring\n");
2460 return STATUS_UNSUCCESSFUL; /* FIXME */
2463 if (this->may_run_long)
2464 return STATUS_SUCCESS;
2466 pool = object->pool;
2467 RtlEnterCriticalSection( &pool->cs );
2469 /* Start new worker threads if required. */
2470 if (pool->num_busy_workers >= pool->num_workers)
2472 if (pool->num_workers < pool->max_workers)
2474 status = tp_new_worker_thread( pool );
2476 else
2478 status = STATUS_TOO_MANY_THREADS;
2482 RtlLeaveCriticalSection( &pool->cs );
2483 this->may_run_long = TRUE;
2484 return status;
2487 /***********************************************************************
2488 * TpCallbackReleaseMutexOnCompletion (NTDLL.@)
2490 VOID WINAPI TpCallbackReleaseMutexOnCompletion( TP_CALLBACK_INSTANCE *instance, HANDLE mutex )
2492 struct threadpool_instance *this = impl_from_TP_CALLBACK_INSTANCE( instance );
2494 TRACE( "%p %p\n", instance, mutex );
2496 if (!this->cleanup.mutex)
2497 this->cleanup.mutex = mutex;
2500 /***********************************************************************
2501 * TpCallbackReleaseSemaphoreOnCompletion (NTDLL.@)
2503 VOID WINAPI TpCallbackReleaseSemaphoreOnCompletion( TP_CALLBACK_INSTANCE *instance, HANDLE semaphore, DWORD count )
2505 struct threadpool_instance *this = impl_from_TP_CALLBACK_INSTANCE( instance );
2507 TRACE( "%p %p %u\n", instance, semaphore, count );
2509 if (!this->cleanup.semaphore)
2511 this->cleanup.semaphore = semaphore;
2512 this->cleanup.semaphore_count = count;
2516 /***********************************************************************
2517 * TpCallbackSetEventOnCompletion (NTDLL.@)
2519 VOID WINAPI TpCallbackSetEventOnCompletion( TP_CALLBACK_INSTANCE *instance, HANDLE event )
2521 struct threadpool_instance *this = impl_from_TP_CALLBACK_INSTANCE( instance );
2523 TRACE( "%p %p\n", instance, event );
2525 if (!this->cleanup.event)
2526 this->cleanup.event = event;
2529 /***********************************************************************
2530 * TpCallbackUnloadDllOnCompletion (NTDLL.@)
2532 VOID WINAPI TpCallbackUnloadDllOnCompletion( TP_CALLBACK_INSTANCE *instance, HMODULE module )
2534 struct threadpool_instance *this = impl_from_TP_CALLBACK_INSTANCE( instance );
2536 TRACE( "%p %p\n", instance, module );
2538 if (!this->cleanup.library)
2539 this->cleanup.library = module;
2542 /***********************************************************************
2543 * TpDisassociateCallback (NTDLL.@)
2545 VOID WINAPI TpDisassociateCallback( TP_CALLBACK_INSTANCE *instance )
2547 struct threadpool_instance *this = impl_from_TP_CALLBACK_INSTANCE( instance );
2548 struct threadpool_object *object = this->object;
2549 struct threadpool *pool;
2551 TRACE( "%p\n", instance );
2553 if (this->threadid != GetCurrentThreadId())
2555 ERR("called from wrong thread, ignoring\n");
2556 return;
2559 if (!this->associated)
2560 return;
2562 pool = object->pool;
2563 RtlEnterCriticalSection( &pool->cs );
2565 object->num_associated_callbacks--;
2566 if (!object->num_pending_callbacks && !object->num_associated_callbacks)
2567 RtlWakeAllConditionVariable( &object->finished_event );
2569 RtlLeaveCriticalSection( &pool->cs );
2570 this->associated = FALSE;
2573 /***********************************************************************
2574 * TpIsTimerSet (NTDLL.@)
2576 BOOL WINAPI TpIsTimerSet( TP_TIMER *timer )
2578 struct threadpool_object *this = impl_from_TP_TIMER( timer );
2580 TRACE( "%p\n", timer );
2582 return this->u.timer.timer_set;
2585 /***********************************************************************
2586 * TpPostWork (NTDLL.@)
2588 VOID WINAPI TpPostWork( TP_WORK *work )
2590 struct threadpool_object *this = impl_from_TP_WORK( work );
2592 TRACE( "%p\n", work );
2594 tp_object_submit( this, FALSE );
2597 /***********************************************************************
2598 * TpReleaseCleanupGroup (NTDLL.@)
2600 VOID WINAPI TpReleaseCleanupGroup( TP_CLEANUP_GROUP *group )
2602 struct threadpool_group *this = impl_from_TP_CLEANUP_GROUP( group );
2604 TRACE( "%p\n", group );
2606 tp_group_shutdown( this );
2607 tp_group_release( this );
2610 /***********************************************************************
2611 * TpReleaseCleanupGroupMembers (NTDLL.@)
2613 VOID WINAPI TpReleaseCleanupGroupMembers( TP_CLEANUP_GROUP *group, BOOL cancel_pending, PVOID userdata )
2615 struct threadpool_group *this = impl_from_TP_CLEANUP_GROUP( group );
2616 struct threadpool_object *object, *next;
2617 struct list members;
2619 TRACE( "%p %u %p\n", group, cancel_pending, userdata );
2621 RtlEnterCriticalSection( &this->cs );
2623 /* Unset group, increase references, and mark objects for shutdown */
2624 LIST_FOR_EACH_ENTRY_SAFE( object, next, &this->members, struct threadpool_object, group_entry )
2626 assert( object->group == this );
2627 assert( object->is_group_member );
2629 if (interlocked_inc( &object->refcount ) == 1)
2631 /* Object is basically already destroyed, but group reference
2632 * was not deleted yet. We can safely ignore this object. */
2633 interlocked_dec( &object->refcount );
2634 list_remove( &object->group_entry );
2635 object->is_group_member = FALSE;
2636 continue;
2639 object->is_group_member = FALSE;
2640 tp_object_prepare_shutdown( object );
2643 /* Move members to a new temporary list */
2644 list_init( &members );
2645 list_move_tail( &members, &this->members );
2647 RtlLeaveCriticalSection( &this->cs );
2649 /* Cancel pending callbacks if requested */
2650 if (cancel_pending)
2652 LIST_FOR_EACH_ENTRY( object, &members, struct threadpool_object, group_entry )
2654 tp_object_cancel( object );
2658 /* Wait for remaining callbacks to finish */
2659 LIST_FOR_EACH_ENTRY_SAFE( object, next, &members, struct threadpool_object, group_entry )
2661 tp_object_wait( object, TRUE );
2663 if (!object->shutdown)
2665 /* Execute group cancellation callback if defined, and if this was actually a group cancel. */
2666 if (cancel_pending && object->group_cancel_callback)
2668 TRACE( "executing group cancel callback %p(%p, %p)\n",
2669 object->group_cancel_callback, object->userdata, userdata );
2670 object->group_cancel_callback( object->userdata, userdata );
2671 TRACE( "callback %p returned\n", object->group_cancel_callback );
2674 if (object->type != TP_OBJECT_TYPE_SIMPLE)
2675 tp_object_release( object );
2678 object->shutdown = TRUE;
2679 tp_object_release( object );
2683 /***********************************************************************
2684 * TpReleasePool (NTDLL.@)
2686 VOID WINAPI TpReleasePool( TP_POOL *pool )
2688 struct threadpool *this = impl_from_TP_POOL( pool );
2690 TRACE( "%p\n", pool );
2692 tp_threadpool_shutdown( this );
2693 tp_threadpool_release( this );
2696 /***********************************************************************
2697 * TpReleaseTimer (NTDLL.@)
2699 VOID WINAPI TpReleaseTimer( TP_TIMER *timer )
2701 struct threadpool_object *this = impl_from_TP_TIMER( timer );
2703 TRACE( "%p\n", timer );
2705 tp_object_prepare_shutdown( this );
2706 this->shutdown = TRUE;
2707 tp_object_release( this );
2710 /***********************************************************************
2711 * TpReleaseWait (NTDLL.@)
2713 VOID WINAPI TpReleaseWait( TP_WAIT *wait )
2715 struct threadpool_object *this = impl_from_TP_WAIT( wait );
2717 TRACE( "%p\n", wait );
2719 tp_object_prepare_shutdown( this );
2720 this->shutdown = TRUE;
2721 tp_object_release( this );
2724 /***********************************************************************
2725 * TpReleaseWork (NTDLL.@)
2727 VOID WINAPI TpReleaseWork( TP_WORK *work )
2729 struct threadpool_object *this = impl_from_TP_WORK( work );
2731 TRACE( "%p\n", work );
2733 tp_object_prepare_shutdown( this );
2734 this->shutdown = TRUE;
2735 tp_object_release( this );
2738 /***********************************************************************
2739 * TpSetPoolMaxThreads (NTDLL.@)
2741 VOID WINAPI TpSetPoolMaxThreads( TP_POOL *pool, DWORD maximum )
2743 struct threadpool *this = impl_from_TP_POOL( pool );
2745 TRACE( "%p %u\n", pool, maximum );
2747 RtlEnterCriticalSection( &this->cs );
2748 this->max_workers = max( maximum, 1 );
2749 this->min_workers = min( this->min_workers, this->max_workers );
2750 RtlLeaveCriticalSection( &this->cs );
2753 /***********************************************************************
2754 * TpSetPoolMinThreads (NTDLL.@)
2756 BOOL WINAPI TpSetPoolMinThreads( TP_POOL *pool, DWORD minimum )
2758 struct threadpool *this = impl_from_TP_POOL( pool );
2759 NTSTATUS status = STATUS_SUCCESS;
2761 TRACE( "%p %u\n", pool, minimum );
2763 RtlEnterCriticalSection( &this->cs );
2765 while (this->num_workers < minimum)
2767 status = tp_new_worker_thread( this );
2768 if (status != STATUS_SUCCESS)
2769 break;
2772 if (status == STATUS_SUCCESS)
2774 this->min_workers = minimum;
2775 this->max_workers = max( this->min_workers, this->max_workers );
2778 RtlLeaveCriticalSection( &this->cs );
2779 return !status;
2782 /***********************************************************************
2783 * TpSetTimer (NTDLL.@)
2785 VOID WINAPI TpSetTimer( TP_TIMER *timer, LARGE_INTEGER *timeout, LONG period, LONG window_length )
2787 struct threadpool_object *this = impl_from_TP_TIMER( timer );
2788 struct threadpool_object *other_timer;
2789 BOOL submit_timer = FALSE;
2790 ULONGLONG timestamp;
2792 TRACE( "%p %p %u %u\n", timer, timeout, period, window_length );
2794 RtlEnterCriticalSection( &timerqueue.cs );
2796 assert( this->u.timer.timer_initialized );
2797 this->u.timer.timer_set = timeout != NULL;
2799 /* Convert relative timeout to absolute timestamp and handle a timeout
2800 * of zero, which means that the timer is submitted immediately. */
2801 if (timeout)
2803 timestamp = timeout->QuadPart;
2804 if ((LONGLONG)timestamp < 0)
2806 LARGE_INTEGER now;
2807 NtQuerySystemTime( &now );
2808 timestamp = now.QuadPart - timestamp;
2810 else if (!timestamp)
2812 if (!period)
2813 timeout = NULL;
2814 else
2816 LARGE_INTEGER now;
2817 NtQuerySystemTime( &now );
2818 timestamp = now.QuadPart + (ULONGLONG)period * 10000;
2820 submit_timer = TRUE;
2824 /* First remove existing timeout. */
2825 if (this->u.timer.timer_pending)
2827 list_remove( &this->u.timer.timer_entry );
2828 this->u.timer.timer_pending = FALSE;
2831 /* If the timer was enabled, then add it back to the queue. */
2832 if (timeout)
2834 this->u.timer.timeout = timestamp;
2835 this->u.timer.period = period;
2836 this->u.timer.window_length = window_length;
2838 LIST_FOR_EACH_ENTRY( other_timer, &timerqueue.pending_timers,
2839 struct threadpool_object, u.timer.timer_entry )
2841 assert( other_timer->type == TP_OBJECT_TYPE_TIMER );
2842 if (this->u.timer.timeout < other_timer->u.timer.timeout)
2843 break;
2845 list_add_before( &other_timer->u.timer.timer_entry, &this->u.timer.timer_entry );
2847 /* Wake up the timer thread when the timeout has to be updated. */
2848 if (list_head( &timerqueue.pending_timers ) == &this->u.timer.timer_entry )
2849 RtlWakeAllConditionVariable( &timerqueue.update_event );
2851 this->u.timer.timer_pending = TRUE;
2854 RtlLeaveCriticalSection( &timerqueue.cs );
2856 if (submit_timer)
2857 tp_object_submit( this, FALSE );
2860 /***********************************************************************
2861 * TpSetWait (NTDLL.@)
2863 VOID WINAPI TpSetWait( TP_WAIT *wait, HANDLE handle, LARGE_INTEGER *timeout )
2865 struct threadpool_object *this = impl_from_TP_WAIT( wait );
2866 ULONGLONG timestamp = TIMEOUT_INFINITE;
2867 BOOL submit_wait = FALSE;
2869 TRACE( "%p %p %p\n", wait, handle, timeout );
2871 RtlEnterCriticalSection( &waitqueue.cs );
2873 assert( this->u.wait.bucket );
2874 this->u.wait.handle = handle;
2876 if (handle || this->u.wait.wait_pending)
2878 struct waitqueue_bucket *bucket = this->u.wait.bucket;
2879 list_remove( &this->u.wait.wait_entry );
2881 /* Convert relative timeout to absolute timestamp. */
2882 if (handle && timeout)
2884 timestamp = timeout->QuadPart;
2885 if ((LONGLONG)timestamp < 0)
2887 LARGE_INTEGER now;
2888 NtQuerySystemTime( &now );
2889 timestamp = now.QuadPart - timestamp;
2891 else if (!timestamp)
2893 submit_wait = TRUE;
2894 handle = NULL;
2898 /* Add wait object back into one of the queues. */
2899 if (handle)
2901 list_add_tail( &bucket->waiting, &this->u.wait.wait_entry );
2902 this->u.wait.wait_pending = TRUE;
2903 this->u.wait.timeout = timestamp;
2905 else
2907 list_add_tail( &bucket->reserved, &this->u.wait.wait_entry );
2908 this->u.wait.wait_pending = FALSE;
2911 /* Wake up the wait queue thread. */
2912 NtSetEvent( bucket->update_event, NULL );
2915 RtlLeaveCriticalSection( &waitqueue.cs );
2917 if (submit_wait)
2918 tp_object_submit( this, FALSE );
2921 /***********************************************************************
2922 * TpSimpleTryPost (NTDLL.@)
2924 NTSTATUS WINAPI TpSimpleTryPost( PTP_SIMPLE_CALLBACK callback, PVOID userdata,
2925 TP_CALLBACK_ENVIRON *environment )
2927 struct threadpool_object *object;
2928 struct threadpool *pool;
2929 NTSTATUS status;
2931 TRACE( "%p %p %p\n", callback, userdata, environment );
2933 object = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*object) );
2934 if (!object)
2935 return STATUS_NO_MEMORY;
2937 status = tp_threadpool_lock( &pool, environment );
2938 if (status)
2940 RtlFreeHeap( GetProcessHeap(), 0, object );
2941 return status;
2944 object->type = TP_OBJECT_TYPE_SIMPLE;
2945 object->u.simple.callback = callback;
2946 tp_object_initialize( object, pool, userdata, environment );
2948 return STATUS_SUCCESS;
2951 /***********************************************************************
2952 * TpWaitForTimer (NTDLL.@)
2954 VOID WINAPI TpWaitForTimer( TP_TIMER *timer, BOOL cancel_pending )
2956 struct threadpool_object *this = impl_from_TP_TIMER( timer );
2958 TRACE( "%p %d\n", timer, cancel_pending );
2960 if (cancel_pending)
2961 tp_object_cancel( this );
2962 tp_object_wait( this, FALSE );
2965 /***********************************************************************
2966 * TpWaitForWait (NTDLL.@)
2968 VOID WINAPI TpWaitForWait( TP_WAIT *wait, BOOL cancel_pending )
2970 struct threadpool_object *this = impl_from_TP_WAIT( wait );
2972 TRACE( "%p %d\n", wait, cancel_pending );
2974 if (cancel_pending)
2975 tp_object_cancel( this );
2976 tp_object_wait( this, FALSE );
2979 /***********************************************************************
2980 * TpWaitForWork (NTDLL.@)
2982 VOID WINAPI TpWaitForWork( TP_WORK *work, BOOL cancel_pending )
2984 struct threadpool_object *this = impl_from_TP_WORK( work );
2986 TRACE( "%p %u\n", work, cancel_pending );
2988 if (cancel_pending)
2989 tp_object_cancel( this );
2990 tp_object_wait( this, FALSE );