ntdll: Mark newly spawned worker threads as busy.
[wine.git] / dlls / ntdll / threadpool.c
blob8605a229bb2d9491225bd2b1ed3c6d7009e79f93
/*
 * Thread pooling
 *
 * Copyright (c) 2006 Robert Shearman
 * Copyright (c) 2014-2015 Sebastian Lackner
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License as published by the Free Software Foundation; either
 * version 2.1 of the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this library; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
 */
22 #include "config.h"
23 #include "wine/port.h"
25 #include <assert.h>
26 #include <stdarg.h>
27 #include <limits.h>
29 #define NONAMELESSUNION
30 #include "ntstatus.h"
31 #define WIN32_NO_STATUS
32 #include "winternl.h"
34 #include "wine/debug.h"
35 #include "wine/list.h"
37 #include "ntdll_misc.h"
39 WINE_DEFAULT_DEBUG_CHANNEL(threadpool);
/*
 * Old thread pooling API
 */
45 struct rtl_work_item
47 PRTL_WORK_ITEM_ROUTINE function;
48 PVOID context;
/* Expiration value (all bits set) of a queue timer that must not fire. */
#define EXPIRE_NEVER       (~(ULONGLONG)0)
/* Tag stored in timer_queue.magic while the queue is valid. */
#define TIMER_QUEUE_MAGIC  0x516d6954  /* TimQ */
54 static RTL_CRITICAL_SECTION_DEBUG critsect_compl_debug;
56 static struct
58 HANDLE compl_port;
59 RTL_CRITICAL_SECTION threadpool_compl_cs;
61 old_threadpool =
63 NULL, /* compl_port */
64 { &critsect_compl_debug, -1, 0, 0, 0, 0 }, /* threadpool_compl_cs */
67 static RTL_CRITICAL_SECTION_DEBUG critsect_compl_debug =
69 0, 0, &old_threadpool.threadpool_compl_cs,
70 { &critsect_compl_debug.ProcessLocksList, &critsect_compl_debug.ProcessLocksList },
71 0, 0, { (DWORD_PTR)(__FILE__ ": threadpool_compl_cs") }
74 struct wait_work_item
76 HANDLE Object;
77 HANDLE CancelEvent;
78 WAITORTIMERCALLBACK Callback;
79 PVOID Context;
80 ULONG Milliseconds;
81 ULONG Flags;
82 HANDLE CompletionEvent;
83 LONG DeleteCount;
84 BOOLEAN CallbackInProgress;
87 struct timer_queue;
88 struct queue_timer
90 struct timer_queue *q;
91 struct list entry;
92 ULONG runcount; /* number of callbacks pending execution */
93 RTL_WAITORTIMERCALLBACKFUNC callback;
94 PVOID param;
95 DWORD period;
96 ULONG flags;
97 ULONGLONG expire;
98 BOOL destroy; /* timer should be deleted; once set, never unset */
99 HANDLE event; /* removal event */
102 struct timer_queue
104 DWORD magic;
105 RTL_CRITICAL_SECTION cs;
106 struct list timers; /* sorted by expiration time */
107 BOOL quit; /* queue should be deleted; once set, never unset */
108 HANDLE event;
109 HANDLE thread;
/*
 * Object-oriented thread pooling API
 */
/* Idle timeout of a worker thread (presumably milliseconds - confirm
 * against the worker loop, which is not part of this section). */
