wined3d: Allow to apply extension emulation wrappers independently.
[wine.git] / dlls / ntdll / threadpool.c
blobad4951fc557229f8427bc9a8bf2b353ce6204aff
1 /*
2 * Thread pooling
4 * Copyright (c) 2006 Robert Shearman
5 * Copyright (c) 2014-2015 Sebastian Lackner
7 * This library is free software; you can redistribute it and/or
8 * modify it under the terms of the GNU Lesser General Public
9 * License as published by the Free Software Foundation; either
10 * version 2.1 of the License, or (at your option) any later version.
12 * This library is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Lesser General Public License for more details.
17 * You should have received a copy of the GNU Lesser General Public
18 * License along with this library; if not, write to the Free Software
19 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
22 #include "config.h"
23 #include "wine/port.h"
25 #include <assert.h>
26 #include <stdarg.h>
27 #include <limits.h>
29 #define NONAMELESSUNION
30 #include "ntstatus.h"
31 #define WIN32_NO_STATUS
32 #include "winternl.h"
34 #include "wine/debug.h"
35 #include "wine/list.h"
37 #include "ntdll_misc.h"
39 WINE_DEFAULT_DEBUG_CHANNEL(threadpool);
42 * Old thread pooling API
45 struct rtl_work_item
47 PRTL_WORK_ITEM_ROUTINE function;
48 PVOID context;
51 #define EXPIRE_NEVER (~(ULONGLONG)0)
52 #define TIMER_QUEUE_MAGIC 0x516d6954 /* TimQ */
54 static RTL_CRITICAL_SECTION_DEBUG critsect_compl_debug;
56 static struct
58 HANDLE compl_port;
59 RTL_CRITICAL_SECTION threadpool_compl_cs;
61 old_threadpool =
63 NULL, /* compl_port */
64 { &critsect_compl_debug, -1, 0, 0, 0, 0 }, /* threadpool_compl_cs */
67 static RTL_CRITICAL_SECTION_DEBUG critsect_compl_debug =
69 0, 0, &old_threadpool.threadpool_compl_cs,
70 { &critsect_compl_debug.ProcessLocksList, &critsect_compl_debug.ProcessLocksList },
71 0, 0, { (DWORD_PTR)(__FILE__ ": threadpool_compl_cs") }
74 struct wait_work_item
76 HANDLE Object;
77 HANDLE CancelEvent;
78 WAITORTIMERCALLBACK Callback;
79 PVOID Context;
80 ULONG Milliseconds;
81 ULONG Flags;
82 HANDLE CompletionEvent;
83 LONG DeleteCount;
84 BOOLEAN CallbackInProgress;
87 struct timer_queue;
88 struct queue_timer
90 struct timer_queue *q;
91 struct list entry;
92 ULONG runcount; /* number of callbacks pending execution */
93 RTL_WAITORTIMERCALLBACKFUNC callback;
94 PVOID param;
95 DWORD period;
96 ULONG flags;
97 ULONGLONG expire;
98 BOOL destroy; /* timer should be deleted; once set, never unset */
99 HANDLE event; /* removal event */
102 struct timer_queue
104 DWORD magic;
105 RTL_CRITICAL_SECTION cs;
106 struct list timers; /* sorted by expiration time */
107 BOOL quit; /* queue should be deleted; once set, never unset */
108 HANDLE event;
109 HANDLE thread;
113 * Object-oriented thread pooling API
116 #define THREADPOOL_WORKER_TIMEOUT 5000
117 #define MAXIMUM_WAITQUEUE_OBJECTS (MAXIMUM_WAIT_OBJECTS - 1)
119 /* internal threadpool representation */
120 struct threadpool
122 LONG refcount;
123 LONG objcount;
124 BOOL shutdown;
125 CRITICAL_SECTION cs;
126 /* pool of work items, locked via .cs */
127 struct list pool;
128 RTL_CONDITION_VARIABLE update_event;
129 /* information about worker threads, locked via .cs */
130 int max_workers;
131 int min_workers;
132 int num_workers;
133 int num_busy_workers;
136 enum threadpool_objtype
138 TP_OBJECT_TYPE_SIMPLE,
139 TP_OBJECT_TYPE_WORK,
140 TP_OBJECT_TYPE_TIMER,
141 TP_OBJECT_TYPE_WAIT
144 /* internal threadpool object representation */
145 struct threadpool_object
147 LONG refcount;
148 BOOL shutdown;
149 /* read-only information */
150 enum threadpool_objtype type;
151 struct threadpool *pool;
152 struct threadpool_group *group;
153 PVOID userdata;
154 PTP_CLEANUP_GROUP_CANCEL_CALLBACK group_cancel_callback;
155 PTP_SIMPLE_CALLBACK finalization_callback;
156 BOOL may_run_long;
157 HMODULE race_dll;
158 /* information about the group, locked via .group->cs */
159 struct list group_entry;
160 BOOL is_group_member;
161 /* information about the pool, locked via .pool->cs */
162 struct list pool_entry;
163 RTL_CONDITION_VARIABLE finished_event;
164 RTL_CONDITION_VARIABLE group_finished_event;
165 LONG num_pending_callbacks;
166 LONG num_running_callbacks;
167 LONG num_associated_callbacks;
168 /* arguments for callback */
169 union
171 struct
173 PTP_SIMPLE_CALLBACK callback;
174 } simple;
175 struct
177 PTP_WORK_CALLBACK callback;
178 } work;
179 struct
181 PTP_TIMER_CALLBACK callback;
182 /* information about the timer, locked via timerqueue.cs */
183 BOOL timer_initialized;
184 BOOL timer_pending;
185 struct list timer_entry;
186 BOOL timer_set;
187 ULONGLONG timeout;
188 LONG period;
189 LONG window_length;
190 } timer;
191 struct
193 PTP_WAIT_CALLBACK callback;
194 LONG signaled;
195 /* information about the wait object, locked via waitqueue.cs */
196 struct waitqueue_bucket *bucket;
197 BOOL wait_pending;
198 struct list wait_entry;
199 ULONGLONG timeout;
200 HANDLE handle;
201 } wait;
202 } u;
205 /* internal threadpool instance representation */
206 struct threadpool_instance
208 struct threadpool_object *object;
209 DWORD threadid;
210 BOOL associated;
211 BOOL may_run_long;
212 struct
214 CRITICAL_SECTION *critical_section;
215 HANDLE mutex;
216 HANDLE semaphore;
217 LONG semaphore_count;
218 HANDLE event;
219 HMODULE library;
220 } cleanup;
223 /* internal threadpool group representation */
224 struct threadpool_group
226 LONG refcount;
227 BOOL shutdown;
228 CRITICAL_SECTION cs;
229 /* list of group members, locked via .cs */
230 struct list members;
233 /* global timerqueue object */
234 static RTL_CRITICAL_SECTION_DEBUG timerqueue_debug;
236 static struct
238 CRITICAL_SECTION cs;
239 LONG objcount;
240 BOOL thread_running;
241 struct list pending_timers;
242 RTL_CONDITION_VARIABLE update_event;
244 timerqueue =
246 { &timerqueue_debug, -1, 0, 0, 0, 0 }, /* cs */
247 0, /* objcount */
248 FALSE, /* thread_running */
249 LIST_INIT( timerqueue.pending_timers ), /* pending_timers */
250 RTL_CONDITION_VARIABLE_INIT /* update_event */
253 static RTL_CRITICAL_SECTION_DEBUG timerqueue_debug =
255 0, 0, &timerqueue.cs,
256 { &timerqueue_debug.ProcessLocksList, &timerqueue_debug.ProcessLocksList },
257 0, 0, { (DWORD_PTR)(__FILE__ ": timerqueue.cs") }
260 /* global waitqueue object */
261 static RTL_CRITICAL_SECTION_DEBUG waitqueue_debug;
263 static struct
265 CRITICAL_SECTION cs;
266 LONG num_buckets;
267 struct list buckets;
269 waitqueue =
271 { &waitqueue_debug, -1, 0, 0, 0, 0 }, /* cs */
272 0, /* num_buckets */
273 LIST_INIT( waitqueue.buckets ) /* buckets */
276 static RTL_CRITICAL_SECTION_DEBUG waitqueue_debug =
278 0, 0, &waitqueue.cs,
279 { &waitqueue_debug.ProcessLocksList, &waitqueue_debug.ProcessLocksList },
280 0, 0, { (DWORD_PTR)(__FILE__ ": waitqueue.cs") }
283 struct waitqueue_bucket
285 struct list bucket_entry;
286 LONG objcount;
287 struct list reserved;
288 struct list waiting;
289 HANDLE update_event;
292 static inline struct threadpool *impl_from_TP_POOL( TP_POOL *pool )
294 return (struct threadpool *)pool;
297 static inline struct threadpool_object *impl_from_TP_WORK( TP_WORK *work )
299 struct threadpool_object *object = (struct threadpool_object *)work;
300 assert( object->type == TP_OBJECT_TYPE_WORK );
301 return object;
304 static inline struct threadpool_object *impl_from_TP_TIMER( TP_TIMER *timer )
306 struct threadpool_object *object = (struct threadpool_object *)timer;
307 assert( object->type == TP_OBJECT_TYPE_TIMER );
308 return object;
311 static inline struct threadpool_object *impl_from_TP_WAIT( TP_WAIT *wait )
313 struct threadpool_object *object = (struct threadpool_object *)wait;
314 assert( object->type == TP_OBJECT_TYPE_WAIT );
315 return object;
318 static inline struct threadpool_group *impl_from_TP_CLEANUP_GROUP( TP_CLEANUP_GROUP *group )
320 return (struct threadpool_group *)group;
323 static inline struct threadpool_instance *impl_from_TP_CALLBACK_INSTANCE( TP_CALLBACK_INSTANCE *instance )
325 return (struct threadpool_instance *)instance;
328 static void CALLBACK threadpool_worker_proc( void *param );
329 static void tp_object_submit( struct threadpool_object *object, BOOL signaled );
330 static void tp_object_shutdown( struct threadpool_object *object );
331 static BOOL tp_object_release( struct threadpool_object *object );
332 static struct threadpool *default_threadpool = NULL;
334 static inline LONG interlocked_inc( PLONG dest )
336 return interlocked_xchg_add( dest, 1 ) + 1;
339 static inline LONG interlocked_dec( PLONG dest )
341 return interlocked_xchg_add( dest, -1 ) - 1;
344 static void CALLBACK process_rtl_work_item( TP_CALLBACK_INSTANCE *instance, void *userdata )
346 struct rtl_work_item *item = userdata;
348 TRACE("executing %p(%p)\n", item->function, item->context);
349 item->function( item->context );
351 RtlFreeHeap( GetProcessHeap(), 0, item );
354 /***********************************************************************
355 * RtlQueueWorkItem (NTDLL.@)
357 * Queues a work item into a thread in the thread pool.
359 * PARAMS
360 * function [I] Work function to execute.
361 * context [I] Context to pass to the work function when it is executed.
362 * flags [I] Flags. See notes.
364 * RETURNS
365 * Success: STATUS_SUCCESS.
366 * Failure: Any NTSTATUS code.
368 * NOTES
369 * Flags can be one or more of the following:
370 *|WT_EXECUTEDEFAULT - Executes the work item in a non-I/O worker thread.
371 *|WT_EXECUTEINIOTHREAD - Executes the work item in an I/O worker thread.
372 *|WT_EXECUTEINPERSISTENTTHREAD - Executes the work item in a thread that is persistent.
373 *|WT_EXECUTELONGFUNCTION - Hints that the execution can take a long time.
374 *|WT_TRANSFER_IMPERSONATION - Executes the function with the current access token.
376 NTSTATUS WINAPI RtlQueueWorkItem( PRTL_WORK_ITEM_ROUTINE function, PVOID context, ULONG flags )
378 TP_CALLBACK_ENVIRON environment;
379 struct rtl_work_item *item;
380 NTSTATUS status;
382 item = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*item) );
383 if (!item)
384 return STATUS_NO_MEMORY;
386 memset( &environment, 0, sizeof(environment) );
387 environment.Version = 1;
388 environment.u.s.LongFunction = (flags & WT_EXECUTELONGFUNCTION) != 0;
389 environment.u.s.Persistent = (flags & WT_EXECUTEINPERSISTENTTHREAD) != 0;
391 item->function = function;
392 item->context = context;
394 status = TpSimpleTryPost( process_rtl_work_item, item, &environment );
395 if (status) RtlFreeHeap( GetProcessHeap(), 0, item );
396 return status;
399 /***********************************************************************
400 * iocp_poller - get completion events and run callbacks
402 static DWORD CALLBACK iocp_poller(LPVOID Arg)
404 HANDLE cport = Arg;
406 while( TRUE )
408 PRTL_OVERLAPPED_COMPLETION_ROUTINE callback;
409 LPVOID overlapped;
410 IO_STATUS_BLOCK iosb;
411 NTSTATUS res = NtRemoveIoCompletion( cport, (PULONG_PTR)&callback, (PULONG_PTR)&overlapped, &iosb, NULL );
412 if (res)
414 ERR("NtRemoveIoCompletion failed: 0x%x\n", res);
416 else
418 DWORD transferred = 0;
419 DWORD err = 0;
421 if (iosb.u.Status == STATUS_SUCCESS)
422 transferred = iosb.Information;
423 else
424 err = RtlNtStatusToDosError(iosb.u.Status);
426 callback( err, transferred, overlapped );
429 return 0;
432 /***********************************************************************
433 * RtlSetIoCompletionCallback (NTDLL.@)
435 * Binds a handle to a thread pool's completion port, and possibly
436 * starts a non-I/O thread to monitor this port and call functions back.
438 * PARAMS
439 * FileHandle [I] Handle to bind to a completion port.
440 * Function [I] Callback function to call on I/O completions.
441 * Flags [I] Not used.
443 * RETURNS
444 * Success: STATUS_SUCCESS.
445 * Failure: Any NTSTATUS code.
448 NTSTATUS WINAPI RtlSetIoCompletionCallback(HANDLE FileHandle, PRTL_OVERLAPPED_COMPLETION_ROUTINE Function, ULONG Flags)
450 IO_STATUS_BLOCK iosb;
451 FILE_COMPLETION_INFORMATION info;
453 if (Flags) FIXME("Unknown value Flags=0x%x\n", Flags);
455 if (!old_threadpool.compl_port)
457 NTSTATUS res = STATUS_SUCCESS;
459 RtlEnterCriticalSection(&old_threadpool.threadpool_compl_cs);
460 if (!old_threadpool.compl_port)
462 HANDLE cport;
464 res = NtCreateIoCompletion( &cport, IO_COMPLETION_ALL_ACCESS, NULL, 0 );
465 if (!res)
467 /* FIXME native can start additional threads in case of e.g. hung callback function. */
468 res = RtlQueueWorkItem( iocp_poller, cport, WT_EXECUTEDEFAULT );
469 if (!res)
470 old_threadpool.compl_port = cport;
471 else
472 NtClose( cport );
475 RtlLeaveCriticalSection(&old_threadpool.threadpool_compl_cs);
476 if (res) return res;
479 info.CompletionPort = old_threadpool.compl_port;
480 info.CompletionKey = (ULONG_PTR)Function;
482 return NtSetInformationFile( FileHandle, &iosb, &info, sizeof(info), FileCompletionInformation );
485 static inline PLARGE_INTEGER get_nt_timeout( PLARGE_INTEGER pTime, ULONG timeout )
487 if (timeout == INFINITE) return NULL;
488 pTime->QuadPart = (ULONGLONG)timeout * -10000;
489 return pTime;
492 static void delete_wait_work_item(struct wait_work_item *wait_work_item)
494 NtClose( wait_work_item->CancelEvent );
495 RtlFreeHeap( GetProcessHeap(), 0, wait_work_item );
498 static DWORD CALLBACK wait_thread_proc(LPVOID Arg)
500 struct wait_work_item *wait_work_item = Arg;
501 NTSTATUS status;
502 BOOLEAN alertable = (wait_work_item->Flags & WT_EXECUTEINIOTHREAD) != 0;
503 HANDLE handles[2] = { wait_work_item->Object, wait_work_item->CancelEvent };
504 LARGE_INTEGER timeout;
505 HANDLE completion_event;
507 TRACE("\n");
509 while (TRUE)
511 status = NtWaitForMultipleObjects( 2, handles, TRUE, alertable,
512 get_nt_timeout( &timeout, wait_work_item->Milliseconds ) );
513 if (status == STATUS_WAIT_0 || status == STATUS_TIMEOUT)
515 BOOLEAN TimerOrWaitFired;
517 if (status == STATUS_WAIT_0)
519 TRACE( "object %p signaled, calling callback %p with context %p\n",
520 wait_work_item->Object, wait_work_item->Callback,
521 wait_work_item->Context );
522 TimerOrWaitFired = FALSE;
524 else
526 TRACE( "wait for object %p timed out, calling callback %p with context %p\n",
527 wait_work_item->Object, wait_work_item->Callback,
528 wait_work_item->Context );
529 TimerOrWaitFired = TRUE;
531 wait_work_item->CallbackInProgress = TRUE;
532 wait_work_item->Callback( wait_work_item->Context, TimerOrWaitFired );
533 wait_work_item->CallbackInProgress = FALSE;
535 if (wait_work_item->Flags & WT_EXECUTEONLYONCE)
536 break;
538 else
539 break;
542 completion_event = wait_work_item->CompletionEvent;
543 if (completion_event) NtSetEvent( completion_event, NULL );
545 if (interlocked_inc( &wait_work_item->DeleteCount ) == 2 )
546 delete_wait_work_item( wait_work_item );
548 return 0;
551 /***********************************************************************
552 * RtlRegisterWait (NTDLL.@)
554 * Registers a wait for a handle to become signaled.
556 * PARAMS
557 * NewWaitObject [I] Handle to the new wait object. Use RtlDeregisterWait() to free it.
558 * Object [I] Object to wait to become signaled.
559 * Callback [I] Callback function to execute when the wait times out or the handle is signaled.
560 * Context [I] Context to pass to the callback function when it is executed.
561 * Milliseconds [I] Number of milliseconds to wait before timing out.
562 * Flags [I] Flags. See notes.
564 * RETURNS
565 * Success: STATUS_SUCCESS.
566 * Failure: Any NTSTATUS code.
568 * NOTES
569 * Flags can be one or more of the following:
570 *|WT_EXECUTEDEFAULT - Executes the work item in a non-I/O worker thread.
571 *|WT_EXECUTEINIOTHREAD - Executes the work item in an I/O worker thread.
572 *|WT_EXECUTEINPERSISTENTTHREAD - Executes the work item in a thread that is persistent.
573 *|WT_EXECUTELONGFUNCTION - Hints that the execution can take a long time.
574 *|WT_TRANSFER_IMPERSONATION - Executes the function with the current access token.
576 NTSTATUS WINAPI RtlRegisterWait(PHANDLE NewWaitObject, HANDLE Object,
577 RTL_WAITORTIMERCALLBACKFUNC Callback,
578 PVOID Context, ULONG Milliseconds, ULONG Flags)
580 struct wait_work_item *wait_work_item;
581 NTSTATUS status;
583 TRACE( "(%p, %p, %p, %p, %d, 0x%x)\n", NewWaitObject, Object, Callback, Context, Milliseconds, Flags );
585 wait_work_item = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*wait_work_item) );
586 if (!wait_work_item)
587 return STATUS_NO_MEMORY;
589 wait_work_item->Object = Object;
590 wait_work_item->Callback = Callback;
591 wait_work_item->Context = Context;
592 wait_work_item->Milliseconds = Milliseconds;
593 wait_work_item->Flags = Flags;
594 wait_work_item->CallbackInProgress = FALSE;
595 wait_work_item->DeleteCount = 0;
596 wait_work_item->CompletionEvent = NULL;
598 status = NtCreateEvent( &wait_work_item->CancelEvent, EVENT_ALL_ACCESS, NULL, NotificationEvent, FALSE );
599 if (status != STATUS_SUCCESS)
601 RtlFreeHeap( GetProcessHeap(), 0, wait_work_item );
602 return status;
605 Flags = Flags & (WT_EXECUTEINIOTHREAD | WT_EXECUTEINPERSISTENTTHREAD |
606 WT_EXECUTELONGFUNCTION | WT_TRANSFER_IMPERSONATION);
607 status = RtlQueueWorkItem( wait_thread_proc, wait_work_item, Flags );
608 if (status != STATUS_SUCCESS)
610 delete_wait_work_item( wait_work_item );
611 return status;
614 *NewWaitObject = wait_work_item;
615 return status;
618 /***********************************************************************
619 * RtlDeregisterWaitEx (NTDLL.@)
621 * Cancels a wait operation and frees the resources associated with calling
622 * RtlRegisterWait().
624 * PARAMS
625 * WaitObject [I] Handle to the wait object to free.
627 * RETURNS
628 * Success: STATUS_SUCCESS.
629 * Failure: Any NTSTATUS code.
631 NTSTATUS WINAPI RtlDeregisterWaitEx(HANDLE WaitHandle, HANDLE CompletionEvent)
633 struct wait_work_item *wait_work_item = WaitHandle;
634 NTSTATUS status = STATUS_SUCCESS;
636 TRACE( "(%p)\n", WaitHandle );
638 NtSetEvent( wait_work_item->CancelEvent, NULL );
639 if (wait_work_item->CallbackInProgress)
641 if (CompletionEvent != NULL)
643 if (CompletionEvent == INVALID_HANDLE_VALUE)
645 status = NtCreateEvent( &CompletionEvent, EVENT_ALL_ACCESS, NULL, NotificationEvent, FALSE );
646 if (status != STATUS_SUCCESS)
647 return status;
648 interlocked_xchg_ptr( &wait_work_item->CompletionEvent, CompletionEvent );
649 if (wait_work_item->CallbackInProgress)
650 NtWaitForSingleObject( CompletionEvent, FALSE, NULL );
651 NtClose( CompletionEvent );
653 else
655 interlocked_xchg_ptr( &wait_work_item->CompletionEvent, CompletionEvent );
656 if (wait_work_item->CallbackInProgress)
657 status = STATUS_PENDING;
660 else
661 status = STATUS_PENDING;
664 if (interlocked_inc( &wait_work_item->DeleteCount ) == 2 )
666 status = STATUS_SUCCESS;
667 delete_wait_work_item( wait_work_item );
670 return status;
673 /***********************************************************************
674 * RtlDeregisterWait (NTDLL.@)
676 * Cancels a wait operation and frees the resources associated with calling
677 * RtlRegisterWait().
679 * PARAMS
680 * WaitObject [I] Handle to the wait object to free.
682 * RETURNS
683 * Success: STATUS_SUCCESS.
684 * Failure: Any NTSTATUS code.
686 NTSTATUS WINAPI RtlDeregisterWait(HANDLE WaitHandle)
688 return RtlDeregisterWaitEx(WaitHandle, NULL);
692 /************************** Timer Queue Impl **************************/
694 static void queue_remove_timer(struct queue_timer *t)
696 /* We MUST hold the queue cs while calling this function. This ensures
697 that we cannot queue another callback for this timer. The runcount
698 being zero makes sure we don't have any already queued. */
699 struct timer_queue *q = t->q;
701 assert(t->runcount == 0);
702 assert(t->destroy);
704 list_remove(&t->entry);
705 if (t->event)
706 NtSetEvent(t->event, NULL);
707 RtlFreeHeap(GetProcessHeap(), 0, t);
709 if (q->quit && list_empty(&q->timers))
710 NtSetEvent(q->event, NULL);
713 static void timer_cleanup_callback(struct queue_timer *t)
715 struct timer_queue *q = t->q;
716 RtlEnterCriticalSection(&q->cs);
718 assert(0 < t->runcount);
719 --t->runcount;
721 if (t->destroy && t->runcount == 0)
722 queue_remove_timer(t);
724 RtlLeaveCriticalSection(&q->cs);
727 static DWORD WINAPI timer_callback_wrapper(LPVOID p)
729 struct queue_timer *t = p;
730 t->callback(t->param, TRUE);
731 timer_cleanup_callback(t);
732 return 0;
735 static inline ULONGLONG queue_current_time(void)
737 LARGE_INTEGER now, freq;
738 NtQueryPerformanceCounter(&now, &freq);
739 return now.QuadPart * 1000 / freq.QuadPart;
742 static void queue_add_timer(struct queue_timer *t, ULONGLONG time,
743 BOOL set_event)
745 /* We MUST hold the queue cs while calling this function. */
746 struct timer_queue *q = t->q;
747 struct list *ptr = &q->timers;
749 assert(!q->quit || (t->destroy && time == EXPIRE_NEVER));
751 if (time != EXPIRE_NEVER)
752 LIST_FOR_EACH(ptr, &q->timers)
754 struct queue_timer *cur = LIST_ENTRY(ptr, struct queue_timer, entry);
755 if (time < cur->expire)
756 break;
758 list_add_before(ptr, &t->entry);
760 t->expire = time;
762 /* If we insert at the head of the list, we need to expire sooner
763 than expected. */
764 if (set_event && &t->entry == list_head(&q->timers))
765 NtSetEvent(q->event, NULL);
768 static inline void queue_move_timer(struct queue_timer *t, ULONGLONG time,
769 BOOL set_event)
771 /* We MUST hold the queue cs while calling this function. */
772 list_remove(&t->entry);
773 queue_add_timer(t, time, set_event);
776 static void queue_timer_expire(struct timer_queue *q)
778 struct queue_timer *t = NULL;
780 RtlEnterCriticalSection(&q->cs);
781 if (list_head(&q->timers))
783 ULONGLONG now, next;
784 t = LIST_ENTRY(list_head(&q->timers), struct queue_timer, entry);
785 if (!t->destroy && t->expire <= ((now = queue_current_time())))
787 ++t->runcount;
788 if (t->period)
790 next = t->expire + t->period;
791 /* avoid trigger cascade if overloaded / hibernated */
792 if (next < now)
793 next = now + t->period;
795 else
796 next = EXPIRE_NEVER;
797 queue_move_timer(t, next, FALSE);
799 else
800 t = NULL;
802 RtlLeaveCriticalSection(&q->cs);
804 if (t)
806 if (t->flags & WT_EXECUTEINTIMERTHREAD)
807 timer_callback_wrapper(t);
808 else
810 ULONG flags
811 = (t->flags
812 & (WT_EXECUTEINIOTHREAD | WT_EXECUTEINPERSISTENTTHREAD
813 | WT_EXECUTELONGFUNCTION | WT_TRANSFER_IMPERSONATION));
814 NTSTATUS status = RtlQueueWorkItem(timer_callback_wrapper, t, flags);
815 if (status != STATUS_SUCCESS)
816 timer_cleanup_callback(t);
821 static ULONG queue_get_timeout(struct timer_queue *q)
823 struct queue_timer *t;
824 ULONG timeout = INFINITE;
826 RtlEnterCriticalSection(&q->cs);
827 if (list_head(&q->timers))
829 t = LIST_ENTRY(list_head(&q->timers), struct queue_timer, entry);
830 assert(!t->destroy || t->expire == EXPIRE_NEVER);
832 if (t->expire != EXPIRE_NEVER)
834 ULONGLONG time = queue_current_time();
835 timeout = t->expire < time ? 0 : t->expire - time;
838 RtlLeaveCriticalSection(&q->cs);
840 return timeout;
843 static void WINAPI timer_queue_thread_proc(LPVOID p)
845 struct timer_queue *q = p;
846 ULONG timeout_ms;
848 timeout_ms = INFINITE;
849 for (;;)
851 LARGE_INTEGER timeout;
852 NTSTATUS status;
853 BOOL done = FALSE;
855 status = NtWaitForSingleObject(
856 q->event, FALSE, get_nt_timeout(&timeout, timeout_ms));
858 if (status == STATUS_WAIT_0)
860 /* There are two possible ways to trigger the event. Either
861 we are quitting and the last timer got removed, or a new
862 timer got put at the head of the list so we need to adjust
863 our timeout. */
864 RtlEnterCriticalSection(&q->cs);
865 if (q->quit && list_empty(&q->timers))
866 done = TRUE;
867 RtlLeaveCriticalSection(&q->cs);
869 else if (status == STATUS_TIMEOUT)
870 queue_timer_expire(q);
872 if (done)
873 break;
875 timeout_ms = queue_get_timeout(q);
878 NtClose(q->event);
879 RtlDeleteCriticalSection(&q->cs);
880 q->magic = 0;
881 RtlFreeHeap(GetProcessHeap(), 0, q);
882 RtlExitUserThread( 0 );
885 static void queue_destroy_timer(struct queue_timer *t)
887 /* We MUST hold the queue cs while calling this function. */
888 t->destroy = TRUE;
889 if (t->runcount == 0)
890 /* Ensure a timer is promptly removed. If callbacks are pending,
891 it will be removed after the last one finishes by the callback
892 cleanup wrapper. */
893 queue_remove_timer(t);
894 else
895 /* Make sure no destroyed timer masks an active timer at the head
896 of the sorted list. */
897 queue_move_timer(t, EXPIRE_NEVER, FALSE);
900 /***********************************************************************
901 * RtlCreateTimerQueue (NTDLL.@)
903 * Creates a timer queue object and returns a handle to it.
905 * PARAMS
906 * NewTimerQueue [O] The newly created queue.
908 * RETURNS
909 * Success: STATUS_SUCCESS.
910 * Failure: Any NTSTATUS code.
912 NTSTATUS WINAPI RtlCreateTimerQueue(PHANDLE NewTimerQueue)
914 NTSTATUS status;
915 struct timer_queue *q = RtlAllocateHeap(GetProcessHeap(), 0, sizeof *q);
916 if (!q)
917 return STATUS_NO_MEMORY;
919 RtlInitializeCriticalSection(&q->cs);
920 list_init(&q->timers);
921 q->quit = FALSE;
922 q->magic = TIMER_QUEUE_MAGIC;
923 status = NtCreateEvent(&q->event, EVENT_ALL_ACCESS, NULL, SynchronizationEvent, FALSE);
924 if (status != STATUS_SUCCESS)
926 RtlFreeHeap(GetProcessHeap(), 0, q);
927 return status;
929 status = RtlCreateUserThread(GetCurrentProcess(), NULL, FALSE, NULL, 0, 0,
930 timer_queue_thread_proc, q, &q->thread, NULL);
931 if (status != STATUS_SUCCESS)
933 NtClose(q->event);
934 RtlFreeHeap(GetProcessHeap(), 0, q);
935 return status;
938 *NewTimerQueue = q;
939 return STATUS_SUCCESS;
942 /***********************************************************************
943 * RtlDeleteTimerQueueEx (NTDLL.@)
945 * Deletes a timer queue object.
947 * PARAMS
948 * TimerQueue [I] The timer queue to destroy.
949 * CompletionEvent [I] If NULL, return immediately. If INVALID_HANDLE_VALUE,
950 * wait until all timers are finished firing before
951 * returning. Otherwise, return immediately and set the
952 * event when all timers are done.
954 * RETURNS
955 * Success: STATUS_SUCCESS if synchronous, STATUS_PENDING if not.
956 * Failure: Any NTSTATUS code.
958 NTSTATUS WINAPI RtlDeleteTimerQueueEx(HANDLE TimerQueue, HANDLE CompletionEvent)
960 struct timer_queue *q = TimerQueue;
961 struct queue_timer *t, *temp;
962 HANDLE thread;
963 NTSTATUS status;
965 if (!q || q->magic != TIMER_QUEUE_MAGIC)
966 return STATUS_INVALID_HANDLE;
968 thread = q->thread;
970 RtlEnterCriticalSection(&q->cs);
971 q->quit = TRUE;
972 if (list_head(&q->timers))
973 /* When the last timer is removed, it will signal the timer thread to
974 exit... */
975 LIST_FOR_EACH_ENTRY_SAFE(t, temp, &q->timers, struct queue_timer, entry)
976 queue_destroy_timer(t);
977 else
978 /* However if we have none, we must do it ourselves. */
979 NtSetEvent(q->event, NULL);
980 RtlLeaveCriticalSection(&q->cs);
982 if (CompletionEvent == INVALID_HANDLE_VALUE)
984 NtWaitForSingleObject(thread, FALSE, NULL);
985 status = STATUS_SUCCESS;
987 else
989 if (CompletionEvent)
991 FIXME("asynchronous return on completion event unimplemented\n");
992 NtWaitForSingleObject(thread, FALSE, NULL);
993 NtSetEvent(CompletionEvent, NULL);
995 status = STATUS_PENDING;
998 NtClose(thread);
999 return status;
1002 static struct timer_queue *get_timer_queue(HANDLE TimerQueue)
1004 static struct timer_queue *default_timer_queue;
1006 if (TimerQueue)
1007 return TimerQueue;
1008 else
1010 if (!default_timer_queue)
1012 HANDLE q;
1013 NTSTATUS status = RtlCreateTimerQueue(&q);
1014 if (status == STATUS_SUCCESS)
1016 PVOID p = interlocked_cmpxchg_ptr(
1017 (void **) &default_timer_queue, q, NULL);
1018 if (p)
1019 /* Got beat to the punch. */
1020 RtlDeleteTimerQueueEx(q, NULL);
1023 return default_timer_queue;
1027 /***********************************************************************
1028 * RtlCreateTimer (NTDLL.@)
1030 * Creates a new timer associated with the given queue.
1032 * PARAMS
1033 * NewTimer [O] The newly created timer.
1034 * TimerQueue [I] The queue to hold the timer.
1035 * Callback [I] The callback to fire.
1036 * Parameter [I] The argument for the callback.
1037 * DueTime [I] The delay, in milliseconds, before first firing the
1038 * timer.
1039 * Period [I] The period, in milliseconds, at which to fire the timer
1040 * after the first callback. If zero, the timer will only
1041 * fire once. It still needs to be deleted with
1042 * RtlDeleteTimer.
1043 * Flags [I] Flags controlling the execution of the callback. In
1044 * addition to the WT_* thread pool flags (see
1045 * RtlQueueWorkItem), WT_EXECUTEINTIMERTHREAD and
1046 * WT_EXECUTEONLYONCE are supported.
1048 * RETURNS
1049 * Success: STATUS_SUCCESS.
1050 * Failure: Any NTSTATUS code.
1052 NTSTATUS WINAPI RtlCreateTimer(PHANDLE NewTimer, HANDLE TimerQueue,
1053 RTL_WAITORTIMERCALLBACKFUNC Callback,
1054 PVOID Parameter, DWORD DueTime, DWORD Period,
1055 ULONG Flags)
1057 NTSTATUS status;
1058 struct queue_timer *t;
1059 struct timer_queue *q = get_timer_queue(TimerQueue);
1061 if (!q) return STATUS_NO_MEMORY;
1062 if (q->magic != TIMER_QUEUE_MAGIC) return STATUS_INVALID_HANDLE;
1064 t = RtlAllocateHeap(GetProcessHeap(), 0, sizeof *t);
1065 if (!t)
1066 return STATUS_NO_MEMORY;
1068 t->q = q;
1069 t->runcount = 0;
1070 t->callback = Callback;
1071 t->param = Parameter;
1072 t->period = Period;
1073 t->flags = Flags;
1074 t->destroy = FALSE;
1075 t->event = NULL;
1077 status = STATUS_SUCCESS;
1078 RtlEnterCriticalSection(&q->cs);
1079 if (q->quit)
1080 status = STATUS_INVALID_HANDLE;
1081 else
1082 queue_add_timer(t, queue_current_time() + DueTime, TRUE);
1083 RtlLeaveCriticalSection(&q->cs);
1085 if (status == STATUS_SUCCESS)
1086 *NewTimer = t;
1087 else
1088 RtlFreeHeap(GetProcessHeap(), 0, t);
1090 return status;
1093 /***********************************************************************
1094 * RtlUpdateTimer (NTDLL.@)
1096 * Changes the time at which a timer expires.
1098 * PARAMS
1099 * TimerQueue [I] The queue that holds the timer.
1100 * Timer [I] The timer to update.
1101 * DueTime [I] The delay, in milliseconds, before next firing the timer.
1102 * Period [I] The period, in milliseconds, at which to fire the timer
1103 * after the first callback. If zero, the timer will not
1104 * refire once. It still needs to be deleted with
1105 * RtlDeleteTimer.
1107 * RETURNS
1108 * Success: STATUS_SUCCESS.
1109 * Failure: Any NTSTATUS code.
1111 NTSTATUS WINAPI RtlUpdateTimer(HANDLE TimerQueue, HANDLE Timer,
1112 DWORD DueTime, DWORD Period)
1114 struct queue_timer *t = Timer;
1115 struct timer_queue *q = t->q;
1117 RtlEnterCriticalSection(&q->cs);
1118 /* Can't change a timer if it was once-only or destroyed. */
1119 if (t->expire != EXPIRE_NEVER)
1121 t->period = Period;
1122 queue_move_timer(t, queue_current_time() + DueTime, TRUE);
1124 RtlLeaveCriticalSection(&q->cs);
1126 return STATUS_SUCCESS;
1129 /***********************************************************************
1130 * RtlDeleteTimer (NTDLL.@)
1132 * Cancels a timer-queue timer.
1134 * PARAMS
1135 * TimerQueue [I] The queue that holds the timer.
1136 * Timer [I] The timer to update.
1137 * CompletionEvent [I] If NULL, return immediately. If INVALID_HANDLE_VALUE,
1138 * wait until the timer is finished firing all pending
1139 * callbacks before returning. Otherwise, return
1140 * immediately and set the timer is done.
1142 * RETURNS
1143 * Success: STATUS_SUCCESS if the timer is done, STATUS_PENDING if not,
1144 or if the completion event is NULL.
1145 * Failure: Any NTSTATUS code.
1147 NTSTATUS WINAPI RtlDeleteTimer(HANDLE TimerQueue, HANDLE Timer,
1148 HANDLE CompletionEvent)
1150 struct queue_timer *t = Timer;
1151 struct timer_queue *q;
1152 NTSTATUS status = STATUS_PENDING;
1153 HANDLE event = NULL;
1155 if (!Timer)
1156 return STATUS_INVALID_PARAMETER_1;
1157 q = t->q;
1158 if (CompletionEvent == INVALID_HANDLE_VALUE)
1160 status = NtCreateEvent(&event, EVENT_ALL_ACCESS, NULL, SynchronizationEvent, FALSE);
1161 if (status == STATUS_SUCCESS)
1162 status = STATUS_PENDING;
1164 else if (CompletionEvent)
1165 event = CompletionEvent;
1167 RtlEnterCriticalSection(&q->cs);
1168 t->event = event;
1169 if (t->runcount == 0 && event)
1170 status = STATUS_SUCCESS;
1171 queue_destroy_timer(t);
1172 RtlLeaveCriticalSection(&q->cs);
1174 if (CompletionEvent == INVALID_HANDLE_VALUE && event)
1176 if (status == STATUS_PENDING)
1178 NtWaitForSingleObject(event, FALSE, NULL);
1179 status = STATUS_SUCCESS;
1181 NtClose(event);
1184 return status;
1187 /***********************************************************************
1188 * timerqueue_thread_proc (internal)
1190 static void CALLBACK timerqueue_thread_proc( void *param )
1192 ULONGLONG timeout_lower, timeout_upper, new_timeout;
1193 struct threadpool_object *other_timer;
1194 LARGE_INTEGER now, timeout;
1195 struct list *ptr;
1197 TRACE( "starting timer queue thread\n" );
1199 RtlEnterCriticalSection( &timerqueue.cs );
1200 for (;;)
1202 NtQuerySystemTime( &now );
1204 /* Check for expired timers. */
1205 while ((ptr = list_head( &timerqueue.pending_timers )))
1207 struct threadpool_object *timer = LIST_ENTRY( ptr, struct threadpool_object, u.timer.timer_entry );
1208 assert( timer->type == TP_OBJECT_TYPE_TIMER );
1209 assert( timer->u.timer.timer_pending );
1210 if (timer->u.timer.timeout > now.QuadPart)
1211 break;
1213 /* Queue a new callback in one of the worker threads. */
1214 list_remove( &timer->u.timer.timer_entry );
1215 timer->u.timer.timer_pending = FALSE;
1216 tp_object_submit( timer, FALSE );
1218 /* Insert the timer back into the queue, except its marked for shutdown. */
1219 if (timer->u.timer.period && !timer->shutdown)
1221 timer->u.timer.timeout += (ULONGLONG)timer->u.timer.period * 10000;
1222 if (timer->u.timer.timeout <= now.QuadPart)
1223 timer->u.timer.timeout = now.QuadPart + 1;
1225 LIST_FOR_EACH_ENTRY( other_timer, &timerqueue.pending_timers,
1226 struct threadpool_object, u.timer.timer_entry )
1228 assert( other_timer->type == TP_OBJECT_TYPE_TIMER );
1229 if (timer->u.timer.timeout < other_timer->u.timer.timeout)
1230 break;
1232 list_add_before( &other_timer->u.timer.timer_entry, &timer->u.timer.timer_entry );
1233 timer->u.timer.timer_pending = TRUE;
1237 timeout_lower = TIMEOUT_INFINITE;
1238 timeout_upper = TIMEOUT_INFINITE;
1240 /* Determine next timeout and use the window length to optimize wakeup times. */
1241 LIST_FOR_EACH_ENTRY( other_timer, &timerqueue.pending_timers,
1242 struct threadpool_object, u.timer.timer_entry )
1244 assert( other_timer->type == TP_OBJECT_TYPE_TIMER );
1245 if (other_timer->u.timer.timeout >= timeout_upper)
1246 break;
1248 timeout_lower = other_timer->u.timer.timeout;
1249 new_timeout = timeout_lower + (ULONGLONG)other_timer->u.timer.window_length * 10000;
1250 if (new_timeout < timeout_upper)
1251 timeout_upper = new_timeout;
1254 /* Wait for timer update events or until the next timer expires. */
1255 if (timerqueue.objcount)
1257 timeout.QuadPart = timeout_lower;
1258 RtlSleepConditionVariableCS( &timerqueue.update_event, &timerqueue.cs, &timeout );
1259 continue;
1262 /* All timers have been destroyed, if no new timers are created
1263 * within some amount of time, then we can shutdown this thread. */
1264 timeout.QuadPart = (ULONGLONG)THREADPOOL_WORKER_TIMEOUT * -10000;
1265 if (RtlSleepConditionVariableCS( &timerqueue.update_event, &timerqueue.cs,
1266 &timeout ) == STATUS_TIMEOUT && !timerqueue.objcount)
1268 break;
1272 timerqueue.thread_running = FALSE;
1273 RtlLeaveCriticalSection( &timerqueue.cs );
1275 TRACE( "terminating timer queue thread\n" );
1276 RtlExitUserThread( 0 );
1279 /***********************************************************************
1280 * tp_timerqueue_lock (internal)
1282 * Acquires a lock on the global timerqueue. When the lock is acquired
1283 * successfully, it is guaranteed that the timer thread is running.
1285 static NTSTATUS tp_timerqueue_lock( struct threadpool_object *timer )
1287 NTSTATUS status = STATUS_SUCCESS;
1288 assert( timer->type == TP_OBJECT_TYPE_TIMER );
1290 timer->u.timer.timer_initialized = FALSE;
1291 timer->u.timer.timer_pending = FALSE;
1292 timer->u.timer.timer_set = FALSE;
1293 timer->u.timer.timeout = 0;
1294 timer->u.timer.period = 0;
1295 timer->u.timer.window_length = 0;
1297 RtlEnterCriticalSection( &timerqueue.cs );
1299 /* Make sure that the timerqueue thread is running. */
1300 if (!timerqueue.thread_running)
1302 HANDLE thread;
1303 status = RtlCreateUserThread( GetCurrentProcess(), NULL, FALSE, NULL, 0, 0,
1304 timerqueue_thread_proc, NULL, &thread, NULL );
1305 if (status == STATUS_SUCCESS)
1307 timerqueue.thread_running = TRUE;
1308 NtClose( thread );
1312 if (status == STATUS_SUCCESS)
1314 timer->u.timer.timer_initialized = TRUE;
1315 timerqueue.objcount++;
1318 RtlLeaveCriticalSection( &timerqueue.cs );
1319 return status;
1322 /***********************************************************************
1323 * tp_timerqueue_unlock (internal)
1325 * Releases a lock on the global timerqueue.
1327 static void tp_timerqueue_unlock( struct threadpool_object *timer )
1329 assert( timer->type == TP_OBJECT_TYPE_TIMER );
1331 RtlEnterCriticalSection( &timerqueue.cs );
1332 if (timer->u.timer.timer_initialized)
1334 /* If timer was pending, remove it. */
1335 if (timer->u.timer.timer_pending)
1337 list_remove( &timer->u.timer.timer_entry );
1338 timer->u.timer.timer_pending = FALSE;
1341 /* If the last timer object was destroyed, then wake up the thread. */
1342 if (!--timerqueue.objcount)
1344 assert( list_empty( &timerqueue.pending_timers ) );
1345 RtlWakeAllConditionVariable( &timerqueue.update_event );
1348 timer->u.timer.timer_initialized = FALSE;
1350 RtlLeaveCriticalSection( &timerqueue.cs );
1353 /***********************************************************************
1354 * waitqueue_thread_proc (internal)
1356 static void CALLBACK waitqueue_thread_proc( void *param )
1358 struct threadpool_object *objects[MAXIMUM_WAITQUEUE_OBJECTS];
1359 HANDLE handles[MAXIMUM_WAITQUEUE_OBJECTS + 1];
1360 struct waitqueue_bucket *bucket = param;
1361 struct threadpool_object *wait, *next;
1362 LARGE_INTEGER now, timeout;
1363 DWORD num_handles;
1364 NTSTATUS status;
1366 TRACE( "starting wait queue thread\n" );
1368 RtlEnterCriticalSection( &waitqueue.cs );
1370 for (;;)
1372 NtQuerySystemTime( &now );
1373 timeout.QuadPart = TIMEOUT_INFINITE;
1374 num_handles = 0;
1376 LIST_FOR_EACH_ENTRY_SAFE( wait, next, &bucket->waiting, struct threadpool_object,
1377 u.wait.wait_entry )
1379 assert( wait->type == TP_OBJECT_TYPE_WAIT );
1380 if (wait->u.wait.timeout <= now.QuadPart)
1382 /* Wait object timed out. */
1383 list_remove( &wait->u.wait.wait_entry );
1384 list_add_tail( &bucket->reserved, &wait->u.wait.wait_entry );
1385 tp_object_submit( wait, FALSE );
1387 else
1389 if (wait->u.wait.timeout < timeout.QuadPart)
1390 timeout.QuadPart = wait->u.wait.timeout;
1392 assert( num_handles < MAXIMUM_WAITQUEUE_OBJECTS );
1393 interlocked_inc( &wait->refcount );
1394 objects[num_handles] = wait;
1395 handles[num_handles] = wait->u.wait.handle;
1396 num_handles++;
1400 if (!bucket->objcount)
1402 /* All wait objects have been destroyed, if no new wait objects are created
1403 * within some amount of time, then we can shutdown this thread. */
1404 assert( num_handles == 0 );
1405 RtlLeaveCriticalSection( &waitqueue.cs );
1406 timeout.QuadPart = (ULONGLONG)THREADPOOL_WORKER_TIMEOUT * -10000;
1407 status = NtWaitForMultipleObjects( 1, &bucket->update_event, TRUE, FALSE, &timeout );
1408 RtlEnterCriticalSection( &waitqueue.cs );
1410 if (status == STATUS_TIMEOUT && !bucket->objcount)
1411 break;
1413 else
1415 handles[num_handles] = bucket->update_event;
1416 RtlLeaveCriticalSection( &waitqueue.cs );
1417 status = NtWaitForMultipleObjects( num_handles + 1, handles, TRUE, FALSE, &timeout );
1418 RtlEnterCriticalSection( &waitqueue.cs );
1420 if (status >= STATUS_WAIT_0 && status < STATUS_WAIT_0 + num_handles)
1422 wait = objects[status - STATUS_WAIT_0];
1423 assert( wait->type == TP_OBJECT_TYPE_WAIT );
1424 if (wait->u.wait.bucket)
1426 /* Wait object signaled. */
1427 assert( wait->u.wait.bucket == bucket );
1428 list_remove( &wait->u.wait.wait_entry );
1429 list_add_tail( &bucket->reserved, &wait->u.wait.wait_entry );
1430 tp_object_submit( wait, TRUE );
1432 else
1433 ERR("wait object %p triggered while object was destroyed\n", wait);
1436 /* Release temporary references to wait objects. */
1437 while (num_handles)
1439 wait = objects[--num_handles];
1440 assert( wait->type == TP_OBJECT_TYPE_WAIT );
1441 tp_object_release( wait );
1445 /* Try to merge bucket with other threads. */
1446 if (waitqueue.num_buckets > 1 && bucket->objcount &&
1447 bucket->objcount <= MAXIMUM_WAITQUEUE_OBJECTS * 1 / 3)
1449 struct waitqueue_bucket *other_bucket;
1450 LIST_FOR_EACH_ENTRY( other_bucket, &waitqueue.buckets, struct waitqueue_bucket, bucket_entry )
1452 if (other_bucket != bucket && other_bucket->objcount &&
1453 other_bucket->objcount + bucket->objcount <= MAXIMUM_WAITQUEUE_OBJECTS * 2 / 3)
1455 other_bucket->objcount += bucket->objcount;
1456 bucket->objcount = 0;
1458 /* Update reserved list. */
1459 LIST_FOR_EACH_ENTRY( wait, &bucket->reserved, struct threadpool_object, u.wait.wait_entry )
1461 assert( wait->type == TP_OBJECT_TYPE_WAIT );
1462 wait->u.wait.bucket = other_bucket;
1464 list_move_tail( &other_bucket->reserved, &bucket->reserved );
1466 /* Update waiting list. */
1467 LIST_FOR_EACH_ENTRY( wait, &bucket->waiting, struct threadpool_object, u.wait.wait_entry )
1469 assert( wait->type == TP_OBJECT_TYPE_WAIT );
1470 wait->u.wait.bucket = other_bucket;
1472 list_move_tail( &other_bucket->waiting, &bucket->waiting );
1474 /* Move bucket to the end, to keep the probability of
1475 * newly added wait objects as small as possible. */
1476 list_remove( &bucket->bucket_entry );
1477 list_add_tail( &waitqueue.buckets, &bucket->bucket_entry );
1479 NtSetEvent( other_bucket->update_event, NULL );
1480 break;
1486 /* Remove this bucket from the list. */
1487 list_remove( &bucket->bucket_entry );
1488 if (!--waitqueue.num_buckets)
1489 assert( list_empty( &waitqueue.buckets ) );
1491 RtlLeaveCriticalSection( &waitqueue.cs );
1493 TRACE( "terminating wait queue thread\n" );
1495 assert( bucket->objcount == 0 );
1496 assert( list_empty( &bucket->reserved ) );
1497 assert( list_empty( &bucket->waiting ) );
1498 NtClose( bucket->update_event );
1500 RtlFreeHeap( GetProcessHeap(), 0, bucket );
1501 RtlExitUserThread( 0 );
1504 /***********************************************************************
1505 * tp_waitqueue_lock (internal)
1507 static NTSTATUS tp_waitqueue_lock( struct threadpool_object *wait )
1509 struct waitqueue_bucket *bucket;
1510 NTSTATUS status;
1511 HANDLE thread;
1512 assert( wait->type == TP_OBJECT_TYPE_WAIT );
1514 wait->u.wait.signaled = 0;
1515 wait->u.wait.bucket = NULL;
1516 wait->u.wait.wait_pending = FALSE;
1517 wait->u.wait.timeout = 0;
1518 wait->u.wait.handle = INVALID_HANDLE_VALUE;
1520 RtlEnterCriticalSection( &waitqueue.cs );
1522 /* Try to assign to existing bucket if possible. */
1523 LIST_FOR_EACH_ENTRY( bucket, &waitqueue.buckets, struct waitqueue_bucket, bucket_entry )
1525 if (bucket->objcount < MAXIMUM_WAITQUEUE_OBJECTS)
1527 list_add_tail( &bucket->reserved, &wait->u.wait.wait_entry );
1528 wait->u.wait.bucket = bucket;
1529 bucket->objcount++;
1531 status = STATUS_SUCCESS;
1532 goto out;
1536 /* Create a new bucket and corresponding worker thread. */
1537 bucket = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*bucket) );
1538 if (!bucket)
1540 status = STATUS_NO_MEMORY;
1541 goto out;
1544 bucket->objcount = 0;
1545 list_init( &bucket->reserved );
1546 list_init( &bucket->waiting );
1548 status = NtCreateEvent( &bucket->update_event, EVENT_ALL_ACCESS,
1549 NULL, SynchronizationEvent, FALSE );
1550 if (status)
1552 RtlFreeHeap( GetProcessHeap(), 0, bucket );
1553 goto out;
1556 status = RtlCreateUserThread( GetCurrentProcess(), NULL, FALSE, NULL, 0, 0,
1557 waitqueue_thread_proc, bucket, &thread, NULL );
1558 if (status == STATUS_SUCCESS)
1560 list_add_tail( &waitqueue.buckets, &bucket->bucket_entry );
1561 waitqueue.num_buckets++;
1563 list_add_tail( &bucket->reserved, &wait->u.wait.wait_entry );
1564 wait->u.wait.bucket = bucket;
1565 bucket->objcount++;
1567 NtClose( thread );
1569 else
1571 NtClose( bucket->update_event );
1572 RtlFreeHeap( GetProcessHeap(), 0, bucket );
1575 out:
1576 RtlLeaveCriticalSection( &waitqueue.cs );
1577 return status;
1580 /***********************************************************************
1581 * tp_waitqueue_unlock (internal)
1583 static void tp_waitqueue_unlock( struct threadpool_object *wait )
1585 assert( wait->type == TP_OBJECT_TYPE_WAIT );
1587 RtlEnterCriticalSection( &waitqueue.cs );
1588 if (wait->u.wait.bucket)
1590 struct waitqueue_bucket *bucket = wait->u.wait.bucket;
1591 assert( bucket->objcount > 0 );
1593 list_remove( &wait->u.wait.wait_entry );
1594 wait->u.wait.bucket = NULL;
1595 bucket->objcount--;
1597 NtSetEvent( bucket->update_event, NULL );
1599 RtlLeaveCriticalSection( &waitqueue.cs );
1602 /***********************************************************************
1603 * tp_threadpool_alloc (internal)
1605 * Allocates a new threadpool object.
1607 static NTSTATUS tp_threadpool_alloc( struct threadpool **out )
1609 struct threadpool *pool;
1611 pool = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*pool) );
1612 if (!pool)
1613 return STATUS_NO_MEMORY;
1615 pool->refcount = 1;
1616 pool->objcount = 0;
1617 pool->shutdown = FALSE;
1619 RtlInitializeCriticalSection( &pool->cs );
1620 pool->cs.DebugInfo->Spare[0] = (DWORD_PTR)(__FILE__ ": threadpool.cs");
1622 list_init( &pool->pool );
1623 RtlInitializeConditionVariable( &pool->update_event );
1625 pool->max_workers = 500;
1626 pool->min_workers = 0;
1627 pool->num_workers = 0;
1628 pool->num_busy_workers = 0;
1630 TRACE( "allocated threadpool %p\n", pool );
1632 *out = pool;
1633 return STATUS_SUCCESS;
1636 /***********************************************************************
1637 * tp_threadpool_shutdown (internal)
1639 * Prepares the shutdown of a threadpool object and notifies all worker
1640 * threads to terminate (after all remaining work items have been
1641 * processed).
1643 static void tp_threadpool_shutdown( struct threadpool *pool )
1645 assert( pool != default_threadpool );
1647 pool->shutdown = TRUE;
1648 RtlWakeAllConditionVariable( &pool->update_event );
1651 /***********************************************************************
1652 * tp_threadpool_release (internal)
1654 * Releases a reference to a threadpool object.
1656 static BOOL tp_threadpool_release( struct threadpool *pool )
1658 if (interlocked_dec( &pool->refcount ))
1659 return FALSE;
1661 TRACE( "destroying threadpool %p\n", pool );
1663 assert( pool->shutdown );
1664 assert( !pool->objcount );
1665 assert( list_empty( &pool->pool ) );
1667 pool->cs.DebugInfo->Spare[0] = 0;
1668 RtlDeleteCriticalSection( &pool->cs );
1670 RtlFreeHeap( GetProcessHeap(), 0, pool );
1671 return TRUE;
1674 /***********************************************************************
1675 * tp_threadpool_lock (internal)
1677 * Acquires a lock on a threadpool, specified with an TP_CALLBACK_ENVIRON
1678 * block. When the lock is acquired successfully, it is guaranteed that
1679 * there is at least one worker thread to process tasks.
1681 static NTSTATUS tp_threadpool_lock( struct threadpool **out, TP_CALLBACK_ENVIRON *environment )
1683 struct threadpool *pool = NULL;
1684 NTSTATUS status = STATUS_SUCCESS;
1686 if (environment)
1687 pool = (struct threadpool *)environment->Pool;
1689 if (!pool)
1691 if (!default_threadpool)
1693 status = tp_threadpool_alloc( &pool );
1694 if (status != STATUS_SUCCESS)
1695 return status;
1697 if (interlocked_cmpxchg_ptr( (void *)&default_threadpool, pool, NULL ) != NULL)
1699 tp_threadpool_shutdown( pool );
1700 tp_threadpool_release( pool );
1704 pool = default_threadpool;
1707 RtlEnterCriticalSection( &pool->cs );
1709 /* Make sure that the threadpool has at least one thread. */
1710 if (!pool->num_workers)
1712 HANDLE thread;
1713 status = RtlCreateUserThread( GetCurrentProcess(), NULL, FALSE, NULL, 0, 0,
1714 threadpool_worker_proc, pool, &thread, NULL );
1715 if (status == STATUS_SUCCESS)
1717 interlocked_inc( &pool->refcount );
1718 pool->num_workers++;
1719 NtClose( thread );
1723 /* Keep a reference, and increment objcount to ensure that the
1724 * last thread doesn't terminate. */
1725 if (status == STATUS_SUCCESS)
1727 interlocked_inc( &pool->refcount );
1728 pool->objcount++;
1731 RtlLeaveCriticalSection( &pool->cs );
1733 if (status != STATUS_SUCCESS)
1734 return status;
1736 *out = pool;
1737 return STATUS_SUCCESS;
1740 /***********************************************************************
1741 * tp_threadpool_unlock (internal)
1743 * Releases a lock on a threadpool.
1745 static void tp_threadpool_unlock( struct threadpool *pool )
1747 RtlEnterCriticalSection( &pool->cs );
1748 pool->objcount--;
1749 RtlLeaveCriticalSection( &pool->cs );
1750 tp_threadpool_release( pool );
1753 /***********************************************************************
1754 * tp_group_alloc (internal)
1756 * Allocates a new threadpool group object.
1758 static NTSTATUS tp_group_alloc( struct threadpool_group **out )
1760 struct threadpool_group *group;
1762 group = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*group) );
1763 if (!group)
1764 return STATUS_NO_MEMORY;
1766 group->refcount = 1;
1767 group->shutdown = FALSE;
1769 RtlInitializeCriticalSection( &group->cs );
1770 group->cs.DebugInfo->Spare[0] = (DWORD_PTR)(__FILE__ ": threadpool_group.cs");
1772 list_init( &group->members );
1774 TRACE( "allocated group %p\n", group );
1776 *out = group;
1777 return STATUS_SUCCESS;
1780 /***********************************************************************
1781 * tp_group_shutdown (internal)
1783 * Marks the group object for shutdown.
1785 static void tp_group_shutdown( struct threadpool_group *group )
1787 group->shutdown = TRUE;
1790 /***********************************************************************
1791 * tp_group_release (internal)
1793 * Releases a reference to a group object.
1795 static BOOL tp_group_release( struct threadpool_group *group )
1797 if (interlocked_dec( &group->refcount ))
1798 return FALSE;
1800 TRACE( "destroying group %p\n", group );
1802 assert( group->shutdown );
1803 assert( list_empty( &group->members ) );
1805 group->cs.DebugInfo->Spare[0] = 0;
1806 RtlDeleteCriticalSection( &group->cs );
1808 RtlFreeHeap( GetProcessHeap(), 0, group );
1809 return TRUE;
1812 /***********************************************************************
1813 * tp_object_initialize (internal)
1815 * Initializes members of a threadpool object.
1817 static void tp_object_initialize( struct threadpool_object *object, struct threadpool *pool,
1818 PVOID userdata, TP_CALLBACK_ENVIRON *environment )
1820 BOOL is_simple_callback = (object->type == TP_OBJECT_TYPE_SIMPLE);
1822 object->refcount = 1;
1823 object->shutdown = FALSE;
1825 object->pool = pool;
1826 object->group = NULL;
1827 object->userdata = userdata;
1828 object->group_cancel_callback = NULL;
1829 object->finalization_callback = NULL;
1830 object->may_run_long = 0;
1831 object->race_dll = NULL;
1833 memset( &object->group_entry, 0, sizeof(object->group_entry) );
1834 object->is_group_member = FALSE;
1836 memset( &object->pool_entry, 0, sizeof(object->pool_entry) );
1837 RtlInitializeConditionVariable( &object->finished_event );
1838 RtlInitializeConditionVariable( &object->group_finished_event );
1839 object->num_pending_callbacks = 0;
1840 object->num_running_callbacks = 0;
1841 object->num_associated_callbacks = 0;
1843 if (environment)
1845 if (environment->Version != 1)
1846 FIXME( "unsupported environment version %u\n", environment->Version );
1848 object->group = impl_from_TP_CLEANUP_GROUP( environment->CleanupGroup );
1849 object->group_cancel_callback = environment->CleanupGroupCancelCallback;
1850 object->finalization_callback = environment->FinalizationCallback;
1851 object->may_run_long = environment->u.s.LongFunction != 0;
1852 object->race_dll = environment->RaceDll;
1854 if (environment->ActivationContext)
1855 FIXME( "activation context not supported yet\n" );
1857 if (environment->u.s.Persistent)
1858 FIXME( "persistent threads not supported yet\n" );
1861 if (object->race_dll)
1862 LdrAddRefDll( 0, object->race_dll );
1864 TRACE( "allocated object %p of type %u\n", object, object->type );
1866 /* For simple callbacks we have to run tp_object_submit before adding this object
1867 * to the cleanup group. As soon as the cleanup group members are released ->shutdown
1868 * will be set, and tp_object_submit would fail with an assertion. */
1870 if (is_simple_callback)
1871 tp_object_submit( object, FALSE );
1873 if (object->group)
1875 struct threadpool_group *group = object->group;
1876 interlocked_inc( &group->refcount );
1878 RtlEnterCriticalSection( &group->cs );
1879 list_add_tail( &group->members, &object->group_entry );
1880 object->is_group_member = TRUE;
1881 RtlLeaveCriticalSection( &group->cs );
1884 if (is_simple_callback)
1886 tp_object_shutdown( object );
1887 tp_object_release( object );
1891 /***********************************************************************
1892 * tp_object_submit (internal)
1894 * Submits a threadpool object to the associcated threadpool. This
1895 * function has to be VOID because TpPostWork can never fail on Windows.
1897 static void tp_object_submit( struct threadpool_object *object, BOOL signaled )
1899 struct threadpool *pool = object->pool;
1900 NTSTATUS status = STATUS_UNSUCCESSFUL;
1902 assert( !object->shutdown );
1903 assert( !pool->shutdown );
1905 RtlEnterCriticalSection( &pool->cs );
1907 /* Start new worker threads if required. */
1908 if (pool->num_busy_workers >= pool->num_workers &&
1909 pool->num_workers < pool->max_workers)
1911 HANDLE thread;
1912 status = RtlCreateUserThread( GetCurrentProcess(), NULL, FALSE, NULL, 0, 0,
1913 threadpool_worker_proc, pool, &thread, NULL );
1914 if (status == STATUS_SUCCESS)
1916 interlocked_inc( &pool->refcount );
1917 pool->num_workers++;
1918 NtClose( thread );
1922 /* Queue work item and increment refcount. */
1923 interlocked_inc( &object->refcount );
1924 if (!object->num_pending_callbacks++)
1925 list_add_tail( &pool->pool, &object->pool_entry );
1927 /* Count how often the object was signaled. */
1928 if (object->type == TP_OBJECT_TYPE_WAIT && signaled)
1929 object->u.wait.signaled++;
1931 /* No new thread started - wake up one existing thread. */
1932 if (status != STATUS_SUCCESS)
1934 assert( pool->num_workers > 0 );
1935 RtlWakeConditionVariable( &pool->update_event );
1938 RtlLeaveCriticalSection( &pool->cs );
1941 /***********************************************************************
1942 * tp_object_cancel (internal)
1944 * Cancels all currently pending callbacks for a specific object.
1946 static void tp_object_cancel( struct threadpool_object *object, BOOL group_cancel, PVOID userdata )
1948 struct threadpool *pool = object->pool;
1949 LONG pending_callbacks = 0;
1951 RtlEnterCriticalSection( &pool->cs );
1952 if (object->num_pending_callbacks)
1954 pending_callbacks = object->num_pending_callbacks;
1955 object->num_pending_callbacks = 0;
1956 list_remove( &object->pool_entry );
1958 if (object->type == TP_OBJECT_TYPE_WAIT)
1959 object->u.wait.signaled = 0;
1961 RtlLeaveCriticalSection( &pool->cs );
1963 /* Execute group cancellation callback if defined, and if this was actually a group cancel. */
1964 if (pending_callbacks && group_cancel && object->group_cancel_callback)
1966 TRACE( "executing group cancel callback %p(%p, %p)\n", object->group_cancel_callback, object, userdata );
1967 object->group_cancel_callback( object, userdata );
1968 TRACE( "callback %p returned\n", object->group_cancel_callback );
1971 while (pending_callbacks--)
1972 tp_object_release( object );
1975 /***********************************************************************
1976 * tp_object_wait (internal)
1978 * Waits until all pending and running callbacks of a specific object
1979 * have been processed.
1981 static void tp_object_wait( struct threadpool_object *object, BOOL group_wait )
1983 struct threadpool *pool = object->pool;
1985 RtlEnterCriticalSection( &pool->cs );
1986 if (group_wait)
1988 while (object->num_pending_callbacks || object->num_running_callbacks)
1989 RtlSleepConditionVariableCS( &object->group_finished_event, &pool->cs, NULL );
1991 else
1993 while (object->num_pending_callbacks || object->num_associated_callbacks)
1994 RtlSleepConditionVariableCS( &object->finished_event, &pool->cs, NULL );
1996 RtlLeaveCriticalSection( &pool->cs );
1999 /***********************************************************************
2000 * tp_object_shutdown (internal)
2002 * Marks a threadpool object for shutdown (which means that no further
2003 * tasks can be submitted).
2005 static void tp_object_shutdown( struct threadpool_object *object )
2007 if (object->type == TP_OBJECT_TYPE_TIMER)
2008 tp_timerqueue_unlock( object );
2009 else if (object->type == TP_OBJECT_TYPE_WAIT)
2010 tp_waitqueue_unlock( object );
2012 object->shutdown = TRUE;
2015 /***********************************************************************
2016 * tp_object_release (internal)
2018 * Releases a reference to a threadpool object.
2020 static BOOL tp_object_release( struct threadpool_object *object )
2022 if (interlocked_dec( &object->refcount ))
2023 return FALSE;
2025 TRACE( "destroying object %p of type %u\n", object, object->type );
2027 assert( object->shutdown );
2028 assert( !object->num_pending_callbacks );
2029 assert( !object->num_running_callbacks );
2030 assert( !object->num_associated_callbacks );
2032 /* release reference to the group */
2033 if (object->group)
2035 struct threadpool_group *group = object->group;
2037 RtlEnterCriticalSection( &group->cs );
2038 if (object->is_group_member)
2040 list_remove( &object->group_entry );
2041 object->is_group_member = FALSE;
2043 RtlLeaveCriticalSection( &group->cs );
2045 tp_group_release( group );
2048 tp_threadpool_unlock( object->pool );
2050 if (object->race_dll)
2051 LdrUnloadDll( object->race_dll );
2053 RtlFreeHeap( GetProcessHeap(), 0, object );
2054 return TRUE;
2057 /***********************************************************************
2058 * threadpool_worker_proc (internal)
2060 static void CALLBACK threadpool_worker_proc( void *param )
2062 TP_CALLBACK_INSTANCE *callback_instance;
2063 struct threadpool_instance instance;
2064 struct threadpool *pool = param;
2065 TP_WAIT_RESULT wait_result = 0;
2066 LARGE_INTEGER timeout;
2067 struct list *ptr;
2068 NTSTATUS status;
2070 TRACE( "starting worker thread for pool %p\n", pool );
2072 RtlEnterCriticalSection( &pool->cs );
2073 for (;;)
2075 while ((ptr = list_head( &pool->pool )))
2077 struct threadpool_object *object = LIST_ENTRY( ptr, struct threadpool_object, pool_entry );
2078 assert( object->num_pending_callbacks > 0 );
2080 /* If further pending callbacks are queued, move the work item to
2081 * the end of the pool list. Otherwise remove it from the pool. */
2082 list_remove( &object->pool_entry );
2083 if (--object->num_pending_callbacks)
2084 list_add_tail( &pool->pool, &object->pool_entry );
2086 /* For wait objects check if they were signaled or have timed out. */
2087 if (object->type == TP_OBJECT_TYPE_WAIT)
2089 wait_result = object->u.wait.signaled ? WAIT_OBJECT_0 : WAIT_TIMEOUT;
2090 if (wait_result == WAIT_OBJECT_0) object->u.wait.signaled--;
2093 /* Leave critical section and do the actual callback. */
2094 object->num_associated_callbacks++;
2095 object->num_running_callbacks++;
2096 pool->num_busy_workers++;
2097 RtlLeaveCriticalSection( &pool->cs );
2099 /* Initialize threadpool instance struct. */
2100 callback_instance = (TP_CALLBACK_INSTANCE *)&instance;
2101 instance.object = object;
2102 instance.threadid = GetCurrentThreadId();
2103 instance.associated = TRUE;
2104 instance.may_run_long = object->may_run_long;
2105 instance.cleanup.critical_section = NULL;
2106 instance.cleanup.mutex = NULL;
2107 instance.cleanup.semaphore = NULL;
2108 instance.cleanup.semaphore_count = 0;
2109 instance.cleanup.event = NULL;
2110 instance.cleanup.library = NULL;
2112 switch (object->type)
2114 case TP_OBJECT_TYPE_SIMPLE:
2116 TRACE( "executing simple callback %p(%p, %p)\n",
2117 object->u.simple.callback, callback_instance, object->userdata );
2118 object->u.simple.callback( callback_instance, object->userdata );
2119 TRACE( "callback %p returned\n", object->u.simple.callback );
2120 break;
2123 case TP_OBJECT_TYPE_WORK:
2125 TRACE( "executing work callback %p(%p, %p, %p)\n",
2126 object->u.work.callback, callback_instance, object->userdata, object );
2127 object->u.work.callback( callback_instance, object->userdata, (TP_WORK *)object );
2128 TRACE( "callback %p returned\n", object->u.work.callback );
2129 break;
2132 case TP_OBJECT_TYPE_TIMER:
2134 TRACE( "executing timer callback %p(%p, %p, %p)\n",
2135 object->u.timer.callback, callback_instance, object->userdata, object );
2136 object->u.timer.callback( callback_instance, object->userdata, (TP_TIMER *)object );
2137 TRACE( "callback %p returned\n", object->u.timer.callback );
2138 break;
2141 case TP_OBJECT_TYPE_WAIT:
2143 TRACE( "executing wait callback %p(%p, %p, %p, %u)\n",
2144 object->u.wait.callback, callback_instance, object->userdata, object, wait_result );
2145 object->u.wait.callback( callback_instance, object->userdata, (TP_WAIT *)object, wait_result );
2146 TRACE( "callback %p returned\n", object->u.wait.callback );
2147 break;
2150 default:
2151 assert(0);
2152 break;
2155 /* Execute finalization callback. */
2156 if (object->finalization_callback)
2158 TRACE( "executing finalization callback %p(%p, %p)\n",
2159 object->finalization_callback, callback_instance, object->userdata );
2160 object->finalization_callback( callback_instance, object->userdata );
2161 TRACE( "callback %p returned\n", object->finalization_callback );
2164 /* Execute cleanup tasks. */
2165 if (instance.cleanup.critical_section)
2167 RtlLeaveCriticalSection( instance.cleanup.critical_section );
2169 if (instance.cleanup.mutex)
2171 status = NtReleaseMutant( instance.cleanup.mutex, NULL );
2172 if (status != STATUS_SUCCESS) goto skip_cleanup;
2174 if (instance.cleanup.semaphore)
2176 status = NtReleaseSemaphore( instance.cleanup.semaphore, instance.cleanup.semaphore_count, NULL );
2177 if (status != STATUS_SUCCESS) goto skip_cleanup;
2179 if (instance.cleanup.event)
2181 status = NtSetEvent( instance.cleanup.event, NULL );
2182 if (status != STATUS_SUCCESS) goto skip_cleanup;
2184 if (instance.cleanup.library)
2186 LdrUnloadDll( instance.cleanup.library );
2189 skip_cleanup:
2190 RtlEnterCriticalSection( &pool->cs );
2191 pool->num_busy_workers--;
2193 object->num_running_callbacks--;
2194 if (!object->num_pending_callbacks && !object->num_running_callbacks)
2195 RtlWakeAllConditionVariable( &object->group_finished_event );
2197 if (instance.associated)
2199 object->num_associated_callbacks--;
2200 if (!object->num_pending_callbacks && !object->num_associated_callbacks)
2201 RtlWakeAllConditionVariable( &object->finished_event );
2204 tp_object_release( object );
2207 /* Shutdown worker thread if requested. */
2208 if (pool->shutdown)
2209 break;
2211 /* Wait for new tasks or until the timeout expires. A thread only terminates
2212 * when no new tasks are available, and the number of threads can be
2213 * decreased without violating the min_workers limit. An exception is when
2214 * min_workers == 0, then objcount is used to detect if the last thread
2215 * can be terminated. */
2216 timeout.QuadPart = (ULONGLONG)THREADPOOL_WORKER_TIMEOUT * -10000;
2217 if (RtlSleepConditionVariableCS( &pool->update_event, &pool->cs, &timeout ) == STATUS_TIMEOUT &&
2218 !list_head( &pool->pool ) && (pool->num_workers > max( pool->min_workers, 1 ) ||
2219 (!pool->min_workers && !pool->objcount)))
2221 break;
2224 pool->num_workers--;
2225 RtlLeaveCriticalSection( &pool->cs );
2227 TRACE( "terminating worker thread for pool %p\n", pool );
2228 tp_threadpool_release( pool );
2229 RtlExitUserThread( 0 );
2232 /***********************************************************************
2233 * TpAllocCleanupGroup (NTDLL.@)
2235 NTSTATUS WINAPI TpAllocCleanupGroup( TP_CLEANUP_GROUP **out )
2237 TRACE( "%p\n", out );
2239 return tp_group_alloc( (struct threadpool_group **)out );
2242 /***********************************************************************
2243 * TpAllocPool (NTDLL.@)
2245 NTSTATUS WINAPI TpAllocPool( TP_POOL **out, PVOID reserved )
2247 TRACE( "%p %p\n", out, reserved );
2249 if (reserved)
2250 FIXME( "reserved argument is nonzero (%p)", reserved );
2252 return tp_threadpool_alloc( (struct threadpool **)out );
2255 /***********************************************************************
2256 * TpAllocTimer (NTDLL.@)
2258 NTSTATUS WINAPI TpAllocTimer( TP_TIMER **out, PTP_TIMER_CALLBACK callback, PVOID userdata,
2259 TP_CALLBACK_ENVIRON *environment )
2261 struct threadpool_object *object;
2262 struct threadpool *pool;
2263 NTSTATUS status;
2265 TRACE( "%p %p %p %p\n", out, callback, userdata, environment );
2267 object = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*object) );
2268 if (!object)
2269 return STATUS_NO_MEMORY;
2271 status = tp_threadpool_lock( &pool, environment );
2272 if (status)
2274 RtlFreeHeap( GetProcessHeap(), 0, object );
2275 return status;
2278 object->type = TP_OBJECT_TYPE_TIMER;
2279 object->u.timer.callback = callback;
2281 status = tp_timerqueue_lock( object );
2282 if (status)
2284 tp_threadpool_unlock( pool );
2285 RtlFreeHeap( GetProcessHeap(), 0, object );
2286 return status;
2289 tp_object_initialize( object, pool, userdata, environment );
2291 *out = (TP_TIMER *)object;
2292 return STATUS_SUCCESS;
2295 /***********************************************************************
2296 * TpAllocWait (NTDLL.@)
2298 NTSTATUS WINAPI TpAllocWait( TP_WAIT **out, PTP_WAIT_CALLBACK callback, PVOID userdata,
2299 TP_CALLBACK_ENVIRON *environment )
2301 struct threadpool_object *object;
2302 struct threadpool *pool;
2303 NTSTATUS status;
2305 TRACE( "%p %p %p %p\n", out, callback, userdata, environment );
2307 object = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*object) );
2308 if (!object)
2309 return STATUS_NO_MEMORY;
2311 status = tp_threadpool_lock( &pool, environment );
2312 if (status)
2314 RtlFreeHeap( GetProcessHeap(), 0, object );
2315 return status;
2318 object->type = TP_OBJECT_TYPE_WAIT;
2319 object->u.wait.callback = callback;
2321 status = tp_waitqueue_lock( object );
2322 if (status)
2324 tp_threadpool_unlock( pool );
2325 RtlFreeHeap( GetProcessHeap(), 0, object );
2326 return status;
2329 tp_object_initialize( object, pool, userdata, environment );
2331 *out = (TP_WAIT *)object;
2332 return STATUS_SUCCESS;
2335 /***********************************************************************
2336 * TpAllocWork (NTDLL.@)
2338 NTSTATUS WINAPI TpAllocWork( TP_WORK **out, PTP_WORK_CALLBACK callback, PVOID userdata,
2339 TP_CALLBACK_ENVIRON *environment )
2341 struct threadpool_object *object;
2342 struct threadpool *pool;
2343 NTSTATUS status;
2345 TRACE( "%p %p %p %p\n", out, callback, userdata, environment );
2347 object = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*object) );
2348 if (!object)
2349 return STATUS_NO_MEMORY;
2351 status = tp_threadpool_lock( &pool, environment );
2352 if (status)
2354 RtlFreeHeap( GetProcessHeap(), 0, object );
2355 return status;
2358 object->type = TP_OBJECT_TYPE_WORK;
2359 object->u.work.callback = callback;
2360 tp_object_initialize( object, pool, userdata, environment );
2362 *out = (TP_WORK *)object;
2363 return STATUS_SUCCESS;
2366 /***********************************************************************
2367 * TpCallbackLeaveCriticalSectionOnCompletion (NTDLL.@)
2369 VOID WINAPI TpCallbackLeaveCriticalSectionOnCompletion( TP_CALLBACK_INSTANCE *instance, CRITICAL_SECTION *crit )
2371 struct threadpool_instance *this = impl_from_TP_CALLBACK_INSTANCE( instance );
2373 TRACE( "%p %p\n", instance, crit );
2375 if (!this->cleanup.critical_section)
2376 this->cleanup.critical_section = crit;
2379 /***********************************************************************
2380 * TpCallbackMayRunLong (NTDLL.@)
2382 NTSTATUS WINAPI TpCallbackMayRunLong( TP_CALLBACK_INSTANCE *instance )
2384 struct threadpool_instance *this = impl_from_TP_CALLBACK_INSTANCE( instance );
2385 struct threadpool_object *object = this->object;
2386 struct threadpool *pool;
2387 NTSTATUS status = STATUS_SUCCESS;
2389 TRACE( "%p\n", instance );
2391 if (this->threadid != GetCurrentThreadId())
2393 ERR("called from wrong thread, ignoring\n");
2394 return STATUS_UNSUCCESSFUL; /* FIXME */
2397 if (this->may_run_long)
2398 return STATUS_SUCCESS;
2400 pool = object->pool;
2401 RtlEnterCriticalSection( &pool->cs );
2403 /* Start new worker threads if required. */
2404 if (pool->num_busy_workers >= pool->num_workers)
2406 if (pool->num_workers < pool->max_workers)
2408 HANDLE thread;
2409 status = RtlCreateUserThread( GetCurrentProcess(), NULL, FALSE, NULL, 0, 0,
2410 threadpool_worker_proc, pool, &thread, NULL );
2411 if (status == STATUS_SUCCESS)
2413 interlocked_inc( &pool->refcount );
2414 pool->num_workers++;
2415 NtClose( thread );
2418 else
2420 status = STATUS_TOO_MANY_THREADS;
2424 RtlLeaveCriticalSection( &pool->cs );
2425 this->may_run_long = TRUE;
2426 return status;
2429 /***********************************************************************
2430 * TpCallbackReleaseMutexOnCompletion (NTDLL.@)
2432 VOID WINAPI TpCallbackReleaseMutexOnCompletion( TP_CALLBACK_INSTANCE *instance, HANDLE mutex )
2434 struct threadpool_instance *this = impl_from_TP_CALLBACK_INSTANCE( instance );
2436 TRACE( "%p %p\n", instance, mutex );
2438 if (!this->cleanup.mutex)
2439 this->cleanup.mutex = mutex;
2442 /***********************************************************************
2443 * TpCallbackReleaseSemaphoreOnCompletion (NTDLL.@)
2445 VOID WINAPI TpCallbackReleaseSemaphoreOnCompletion( TP_CALLBACK_INSTANCE *instance, HANDLE semaphore, DWORD count )
2447 struct threadpool_instance *this = impl_from_TP_CALLBACK_INSTANCE( instance );
2449 TRACE( "%p %p %u\n", instance, semaphore, count );
2451 if (!this->cleanup.semaphore)
2453 this->cleanup.semaphore = semaphore;
2454 this->cleanup.semaphore_count = count;
2458 /***********************************************************************
2459 * TpCallbackSetEventOnCompletion (NTDLL.@)
2461 VOID WINAPI TpCallbackSetEventOnCompletion( TP_CALLBACK_INSTANCE *instance, HANDLE event )
2463 struct threadpool_instance *this = impl_from_TP_CALLBACK_INSTANCE( instance );
2465 TRACE( "%p %p\n", instance, event );
2467 if (!this->cleanup.event)
2468 this->cleanup.event = event;
2471 /***********************************************************************
2472 * TpCallbackUnloadDllOnCompletion (NTDLL.@)
2474 VOID WINAPI TpCallbackUnloadDllOnCompletion( TP_CALLBACK_INSTANCE *instance, HMODULE module )
2476 struct threadpool_instance *this = impl_from_TP_CALLBACK_INSTANCE( instance );
2478 TRACE( "%p %p\n", instance, module );
2480 if (!this->cleanup.library)
2481 this->cleanup.library = module;
2484 /***********************************************************************
2485 * TpDisassociateCallback (NTDLL.@)
2487 VOID WINAPI TpDisassociateCallback( TP_CALLBACK_INSTANCE *instance )
2489 struct threadpool_instance *this = impl_from_TP_CALLBACK_INSTANCE( instance );
2490 struct threadpool_object *object = this->object;
2491 struct threadpool *pool;
2493 TRACE( "%p\n", instance );
2495 if (this->threadid != GetCurrentThreadId())
2497 ERR("called from wrong thread, ignoring\n");
2498 return;
2501 if (!this->associated)
2502 return;
2504 pool = object->pool;
2505 RtlEnterCriticalSection( &pool->cs );
2507 object->num_associated_callbacks--;
2508 if (!object->num_pending_callbacks && !object->num_associated_callbacks)
2509 RtlWakeAllConditionVariable( &object->finished_event );
2511 RtlLeaveCriticalSection( &pool->cs );
2512 this->associated = FALSE;
2515 /***********************************************************************
2516 * TpIsTimerSet (NTDLL.@)
2518 BOOL WINAPI TpIsTimerSet( TP_TIMER *timer )
2520 struct threadpool_object *this = impl_from_TP_TIMER( timer );
2522 TRACE( "%p\n", timer );
2524 return this->u.timer.timer_set;
2527 /***********************************************************************
2528 * TpPostWork (NTDLL.@)
2530 VOID WINAPI TpPostWork( TP_WORK *work )
2532 struct threadpool_object *this = impl_from_TP_WORK( work );
2534 TRACE( "%p\n", work );
2536 tp_object_submit( this, FALSE );
2539 /***********************************************************************
2540 * TpReleaseCleanupGroup (NTDLL.@)
2542 VOID WINAPI TpReleaseCleanupGroup( TP_CLEANUP_GROUP *group )
2544 struct threadpool_group *this = impl_from_TP_CLEANUP_GROUP( group );
2546 TRACE( "%p\n", group );
2548 tp_group_shutdown( this );
2549 tp_group_release( this );
2552 /***********************************************************************
2553 * TpReleaseCleanupGroupMembers (NTDLL.@)
2555 VOID WINAPI TpReleaseCleanupGroupMembers( TP_CLEANUP_GROUP *group, BOOL cancel_pending, PVOID userdata )
2557 struct threadpool_group *this = impl_from_TP_CLEANUP_GROUP( group );
2558 struct threadpool_object *object, *next;
2559 struct list members;
2561 TRACE( "%p %u %p\n", group, cancel_pending, userdata );
2563 RtlEnterCriticalSection( &this->cs );
2565 /* Unset group, increase references, and mark objects for shutdown */
2566 LIST_FOR_EACH_ENTRY_SAFE( object, next, &this->members, struct threadpool_object, group_entry )
2568 assert( object->group == this );
2569 assert( object->is_group_member );
2571 /* Simple callbacks are very special. The user doesn't hold any reference, so
2572 * they would be released too early. Add one additional temporary reference. */
2573 if (object->type == TP_OBJECT_TYPE_SIMPLE)
2575 if (interlocked_inc( &object->refcount ) == 1)
2577 /* Object is basically already destroyed, but group reference
2578 * was not deleted yet. We can safely ignore this object. */
2579 interlocked_dec( &object->refcount );
2580 list_remove( &object->group_entry );
2581 object->is_group_member = FALSE;
2582 continue;
2586 object->is_group_member = FALSE;
2587 tp_object_shutdown( object );
2590 /* Move members to a new temporary list */
2591 list_init( &members );
2592 list_move_tail( &members, &this->members );
2594 RtlLeaveCriticalSection( &this->cs );
2596 /* Cancel pending callbacks if requested */
2597 if (cancel_pending)
2599 LIST_FOR_EACH_ENTRY( object, &members, struct threadpool_object, group_entry )
2601 tp_object_cancel( object, TRUE, userdata );
2605 /* Wait for remaining callbacks to finish */
2606 LIST_FOR_EACH_ENTRY_SAFE( object, next, &members, struct threadpool_object, group_entry )
2608 tp_object_wait( object, TRUE );
2609 tp_object_release( object );
2613 /***********************************************************************
2614 * TpReleasePool (NTDLL.@)
2616 VOID WINAPI TpReleasePool( TP_POOL *pool )
2618 struct threadpool *this = impl_from_TP_POOL( pool );
2620 TRACE( "%p\n", pool );
2622 tp_threadpool_shutdown( this );
2623 tp_threadpool_release( this );
2626 /***********************************************************************
2627 * TpReleaseTimer (NTDLL.@)
2629 VOID WINAPI TpReleaseTimer( TP_TIMER *timer )
2631 struct threadpool_object *this = impl_from_TP_TIMER( timer );
2633 TRACE( "%p\n", timer );
2635 tp_object_shutdown( this );
2636 tp_object_release( this );
2639 /***********************************************************************
2640 * TpReleaseWait (NTDLL.@)
2642 VOID WINAPI TpReleaseWait( TP_WAIT *wait )
2644 struct threadpool_object *this = impl_from_TP_WAIT( wait );
2646 TRACE( "%p\n", wait );
2648 tp_object_shutdown( this );
2649 tp_object_release( this );
2652 /***********************************************************************
2653 * TpReleaseWork (NTDLL.@)
2655 VOID WINAPI TpReleaseWork( TP_WORK *work )
2657 struct threadpool_object *this = impl_from_TP_WORK( work );
2659 TRACE( "%p\n", work );
2661 tp_object_shutdown( this );
2662 tp_object_release( this );
2665 /***********************************************************************
2666 * TpSetPoolMaxThreads (NTDLL.@)
2668 VOID WINAPI TpSetPoolMaxThreads( TP_POOL *pool, DWORD maximum )
2670 struct threadpool *this = impl_from_TP_POOL( pool );
2672 TRACE( "%p %u\n", pool, maximum );
2674 RtlEnterCriticalSection( &this->cs );
2675 this->max_workers = max( maximum, 1 );
2676 this->min_workers = min( this->min_workers, this->max_workers );
2677 RtlLeaveCriticalSection( &this->cs );
2680 /***********************************************************************
2681 * TpSetPoolMinThreads (NTDLL.@)
2683 BOOL WINAPI TpSetPoolMinThreads( TP_POOL *pool, DWORD minimum )
2685 struct threadpool *this = impl_from_TP_POOL( pool );
2686 NTSTATUS status = STATUS_SUCCESS;
2688 TRACE( "%p %u\n", pool, minimum );
2690 RtlEnterCriticalSection( &this->cs );
2692 while (this->num_workers < minimum)
2694 HANDLE thread;
2695 status = RtlCreateUserThread( GetCurrentProcess(), NULL, FALSE, NULL, 0, 0,
2696 threadpool_worker_proc, this, &thread, NULL );
2697 if (status != STATUS_SUCCESS)
2698 break;
2700 interlocked_inc( &this->refcount );
2701 this->num_workers++;
2702 NtClose( thread );
2705 if (status == STATUS_SUCCESS)
2707 this->min_workers = minimum;
2708 this->max_workers = max( this->min_workers, this->max_workers );
2711 RtlLeaveCriticalSection( &this->cs );
2712 return !status;
2715 /***********************************************************************
2716 * TpSetTimer (NTDLL.@)
2718 VOID WINAPI TpSetTimer( TP_TIMER *timer, LARGE_INTEGER *timeout, LONG period, LONG window_length )
2720 struct threadpool_object *this = impl_from_TP_TIMER( timer );
2721 struct threadpool_object *other_timer;
2722 BOOL submit_timer = FALSE;
2723 ULONGLONG timestamp;
2725 TRACE( "%p %p %u %u\n", timer, timeout, period, window_length );
2727 RtlEnterCriticalSection( &timerqueue.cs );
2729 assert( this->u.timer.timer_initialized );
2730 this->u.timer.timer_set = timeout != NULL;
2732 /* Convert relative timeout to absolute timestamp and handle a timeout
2733 * of zero, which means that the timer is submitted immediately. */
2734 if (timeout)
2736 timestamp = timeout->QuadPart;
2737 if ((LONGLONG)timestamp < 0)
2739 LARGE_INTEGER now;
2740 NtQuerySystemTime( &now );
2741 timestamp = now.QuadPart - timestamp;
2743 else if (!timestamp)
2745 if (!period)
2746 timeout = NULL;
2747 else
2749 LARGE_INTEGER now;
2750 NtQuerySystemTime( &now );
2751 timestamp = now.QuadPart + (ULONGLONG)period * 10000;
2753 submit_timer = TRUE;
2757 /* First remove existing timeout. */
2758 if (this->u.timer.timer_pending)
2760 list_remove( &this->u.timer.timer_entry );
2761 this->u.timer.timer_pending = FALSE;
2764 /* If the timer was enabled, then add it back to the queue. */
2765 if (timeout)
2767 this->u.timer.timeout = timestamp;
2768 this->u.timer.period = period;
2769 this->u.timer.window_length = window_length;
2771 LIST_FOR_EACH_ENTRY( other_timer, &timerqueue.pending_timers,
2772 struct threadpool_object, u.timer.timer_entry )
2774 assert( other_timer->type == TP_OBJECT_TYPE_TIMER );
2775 if (this->u.timer.timeout < other_timer->u.timer.timeout)
2776 break;
2778 list_add_before( &other_timer->u.timer.timer_entry, &this->u.timer.timer_entry );
2780 /* Wake up the timer thread when the timeout has to be updated. */
2781 if (list_head( &timerqueue.pending_timers ) == &this->u.timer.timer_entry )
2782 RtlWakeAllConditionVariable( &timerqueue.update_event );
2784 this->u.timer.timer_pending = TRUE;
2787 RtlLeaveCriticalSection( &timerqueue.cs );
2789 if (submit_timer)
2790 tp_object_submit( this, FALSE );
2793 /***********************************************************************
2794 * TpSetWait (NTDLL.@)
2796 VOID WINAPI TpSetWait( TP_WAIT *wait, HANDLE handle, LARGE_INTEGER *timeout )
2798 struct threadpool_object *this = impl_from_TP_WAIT( wait );
2799 ULONGLONG timestamp = TIMEOUT_INFINITE;
2800 BOOL submit_wait = FALSE;
2802 TRACE( "%p %p %p\n", wait, handle, timeout );
2804 RtlEnterCriticalSection( &waitqueue.cs );
2806 assert( this->u.wait.bucket );
2807 this->u.wait.handle = handle;
2809 if (handle || this->u.wait.wait_pending)
2811 struct waitqueue_bucket *bucket = this->u.wait.bucket;
2812 list_remove( &this->u.wait.wait_entry );
2814 /* Convert relative timeout to absolute timestamp. */
2815 if (handle && timeout)
2817 timestamp = timeout->QuadPart;
2818 if ((LONGLONG)timestamp < 0)
2820 LARGE_INTEGER now;
2821 NtQuerySystemTime( &now );
2822 timestamp = now.QuadPart - timestamp;
2824 else if (!timestamp)
2826 submit_wait = TRUE;
2827 handle = NULL;
2831 /* Add wait object back into one of the queues. */
2832 if (handle)
2834 list_add_tail( &bucket->waiting, &this->u.wait.wait_entry );
2835 this->u.wait.wait_pending = TRUE;
2836 this->u.wait.timeout = timestamp;
2838 else
2840 list_add_tail( &bucket->reserved, &this->u.wait.wait_entry );
2841 this->u.wait.wait_pending = FALSE;
2844 /* Wake up the wait queue thread. */
2845 NtSetEvent( bucket->update_event, NULL );
2848 RtlLeaveCriticalSection( &waitqueue.cs );
2850 if (submit_wait)
2851 tp_object_submit( this, FALSE );
2854 /***********************************************************************
2855 * TpSimpleTryPost (NTDLL.@)
2857 NTSTATUS WINAPI TpSimpleTryPost( PTP_SIMPLE_CALLBACK callback, PVOID userdata,
2858 TP_CALLBACK_ENVIRON *environment )
2860 struct threadpool_object *object;
2861 struct threadpool *pool;
2862 NTSTATUS status;
2864 TRACE( "%p %p %p\n", callback, userdata, environment );
2866 object = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*object) );
2867 if (!object)
2868 return STATUS_NO_MEMORY;
2870 status = tp_threadpool_lock( &pool, environment );
2871 if (status)
2873 RtlFreeHeap( GetProcessHeap(), 0, object );
2874 return status;
2877 object->type = TP_OBJECT_TYPE_SIMPLE;
2878 object->u.simple.callback = callback;
2879 tp_object_initialize( object, pool, userdata, environment );
2881 return STATUS_SUCCESS;
2884 /***********************************************************************
2885 * TpWaitForTimer (NTDLL.@)
2887 VOID WINAPI TpWaitForTimer( TP_TIMER *timer, BOOL cancel_pending )
2889 struct threadpool_object *this = impl_from_TP_TIMER( timer );
2891 TRACE( "%p %d\n", timer, cancel_pending );
2893 if (cancel_pending)
2894 tp_object_cancel( this, FALSE, NULL );
2895 tp_object_wait( this, FALSE );
2898 /***********************************************************************
2899 * TpWaitForWait (NTDLL.@)
2901 VOID WINAPI TpWaitForWait( TP_WAIT *wait, BOOL cancel_pending )
2903 struct threadpool_object *this = impl_from_TP_WAIT( wait );
2905 TRACE( "%p %d\n", wait, cancel_pending );
2907 if (cancel_pending)
2908 tp_object_cancel( this, FALSE, NULL );
2909 tp_object_wait( this, FALSE );
2912 /***********************************************************************
2913 * TpWaitForWork (NTDLL.@)
2915 VOID WINAPI TpWaitForWork( TP_WORK *work, BOOL cancel_pending )
2917 struct threadpool_object *this = impl_from_TP_WORK( work );
2919 TRACE( "%p %u\n", work, cancel_pending );
2921 if (cancel_pending)
2922 tp_object_cancel( this, FALSE, NULL );
2923 tp_object_wait( this, FALSE );