comctl32: Fix a typo in comment.
[wine.git] / dlls / ntdll / threadpool.c
blob1b0546a78164acd32e5b9504d001d770e662ae4a
1 /*
2 * Thread pooling
4 * Copyright (c) 2006 Robert Shearman
5 * Copyright (c) 2014-2016 Sebastian Lackner
7 * This library is free software; you can redistribute it and/or
8 * modify it under the terms of the GNU Lesser General Public
9 * License as published by the Free Software Foundation; either
10 * version 2.1 of the License, or (at your option) any later version.
12 * This library is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Lesser General Public License for more details.
17 * You should have received a copy of the GNU Lesser General Public
18 * License along with this library; if not, write to the Free Software
19 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
22 #include "config.h"
23 #include "wine/port.h"
25 #include <assert.h>
26 #include <stdarg.h>
27 #include <limits.h>
29 #define NONAMELESSUNION
30 #include "ntstatus.h"
31 #define WIN32_NO_STATUS
32 #include "winternl.h"
34 #include "wine/debug.h"
35 #include "wine/list.h"
37 #include "ntdll_misc.h"
39 WINE_DEFAULT_DEBUG_CHANNEL(threadpool);
42 * Old thread pooling API
45 struct rtl_work_item
47 PRTL_WORK_ITEM_ROUTINE function;
48 PVOID context;
/* Expiration value used for timers that must not fire (again). */
#define EXPIRE_NEVER (~(ULONGLONG)0)
/* Signature stored in timer_queue.magic to validate queue handles. */
#define TIMER_QUEUE_MAGIC 0x516d6954    /* TimQ */
54 static RTL_CRITICAL_SECTION_DEBUG critsect_compl_debug;
56 static struct
58 HANDLE compl_port;
59 RTL_CRITICAL_SECTION threadpool_compl_cs;
61 old_threadpool =
63 NULL, /* compl_port */
64 { &critsect_compl_debug, -1, 0, 0, 0, 0 }, /* threadpool_compl_cs */
67 static RTL_CRITICAL_SECTION_DEBUG critsect_compl_debug =
69 0, 0, &old_threadpool.threadpool_compl_cs,
70 { &critsect_compl_debug.ProcessLocksList, &critsect_compl_debug.ProcessLocksList },
71 0, 0, { (DWORD_PTR)(__FILE__ ": threadpool_compl_cs") }
74 struct wait_work_item
76 HANDLE Object;
77 HANDLE CancelEvent;
78 WAITORTIMERCALLBACK Callback;
79 PVOID Context;
80 ULONG Milliseconds;
81 ULONG Flags;
82 HANDLE CompletionEvent;
83 LONG DeleteCount;
84 BOOLEAN CallbackInProgress;
87 struct timer_queue;
88 struct queue_timer
90 struct timer_queue *q;
91 struct list entry;
92 ULONG runcount; /* number of callbacks pending execution */
93 RTL_WAITORTIMERCALLBACKFUNC callback;
94 PVOID param;
95 DWORD period;
96 ULONG flags;
97 ULONGLONG expire;
98 BOOL destroy; /* timer should be deleted; once set, never unset */
99 HANDLE event; /* removal event */
102 struct timer_queue
104 DWORD magic;
105 RTL_CRITICAL_SECTION cs;
106 struct list timers; /* sorted by expiration time */
107 BOOL quit; /* queue should be deleted; once set, never unset */
108 HANDLE event;
109 HANDLE thread;
113 * Object-oriented thread pooling API
/* Idle timeout for worker threads (presumably milliseconds -- confirm at the use site). */
#define THREADPOOL_WORKER_TIMEOUT 5000
/* Wait slots per bucket; one of the MAXIMUM_WAIT_OBJECTS slots is held back. */
#define MAXIMUM_WAITQUEUE_OBJECTS (MAXIMUM_WAIT_OBJECTS - 1)
119 /* internal threadpool representation */
120 struct threadpool
122 LONG refcount;
123 LONG objcount;
124 BOOL shutdown;
125 CRITICAL_SECTION cs;
126 /* pool of work items, locked via .cs */
127 struct list pool;
128 RTL_CONDITION_VARIABLE update_event;
129 /* information about worker threads, locked via .cs */
130 int max_workers;
131 int min_workers;
132 int num_workers;
133 int num_busy_workers;
/* Discriminator for the union stored in struct threadpool_object. */
enum threadpool_objtype
{
    TP_OBJECT_TYPE_SIMPLE,
    TP_OBJECT_TYPE_WORK,
    TP_OBJECT_TYPE_TIMER,
    TP_OBJECT_TYPE_WAIT
};
144 /* internal threadpool object representation */
145 struct threadpool_object
147 LONG refcount;
148 BOOL shutdown;
149 /* read-only information */
150 enum threadpool_objtype type;
151 struct threadpool *pool;
152 struct threadpool_group *group;
153 PVOID userdata;
154 PTP_CLEANUP_GROUP_CANCEL_CALLBACK group_cancel_callback;
155 PTP_SIMPLE_CALLBACK finalization_callback;
156 BOOL may_run_long;
157 HMODULE race_dll;
158 /* information about the group, locked via .group->cs */
159 struct list group_entry;
160 BOOL is_group_member;
161 /* information about the pool, locked via .pool->cs */
162 struct list pool_entry;
163 RTL_CONDITION_VARIABLE finished_event;
164 RTL_CONDITION_VARIABLE group_finished_event;
165 LONG num_pending_callbacks;
166 LONG num_running_callbacks;
167 LONG num_associated_callbacks;
168 /* arguments for callback */
169 union
171 struct
173 PTP_SIMPLE_CALLBACK callback;
174 } simple;
175 struct
177 PTP_WORK_CALLBACK callback;
178 } work;
179 struct
181 PTP_TIMER_CALLBACK callback;
182 /* information about the timer, locked via timerqueue.cs */
183 BOOL timer_initialized;
184 BOOL timer_pending;
185 struct list timer_entry;
186 BOOL timer_set;
187 ULONGLONG timeout;
188 LONG period;
189 LONG window_length;
190 } timer;
191 struct
193 PTP_WAIT_CALLBACK callback;
194 LONG signaled;
195 /* information about the wait object, locked via waitqueue.cs */
196 struct waitqueue_bucket *bucket;
197 BOOL wait_pending;
198 struct list wait_entry;
199 ULONGLONG timeout;
200 HANDLE handle;
201 } wait;
202 } u;
205 /* internal threadpool instance representation */
206 struct threadpool_instance
208 struct threadpool_object *object;
209 DWORD threadid;
210 BOOL associated;
211 BOOL may_run_long;
212 struct
214 CRITICAL_SECTION *critical_section;
215 HANDLE mutex;
216 HANDLE semaphore;
217 LONG semaphore_count;
218 HANDLE event;
219 HMODULE library;
220 } cleanup;
223 /* internal threadpool group representation */
224 struct threadpool_group
226 LONG refcount;
227 BOOL shutdown;
228 CRITICAL_SECTION cs;
229 /* list of group members, locked via .cs */
230 struct list members;
233 /* global timerqueue object */
234 static RTL_CRITICAL_SECTION_DEBUG timerqueue_debug;
236 static struct
238 CRITICAL_SECTION cs;
239 LONG objcount;
240 BOOL thread_running;
241 struct list pending_timers;
242 RTL_CONDITION_VARIABLE update_event;
244 timerqueue =
246 { &timerqueue_debug, -1, 0, 0, 0, 0 }, /* cs */
247 0, /* objcount */
248 FALSE, /* thread_running */
249 LIST_INIT( timerqueue.pending_timers ), /* pending_timers */
250 RTL_CONDITION_VARIABLE_INIT /* update_event */
253 static RTL_CRITICAL_SECTION_DEBUG timerqueue_debug =
255 0, 0, &timerqueue.cs,
256 { &timerqueue_debug.ProcessLocksList, &timerqueue_debug.ProcessLocksList },
257 0, 0, { (DWORD_PTR)(__FILE__ ": timerqueue.cs") }
260 /* global waitqueue object */
261 static RTL_CRITICAL_SECTION_DEBUG waitqueue_debug;
263 static struct
265 CRITICAL_SECTION cs;
266 LONG num_buckets;
267 struct list buckets;
269 waitqueue =
271 { &waitqueue_debug, -1, 0, 0, 0, 0 }, /* cs */
272 0, /* num_buckets */
273 LIST_INIT( waitqueue.buckets ) /* buckets */
276 static RTL_CRITICAL_SECTION_DEBUG waitqueue_debug =
278 0, 0, &waitqueue.cs,
279 { &waitqueue_debug.ProcessLocksList, &waitqueue_debug.ProcessLocksList },
280 0, 0, { (DWORD_PTR)(__FILE__ ": waitqueue.cs") }
283 struct waitqueue_bucket
285 struct list bucket_entry;
286 LONG objcount;
287 struct list reserved;
288 struct list waiting;
289 HANDLE update_event;
292 static inline struct threadpool *impl_from_TP_POOL( TP_POOL *pool )
294 return (struct threadpool *)pool;
297 static inline struct threadpool_object *impl_from_TP_WORK( TP_WORK *work )
299 struct threadpool_object *object = (struct threadpool_object *)work;
300 assert( object->type == TP_OBJECT_TYPE_WORK );
301 return object;
304 static inline struct threadpool_object *impl_from_TP_TIMER( TP_TIMER *timer )
306 struct threadpool_object *object = (struct threadpool_object *)timer;
307 assert( object->type == TP_OBJECT_TYPE_TIMER );
308 return object;
311 static inline struct threadpool_object *impl_from_TP_WAIT( TP_WAIT *wait )
313 struct threadpool_object *object = (struct threadpool_object *)wait;
314 assert( object->type == TP_OBJECT_TYPE_WAIT );
315 return object;
318 static inline struct threadpool_group *impl_from_TP_CLEANUP_GROUP( TP_CLEANUP_GROUP *group )
320 return (struct threadpool_group *)group;
323 static inline struct threadpool_instance *impl_from_TP_CALLBACK_INSTANCE( TP_CALLBACK_INSTANCE *instance )
325 return (struct threadpool_instance *)instance;
328 static void CALLBACK threadpool_worker_proc( void *param );
329 static void tp_object_submit( struct threadpool_object *object, BOOL signaled );
330 static void tp_object_prepare_shutdown( struct threadpool_object *object );
331 static BOOL tp_object_release( struct threadpool_object *object );
332 static struct threadpool *default_threadpool = NULL;
334 static inline LONG interlocked_inc( PLONG dest )
336 return interlocked_xchg_add( dest, 1 ) + 1;
339 static inline LONG interlocked_dec( PLONG dest )
341 return interlocked_xchg_add( dest, -1 ) - 1;
344 static void CALLBACK process_rtl_work_item( TP_CALLBACK_INSTANCE *instance, void *userdata )
346 struct rtl_work_item *item = userdata;
348 TRACE("executing %p(%p)\n", item->function, item->context);
349 item->function( item->context );
351 RtlFreeHeap( GetProcessHeap(), 0, item );
354 /***********************************************************************
355 * RtlQueueWorkItem (NTDLL.@)
357 * Queues a work item into a thread in the thread pool.
359 * PARAMS
360 * function [I] Work function to execute.
361 * context [I] Context to pass to the work function when it is executed.
362 * flags [I] Flags. See notes.
364 * RETURNS
365 * Success: STATUS_SUCCESS.
366 * Failure: Any NTSTATUS code.
368 * NOTES
369 * Flags can be one or more of the following:
370 *|WT_EXECUTEDEFAULT - Executes the work item in a non-I/O worker thread.
371 *|WT_EXECUTEINIOTHREAD - Executes the work item in an I/O worker thread.
372 *|WT_EXECUTEINPERSISTENTTHREAD - Executes the work item in a thread that is persistent.
373 *|WT_EXECUTELONGFUNCTION - Hints that the execution can take a long time.
374 *|WT_TRANSFER_IMPERSONATION - Executes the function with the current access token.
376 NTSTATUS WINAPI RtlQueueWorkItem( PRTL_WORK_ITEM_ROUTINE function, PVOID context, ULONG flags )
378 TP_CALLBACK_ENVIRON environment;
379 struct rtl_work_item *item;
380 NTSTATUS status;
382 TRACE( "%p %p %u\n", function, context, flags );
384 item = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*item) );
385 if (!item)
386 return STATUS_NO_MEMORY;
388 memset( &environment, 0, sizeof(environment) );
389 environment.Version = 1;
390 environment.u.s.LongFunction = (flags & WT_EXECUTELONGFUNCTION) != 0;
391 environment.u.s.Persistent = (flags & WT_EXECUTEINPERSISTENTTHREAD) != 0;
393 item->function = function;
394 item->context = context;
396 status = TpSimpleTryPost( process_rtl_work_item, item, &environment );
397 if (status) RtlFreeHeap( GetProcessHeap(), 0, item );
398 return status;
401 /***********************************************************************
402 * iocp_poller - get completion events and run callbacks
404 static DWORD CALLBACK iocp_poller(LPVOID Arg)
406 HANDLE cport = Arg;
408 while( TRUE )
410 PRTL_OVERLAPPED_COMPLETION_ROUTINE callback;
411 LPVOID overlapped;
412 IO_STATUS_BLOCK iosb;
413 NTSTATUS res = NtRemoveIoCompletion( cport, (PULONG_PTR)&callback, (PULONG_PTR)&overlapped, &iosb, NULL );
414 if (res)
416 ERR("NtRemoveIoCompletion failed: 0x%x\n", res);
418 else
420 DWORD transferred = 0;
421 DWORD err = 0;
423 if (iosb.u.Status == STATUS_SUCCESS)
424 transferred = iosb.Information;
425 else
426 err = RtlNtStatusToDosError(iosb.u.Status);
428 callback( err, transferred, overlapped );
431 return 0;
434 /***********************************************************************
435 * RtlSetIoCompletionCallback (NTDLL.@)
437 * Binds a handle to a thread pool's completion port, and possibly
438 * starts a non-I/O thread to monitor this port and call functions back.
440 * PARAMS
441 * FileHandle [I] Handle to bind to a completion port.
442 * Function [I] Callback function to call on I/O completions.
443 * Flags [I] Not used.
445 * RETURNS
446 * Success: STATUS_SUCCESS.
447 * Failure: Any NTSTATUS code.
450 NTSTATUS WINAPI RtlSetIoCompletionCallback(HANDLE FileHandle, PRTL_OVERLAPPED_COMPLETION_ROUTINE Function, ULONG Flags)
452 IO_STATUS_BLOCK iosb;
453 FILE_COMPLETION_INFORMATION info;
455 if (Flags) FIXME("Unknown value Flags=0x%x\n", Flags);
457 if (!old_threadpool.compl_port)
459 NTSTATUS res = STATUS_SUCCESS;
461 RtlEnterCriticalSection(&old_threadpool.threadpool_compl_cs);
462 if (!old_threadpool.compl_port)
464 HANDLE cport;
466 res = NtCreateIoCompletion( &cport, IO_COMPLETION_ALL_ACCESS, NULL, 0 );
467 if (!res)
469 /* FIXME native can start additional threads in case of e.g. hung callback function. */
470 res = RtlQueueWorkItem( iocp_poller, cport, WT_EXECUTEDEFAULT );
471 if (!res)
472 old_threadpool.compl_port = cport;
473 else
474 NtClose( cport );
477 RtlLeaveCriticalSection(&old_threadpool.threadpool_compl_cs);
478 if (res) return res;
481 info.CompletionPort = old_threadpool.compl_port;
482 info.CompletionKey = (ULONG_PTR)Function;
484 return NtSetInformationFile( FileHandle, &iosb, &info, sizeof(info), FileCompletionInformation );
487 static inline PLARGE_INTEGER get_nt_timeout( PLARGE_INTEGER pTime, ULONG timeout )
489 if (timeout == INFINITE) return NULL;
490 pTime->QuadPart = (ULONGLONG)timeout * -10000;
491 return pTime;
494 static void delete_wait_work_item(struct wait_work_item *wait_work_item)
496 NtClose( wait_work_item->CancelEvent );
497 RtlFreeHeap( GetProcessHeap(), 0, wait_work_item );
500 static DWORD CALLBACK wait_thread_proc(LPVOID Arg)
502 struct wait_work_item *wait_work_item = Arg;
503 NTSTATUS status;
504 BOOLEAN alertable = (wait_work_item->Flags & WT_EXECUTEINIOTHREAD) != 0;
505 HANDLE handles[2] = { wait_work_item->Object, wait_work_item->CancelEvent };
506 LARGE_INTEGER timeout;
507 HANDLE completion_event;
509 TRACE("\n");
511 while (TRUE)
513 status = NtWaitForMultipleObjects( 2, handles, TRUE, alertable,
514 get_nt_timeout( &timeout, wait_work_item->Milliseconds ) );
515 if (status == STATUS_WAIT_0 || status == STATUS_TIMEOUT)
517 BOOLEAN TimerOrWaitFired;
519 if (status == STATUS_WAIT_0)
521 TRACE( "object %p signaled, calling callback %p with context %p\n",
522 wait_work_item->Object, wait_work_item->Callback,
523 wait_work_item->Context );
524 TimerOrWaitFired = FALSE;
526 else
528 TRACE( "wait for object %p timed out, calling callback %p with context %p\n",
529 wait_work_item->Object, wait_work_item->Callback,
530 wait_work_item->Context );
531 TimerOrWaitFired = TRUE;
533 wait_work_item->CallbackInProgress = TRUE;
534 wait_work_item->Callback( wait_work_item->Context, TimerOrWaitFired );
535 wait_work_item->CallbackInProgress = FALSE;
537 if (wait_work_item->Flags & WT_EXECUTEONLYONCE)
538 break;
540 else if (status != STATUS_USER_APC)
541 break;
545 if (interlocked_inc( &wait_work_item->DeleteCount ) == 2 )
547 completion_event = wait_work_item->CompletionEvent;
548 delete_wait_work_item( wait_work_item );
549 if (completion_event) NtSetEvent( completion_event, NULL );
552 return 0;
555 /***********************************************************************
556 * RtlRegisterWait (NTDLL.@)
558 * Registers a wait for a handle to become signaled.
560 * PARAMS
561 * NewWaitObject [I] Handle to the new wait object. Use RtlDeregisterWait() to free it.
562 * Object [I] Object to wait to become signaled.
563 * Callback [I] Callback function to execute when the wait times out or the handle is signaled.
564 * Context [I] Context to pass to the callback function when it is executed.
565 * Milliseconds [I] Number of milliseconds to wait before timing out.
566 * Flags [I] Flags. See notes.
568 * RETURNS
569 * Success: STATUS_SUCCESS.
570 * Failure: Any NTSTATUS code.
572 * NOTES
573 * Flags can be one or more of the following:
574 *|WT_EXECUTEDEFAULT - Executes the work item in a non-I/O worker thread.
575 *|WT_EXECUTEINIOTHREAD - Executes the work item in an I/O worker thread.
576 *|WT_EXECUTEINPERSISTENTTHREAD - Executes the work item in a thread that is persistent.
577 *|WT_EXECUTELONGFUNCTION - Hints that the execution can take a long time.
578 *|WT_TRANSFER_IMPERSONATION - Executes the function with the current access token.
580 NTSTATUS WINAPI RtlRegisterWait(PHANDLE NewWaitObject, HANDLE Object,
581 RTL_WAITORTIMERCALLBACKFUNC Callback,
582 PVOID Context, ULONG Milliseconds, ULONG Flags)
584 struct wait_work_item *wait_work_item;
585 NTSTATUS status;
587 TRACE( "(%p, %p, %p, %p, %d, 0x%x)\n", NewWaitObject, Object, Callback, Context, Milliseconds, Flags );
589 wait_work_item = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*wait_work_item) );
590 if (!wait_work_item)
591 return STATUS_NO_MEMORY;
593 wait_work_item->Object = Object;
594 wait_work_item->Callback = Callback;
595 wait_work_item->Context = Context;
596 wait_work_item->Milliseconds = Milliseconds;
597 wait_work_item->Flags = Flags;
598 wait_work_item->CallbackInProgress = FALSE;
599 wait_work_item->DeleteCount = 0;
600 wait_work_item->CompletionEvent = NULL;
602 status = NtCreateEvent( &wait_work_item->CancelEvent, EVENT_ALL_ACCESS, NULL, NotificationEvent, FALSE );
603 if (status != STATUS_SUCCESS)
605 RtlFreeHeap( GetProcessHeap(), 0, wait_work_item );
606 return status;
609 Flags = Flags & (WT_EXECUTEINIOTHREAD | WT_EXECUTEINPERSISTENTTHREAD |
610 WT_EXECUTELONGFUNCTION | WT_TRANSFER_IMPERSONATION);
611 status = RtlQueueWorkItem( wait_thread_proc, wait_work_item, Flags );
612 if (status != STATUS_SUCCESS)
614 delete_wait_work_item( wait_work_item );
615 return status;
618 *NewWaitObject = wait_work_item;
619 return status;
622 /***********************************************************************
623 * RtlDeregisterWaitEx (NTDLL.@)
625 * Cancels a wait operation and frees the resources associated with calling
626 * RtlRegisterWait().
628 * PARAMS
629 * WaitObject [I] Handle to the wait object to free.
631 * RETURNS
632 * Success: STATUS_SUCCESS.
633 * Failure: Any NTSTATUS code.
635 NTSTATUS WINAPI RtlDeregisterWaitEx(HANDLE WaitHandle, HANDLE CompletionEvent)
637 struct wait_work_item *wait_work_item = WaitHandle;
638 NTSTATUS status;
639 HANDLE LocalEvent = NULL;
640 BOOLEAN CallbackInProgress;
642 TRACE( "(%p %p)\n", WaitHandle, CompletionEvent );
644 if (WaitHandle == NULL)
645 return STATUS_INVALID_HANDLE;
647 CallbackInProgress = wait_work_item->CallbackInProgress;
648 if (CompletionEvent == INVALID_HANDLE_VALUE || !CallbackInProgress)
650 status = NtCreateEvent( &LocalEvent, EVENT_ALL_ACCESS, NULL, NotificationEvent, FALSE );
651 if (status != STATUS_SUCCESS)
652 return status;
653 interlocked_xchg_ptr( &wait_work_item->CompletionEvent, LocalEvent );
655 else if (CompletionEvent != NULL)
657 interlocked_xchg_ptr( &wait_work_item->CompletionEvent, CompletionEvent );
660 NtSetEvent( wait_work_item->CancelEvent, NULL );
662 if (interlocked_inc( &wait_work_item->DeleteCount ) == 2 )
664 status = STATUS_SUCCESS;
665 delete_wait_work_item( wait_work_item );
667 else if (LocalEvent)
669 NtWaitForSingleObject( LocalEvent, FALSE, NULL );
670 status = STATUS_SUCCESS;
672 else
674 status = STATUS_PENDING;
677 if (LocalEvent)
678 NtClose( LocalEvent );
680 return status;
683 /***********************************************************************
684 * RtlDeregisterWait (NTDLL.@)
686 * Cancels a wait operation and frees the resources associated with calling
687 * RtlRegisterWait().
689 * PARAMS
690 * WaitObject [I] Handle to the wait object to free.
692 * RETURNS
693 * Success: STATUS_SUCCESS.
694 * Failure: Any NTSTATUS code.
696 NTSTATUS WINAPI RtlDeregisterWait(HANDLE WaitHandle)
698 return RtlDeregisterWaitEx(WaitHandle, NULL);
702 /************************** Timer Queue Impl **************************/
704 static void queue_remove_timer(struct queue_timer *t)
706 /* We MUST hold the queue cs while calling this function. This ensures
707 that we cannot queue another callback for this timer. The runcount
708 being zero makes sure we don't have any already queued. */
709 struct timer_queue *q = t->q;
711 assert(t->runcount == 0);
712 assert(t->destroy);
714 list_remove(&t->entry);
715 if (t->event)
716 NtSetEvent(t->event, NULL);
717 RtlFreeHeap(GetProcessHeap(), 0, t);
719 if (q->quit && list_empty(&q->timers))
720 NtSetEvent(q->event, NULL);
723 static void timer_cleanup_callback(struct queue_timer *t)
725 struct timer_queue *q = t->q;
726 RtlEnterCriticalSection(&q->cs);
728 assert(0 < t->runcount);
729 --t->runcount;
731 if (t->destroy && t->runcount == 0)
732 queue_remove_timer(t);
734 RtlLeaveCriticalSection(&q->cs);
737 static DWORD WINAPI timer_callback_wrapper(LPVOID p)
739 struct queue_timer *t = p;
740 t->callback(t->param, TRUE);
741 timer_cleanup_callback(t);
742 return 0;
745 static inline ULONGLONG queue_current_time(void)
747 LARGE_INTEGER now, freq;
748 NtQueryPerformanceCounter(&now, &freq);
749 return now.QuadPart * 1000 / freq.QuadPart;
752 static void queue_add_timer(struct queue_timer *t, ULONGLONG time,
753 BOOL set_event)
755 /* We MUST hold the queue cs while calling this function. */
756 struct timer_queue *q = t->q;
757 struct list *ptr = &q->timers;
759 assert(!q->quit || (t->destroy && time == EXPIRE_NEVER));
761 if (time != EXPIRE_NEVER)
762 LIST_FOR_EACH(ptr, &q->timers)
764 struct queue_timer *cur = LIST_ENTRY(ptr, struct queue_timer, entry);
765 if (time < cur->expire)
766 break;
768 list_add_before(ptr, &t->entry);
770 t->expire = time;
772 /* If we insert at the head of the list, we need to expire sooner
773 than expected. */
774 if (set_event && &t->entry == list_head(&q->timers))
775 NtSetEvent(q->event, NULL);
778 static inline void queue_move_timer(struct queue_timer *t, ULONGLONG time,
779 BOOL set_event)
781 /* We MUST hold the queue cs while calling this function. */
782 list_remove(&t->entry);
783 queue_add_timer(t, time, set_event);
786 static void queue_timer_expire(struct timer_queue *q)
788 struct queue_timer *t = NULL;
790 RtlEnterCriticalSection(&q->cs);
791 if (list_head(&q->timers))
793 ULONGLONG now, next;
794 t = LIST_ENTRY(list_head(&q->timers), struct queue_timer, entry);
795 if (!t->destroy && t->expire <= ((now = queue_current_time())))
797 ++t->runcount;
798 if (t->period)
800 next = t->expire + t->period;
801 /* avoid trigger cascade if overloaded / hibernated */
802 if (next < now)
803 next = now + t->period;
805 else
806 next = EXPIRE_NEVER;
807 queue_move_timer(t, next, FALSE);
809 else
810 t = NULL;
812 RtlLeaveCriticalSection(&q->cs);
814 if (t)
816 if (t->flags & WT_EXECUTEINTIMERTHREAD)
817 timer_callback_wrapper(t);
818 else
820 ULONG flags
821 = (t->flags
822 & (WT_EXECUTEINIOTHREAD | WT_EXECUTEINPERSISTENTTHREAD
823 | WT_EXECUTELONGFUNCTION | WT_TRANSFER_IMPERSONATION));
824 NTSTATUS status = RtlQueueWorkItem(timer_callback_wrapper, t, flags);
825 if (status != STATUS_SUCCESS)
826 timer_cleanup_callback(t);
831 static ULONG queue_get_timeout(struct timer_queue *q)
833 struct queue_timer *t;
834 ULONG timeout = INFINITE;
836 RtlEnterCriticalSection(&q->cs);
837 if (list_head(&q->timers))
839 t = LIST_ENTRY(list_head(&q->timers), struct queue_timer, entry);
840 assert(!t->destroy || t->expire == EXPIRE_NEVER);
842 if (t->expire != EXPIRE_NEVER)
844 ULONGLONG time = queue_current_time();
845 timeout = t->expire < time ? 0 : t->expire - time;
848 RtlLeaveCriticalSection(&q->cs);
850 return timeout;
853 static void WINAPI timer_queue_thread_proc(LPVOID p)
855 struct timer_queue *q = p;
856 ULONG timeout_ms;
858 timeout_ms = INFINITE;
859 for (;;)
861 LARGE_INTEGER timeout;
862 NTSTATUS status;
863 BOOL done = FALSE;
865 status = NtWaitForSingleObject(
866 q->event, FALSE, get_nt_timeout(&timeout, timeout_ms));
868 if (status == STATUS_WAIT_0)
870 /* There are two possible ways to trigger the event. Either
871 we are quitting and the last timer got removed, or a new
872 timer got put at the head of the list so we need to adjust
873 our timeout. */
874 RtlEnterCriticalSection(&q->cs);
875 if (q->quit && list_empty(&q->timers))
876 done = TRUE;
877 RtlLeaveCriticalSection(&q->cs);
879 else if (status == STATUS_TIMEOUT)
880 queue_timer_expire(q);
882 if (done)
883 break;
885 timeout_ms = queue_get_timeout(q);
888 NtClose(q->event);
889 RtlDeleteCriticalSection(&q->cs);
890 q->magic = 0;
891 RtlFreeHeap(GetProcessHeap(), 0, q);
892 RtlExitUserThread( 0 );
895 static void queue_destroy_timer(struct queue_timer *t)
897 /* We MUST hold the queue cs while calling this function. */
898 t->destroy = TRUE;
899 if (t->runcount == 0)
900 /* Ensure a timer is promptly removed. If callbacks are pending,
901 it will be removed after the last one finishes by the callback
902 cleanup wrapper. */
903 queue_remove_timer(t);
904 else
905 /* Make sure no destroyed timer masks an active timer at the head
906 of the sorted list. */
907 queue_move_timer(t, EXPIRE_NEVER, FALSE);
910 /***********************************************************************
911 * RtlCreateTimerQueue (NTDLL.@)
913 * Creates a timer queue object and returns a handle to it.
915 * PARAMS
916 * NewTimerQueue [O] The newly created queue.
918 * RETURNS
919 * Success: STATUS_SUCCESS.
920 * Failure: Any NTSTATUS code.
922 NTSTATUS WINAPI RtlCreateTimerQueue(PHANDLE NewTimerQueue)
924 NTSTATUS status;
925 struct timer_queue *q = RtlAllocateHeap(GetProcessHeap(), 0, sizeof *q);
926 if (!q)
927 return STATUS_NO_MEMORY;
929 RtlInitializeCriticalSection(&q->cs);
930 list_init(&q->timers);
931 q->quit = FALSE;
932 q->magic = TIMER_QUEUE_MAGIC;
933 status = NtCreateEvent(&q->event, EVENT_ALL_ACCESS, NULL, SynchronizationEvent, FALSE);
934 if (status != STATUS_SUCCESS)
936 RtlFreeHeap(GetProcessHeap(), 0, q);
937 return status;
939 status = RtlCreateUserThread(GetCurrentProcess(), NULL, FALSE, NULL, 0, 0,
940 timer_queue_thread_proc, q, &q->thread, NULL);
941 if (status != STATUS_SUCCESS)
943 NtClose(q->event);
944 RtlFreeHeap(GetProcessHeap(), 0, q);
945 return status;
948 *NewTimerQueue = q;
949 return STATUS_SUCCESS;
952 /***********************************************************************
953 * RtlDeleteTimerQueueEx (NTDLL.@)
955 * Deletes a timer queue object.
957 * PARAMS
958 * TimerQueue [I] The timer queue to destroy.
959 * CompletionEvent [I] If NULL, return immediately. If INVALID_HANDLE_VALUE,
960 * wait until all timers are finished firing before
961 * returning. Otherwise, return immediately and set the
962 * event when all timers are done.
964 * RETURNS
965 * Success: STATUS_SUCCESS if synchronous, STATUS_PENDING if not.
966 * Failure: Any NTSTATUS code.
968 NTSTATUS WINAPI RtlDeleteTimerQueueEx(HANDLE TimerQueue, HANDLE CompletionEvent)
970 struct timer_queue *q = TimerQueue;
971 struct queue_timer *t, *temp;
972 HANDLE thread;
973 NTSTATUS status;
975 if (!q || q->magic != TIMER_QUEUE_MAGIC)
976 return STATUS_INVALID_HANDLE;
978 thread = q->thread;
980 RtlEnterCriticalSection(&q->cs);
981 q->quit = TRUE;
982 if (list_head(&q->timers))
983 /* When the last timer is removed, it will signal the timer thread to
984 exit... */
985 LIST_FOR_EACH_ENTRY_SAFE(t, temp, &q->timers, struct queue_timer, entry)
986 queue_destroy_timer(t);
987 else
988 /* However if we have none, we must do it ourselves. */
989 NtSetEvent(q->event, NULL);
990 RtlLeaveCriticalSection(&q->cs);
992 if (CompletionEvent == INVALID_HANDLE_VALUE)
994 NtWaitForSingleObject(thread, FALSE, NULL);
995 status = STATUS_SUCCESS;
997 else
999 if (CompletionEvent)
1001 FIXME("asynchronous return on completion event unimplemented\n");
1002 NtWaitForSingleObject(thread, FALSE, NULL);
1003 NtSetEvent(CompletionEvent, NULL);
1005 status = STATUS_PENDING;
1008 NtClose(thread);
1009 return status;
1012 static struct timer_queue *get_timer_queue(HANDLE TimerQueue)
1014 static struct timer_queue *default_timer_queue;
1016 if (TimerQueue)
1017 return TimerQueue;
1018 else
1020 if (!default_timer_queue)
1022 HANDLE q;
1023 NTSTATUS status = RtlCreateTimerQueue(&q);
1024 if (status == STATUS_SUCCESS)
1026 PVOID p = interlocked_cmpxchg_ptr(
1027 (void **) &default_timer_queue, q, NULL);
1028 if (p)
1029 /* Got beat to the punch. */
1030 RtlDeleteTimerQueueEx(q, NULL);
1033 return default_timer_queue;
1037 /***********************************************************************
1038 * RtlCreateTimer (NTDLL.@)
1040 * Creates a new timer associated with the given queue.
1042 * PARAMS
1043 * NewTimer [O] The newly created timer.
1044 * TimerQueue [I] The queue to hold the timer.
1045 * Callback [I] The callback to fire.
1046 * Parameter [I] The argument for the callback.
1047 * DueTime [I] The delay, in milliseconds, before first firing the
1048 * timer.
1049 * Period [I] The period, in milliseconds, at which to fire the timer
1050 * after the first callback. If zero, the timer will only
1051 * fire once. It still needs to be deleted with
1052 * RtlDeleteTimer.
1053 * Flags [I] Flags controlling the execution of the callback. In
1054 * addition to the WT_* thread pool flags (see
1055 * RtlQueueWorkItem), WT_EXECUTEINTIMERTHREAD and
1056 * WT_EXECUTEONLYONCE are supported.
1058 * RETURNS
1059 * Success: STATUS_SUCCESS.
1060 * Failure: Any NTSTATUS code.
1062 NTSTATUS WINAPI RtlCreateTimer(PHANDLE NewTimer, HANDLE TimerQueue,
1063 RTL_WAITORTIMERCALLBACKFUNC Callback,
1064 PVOID Parameter, DWORD DueTime, DWORD Period,
1065 ULONG Flags)
1067 NTSTATUS status;
1068 struct queue_timer *t;
1069 struct timer_queue *q = get_timer_queue(TimerQueue);
1071 if (!q) return STATUS_NO_MEMORY;
1072 if (q->magic != TIMER_QUEUE_MAGIC) return STATUS_INVALID_HANDLE;
1074 t = RtlAllocateHeap(GetProcessHeap(), 0, sizeof *t);
1075 if (!t)
1076 return STATUS_NO_MEMORY;
1078 t->q = q;
1079 t->runcount = 0;
1080 t->callback = Callback;
1081 t->param = Parameter;
1082 t->period = Period;
1083 t->flags = Flags;
1084 t->destroy = FALSE;
1085 t->event = NULL;
1087 status = STATUS_SUCCESS;
1088 RtlEnterCriticalSection(&q->cs);
1089 if (q->quit)
1090 status = STATUS_INVALID_HANDLE;
1091 else
1092 queue_add_timer(t, queue_current_time() + DueTime, TRUE);
1093 RtlLeaveCriticalSection(&q->cs);
1095 if (status == STATUS_SUCCESS)
1096 *NewTimer = t;
1097 else
1098 RtlFreeHeap(GetProcessHeap(), 0, t);
1100 return status;
1103 /***********************************************************************
1104 * RtlUpdateTimer (NTDLL.@)
1106 * Changes the time at which a timer expires.
1108 * PARAMS
1109 * TimerQueue [I] The queue that holds the timer.
1110 * Timer [I] The timer to update.
1111 * DueTime [I] The delay, in milliseconds, before next firing the timer.
1112 * Period [I] The period, in milliseconds, at which to fire the timer
1113 * after the first callback. If zero, the timer will not
1114 * refire once. It still needs to be deleted with
1115 * RtlDeleteTimer.
1117 * RETURNS
1118 * Success: STATUS_SUCCESS.
1119 * Failure: Any NTSTATUS code.
1121 NTSTATUS WINAPI RtlUpdateTimer(HANDLE TimerQueue, HANDLE Timer,
1122 DWORD DueTime, DWORD Period)
1124 struct queue_timer *t = Timer;
1125 struct timer_queue *q = t->q;
1127 RtlEnterCriticalSection(&q->cs);
1128 /* Can't change a timer if it was once-only or destroyed. */
1129 if (t->expire != EXPIRE_NEVER)
1131 t->period = Period;
1132 queue_move_timer(t, queue_current_time() + DueTime, TRUE);
1134 RtlLeaveCriticalSection(&q->cs);
1136 return STATUS_SUCCESS;
1139 /***********************************************************************
1140 * RtlDeleteTimer (NTDLL.@)
1142 * Cancels a timer-queue timer.
1144 * PARAMS
1145 * TimerQueue [I] The queue that holds the timer.
1146 * Timer [I] The timer to update.
1147 * CompletionEvent [I] If NULL, return immediately. If INVALID_HANDLE_VALUE,
1148 * wait until the timer is finished firing all pending
1149 * callbacks before returning. Otherwise, return
1150 * immediately and set the timer is done.
1152 * RETURNS
1153 * Success: STATUS_SUCCESS if the timer is done, STATUS_PENDING if not,
1154 or if the completion event is NULL.
1155 * Failure: Any NTSTATUS code.
1157 NTSTATUS WINAPI RtlDeleteTimer(HANDLE TimerQueue, HANDLE Timer,
1158 HANDLE CompletionEvent)
1160 struct queue_timer *t = Timer;
1161 struct timer_queue *q;
1162 NTSTATUS status = STATUS_PENDING;
1163 HANDLE event = NULL;
1165 if (!Timer)
1166 return STATUS_INVALID_PARAMETER_1;
1167 q = t->q;
1168 if (CompletionEvent == INVALID_HANDLE_VALUE)
1170 status = NtCreateEvent(&event, EVENT_ALL_ACCESS, NULL, SynchronizationEvent, FALSE);
1171 if (status == STATUS_SUCCESS)
1172 status = STATUS_PENDING;
1174 else if (CompletionEvent)
1175 event = CompletionEvent;
1177 RtlEnterCriticalSection(&q->cs);
1178 t->event = event;
1179 if (t->runcount == 0 && event)
1180 status = STATUS_SUCCESS;
1181 queue_destroy_timer(t);
1182 RtlLeaveCriticalSection(&q->cs);
1184 if (CompletionEvent == INVALID_HANDLE_VALUE && event)
1186 if (status == STATUS_PENDING)
1188 NtWaitForSingleObject(event, FALSE, NULL);
1189 status = STATUS_SUCCESS;
1191 NtClose(event);
1194 return status;
1197 /***********************************************************************
1198 * timerqueue_thread_proc (internal)
1200 static void CALLBACK timerqueue_thread_proc( void *param )
1202 ULONGLONG timeout_lower, timeout_upper, new_timeout;
1203 struct threadpool_object *other_timer;
1204 LARGE_INTEGER now, timeout;
1205 struct list *ptr;
1207 TRACE( "starting timer queue thread\n" );
1209 RtlEnterCriticalSection( &timerqueue.cs );
1210 for (;;)
1212 NtQuerySystemTime( &now );
1214 /* Check for expired timers. */
1215 while ((ptr = list_head( &timerqueue.pending_timers )))
1217 struct threadpool_object *timer = LIST_ENTRY( ptr, struct threadpool_object, u.timer.timer_entry );
1218 assert( timer->type == TP_OBJECT_TYPE_TIMER );
1219 assert( timer->u.timer.timer_pending );
1220 if (timer->u.timer.timeout > now.QuadPart)
1221 break;
1223 /* Queue a new callback in one of the worker threads. */
1224 list_remove( &timer->u.timer.timer_entry );
1225 timer->u.timer.timer_pending = FALSE;
1226 tp_object_submit( timer, FALSE );
1228 /* Insert the timer back into the queue, except it's marked for shutdown. */
1229 if (timer->u.timer.period && !timer->shutdown)
1231 timer->u.timer.timeout += (ULONGLONG)timer->u.timer.period * 10000;
1232 if (timer->u.timer.timeout <= now.QuadPart)
1233 timer->u.timer.timeout = now.QuadPart + 1;
1235 LIST_FOR_EACH_ENTRY( other_timer, &timerqueue.pending_timers,
1236 struct threadpool_object, u.timer.timer_entry )
1238 assert( other_timer->type == TP_OBJECT_TYPE_TIMER );
1239 if (timer->u.timer.timeout < other_timer->u.timer.timeout)
1240 break;
1242 list_add_before( &other_timer->u.timer.timer_entry, &timer->u.timer.timer_entry );
1243 timer->u.timer.timer_pending = TRUE;
1247 timeout_lower = TIMEOUT_INFINITE;
1248 timeout_upper = TIMEOUT_INFINITE;
1250 /* Determine next timeout and use the window length to optimize wakeup times. */
1251 LIST_FOR_EACH_ENTRY( other_timer, &timerqueue.pending_timers,
1252 struct threadpool_object, u.timer.timer_entry )
1254 assert( other_timer->type == TP_OBJECT_TYPE_TIMER );
1255 if (other_timer->u.timer.timeout >= timeout_upper)
1256 break;
1258 timeout_lower = other_timer->u.timer.timeout;
1259 new_timeout = timeout_lower + (ULONGLONG)other_timer->u.timer.window_length * 10000;
1260 if (new_timeout < timeout_upper)
1261 timeout_upper = new_timeout;
1264 /* Wait for timer update events or until the next timer expires. */
1265 if (timerqueue.objcount)
1267 timeout.QuadPart = timeout_lower;
1268 RtlSleepConditionVariableCS( &timerqueue.update_event, &timerqueue.cs, &timeout );
1269 continue;
1272 /* All timers have been destroyed, if no new timers are created
1273 * within some amount of time, then we can shutdown this thread. */
1274 timeout.QuadPart = (ULONGLONG)THREADPOOL_WORKER_TIMEOUT * -10000;
1275 if (RtlSleepConditionVariableCS( &timerqueue.update_event, &timerqueue.cs,
1276 &timeout ) == STATUS_TIMEOUT && !timerqueue.objcount)
1278 break;
1282 timerqueue.thread_running = FALSE;
1283 RtlLeaveCriticalSection( &timerqueue.cs );
1285 TRACE( "terminating timer queue thread\n" );
1286 RtlExitUserThread( 0 );
1289 /***********************************************************************
1290 * tp_new_worker_thread (internal)
1292 * Create and account a new worker thread for the desired pool.
1294 static NTSTATUS tp_new_worker_thread( struct threadpool *pool )
1296 HANDLE thread;
1297 NTSTATUS status;
1299 status = RtlCreateUserThread( GetCurrentProcess(), NULL, FALSE, NULL, 0, 0,
1300 threadpool_worker_proc, pool, &thread, NULL );
1301 if (status == STATUS_SUCCESS)
1303 interlocked_inc( &pool->refcount );
1304 pool->num_workers++;
1305 pool->num_busy_workers++;
1306 NtClose( thread );
1308 return status;
1311 /***********************************************************************
1312 * tp_timerqueue_lock (internal)
1314 * Acquires a lock on the global timerqueue. When the lock is acquired
1315 * successfully, it is guaranteed that the timer thread is running.
1317 static NTSTATUS tp_timerqueue_lock( struct threadpool_object *timer )
1319 NTSTATUS status = STATUS_SUCCESS;
1320 assert( timer->type == TP_OBJECT_TYPE_TIMER );
1322 timer->u.timer.timer_initialized = FALSE;
1323 timer->u.timer.timer_pending = FALSE;
1324 timer->u.timer.timer_set = FALSE;
1325 timer->u.timer.timeout = 0;
1326 timer->u.timer.period = 0;
1327 timer->u.timer.window_length = 0;
1329 RtlEnterCriticalSection( &timerqueue.cs );
1331 /* Make sure that the timerqueue thread is running. */
1332 if (!timerqueue.thread_running)
1334 HANDLE thread;
1335 status = RtlCreateUserThread( GetCurrentProcess(), NULL, FALSE, NULL, 0, 0,
1336 timerqueue_thread_proc, NULL, &thread, NULL );
1337 if (status == STATUS_SUCCESS)
1339 timerqueue.thread_running = TRUE;
1340 NtClose( thread );
1344 if (status == STATUS_SUCCESS)
1346 timer->u.timer.timer_initialized = TRUE;
1347 timerqueue.objcount++;
1350 RtlLeaveCriticalSection( &timerqueue.cs );
1351 return status;
1354 /***********************************************************************
1355 * tp_timerqueue_unlock (internal)
1357 * Releases a lock on the global timerqueue.
1359 static void tp_timerqueue_unlock( struct threadpool_object *timer )
1361 assert( timer->type == TP_OBJECT_TYPE_TIMER );
1363 RtlEnterCriticalSection( &timerqueue.cs );
1364 if (timer->u.timer.timer_initialized)
1366 /* If timer was pending, remove it. */
1367 if (timer->u.timer.timer_pending)
1369 list_remove( &timer->u.timer.timer_entry );
1370 timer->u.timer.timer_pending = FALSE;
1373 /* If the last timer object was destroyed, then wake up the thread. */
1374 if (!--timerqueue.objcount)
1376 assert( list_empty( &timerqueue.pending_timers ) );
1377 RtlWakeAllConditionVariable( &timerqueue.update_event );
1380 timer->u.timer.timer_initialized = FALSE;
1382 RtlLeaveCriticalSection( &timerqueue.cs );
1385 /***********************************************************************
1386 * waitqueue_thread_proc (internal)
1388 static void CALLBACK waitqueue_thread_proc( void *param )
1390 struct threadpool_object *objects[MAXIMUM_WAITQUEUE_OBJECTS];
1391 HANDLE handles[MAXIMUM_WAITQUEUE_OBJECTS + 1];
1392 struct waitqueue_bucket *bucket = param;
1393 struct threadpool_object *wait, *next;
1394 LARGE_INTEGER now, timeout;
1395 DWORD num_handles;
1396 NTSTATUS status;
1398 TRACE( "starting wait queue thread\n" );
1400 RtlEnterCriticalSection( &waitqueue.cs );
1402 for (;;)
1404 NtQuerySystemTime( &now );
1405 timeout.QuadPart = TIMEOUT_INFINITE;
1406 num_handles = 0;
1408 LIST_FOR_EACH_ENTRY_SAFE( wait, next, &bucket->waiting, struct threadpool_object,
1409 u.wait.wait_entry )
1411 assert( wait->type == TP_OBJECT_TYPE_WAIT );
1412 if (wait->u.wait.timeout <= now.QuadPart)
1414 /* Wait object timed out. */
1415 list_remove( &wait->u.wait.wait_entry );
1416 list_add_tail( &bucket->reserved, &wait->u.wait.wait_entry );
1417 tp_object_submit( wait, FALSE );
1419 else
1421 if (wait->u.wait.timeout < timeout.QuadPart)
1422 timeout.QuadPart = wait->u.wait.timeout;
1424 assert( num_handles < MAXIMUM_WAITQUEUE_OBJECTS );
1425 interlocked_inc( &wait->refcount );
1426 objects[num_handles] = wait;
1427 handles[num_handles] = wait->u.wait.handle;
1428 num_handles++;
1432 if (!bucket->objcount)
1434 /* All wait objects have been destroyed, if no new wait objects are created
1435 * within some amount of time, then we can shutdown this thread. */
1436 assert( num_handles == 0 );
1437 RtlLeaveCriticalSection( &waitqueue.cs );
1438 timeout.QuadPart = (ULONGLONG)THREADPOOL_WORKER_TIMEOUT * -10000;
1439 status = NtWaitForMultipleObjects( 1, &bucket->update_event, TRUE, FALSE, &timeout );
1440 RtlEnterCriticalSection( &waitqueue.cs );
1442 if (status == STATUS_TIMEOUT && !bucket->objcount)
1443 break;
1445 else
1447 handles[num_handles] = bucket->update_event;
1448 RtlLeaveCriticalSection( &waitqueue.cs );
1449 status = NtWaitForMultipleObjects( num_handles + 1, handles, TRUE, FALSE, &timeout );
1450 RtlEnterCriticalSection( &waitqueue.cs );
1452 if (status >= STATUS_WAIT_0 && status < STATUS_WAIT_0 + num_handles)
1454 wait = objects[status - STATUS_WAIT_0];
1455 assert( wait->type == TP_OBJECT_TYPE_WAIT );
1456 if (wait->u.wait.bucket)
1458 /* Wait object signaled. */
1459 assert( wait->u.wait.bucket == bucket );
1460 list_remove( &wait->u.wait.wait_entry );
1461 list_add_tail( &bucket->reserved, &wait->u.wait.wait_entry );
1462 tp_object_submit( wait, TRUE );
1464 else
1465 WARN("wait object %p triggered while object was destroyed\n", wait);
1468 /* Release temporary references to wait objects. */
1469 while (num_handles)
1471 wait = objects[--num_handles];
1472 assert( wait->type == TP_OBJECT_TYPE_WAIT );
1473 tp_object_release( wait );
1477 /* Try to merge bucket with other threads. */
1478 if (waitqueue.num_buckets > 1 && bucket->objcount &&
1479 bucket->objcount <= MAXIMUM_WAITQUEUE_OBJECTS * 1 / 3)
1481 struct waitqueue_bucket *other_bucket;
1482 LIST_FOR_EACH_ENTRY( other_bucket, &waitqueue.buckets, struct waitqueue_bucket, bucket_entry )
1484 if (other_bucket != bucket && other_bucket->objcount &&
1485 other_bucket->objcount + bucket->objcount <= MAXIMUM_WAITQUEUE_OBJECTS * 2 / 3)
1487 other_bucket->objcount += bucket->objcount;
1488 bucket->objcount = 0;
1490 /* Update reserved list. */
1491 LIST_FOR_EACH_ENTRY( wait, &bucket->reserved, struct threadpool_object, u.wait.wait_entry )
1493 assert( wait->type == TP_OBJECT_TYPE_WAIT );
1494 wait->u.wait.bucket = other_bucket;
1496 list_move_tail( &other_bucket->reserved, &bucket->reserved );
1498 /* Update waiting list. */
1499 LIST_FOR_EACH_ENTRY( wait, &bucket->waiting, struct threadpool_object, u.wait.wait_entry )
1501 assert( wait->type == TP_OBJECT_TYPE_WAIT );
1502 wait->u.wait.bucket = other_bucket;
1504 list_move_tail( &other_bucket->waiting, &bucket->waiting );
1506 /* Move bucket to the end, to keep the probability of
1507 * newly added wait objects as small as possible. */
1508 list_remove( &bucket->bucket_entry );
1509 list_add_tail( &waitqueue.buckets, &bucket->bucket_entry );
1511 NtSetEvent( other_bucket->update_event, NULL );
1512 break;
1518 /* Remove this bucket from the list. */
1519 list_remove( &bucket->bucket_entry );
1520 if (!--waitqueue.num_buckets)
1521 assert( list_empty( &waitqueue.buckets ) );
1523 RtlLeaveCriticalSection( &waitqueue.cs );
1525 TRACE( "terminating wait queue thread\n" );
1527 assert( bucket->objcount == 0 );
1528 assert( list_empty( &bucket->reserved ) );
1529 assert( list_empty( &bucket->waiting ) );
1530 NtClose( bucket->update_event );
1532 RtlFreeHeap( GetProcessHeap(), 0, bucket );
1533 RtlExitUserThread( 0 );
1536 /***********************************************************************
1537 * tp_waitqueue_lock (internal)
1539 static NTSTATUS tp_waitqueue_lock( struct threadpool_object *wait )
1541 struct waitqueue_bucket *bucket;
1542 NTSTATUS status;
1543 HANDLE thread;
1544 assert( wait->type == TP_OBJECT_TYPE_WAIT );
1546 wait->u.wait.signaled = 0;
1547 wait->u.wait.bucket = NULL;
1548 wait->u.wait.wait_pending = FALSE;
1549 wait->u.wait.timeout = 0;
1550 wait->u.wait.handle = INVALID_HANDLE_VALUE;
1552 RtlEnterCriticalSection( &waitqueue.cs );
1554 /* Try to assign to existing bucket if possible. */
1555 LIST_FOR_EACH_ENTRY( bucket, &waitqueue.buckets, struct waitqueue_bucket, bucket_entry )
1557 if (bucket->objcount < MAXIMUM_WAITQUEUE_OBJECTS)
1559 list_add_tail( &bucket->reserved, &wait->u.wait.wait_entry );
1560 wait->u.wait.bucket = bucket;
1561 bucket->objcount++;
1563 status = STATUS_SUCCESS;
1564 goto out;
1568 /* Create a new bucket and corresponding worker thread. */
1569 bucket = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*bucket) );
1570 if (!bucket)
1572 status = STATUS_NO_MEMORY;
1573 goto out;
1576 bucket->objcount = 0;
1577 list_init( &bucket->reserved );
1578 list_init( &bucket->waiting );
1580 status = NtCreateEvent( &bucket->update_event, EVENT_ALL_ACCESS,
1581 NULL, SynchronizationEvent, FALSE );
1582 if (status)
1584 RtlFreeHeap( GetProcessHeap(), 0, bucket );
1585 goto out;
1588 status = RtlCreateUserThread( GetCurrentProcess(), NULL, FALSE, NULL, 0, 0,
1589 waitqueue_thread_proc, bucket, &thread, NULL );
1590 if (status == STATUS_SUCCESS)
1592 list_add_tail( &waitqueue.buckets, &bucket->bucket_entry );
1593 waitqueue.num_buckets++;
1595 list_add_tail( &bucket->reserved, &wait->u.wait.wait_entry );
1596 wait->u.wait.bucket = bucket;
1597 bucket->objcount++;
1599 NtClose( thread );
1601 else
1603 NtClose( bucket->update_event );
1604 RtlFreeHeap( GetProcessHeap(), 0, bucket );
1607 out:
1608 RtlLeaveCriticalSection( &waitqueue.cs );
1609 return status;
1612 /***********************************************************************
1613 * tp_waitqueue_unlock (internal)
1615 static void tp_waitqueue_unlock( struct threadpool_object *wait )
1617 assert( wait->type == TP_OBJECT_TYPE_WAIT );
1619 RtlEnterCriticalSection( &waitqueue.cs );
1620 if (wait->u.wait.bucket)
1622 struct waitqueue_bucket *bucket = wait->u.wait.bucket;
1623 assert( bucket->objcount > 0 );
1625 list_remove( &wait->u.wait.wait_entry );
1626 wait->u.wait.bucket = NULL;
1627 bucket->objcount--;
1629 NtSetEvent( bucket->update_event, NULL );
1631 RtlLeaveCriticalSection( &waitqueue.cs );
1634 /***********************************************************************
1635 * tp_threadpool_alloc (internal)
1637 * Allocates a new threadpool object.
1639 static NTSTATUS tp_threadpool_alloc( struct threadpool **out )
1641 struct threadpool *pool;
1643 pool = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*pool) );
1644 if (!pool)
1645 return STATUS_NO_MEMORY;
1647 pool->refcount = 1;
1648 pool->objcount = 0;
1649 pool->shutdown = FALSE;
1651 RtlInitializeCriticalSection( &pool->cs );
1652 pool->cs.DebugInfo->Spare[0] = (DWORD_PTR)(__FILE__ ": threadpool.cs");
1654 list_init( &pool->pool );
1655 RtlInitializeConditionVariable( &pool->update_event );
1657 pool->max_workers = 500;
1658 pool->min_workers = 0;
1659 pool->num_workers = 0;
1660 pool->num_busy_workers = 0;
1662 TRACE( "allocated threadpool %p\n", pool );
1664 *out = pool;
1665 return STATUS_SUCCESS;
1668 /***********************************************************************
1669 * tp_threadpool_shutdown (internal)
1671 * Prepares the shutdown of a threadpool object and notifies all worker
1672 * threads to terminate (after all remaining work items have been
1673 * processed).
1675 static void tp_threadpool_shutdown( struct threadpool *pool )
1677 assert( pool != default_threadpool );
1679 pool->shutdown = TRUE;
1680 RtlWakeAllConditionVariable( &pool->update_event );
1683 /***********************************************************************
1684 * tp_threadpool_release (internal)
1686 * Releases a reference to a threadpool object.
1688 static BOOL tp_threadpool_release( struct threadpool *pool )
1690 if (interlocked_dec( &pool->refcount ))
1691 return FALSE;
1693 TRACE( "destroying threadpool %p\n", pool );
1695 assert( pool->shutdown );
1696 assert( !pool->objcount );
1697 assert( list_empty( &pool->pool ) );
1699 pool->cs.DebugInfo->Spare[0] = 0;
1700 RtlDeleteCriticalSection( &pool->cs );
1702 RtlFreeHeap( GetProcessHeap(), 0, pool );
1703 return TRUE;
1706 /***********************************************************************
1707 * tp_threadpool_lock (internal)
1709 * Acquires a lock on a threadpool, specified with an TP_CALLBACK_ENVIRON
1710 * block. When the lock is acquired successfully, it is guaranteed that
1711 * there is at least one worker thread to process tasks.
1713 static NTSTATUS tp_threadpool_lock( struct threadpool **out, TP_CALLBACK_ENVIRON *environment )
1715 struct threadpool *pool = NULL;
1716 NTSTATUS status = STATUS_SUCCESS;
1718 if (environment)
1719 pool = (struct threadpool *)environment->Pool;
1721 if (!pool)
1723 if (!default_threadpool)
1725 status = tp_threadpool_alloc( &pool );
1726 if (status != STATUS_SUCCESS)
1727 return status;
1729 if (interlocked_cmpxchg_ptr( (void *)&default_threadpool, pool, NULL ) != NULL)
1731 tp_threadpool_shutdown( pool );
1732 tp_threadpool_release( pool );
1736 pool = default_threadpool;
1739 RtlEnterCriticalSection( &pool->cs );
1741 /* Make sure that the threadpool has at least one thread. */
1742 if (!pool->num_workers)
1743 status = tp_new_worker_thread( pool );
1745 /* Keep a reference, and increment objcount to ensure that the
1746 * last thread doesn't terminate. */
1747 if (status == STATUS_SUCCESS)
1749 interlocked_inc( &pool->refcount );
1750 pool->objcount++;
1753 RtlLeaveCriticalSection( &pool->cs );
1755 if (status != STATUS_SUCCESS)
1756 return status;
1758 *out = pool;
1759 return STATUS_SUCCESS;
1762 /***********************************************************************
1763 * tp_threadpool_unlock (internal)
1765 * Releases a lock on a threadpool.
1767 static void tp_threadpool_unlock( struct threadpool *pool )
1769 RtlEnterCriticalSection( &pool->cs );
1770 pool->objcount--;
1771 RtlLeaveCriticalSection( &pool->cs );
1772 tp_threadpool_release( pool );
1775 /***********************************************************************
1776 * tp_group_alloc (internal)
1778 * Allocates a new threadpool group object.
1780 static NTSTATUS tp_group_alloc( struct threadpool_group **out )
1782 struct threadpool_group *group;
1784 group = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*group) );
1785 if (!group)
1786 return STATUS_NO_MEMORY;
1788 group->refcount = 1;
1789 group->shutdown = FALSE;
1791 RtlInitializeCriticalSection( &group->cs );
1792 group->cs.DebugInfo->Spare[0] = (DWORD_PTR)(__FILE__ ": threadpool_group.cs");
1794 list_init( &group->members );
1796 TRACE( "allocated group %p\n", group );
1798 *out = group;
1799 return STATUS_SUCCESS;
1802 /***********************************************************************
1803 * tp_group_shutdown (internal)
1805 * Marks the group object for shutdown.
1807 static void tp_group_shutdown( struct threadpool_group *group )
1809 group->shutdown = TRUE;
1812 /***********************************************************************
1813 * tp_group_release (internal)
1815 * Releases a reference to a group object.
1817 static BOOL tp_group_release( struct threadpool_group *group )
1819 if (interlocked_dec( &group->refcount ))
1820 return FALSE;
1822 TRACE( "destroying group %p\n", group );
1824 assert( group->shutdown );
1825 assert( list_empty( &group->members ) );
1827 group->cs.DebugInfo->Spare[0] = 0;
1828 RtlDeleteCriticalSection( &group->cs );
1830 RtlFreeHeap( GetProcessHeap(), 0, group );
1831 return TRUE;
1834 /***********************************************************************
1835 * tp_object_initialize (internal)
1837 * Initializes members of a threadpool object.
1839 static void tp_object_initialize( struct threadpool_object *object, struct threadpool *pool,
1840 PVOID userdata, TP_CALLBACK_ENVIRON *environment )
1842 BOOL is_simple_callback = (object->type == TP_OBJECT_TYPE_SIMPLE);
1844 object->refcount = 1;
1845 object->shutdown = FALSE;
1847 object->pool = pool;
1848 object->group = NULL;
1849 object->userdata = userdata;
1850 object->group_cancel_callback = NULL;
1851 object->finalization_callback = NULL;
1852 object->may_run_long = 0;
1853 object->race_dll = NULL;
1855 memset( &object->group_entry, 0, sizeof(object->group_entry) );
1856 object->is_group_member = FALSE;
1858 memset( &object->pool_entry, 0, sizeof(object->pool_entry) );
1859 RtlInitializeConditionVariable( &object->finished_event );
1860 RtlInitializeConditionVariable( &object->group_finished_event );
1861 object->num_pending_callbacks = 0;
1862 object->num_running_callbacks = 0;
1863 object->num_associated_callbacks = 0;
1865 if (environment)
1867 if (environment->Version != 1 && environment->Version != 3)
1868 FIXME( "unsupported environment version %u\n", environment->Version );
1870 object->group = impl_from_TP_CLEANUP_GROUP( environment->CleanupGroup );
1871 object->group_cancel_callback = environment->CleanupGroupCancelCallback;
1872 object->finalization_callback = environment->FinalizationCallback;
1873 object->may_run_long = environment->u.s.LongFunction != 0;
1874 object->race_dll = environment->RaceDll;
1876 if (environment->ActivationContext)
1877 FIXME( "activation context not supported yet\n" );
1879 if (environment->u.s.Persistent)
1880 FIXME( "persistent threads not supported yet\n" );
1883 if (object->race_dll)
1884 LdrAddRefDll( 0, object->race_dll );
1886 TRACE( "allocated object %p of type %u\n", object, object->type );
1888 /* For simple callbacks we have to run tp_object_submit before adding this object
1889 * to the cleanup group. As soon as the cleanup group members are released ->shutdown
1890 * will be set, and tp_object_submit would fail with an assertion. */
1892 if (is_simple_callback)
1893 tp_object_submit( object, FALSE );
1895 if (object->group)
1897 struct threadpool_group *group = object->group;
1898 interlocked_inc( &group->refcount );
1900 RtlEnterCriticalSection( &group->cs );
1901 list_add_tail( &group->members, &object->group_entry );
1902 object->is_group_member = TRUE;
1903 RtlLeaveCriticalSection( &group->cs );
1906 if (is_simple_callback)
1907 tp_object_release( object );
1910 /***********************************************************************
1911 * tp_object_submit (internal)
1913 * Submits a threadpool object to the associated threadpool. This
1914 * function has to be VOID because TpPostWork can never fail on Windows.
1916 static void tp_object_submit( struct threadpool_object *object, BOOL signaled )
1918 struct threadpool *pool = object->pool;
1919 NTSTATUS status = STATUS_UNSUCCESSFUL;
1921 assert( !object->shutdown );
1922 assert( !pool->shutdown );
1924 RtlEnterCriticalSection( &pool->cs );
1926 /* Start new worker threads if required. */
1927 if (pool->num_busy_workers >= pool->num_workers &&
1928 pool->num_workers < pool->max_workers)
1929 status = tp_new_worker_thread( pool );
1931 /* Queue work item and increment refcount. */
1932 interlocked_inc( &object->refcount );
1933 if (!object->num_pending_callbacks++)
1934 list_add_tail( &pool->pool, &object->pool_entry );
1936 /* Count how often the object was signaled. */
1937 if (object->type == TP_OBJECT_TYPE_WAIT && signaled)
1938 object->u.wait.signaled++;
1940 /* No new thread started - wake up one existing thread. */
1941 if (status != STATUS_SUCCESS)
1943 assert( pool->num_workers > 0 );
1944 RtlWakeConditionVariable( &pool->update_event );
1947 RtlLeaveCriticalSection( &pool->cs );
1950 /***********************************************************************
1951 * tp_object_cancel (internal)
1953 * Cancels all currently pending callbacks for a specific object.
1955 static void tp_object_cancel( struct threadpool_object *object )
1957 struct threadpool *pool = object->pool;
1958 LONG pending_callbacks = 0;
1960 RtlEnterCriticalSection( &pool->cs );
1961 if (object->num_pending_callbacks)
1963 pending_callbacks = object->num_pending_callbacks;
1964 object->num_pending_callbacks = 0;
1965 list_remove( &object->pool_entry );
1967 if (object->type == TP_OBJECT_TYPE_WAIT)
1968 object->u.wait.signaled = 0;
1970 RtlLeaveCriticalSection( &pool->cs );
1972 while (pending_callbacks--)
1973 tp_object_release( object );
1976 /***********************************************************************
1977 * tp_object_wait (internal)
1979 * Waits until all pending and running callbacks of a specific object
1980 * have been processed.
1982 static void tp_object_wait( struct threadpool_object *object, BOOL group_wait )
1984 struct threadpool *pool = object->pool;
1986 RtlEnterCriticalSection( &pool->cs );
1987 if (group_wait)
1989 while (object->num_pending_callbacks || object->num_running_callbacks)
1990 RtlSleepConditionVariableCS( &object->group_finished_event, &pool->cs, NULL );
1992 else
1994 while (object->num_pending_callbacks || object->num_associated_callbacks)
1995 RtlSleepConditionVariableCS( &object->finished_event, &pool->cs, NULL );
1997 RtlLeaveCriticalSection( &pool->cs );
2000 /***********************************************************************
2001 * tp_object_prepare_shutdown (internal)
2003 * Prepares a threadpool object for shutdown.
2005 static void tp_object_prepare_shutdown( struct threadpool_object *object )
2007 if (object->type == TP_OBJECT_TYPE_TIMER)
2008 tp_timerqueue_unlock( object );
2009 else if (object->type == TP_OBJECT_TYPE_WAIT)
2010 tp_waitqueue_unlock( object );
2013 /***********************************************************************
2014 * tp_object_release (internal)
2016 * Releases a reference to a threadpool object.
2018 static BOOL tp_object_release( struct threadpool_object *object )
2020 if (interlocked_dec( &object->refcount ))
2021 return FALSE;
2023 TRACE( "destroying object %p of type %u\n", object, object->type );
2025 assert( object->shutdown );
2026 assert( !object->num_pending_callbacks );
2027 assert( !object->num_running_callbacks );
2028 assert( !object->num_associated_callbacks );
2030 /* release reference to the group */
2031 if (object->group)
2033 struct threadpool_group *group = object->group;
2035 RtlEnterCriticalSection( &group->cs );
2036 if (object->is_group_member)
2038 list_remove( &object->group_entry );
2039 object->is_group_member = FALSE;
2041 RtlLeaveCriticalSection( &group->cs );
2043 tp_group_release( group );
2046 tp_threadpool_unlock( object->pool );
2048 if (object->race_dll)
2049 LdrUnloadDll( object->race_dll );
2051 RtlFreeHeap( GetProcessHeap(), 0, object );
2052 return TRUE;
2055 /***********************************************************************
2056 * threadpool_worker_proc (internal)
2058 static void CALLBACK threadpool_worker_proc( void *param )
2060 TP_CALLBACK_INSTANCE *callback_instance;
2061 struct threadpool_instance instance;
2062 struct threadpool *pool = param;
2063 TP_WAIT_RESULT wait_result = 0;
2064 LARGE_INTEGER timeout;
2065 struct list *ptr;
2066 NTSTATUS status;
2068 TRACE( "starting worker thread for pool %p\n", pool );
2070 RtlEnterCriticalSection( &pool->cs );
2071 pool->num_busy_workers--;
2072 for (;;)
2074 while ((ptr = list_head( &pool->pool )))
2076 struct threadpool_object *object = LIST_ENTRY( ptr, struct threadpool_object, pool_entry );
2077 assert( object->num_pending_callbacks > 0 );
2079 /* If further pending callbacks are queued, move the work item to
2080 * the end of the pool list. Otherwise remove it from the pool. */
2081 list_remove( &object->pool_entry );
2082 if (--object->num_pending_callbacks)
2083 list_add_tail( &pool->pool, &object->pool_entry );
2085 /* For wait objects check if they were signaled or have timed out. */
2086 if (object->type == TP_OBJECT_TYPE_WAIT)
2088 wait_result = object->u.wait.signaled ? WAIT_OBJECT_0 : WAIT_TIMEOUT;
2089 if (wait_result == WAIT_OBJECT_0) object->u.wait.signaled--;
2092 /* Leave critical section and do the actual callback. */
2093 object->num_associated_callbacks++;
2094 object->num_running_callbacks++;
2095 pool->num_busy_workers++;
2096 RtlLeaveCriticalSection( &pool->cs );
2098 /* Initialize threadpool instance struct. */
2099 callback_instance = (TP_CALLBACK_INSTANCE *)&instance;
2100 instance.object = object;
2101 instance.threadid = GetCurrentThreadId();
2102 instance.associated = TRUE;
2103 instance.may_run_long = object->may_run_long;
2104 instance.cleanup.critical_section = NULL;
2105 instance.cleanup.mutex = NULL;
2106 instance.cleanup.semaphore = NULL;
2107 instance.cleanup.semaphore_count = 0;
2108 instance.cleanup.event = NULL;
2109 instance.cleanup.library = NULL;
2111 switch (object->type)
2113 case TP_OBJECT_TYPE_SIMPLE:
2115 TRACE( "executing simple callback %p(%p, %p)\n",
2116 object->u.simple.callback, callback_instance, object->userdata );
2117 object->u.simple.callback( callback_instance, object->userdata );
2118 TRACE( "callback %p returned\n", object->u.simple.callback );
2119 break;
2122 case TP_OBJECT_TYPE_WORK:
2124 TRACE( "executing work callback %p(%p, %p, %p)\n",
2125 object->u.work.callback, callback_instance, object->userdata, object );
2126 object->u.work.callback( callback_instance, object->userdata, (TP_WORK *)object );
2127 TRACE( "callback %p returned\n", object->u.work.callback );
2128 break;
2131 case TP_OBJECT_TYPE_TIMER:
2133 TRACE( "executing timer callback %p(%p, %p, %p)\n",
2134 object->u.timer.callback, callback_instance, object->userdata, object );
2135 object->u.timer.callback( callback_instance, object->userdata, (TP_TIMER *)object );
2136 TRACE( "callback %p returned\n", object->u.timer.callback );
2137 break;
2140 case TP_OBJECT_TYPE_WAIT:
2142 TRACE( "executing wait callback %p(%p, %p, %p, %u)\n",
2143 object->u.wait.callback, callback_instance, object->userdata, object, wait_result );
2144 object->u.wait.callback( callback_instance, object->userdata, (TP_WAIT *)object, wait_result );
2145 TRACE( "callback %p returned\n", object->u.wait.callback );
2146 break;
2149 default:
2150 assert(0);
2151 break;
2154 /* Execute finalization callback. */
2155 if (object->finalization_callback)
2157 TRACE( "executing finalization callback %p(%p, %p)\n",
2158 object->finalization_callback, callback_instance, object->userdata );
2159 object->finalization_callback( callback_instance, object->userdata );
2160 TRACE( "callback %p returned\n", object->finalization_callback );
2163 /* Execute cleanup tasks. */
2164 if (instance.cleanup.critical_section)
2166 RtlLeaveCriticalSection( instance.cleanup.critical_section );
2168 if (instance.cleanup.mutex)
2170 status = NtReleaseMutant( instance.cleanup.mutex, NULL );
2171 if (status != STATUS_SUCCESS) goto skip_cleanup;
2173 if (instance.cleanup.semaphore)
2175 status = NtReleaseSemaphore( instance.cleanup.semaphore, instance.cleanup.semaphore_count, NULL );
2176 if (status != STATUS_SUCCESS) goto skip_cleanup;
2178 if (instance.cleanup.event)
2180 status = NtSetEvent( instance.cleanup.event, NULL );
2181 if (status != STATUS_SUCCESS) goto skip_cleanup;
2183 if (instance.cleanup.library)
2185 LdrUnloadDll( instance.cleanup.library );
2188 skip_cleanup:
2189 RtlEnterCriticalSection( &pool->cs );
2190 pool->num_busy_workers--;
2192 /* Simple callbacks are automatically shutdown after execution. */
2193 if (object->type == TP_OBJECT_TYPE_SIMPLE)
2195 tp_object_prepare_shutdown( object );
2196 object->shutdown = TRUE;
2199 object->num_running_callbacks--;
2200 if (!object->num_pending_callbacks && !object->num_running_callbacks)
2201 RtlWakeAllConditionVariable( &object->group_finished_event );
2203 if (instance.associated)
2205 object->num_associated_callbacks--;
2206 if (!object->num_pending_callbacks && !object->num_associated_callbacks)
2207 RtlWakeAllConditionVariable( &object->finished_event );
2210 tp_object_release( object );
2213 /* Shutdown worker thread if requested. */
2214 if (pool->shutdown)
2215 break;
2217 /* Wait for new tasks or until the timeout expires. A thread only terminates
2218 * when no new tasks are available, and the number of threads can be
2219 * decreased without violating the min_workers limit. An exception is when
2220 * min_workers == 0, then objcount is used to detect if the last thread
2221 * can be terminated. */
2222 timeout.QuadPart = (ULONGLONG)THREADPOOL_WORKER_TIMEOUT * -10000;
2223 if (RtlSleepConditionVariableCS( &pool->update_event, &pool->cs, &timeout ) == STATUS_TIMEOUT &&
2224 !list_head( &pool->pool ) && (pool->num_workers > max( pool->min_workers, 1 ) ||
2225 (!pool->min_workers && !pool->objcount)))
2227 break;
2230 pool->num_workers--;
2231 RtlLeaveCriticalSection( &pool->cs );
2233 TRACE( "terminating worker thread for pool %p\n", pool );
2234 tp_threadpool_release( pool );
2235 RtlExitUserThread( 0 );
2238 /***********************************************************************
2239 * TpAllocCleanupGroup (NTDLL.@)
2241 NTSTATUS WINAPI TpAllocCleanupGroup( TP_CLEANUP_GROUP **out )
2243 TRACE( "%p\n", out );
2245 return tp_group_alloc( (struct threadpool_group **)out );
2248 /***********************************************************************
2249 * TpAllocPool (NTDLL.@)
2251 NTSTATUS WINAPI TpAllocPool( TP_POOL **out, PVOID reserved )
2253 TRACE( "%p %p\n", out, reserved );
2255 if (reserved)
2256 FIXME( "reserved argument is nonzero (%p)\n", reserved );
2258 return tp_threadpool_alloc( (struct threadpool **)out );
2261 /***********************************************************************
2262 * TpAllocTimer (NTDLL.@)
2264 NTSTATUS WINAPI TpAllocTimer( TP_TIMER **out, PTP_TIMER_CALLBACK callback, PVOID userdata,
2265 TP_CALLBACK_ENVIRON *environment )
2267 struct threadpool_object *object;
2268 struct threadpool *pool;
2269 NTSTATUS status;
2271 TRACE( "%p %p %p %p\n", out, callback, userdata, environment );
2273 object = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*object) );
2274 if (!object)
2275 return STATUS_NO_MEMORY;
2277 status = tp_threadpool_lock( &pool, environment );
2278 if (status)
2280 RtlFreeHeap( GetProcessHeap(), 0, object );
2281 return status;
2284 object->type = TP_OBJECT_TYPE_TIMER;
2285 object->u.timer.callback = callback;
2287 status = tp_timerqueue_lock( object );
2288 if (status)
2290 tp_threadpool_unlock( pool );
2291 RtlFreeHeap( GetProcessHeap(), 0, object );
2292 return status;
2295 tp_object_initialize( object, pool, userdata, environment );
2297 *out = (TP_TIMER *)object;
2298 return STATUS_SUCCESS;
2301 /***********************************************************************
2302 * TpAllocWait (NTDLL.@)
2304 NTSTATUS WINAPI TpAllocWait( TP_WAIT **out, PTP_WAIT_CALLBACK callback, PVOID userdata,
2305 TP_CALLBACK_ENVIRON *environment )
2307 struct threadpool_object *object;
2308 struct threadpool *pool;
2309 NTSTATUS status;
2311 TRACE( "%p %p %p %p\n", out, callback, userdata, environment );
2313 object = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*object) );
2314 if (!object)
2315 return STATUS_NO_MEMORY;
2317 status = tp_threadpool_lock( &pool, environment );
2318 if (status)
2320 RtlFreeHeap( GetProcessHeap(), 0, object );
2321 return status;
2324 object->type = TP_OBJECT_TYPE_WAIT;
2325 object->u.wait.callback = callback;
2327 status = tp_waitqueue_lock( object );
2328 if (status)
2330 tp_threadpool_unlock( pool );
2331 RtlFreeHeap( GetProcessHeap(), 0, object );
2332 return status;
2335 tp_object_initialize( object, pool, userdata, environment );
2337 *out = (TP_WAIT *)object;
2338 return STATUS_SUCCESS;
2341 /***********************************************************************
2342 * TpAllocWork (NTDLL.@)
2344 NTSTATUS WINAPI TpAllocWork( TP_WORK **out, PTP_WORK_CALLBACK callback, PVOID userdata,
2345 TP_CALLBACK_ENVIRON *environment )
2347 struct threadpool_object *object;
2348 struct threadpool *pool;
2349 NTSTATUS status;
2351 TRACE( "%p %p %p %p\n", out, callback, userdata, environment );
2353 object = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*object) );
2354 if (!object)
2355 return STATUS_NO_MEMORY;
2357 status = tp_threadpool_lock( &pool, environment );
2358 if (status)
2360 RtlFreeHeap( GetProcessHeap(), 0, object );
2361 return status;
2364 object->type = TP_OBJECT_TYPE_WORK;
2365 object->u.work.callback = callback;
2366 tp_object_initialize( object, pool, userdata, environment );
2368 *out = (TP_WORK *)object;
2369 return STATUS_SUCCESS;
2372 /***********************************************************************
2373 * TpCallbackLeaveCriticalSectionOnCompletion (NTDLL.@)
2375 VOID WINAPI TpCallbackLeaveCriticalSectionOnCompletion( TP_CALLBACK_INSTANCE *instance, CRITICAL_SECTION *crit )
2377 struct threadpool_instance *this = impl_from_TP_CALLBACK_INSTANCE( instance );
2379 TRACE( "%p %p\n", instance, crit );
2381 if (!this->cleanup.critical_section)
2382 this->cleanup.critical_section = crit;
2385 /***********************************************************************
2386 * TpCallbackMayRunLong (NTDLL.@)
2388 NTSTATUS WINAPI TpCallbackMayRunLong( TP_CALLBACK_INSTANCE *instance )
2390 struct threadpool_instance *this = impl_from_TP_CALLBACK_INSTANCE( instance );
2391 struct threadpool_object *object = this->object;
2392 struct threadpool *pool;
2393 NTSTATUS status = STATUS_SUCCESS;
2395 TRACE( "%p\n", instance );
2397 if (this->threadid != GetCurrentThreadId())
2399 ERR("called from wrong thread, ignoring\n");
2400 return STATUS_UNSUCCESSFUL; /* FIXME */
2403 if (this->may_run_long)
2404 return STATUS_SUCCESS;
2406 pool = object->pool;
2407 RtlEnterCriticalSection( &pool->cs );
2409 /* Start new worker threads if required. */
2410 if (pool->num_busy_workers >= pool->num_workers)
2412 if (pool->num_workers < pool->max_workers)
2414 status = tp_new_worker_thread( pool );
2416 else
2418 status = STATUS_TOO_MANY_THREADS;
2422 RtlLeaveCriticalSection( &pool->cs );
2423 this->may_run_long = TRUE;
2424 return status;
2427 /***********************************************************************
2428 * TpCallbackReleaseMutexOnCompletion (NTDLL.@)
2430 VOID WINAPI TpCallbackReleaseMutexOnCompletion( TP_CALLBACK_INSTANCE *instance, HANDLE mutex )
2432 struct threadpool_instance *this = impl_from_TP_CALLBACK_INSTANCE( instance );
2434 TRACE( "%p %p\n", instance, mutex );
2436 if (!this->cleanup.mutex)
2437 this->cleanup.mutex = mutex;
2440 /***********************************************************************
2441 * TpCallbackReleaseSemaphoreOnCompletion (NTDLL.@)
2443 VOID WINAPI TpCallbackReleaseSemaphoreOnCompletion( TP_CALLBACK_INSTANCE *instance, HANDLE semaphore, DWORD count )
2445 struct threadpool_instance *this = impl_from_TP_CALLBACK_INSTANCE( instance );
2447 TRACE( "%p %p %u\n", instance, semaphore, count );
2449 if (!this->cleanup.semaphore)
2451 this->cleanup.semaphore = semaphore;
2452 this->cleanup.semaphore_count = count;
2456 /***********************************************************************
2457 * TpCallbackSetEventOnCompletion (NTDLL.@)
2459 VOID WINAPI TpCallbackSetEventOnCompletion( TP_CALLBACK_INSTANCE *instance, HANDLE event )
2461 struct threadpool_instance *this = impl_from_TP_CALLBACK_INSTANCE( instance );
2463 TRACE( "%p %p\n", instance, event );
2465 if (!this->cleanup.event)
2466 this->cleanup.event = event;
2469 /***********************************************************************
2470 * TpCallbackUnloadDllOnCompletion (NTDLL.@)
2472 VOID WINAPI TpCallbackUnloadDllOnCompletion( TP_CALLBACK_INSTANCE *instance, HMODULE module )
2474 struct threadpool_instance *this = impl_from_TP_CALLBACK_INSTANCE( instance );
2476 TRACE( "%p %p\n", instance, module );
2478 if (!this->cleanup.library)
2479 this->cleanup.library = module;
2482 /***********************************************************************
2483 * TpDisassociateCallback (NTDLL.@)
2485 VOID WINAPI TpDisassociateCallback( TP_CALLBACK_INSTANCE *instance )
2487 struct threadpool_instance *this = impl_from_TP_CALLBACK_INSTANCE( instance );
2488 struct threadpool_object *object = this->object;
2489 struct threadpool *pool;
2491 TRACE( "%p\n", instance );
2493 if (this->threadid != GetCurrentThreadId())
2495 ERR("called from wrong thread, ignoring\n");
2496 return;
2499 if (!this->associated)
2500 return;
2502 pool = object->pool;
2503 RtlEnterCriticalSection( &pool->cs );
2505 object->num_associated_callbacks--;
2506 if (!object->num_pending_callbacks && !object->num_associated_callbacks)
2507 RtlWakeAllConditionVariable( &object->finished_event );
2509 RtlLeaveCriticalSection( &pool->cs );
2510 this->associated = FALSE;
2513 /***********************************************************************
2514 * TpIsTimerSet (NTDLL.@)
2516 BOOL WINAPI TpIsTimerSet( TP_TIMER *timer )
2518 struct threadpool_object *this = impl_from_TP_TIMER( timer );
2520 TRACE( "%p\n", timer );
2522 return this->u.timer.timer_set;
2525 /***********************************************************************
2526 * TpPostWork (NTDLL.@)
2528 VOID WINAPI TpPostWork( TP_WORK *work )
2530 struct threadpool_object *this = impl_from_TP_WORK( work );
2532 TRACE( "%p\n", work );
2534 tp_object_submit( this, FALSE );
2537 /***********************************************************************
2538 * TpReleaseCleanupGroup (NTDLL.@)
2540 VOID WINAPI TpReleaseCleanupGroup( TP_CLEANUP_GROUP *group )
2542 struct threadpool_group *this = impl_from_TP_CLEANUP_GROUP( group );
2544 TRACE( "%p\n", group );
2546 tp_group_shutdown( this );
2547 tp_group_release( this );
2550 /***********************************************************************
2551 * TpReleaseCleanupGroupMembers (NTDLL.@)
2553 VOID WINAPI TpReleaseCleanupGroupMembers( TP_CLEANUP_GROUP *group, BOOL cancel_pending, PVOID userdata )
2555 struct threadpool_group *this = impl_from_TP_CLEANUP_GROUP( group );
2556 struct threadpool_object *object, *next;
2557 struct list members;
2559 TRACE( "%p %u %p\n", group, cancel_pending, userdata );
2561 RtlEnterCriticalSection( &this->cs );
2563 /* Unset group, increase references, and mark objects for shutdown */
2564 LIST_FOR_EACH_ENTRY_SAFE( object, next, &this->members, struct threadpool_object, group_entry )
2566 assert( object->group == this );
2567 assert( object->is_group_member );
2569 if (interlocked_inc( &object->refcount ) == 1)
2571 /* Object is basically already destroyed, but group reference
2572 * was not deleted yet. We can safely ignore this object. */
2573 interlocked_dec( &object->refcount );
2574 list_remove( &object->group_entry );
2575 object->is_group_member = FALSE;
2576 continue;
2579 object->is_group_member = FALSE;
2580 tp_object_prepare_shutdown( object );
2583 /* Move members to a new temporary list */
2584 list_init( &members );
2585 list_move_tail( &members, &this->members );
2587 RtlLeaveCriticalSection( &this->cs );
2589 /* Cancel pending callbacks if requested */
2590 if (cancel_pending)
2592 LIST_FOR_EACH_ENTRY( object, &members, struct threadpool_object, group_entry )
2594 tp_object_cancel( object );
2598 /* Wait for remaining callbacks to finish */
2599 LIST_FOR_EACH_ENTRY_SAFE( object, next, &members, struct threadpool_object, group_entry )
2601 tp_object_wait( object, TRUE );
2603 if (!object->shutdown)
2605 /* Execute group cancellation callback if defined, and if this was actually a group cancel. */
2606 if (cancel_pending && object->group_cancel_callback)
2608 TRACE( "executing group cancel callback %p(%p, %p)\n",
2609 object->group_cancel_callback, object->userdata, userdata );
2610 object->group_cancel_callback( object->userdata, userdata );
2611 TRACE( "callback %p returned\n", object->group_cancel_callback );
2614 if (object->type != TP_OBJECT_TYPE_SIMPLE)
2615 tp_object_release( object );
2618 object->shutdown = TRUE;
2619 tp_object_release( object );
2623 /***********************************************************************
2624 * TpReleasePool (NTDLL.@)
2626 VOID WINAPI TpReleasePool( TP_POOL *pool )
2628 struct threadpool *this = impl_from_TP_POOL( pool );
2630 TRACE( "%p\n", pool );
2632 tp_threadpool_shutdown( this );
2633 tp_threadpool_release( this );
2636 /***********************************************************************
2637 * TpReleaseTimer (NTDLL.@)
2639 VOID WINAPI TpReleaseTimer( TP_TIMER *timer )
2641 struct threadpool_object *this = impl_from_TP_TIMER( timer );
2643 TRACE( "%p\n", timer );
2645 tp_object_prepare_shutdown( this );
2646 this->shutdown = TRUE;
2647 tp_object_release( this );
2650 /***********************************************************************
2651 * TpReleaseWait (NTDLL.@)
2653 VOID WINAPI TpReleaseWait( TP_WAIT *wait )
2655 struct threadpool_object *this = impl_from_TP_WAIT( wait );
2657 TRACE( "%p\n", wait );
2659 tp_object_prepare_shutdown( this );
2660 this->shutdown = TRUE;
2661 tp_object_release( this );
2664 /***********************************************************************
2665 * TpReleaseWork (NTDLL.@)
2667 VOID WINAPI TpReleaseWork( TP_WORK *work )
2669 struct threadpool_object *this = impl_from_TP_WORK( work );
2671 TRACE( "%p\n", work );
2673 tp_object_prepare_shutdown( this );
2674 this->shutdown = TRUE;
2675 tp_object_release( this );
2678 /***********************************************************************
2679 * TpSetPoolMaxThreads (NTDLL.@)
2681 VOID WINAPI TpSetPoolMaxThreads( TP_POOL *pool, DWORD maximum )
2683 struct threadpool *this = impl_from_TP_POOL( pool );
2685 TRACE( "%p %u\n", pool, maximum );
2687 RtlEnterCriticalSection( &this->cs );
2688 this->max_workers = max( maximum, 1 );
2689 this->min_workers = min( this->min_workers, this->max_workers );
2690 RtlLeaveCriticalSection( &this->cs );
2693 /***********************************************************************
2694 * TpSetPoolMinThreads (NTDLL.@)
2696 BOOL WINAPI TpSetPoolMinThreads( TP_POOL *pool, DWORD minimum )
2698 struct threadpool *this = impl_from_TP_POOL( pool );
2699 NTSTATUS status = STATUS_SUCCESS;
2701 TRACE( "%p %u\n", pool, minimum );
2703 RtlEnterCriticalSection( &this->cs );
2705 while (this->num_workers < minimum)
2707 status = tp_new_worker_thread( this );
2708 if (status != STATUS_SUCCESS)
2709 break;
2712 if (status == STATUS_SUCCESS)
2714 this->min_workers = minimum;
2715 this->max_workers = max( this->min_workers, this->max_workers );
2718 RtlLeaveCriticalSection( &this->cs );
2719 return !status;
2722 /***********************************************************************
2723 * TpSetTimer (NTDLL.@)
2725 VOID WINAPI TpSetTimer( TP_TIMER *timer, LARGE_INTEGER *timeout, LONG period, LONG window_length )
2727 struct threadpool_object *this = impl_from_TP_TIMER( timer );
2728 struct threadpool_object *other_timer;
2729 BOOL submit_timer = FALSE;
2730 ULONGLONG timestamp;
2732 TRACE( "%p %p %u %u\n", timer, timeout, period, window_length );
2734 RtlEnterCriticalSection( &timerqueue.cs );
2736 assert( this->u.timer.timer_initialized );
2737 this->u.timer.timer_set = timeout != NULL;
2739 /* Convert relative timeout to absolute timestamp and handle a timeout
2740 * of zero, which means that the timer is submitted immediately. */
2741 if (timeout)
2743 timestamp = timeout->QuadPart;
2744 if ((LONGLONG)timestamp < 0)
2746 LARGE_INTEGER now;
2747 NtQuerySystemTime( &now );
2748 timestamp = now.QuadPart - timestamp;
2750 else if (!timestamp)
2752 if (!period)
2753 timeout = NULL;
2754 else
2756 LARGE_INTEGER now;
2757 NtQuerySystemTime( &now );
2758 timestamp = now.QuadPart + (ULONGLONG)period * 10000;
2760 submit_timer = TRUE;
2764 /* First remove existing timeout. */
2765 if (this->u.timer.timer_pending)
2767 list_remove( &this->u.timer.timer_entry );
2768 this->u.timer.timer_pending = FALSE;
2771 /* If the timer was enabled, then add it back to the queue. */
2772 if (timeout)
2774 this->u.timer.timeout = timestamp;
2775 this->u.timer.period = period;
2776 this->u.timer.window_length = window_length;
2778 LIST_FOR_EACH_ENTRY( other_timer, &timerqueue.pending_timers,
2779 struct threadpool_object, u.timer.timer_entry )
2781 assert( other_timer->type == TP_OBJECT_TYPE_TIMER );
2782 if (this->u.timer.timeout < other_timer->u.timer.timeout)
2783 break;
2785 list_add_before( &other_timer->u.timer.timer_entry, &this->u.timer.timer_entry );
2787 /* Wake up the timer thread when the timeout has to be updated. */
2788 if (list_head( &timerqueue.pending_timers ) == &this->u.timer.timer_entry )
2789 RtlWakeAllConditionVariable( &timerqueue.update_event );
2791 this->u.timer.timer_pending = TRUE;
2794 RtlLeaveCriticalSection( &timerqueue.cs );
2796 if (submit_timer)
2797 tp_object_submit( this, FALSE );
2800 /***********************************************************************
2801 * TpSetWait (NTDLL.@)
2803 VOID WINAPI TpSetWait( TP_WAIT *wait, HANDLE handle, LARGE_INTEGER *timeout )
2805 struct threadpool_object *this = impl_from_TP_WAIT( wait );
2806 ULONGLONG timestamp = TIMEOUT_INFINITE;
2807 BOOL submit_wait = FALSE;
2809 TRACE( "%p %p %p\n", wait, handle, timeout );
2811 RtlEnterCriticalSection( &waitqueue.cs );
2813 assert( this->u.wait.bucket );
2814 this->u.wait.handle = handle;
2816 if (handle || this->u.wait.wait_pending)
2818 struct waitqueue_bucket *bucket = this->u.wait.bucket;
2819 list_remove( &this->u.wait.wait_entry );
2821 /* Convert relative timeout to absolute timestamp. */
2822 if (handle && timeout)
2824 timestamp = timeout->QuadPart;
2825 if ((LONGLONG)timestamp < 0)
2827 LARGE_INTEGER now;
2828 NtQuerySystemTime( &now );
2829 timestamp = now.QuadPart - timestamp;
2831 else if (!timestamp)
2833 submit_wait = TRUE;
2834 handle = NULL;
2838 /* Add wait object back into one of the queues. */
2839 if (handle)
2841 list_add_tail( &bucket->waiting, &this->u.wait.wait_entry );
2842 this->u.wait.wait_pending = TRUE;
2843 this->u.wait.timeout = timestamp;
2845 else
2847 list_add_tail( &bucket->reserved, &this->u.wait.wait_entry );
2848 this->u.wait.wait_pending = FALSE;
2851 /* Wake up the wait queue thread. */
2852 NtSetEvent( bucket->update_event, NULL );
2855 RtlLeaveCriticalSection( &waitqueue.cs );
2857 if (submit_wait)
2858 tp_object_submit( this, FALSE );
2861 /***********************************************************************
2862 * TpSimpleTryPost (NTDLL.@)
2864 NTSTATUS WINAPI TpSimpleTryPost( PTP_SIMPLE_CALLBACK callback, PVOID userdata,
2865 TP_CALLBACK_ENVIRON *environment )
2867 struct threadpool_object *object;
2868 struct threadpool *pool;
2869 NTSTATUS status;
2871 TRACE( "%p %p %p\n", callback, userdata, environment );
2873 object = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*object) );
2874 if (!object)
2875 return STATUS_NO_MEMORY;
2877 status = tp_threadpool_lock( &pool, environment );
2878 if (status)
2880 RtlFreeHeap( GetProcessHeap(), 0, object );
2881 return status;
2884 object->type = TP_OBJECT_TYPE_SIMPLE;
2885 object->u.simple.callback = callback;
2886 tp_object_initialize( object, pool, userdata, environment );
2888 return STATUS_SUCCESS;
2891 /***********************************************************************
2892 * TpWaitForTimer (NTDLL.@)
2894 VOID WINAPI TpWaitForTimer( TP_TIMER *timer, BOOL cancel_pending )
2896 struct threadpool_object *this = impl_from_TP_TIMER( timer );
2898 TRACE( "%p %d\n", timer, cancel_pending );
2900 if (cancel_pending)
2901 tp_object_cancel( this );
2902 tp_object_wait( this, FALSE );
2905 /***********************************************************************
2906 * TpWaitForWait (NTDLL.@)
2908 VOID WINAPI TpWaitForWait( TP_WAIT *wait, BOOL cancel_pending )
2910 struct threadpool_object *this = impl_from_TP_WAIT( wait );
2912 TRACE( "%p %d\n", wait, cancel_pending );
2914 if (cancel_pending)
2915 tp_object_cancel( this );
2916 tp_object_wait( this, FALSE );
2919 /***********************************************************************
2920 * TpWaitForWork (NTDLL.@)
2922 VOID WINAPI TpWaitForWork( TP_WORK *work, BOOL cancel_pending )
2924 struct threadpool_object *this = impl_from_TP_WORK( work );
2926 TRACE( "%p %u\n", work, cancel_pending );
2928 if (cancel_pending)
2929 tp_object_cancel( this );
2930 tp_object_wait( this, FALSE );