#define THREADPOOL_WORKER_TIMEOUT   5000
/* Objects per wait bucket: one less than MAXIMUM_WAIT_OBJECTS. */
#define MAXIMUM_WAITQUEUE_OBJECTS   (MAXIMUM_WAIT_OBJECTS - 1)
119 /* internal threadpool representation */
120 struct threadpool
122 LONG refcount;
123 LONG objcount;
124 BOOL shutdown;
125 CRITICAL_SECTION cs;
126 /* pool of work items, locked via .cs */
127 struct list pool;
128 RTL_CONDITION_VARIABLE update_event;
129 /* information about worker threads, locked via .cs */
130 int max_workers;
131 int min_workers;
132 int num_workers;
133 int num_busy_workers;
/* Discriminator for the union inside struct threadpool_object. */
enum threadpool_objtype
{
    TP_OBJECT_TYPE_SIMPLE,  /* uses u.simple */
    TP_OBJECT_TYPE_WORK,    /* uses u.work */
    TP_OBJECT_TYPE_TIMER,   /* uses u.timer */
    TP_OBJECT_TYPE_WAIT     /* uses u.wait */
};
144 /* internal threadpool object representation */
145 struct threadpool_object
147 LONG refcount;
148 BOOL shutdown;
149 /* read-only information */
150 enum threadpool_objtype type;
151 struct threadpool *pool;
152 struct threadpool_group *group;
153 PVOID userdata;
154 PTP_CLEANUP_GROUP_CANCEL_CALLBACK group_cancel_callback;
155 PTP_SIMPLE_CALLBACK finalization_callback;
156 BOOL may_run_long;
157 HMODULE race_dll;
158 /* information about the group, locked via .group->cs */
159 struct list group_entry;
160 BOOL is_group_member;
161 /* information about the pool, locked via .pool->cs */
162 struct list pool_entry;
163 RTL_CONDITION_VARIABLE finished_event;
164 RTL_CONDITION_VARIABLE group_finished_event;
165 LONG num_pending_callbacks;
166 LONG num_running_callbacks;
167 LONG num_associated_callbacks;
168 /* arguments for callback */
169 union
171 struct
173 PTP_SIMPLE_CALLBACK callback;
174 } simple;
175 struct
177 PTP_WORK_CALLBACK callback;
178 } work;
179 struct
181 PTP_TIMER_CALLBACK callback;
182 /* information about the timer, locked via timerqueue.cs */
183 BOOL timer_initialized;
184 BOOL timer_pending;
185 struct list timer_entry;
186 BOOL timer_set;
187 ULONGLONG timeout;
188 LONG period;
189 LONG window_length;
190 } timer;
191 struct
193 PTP_WAIT_CALLBACK callback;
194 LONG signaled;
195 /* information about the wait object, locked via waitqueue.cs */
196 struct waitqueue_bucket *bucket;
197 BOOL wait_pending;
198 struct list wait_entry;
199 ULONGLONG timeout;
200 HANDLE handle;
201 } wait;
202 } u;
205 /* internal threadpool instance representation */
206 struct threadpool_instance
208 struct threadpool_object *object;
209 DWORD threadid;
210 BOOL associated;
211 BOOL may_run_long;
212 struct
214 CRITICAL_SECTION *critical_section;
215 HANDLE mutex;
216 HANDLE semaphore;
217 LONG semaphore_count;
218 HANDLE event;
219 HMODULE library;
220 } cleanup;
223 /* internal threadpool group representation */
224 struct threadpool_group
226 LONG refcount;
227 BOOL shutdown;
228 CRITICAL_SECTION cs;
229 /* list of group members, locked via .cs */
230 struct list members;
233 /* global timerqueue object */
234 static RTL_CRITICAL_SECTION_DEBUG timerqueue_debug;
236 static struct
238 CRITICAL_SECTION cs;
239 LONG objcount;
240 BOOL thread_running;
241 struct list pending_timers;
242 RTL_CONDITION_VARIABLE update_event;
244 timerqueue =
246 { &timerqueue_debug, -1, 0, 0, 0, 0 }, /* cs */
247 0, /* objcount */
248 FALSE, /* thread_running */
249 LIST_INIT( timerqueue.pending_timers ), /* pending_timers */
250 RTL_CONDITION_VARIABLE_INIT /* update_event */
253 static RTL_CRITICAL_SECTION_DEBUG timerqueue_debug =
255 0, 0, &timerqueue.cs,
256 { &timerqueue_debug.ProcessLocksList, &timerqueue_debug.ProcessLocksList },
257 0, 0, { (DWORD_PTR)(__FILE__ ": timerqueue.cs") }
260 /* global waitqueue object */
261 static RTL_CRITICAL_SECTION_DEBUG waitqueue_debug;
263 static struct
265 CRITICAL_SECTION cs;
266 LONG num_buckets;
267 struct list buckets;
269 waitqueue =
271 { &waitqueue_debug, -1, 0, 0, 0, 0 }, /* cs */
272 0, /* num_buckets */
273 LIST_INIT( waitqueue.buckets ) /* buckets */
276 static RTL_CRITICAL_SECTION_DEBUG waitqueue_debug =
278 0, 0, &waitqueue.cs,
279 { &waitqueue_debug.ProcessLocksList, &waitqueue_debug.ProcessLocksList },
280 0, 0, { (DWORD_PTR)(__FILE__ ": waitqueue.cs") }
283 struct waitqueue_bucket
285 struct list bucket_entry;
286 LONG objcount;
287 struct list reserved;
288 struct list waiting;
289 HANDLE update_event;
292 static inline struct threadpool *impl_from_TP_POOL( TP_POOL *pool )
294 return (struct threadpool *)pool;
297 static inline struct threadpool_object *impl_from_TP_WORK( TP_WORK *work )
299 struct threadpool_object *object = (struct threadpool_object *)work;
300 assert( object->type == TP_OBJECT_TYPE_WORK );
301 return object;
304 static inline struct threadpool_object *impl_from_TP_TIMER( TP_TIMER *timer )
306 struct threadpool_object *object = (struct threadpool_object *)timer;
307 assert( object->type == TP_OBJECT_TYPE_TIMER );
308 return object;
311 static inline struct threadpool_object *impl_from_TP_WAIT( TP_WAIT *wait )
313 struct threadpool_object *object = (struct threadpool_object *)wait;
314 assert( object->type == TP_OBJECT_TYPE_WAIT );
315 return object;
318 static inline struct threadpool_group *impl_from_TP_CLEANUP_GROUP( TP_CLEANUP_GROUP *group )
320 return (struct threadpool_group *)group;
323 static inline struct threadpool_instance *impl_from_TP_CALLBACK_INSTANCE( TP_CALLBACK_INSTANCE *instance )
325 return (struct threadpool_instance *)instance;
328 static void CALLBACK threadpool_worker_proc( void *param );
329 static void tp_object_submit( struct threadpool_object *object, BOOL signaled );
330 static void tp_object_shutdown( struct threadpool_object *object );
331 static BOOL tp_object_release( struct threadpool_object *object );
332 static struct threadpool *default_threadpool = NULL;
334 static inline LONG interlocked_inc( PLONG dest )
336 return interlocked_xchg_add( dest, 1 ) + 1;
339 static inline LONG interlocked_dec( PLONG dest )
341 return interlocked_xchg_add( dest, -1 ) - 1;
344 static void CALLBACK process_rtl_work_item( TP_CALLBACK_INSTANCE *instance, void *userdata )
346 struct rtl_work_item *item = userdata;
348 TRACE("executing %p(%p)\n", item->function, item->context);
349 item->function( item->context );
351 RtlFreeHeap( GetProcessHeap(), 0, item );
354 /***********************************************************************
355 * RtlQueueWorkItem (NTDLL.@)
357 * Queues a work item into a thread in the thread pool.
359 * PARAMS
360 * function [I] Work function to execute.
361 * context [I] Context to pass to the work function when it is executed.
362 * flags [I] Flags. See notes.
364 * RETURNS
365 * Success: STATUS_SUCCESS.
366 * Failure: Any NTSTATUS code.
368 * NOTES
369 * Flags can be one or more of the following:
370 *|WT_EXECUTEDEFAULT - Executes the work item in a non-I/O worker thread.
371 *|WT_EXECUTEINIOTHREAD - Executes the work item in an I/O worker thread.
372 *|WT_EXECUTEINPERSISTENTTHREAD - Executes the work item in a thread that is persistent.
373 *|WT_EXECUTELONGFUNCTION - Hints that the execution can take a long time.
374 *|WT_TRANSFER_IMPERSONATION - Executes the function with the current access token.
376 NTSTATUS WINAPI RtlQueueWorkItem( PRTL_WORK_ITEM_ROUTINE function, PVOID context, ULONG flags )
378 TP_CALLBACK_ENVIRON environment;
379 struct rtl_work_item *item;
380 NTSTATUS status;
382 TRACE( "%p %p %u\n", function, context, flags );
384 item = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*item) );
385 if (!item)
386 return STATUS_NO_MEMORY;
388 memset( &environment, 0, sizeof(environment) );
389 environment.Version = 1;
390 environment.u.s.LongFunction = (flags & WT_EXECUTELONGFUNCTION) != 0;
391 environment.u.s.Persistent = (flags & WT_EXECUTEINPERSISTENTTHREAD) != 0;
393 item->function = function;
394 item->context = context;
396 status = TpSimpleTryPost( process_rtl_work_item, item, &environment );
397 if (status) RtlFreeHeap( GetProcessHeap(), 0, item );
398 return status;
401 /***********************************************************************
402 * iocp_poller - get completion events and run callbacks
404 static DWORD CALLBACK iocp_poller(LPVOID Arg)
406 HANDLE cport = Arg;
408 while( TRUE )
410 PRTL_OVERLAPPED_COMPLETION_ROUTINE callback;
411 LPVOID overlapped;
412 IO_STATUS_BLOCK iosb;
413 NTSTATUS res = NtRemoveIoCompletion( cport, (PULONG_PTR)&callback, (PULONG_PTR)&overlapped, &iosb, NULL );
414 if (res)
416 ERR("NtRemoveIoCompletion failed: 0x%x\n", res);
418 else
420 DWORD transferred = 0;
421 DWORD err = 0;
423 if (iosb.u.Status == STATUS_SUCCESS)
424 transferred = iosb.Information;
425 else
426 err = RtlNtStatusToDosError(iosb.u.Status);
428 callback( err, transferred, overlapped );
431 return 0;
434 /***********************************************************************
435 * RtlSetIoCompletionCallback (NTDLL.@)
437 * Binds a handle to a thread pool's completion port, and possibly
438 * starts a non-I/O thread to monitor this port and call functions back.
440 * PARAMS
441 * FileHandle [I] Handle to bind to a completion port.
442 * Function [I] Callback function to call on I/O completions.
443 * Flags [I] Not used.
445 * RETURNS
446 * Success: STATUS_SUCCESS.
447 * Failure: Any NTSTATUS code.
450 NTSTATUS WINAPI RtlSetIoCompletionCallback(HANDLE FileHandle, PRTL_OVERLAPPED_COMPLETION_ROUTINE Function, ULONG Flags)
452 IO_STATUS_BLOCK iosb;
453 FILE_COMPLETION_INFORMATION info;
455 if (Flags) FIXME("Unknown value Flags=0x%x\n", Flags);
457 if (!old_threadpool.compl_port)
459 NTSTATUS res = STATUS_SUCCESS;
461 RtlEnterCriticalSection(&old_threadpool.threadpool_compl_cs);
462 if (!old_threadpool.compl_port)
464 HANDLE cport;
466 res = NtCreateIoCompletion( &cport, IO_COMPLETION_ALL_ACCESS, NULL, 0 );
467 if (!res)
469 /* FIXME native can start additional threads in case of e.g. hung callback function. */
470 res = RtlQueueWorkItem( iocp_poller, cport, WT_EXECUTEDEFAULT );
471 if (!res)
472 old_threadpool.compl_port = cport;
473 else
474 NtClose( cport );
477 RtlLeaveCriticalSection(&old_threadpool.threadpool_compl_cs);
478 if (res) return res;
481 info.CompletionPort = old_threadpool.compl_port;
482 info.CompletionKey = (ULONG_PTR)Function;
484 return NtSetInformationFile( FileHandle, &iosb, &info, sizeof(info), FileCompletionInformation );
487 static inline PLARGE_INTEGER get_nt_timeout( PLARGE_INTEGER pTime, ULONG timeout )
489 if (timeout == INFINITE) return NULL;
490 pTime->QuadPart = (ULONGLONG)timeout * -10000;
491 return pTime;
494 static void delete_wait_work_item(struct wait_work_item *wait_work_item)
496 NtClose( wait_work_item->CancelEvent );
497 RtlFreeHeap( GetProcessHeap(), 0, wait_work_item );
500 static DWORD CALLBACK wait_thread_proc(LPVOID Arg)
502 struct wait_work_item *wait_work_item = Arg;
503 NTSTATUS status;
504 BOOLEAN alertable = (wait_work_item->Flags & WT_EXECUTEINIOTHREAD) != 0;
505 HANDLE handles[2] = { wait_work_item->Object, wait_work_item->CancelEvent };
506 LARGE_INTEGER timeout;
507 HANDLE completion_event;
509 TRACE("\n");
511 while (TRUE)
513 status = NtWaitForMultipleObjects( 2, handles, TRUE, alertable,
514 get_nt_timeout( &timeout, wait_work_item->Milliseconds ) );
515 if (status == STATUS_WAIT_0 || status == STATUS_TIMEOUT)
517 BOOLEAN TimerOrWaitFired;
519 if (status == STATUS_WAIT_0)
521 TRACE( "object %p signaled, calling callback %p with context %p\n",
522 wait_work_item->Object, wait_work_item->Callback,
523 wait_work_item->Context );
524 TimerOrWaitFired = FALSE;
526 else
528 TRACE( "wait for object %p timed out, calling callback %p with context %p\n",
529 wait_work_item->Object, wait_work_item->Callback,
530 wait_work_item->Context );
531 TimerOrWaitFired = TRUE;
533 wait_work_item->CallbackInProgress = TRUE;
534 wait_work_item->Callback( wait_work_item->Context, TimerOrWaitFired );
535 wait_work_item->CallbackInProgress = FALSE;
537 if (wait_work_item->Flags & WT_EXECUTEONLYONCE)
538 break;
540 else
541 break;
544 completion_event = wait_work_item->CompletionEvent;
545 if (completion_event) NtSetEvent( completion_event, NULL );
547 if (interlocked_inc( &wait_work_item->DeleteCount ) == 2 )
548 delete_wait_work_item( wait_work_item );
550 return 0;
553 /***********************************************************************
554 * RtlRegisterWait (NTDLL.@)
556 * Registers a wait for a handle to become signaled.
558 * PARAMS
559 * NewWaitObject [I] Handle to the new wait object. Use RtlDeregisterWait() to free it.
560 * Object [I] Object to wait to become signaled.
561 * Callback [I] Callback function to execute when the wait times out or the handle is signaled.
562 * Context [I] Context to pass to the callback function when it is executed.
563 * Milliseconds [I] Number of milliseconds to wait before timing out.
564 * Flags [I] Flags. See notes.
566 * RETURNS
567 * Success: STATUS_SUCCESS.
568 * Failure: Any NTSTATUS code.
570 * NOTES
571 * Flags can be one or more of the following:
572 *|WT_EXECUTEDEFAULT - Executes the work item in a non-I/O worker thread.
573 *|WT_EXECUTEINIOTHREAD - Executes the work item in an I/O worker thread.
574 *|WT_EXECUTEINPERSISTENTTHREAD - Executes the work item in a thread that is persistent.
575 *|WT_EXECUTELONGFUNCTION - Hints that the execution can take a long time.
576 *|WT_TRANSFER_IMPERSONATION - Executes the function with the current access token.
578 NTSTATUS WINAPI RtlRegisterWait(PHANDLE NewWaitObject, HANDLE Object,
579 RTL_WAITORTIMERCALLBACKFUNC Callback,
580 PVOID Context, ULONG Milliseconds, ULONG Flags)
582 struct wait_work_item *wait_work_item;
583 NTSTATUS status;
585 TRACE( "(%p, %p, %p, %p, %d, 0x%x)\n", NewWaitObject, Object, Callback, Context, Milliseconds, Flags );
587 wait_work_item = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*wait_work_item) );
588 if (!wait_work_item)
589 return STATUS_NO_MEMORY;
591 wait_work_item->Object = Object;
592 wait_work_item->Callback = Callback;
593 wait_work_item->Context = Context;
594 wait_work_item->Milliseconds = Milliseconds;
595 wait_work_item->Flags = Flags;
596 wait_work_item->CallbackInProgress = FALSE;
597 wait_work_item->DeleteCount = 0;
598 wait_work_item->CompletionEvent = NULL;
600 status = NtCreateEvent( &wait_work_item->CancelEvent, EVENT_ALL_ACCESS, NULL, NotificationEvent, FALSE );
601 if (status != STATUS_SUCCESS)
603 RtlFreeHeap( GetProcessHeap(), 0, wait_work_item );
604 return status;
607 Flags = Flags & (WT_EXECUTEINIOTHREAD | WT_EXECUTEINPERSISTENTTHREAD |
608 WT_EXECUTELONGFUNCTION | WT_TRANSFER_IMPERSONATION);
609 status = RtlQueueWorkItem( wait_thread_proc, wait_work_item, Flags );
610 if (status != STATUS_SUCCESS)
612 delete_wait_work_item( wait_work_item );
613 return status;
616 *NewWaitObject = wait_work_item;
617 return status;
620 /***********************************************************************
621 * RtlDeregisterWaitEx (NTDLL.@)
623 * Cancels a wait operation and frees the resources associated with calling
624 * RtlRegisterWait().
626 * PARAMS
627 * WaitObject [I] Handle to the wait object to free.
629 * RETURNS
630 * Success: STATUS_SUCCESS.
631 * Failure: Any NTSTATUS code.
633 NTSTATUS WINAPI RtlDeregisterWaitEx(HANDLE WaitHandle, HANDLE CompletionEvent)
635 struct wait_work_item *wait_work_item = WaitHandle;
636 NTSTATUS status = STATUS_SUCCESS;
638 TRACE( "(%p)\n", WaitHandle );
640 NtSetEvent( wait_work_item->CancelEvent, NULL );
641 if (wait_work_item->CallbackInProgress)
643 if (CompletionEvent != NULL)
645 if (CompletionEvent == INVALID_HANDLE_VALUE)
647 status = NtCreateEvent( &CompletionEvent, EVENT_ALL_ACCESS, NULL, NotificationEvent, FALSE );
648 if (status != STATUS_SUCCESS)
649 return status;
650 interlocked_xchg_ptr( &wait_work_item->CompletionEvent, CompletionEvent );
651 if (wait_work_item->CallbackInProgress)
652 NtWaitForSingleObject( CompletionEvent, FALSE, NULL );
653 NtClose( CompletionEvent );
655 else
657 interlocked_xchg_ptr( &wait_work_item->CompletionEvent, CompletionEvent );
658 if (wait_work_item->CallbackInProgress)
659 status = STATUS_PENDING;
662 else
663 status = STATUS_PENDING;
666 if (interlocked_inc( &wait_work_item->DeleteCount ) == 2 )
668 status = STATUS_SUCCESS;
669 delete_wait_work_item( wait_work_item );
672 return status;
675 /***********************************************************************
676 * RtlDeregisterWait (NTDLL.@)
678 * Cancels a wait operation and frees the resources associated with calling
679 * RtlRegisterWait().
681 * PARAMS
682 * WaitObject [I] Handle to the wait object to free.
684 * RETURNS
685 * Success: STATUS_SUCCESS.
686 * Failure: Any NTSTATUS code.
688 NTSTATUS WINAPI RtlDeregisterWait(HANDLE WaitHandle)
690 return RtlDeregisterWaitEx(WaitHandle, NULL);
/************************** Timer Queue Impl **************************/
696 static void queue_remove_timer(struct queue_timer *t)
698 /* We MUST hold the queue cs while calling this function. This ensures
699 that we cannot queue another callback for this timer. The runcount
700 being zero makes sure we don't have any already queued. */
701 struct timer_queue *q = t->q;
703 assert(t->runcount == 0);
704 assert(t->destroy);
706 list_remove(&t->entry);
707 if (t->event)
708 NtSetEvent(t->event, NULL);
709 RtlFreeHeap(GetProcessHeap(), 0, t);
711 if (q->quit && list_empty(&q->timers))
712 NtSetEvent(q->event, NULL);
715 static void timer_cleanup_callback(struct queue_timer *t)
717 struct timer_queue *q = t->q;
718 RtlEnterCriticalSection(&q->cs);
720 assert(0 < t->runcount);
721 --t->runcount;
723 if (t->destroy && t->runcount == 0)
724 queue_remove_timer(t);
726 RtlLeaveCriticalSection(&q->cs);
729 static DWORD WINAPI timer_callback_wrapper(LPVOID p)
731 struct queue_timer *t = p;
732 t->callback(t->param, TRUE);
733 timer_cleanup_callback(t);
734 return 0;
737 static inline ULONGLONG queue_current_time(void)
739 LARGE_INTEGER now, freq;
740 NtQueryPerformanceCounter(&now, &freq);
741 return now.QuadPart * 1000 / freq.QuadPart;
744 static void queue_add_timer(struct queue_timer *t, ULONGLONG time,
745 BOOL set_event)
747 /* We MUST hold the queue cs while calling this function. */
748 struct timer_queue *q = t->q;
749 struct list *ptr = &q->timers;
751 assert(!q->quit || (t->destroy && time == EXPIRE_NEVER));
753 if (time != EXPIRE_NEVER)
754 LIST_FOR_EACH(ptr, &q->timers)
756 struct queue_timer *cur = LIST_ENTRY(ptr, struct queue_timer, entry);
757 if (time < cur->expire)
758 break;
760 list_add_before(ptr, &t->entry);
762 t->expire = time;
764 /* If we insert at the head of the list, we need to expire sooner
765 than expected. */
766 if (set_event && &t->entry == list_head(&q->timers))
767 NtSetEvent(q->event, NULL);
770 static inline void queue_move_timer(struct queue_timer *t, ULONGLONG time,
771 BOOL set_event)
773 /* We MUST hold the queue cs while calling this function. */
774 list_remove(&t->entry);
775 queue_add_timer(t, time, set_event);
778 static void queue_timer_expire(struct timer_queue *q)
780 struct queue_timer *t = NULL;
782 RtlEnterCriticalSection(&q->cs);
783 if (list_head(&q->timers))
785 ULONGLONG now, next;
786 t = LIST_ENTRY(list_head(&q->timers), struct queue_timer, entry);
787 if (!t->destroy && t->expire <= ((now = queue_current_time())))
789 ++t->runcount;
790 if (t->period)
792 next = t->expire + t->period;
793 /* avoid trigger cascade if overloaded / hibernated */
794 if (next < now)
795 next = now + t->period;
797 else
798 next = EXPIRE_NEVER;
799 queue_move_timer(t, next, FALSE);
801 else
802 t = NULL;
804 RtlLeaveCriticalSection(&q->cs);
806 if (t)
808 if (t->flags & WT_EXECUTEINTIMERTHREAD)
809 timer_callback_wrapper(t);
810 else
812 ULONG flags
813 = (t->flags
814 & (WT_EXECUTEINIOTHREAD | WT_EXECUTEINPERSISTENTTHREAD
815 | WT_EXECUTELONGFUNCTION | WT_TRANSFER_IMPERSONATION));
816 NTSTATUS status = RtlQueueWorkItem(timer_callback_wrapper, t, flags);
817 if (status != STATUS_SUCCESS)
818 timer_cleanup_callback(t);
823 static ULONG queue_get_timeout(struct timer_queue *q)
825 struct queue_timer *t;
826 ULONG timeout = INFINITE;
828 RtlEnterCriticalSection(&q->cs);
829 if (list_head(&q->timers))
831 t = LIST_ENTRY(list_head(&q->timers), struct queue_timer, entry);
832 assert(!t->destroy || t->expire == EXPIRE_NEVER);
834 if (t->expire != EXPIRE_NEVER)
836 ULONGLONG time = queue_current_time();
837 timeout = t->expire < time ? 0 : t->expire - time;
840 RtlLeaveCriticalSection(&q->cs);
842 return timeout;
845 static void WINAPI timer_queue_thread_proc(LPVOID p)
847 struct timer_queue *q = p;
848 ULONG timeout_ms;
850 timeout_ms = INFINITE;
851 for (;;)
853 LARGE_INTEGER timeout;
854 NTSTATUS status;
855 BOOL done = FALSE;
857 status = NtWaitForSingleObject(
858 q->event, FALSE, get_nt_timeout(&timeout, timeout_ms));
860 if (status == STATUS_WAIT_0)
862 /* There are two possible ways to trigger the event. Either
863 we are quitting and the last timer got removed, or a new
864 timer got put at the head of the list so we need to adjust
865 our timeout. */
866 RtlEnterCriticalSection(&q->cs);
867 if (q->quit && list_empty(&q->timers))
868 done = TRUE;
869 RtlLeaveCriticalSection(&q->cs);
871 else if (status == STATUS_TIMEOUT)
872 queue_timer_expire(q);
874 if (done)
875 break;
877 timeout_ms = queue_get_timeout(q);
880 NtClose(q->event);
881 RtlDeleteCriticalSection(&q->cs);
882 q->magic = 0;
883 RtlFreeHeap(GetProcessHeap(), 0, q);
884 RtlExitUserThread( 0 );
887 static void queue_destroy_timer(struct queue_timer *t)
889 /* We MUST hold the queue cs while calling this function. */
890 t->destroy = TRUE;
891 if (t->runcount == 0)
892 /* Ensure a timer is promptly removed. If callbacks are pending,
893 it will be removed after the last one finishes by the callback
894 cleanup wrapper. */
895 queue_remove_timer(t);
896 else
897 /* Make sure no destroyed timer masks an active timer at the head
898 of the sorted list. */
899 queue_move_timer(t, EXPIRE_NEVER, FALSE);
902 /***********************************************************************
903 * RtlCreateTimerQueue (NTDLL.@)
905 * Creates a timer queue object and returns a handle to it.
907 * PARAMS
908 * NewTimerQueue [O] The newly created queue.
910 * RETURNS
911 * Success: STATUS_SUCCESS.
912 * Failure: Any NTSTATUS code.
914 NTSTATUS WINAPI RtlCreateTimerQueue(PHANDLE NewTimerQueue)
916 NTSTATUS status;
917 struct timer_queue *q = RtlAllocateHeap(GetProcessHeap(), 0, sizeof *q);
918 if (!q)
919 return STATUS_NO_MEMORY;
921 RtlInitializeCriticalSection(&q->cs);
922 list_init(&q->timers);
923 q->quit = FALSE;
924 q->magic = TIMER_QUEUE_MAGIC;
925 status = NtCreateEvent(&q->event, EVENT_ALL_ACCESS, NULL, SynchronizationEvent, FALSE);
926 if (status != STATUS_SUCCESS)
928 RtlFreeHeap(GetProcessHeap(), 0, q);
929 return status;
931 status = RtlCreateUserThread(GetCurrentProcess(), NULL, FALSE, NULL, 0, 0,
932 timer_queue_thread_proc, q, &q->thread, NULL);
933 if (status != STATUS_SUCCESS)
935 NtClose(q->event);
936 RtlFreeHeap(GetProcessHeap(), 0, q);
937 return status;
940 *NewTimerQueue = q;
941 return STATUS_SUCCESS;
944 /***********************************************************************
945 * RtlDeleteTimerQueueEx (NTDLL.@)
947 * Deletes a timer queue object.
949 * PARAMS
950 * TimerQueue [I] The timer queue to destroy.
951 * CompletionEvent [I] If NULL, return immediately. If INVALID_HANDLE_VALUE,
952 * wait until all timers are finished firing before
953 * returning. Otherwise, return immediately and set the
954 * event when all timers are done.
956 * RETURNS
957 * Success: STATUS_SUCCESS if synchronous, STATUS_PENDING if not.
958 * Failure: Any NTSTATUS code.
960 NTSTATUS WINAPI RtlDeleteTimerQueueEx(HANDLE TimerQueue, HANDLE CompletionEvent)
962 struct timer_queue *q = TimerQueue;
963 struct queue_timer *t, *temp;
964 HANDLE thread;
965 NTSTATUS status;
967 if (!q || q->magic != TIMER_QUEUE_MAGIC)
968 return STATUS_INVALID_HANDLE;
970 thread = q->thread;
972 RtlEnterCriticalSection(&q->cs);
973 q->quit = TRUE;
974 if (list_head(&q->timers))
975 /* When the last timer is removed, it will signal the timer thread to
976 exit... */
977 LIST_FOR_EACH_ENTRY_SAFE(t, temp, &q->timers, struct queue_timer, entry)
978 queue_destroy_timer(t);
979 else
980 /* However if we have none, we must do it ourselves. */
981 NtSetEvent(q->event, NULL);
982 RtlLeaveCriticalSection(&q->cs);
984 if (CompletionEvent == INVALID_HANDLE_VALUE)
986 NtWaitForSingleObject(thread, FALSE, NULL);
987 status = STATUS_SUCCESS;
989 else
991 if (CompletionEvent)
993 FIXME("asynchronous return on completion event unimplemented\n");
994 NtWaitForSingleObject(thread, FALSE, NULL);
995 NtSetEvent(CompletionEvent, NULL);
997 status = STATUS_PENDING;
1000 NtClose(thread);
1001 return status;
1004 static struct timer_queue *get_timer_queue(HANDLE TimerQueue)
1006 static struct timer_queue *default_timer_queue;
1008 if (TimerQueue)
1009 return TimerQueue;
1010 else
1012 if (!default_timer_queue)
1014 HANDLE q;
1015 NTSTATUS status = RtlCreateTimerQueue(&q);
1016 if (status == STATUS_SUCCESS)
1018 PVOID p = interlocked_cmpxchg_ptr(
1019 (void **) &default_timer_queue, q, NULL);
1020 if (p)
1021 /* Got beat to the punch. */
1022 RtlDeleteTimerQueueEx(q, NULL);
1025 return default_timer_queue;
1029 /***********************************************************************
1030 * RtlCreateTimer (NTDLL.@)
1032 * Creates a new timer associated with the given queue.
1034 * PARAMS
1035 * NewTimer [O] The newly created timer.
1036 * TimerQueue [I] The queue to hold the timer.
1037 * Callback [I] The callback to fire.
1038 * Parameter [I] The argument for the callback.
1039 * DueTime [I] The delay, in milliseconds, before first firing the
1040 * timer.
1041 * Period [I] The period, in milliseconds, at which to fire the timer
1042 * after the first callback. If zero, the timer will only
1043 * fire once. It still needs to be deleted with
1044 * RtlDeleteTimer.
1045 * Flags [I] Flags controlling the execution of the callback. In
1046 * addition to the WT_* thread pool flags (see
1047 * RtlQueueWorkItem), WT_EXECUTEINTIMERTHREAD and
1048 * WT_EXECUTEONLYONCE are supported.
1050 * RETURNS
1051 * Success: STATUS_SUCCESS.
1052 * Failure: Any NTSTATUS code.
1054 NTSTATUS WINAPI RtlCreateTimer(PHANDLE NewTimer, HANDLE TimerQueue,
1055 RTL_WAITORTIMERCALLBACKFUNC Callback,
1056 PVOID Parameter, DWORD DueTime, DWORD Period,
1057 ULONG Flags)
1059 NTSTATUS status;
1060 struct queue_timer *t;
1061 struct timer_queue *q = get_timer_queue(TimerQueue);
1063 if (!q) return STATUS_NO_MEMORY;
1064 if (q->magic != TIMER_QUEUE_MAGIC) return STATUS_INVALID_HANDLE;
1066 t = RtlAllocateHeap(GetProcessHeap(), 0, sizeof *t);
1067 if (!t)
1068 return STATUS_NO_MEMORY;
1070 t->q = q;
1071 t->runcount = 0;
1072 t->callback = Callback;
1073 t->param = Parameter;
1074 t->period = Period;
1075 t->flags = Flags;
1076 t->destroy = FALSE;
1077 t->event = NULL;
1079 status = STATUS_SUCCESS;
1080 RtlEnterCriticalSection(&q->cs);
1081 if (q->quit)
1082 status = STATUS_INVALID_HANDLE;
1083 else
1084 queue_add_timer(t, queue_current_time() + DueTime, TRUE);
1085 RtlLeaveCriticalSection(&q->cs);
1087 if (status == STATUS_SUCCESS)
1088 *NewTimer = t;
1089 else
1090 RtlFreeHeap(GetProcessHeap(), 0, t);
1092 return status;
1095 /***********************************************************************
1096 * RtlUpdateTimer (NTDLL.@)
1098 * Changes the time at which a timer expires.
1100 * PARAMS
1101 * TimerQueue [I] The queue that holds the timer.
1102 * Timer [I] The timer to update.
1103 * DueTime [I] The delay, in milliseconds, before next firing the timer.
1104 * Period [I] The period, in milliseconds, at which to fire the timer
1105 * after the first callback. If zero, the timer will not
1106 * refire once. It still needs to be deleted with
1107 * RtlDeleteTimer.
1109 * RETURNS
1110 * Success: STATUS_SUCCESS.
1111 * Failure: Any NTSTATUS code.
1113 NTSTATUS WINAPI RtlUpdateTimer(HANDLE TimerQueue, HANDLE Timer,
1114 DWORD DueTime, DWORD Period)
1116 struct queue_timer *t = Timer;
1117 struct timer_queue *q = t->q;
1119 RtlEnterCriticalSection(&q->cs);
1120 /* Can't change a timer if it was once-only or destroyed. */
1121 if (t->expire != EXPIRE_NEVER)
1123 t->period = Period;
1124 queue_move_timer(t, queue_current_time() + DueTime, TRUE);
1126 RtlLeaveCriticalSection(&q->cs);
1128 return STATUS_SUCCESS;
1131 /***********************************************************************
1132 * RtlDeleteTimer (NTDLL.@)
1134 * Cancels a timer-queue timer.
1136 * PARAMS
1137 * TimerQueue [I] The queue that holds the timer.
1138 * Timer [I] The timer to update.
1139 * CompletionEvent [I] If NULL, return immediately. If INVALID_HANDLE_VALUE,
1140 * wait until the timer is finished firing all pending
1141 * callbacks before returning. Otherwise, return
1142 * immediately and set the timer is done.
1144 * RETURNS
1145 * Success: STATUS_SUCCESS if the timer is done, STATUS_PENDING if not,
1146 or if the completion event is NULL.
1147 * Failure: Any NTSTATUS code.
1149 NTSTATUS WINAPI RtlDeleteTimer(HANDLE TimerQueue, HANDLE Timer,
1150 HANDLE CompletionEvent)
1152 struct queue_timer *t = Timer;
1153 struct timer_queue *q;
1154 NTSTATUS status = STATUS_PENDING;
1155 HANDLE event = NULL;
1157 if (!Timer)
1158 return STATUS_INVALID_PARAMETER_1;
1159 q = t->q;
1160 if (CompletionEvent == INVALID_HANDLE_VALUE)
1162 status = NtCreateEvent(&event, EVENT_ALL_ACCESS, NULL, SynchronizationEvent, FALSE);
1163 if (status == STATUS_SUCCESS)
1164 status = STATUS_PENDING;
1166 else if (CompletionEvent)
1167 event = CompletionEvent;
1169 RtlEnterCriticalSection(&q->cs);
1170 t->event = event;
1171 if (t->runcount == 0 && event)
1172 status = STATUS_SUCCESS;
1173 queue_destroy_timer(t);
1174 RtlLeaveCriticalSection(&q->cs);
1176 if (CompletionEvent == INVALID_HANDLE_VALUE && event)
1178 if (status == STATUS_PENDING)
1180 NtWaitForSingleObject(event, FALSE, NULL);
1181 status = STATUS_SUCCESS;
1183 NtClose(event);
1186 return status;
1189 /***********************************************************************
1190 * timerqueue_thread_proc (internal)
1192 static void CALLBACK timerqueue_thread_proc( void *param )
1194 ULONGLONG timeout_lower, timeout_upper, new_timeout;
1195 struct threadpool_object *other_timer;
1196 LARGE_INTEGER now, timeout;
1197 struct list *ptr;
1199 TRACE( "starting timer queue thread\n" );
1201 RtlEnterCriticalSection( &timerqueue.cs );
1202 for (;;)
1204 NtQuerySystemTime( &now );
1206 /* Check for expired timers. */
1207 while ((ptr = list_head( &timerqueue.pending_timers )))
1209 struct threadpool_object *timer = LIST_ENTRY( ptr, struct threadpool_object, u.timer.timer_entry );
1210 assert( timer->type == TP_OBJECT_TYPE_TIMER );
1211 assert( timer->u.timer.timer_pending );
1212 if (timer->u.timer.timeout > now.QuadPart)
1213 break;
1215 /* Queue a new callback in one of the worker threads. */
1216 list_remove( &timer->u.timer.timer_entry );
1217 timer->u.timer.timer_pending = FALSE;
1218 tp_object_submit( timer, FALSE );
1220 /* Insert the timer back into the queue, except its marked for shutdown. */
1221 if (timer->u.timer.period && !timer->shutdown)
1223 timer->u.timer.timeout += (ULONGLONG)timer->u.timer.period * 10000;
1224 if (timer->u.timer.timeout <= now.QuadPart)
1225 timer->u.timer.timeout = now.QuadPart + 1;
1227 LIST_FOR_EACH_ENTRY( other_timer, &timerqueue.pending_timers,
1228 struct threadpool_object, u.timer.timer_entry )
1230 assert( other_timer->type == TP_OBJECT_TYPE_TIMER );
1231 if (timer->u.timer.timeout < other_timer->u.timer.timeout)
1232 break;
1234 list_add_before( &other_timer->u.timer.timer_entry, &timer->u.timer.timer_entry );
1235 timer->u.timer.timer_pending = TRUE;
1239 timeout_lower = TIMEOUT_INFINITE;
1240 timeout_upper = TIMEOUT_INFINITE;
1242 /* Determine next timeout and use the window length to optimize wakeup times. */
1243 LIST_FOR_EACH_ENTRY( other_timer, &timerqueue.pending_timers,
1244 struct threadpool_object, u.timer.timer_entry )
1246 assert( other_timer->type == TP_OBJECT_TYPE_TIMER );
1247 if (other_timer->u.timer.timeout >= timeout_upper)
1248 break;
1250 timeout_lower = other_timer->u.timer.timeout;
1251 new_timeout = timeout_lower + (ULONGLONG)other_timer->u.timer.window_length * 10000;
1252 if (new_timeout < timeout_upper)
1253 timeout_upper = new_timeout;
1256 /* Wait for timer update events or until the next timer expires. */
1257 if (timerqueue.objcount)
1259 timeout.QuadPart = timeout_lower;
1260 RtlSleepConditionVariableCS( &timerqueue.update_event, &timerqueue.cs, &timeout );
1261 continue;
1264 /* All timers have been destroyed, if no new timers are created
1265 * within some amount of time, then we can shutdown this thread. */
1266 timeout.QuadPart = (ULONGLONG)THREADPOOL_WORKER_TIMEOUT * -10000;
1267 if (RtlSleepConditionVariableCS( &timerqueue.update_event, &timerqueue.cs,
1268 &timeout ) == STATUS_TIMEOUT && !timerqueue.objcount)
1270 break;
1274 timerqueue.thread_running = FALSE;
1275 RtlLeaveCriticalSection( &timerqueue.cs );
1277 TRACE( "terminating timer queue thread\n" );
1278 RtlExitUserThread( 0 );
1281 /***********************************************************************
1282 * tp_timerqueue_lock (internal)
1284 * Acquires a lock on the global timerqueue. When the lock is acquired
1285 * successfully, it is guaranteed that the timer thread is running.
1287 static NTSTATUS tp_timerqueue_lock( struct threadpool_object *timer )
1289 NTSTATUS status = STATUS_SUCCESS;
1290 assert( timer->type == TP_OBJECT_TYPE_TIMER );
1292 timer->u.timer.timer_initialized = FALSE;
1293 timer->u.timer.timer_pending = FALSE;
1294 timer->u.timer.timer_set = FALSE;
1295 timer->u.timer.timeout = 0;
1296 timer->u.timer.period = 0;
1297 timer->u.timer.window_length = 0;
1299 RtlEnterCriticalSection( &timerqueue.cs );
1301 /* Make sure that the timerqueue thread is running. */
1302 if (!timerqueue.thread_running)
1304 HANDLE thread;
1305 status = RtlCreateUserThread( GetCurrentProcess(), NULL, FALSE, NULL, 0, 0,
1306 timerqueue_thread_proc, NULL, &thread, NULL );
1307 if (status == STATUS_SUCCESS)
1309 timerqueue.thread_running = TRUE;
1310 NtClose( thread );
1314 if (status == STATUS_SUCCESS)
1316 timer->u.timer.timer_initialized = TRUE;
1317 timerqueue.objcount++;
1320 RtlLeaveCriticalSection( &timerqueue.cs );
1321 return status;
1324 /***********************************************************************
1325 * tp_timerqueue_unlock (internal)
1327 * Releases a lock on the global timerqueue.
1329 static void tp_timerqueue_unlock( struct threadpool_object *timer )
1331 assert( timer->type == TP_OBJECT_TYPE_TIMER );
1333 RtlEnterCriticalSection( &timerqueue.cs );
1334 if (timer->u.timer.timer_initialized)
1336 /* If timer was pending, remove it. */
1337 if (timer->u.timer.timer_pending)
1339 list_remove( &timer->u.timer.timer_entry );
1340 timer->u.timer.timer_pending = FALSE;
1343 /* If the last timer object was destroyed, then wake up the thread. */
1344 if (!--timerqueue.objcount)
1346 assert( list_empty( &timerqueue.pending_timers ) );
1347 RtlWakeAllConditionVariable( &timerqueue.update_event );
1350 timer->u.timer.timer_initialized = FALSE;
1352 RtlLeaveCriticalSection( &timerqueue.cs );
1355 /***********************************************************************
1356 * waitqueue_thread_proc (internal)
1358 static void CALLBACK waitqueue_thread_proc( void *param )
1360 struct threadpool_object *objects[MAXIMUM_WAITQUEUE_OBJECTS];
1361 HANDLE handles[MAXIMUM_WAITQUEUE_OBJECTS + 1];
1362 struct waitqueue_bucket *bucket = param;
1363 struct threadpool_object *wait, *next;
1364 LARGE_INTEGER now, timeout;
1365 DWORD num_handles;
1366 NTSTATUS status;
1368 TRACE( "starting wait queue thread\n" );
1370 RtlEnterCriticalSection( &waitqueue.cs );
1372 for (;;)
1374 NtQuerySystemTime( &now );
1375 timeout.QuadPart = TIMEOUT_INFINITE;
1376 num_handles = 0;
1378 LIST_FOR_EACH_ENTRY_SAFE( wait, next, &bucket->waiting, struct threadpool_object,
1379 u.wait.wait_entry )
1381 assert( wait->type == TP_OBJECT_TYPE_WAIT );
1382 if (wait->u.wait.timeout <= now.QuadPart)
1384 /* Wait object timed out. */
1385 list_remove( &wait->u.wait.wait_entry );
1386 list_add_tail( &bucket->reserved, &wait->u.wait.wait_entry );
1387 tp_object_submit( wait, FALSE );
1389 else
1391 if (wait->u.wait.timeout < timeout.QuadPart)
1392 timeout.QuadPart = wait->u.wait.timeout;
1394 assert( num_handles < MAXIMUM_WAITQUEUE_OBJECTS );
1395 interlocked_inc( &wait->refcount );
1396 objects[num_handles] = wait;
1397 handles[num_handles] = wait->u.wait.handle;
1398 num_handles++;
1402 if (!bucket->objcount)
1404 /* All wait objects have been destroyed, if no new wait objects are created
1405 * within some amount of time, then we can shutdown this thread. */
1406 assert( num_handles == 0 );
1407 RtlLeaveCriticalSection( &waitqueue.cs );
1408 timeout.QuadPart = (ULONGLONG)THREADPOOL_WORKER_TIMEOUT * -10000;
1409 status = NtWaitForMultipleObjects( 1, &bucket->update_event, TRUE, FALSE, &timeout );
1410 RtlEnterCriticalSection( &waitqueue.cs );
1412 if (status == STATUS_TIMEOUT && !bucket->objcount)
1413 break;
1415 else
1417 handles[num_handles] = bucket->update_event;
1418 RtlLeaveCriticalSection( &waitqueue.cs );
1419 status = NtWaitForMultipleObjects( num_handles + 1, handles, TRUE, FALSE, &timeout );
1420 RtlEnterCriticalSection( &waitqueue.cs );
1422 if (status >= STATUS_WAIT_0 && status < STATUS_WAIT_0 + num_handles)
1424 wait = objects[status - STATUS_WAIT_0];
1425 assert( wait->type == TP_OBJECT_TYPE_WAIT );
1426 if (wait->u.wait.bucket)
1428 /* Wait object signaled. */
1429 assert( wait->u.wait.bucket == bucket );
1430 list_remove( &wait->u.wait.wait_entry );
1431 list_add_tail( &bucket->reserved, &wait->u.wait.wait_entry );
1432 tp_object_submit( wait, TRUE );
1434 else
1435 ERR("wait object %p triggered while object was destroyed\n", wait);
1438 /* Release temporary references to wait objects. */
1439 while (num_handles)
1441 wait = objects[--num_handles];
1442 assert( wait->type == TP_OBJECT_TYPE_WAIT );
1443 tp_object_release( wait );
1447 /* Try to merge bucket with other threads. */
1448 if (waitqueue.num_buckets > 1 && bucket->objcount &&
1449 bucket->objcount <= MAXIMUM_WAITQUEUE_OBJECTS * 1 / 3)
1451 struct waitqueue_bucket *other_bucket;
1452 LIST_FOR_EACH_ENTRY( other_bucket, &waitqueue.buckets, struct waitqueue_bucket, bucket_entry )
1454 if (other_bucket != bucket && other_bucket->objcount &&
1455 other_bucket->objcount + bucket->objcount <= MAXIMUM_WAITQUEUE_OBJECTS * 2 / 3)
1457 other_bucket->objcount += bucket->objcount;
1458 bucket->objcount = 0;
1460 /* Update reserved list. */
1461 LIST_FOR_EACH_ENTRY( wait, &bucket->reserved, struct threadpool_object, u.wait.wait_entry )
1463 assert( wait->type == TP_OBJECT_TYPE_WAIT );
1464 wait->u.wait.bucket = other_bucket;
1466 list_move_tail( &other_bucket->reserved, &bucket->reserved );
1468 /* Update waiting list. */
1469 LIST_FOR_EACH_ENTRY( wait, &bucket->waiting, struct threadpool_object, u.wait.wait_entry )
1471 assert( wait->type == TP_OBJECT_TYPE_WAIT );
1472 wait->u.wait.bucket = other_bucket;
1474 list_move_tail( &other_bucket->waiting, &bucket->waiting );
1476 /* Move bucket to the end, to keep the probability of
1477 * newly added wait objects as small as possible. */
1478 list_remove( &bucket->bucket_entry );
1479 list_add_tail( &waitqueue.buckets, &bucket->bucket_entry );
1481 NtSetEvent( other_bucket->update_event, NULL );
1482 break;
1488 /* Remove this bucket from the list. */
1489 list_remove( &bucket->bucket_entry );
1490 if (!--waitqueue.num_buckets)
1491 assert( list_empty( &waitqueue.buckets ) );
1493 RtlLeaveCriticalSection( &waitqueue.cs );
1495 TRACE( "terminating wait queue thread\n" );
1497 assert( bucket->objcount == 0 );
1498 assert( list_empty( &bucket->reserved ) );
1499 assert( list_empty( &bucket->waiting ) );
1500 NtClose( bucket->update_event );
1502 RtlFreeHeap( GetProcessHeap(), 0, bucket );
1503 RtlExitUserThread( 0 );
1506 /***********************************************************************
1507 * tp_waitqueue_lock (internal)
1509 static NTSTATUS tp_waitqueue_lock( struct threadpool_object *wait )
1511 struct waitqueue_bucket *bucket;
1512 NTSTATUS status;
1513 HANDLE thread;
1514 assert( wait->type == TP_OBJECT_TYPE_WAIT );
1516 wait->u.wait.signaled = 0;
1517 wait->u.wait.bucket = NULL;
1518 wait->u.wait.wait_pending = FALSE;
1519 wait->u.wait.timeout = 0;
1520 wait->u.wait.handle = INVALID_HANDLE_VALUE;
1522 RtlEnterCriticalSection( &waitqueue.cs );
1524 /* Try to assign to existing bucket if possible. */
1525 LIST_FOR_EACH_ENTRY( bucket, &waitqueue.buckets, struct waitqueue_bucket, bucket_entry )
1527 if (bucket->objcount < MAXIMUM_WAITQUEUE_OBJECTS)
1529 list_add_tail( &bucket->reserved, &wait->u.wait.wait_entry );
1530 wait->u.wait.bucket = bucket;
1531 bucket->objcount++;
1533 status = STATUS_SUCCESS;
1534 goto out;
1538 /* Create a new bucket and corresponding worker thread. */
1539 bucket = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*bucket) );
1540 if (!bucket)
1542 status = STATUS_NO_MEMORY;
1543 goto out;
1546 bucket->objcount = 0;
1547 list_init( &bucket->reserved );
1548 list_init( &bucket->waiting );
1550 status = NtCreateEvent( &bucket->update_event, EVENT_ALL_ACCESS,
1551 NULL, SynchronizationEvent, FALSE );
1552 if (status)
1554 RtlFreeHeap( GetProcessHeap(), 0, bucket );
1555 goto out;
1558 status = RtlCreateUserThread( GetCurrentProcess(), NULL, FALSE, NULL, 0, 0,
1559 waitqueue_thread_proc, bucket, &thread, NULL );
1560 if (status == STATUS_SUCCESS)
1562 list_add_tail( &waitqueue.buckets, &bucket->bucket_entry );
1563 waitqueue.num_buckets++;
1565 list_add_tail( &bucket->reserved, &wait->u.wait.wait_entry );
1566 wait->u.wait.bucket = bucket;
1567 bucket->objcount++;
1569 NtClose( thread );
1571 else
1573 NtClose( bucket->update_event );
1574 RtlFreeHeap( GetProcessHeap(), 0, bucket );
1577 out:
1578 RtlLeaveCriticalSection( &waitqueue.cs );
1579 return status;
1582 /***********************************************************************
1583 * tp_waitqueue_unlock (internal)
1585 static void tp_waitqueue_unlock( struct threadpool_object *wait )
1587 assert( wait->type == TP_OBJECT_TYPE_WAIT );
1589 RtlEnterCriticalSection( &waitqueue.cs );
1590 if (wait->u.wait.bucket)
1592 struct waitqueue_bucket *bucket = wait->u.wait.bucket;
1593 assert( bucket->objcount > 0 );
1595 list_remove( &wait->u.wait.wait_entry );
1596 wait->u.wait.bucket = NULL;
1597 bucket->objcount--;
1599 NtSetEvent( bucket->update_event, NULL );
1601 RtlLeaveCriticalSection( &waitqueue.cs );
1604 /***********************************************************************
1605 * tp_threadpool_alloc (internal)
1607 * Allocates a new threadpool object.
1609 static NTSTATUS tp_threadpool_alloc( struct threadpool **out )
1611 struct threadpool *pool;
1613 pool = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*pool) );
1614 if (!pool)
1615 return STATUS_NO_MEMORY;
1617 pool->refcount = 1;
1618 pool->objcount = 0;
1619 pool->shutdown = FALSE;
1621 RtlInitializeCriticalSection( &pool->cs );
1622 pool->cs.DebugInfo->Spare[0] = (DWORD_PTR)(__FILE__ ": threadpool.cs");
1624 list_init( &pool->pool );
1625 RtlInitializeConditionVariable( &pool->update_event );
1627 pool->max_workers = 500;
1628 pool->min_workers = 0;
1629 pool->num_workers = 0;
1630 pool->num_busy_workers = 0;
1632 TRACE( "allocated threadpool %p\n", pool );
1634 *out = pool;
1635 return STATUS_SUCCESS;
1638 /***********************************************************************
1639 * tp_threadpool_shutdown (internal)
1641 * Prepares the shutdown of a threadpool object and notifies all worker
1642 * threads to terminate (after all remaining work items have been
1643 * processed).
1645 static void tp_threadpool_shutdown( struct threadpool *pool )
1647 assert( pool != default_threadpool );
1649 pool->shutdown = TRUE;
1650 RtlWakeAllConditionVariable( &pool->update_event );
1653 /***********************************************************************
1654 * tp_threadpool_release (internal)
1656 * Releases a reference to a threadpool object.
1658 static BOOL tp_threadpool_release( struct threadpool *pool )
1660 if (interlocked_dec( &pool->refcount ))
1661 return FALSE;
1663 TRACE( "destroying threadpool %p\n", pool );
1665 assert( pool->shutdown );
1666 assert( !pool->objcount );
1667 assert( list_empty( &pool->pool ) );
1669 pool->cs.DebugInfo->Spare[0] = 0;
1670 RtlDeleteCriticalSection( &pool->cs );
1672 RtlFreeHeap( GetProcessHeap(), 0, pool );
1673 return TRUE;
1676 /***********************************************************************
1677 * tp_threadpool_lock (internal)
1679 * Acquires a lock on a threadpool, specified with an TP_CALLBACK_ENVIRON
1680 * block. When the lock is acquired successfully, it is guaranteed that
1681 * there is at least one worker thread to process tasks.
1683 static NTSTATUS tp_threadpool_lock( struct threadpool **out, TP_CALLBACK_ENVIRON *environment )
1685 struct threadpool *pool = NULL;
1686 NTSTATUS status = STATUS_SUCCESS;
1688 if (environment)
1689 pool = (struct threadpool *)environment->Pool;
1691 if (!pool)
1693 if (!default_threadpool)
1695 status = tp_threadpool_alloc( &pool );
1696 if (status != STATUS_SUCCESS)
1697 return status;
1699 if (interlocked_cmpxchg_ptr( (void *)&default_threadpool, pool, NULL ) != NULL)
1701 tp_threadpool_shutdown( pool );
1702 tp_threadpool_release( pool );
1706 pool = default_threadpool;
1709 RtlEnterCriticalSection( &pool->cs );
1711 /* Make sure that the threadpool has at least one thread. */
1712 if (!pool->num_workers)
1714 HANDLE thread;
1715 status = RtlCreateUserThread( GetCurrentProcess(), NULL, FALSE, NULL, 0, 0,
1716 threadpool_worker_proc, pool, &thread, NULL );
1717 if (status == STATUS_SUCCESS)
1719 interlocked_inc( &pool->refcount );
1720 pool->num_workers++;
1721 pool->num_busy_workers++;
1722 NtClose( thread );
1726 /* Keep a reference, and increment objcount to ensure that the
1727 * last thread doesn't terminate. */
1728 if (status == STATUS_SUCCESS)
1730 interlocked_inc( &pool->refcount );
1731 pool->objcount++;
1734 RtlLeaveCriticalSection( &pool->cs );
1736 if (status != STATUS_SUCCESS)
1737 return status;
1739 *out = pool;
1740 return STATUS_SUCCESS;
1743 /***********************************************************************
1744 * tp_threadpool_unlock (internal)
1746 * Releases a lock on a threadpool.
1748 static void tp_threadpool_unlock( struct threadpool *pool )
1750 RtlEnterCriticalSection( &pool->cs );
1751 pool->objcount--;
1752 RtlLeaveCriticalSection( &pool->cs );
1753 tp_threadpool_release( pool );
1756 /***********************************************************************
1757 * tp_group_alloc (internal)
1759 * Allocates a new threadpool group object.
1761 static NTSTATUS tp_group_alloc( struct threadpool_group **out )
1763 struct threadpool_group *group;
1765 group = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*group) );
1766 if (!group)
1767 return STATUS_NO_MEMORY;
1769 group->refcount = 1;
1770 group->shutdown = FALSE;
1772 RtlInitializeCriticalSection( &group->cs );
1773 group->cs.DebugInfo->Spare[0] = (DWORD_PTR)(__FILE__ ": threadpool_group.cs");
1775 list_init( &group->members );
1777 TRACE( "allocated group %p\n", group );
1779 *out = group;
1780 return STATUS_SUCCESS;
1783 /***********************************************************************
1784 * tp_group_shutdown (internal)
1786 * Marks the group object for shutdown.
1788 static void tp_group_shutdown( struct threadpool_group *group )
1790 group->shutdown = TRUE;
1793 /***********************************************************************
1794 * tp_group_release (internal)
1796 * Releases a reference to a group object.
1798 static BOOL tp_group_release( struct threadpool_group *group )
1800 if (interlocked_dec( &group->refcount ))
1801 return FALSE;
1803 TRACE( "destroying group %p\n", group );
1805 assert( group->shutdown );
1806 assert( list_empty( &group->members ) );
1808 group->cs.DebugInfo->Spare[0] = 0;
1809 RtlDeleteCriticalSection( &group->cs );
1811 RtlFreeHeap( GetProcessHeap(), 0, group );
1812 return TRUE;
1815 /***********************************************************************
1816 * tp_object_initialize (internal)
1818 * Initializes members of a threadpool object.
1820 static void tp_object_initialize( struct threadpool_object *object, struct threadpool *pool,
1821 PVOID userdata, TP_CALLBACK_ENVIRON *environment )
1823 BOOL is_simple_callback = (object->type == TP_OBJECT_TYPE_SIMPLE);
1825 object->refcount = 1;
1826 object->shutdown = FALSE;
1828 object->pool = pool;
1829 object->group = NULL;
1830 object->userdata = userdata;
1831 object->group_cancel_callback = NULL;
1832 object->finalization_callback = NULL;
1833 object->may_run_long = 0;
1834 object->race_dll = NULL;
1836 memset( &object->group_entry, 0, sizeof(object->group_entry) );
1837 object->is_group_member = FALSE;
1839 memset( &object->pool_entry, 0, sizeof(object->pool_entry) );
1840 RtlInitializeConditionVariable( &object->finished_event );
1841 RtlInitializeConditionVariable( &object->group_finished_event );
1842 object->num_pending_callbacks = 0;
1843 object->num_running_callbacks = 0;
1844 object->num_associated_callbacks = 0;
1846 if (environment)
1848 if (environment->Version != 1)
1849 FIXME( "unsupported environment version %u\n", environment->Version );
1851 object->group = impl_from_TP_CLEANUP_GROUP( environment->CleanupGroup );
1852 object->group_cancel_callback = environment->CleanupGroupCancelCallback;
1853 object->finalization_callback = environment->FinalizationCallback;
1854 object->may_run_long = environment->u.s.LongFunction != 0;
1855 object->race_dll = environment->RaceDll;
1857 if (environment->ActivationContext)
1858 FIXME( "activation context not supported yet\n" );
1860 if (environment->u.s.Persistent)
1861 FIXME( "persistent threads not supported yet\n" );
1864 if (object->race_dll)
1865 LdrAddRefDll( 0, object->race_dll );
1867 TRACE( "allocated object %p of type %u\n", object, object->type );
1869 /* For simple callbacks we have to run tp_object_submit before adding this object
1870 * to the cleanup group. As soon as the cleanup group members are released ->shutdown
1871 * will be set, and tp_object_submit would fail with an assertion. */
1873 if (is_simple_callback)
1874 tp_object_submit( object, FALSE );
1876 if (object->group)
1878 struct threadpool_group *group = object->group;
1879 interlocked_inc( &group->refcount );
1881 RtlEnterCriticalSection( &group->cs );
1882 list_add_tail( &group->members, &object->group_entry );
1883 object->is_group_member = TRUE;
1884 RtlLeaveCriticalSection( &group->cs );
1887 if (is_simple_callback)
1889 tp_object_shutdown( object );
1890 tp_object_release( object );
1894 /***********************************************************************
1895 * tp_object_submit (internal)
1897 * Submits a threadpool object to the associcated threadpool. This
1898 * function has to be VOID because TpPostWork can never fail on Windows.
1900 static void tp_object_submit( struct threadpool_object *object, BOOL signaled )
1902 struct threadpool *pool = object->pool;
1903 NTSTATUS status = STATUS_UNSUCCESSFUL;
1905 assert( !object->shutdown );
1906 assert( !pool->shutdown );
1908 RtlEnterCriticalSection( &pool->cs );
1910 /* Start new worker threads if required. */
1911 if (pool->num_busy_workers >= pool->num_workers &&
1912 pool->num_workers < pool->max_workers)
1914 HANDLE thread;
1915 status = RtlCreateUserThread( GetCurrentProcess(), NULL, FALSE, NULL, 0, 0,
1916 threadpool_worker_proc, pool, &thread, NULL );
1917 if (status == STATUS_SUCCESS)
1919 interlocked_inc( &pool->refcount );
1920 pool->num_workers++;
1921 pool->num_busy_workers++;
1922 NtClose( thread );
1926 /* Queue work item and increment refcount. */
1927 interlocked_inc( &object->refcount );
1928 if (!object->num_pending_callbacks++)
1929 list_add_tail( &pool->pool, &object->pool_entry );
1931 /* Count how often the object was signaled. */
1932 if (object->type == TP_OBJECT_TYPE_WAIT && signaled)
1933 object->u.wait.signaled++;
1935 /* No new thread started - wake up one existing thread. */
1936 if (status != STATUS_SUCCESS)
1938 assert( pool->num_workers > 0 );
1939 RtlWakeConditionVariable( &pool->update_event );
1942 RtlLeaveCriticalSection( &pool->cs );
1945 /***********************************************************************
1946 * tp_object_cancel (internal)
1948 * Cancels all currently pending callbacks for a specific object.
1950 static void tp_object_cancel( struct threadpool_object *object, BOOL group_cancel, PVOID userdata )
1952 struct threadpool *pool = object->pool;
1953 LONG pending_callbacks = 0;
1955 RtlEnterCriticalSection( &pool->cs );
1956 if (object->num_pending_callbacks)
1958 pending_callbacks = object->num_pending_callbacks;
1959 object->num_pending_callbacks = 0;
1960 list_remove( &object->pool_entry );
1962 if (object->type == TP_OBJECT_TYPE_WAIT)
1963 object->u.wait.signaled = 0;
1965 RtlLeaveCriticalSection( &pool->cs );
1967 /* Execute group cancellation callback if defined, and if this was actually a group cancel. */
1968 if (pending_callbacks && group_cancel && object->group_cancel_callback)
1970 TRACE( "executing group cancel callback %p(%p, %p)\n", object->group_cancel_callback, object, userdata );
1971 object->group_cancel_callback( object, userdata );
1972 TRACE( "callback %p returned\n", object->group_cancel_callback );
1975 while (pending_callbacks--)
1976 tp_object_release( object );
1979 /***********************************************************************
1980 * tp_object_wait (internal)
1982 * Waits until all pending and running callbacks of a specific object
1983 * have been processed.
1985 static void tp_object_wait( struct threadpool_object *object, BOOL group_wait )
1987 struct threadpool *pool = object->pool;
1989 RtlEnterCriticalSection( &pool->cs );
1990 if (group_wait)
1992 while (object->num_pending_callbacks || object->num_running_callbacks)
1993 RtlSleepConditionVariableCS( &object->group_finished_event, &pool->cs, NULL );
1995 else
1997 while (object->num_pending_callbacks || object->num_associated_callbacks)
1998 RtlSleepConditionVariableCS( &object->finished_event, &pool->cs, NULL );
2000 RtlLeaveCriticalSection( &pool->cs );
2003 /***********************************************************************
2004 * tp_object_shutdown (internal)
2006 * Marks a threadpool object for shutdown (which means that no further
2007 * tasks can be submitted).
2009 static void tp_object_shutdown( struct threadpool_object *object )
2011 if (object->type == TP_OBJECT_TYPE_TIMER)
2012 tp_timerqueue_unlock( object );
2013 else if (object->type == TP_OBJECT_TYPE_WAIT)
2014 tp_waitqueue_unlock( object );
2016 object->shutdown = TRUE;
2019 /***********************************************************************
2020 * tp_object_release (internal)
2022 * Releases a reference to a threadpool object.
2024 static BOOL tp_object_release( struct threadpool_object *object )
2026 if (interlocked_dec( &object->refcount ))
2027 return FALSE;
2029 TRACE( "destroying object %p of type %u\n", object, object->type );
2031 assert( object->shutdown );
2032 assert( !object->num_pending_callbacks );
2033 assert( !object->num_running_callbacks );
2034 assert( !object->num_associated_callbacks );
2036 /* release reference to the group */
2037 if (object->group)
2039 struct threadpool_group *group = object->group;
2041 RtlEnterCriticalSection( &group->cs );
2042 if (object->is_group_member)
2044 list_remove( &object->group_entry );
2045 object->is_group_member = FALSE;
2047 RtlLeaveCriticalSection( &group->cs );
2049 tp_group_release( group );
2052 tp_threadpool_unlock( object->pool );
2054 if (object->race_dll)
2055 LdrUnloadDll( object->race_dll );
2057 RtlFreeHeap( GetProcessHeap(), 0, object );
2058 return TRUE;
2061 /***********************************************************************
2062 * threadpool_worker_proc (internal)
2064 static void CALLBACK threadpool_worker_proc( void *param )
2066 TP_CALLBACK_INSTANCE *callback_instance;
2067 struct threadpool_instance instance;
2068 struct threadpool *pool = param;
2069 TP_WAIT_RESULT wait_result = 0;
2070 LARGE_INTEGER timeout;
2071 struct list *ptr;
2072 NTSTATUS status;
2074 TRACE( "starting worker thread for pool %p\n", pool );
2076 RtlEnterCriticalSection( &pool->cs );
2077 pool->num_busy_workers--;
2078 for (;;)
2080 while ((ptr = list_head( &pool->pool )))
2082 struct threadpool_object *object = LIST_ENTRY( ptr, struct threadpool_object, pool_entry );
2083 assert( object->num_pending_callbacks > 0 );
2085 /* If further pending callbacks are queued, move the work item to
2086 * the end of the pool list. Otherwise remove it from the pool. */
2087 list_remove( &object->pool_entry );
2088 if (--object->num_pending_callbacks)
2089 list_add_tail( &pool->pool, &object->pool_entry );
2091 /* For wait objects check if they were signaled or have timed out. */
2092 if (object->type == TP_OBJECT_TYPE_WAIT)
2094 wait_result = object->u.wait.signaled ? WAIT_OBJECT_0 : WAIT_TIMEOUT;
2095 if (wait_result == WAIT_OBJECT_0) object->u.wait.signaled--;
2098 /* Leave critical section and do the actual callback. */
2099 object->num_associated_callbacks++;
2100 object->num_running_callbacks++;
2101 pool->num_busy_workers++;
2102 RtlLeaveCriticalSection( &pool->cs );
2104 /* Initialize threadpool instance struct. */
2105 callback_instance = (TP_CALLBACK_INSTANCE *)&instance;
2106 instance.object = object;
2107 instance.threadid = GetCurrentThreadId();
2108 instance.associated = TRUE;
2109 instance.may_run_long = object->may_run_long;
2110 instance.cleanup.critical_section = NULL;
2111 instance.cleanup.mutex = NULL;
2112 instance.cleanup.semaphore = NULL;
2113 instance.cleanup.semaphore_count = 0;
2114 instance.cleanup.event = NULL;
2115 instance.cleanup.library = NULL;
2117 switch (object->type)
2119 case TP_OBJECT_TYPE_SIMPLE:
2121 TRACE( "executing simple callback %p(%p, %p)\n",
2122 object->u.simple.callback, callback_instance, object->userdata );
2123 object->u.simple.callback( callback_instance, object->userdata );
2124 TRACE( "callback %p returned\n", object->u.simple.callback );
2125 break;
2128 case TP_OBJECT_TYPE_WORK:
2130 TRACE( "executing work callback %p(%p, %p, %p)\n",
2131 object->u.work.callback, callback_instance, object->userdata, object );
2132 object->u.work.callback( callback_instance, object->userdata, (TP_WORK *)object );
2133 TRACE( "callback %p returned\n", object->u.work.callback );
2134 break;
2137 case TP_OBJECT_TYPE_TIMER:
2139 TRACE( "executing timer callback %p(%p, %p, %p)\n",
2140 object->u.timer.callback, callback_instance, object->userdata, object );
2141 object->u.timer.callback( callback_instance, object->userdata, (TP_TIMER *)object );
2142 TRACE( "callback %p returned\n", object->u.timer.callback );
2143 break;
2146 case TP_OBJECT_TYPE_WAIT:
2148 TRACE( "executing wait callback %p(%p, %p, %p, %u)\n",
2149 object->u.wait.callback, callback_instance, object->userdata, object, wait_result );
2150 object->u.wait.callback( callback_instance, object->userdata, (TP_WAIT *)object, wait_result );
2151 TRACE( "callback %p returned\n", object->u.wait.callback );
2152 break;
2155 default:
2156 assert(0);
2157 break;
2160 /* Execute finalization callback. */
2161 if (object->finalization_callback)
2163 TRACE( "executing finalization callback %p(%p, %p)\n",
2164 object->finalization_callback, callback_instance, object->userdata );
2165 object->finalization_callback( callback_instance, object->userdata );
2166 TRACE( "callback %p returned\n", object->finalization_callback );
2169 /* Execute cleanup tasks. */
2170 if (instance.cleanup.critical_section)
2172 RtlLeaveCriticalSection( instance.cleanup.critical_section );
2174 if (instance.cleanup.mutex)
2176 status = NtReleaseMutant( instance.cleanup.mutex, NULL );
2177 if (status != STATUS_SUCCESS) goto skip_cleanup;
2179 if (instance.cleanup.semaphore)
2181 status = NtReleaseSemaphore( instance.cleanup.semaphore, instance.cleanup.semaphore_count, NULL );
2182 if (status != STATUS_SUCCESS) goto skip_cleanup;
2184 if (instance.cleanup.event)
2186 status = NtSetEvent( instance.cleanup.event, NULL );
2187 if (status != STATUS_SUCCESS) goto skip_cleanup;
2189 if (instance.cleanup.library)
2191 LdrUnloadDll( instance.cleanup.library );
2194 skip_cleanup:
2195 RtlEnterCriticalSection( &pool->cs );
2196 pool->num_busy_workers--;
2198 object->num_running_callbacks--;
2199 if (!object->num_pending_callbacks && !object->num_running_callbacks)
2200 RtlWakeAllConditionVariable( &object->group_finished_event );
2202 if (instance.associated)
2204 object->num_associated_callbacks--;
2205 if (!object->num_pending_callbacks && !object->num_associated_callbacks)
2206 RtlWakeAllConditionVariable( &object->finished_event );
2209 tp_object_release( object );
2212 /* Shutdown worker thread if requested. */
2213 if (pool->shutdown)
2214 break;
2216 /* Wait for new tasks or until the timeout expires. A thread only terminates
2217 * when no new tasks are available, and the number of threads can be
2218 * decreased without violating the min_workers limit. An exception is when
2219 * min_workers == 0, then objcount is used to detect if the last thread
2220 * can be terminated. */
2221 timeout.QuadPart = (ULONGLONG)THREADPOOL_WORKER_TIMEOUT * -10000;
2222 if (RtlSleepConditionVariableCS( &pool->update_event, &pool->cs, &timeout ) == STATUS_TIMEOUT &&
2223 !list_head( &pool->pool ) && (pool->num_workers > max( pool->min_workers, 1 ) ||
2224 (!pool->min_workers && !pool->objcount)))
2226 break;
2229 pool->num_workers--;
2230 RtlLeaveCriticalSection( &pool->cs );
2232 TRACE( "terminating worker thread for pool %p\n", pool );
2233 tp_threadpool_release( pool );
2234 RtlExitUserThread( 0 );
2237 /***********************************************************************
2238 * TpAllocCleanupGroup (NTDLL.@)
2240 NTSTATUS WINAPI TpAllocCleanupGroup( TP_CLEANUP_GROUP **out )
2242 TRACE( "%p\n", out );
2244 return tp_group_alloc( (struct threadpool_group **)out );
2247 /***********************************************************************
2248 * TpAllocPool (NTDLL.@)
2250 NTSTATUS WINAPI TpAllocPool( TP_POOL **out, PVOID reserved )
2252 TRACE( "%p %p\n", out, reserved );
2254 if (reserved)
2255 FIXME( "reserved argument is nonzero (%p)", reserved );
2257 return tp_threadpool_alloc( (struct threadpool **)out );
2260 /***********************************************************************
2261 * TpAllocTimer (NTDLL.@)
2263 NTSTATUS WINAPI TpAllocTimer( TP_TIMER **out, PTP_TIMER_CALLBACK callback, PVOID userdata,
2264 TP_CALLBACK_ENVIRON *environment )
2266 struct threadpool_object *object;
2267 struct threadpool *pool;
2268 NTSTATUS status;
2270 TRACE( "%p %p %p %p\n", out, callback, userdata, environment );
2272 object = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*object) );
2273 if (!object)
2274 return STATUS_NO_MEMORY;
2276 status = tp_threadpool_lock( &pool, environment );
2277 if (status)
2279 RtlFreeHeap( GetProcessHeap(), 0, object );
2280 return status;
2283 object->type = TP_OBJECT_TYPE_TIMER;
2284 object->u.timer.callback = callback;
2286 status = tp_timerqueue_lock( object );
2287 if (status)
2289 tp_threadpool_unlock( pool );
2290 RtlFreeHeap( GetProcessHeap(), 0, object );
2291 return status;
2294 tp_object_initialize( object, pool, userdata, environment );
2296 *out = (TP_TIMER *)object;
2297 return STATUS_SUCCESS;
2300 /***********************************************************************
2301 * TpAllocWait (NTDLL.@)
2303 NTSTATUS WINAPI TpAllocWait( TP_WAIT **out, PTP_WAIT_CALLBACK callback, PVOID userdata,
2304 TP_CALLBACK_ENVIRON *environment )
2306 struct threadpool_object *object;
2307 struct threadpool *pool;
2308 NTSTATUS status;
2310 TRACE( "%p %p %p %p\n", out, callback, userdata, environment );
2312 object = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*object) );
2313 if (!object)
2314 return STATUS_NO_MEMORY;
2316 status = tp_threadpool_lock( &pool, environment );
2317 if (status)
2319 RtlFreeHeap( GetProcessHeap(), 0, object );
2320 return status;
2323 object->type = TP_OBJECT_TYPE_WAIT;
2324 object->u.wait.callback = callback;
2326 status = tp_waitqueue_lock( object );
2327 if (status)
2329 tp_threadpool_unlock( pool );
2330 RtlFreeHeap( GetProcessHeap(), 0, object );
2331 return status;
2334 tp_object_initialize( object, pool, userdata, environment );
2336 *out = (TP_WAIT *)object;
2337 return STATUS_SUCCESS;
2340 /***********************************************************************
2341 * TpAllocWork (NTDLL.@)
2343 NTSTATUS WINAPI TpAllocWork( TP_WORK **out, PTP_WORK_CALLBACK callback, PVOID userdata,
2344 TP_CALLBACK_ENVIRON *environment )
2346 struct threadpool_object *object;
2347 struct threadpool *pool;
2348 NTSTATUS status;
2350 TRACE( "%p %p %p %p\n", out, callback, userdata, environment );
2352 object = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*object) );
2353 if (!object)
2354 return STATUS_NO_MEMORY;
2356 status = tp_threadpool_lock( &pool, environment );
2357 if (status)
2359 RtlFreeHeap( GetProcessHeap(), 0, object );
2360 return status;
2363 object->type = TP_OBJECT_TYPE_WORK;
2364 object->u.work.callback = callback;
2365 tp_object_initialize( object, pool, userdata, environment );
2367 *out = (TP_WORK *)object;
2368 return STATUS_SUCCESS;
2371 /***********************************************************************
2372 * TpCallbackLeaveCriticalSectionOnCompletion (NTDLL.@)
2374 VOID WINAPI TpCallbackLeaveCriticalSectionOnCompletion( TP_CALLBACK_INSTANCE *instance, CRITICAL_SECTION *crit )
2376 struct threadpool_instance *this = impl_from_TP_CALLBACK_INSTANCE( instance );
2378 TRACE( "%p %p\n", instance, crit );
2380 if (!this->cleanup.critical_section)
2381 this->cleanup.critical_section = crit;
2384 /***********************************************************************
2385 * TpCallbackMayRunLong (NTDLL.@)
2387 NTSTATUS WINAPI TpCallbackMayRunLong( TP_CALLBACK_INSTANCE *instance )
2389 struct threadpool_instance *this = impl_from_TP_CALLBACK_INSTANCE( instance );
2390 struct threadpool_object *object = this->object;
2391 struct threadpool *pool;
2392 NTSTATUS status = STATUS_SUCCESS;
2394 TRACE( "%p\n", instance );
2396 if (this->threadid != GetCurrentThreadId())
2398 ERR("called from wrong thread, ignoring\n");
2399 return STATUS_UNSUCCESSFUL; /* FIXME */
2402 if (this->may_run_long)
2403 return STATUS_SUCCESS;
2405 pool = object->pool;
2406 RtlEnterCriticalSection( &pool->cs );
2408 /* Start new worker threads if required. */
2409 if (pool->num_busy_workers >= pool->num_workers)
2411 if (pool->num_workers < pool->max_workers)
2413 HANDLE thread;
2414 status = RtlCreateUserThread( GetCurrentProcess(), NULL, FALSE, NULL, 0, 0,
2415 threadpool_worker_proc, pool, &thread, NULL );
2416 if (status == STATUS_SUCCESS)
2418 interlocked_inc( &pool->refcount );
2419 pool->num_workers++;
2420 pool->num_busy_workers++;
2421 NtClose( thread );
2424 else
2426 status = STATUS_TOO_MANY_THREADS;
2430 RtlLeaveCriticalSection( &pool->cs );
2431 this->may_run_long = TRUE;
2432 return status;
2435 /***********************************************************************
2436 * TpCallbackReleaseMutexOnCompletion (NTDLL.@)
2438 VOID WINAPI TpCallbackReleaseMutexOnCompletion( TP_CALLBACK_INSTANCE *instance, HANDLE mutex )
2440 struct threadpool_instance *this = impl_from_TP_CALLBACK_INSTANCE( instance );
2442 TRACE( "%p %p\n", instance, mutex );
2444 if (!this->cleanup.mutex)
2445 this->cleanup.mutex = mutex;
2448 /***********************************************************************
2449 * TpCallbackReleaseSemaphoreOnCompletion (NTDLL.@)
2451 VOID WINAPI TpCallbackReleaseSemaphoreOnCompletion( TP_CALLBACK_INSTANCE *instance, HANDLE semaphore, DWORD count )
2453 struct threadpool_instance *this = impl_from_TP_CALLBACK_INSTANCE( instance );
2455 TRACE( "%p %p %u\n", instance, semaphore, count );
2457 if (!this->cleanup.semaphore)
2459 this->cleanup.semaphore = semaphore;
2460 this->cleanup.semaphore_count = count;
2464 /***********************************************************************
2465 * TpCallbackSetEventOnCompletion (NTDLL.@)
2467 VOID WINAPI TpCallbackSetEventOnCompletion( TP_CALLBACK_INSTANCE *instance, HANDLE event )
2469 struct threadpool_instance *this = impl_from_TP_CALLBACK_INSTANCE( instance );
2471 TRACE( "%p %p\n", instance, event );
2473 if (!this->cleanup.event)
2474 this->cleanup.event = event;
2477 /***********************************************************************
2478 * TpCallbackUnloadDllOnCompletion (NTDLL.@)
2480 VOID WINAPI TpCallbackUnloadDllOnCompletion( TP_CALLBACK_INSTANCE *instance, HMODULE module )
2482 struct threadpool_instance *this = impl_from_TP_CALLBACK_INSTANCE( instance );
2484 TRACE( "%p %p\n", instance, module );
2486 if (!this->cleanup.library)
2487 this->cleanup.library = module;
2490 /***********************************************************************
2491 * TpDisassociateCallback (NTDLL.@)
2493 VOID WINAPI TpDisassociateCallback( TP_CALLBACK_INSTANCE *instance )
2495 struct threadpool_instance *this = impl_from_TP_CALLBACK_INSTANCE( instance );
2496 struct threadpool_object *object = this->object;
2497 struct threadpool *pool;
2499 TRACE( "%p\n", instance );
2501 if (this->threadid != GetCurrentThreadId())
2503 ERR("called from wrong thread, ignoring\n");
2504 return;
2507 if (!this->associated)
2508 return;
2510 pool = object->pool;
2511 RtlEnterCriticalSection( &pool->cs );
2513 object->num_associated_callbacks--;
2514 if (!object->num_pending_callbacks && !object->num_associated_callbacks)
2515 RtlWakeAllConditionVariable( &object->finished_event );
2517 RtlLeaveCriticalSection( &pool->cs );
2518 this->associated = FALSE;
2521 /***********************************************************************
2522 * TpIsTimerSet (NTDLL.@)
2524 BOOL WINAPI TpIsTimerSet( TP_TIMER *timer )
2526 struct threadpool_object *this = impl_from_TP_TIMER( timer );
2528 TRACE( "%p\n", timer );
2530 return this->u.timer.timer_set;
2533 /***********************************************************************
2534 * TpPostWork (NTDLL.@)
2536 VOID WINAPI TpPostWork( TP_WORK *work )
2538 struct threadpool_object *this = impl_from_TP_WORK( work );
2540 TRACE( "%p\n", work );
2542 tp_object_submit( this, FALSE );
2545 /***********************************************************************
2546 * TpReleaseCleanupGroup (NTDLL.@)
2548 VOID WINAPI TpReleaseCleanupGroup( TP_CLEANUP_GROUP *group )
2550 struct threadpool_group *this = impl_from_TP_CLEANUP_GROUP( group );
2552 TRACE( "%p\n", group );
2554 tp_group_shutdown( this );
2555 tp_group_release( this );
2558 /***********************************************************************
2559 * TpReleaseCleanupGroupMembers (NTDLL.@)
2561 VOID WINAPI TpReleaseCleanupGroupMembers( TP_CLEANUP_GROUP *group, BOOL cancel_pending, PVOID userdata )
2563 struct threadpool_group *this = impl_from_TP_CLEANUP_GROUP( group );
2564 struct threadpool_object *object, *next;
2565 struct list members;
2567 TRACE( "%p %u %p\n", group, cancel_pending, userdata );
2569 RtlEnterCriticalSection( &this->cs );
2571 /* Unset group, increase references, and mark objects for shutdown */
2572 LIST_FOR_EACH_ENTRY_SAFE( object, next, &this->members, struct threadpool_object, group_entry )
2574 assert( object->group == this );
2575 assert( object->is_group_member );
2577 /* Simple callbacks are very special. The user doesn't hold any reference, so
2578 * they would be released too early. Add one additional temporary reference. */
2579 if (object->type == TP_OBJECT_TYPE_SIMPLE)
2581 if (interlocked_inc( &object->refcount ) == 1)
2583 /* Object is basically already destroyed, but group reference
2584 * was not deleted yet. We can safely ignore this object. */
2585 interlocked_dec( &object->refcount );
2586 list_remove( &object->group_entry );
2587 object->is_group_member = FALSE;
2588 continue;
2592 object->is_group_member = FALSE;
2593 tp_object_shutdown( object );
2596 /* Move members to a new temporary list */
2597 list_init( &members );
2598 list_move_tail( &members, &this->members );
2600 RtlLeaveCriticalSection( &this->cs );
2602 /* Cancel pending callbacks if requested */
2603 if (cancel_pending)
2605 LIST_FOR_EACH_ENTRY( object, &members, struct threadpool_object, group_entry )
2607 tp_object_cancel( object, TRUE, userdata );
2611 /* Wait for remaining callbacks to finish */
2612 LIST_FOR_EACH_ENTRY_SAFE( object, next, &members, struct threadpool_object, group_entry )
2614 tp_object_wait( object, TRUE );
2615 tp_object_release( object );
2619 /***********************************************************************
2620 * TpReleasePool (NTDLL.@)
2622 VOID WINAPI TpReleasePool( TP_POOL *pool )
2624 struct threadpool *this = impl_from_TP_POOL( pool );
2626 TRACE( "%p\n", pool );
2628 tp_threadpool_shutdown( this );
2629 tp_threadpool_release( this );
2632 /***********************************************************************
2633 * TpReleaseTimer (NTDLL.@)
2635 VOID WINAPI TpReleaseTimer( TP_TIMER *timer )
2637 struct threadpool_object *this = impl_from_TP_TIMER( timer );
2639 TRACE( "%p\n", timer );
2641 tp_object_shutdown( this );
2642 tp_object_release( this );
2645 /***********************************************************************
2646 * TpReleaseWait (NTDLL.@)
2648 VOID WINAPI TpReleaseWait( TP_WAIT *wait )
2650 struct threadpool_object *this = impl_from_TP_WAIT( wait );
2652 TRACE( "%p\n", wait );
2654 tp_object_shutdown( this );
2655 tp_object_release( this );
2658 /***********************************************************************
2659 * TpReleaseWork (NTDLL.@)
2661 VOID WINAPI TpReleaseWork( TP_WORK *work )
2663 struct threadpool_object *this = impl_from_TP_WORK( work );
2665 TRACE( "%p\n", work );
2667 tp_object_shutdown( this );
2668 tp_object_release( this );
2671 /***********************************************************************
2672 * TpSetPoolMaxThreads (NTDLL.@)
2674 VOID WINAPI TpSetPoolMaxThreads( TP_POOL *pool, DWORD maximum )
2676 struct threadpool *this = impl_from_TP_POOL( pool );
2678 TRACE( "%p %u\n", pool, maximum );
2680 RtlEnterCriticalSection( &this->cs );
2681 this->max_workers = max( maximum, 1 );
2682 this->min_workers = min( this->min_workers, this->max_workers );
2683 RtlLeaveCriticalSection( &this->cs );
2686 /***********************************************************************
2687 * TpSetPoolMinThreads (NTDLL.@)
2689 BOOL WINAPI TpSetPoolMinThreads( TP_POOL *pool, DWORD minimum )
2691 struct threadpool *this = impl_from_TP_POOL( pool );
2692 NTSTATUS status = STATUS_SUCCESS;
2694 TRACE( "%p %u\n", pool, minimum );
2696 RtlEnterCriticalSection( &this->cs );
2698 while (this->num_workers < minimum)
2700 HANDLE thread;
2701 status = RtlCreateUserThread( GetCurrentProcess(), NULL, FALSE, NULL, 0, 0,
2702 threadpool_worker_proc, this, &thread, NULL );
2703 if (status != STATUS_SUCCESS)
2704 break;
2706 interlocked_inc( &this->refcount );
2707 this->num_workers++;
2708 this->num_busy_workers++;
2709 NtClose( thread );
2712 if (status == STATUS_SUCCESS)
2714 this->min_workers = minimum;
2715 this->max_workers = max( this->min_workers, this->max_workers );
2718 RtlLeaveCriticalSection( &this->cs );
2719 return !status;
2722 /***********************************************************************
2723 * TpSetTimer (NTDLL.@)
2725 VOID WINAPI TpSetTimer( TP_TIMER *timer, LARGE_INTEGER *timeout, LONG period, LONG window_length )
2727 struct threadpool_object *this = impl_from_TP_TIMER( timer );
2728 struct threadpool_object *other_timer;
2729 BOOL submit_timer = FALSE;
2730 ULONGLONG timestamp;
2732 TRACE( "%p %p %u %u\n", timer, timeout, period, window_length );
2734 RtlEnterCriticalSection( &timerqueue.cs );
2736 assert( this->u.timer.timer_initialized );
2737 this->u.timer.timer_set = timeout != NULL;
2739 /* Convert relative timeout to absolute timestamp and handle a timeout
2740 * of zero, which means that the timer is submitted immediately. */
2741 if (timeout)
2743 timestamp = timeout->QuadPart;
2744 if ((LONGLONG)timestamp < 0)
2746 LARGE_INTEGER now;
2747 NtQuerySystemTime( &now );
2748 timestamp = now.QuadPart - timestamp;
2750 else if (!timestamp)
2752 if (!period)
2753 timeout = NULL;
2754 else
2756 LARGE_INTEGER now;
2757 NtQuerySystemTime( &now );
2758 timestamp = now.QuadPart + (ULONGLONG)period * 10000;
2760 submit_timer = TRUE;
2764 /* First remove existing timeout. */
2765 if (this->u.timer.timer_pending)
2767 list_remove( &this->u.timer.timer_entry );
2768 this->u.timer.timer_pending = FALSE;
2771 /* If the timer was enabled, then add it back to the queue. */
2772 if (timeout)
2774 this->u.timer.timeout = timestamp;
2775 this->u.timer.period = period;
2776 this->u.timer.window_length = window_length;
2778 LIST_FOR_EACH_ENTRY( other_timer, &timerqueue.pending_timers,
2779 struct threadpool_object, u.timer.timer_entry )
2781 assert( other_timer->type == TP_OBJECT_TYPE_TIMER );
2782 if (this->u.timer.timeout < other_timer->u.timer.timeout)
2783 break;
2785 list_add_before( &other_timer->u.timer.timer_entry, &this->u.timer.timer_entry );
2787 /* Wake up the timer thread when the timeout has to be updated. */
2788 if (list_head( &timerqueue.pending_timers ) == &this->u.timer.timer_entry )
2789 RtlWakeAllConditionVariable( &timerqueue.update_event );
2791 this->u.timer.timer_pending = TRUE;
2794 RtlLeaveCriticalSection( &timerqueue.cs );
2796 if (submit_timer)
2797 tp_object_submit( this, FALSE );
2800 /***********************************************************************
2801 * TpSetWait (NTDLL.@)
2803 VOID WINAPI TpSetWait( TP_WAIT *wait, HANDLE handle, LARGE_INTEGER *timeout )
2805 struct threadpool_object *this = impl_from_TP_WAIT( wait );
2806 ULONGLONG timestamp = TIMEOUT_INFINITE;
2807 BOOL submit_wait = FALSE;
2809 TRACE( "%p %p %p\n", wait, handle, timeout );
2811 RtlEnterCriticalSection( &waitqueue.cs );
2813 assert( this->u.wait.bucket );
2814 this->u.wait.handle = handle;
2816 if (handle || this->u.wait.wait_pending)
2818 struct waitqueue_bucket *bucket = this->u.wait.bucket;
2819 list_remove( &this->u.wait.wait_entry );
2821 /* Convert relative timeout to absolute timestamp. */
2822 if (handle && timeout)
2824 timestamp = timeout->QuadPart;
2825 if ((LONGLONG)timestamp < 0)
2827 LARGE_INTEGER now;
2828 NtQuerySystemTime( &now );
2829 timestamp = now.QuadPart - timestamp;
2831 else if (!timestamp)
2833 submit_wait = TRUE;
2834 handle = NULL;
2838 /* Add wait object back into one of the queues. */
2839 if (handle)
2841 list_add_tail( &bucket->waiting, &this->u.wait.wait_entry );
2842 this->u.wait.wait_pending = TRUE;
2843 this->u.wait.timeout = timestamp;
2845 else
2847 list_add_tail( &bucket->reserved, &this->u.wait.wait_entry );
2848 this->u.wait.wait_pending = FALSE;
2851 /* Wake up the wait queue thread. */
2852 NtSetEvent( bucket->update_event, NULL );
2855 RtlLeaveCriticalSection( &waitqueue.cs );
2857 if (submit_wait)
2858 tp_object_submit( this, FALSE );
2861 /***********************************************************************
2862 * TpSimpleTryPost (NTDLL.@)
2864 NTSTATUS WINAPI TpSimpleTryPost( PTP_SIMPLE_CALLBACK callback, PVOID userdata,
2865 TP_CALLBACK_ENVIRON *environment )
2867 struct threadpool_object *object;
2868 struct threadpool *pool;
2869 NTSTATUS status;
2871 TRACE( "%p %p %p\n", callback, userdata, environment );
2873 object = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*object) );
2874 if (!object)
2875 return STATUS_NO_MEMORY;
2877 status = tp_threadpool_lock( &pool, environment );
2878 if (status)
2880 RtlFreeHeap( GetProcessHeap(), 0, object );
2881 return status;
2884 object->type = TP_OBJECT_TYPE_SIMPLE;
2885 object->u.simple.callback = callback;
2886 tp_object_initialize( object, pool, userdata, environment );
2888 return STATUS_SUCCESS;
2891 /***********************************************************************
2892 * TpWaitForTimer (NTDLL.@)
2894 VOID WINAPI TpWaitForTimer( TP_TIMER *timer, BOOL cancel_pending )
2896 struct threadpool_object *this = impl_from_TP_TIMER( timer );
2898 TRACE( "%p %d\n", timer, cancel_pending );
2900 if (cancel_pending)
2901 tp_object_cancel( this, FALSE, NULL );
2902 tp_object_wait( this, FALSE );
2905 /***********************************************************************
2906 * TpWaitForWait (NTDLL.@)
2908 VOID WINAPI TpWaitForWait( TP_WAIT *wait, BOOL cancel_pending )
2910 struct threadpool_object *this = impl_from_TP_WAIT( wait );
2912 TRACE( "%p %d\n", wait, cancel_pending );
2914 if (cancel_pending)
2915 tp_object_cancel( this, FALSE, NULL );
2916 tp_object_wait( this, FALSE );
2919 /***********************************************************************
2920 * TpWaitForWork (NTDLL.@)
2922 VOID WINAPI TpWaitForWork( TP_WORK *work, BOOL cancel_pending )
2924 struct threadpool_object *this = impl_from_TP_WORK( work );
2926 TRACE( "%p %u\n", work, cancel_pending );
2928 if (cancel_pending)
2929 tp_object_cancel( this, FALSE, NULL );
2930 tp_object_wait( this, FALSE );