storage.dll16: Fix get_nth_next_small_blocknr.
[wine.git] / dlls / ntdll / threadpool.c
blob6063d51d9f9712f7ac0ca3e26d655da13c79eff9
1 /*
2 * Thread pooling
4 * Copyright (c) 2006 Robert Shearman
5 * Copyright (c) 2014-2016 Sebastian Lackner
7 * This library is free software; you can redistribute it and/or
8 * modify it under the terms of the GNU Lesser General Public
9 * License as published by the Free Software Foundation; either
10 * version 2.1 of the License, or (at your option) any later version.
12 * This library is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Lesser General Public License for more details.
17 * You should have received a copy of the GNU Lesser General Public
18 * License along with this library; if not, write to the Free Software
19 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
22 #include "config.h"
23 #include "wine/port.h"
25 #include <assert.h>
26 #include <stdarg.h>
27 #include <limits.h>
29 #define NONAMELESSUNION
30 #include "ntstatus.h"
31 #define WIN32_NO_STATUS
32 #include "winternl.h"
34 #include "wine/debug.h"
35 #include "wine/list.h"
37 #include "ntdll_misc.h"
39 WINE_DEFAULT_DEBUG_CHANNEL(threadpool);
42 * Old thread pooling API
45 struct rtl_work_item
47 PRTL_WORK_ITEM_ROUTINE function;
48 PVOID context;
51 #define EXPIRE_NEVER (~(ULONGLONG)0)
52 #define TIMER_QUEUE_MAGIC 0x516d6954 /* TimQ */
54 static RTL_CRITICAL_SECTION_DEBUG critsect_compl_debug;
56 static struct
58 HANDLE compl_port;
59 RTL_CRITICAL_SECTION threadpool_compl_cs;
61 old_threadpool =
63 NULL, /* compl_port */
64 { &critsect_compl_debug, -1, 0, 0, 0, 0 }, /* threadpool_compl_cs */
67 static RTL_CRITICAL_SECTION_DEBUG critsect_compl_debug =
69 0, 0, &old_threadpool.threadpool_compl_cs,
70 { &critsect_compl_debug.ProcessLocksList, &critsect_compl_debug.ProcessLocksList },
71 0, 0, { (DWORD_PTR)(__FILE__ ": threadpool_compl_cs") }
74 struct wait_work_item
76 HANDLE Object;
77 HANDLE CancelEvent;
78 WAITORTIMERCALLBACK Callback;
79 PVOID Context;
80 ULONG Milliseconds;
81 ULONG Flags;
82 HANDLE CompletionEvent;
83 LONG DeleteCount;
84 BOOLEAN CallbackInProgress;
87 struct timer_queue;
88 struct queue_timer
90 struct timer_queue *q;
91 struct list entry;
92 ULONG runcount; /* number of callbacks pending execution */
93 RTL_WAITORTIMERCALLBACKFUNC callback;
94 PVOID param;
95 DWORD period;
96 ULONG flags;
97 ULONGLONG expire;
98 BOOL destroy; /* timer should be deleted; once set, never unset */
99 HANDLE event; /* removal event */
102 struct timer_queue
104 DWORD magic;
105 RTL_CRITICAL_SECTION cs;
106 struct list timers; /* sorted by expiration time */
107 BOOL quit; /* queue should be deleted; once set, never unset */
108 HANDLE event;
109 HANDLE thread;
113 * Object-oriented thread pooling API
116 #define THREADPOOL_WORKER_TIMEOUT 5000
117 #define MAXIMUM_WAITQUEUE_OBJECTS (MAXIMUM_WAIT_OBJECTS - 1)
119 /* internal threadpool representation */
120 struct threadpool
122 LONG refcount;
123 LONG objcount;
124 BOOL shutdown;
125 CRITICAL_SECTION cs;
126 /* pool of work items, locked via .cs */
127 struct list pool;
128 RTL_CONDITION_VARIABLE update_event;
129 /* information about worker threads, locked via .cs */
130 int max_workers;
131 int min_workers;
132 int num_workers;
133 int num_busy_workers;
136 enum threadpool_objtype
138 TP_OBJECT_TYPE_SIMPLE,
139 TP_OBJECT_TYPE_WORK,
140 TP_OBJECT_TYPE_TIMER,
141 TP_OBJECT_TYPE_WAIT
144 /* internal threadpool object representation */
145 struct threadpool_object
147 LONG refcount;
148 BOOL shutdown;
149 /* read-only information */
150 enum threadpool_objtype type;
151 struct threadpool *pool;
152 struct threadpool_group *group;
153 PVOID userdata;
154 PTP_CLEANUP_GROUP_CANCEL_CALLBACK group_cancel_callback;
155 PTP_SIMPLE_CALLBACK finalization_callback;
156 BOOL may_run_long;
157 HMODULE race_dll;
158 /* information about the group, locked via .group->cs */
159 struct list group_entry;
160 BOOL is_group_member;
161 /* information about the pool, locked via .pool->cs */
162 struct list pool_entry;
163 RTL_CONDITION_VARIABLE finished_event;
164 RTL_CONDITION_VARIABLE group_finished_event;
165 LONG num_pending_callbacks;
166 LONG num_running_callbacks;
167 LONG num_associated_callbacks;
168 /* arguments for callback */
169 union
171 struct
173 PTP_SIMPLE_CALLBACK callback;
174 } simple;
175 struct
177 PTP_WORK_CALLBACK callback;
178 } work;
179 struct
181 PTP_TIMER_CALLBACK callback;
182 /* information about the timer, locked via timerqueue.cs */
183 BOOL timer_initialized;
184 BOOL timer_pending;
185 struct list timer_entry;
186 BOOL timer_set;
187 ULONGLONG timeout;
188 LONG period;
189 LONG window_length;
190 } timer;
191 struct
193 PTP_WAIT_CALLBACK callback;
194 LONG signaled;
195 /* information about the wait object, locked via waitqueue.cs */
196 struct waitqueue_bucket *bucket;
197 BOOL wait_pending;
198 struct list wait_entry;
199 ULONGLONG timeout;
200 HANDLE handle;
201 } wait;
202 } u;
205 /* internal threadpool instance representation */
206 struct threadpool_instance
208 struct threadpool_object *object;
209 DWORD threadid;
210 BOOL associated;
211 BOOL may_run_long;
212 struct
214 CRITICAL_SECTION *critical_section;
215 HANDLE mutex;
216 HANDLE semaphore;
217 LONG semaphore_count;
218 HANDLE event;
219 HMODULE library;
220 } cleanup;
223 /* internal threadpool group representation */
224 struct threadpool_group
226 LONG refcount;
227 BOOL shutdown;
228 CRITICAL_SECTION cs;
229 /* list of group members, locked via .cs */
230 struct list members;
233 /* global timerqueue object */
234 static RTL_CRITICAL_SECTION_DEBUG timerqueue_debug;
236 static struct
238 CRITICAL_SECTION cs;
239 LONG objcount;
240 BOOL thread_running;
241 struct list pending_timers;
242 RTL_CONDITION_VARIABLE update_event;
244 timerqueue =
246 { &timerqueue_debug, -1, 0, 0, 0, 0 }, /* cs */
247 0, /* objcount */
248 FALSE, /* thread_running */
249 LIST_INIT( timerqueue.pending_timers ), /* pending_timers */
250 RTL_CONDITION_VARIABLE_INIT /* update_event */
253 static RTL_CRITICAL_SECTION_DEBUG timerqueue_debug =
255 0, 0, &timerqueue.cs,
256 { &timerqueue_debug.ProcessLocksList, &timerqueue_debug.ProcessLocksList },
257 0, 0, { (DWORD_PTR)(__FILE__ ": timerqueue.cs") }
260 /* global waitqueue object */
261 static RTL_CRITICAL_SECTION_DEBUG waitqueue_debug;
263 static struct
265 CRITICAL_SECTION cs;
266 LONG num_buckets;
267 struct list buckets;
269 waitqueue =
271 { &waitqueue_debug, -1, 0, 0, 0, 0 }, /* cs */
272 0, /* num_buckets */
273 LIST_INIT( waitqueue.buckets ) /* buckets */
276 static RTL_CRITICAL_SECTION_DEBUG waitqueue_debug =
278 0, 0, &waitqueue.cs,
279 { &waitqueue_debug.ProcessLocksList, &waitqueue_debug.ProcessLocksList },
280 0, 0, { (DWORD_PTR)(__FILE__ ": waitqueue.cs") }
283 struct waitqueue_bucket
285 struct list bucket_entry;
286 LONG objcount;
287 struct list reserved;
288 struct list waiting;
289 HANDLE update_event;
292 static inline struct threadpool *impl_from_TP_POOL( TP_POOL *pool )
294 return (struct threadpool *)pool;
297 static inline struct threadpool_object *impl_from_TP_WORK( TP_WORK *work )
299 struct threadpool_object *object = (struct threadpool_object *)work;
300 assert( object->type == TP_OBJECT_TYPE_WORK );
301 return object;
304 static inline struct threadpool_object *impl_from_TP_TIMER( TP_TIMER *timer )
306 struct threadpool_object *object = (struct threadpool_object *)timer;
307 assert( object->type == TP_OBJECT_TYPE_TIMER );
308 return object;
311 static inline struct threadpool_object *impl_from_TP_WAIT( TP_WAIT *wait )
313 struct threadpool_object *object = (struct threadpool_object *)wait;
314 assert( object->type == TP_OBJECT_TYPE_WAIT );
315 return object;
318 static inline struct threadpool_group *impl_from_TP_CLEANUP_GROUP( TP_CLEANUP_GROUP *group )
320 return (struct threadpool_group *)group;
323 static inline struct threadpool_instance *impl_from_TP_CALLBACK_INSTANCE( TP_CALLBACK_INSTANCE *instance )
325 return (struct threadpool_instance *)instance;
328 static void CALLBACK threadpool_worker_proc( void *param );
329 static void tp_object_submit( struct threadpool_object *object, BOOL signaled );
330 static void tp_object_prepare_shutdown( struct threadpool_object *object );
331 static BOOL tp_object_release( struct threadpool_object *object );
332 static struct threadpool *default_threadpool = NULL;
334 static inline LONG interlocked_inc( PLONG dest )
336 return interlocked_xchg_add( dest, 1 ) + 1;
339 static inline LONG interlocked_dec( PLONG dest )
341 return interlocked_xchg_add( dest, -1 ) - 1;
344 static void CALLBACK process_rtl_work_item( TP_CALLBACK_INSTANCE *instance, void *userdata )
346 struct rtl_work_item *item = userdata;
348 TRACE("executing %p(%p)\n", item->function, item->context);
349 item->function( item->context );
351 RtlFreeHeap( GetProcessHeap(), 0, item );
354 /***********************************************************************
355 * RtlQueueWorkItem (NTDLL.@)
357 * Queues a work item into a thread in the thread pool.
359 * PARAMS
360 * function [I] Work function to execute.
361 * context [I] Context to pass to the work function when it is executed.
362 * flags [I] Flags. See notes.
364 * RETURNS
365 * Success: STATUS_SUCCESS.
366 * Failure: Any NTSTATUS code.
368 * NOTES
369 * Flags can be one or more of the following:
370 *|WT_EXECUTEDEFAULT - Executes the work item in a non-I/O worker thread.
371 *|WT_EXECUTEINIOTHREAD - Executes the work item in an I/O worker thread.
372 *|WT_EXECUTEINPERSISTENTTHREAD - Executes the work item in a thread that is persistent.
373 *|WT_EXECUTELONGFUNCTION - Hints that the execution can take a long time.
374 *|WT_TRANSFER_IMPERSONATION - Executes the function with the current access token.
376 NTSTATUS WINAPI RtlQueueWorkItem( PRTL_WORK_ITEM_ROUTINE function, PVOID context, ULONG flags )
378 TP_CALLBACK_ENVIRON environment;
379 struct rtl_work_item *item;
380 NTSTATUS status;
382 TRACE( "%p %p %u\n", function, context, flags );
384 item = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*item) );
385 if (!item)
386 return STATUS_NO_MEMORY;
388 memset( &environment, 0, sizeof(environment) );
389 environment.Version = 1;
390 environment.u.s.LongFunction = (flags & WT_EXECUTELONGFUNCTION) != 0;
391 environment.u.s.Persistent = (flags & WT_EXECUTEINPERSISTENTTHREAD) != 0;
393 item->function = function;
394 item->context = context;
396 status = TpSimpleTryPost( process_rtl_work_item, item, &environment );
397 if (status) RtlFreeHeap( GetProcessHeap(), 0, item );
398 return status;
401 /***********************************************************************
402 * iocp_poller - get completion events and run callbacks
404 static DWORD CALLBACK iocp_poller(LPVOID Arg)
406 HANDLE cport = Arg;
408 while( TRUE )
410 PRTL_OVERLAPPED_COMPLETION_ROUTINE callback;
411 LPVOID overlapped;
412 IO_STATUS_BLOCK iosb;
413 NTSTATUS res = NtRemoveIoCompletion( cport, (PULONG_PTR)&callback, (PULONG_PTR)&overlapped, &iosb, NULL );
414 if (res)
416 ERR("NtRemoveIoCompletion failed: 0x%x\n", res);
418 else
420 DWORD transferred = 0;
421 DWORD err = 0;
423 if (iosb.u.Status == STATUS_SUCCESS)
424 transferred = iosb.Information;
425 else
426 err = RtlNtStatusToDosError(iosb.u.Status);
428 callback( err, transferred, overlapped );
431 return 0;
434 /***********************************************************************
435 * RtlSetIoCompletionCallback (NTDLL.@)
437 * Binds a handle to a thread pool's completion port, and possibly
438 * starts a non-I/O thread to monitor this port and call functions back.
440 * PARAMS
441 * FileHandle [I] Handle to bind to a completion port.
442 * Function [I] Callback function to call on I/O completions.
443 * Flags [I] Not used.
445 * RETURNS
446 * Success: STATUS_SUCCESS.
447 * Failure: Any NTSTATUS code.
450 NTSTATUS WINAPI RtlSetIoCompletionCallback(HANDLE FileHandle, PRTL_OVERLAPPED_COMPLETION_ROUTINE Function, ULONG Flags)
452 IO_STATUS_BLOCK iosb;
453 FILE_COMPLETION_INFORMATION info;
455 if (Flags) FIXME("Unknown value Flags=0x%x\n", Flags);
457 if (!old_threadpool.compl_port)
459 NTSTATUS res = STATUS_SUCCESS;
461 RtlEnterCriticalSection(&old_threadpool.threadpool_compl_cs);
462 if (!old_threadpool.compl_port)
464 HANDLE cport;
466 res = NtCreateIoCompletion( &cport, IO_COMPLETION_ALL_ACCESS, NULL, 0 );
467 if (!res)
469 /* FIXME native can start additional threads in case of e.g. hung callback function. */
470 res = RtlQueueWorkItem( iocp_poller, cport, WT_EXECUTEDEFAULT );
471 if (!res)
472 old_threadpool.compl_port = cport;
473 else
474 NtClose( cport );
477 RtlLeaveCriticalSection(&old_threadpool.threadpool_compl_cs);
478 if (res) return res;
481 info.CompletionPort = old_threadpool.compl_port;
482 info.CompletionKey = (ULONG_PTR)Function;
484 return NtSetInformationFile( FileHandle, &iosb, &info, sizeof(info), FileCompletionInformation );
487 static inline PLARGE_INTEGER get_nt_timeout( PLARGE_INTEGER pTime, ULONG timeout )
489 if (timeout == INFINITE) return NULL;
490 pTime->QuadPart = (ULONGLONG)timeout * -10000;
491 return pTime;
494 static void delete_wait_work_item(struct wait_work_item *wait_work_item)
496 NtClose( wait_work_item->CancelEvent );
497 RtlFreeHeap( GetProcessHeap(), 0, wait_work_item );
500 static DWORD CALLBACK wait_thread_proc(LPVOID Arg)
502 struct wait_work_item *wait_work_item = Arg;
503 NTSTATUS status;
504 BOOLEAN alertable = (wait_work_item->Flags & WT_EXECUTEINIOTHREAD) != 0;
505 HANDLE handles[2] = { wait_work_item->Object, wait_work_item->CancelEvent };
506 LARGE_INTEGER timeout;
507 HANDLE completion_event;
509 TRACE("\n");
511 while (TRUE)
513 status = NtWaitForMultipleObjects( 2, handles, TRUE, alertable,
514 get_nt_timeout( &timeout, wait_work_item->Milliseconds ) );
515 if (status == STATUS_WAIT_0 || status == STATUS_TIMEOUT)
517 BOOLEAN TimerOrWaitFired;
519 if (status == STATUS_WAIT_0)
521 TRACE( "object %p signaled, calling callback %p with context %p\n",
522 wait_work_item->Object, wait_work_item->Callback,
523 wait_work_item->Context );
524 TimerOrWaitFired = FALSE;
526 else
528 TRACE( "wait for object %p timed out, calling callback %p with context %p\n",
529 wait_work_item->Object, wait_work_item->Callback,
530 wait_work_item->Context );
531 TimerOrWaitFired = TRUE;
533 wait_work_item->CallbackInProgress = TRUE;
534 wait_work_item->Callback( wait_work_item->Context, TimerOrWaitFired );
535 wait_work_item->CallbackInProgress = FALSE;
537 if (wait_work_item->Flags & WT_EXECUTEONLYONCE)
538 break;
540 else if (status != STATUS_USER_APC)
541 break;
544 completion_event = wait_work_item->CompletionEvent;
545 if (completion_event) NtSetEvent( completion_event, NULL );
547 if (interlocked_inc( &wait_work_item->DeleteCount ) == 2 )
548 delete_wait_work_item( wait_work_item );
550 return 0;
553 /***********************************************************************
554 * RtlRegisterWait (NTDLL.@)
556 * Registers a wait for a handle to become signaled.
558 * PARAMS
559 * NewWaitObject [I] Handle to the new wait object. Use RtlDeregisterWait() to free it.
560 * Object [I] Object to wait to become signaled.
561 * Callback [I] Callback function to execute when the wait times out or the handle is signaled.
562 * Context [I] Context to pass to the callback function when it is executed.
563 * Milliseconds [I] Number of milliseconds to wait before timing out.
564 * Flags [I] Flags. See notes.
566 * RETURNS
567 * Success: STATUS_SUCCESS.
568 * Failure: Any NTSTATUS code.
570 * NOTES
571 * Flags can be one or more of the following:
572 *|WT_EXECUTEDEFAULT - Executes the work item in a non-I/O worker thread.
573 *|WT_EXECUTEINIOTHREAD - Executes the work item in an I/O worker thread.
574 *|WT_EXECUTEINPERSISTENTTHREAD - Executes the work item in a thread that is persistent.
575 *|WT_EXECUTELONGFUNCTION - Hints that the execution can take a long time.
576 *|WT_TRANSFER_IMPERSONATION - Executes the function with the current access token.
578 NTSTATUS WINAPI RtlRegisterWait(PHANDLE NewWaitObject, HANDLE Object,
579 RTL_WAITORTIMERCALLBACKFUNC Callback,
580 PVOID Context, ULONG Milliseconds, ULONG Flags)
582 struct wait_work_item *wait_work_item;
583 NTSTATUS status;
585 TRACE( "(%p, %p, %p, %p, %d, 0x%x)\n", NewWaitObject, Object, Callback, Context, Milliseconds, Flags );
587 wait_work_item = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*wait_work_item) );
588 if (!wait_work_item)
589 return STATUS_NO_MEMORY;
591 wait_work_item->Object = Object;
592 wait_work_item->Callback = Callback;
593 wait_work_item->Context = Context;
594 wait_work_item->Milliseconds = Milliseconds;
595 wait_work_item->Flags = Flags;
596 wait_work_item->CallbackInProgress = FALSE;
597 wait_work_item->DeleteCount = 0;
598 wait_work_item->CompletionEvent = NULL;
600 status = NtCreateEvent( &wait_work_item->CancelEvent, EVENT_ALL_ACCESS, NULL, NotificationEvent, FALSE );
601 if (status != STATUS_SUCCESS)
603 RtlFreeHeap( GetProcessHeap(), 0, wait_work_item );
604 return status;
607 Flags = Flags & (WT_EXECUTEINIOTHREAD | WT_EXECUTEINPERSISTENTTHREAD |
608 WT_EXECUTELONGFUNCTION | WT_TRANSFER_IMPERSONATION);
609 status = RtlQueueWorkItem( wait_thread_proc, wait_work_item, Flags );
610 if (status != STATUS_SUCCESS)
612 delete_wait_work_item( wait_work_item );
613 return status;
616 *NewWaitObject = wait_work_item;
617 return status;
620 /***********************************************************************
621 * RtlDeregisterWaitEx (NTDLL.@)
623 * Cancels a wait operation and frees the resources associated with calling
624 * RtlRegisterWait().
626 * PARAMS
627 * WaitObject [I] Handle to the wait object to free.
629 * RETURNS
630 * Success: STATUS_SUCCESS.
631 * Failure: Any NTSTATUS code.
633 NTSTATUS WINAPI RtlDeregisterWaitEx(HANDLE WaitHandle, HANDLE CompletionEvent)
635 struct wait_work_item *wait_work_item = WaitHandle;
636 NTSTATUS status = STATUS_SUCCESS;
638 TRACE( "(%p)\n", WaitHandle );
640 if (WaitHandle == NULL)
641 return STATUS_INVALID_HANDLE;
643 NtSetEvent( wait_work_item->CancelEvent, NULL );
644 if (wait_work_item->CallbackInProgress)
646 if (CompletionEvent != NULL)
648 if (CompletionEvent == INVALID_HANDLE_VALUE)
650 status = NtCreateEvent( &CompletionEvent, EVENT_ALL_ACCESS, NULL, NotificationEvent, FALSE );
651 if (status != STATUS_SUCCESS)
652 return status;
653 interlocked_xchg_ptr( &wait_work_item->CompletionEvent, CompletionEvent );
654 if (wait_work_item->CallbackInProgress)
655 NtWaitForSingleObject( CompletionEvent, FALSE, NULL );
656 NtClose( CompletionEvent );
658 else
660 interlocked_xchg_ptr( &wait_work_item->CompletionEvent, CompletionEvent );
661 if (wait_work_item->CallbackInProgress)
662 status = STATUS_PENDING;
665 else
666 status = STATUS_PENDING;
669 if (interlocked_inc( &wait_work_item->DeleteCount ) == 2 )
671 status = STATUS_SUCCESS;
672 delete_wait_work_item( wait_work_item );
675 return status;
678 /***********************************************************************
679 * RtlDeregisterWait (NTDLL.@)
681 * Cancels a wait operation and frees the resources associated with calling
682 * RtlRegisterWait().
684 * PARAMS
685 * WaitObject [I] Handle to the wait object to free.
687 * RETURNS
688 * Success: STATUS_SUCCESS.
689 * Failure: Any NTSTATUS code.
691 NTSTATUS WINAPI RtlDeregisterWait(HANDLE WaitHandle)
693 return RtlDeregisterWaitEx(WaitHandle, NULL);
697 /************************** Timer Queue Impl **************************/
699 static void queue_remove_timer(struct queue_timer *t)
701 /* We MUST hold the queue cs while calling this function. This ensures
702 that we cannot queue another callback for this timer. The runcount
703 being zero makes sure we don't have any already queued. */
704 struct timer_queue *q = t->q;
706 assert(t->runcount == 0);
707 assert(t->destroy);
709 list_remove(&t->entry);
710 if (t->event)
711 NtSetEvent(t->event, NULL);
712 RtlFreeHeap(GetProcessHeap(), 0, t);
714 if (q->quit && list_empty(&q->timers))
715 NtSetEvent(q->event, NULL);
718 static void timer_cleanup_callback(struct queue_timer *t)
720 struct timer_queue *q = t->q;
721 RtlEnterCriticalSection(&q->cs);
723 assert(0 < t->runcount);
724 --t->runcount;
726 if (t->destroy && t->runcount == 0)
727 queue_remove_timer(t);
729 RtlLeaveCriticalSection(&q->cs);
732 static DWORD WINAPI timer_callback_wrapper(LPVOID p)
734 struct queue_timer *t = p;
735 t->callback(t->param, TRUE);
736 timer_cleanup_callback(t);
737 return 0;
740 static inline ULONGLONG queue_current_time(void)
742 LARGE_INTEGER now, freq;
743 NtQueryPerformanceCounter(&now, &freq);
744 return now.QuadPart * 1000 / freq.QuadPart;
747 static void queue_add_timer(struct queue_timer *t, ULONGLONG time,
748 BOOL set_event)
750 /* We MUST hold the queue cs while calling this function. */
751 struct timer_queue *q = t->q;
752 struct list *ptr = &q->timers;
754 assert(!q->quit || (t->destroy && time == EXPIRE_NEVER));
756 if (time != EXPIRE_NEVER)
757 LIST_FOR_EACH(ptr, &q->timers)
759 struct queue_timer *cur = LIST_ENTRY(ptr, struct queue_timer, entry);
760 if (time < cur->expire)
761 break;
763 list_add_before(ptr, &t->entry);
765 t->expire = time;
767 /* If we insert at the head of the list, we need to expire sooner
768 than expected. */
769 if (set_event && &t->entry == list_head(&q->timers))
770 NtSetEvent(q->event, NULL);
773 static inline void queue_move_timer(struct queue_timer *t, ULONGLONG time,
774 BOOL set_event)
776 /* We MUST hold the queue cs while calling this function. */
777 list_remove(&t->entry);
778 queue_add_timer(t, time, set_event);
781 static void queue_timer_expire(struct timer_queue *q)
783 struct queue_timer *t = NULL;
785 RtlEnterCriticalSection(&q->cs);
786 if (list_head(&q->timers))
788 ULONGLONG now, next;
789 t = LIST_ENTRY(list_head(&q->timers), struct queue_timer, entry);
790 if (!t->destroy && t->expire <= ((now = queue_current_time())))
792 ++t->runcount;
793 if (t->period)
795 next = t->expire + t->period;
796 /* avoid trigger cascade if overloaded / hibernated */
797 if (next < now)
798 next = now + t->period;
800 else
801 next = EXPIRE_NEVER;
802 queue_move_timer(t, next, FALSE);
804 else
805 t = NULL;
807 RtlLeaveCriticalSection(&q->cs);
809 if (t)
811 if (t->flags & WT_EXECUTEINTIMERTHREAD)
812 timer_callback_wrapper(t);
813 else
815 ULONG flags
816 = (t->flags
817 & (WT_EXECUTEINIOTHREAD | WT_EXECUTEINPERSISTENTTHREAD
818 | WT_EXECUTELONGFUNCTION | WT_TRANSFER_IMPERSONATION));
819 NTSTATUS status = RtlQueueWorkItem(timer_callback_wrapper, t, flags);
820 if (status != STATUS_SUCCESS)
821 timer_cleanup_callback(t);
826 static ULONG queue_get_timeout(struct timer_queue *q)
828 struct queue_timer *t;
829 ULONG timeout = INFINITE;
831 RtlEnterCriticalSection(&q->cs);
832 if (list_head(&q->timers))
834 t = LIST_ENTRY(list_head(&q->timers), struct queue_timer, entry);
835 assert(!t->destroy || t->expire == EXPIRE_NEVER);
837 if (t->expire != EXPIRE_NEVER)
839 ULONGLONG time = queue_current_time();
840 timeout = t->expire < time ? 0 : t->expire - time;
843 RtlLeaveCriticalSection(&q->cs);
845 return timeout;
848 static void WINAPI timer_queue_thread_proc(LPVOID p)
850 struct timer_queue *q = p;
851 ULONG timeout_ms;
853 timeout_ms = INFINITE;
854 for (;;)
856 LARGE_INTEGER timeout;
857 NTSTATUS status;
858 BOOL done = FALSE;
860 status = NtWaitForSingleObject(
861 q->event, FALSE, get_nt_timeout(&timeout, timeout_ms));
863 if (status == STATUS_WAIT_0)
865 /* There are two possible ways to trigger the event. Either
866 we are quitting and the last timer got removed, or a new
867 timer got put at the head of the list so we need to adjust
868 our timeout. */
869 RtlEnterCriticalSection(&q->cs);
870 if (q->quit && list_empty(&q->timers))
871 done = TRUE;
872 RtlLeaveCriticalSection(&q->cs);
874 else if (status == STATUS_TIMEOUT)
875 queue_timer_expire(q);
877 if (done)
878 break;
880 timeout_ms = queue_get_timeout(q);
883 NtClose(q->event);
884 RtlDeleteCriticalSection(&q->cs);
885 q->magic = 0;
886 RtlFreeHeap(GetProcessHeap(), 0, q);
887 RtlExitUserThread( 0 );
890 static void queue_destroy_timer(struct queue_timer *t)
892 /* We MUST hold the queue cs while calling this function. */
893 t->destroy = TRUE;
894 if (t->runcount == 0)
895 /* Ensure a timer is promptly removed. If callbacks are pending,
896 it will be removed after the last one finishes by the callback
897 cleanup wrapper. */
898 queue_remove_timer(t);
899 else
900 /* Make sure no destroyed timer masks an active timer at the head
901 of the sorted list. */
902 queue_move_timer(t, EXPIRE_NEVER, FALSE);
905 /***********************************************************************
906 * RtlCreateTimerQueue (NTDLL.@)
908 * Creates a timer queue object and returns a handle to it.
910 * PARAMS
911 * NewTimerQueue [O] The newly created queue.
913 * RETURNS
914 * Success: STATUS_SUCCESS.
915 * Failure: Any NTSTATUS code.
917 NTSTATUS WINAPI RtlCreateTimerQueue(PHANDLE NewTimerQueue)
919 NTSTATUS status;
920 struct timer_queue *q = RtlAllocateHeap(GetProcessHeap(), 0, sizeof *q);
921 if (!q)
922 return STATUS_NO_MEMORY;
924 RtlInitializeCriticalSection(&q->cs);
925 list_init(&q->timers);
926 q->quit = FALSE;
927 q->magic = TIMER_QUEUE_MAGIC;
928 status = NtCreateEvent(&q->event, EVENT_ALL_ACCESS, NULL, SynchronizationEvent, FALSE);
929 if (status != STATUS_SUCCESS)
931 RtlFreeHeap(GetProcessHeap(), 0, q);
932 return status;
934 status = RtlCreateUserThread(GetCurrentProcess(), NULL, FALSE, NULL, 0, 0,
935 timer_queue_thread_proc, q, &q->thread, NULL);
936 if (status != STATUS_SUCCESS)
938 NtClose(q->event);
939 RtlFreeHeap(GetProcessHeap(), 0, q);
940 return status;
943 *NewTimerQueue = q;
944 return STATUS_SUCCESS;
947 /***********************************************************************
948 * RtlDeleteTimerQueueEx (NTDLL.@)
950 * Deletes a timer queue object.
952 * PARAMS
953 * TimerQueue [I] The timer queue to destroy.
954 * CompletionEvent [I] If NULL, return immediately. If INVALID_HANDLE_VALUE,
955 * wait until all timers are finished firing before
956 * returning. Otherwise, return immediately and set the
957 * event when all timers are done.
959 * RETURNS
960 * Success: STATUS_SUCCESS if synchronous, STATUS_PENDING if not.
961 * Failure: Any NTSTATUS code.
963 NTSTATUS WINAPI RtlDeleteTimerQueueEx(HANDLE TimerQueue, HANDLE CompletionEvent)
965 struct timer_queue *q = TimerQueue;
966 struct queue_timer *t, *temp;
967 HANDLE thread;
968 NTSTATUS status;
970 if (!q || q->magic != TIMER_QUEUE_MAGIC)
971 return STATUS_INVALID_HANDLE;
973 thread = q->thread;
975 RtlEnterCriticalSection(&q->cs);
976 q->quit = TRUE;
977 if (list_head(&q->timers))
978 /* When the last timer is removed, it will signal the timer thread to
979 exit... */
980 LIST_FOR_EACH_ENTRY_SAFE(t, temp, &q->timers, struct queue_timer, entry)
981 queue_destroy_timer(t);
982 else
983 /* However if we have none, we must do it ourselves. */
984 NtSetEvent(q->event, NULL);
985 RtlLeaveCriticalSection(&q->cs);
987 if (CompletionEvent == INVALID_HANDLE_VALUE)
989 NtWaitForSingleObject(thread, FALSE, NULL);
990 status = STATUS_SUCCESS;
992 else
994 if (CompletionEvent)
996 FIXME("asynchronous return on completion event unimplemented\n");
997 NtWaitForSingleObject(thread, FALSE, NULL);
998 NtSetEvent(CompletionEvent, NULL);
1000 status = STATUS_PENDING;
1003 NtClose(thread);
1004 return status;
1007 static struct timer_queue *get_timer_queue(HANDLE TimerQueue)
1009 static struct timer_queue *default_timer_queue;
1011 if (TimerQueue)
1012 return TimerQueue;
1013 else
1015 if (!default_timer_queue)
1017 HANDLE q;
1018 NTSTATUS status = RtlCreateTimerQueue(&q);
1019 if (status == STATUS_SUCCESS)
1021 PVOID p = interlocked_cmpxchg_ptr(
1022 (void **) &default_timer_queue, q, NULL);
1023 if (p)
1024 /* Got beat to the punch. */
1025 RtlDeleteTimerQueueEx(q, NULL);
1028 return default_timer_queue;
1032 /***********************************************************************
1033 * RtlCreateTimer (NTDLL.@)
1035 * Creates a new timer associated with the given queue.
1037 * PARAMS
1038 * NewTimer [O] The newly created timer.
1039 * TimerQueue [I] The queue to hold the timer.
1040 * Callback [I] The callback to fire.
1041 * Parameter [I] The argument for the callback.
1042 * DueTime [I] The delay, in milliseconds, before first firing the
1043 * timer.
1044 * Period [I] The period, in milliseconds, at which to fire the timer
1045 * after the first callback. If zero, the timer will only
1046 * fire once. It still needs to be deleted with
1047 * RtlDeleteTimer.
1048 * Flags [I] Flags controlling the execution of the callback. In
1049 * addition to the WT_* thread pool flags (see
1050 * RtlQueueWorkItem), WT_EXECUTEINTIMERTHREAD and
1051 * WT_EXECUTEONLYONCE are supported.
1053 * RETURNS
1054 * Success: STATUS_SUCCESS.
1055 * Failure: Any NTSTATUS code.
1057 NTSTATUS WINAPI RtlCreateTimer(PHANDLE NewTimer, HANDLE TimerQueue,
1058 RTL_WAITORTIMERCALLBACKFUNC Callback,
1059 PVOID Parameter, DWORD DueTime, DWORD Period,
1060 ULONG Flags)
1062 NTSTATUS status;
1063 struct queue_timer *t;
1064 struct timer_queue *q = get_timer_queue(TimerQueue);
1066 if (!q) return STATUS_NO_MEMORY;
1067 if (q->magic != TIMER_QUEUE_MAGIC) return STATUS_INVALID_HANDLE;
1069 t = RtlAllocateHeap(GetProcessHeap(), 0, sizeof *t);
1070 if (!t)
1071 return STATUS_NO_MEMORY;
1073 t->q = q;
1074 t->runcount = 0;
1075 t->callback = Callback;
1076 t->param = Parameter;
1077 t->period = Period;
1078 t->flags = Flags;
1079 t->destroy = FALSE;
1080 t->event = NULL;
1082 status = STATUS_SUCCESS;
1083 RtlEnterCriticalSection(&q->cs);
1084 if (q->quit)
1085 status = STATUS_INVALID_HANDLE;
1086 else
1087 queue_add_timer(t, queue_current_time() + DueTime, TRUE);
1088 RtlLeaveCriticalSection(&q->cs);
1090 if (status == STATUS_SUCCESS)
1091 *NewTimer = t;
1092 else
1093 RtlFreeHeap(GetProcessHeap(), 0, t);
1095 return status;
1098 /***********************************************************************
1099 * RtlUpdateTimer (NTDLL.@)
1101 * Changes the time at which a timer expires.
1103 * PARAMS
1104 * TimerQueue [I] The queue that holds the timer.
1105 * Timer [I] The timer to update.
1106 * DueTime [I] The delay, in milliseconds, before next firing the timer.
1107 * Period [I] The period, in milliseconds, at which to fire the timer
1108 * after the first callback. If zero, the timer will not
1109 * refire once. It still needs to be deleted with
1110 * RtlDeleteTimer.
1112 * RETURNS
1113 * Success: STATUS_SUCCESS.
1114 * Failure: Any NTSTATUS code.
1116 NTSTATUS WINAPI RtlUpdateTimer(HANDLE TimerQueue, HANDLE Timer,
1117 DWORD DueTime, DWORD Period)
1119 struct queue_timer *t = Timer;
1120 struct timer_queue *q = t->q;
1122 RtlEnterCriticalSection(&q->cs);
1123 /* Can't change a timer if it was once-only or destroyed. */
1124 if (t->expire != EXPIRE_NEVER)
1126 t->period = Period;
1127 queue_move_timer(t, queue_current_time() + DueTime, TRUE);
1129 RtlLeaveCriticalSection(&q->cs);
1131 return STATUS_SUCCESS;
1134 /***********************************************************************
1135 * RtlDeleteTimer (NTDLL.@)
1137 * Cancels a timer-queue timer.
1139 * PARAMS
1140 * TimerQueue [I] The queue that holds the timer.
1141 * Timer [I] The timer to update.
1142 * CompletionEvent [I] If NULL, return immediately. If INVALID_HANDLE_VALUE,
1143 * wait until the timer is finished firing all pending
1144 * callbacks before returning. Otherwise, return
1145 * immediately and set the timer is done.
1147 * RETURNS
1148 * Success: STATUS_SUCCESS if the timer is done, STATUS_PENDING if not,
1149 or if the completion event is NULL.
1150 * Failure: Any NTSTATUS code.
1152 NTSTATUS WINAPI RtlDeleteTimer(HANDLE TimerQueue, HANDLE Timer,
1153 HANDLE CompletionEvent)
1155 struct queue_timer *t = Timer;
1156 struct timer_queue *q;
1157 NTSTATUS status = STATUS_PENDING;
1158 HANDLE event = NULL;
1160 if (!Timer)
1161 return STATUS_INVALID_PARAMETER_1;
1162 q = t->q;
1163 if (CompletionEvent == INVALID_HANDLE_VALUE)
1165 status = NtCreateEvent(&event, EVENT_ALL_ACCESS, NULL, SynchronizationEvent, FALSE);
1166 if (status == STATUS_SUCCESS)
1167 status = STATUS_PENDING;
1169 else if (CompletionEvent)
1170 event = CompletionEvent;
1172 RtlEnterCriticalSection(&q->cs);
1173 t->event = event;
1174 if (t->runcount == 0 && event)
1175 status = STATUS_SUCCESS;
1176 queue_destroy_timer(t);
1177 RtlLeaveCriticalSection(&q->cs);
1179 if (CompletionEvent == INVALID_HANDLE_VALUE && event)
1181 if (status == STATUS_PENDING)
1183 NtWaitForSingleObject(event, FALSE, NULL);
1184 status = STATUS_SUCCESS;
1186 NtClose(event);
1189 return status;
1192 /***********************************************************************
1193 * timerqueue_thread_proc (internal)
1195 static void CALLBACK timerqueue_thread_proc( void *param )
1197 ULONGLONG timeout_lower, timeout_upper, new_timeout;
1198 struct threadpool_object *other_timer;
1199 LARGE_INTEGER now, timeout;
1200 struct list *ptr;
1202 TRACE( "starting timer queue thread\n" );
1204 RtlEnterCriticalSection( &timerqueue.cs );
1205 for (;;)
1207 NtQuerySystemTime( &now );
1209 /* Check for expired timers. */
1210 while ((ptr = list_head( &timerqueue.pending_timers )))
1212 struct threadpool_object *timer = LIST_ENTRY( ptr, struct threadpool_object, u.timer.timer_entry );
1213 assert( timer->type == TP_OBJECT_TYPE_TIMER );
1214 assert( timer->u.timer.timer_pending );
1215 if (timer->u.timer.timeout > now.QuadPart)
1216 break;
1218 /* Queue a new callback in one of the worker threads. */
1219 list_remove( &timer->u.timer.timer_entry );
1220 timer->u.timer.timer_pending = FALSE;
1221 tp_object_submit( timer, FALSE );
1223 /* Insert the timer back into the queue, except it's marked for shutdown. */
1224 if (timer->u.timer.period && !timer->shutdown)
1226 timer->u.timer.timeout += (ULONGLONG)timer->u.timer.period * 10000;
1227 if (timer->u.timer.timeout <= now.QuadPart)
1228 timer->u.timer.timeout = now.QuadPart + 1;
1230 LIST_FOR_EACH_ENTRY( other_timer, &timerqueue.pending_timers,
1231 struct threadpool_object, u.timer.timer_entry )
1233 assert( other_timer->type == TP_OBJECT_TYPE_TIMER );
1234 if (timer->u.timer.timeout < other_timer->u.timer.timeout)
1235 break;
1237 list_add_before( &other_timer->u.timer.timer_entry, &timer->u.timer.timer_entry );
1238 timer->u.timer.timer_pending = TRUE;
1242 timeout_lower = TIMEOUT_INFINITE;
1243 timeout_upper = TIMEOUT_INFINITE;
1245 /* Determine next timeout and use the window length to optimize wakeup times. */
1246 LIST_FOR_EACH_ENTRY( other_timer, &timerqueue.pending_timers,
1247 struct threadpool_object, u.timer.timer_entry )
1249 assert( other_timer->type == TP_OBJECT_TYPE_TIMER );
1250 if (other_timer->u.timer.timeout >= timeout_upper)
1251 break;
1253 timeout_lower = other_timer->u.timer.timeout;
1254 new_timeout = timeout_lower + (ULONGLONG)other_timer->u.timer.window_length * 10000;
1255 if (new_timeout < timeout_upper)
1256 timeout_upper = new_timeout;
1259 /* Wait for timer update events or until the next timer expires. */
1260 if (timerqueue.objcount)
1262 timeout.QuadPart = timeout_lower;
1263 RtlSleepConditionVariableCS( &timerqueue.update_event, &timerqueue.cs, &timeout );
1264 continue;
1267 /* All timers have been destroyed, if no new timers are created
1268 * within some amount of time, then we can shutdown this thread. */
1269 timeout.QuadPart = (ULONGLONG)THREADPOOL_WORKER_TIMEOUT * -10000;
1270 if (RtlSleepConditionVariableCS( &timerqueue.update_event, &timerqueue.cs,
1271 &timeout ) == STATUS_TIMEOUT && !timerqueue.objcount)
1273 break;
1277 timerqueue.thread_running = FALSE;
1278 RtlLeaveCriticalSection( &timerqueue.cs );
1280 TRACE( "terminating timer queue thread\n" );
1281 RtlExitUserThread( 0 );
1284 /***********************************************************************
1285 * tp_new_worker_thread (internal)
1287 * Create and account a new worker thread for the desired pool.
1289 static NTSTATUS tp_new_worker_thread( struct threadpool *pool )
1291 HANDLE thread;
1292 NTSTATUS status;
1294 status = RtlCreateUserThread( GetCurrentProcess(), NULL, FALSE, NULL, 0, 0,
1295 threadpool_worker_proc, pool, &thread, NULL );
1296 if (status == STATUS_SUCCESS)
1298 interlocked_inc( &pool->refcount );
1299 pool->num_workers++;
1300 pool->num_busy_workers++;
1301 NtClose( thread );
1303 return status;
1306 /***********************************************************************
1307 * tp_timerqueue_lock (internal)
1309 * Acquires a lock on the global timerqueue. When the lock is acquired
1310 * successfully, it is guaranteed that the timer thread is running.
1312 static NTSTATUS tp_timerqueue_lock( struct threadpool_object *timer )
1314 NTSTATUS status = STATUS_SUCCESS;
1315 assert( timer->type == TP_OBJECT_TYPE_TIMER );
1317 timer->u.timer.timer_initialized = FALSE;
1318 timer->u.timer.timer_pending = FALSE;
1319 timer->u.timer.timer_set = FALSE;
1320 timer->u.timer.timeout = 0;
1321 timer->u.timer.period = 0;
1322 timer->u.timer.window_length = 0;
1324 RtlEnterCriticalSection( &timerqueue.cs );
1326 /* Make sure that the timerqueue thread is running. */
1327 if (!timerqueue.thread_running)
1329 HANDLE thread;
1330 status = RtlCreateUserThread( GetCurrentProcess(), NULL, FALSE, NULL, 0, 0,
1331 timerqueue_thread_proc, NULL, &thread, NULL );
1332 if (status == STATUS_SUCCESS)
1334 timerqueue.thread_running = TRUE;
1335 NtClose( thread );
1339 if (status == STATUS_SUCCESS)
1341 timer->u.timer.timer_initialized = TRUE;
1342 timerqueue.objcount++;
1345 RtlLeaveCriticalSection( &timerqueue.cs );
1346 return status;
1349 /***********************************************************************
1350 * tp_timerqueue_unlock (internal)
1352 * Releases a lock on the global timerqueue.
1354 static void tp_timerqueue_unlock( struct threadpool_object *timer )
1356 assert( timer->type == TP_OBJECT_TYPE_TIMER );
1358 RtlEnterCriticalSection( &timerqueue.cs );
1359 if (timer->u.timer.timer_initialized)
1361 /* If timer was pending, remove it. */
1362 if (timer->u.timer.timer_pending)
1364 list_remove( &timer->u.timer.timer_entry );
1365 timer->u.timer.timer_pending = FALSE;
1368 /* If the last timer object was destroyed, then wake up the thread. */
1369 if (!--timerqueue.objcount)
1371 assert( list_empty( &timerqueue.pending_timers ) );
1372 RtlWakeAllConditionVariable( &timerqueue.update_event );
1375 timer->u.timer.timer_initialized = FALSE;
1377 RtlLeaveCriticalSection( &timerqueue.cs );
1380 /***********************************************************************
1381 * waitqueue_thread_proc (internal)
1383 static void CALLBACK waitqueue_thread_proc( void *param )
1385 struct threadpool_object *objects[MAXIMUM_WAITQUEUE_OBJECTS];
1386 HANDLE handles[MAXIMUM_WAITQUEUE_OBJECTS + 1];
1387 struct waitqueue_bucket *bucket = param;
1388 struct threadpool_object *wait, *next;
1389 LARGE_INTEGER now, timeout;
1390 DWORD num_handles;
1391 NTSTATUS status;
1393 TRACE( "starting wait queue thread\n" );
1395 RtlEnterCriticalSection( &waitqueue.cs );
1397 for (;;)
1399 NtQuerySystemTime( &now );
1400 timeout.QuadPart = TIMEOUT_INFINITE;
1401 num_handles = 0;
1403 LIST_FOR_EACH_ENTRY_SAFE( wait, next, &bucket->waiting, struct threadpool_object,
1404 u.wait.wait_entry )
1406 assert( wait->type == TP_OBJECT_TYPE_WAIT );
1407 if (wait->u.wait.timeout <= now.QuadPart)
1409 /* Wait object timed out. */
1410 list_remove( &wait->u.wait.wait_entry );
1411 list_add_tail( &bucket->reserved, &wait->u.wait.wait_entry );
1412 tp_object_submit( wait, FALSE );
1414 else
1416 if (wait->u.wait.timeout < timeout.QuadPart)
1417 timeout.QuadPart = wait->u.wait.timeout;
1419 assert( num_handles < MAXIMUM_WAITQUEUE_OBJECTS );
1420 interlocked_inc( &wait->refcount );
1421 objects[num_handles] = wait;
1422 handles[num_handles] = wait->u.wait.handle;
1423 num_handles++;
1427 if (!bucket->objcount)
1429 /* All wait objects have been destroyed, if no new wait objects are created
1430 * within some amount of time, then we can shutdown this thread. */
1431 assert( num_handles == 0 );
1432 RtlLeaveCriticalSection( &waitqueue.cs );
1433 timeout.QuadPart = (ULONGLONG)THREADPOOL_WORKER_TIMEOUT * -10000;
1434 status = NtWaitForMultipleObjects( 1, &bucket->update_event, TRUE, FALSE, &timeout );
1435 RtlEnterCriticalSection( &waitqueue.cs );
1437 if (status == STATUS_TIMEOUT && !bucket->objcount)
1438 break;
1440 else
1442 handles[num_handles] = bucket->update_event;
1443 RtlLeaveCriticalSection( &waitqueue.cs );
1444 status = NtWaitForMultipleObjects( num_handles + 1, handles, TRUE, FALSE, &timeout );
1445 RtlEnterCriticalSection( &waitqueue.cs );
1447 if (status >= STATUS_WAIT_0 && status < STATUS_WAIT_0 + num_handles)
1449 wait = objects[status - STATUS_WAIT_0];
1450 assert( wait->type == TP_OBJECT_TYPE_WAIT );
1451 if (wait->u.wait.bucket)
1453 /* Wait object signaled. */
1454 assert( wait->u.wait.bucket == bucket );
1455 list_remove( &wait->u.wait.wait_entry );
1456 list_add_tail( &bucket->reserved, &wait->u.wait.wait_entry );
1457 tp_object_submit( wait, TRUE );
1459 else
1460 WARN("wait object %p triggered while object was destroyed\n", wait);
1463 /* Release temporary references to wait objects. */
1464 while (num_handles)
1466 wait = objects[--num_handles];
1467 assert( wait->type == TP_OBJECT_TYPE_WAIT );
1468 tp_object_release( wait );
1472 /* Try to merge bucket with other threads. */
1473 if (waitqueue.num_buckets > 1 && bucket->objcount &&
1474 bucket->objcount <= MAXIMUM_WAITQUEUE_OBJECTS * 1 / 3)
1476 struct waitqueue_bucket *other_bucket;
1477 LIST_FOR_EACH_ENTRY( other_bucket, &waitqueue.buckets, struct waitqueue_bucket, bucket_entry )
1479 if (other_bucket != bucket && other_bucket->objcount &&
1480 other_bucket->objcount + bucket->objcount <= MAXIMUM_WAITQUEUE_OBJECTS * 2 / 3)
1482 other_bucket->objcount += bucket->objcount;
1483 bucket->objcount = 0;
1485 /* Update reserved list. */
1486 LIST_FOR_EACH_ENTRY( wait, &bucket->reserved, struct threadpool_object, u.wait.wait_entry )
1488 assert( wait->type == TP_OBJECT_TYPE_WAIT );
1489 wait->u.wait.bucket = other_bucket;
1491 list_move_tail( &other_bucket->reserved, &bucket->reserved );
1493 /* Update waiting list. */
1494 LIST_FOR_EACH_ENTRY( wait, &bucket->waiting, struct threadpool_object, u.wait.wait_entry )
1496 assert( wait->type == TP_OBJECT_TYPE_WAIT );
1497 wait->u.wait.bucket = other_bucket;
1499 list_move_tail( &other_bucket->waiting, &bucket->waiting );
1501 /* Move bucket to the end, to keep the probability of
1502 * newly added wait objects as small as possible. */
1503 list_remove( &bucket->bucket_entry );
1504 list_add_tail( &waitqueue.buckets, &bucket->bucket_entry );
1506 NtSetEvent( other_bucket->update_event, NULL );
1507 break;
1513 /* Remove this bucket from the list. */
1514 list_remove( &bucket->bucket_entry );
1515 if (!--waitqueue.num_buckets)
1516 assert( list_empty( &waitqueue.buckets ) );
1518 RtlLeaveCriticalSection( &waitqueue.cs );
1520 TRACE( "terminating wait queue thread\n" );
1522 assert( bucket->objcount == 0 );
1523 assert( list_empty( &bucket->reserved ) );
1524 assert( list_empty( &bucket->waiting ) );
1525 NtClose( bucket->update_event );
1527 RtlFreeHeap( GetProcessHeap(), 0, bucket );
1528 RtlExitUserThread( 0 );
1531 /***********************************************************************
1532 * tp_waitqueue_lock (internal)
1534 static NTSTATUS tp_waitqueue_lock( struct threadpool_object *wait )
1536 struct waitqueue_bucket *bucket;
1537 NTSTATUS status;
1538 HANDLE thread;
1539 assert( wait->type == TP_OBJECT_TYPE_WAIT );
1541 wait->u.wait.signaled = 0;
1542 wait->u.wait.bucket = NULL;
1543 wait->u.wait.wait_pending = FALSE;
1544 wait->u.wait.timeout = 0;
1545 wait->u.wait.handle = INVALID_HANDLE_VALUE;
1547 RtlEnterCriticalSection( &waitqueue.cs );
1549 /* Try to assign to existing bucket if possible. */
1550 LIST_FOR_EACH_ENTRY( bucket, &waitqueue.buckets, struct waitqueue_bucket, bucket_entry )
1552 if (bucket->objcount < MAXIMUM_WAITQUEUE_OBJECTS)
1554 list_add_tail( &bucket->reserved, &wait->u.wait.wait_entry );
1555 wait->u.wait.bucket = bucket;
1556 bucket->objcount++;
1558 status = STATUS_SUCCESS;
1559 goto out;
1563 /* Create a new bucket and corresponding worker thread. */
1564 bucket = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*bucket) );
1565 if (!bucket)
1567 status = STATUS_NO_MEMORY;
1568 goto out;
1571 bucket->objcount = 0;
1572 list_init( &bucket->reserved );
1573 list_init( &bucket->waiting );
1575 status = NtCreateEvent( &bucket->update_event, EVENT_ALL_ACCESS,
1576 NULL, SynchronizationEvent, FALSE );
1577 if (status)
1579 RtlFreeHeap( GetProcessHeap(), 0, bucket );
1580 goto out;
1583 status = RtlCreateUserThread( GetCurrentProcess(), NULL, FALSE, NULL, 0, 0,
1584 waitqueue_thread_proc, bucket, &thread, NULL );
1585 if (status == STATUS_SUCCESS)
1587 list_add_tail( &waitqueue.buckets, &bucket->bucket_entry );
1588 waitqueue.num_buckets++;
1590 list_add_tail( &bucket->reserved, &wait->u.wait.wait_entry );
1591 wait->u.wait.bucket = bucket;
1592 bucket->objcount++;
1594 NtClose( thread );
1596 else
1598 NtClose( bucket->update_event );
1599 RtlFreeHeap( GetProcessHeap(), 0, bucket );
1602 out:
1603 RtlLeaveCriticalSection( &waitqueue.cs );
1604 return status;
1607 /***********************************************************************
1608 * tp_waitqueue_unlock (internal)
1610 static void tp_waitqueue_unlock( struct threadpool_object *wait )
1612 assert( wait->type == TP_OBJECT_TYPE_WAIT );
1614 RtlEnterCriticalSection( &waitqueue.cs );
1615 if (wait->u.wait.bucket)
1617 struct waitqueue_bucket *bucket = wait->u.wait.bucket;
1618 assert( bucket->objcount > 0 );
1620 list_remove( &wait->u.wait.wait_entry );
1621 wait->u.wait.bucket = NULL;
1622 bucket->objcount--;
1624 NtSetEvent( bucket->update_event, NULL );
1626 RtlLeaveCriticalSection( &waitqueue.cs );
1629 /***********************************************************************
1630 * tp_threadpool_alloc (internal)
1632 * Allocates a new threadpool object.
1634 static NTSTATUS tp_threadpool_alloc( struct threadpool **out )
1636 struct threadpool *pool;
1638 pool = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*pool) );
1639 if (!pool)
1640 return STATUS_NO_MEMORY;
1642 pool->refcount = 1;
1643 pool->objcount = 0;
1644 pool->shutdown = FALSE;
1646 RtlInitializeCriticalSection( &pool->cs );
1647 pool->cs.DebugInfo->Spare[0] = (DWORD_PTR)(__FILE__ ": threadpool.cs");
1649 list_init( &pool->pool );
1650 RtlInitializeConditionVariable( &pool->update_event );
1652 pool->max_workers = 500;
1653 pool->min_workers = 0;
1654 pool->num_workers = 0;
1655 pool->num_busy_workers = 0;
1657 TRACE( "allocated threadpool %p\n", pool );
1659 *out = pool;
1660 return STATUS_SUCCESS;
1663 /***********************************************************************
1664 * tp_threadpool_shutdown (internal)
1666 * Prepares the shutdown of a threadpool object and notifies all worker
1667 * threads to terminate (after all remaining work items have been
1668 * processed).
1670 static void tp_threadpool_shutdown( struct threadpool *pool )
1672 assert( pool != default_threadpool );
1674 pool->shutdown = TRUE;
1675 RtlWakeAllConditionVariable( &pool->update_event );
1678 /***********************************************************************
1679 * tp_threadpool_release (internal)
1681 * Releases a reference to a threadpool object.
1683 static BOOL tp_threadpool_release( struct threadpool *pool )
1685 if (interlocked_dec( &pool->refcount ))
1686 return FALSE;
1688 TRACE( "destroying threadpool %p\n", pool );
1690 assert( pool->shutdown );
1691 assert( !pool->objcount );
1692 assert( list_empty( &pool->pool ) );
1694 pool->cs.DebugInfo->Spare[0] = 0;
1695 RtlDeleteCriticalSection( &pool->cs );
1697 RtlFreeHeap( GetProcessHeap(), 0, pool );
1698 return TRUE;
1701 /***********************************************************************
1702 * tp_threadpool_lock (internal)
1704 * Acquires a lock on a threadpool, specified with an TP_CALLBACK_ENVIRON
1705 * block. When the lock is acquired successfully, it is guaranteed that
1706 * there is at least one worker thread to process tasks.
1708 static NTSTATUS tp_threadpool_lock( struct threadpool **out, TP_CALLBACK_ENVIRON *environment )
1710 struct threadpool *pool = NULL;
1711 NTSTATUS status = STATUS_SUCCESS;
1713 if (environment)
1714 pool = (struct threadpool *)environment->Pool;
1716 if (!pool)
1718 if (!default_threadpool)
1720 status = tp_threadpool_alloc( &pool );
1721 if (status != STATUS_SUCCESS)
1722 return status;
1724 if (interlocked_cmpxchg_ptr( (void *)&default_threadpool, pool, NULL ) != NULL)
1726 tp_threadpool_shutdown( pool );
1727 tp_threadpool_release( pool );
1731 pool = default_threadpool;
1734 RtlEnterCriticalSection( &pool->cs );
1736 /* Make sure that the threadpool has at least one thread. */
1737 if (!pool->num_workers)
1738 status = tp_new_worker_thread( pool );
1740 /* Keep a reference, and increment objcount to ensure that the
1741 * last thread doesn't terminate. */
1742 if (status == STATUS_SUCCESS)
1744 interlocked_inc( &pool->refcount );
1745 pool->objcount++;
1748 RtlLeaveCriticalSection( &pool->cs );
1750 if (status != STATUS_SUCCESS)
1751 return status;
1753 *out = pool;
1754 return STATUS_SUCCESS;
1757 /***********************************************************************
1758 * tp_threadpool_unlock (internal)
1760 * Releases a lock on a threadpool.
1762 static void tp_threadpool_unlock( struct threadpool *pool )
1764 RtlEnterCriticalSection( &pool->cs );
1765 pool->objcount--;
1766 RtlLeaveCriticalSection( &pool->cs );
1767 tp_threadpool_release( pool );
1770 /***********************************************************************
1771 * tp_group_alloc (internal)
1773 * Allocates a new threadpool group object.
1775 static NTSTATUS tp_group_alloc( struct threadpool_group **out )
1777 struct threadpool_group *group;
1779 group = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*group) );
1780 if (!group)
1781 return STATUS_NO_MEMORY;
1783 group->refcount = 1;
1784 group->shutdown = FALSE;
1786 RtlInitializeCriticalSection( &group->cs );
1787 group->cs.DebugInfo->Spare[0] = (DWORD_PTR)(__FILE__ ": threadpool_group.cs");
1789 list_init( &group->members );
1791 TRACE( "allocated group %p\n", group );
1793 *out = group;
1794 return STATUS_SUCCESS;
1797 /***********************************************************************
1798 * tp_group_shutdown (internal)
1800 * Marks the group object for shutdown.
1802 static void tp_group_shutdown( struct threadpool_group *group )
1804 group->shutdown = TRUE;
1807 /***********************************************************************
1808 * tp_group_release (internal)
1810 * Releases a reference to a group object.
1812 static BOOL tp_group_release( struct threadpool_group *group )
1814 if (interlocked_dec( &group->refcount ))
1815 return FALSE;
1817 TRACE( "destroying group %p\n", group );
1819 assert( group->shutdown );
1820 assert( list_empty( &group->members ) );
1822 group->cs.DebugInfo->Spare[0] = 0;
1823 RtlDeleteCriticalSection( &group->cs );
1825 RtlFreeHeap( GetProcessHeap(), 0, group );
1826 return TRUE;
1829 /***********************************************************************
1830 * tp_object_initialize (internal)
1832 * Initializes members of a threadpool object.
1834 static void tp_object_initialize( struct threadpool_object *object, struct threadpool *pool,
1835 PVOID userdata, TP_CALLBACK_ENVIRON *environment )
1837 BOOL is_simple_callback = (object->type == TP_OBJECT_TYPE_SIMPLE);
1839 object->refcount = 1;
1840 object->shutdown = FALSE;
1842 object->pool = pool;
1843 object->group = NULL;
1844 object->userdata = userdata;
1845 object->group_cancel_callback = NULL;
1846 object->finalization_callback = NULL;
1847 object->may_run_long = 0;
1848 object->race_dll = NULL;
1850 memset( &object->group_entry, 0, sizeof(object->group_entry) );
1851 object->is_group_member = FALSE;
1853 memset( &object->pool_entry, 0, sizeof(object->pool_entry) );
1854 RtlInitializeConditionVariable( &object->finished_event );
1855 RtlInitializeConditionVariable( &object->group_finished_event );
1856 object->num_pending_callbacks = 0;
1857 object->num_running_callbacks = 0;
1858 object->num_associated_callbacks = 0;
1860 if (environment)
1862 if (environment->Version != 1 && environment->Version != 3)
1863 FIXME( "unsupported environment version %u\n", environment->Version );
1865 object->group = impl_from_TP_CLEANUP_GROUP( environment->CleanupGroup );
1866 object->group_cancel_callback = environment->CleanupGroupCancelCallback;
1867 object->finalization_callback = environment->FinalizationCallback;
1868 object->may_run_long = environment->u.s.LongFunction != 0;
1869 object->race_dll = environment->RaceDll;
1871 if (environment->ActivationContext)
1872 FIXME( "activation context not supported yet\n" );
1874 if (environment->u.s.Persistent)
1875 FIXME( "persistent threads not supported yet\n" );
1878 if (object->race_dll)
1879 LdrAddRefDll( 0, object->race_dll );
1881 TRACE( "allocated object %p of type %u\n", object, object->type );
1883 /* For simple callbacks we have to run tp_object_submit before adding this object
1884 * to the cleanup group. As soon as the cleanup group members are released ->shutdown
1885 * will be set, and tp_object_submit would fail with an assertion. */
1887 if (is_simple_callback)
1888 tp_object_submit( object, FALSE );
1890 if (object->group)
1892 struct threadpool_group *group = object->group;
1893 interlocked_inc( &group->refcount );
1895 RtlEnterCriticalSection( &group->cs );
1896 list_add_tail( &group->members, &object->group_entry );
1897 object->is_group_member = TRUE;
1898 RtlLeaveCriticalSection( &group->cs );
1901 if (is_simple_callback)
1902 tp_object_release( object );
1905 /***********************************************************************
1906 * tp_object_submit (internal)
1908 * Submits a threadpool object to the associated threadpool. This
1909 * function has to be VOID because TpPostWork can never fail on Windows.
1911 static void tp_object_submit( struct threadpool_object *object, BOOL signaled )
1913 struct threadpool *pool = object->pool;
1914 NTSTATUS status = STATUS_UNSUCCESSFUL;
1916 assert( !object->shutdown );
1917 assert( !pool->shutdown );
1919 RtlEnterCriticalSection( &pool->cs );
1921 /* Start new worker threads if required. */
1922 if (pool->num_busy_workers >= pool->num_workers &&
1923 pool->num_workers < pool->max_workers)
1924 status = tp_new_worker_thread( pool );
1926 /* Queue work item and increment refcount. */
1927 interlocked_inc( &object->refcount );
1928 if (!object->num_pending_callbacks++)
1929 list_add_tail( &pool->pool, &object->pool_entry );
1931 /* Count how often the object was signaled. */
1932 if (object->type == TP_OBJECT_TYPE_WAIT && signaled)
1933 object->u.wait.signaled++;
1935 /* No new thread started - wake up one existing thread. */
1936 if (status != STATUS_SUCCESS)
1938 assert( pool->num_workers > 0 );
1939 RtlWakeConditionVariable( &pool->update_event );
1942 RtlLeaveCriticalSection( &pool->cs );
1945 /***********************************************************************
1946 * tp_object_cancel (internal)
1948 * Cancels all currently pending callbacks for a specific object.
1950 static void tp_object_cancel( struct threadpool_object *object )
1952 struct threadpool *pool = object->pool;
1953 LONG pending_callbacks = 0;
1955 RtlEnterCriticalSection( &pool->cs );
1956 if (object->num_pending_callbacks)
1958 pending_callbacks = object->num_pending_callbacks;
1959 object->num_pending_callbacks = 0;
1960 list_remove( &object->pool_entry );
1962 if (object->type == TP_OBJECT_TYPE_WAIT)
1963 object->u.wait.signaled = 0;
1965 RtlLeaveCriticalSection( &pool->cs );
1967 while (pending_callbacks--)
1968 tp_object_release( object );
1971 /***********************************************************************
1972 * tp_object_wait (internal)
1974 * Waits until all pending and running callbacks of a specific object
1975 * have been processed.
1977 static void tp_object_wait( struct threadpool_object *object, BOOL group_wait )
1979 struct threadpool *pool = object->pool;
1981 RtlEnterCriticalSection( &pool->cs );
1982 if (group_wait)
1984 while (object->num_pending_callbacks || object->num_running_callbacks)
1985 RtlSleepConditionVariableCS( &object->group_finished_event, &pool->cs, NULL );
1987 else
1989 while (object->num_pending_callbacks || object->num_associated_callbacks)
1990 RtlSleepConditionVariableCS( &object->finished_event, &pool->cs, NULL );
1992 RtlLeaveCriticalSection( &pool->cs );
1995 /***********************************************************************
1996 * tp_object_prepare_shutdown (internal)
1998 * Prepares a threadpool object for shutdown.
2000 static void tp_object_prepare_shutdown( struct threadpool_object *object )
2002 if (object->type == TP_OBJECT_TYPE_TIMER)
2003 tp_timerqueue_unlock( object );
2004 else if (object->type == TP_OBJECT_TYPE_WAIT)
2005 tp_waitqueue_unlock( object );
2008 /***********************************************************************
2009 * tp_object_release (internal)
2011 * Releases a reference to a threadpool object.
2013 static BOOL tp_object_release( struct threadpool_object *object )
2015 if (interlocked_dec( &object->refcount ))
2016 return FALSE;
2018 TRACE( "destroying object %p of type %u\n", object, object->type );
2020 assert( object->shutdown );
2021 assert( !object->num_pending_callbacks );
2022 assert( !object->num_running_callbacks );
2023 assert( !object->num_associated_callbacks );
2025 /* release reference to the group */
2026 if (object->group)
2028 struct threadpool_group *group = object->group;
2030 RtlEnterCriticalSection( &group->cs );
2031 if (object->is_group_member)
2033 list_remove( &object->group_entry );
2034 object->is_group_member = FALSE;
2036 RtlLeaveCriticalSection( &group->cs );
2038 tp_group_release( group );
2041 tp_threadpool_unlock( object->pool );
2043 if (object->race_dll)
2044 LdrUnloadDll( object->race_dll );
2046 RtlFreeHeap( GetProcessHeap(), 0, object );
2047 return TRUE;
2050 /***********************************************************************
2051 * threadpool_worker_proc (internal)
2053 static void CALLBACK threadpool_worker_proc( void *param )
2055 TP_CALLBACK_INSTANCE *callback_instance;
2056 struct threadpool_instance instance;
2057 struct threadpool *pool = param;
2058 TP_WAIT_RESULT wait_result = 0;
2059 LARGE_INTEGER timeout;
2060 struct list *ptr;
2061 NTSTATUS status;
2063 TRACE( "starting worker thread for pool %p\n", pool );
2065 RtlEnterCriticalSection( &pool->cs );
2066 pool->num_busy_workers--;
2067 for (;;)
2069 while ((ptr = list_head( &pool->pool )))
2071 struct threadpool_object *object = LIST_ENTRY( ptr, struct threadpool_object, pool_entry );
2072 assert( object->num_pending_callbacks > 0 );
2074 /* If further pending callbacks are queued, move the work item to
2075 * the end of the pool list. Otherwise remove it from the pool. */
2076 list_remove( &object->pool_entry );
2077 if (--object->num_pending_callbacks)
2078 list_add_tail( &pool->pool, &object->pool_entry );
2080 /* For wait objects check if they were signaled or have timed out. */
2081 if (object->type == TP_OBJECT_TYPE_WAIT)
2083 wait_result = object->u.wait.signaled ? WAIT_OBJECT_0 : WAIT_TIMEOUT;
2084 if (wait_result == WAIT_OBJECT_0) object->u.wait.signaled--;
2087 /* Leave critical section and do the actual callback. */
2088 object->num_associated_callbacks++;
2089 object->num_running_callbacks++;
2090 pool->num_busy_workers++;
2091 RtlLeaveCriticalSection( &pool->cs );
2093 /* Initialize threadpool instance struct. */
2094 callback_instance = (TP_CALLBACK_INSTANCE *)&instance;
2095 instance.object = object;
2096 instance.threadid = GetCurrentThreadId();
2097 instance.associated = TRUE;
2098 instance.may_run_long = object->may_run_long;
2099 instance.cleanup.critical_section = NULL;
2100 instance.cleanup.mutex = NULL;
2101 instance.cleanup.semaphore = NULL;
2102 instance.cleanup.semaphore_count = 0;
2103 instance.cleanup.event = NULL;
2104 instance.cleanup.library = NULL;
2106 switch (object->type)
2108 case TP_OBJECT_TYPE_SIMPLE:
2110 TRACE( "executing simple callback %p(%p, %p)\n",
2111 object->u.simple.callback, callback_instance, object->userdata );
2112 object->u.simple.callback( callback_instance, object->userdata );
2113 TRACE( "callback %p returned\n", object->u.simple.callback );
2114 break;
2117 case TP_OBJECT_TYPE_WORK:
2119 TRACE( "executing work callback %p(%p, %p, %p)\n",
2120 object->u.work.callback, callback_instance, object->userdata, object );
2121 object->u.work.callback( callback_instance, object->userdata, (TP_WORK *)object );
2122 TRACE( "callback %p returned\n", object->u.work.callback );
2123 break;
2126 case TP_OBJECT_TYPE_TIMER:
2128 TRACE( "executing timer callback %p(%p, %p, %p)\n",
2129 object->u.timer.callback, callback_instance, object->userdata, object );
2130 object->u.timer.callback( callback_instance, object->userdata, (TP_TIMER *)object );
2131 TRACE( "callback %p returned\n", object->u.timer.callback );
2132 break;
2135 case TP_OBJECT_TYPE_WAIT:
2137 TRACE( "executing wait callback %p(%p, %p, %p, %u)\n",
2138 object->u.wait.callback, callback_instance, object->userdata, object, wait_result );
2139 object->u.wait.callback( callback_instance, object->userdata, (TP_WAIT *)object, wait_result );
2140 TRACE( "callback %p returned\n", object->u.wait.callback );
2141 break;
2144 default:
2145 assert(0);
2146 break;
2149 /* Execute finalization callback. */
2150 if (object->finalization_callback)
2152 TRACE( "executing finalization callback %p(%p, %p)\n",
2153 object->finalization_callback, callback_instance, object->userdata );
2154 object->finalization_callback( callback_instance, object->userdata );
2155 TRACE( "callback %p returned\n", object->finalization_callback );
2158 /* Execute cleanup tasks. */
2159 if (instance.cleanup.critical_section)
2161 RtlLeaveCriticalSection( instance.cleanup.critical_section );
2163 if (instance.cleanup.mutex)
2165 status = NtReleaseMutant( instance.cleanup.mutex, NULL );
2166 if (status != STATUS_SUCCESS) goto skip_cleanup;
2168 if (instance.cleanup.semaphore)
2170 status = NtReleaseSemaphore( instance.cleanup.semaphore, instance.cleanup.semaphore_count, NULL );
2171 if (status != STATUS_SUCCESS) goto skip_cleanup;
2173 if (instance.cleanup.event)
2175 status = NtSetEvent( instance.cleanup.event, NULL );
2176 if (status != STATUS_SUCCESS) goto skip_cleanup;
2178 if (instance.cleanup.library)
2180 LdrUnloadDll( instance.cleanup.library );
2183 skip_cleanup:
2184 RtlEnterCriticalSection( &pool->cs );
2185 pool->num_busy_workers--;
2187 /* Simple callbacks are automatically shutdown after execution. */
2188 if (object->type == TP_OBJECT_TYPE_SIMPLE)
2190 tp_object_prepare_shutdown( object );
2191 object->shutdown = TRUE;
2194 object->num_running_callbacks--;
2195 if (!object->num_pending_callbacks && !object->num_running_callbacks)
2196 RtlWakeAllConditionVariable( &object->group_finished_event );
2198 if (instance.associated)
2200 object->num_associated_callbacks--;
2201 if (!object->num_pending_callbacks && !object->num_associated_callbacks)
2202 RtlWakeAllConditionVariable( &object->finished_event );
2205 tp_object_release( object );
2208 /* Shutdown worker thread if requested. */
2209 if (pool->shutdown)
2210 break;
2212 /* Wait for new tasks or until the timeout expires. A thread only terminates
2213 * when no new tasks are available, and the number of threads can be
2214 * decreased without violating the min_workers limit. An exception is when
2215 * min_workers == 0, then objcount is used to detect if the last thread
2216 * can be terminated. */
2217 timeout.QuadPart = (ULONGLONG)THREADPOOL_WORKER_TIMEOUT * -10000;
2218 if (RtlSleepConditionVariableCS( &pool->update_event, &pool->cs, &timeout ) == STATUS_TIMEOUT &&
2219 !list_head( &pool->pool ) && (pool->num_workers > max( pool->min_workers, 1 ) ||
2220 (!pool->min_workers && !pool->objcount)))
2222 break;
2225 pool->num_workers--;
2226 RtlLeaveCriticalSection( &pool->cs );
2228 TRACE( "terminating worker thread for pool %p\n", pool );
2229 tp_threadpool_release( pool );
2230 RtlExitUserThread( 0 );
2233 /***********************************************************************
2234 * TpAllocCleanupGroup (NTDLL.@)
2236 NTSTATUS WINAPI TpAllocCleanupGroup( TP_CLEANUP_GROUP **out )
2238 TRACE( "%p\n", out );
2240 return tp_group_alloc( (struct threadpool_group **)out );
2243 /***********************************************************************
2244 * TpAllocPool (NTDLL.@)
2246 NTSTATUS WINAPI TpAllocPool( TP_POOL **out, PVOID reserved )
2248 TRACE( "%p %p\n", out, reserved );
2250 if (reserved)
2251 FIXME( "reserved argument is nonzero (%p)\n", reserved );
2253 return tp_threadpool_alloc( (struct threadpool **)out );
2256 /***********************************************************************
2257 * TpAllocTimer (NTDLL.@)
2259 NTSTATUS WINAPI TpAllocTimer( TP_TIMER **out, PTP_TIMER_CALLBACK callback, PVOID userdata,
2260 TP_CALLBACK_ENVIRON *environment )
2262 struct threadpool_object *object;
2263 struct threadpool *pool;
2264 NTSTATUS status;
2266 TRACE( "%p %p %p %p\n", out, callback, userdata, environment );
2268 object = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*object) );
2269 if (!object)
2270 return STATUS_NO_MEMORY;
2272 status = tp_threadpool_lock( &pool, environment );
2273 if (status)
2275 RtlFreeHeap( GetProcessHeap(), 0, object );
2276 return status;
2279 object->type = TP_OBJECT_TYPE_TIMER;
2280 object->u.timer.callback = callback;
2282 status = tp_timerqueue_lock( object );
2283 if (status)
2285 tp_threadpool_unlock( pool );
2286 RtlFreeHeap( GetProcessHeap(), 0, object );
2287 return status;
2290 tp_object_initialize( object, pool, userdata, environment );
2292 *out = (TP_TIMER *)object;
2293 return STATUS_SUCCESS;
2296 /***********************************************************************
2297 * TpAllocWait (NTDLL.@)
2299 NTSTATUS WINAPI TpAllocWait( TP_WAIT **out, PTP_WAIT_CALLBACK callback, PVOID userdata,
2300 TP_CALLBACK_ENVIRON *environment )
2302 struct threadpool_object *object;
2303 struct threadpool *pool;
2304 NTSTATUS status;
2306 TRACE( "%p %p %p %p\n", out, callback, userdata, environment );
2308 object = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*object) );
2309 if (!object)
2310 return STATUS_NO_MEMORY;
2312 status = tp_threadpool_lock( &pool, environment );
2313 if (status)
2315 RtlFreeHeap( GetProcessHeap(), 0, object );
2316 return status;
2319 object->type = TP_OBJECT_TYPE_WAIT;
2320 object->u.wait.callback = callback;
2322 status = tp_waitqueue_lock( object );
2323 if (status)
2325 tp_threadpool_unlock( pool );
2326 RtlFreeHeap( GetProcessHeap(), 0, object );
2327 return status;
2330 tp_object_initialize( object, pool, userdata, environment );
2332 *out = (TP_WAIT *)object;
2333 return STATUS_SUCCESS;
2336 /***********************************************************************
2337 * TpAllocWork (NTDLL.@)
2339 NTSTATUS WINAPI TpAllocWork( TP_WORK **out, PTP_WORK_CALLBACK callback, PVOID userdata,
2340 TP_CALLBACK_ENVIRON *environment )
2342 struct threadpool_object *object;
2343 struct threadpool *pool;
2344 NTSTATUS status;
2346 TRACE( "%p %p %p %p\n", out, callback, userdata, environment );
2348 object = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*object) );
2349 if (!object)
2350 return STATUS_NO_MEMORY;
2352 status = tp_threadpool_lock( &pool, environment );
2353 if (status)
2355 RtlFreeHeap( GetProcessHeap(), 0, object );
2356 return status;
2359 object->type = TP_OBJECT_TYPE_WORK;
2360 object->u.work.callback = callback;
2361 tp_object_initialize( object, pool, userdata, environment );
2363 *out = (TP_WORK *)object;
2364 return STATUS_SUCCESS;
2367 /***********************************************************************
2368 * TpCallbackLeaveCriticalSectionOnCompletion (NTDLL.@)
2370 VOID WINAPI TpCallbackLeaveCriticalSectionOnCompletion( TP_CALLBACK_INSTANCE *instance, CRITICAL_SECTION *crit )
2372 struct threadpool_instance *this = impl_from_TP_CALLBACK_INSTANCE( instance );
2374 TRACE( "%p %p\n", instance, crit );
2376 if (!this->cleanup.critical_section)
2377 this->cleanup.critical_section = crit;
2380 /***********************************************************************
2381 * TpCallbackMayRunLong (NTDLL.@)
2383 NTSTATUS WINAPI TpCallbackMayRunLong( TP_CALLBACK_INSTANCE *instance )
2385 struct threadpool_instance *this = impl_from_TP_CALLBACK_INSTANCE( instance );
2386 struct threadpool_object *object = this->object;
2387 struct threadpool *pool;
2388 NTSTATUS status = STATUS_SUCCESS;
2390 TRACE( "%p\n", instance );
2392 if (this->threadid != GetCurrentThreadId())
2394 ERR("called from wrong thread, ignoring\n");
2395 return STATUS_UNSUCCESSFUL; /* FIXME */
2398 if (this->may_run_long)
2399 return STATUS_SUCCESS;
2401 pool = object->pool;
2402 RtlEnterCriticalSection( &pool->cs );
2404 /* Start new worker threads if required. */
2405 if (pool->num_busy_workers >= pool->num_workers)
2407 if (pool->num_workers < pool->max_workers)
2409 status = tp_new_worker_thread( pool );
2411 else
2413 status = STATUS_TOO_MANY_THREADS;
2417 RtlLeaveCriticalSection( &pool->cs );
2418 this->may_run_long = TRUE;
2419 return status;
2422 /***********************************************************************
2423 * TpCallbackReleaseMutexOnCompletion (NTDLL.@)
2425 VOID WINAPI TpCallbackReleaseMutexOnCompletion( TP_CALLBACK_INSTANCE *instance, HANDLE mutex )
2427 struct threadpool_instance *this = impl_from_TP_CALLBACK_INSTANCE( instance );
2429 TRACE( "%p %p\n", instance, mutex );
2431 if (!this->cleanup.mutex)
2432 this->cleanup.mutex = mutex;
2435 /***********************************************************************
2436 * TpCallbackReleaseSemaphoreOnCompletion (NTDLL.@)
2438 VOID WINAPI TpCallbackReleaseSemaphoreOnCompletion( TP_CALLBACK_INSTANCE *instance, HANDLE semaphore, DWORD count )
2440 struct threadpool_instance *this = impl_from_TP_CALLBACK_INSTANCE( instance );
2442 TRACE( "%p %p %u\n", instance, semaphore, count );
2444 if (!this->cleanup.semaphore)
2446 this->cleanup.semaphore = semaphore;
2447 this->cleanup.semaphore_count = count;
2451 /***********************************************************************
2452 * TpCallbackSetEventOnCompletion (NTDLL.@)
2454 VOID WINAPI TpCallbackSetEventOnCompletion( TP_CALLBACK_INSTANCE *instance, HANDLE event )
2456 struct threadpool_instance *this = impl_from_TP_CALLBACK_INSTANCE( instance );
2458 TRACE( "%p %p\n", instance, event );
2460 if (!this->cleanup.event)
2461 this->cleanup.event = event;
2464 /***********************************************************************
2465 * TpCallbackUnloadDllOnCompletion (NTDLL.@)
2467 VOID WINAPI TpCallbackUnloadDllOnCompletion( TP_CALLBACK_INSTANCE *instance, HMODULE module )
2469 struct threadpool_instance *this = impl_from_TP_CALLBACK_INSTANCE( instance );
2471 TRACE( "%p %p\n", instance, module );
2473 if (!this->cleanup.library)
2474 this->cleanup.library = module;
2477 /***********************************************************************
2478 * TpDisassociateCallback (NTDLL.@)
2480 VOID WINAPI TpDisassociateCallback( TP_CALLBACK_INSTANCE *instance )
2482 struct threadpool_instance *this = impl_from_TP_CALLBACK_INSTANCE( instance );
2483 struct threadpool_object *object = this->object;
2484 struct threadpool *pool;
2486 TRACE( "%p\n", instance );
2488 if (this->threadid != GetCurrentThreadId())
2490 ERR("called from wrong thread, ignoring\n");
2491 return;
2494 if (!this->associated)
2495 return;
2497 pool = object->pool;
2498 RtlEnterCriticalSection( &pool->cs );
2500 object->num_associated_callbacks--;
2501 if (!object->num_pending_callbacks && !object->num_associated_callbacks)
2502 RtlWakeAllConditionVariable( &object->finished_event );
2504 RtlLeaveCriticalSection( &pool->cs );
2505 this->associated = FALSE;
2508 /***********************************************************************
2509 * TpIsTimerSet (NTDLL.@)
2511 BOOL WINAPI TpIsTimerSet( TP_TIMER *timer )
2513 struct threadpool_object *this = impl_from_TP_TIMER( timer );
2515 TRACE( "%p\n", timer );
2517 return this->u.timer.timer_set;
2520 /***********************************************************************
2521 * TpPostWork (NTDLL.@)
2523 VOID WINAPI TpPostWork( TP_WORK *work )
2525 struct threadpool_object *this = impl_from_TP_WORK( work );
2527 TRACE( "%p\n", work );
2529 tp_object_submit( this, FALSE );
2532 /***********************************************************************
2533 * TpReleaseCleanupGroup (NTDLL.@)
2535 VOID WINAPI TpReleaseCleanupGroup( TP_CLEANUP_GROUP *group )
2537 struct threadpool_group *this = impl_from_TP_CLEANUP_GROUP( group );
2539 TRACE( "%p\n", group );
2541 tp_group_shutdown( this );
2542 tp_group_release( this );
2545 /***********************************************************************
2546 * TpReleaseCleanupGroupMembers (NTDLL.@)
2548 VOID WINAPI TpReleaseCleanupGroupMembers( TP_CLEANUP_GROUP *group, BOOL cancel_pending, PVOID userdata )
2550 struct threadpool_group *this = impl_from_TP_CLEANUP_GROUP( group );
2551 struct threadpool_object *object, *next;
2552 struct list members;
2554 TRACE( "%p %u %p\n", group, cancel_pending, userdata );
2556 RtlEnterCriticalSection( &this->cs );
2558 /* Unset group, increase references, and mark objects for shutdown */
2559 LIST_FOR_EACH_ENTRY_SAFE( object, next, &this->members, struct threadpool_object, group_entry )
2561 assert( object->group == this );
2562 assert( object->is_group_member );
2564 if (interlocked_inc( &object->refcount ) == 1)
2566 /* Object is basically already destroyed, but group reference
2567 * was not deleted yet. We can safely ignore this object. */
2568 interlocked_dec( &object->refcount );
2569 list_remove( &object->group_entry );
2570 object->is_group_member = FALSE;
2571 continue;
2574 object->is_group_member = FALSE;
2575 tp_object_prepare_shutdown( object );
2578 /* Move members to a new temporary list */
2579 list_init( &members );
2580 list_move_tail( &members, &this->members );
2582 RtlLeaveCriticalSection( &this->cs );
2584 /* Cancel pending callbacks if requested */
2585 if (cancel_pending)
2587 LIST_FOR_EACH_ENTRY( object, &members, struct threadpool_object, group_entry )
2589 tp_object_cancel( object );
2593 /* Wait for remaining callbacks to finish */
2594 LIST_FOR_EACH_ENTRY_SAFE( object, next, &members, struct threadpool_object, group_entry )
2596 tp_object_wait( object, TRUE );
2598 if (!object->shutdown)
2600 /* Execute group cancellation callback if defined, and if this was actually a group cancel. */
2601 if (cancel_pending && object->group_cancel_callback)
2603 TRACE( "executing group cancel callback %p(%p, %p)\n",
2604 object->group_cancel_callback, object->userdata, userdata );
2605 object->group_cancel_callback( object->userdata, userdata );
2606 TRACE( "callback %p returned\n", object->group_cancel_callback );
2609 if (object->type != TP_OBJECT_TYPE_SIMPLE)
2610 tp_object_release( object );
2613 object->shutdown = TRUE;
2614 tp_object_release( object );
2618 /***********************************************************************
2619 * TpReleasePool (NTDLL.@)
2621 VOID WINAPI TpReleasePool( TP_POOL *pool )
2623 struct threadpool *this = impl_from_TP_POOL( pool );
2625 TRACE( "%p\n", pool );
2627 tp_threadpool_shutdown( this );
2628 tp_threadpool_release( this );
2631 /***********************************************************************
2632 * TpReleaseTimer (NTDLL.@)
2634 VOID WINAPI TpReleaseTimer( TP_TIMER *timer )
2636 struct threadpool_object *this = impl_from_TP_TIMER( timer );
2638 TRACE( "%p\n", timer );
2640 tp_object_prepare_shutdown( this );
2641 this->shutdown = TRUE;
2642 tp_object_release( this );
2645 /***********************************************************************
2646 * TpReleaseWait (NTDLL.@)
2648 VOID WINAPI TpReleaseWait( TP_WAIT *wait )
2650 struct threadpool_object *this = impl_from_TP_WAIT( wait );
2652 TRACE( "%p\n", wait );
2654 tp_object_prepare_shutdown( this );
2655 this->shutdown = TRUE;
2656 tp_object_release( this );
2659 /***********************************************************************
2660 * TpReleaseWork (NTDLL.@)
2662 VOID WINAPI TpReleaseWork( TP_WORK *work )
2664 struct threadpool_object *this = impl_from_TP_WORK( work );
2666 TRACE( "%p\n", work );
2668 tp_object_prepare_shutdown( this );
2669 this->shutdown = TRUE;
2670 tp_object_release( this );
2673 /***********************************************************************
2674 * TpSetPoolMaxThreads (NTDLL.@)
2676 VOID WINAPI TpSetPoolMaxThreads( TP_POOL *pool, DWORD maximum )
2678 struct threadpool *this = impl_from_TP_POOL( pool );
2680 TRACE( "%p %u\n", pool, maximum );
2682 RtlEnterCriticalSection( &this->cs );
2683 this->max_workers = max( maximum, 1 );
2684 this->min_workers = min( this->min_workers, this->max_workers );
2685 RtlLeaveCriticalSection( &this->cs );
2688 /***********************************************************************
2689 * TpSetPoolMinThreads (NTDLL.@)
2691 BOOL WINAPI TpSetPoolMinThreads( TP_POOL *pool, DWORD minimum )
2693 struct threadpool *this = impl_from_TP_POOL( pool );
2694 NTSTATUS status = STATUS_SUCCESS;
2696 TRACE( "%p %u\n", pool, minimum );
2698 RtlEnterCriticalSection( &this->cs );
2700 while (this->num_workers < minimum)
2702 status = tp_new_worker_thread( this );
2703 if (status != STATUS_SUCCESS)
2704 break;
2707 if (status == STATUS_SUCCESS)
2709 this->min_workers = minimum;
2710 this->max_workers = max( this->min_workers, this->max_workers );
2713 RtlLeaveCriticalSection( &this->cs );
2714 return !status;
2717 /***********************************************************************
2718 * TpSetTimer (NTDLL.@)
2720 VOID WINAPI TpSetTimer( TP_TIMER *timer, LARGE_INTEGER *timeout, LONG period, LONG window_length )
2722 struct threadpool_object *this = impl_from_TP_TIMER( timer );
2723 struct threadpool_object *other_timer;
2724 BOOL submit_timer = FALSE;
2725 ULONGLONG timestamp;
2727 TRACE( "%p %p %u %u\n", timer, timeout, period, window_length );
2729 RtlEnterCriticalSection( &timerqueue.cs );
2731 assert( this->u.timer.timer_initialized );
2732 this->u.timer.timer_set = timeout != NULL;
2734 /* Convert relative timeout to absolute timestamp and handle a timeout
2735 * of zero, which means that the timer is submitted immediately. */
2736 if (timeout)
2738 timestamp = timeout->QuadPart;
2739 if ((LONGLONG)timestamp < 0)
2741 LARGE_INTEGER now;
2742 NtQuerySystemTime( &now );
2743 timestamp = now.QuadPart - timestamp;
2745 else if (!timestamp)
2747 if (!period)
2748 timeout = NULL;
2749 else
2751 LARGE_INTEGER now;
2752 NtQuerySystemTime( &now );
2753 timestamp = now.QuadPart + (ULONGLONG)period * 10000;
2755 submit_timer = TRUE;
2759 /* First remove existing timeout. */
2760 if (this->u.timer.timer_pending)
2762 list_remove( &this->u.timer.timer_entry );
2763 this->u.timer.timer_pending = FALSE;
2766 /* If the timer was enabled, then add it back to the queue. */
2767 if (timeout)
2769 this->u.timer.timeout = timestamp;
2770 this->u.timer.period = period;
2771 this->u.timer.window_length = window_length;
2773 LIST_FOR_EACH_ENTRY( other_timer, &timerqueue.pending_timers,
2774 struct threadpool_object, u.timer.timer_entry )
2776 assert( other_timer->type == TP_OBJECT_TYPE_TIMER );
2777 if (this->u.timer.timeout < other_timer->u.timer.timeout)
2778 break;
2780 list_add_before( &other_timer->u.timer.timer_entry, &this->u.timer.timer_entry );
2782 /* Wake up the timer thread when the timeout has to be updated. */
2783 if (list_head( &timerqueue.pending_timers ) == &this->u.timer.timer_entry )
2784 RtlWakeAllConditionVariable( &timerqueue.update_event );
2786 this->u.timer.timer_pending = TRUE;
2789 RtlLeaveCriticalSection( &timerqueue.cs );
2791 if (submit_timer)
2792 tp_object_submit( this, FALSE );
2795 /***********************************************************************
2796 * TpSetWait (NTDLL.@)
2798 VOID WINAPI TpSetWait( TP_WAIT *wait, HANDLE handle, LARGE_INTEGER *timeout )
2800 struct threadpool_object *this = impl_from_TP_WAIT( wait );
2801 ULONGLONG timestamp = TIMEOUT_INFINITE;
2802 BOOL submit_wait = FALSE;
2804 TRACE( "%p %p %p\n", wait, handle, timeout );
2806 RtlEnterCriticalSection( &waitqueue.cs );
2808 assert( this->u.wait.bucket );
2809 this->u.wait.handle = handle;
2811 if (handle || this->u.wait.wait_pending)
2813 struct waitqueue_bucket *bucket = this->u.wait.bucket;
2814 list_remove( &this->u.wait.wait_entry );
2816 /* Convert relative timeout to absolute timestamp. */
2817 if (handle && timeout)
2819 timestamp = timeout->QuadPart;
2820 if ((LONGLONG)timestamp < 0)
2822 LARGE_INTEGER now;
2823 NtQuerySystemTime( &now );
2824 timestamp = now.QuadPart - timestamp;
2826 else if (!timestamp)
2828 submit_wait = TRUE;
2829 handle = NULL;
2833 /* Add wait object back into one of the queues. */
2834 if (handle)
2836 list_add_tail( &bucket->waiting, &this->u.wait.wait_entry );
2837 this->u.wait.wait_pending = TRUE;
2838 this->u.wait.timeout = timestamp;
2840 else
2842 list_add_tail( &bucket->reserved, &this->u.wait.wait_entry );
2843 this->u.wait.wait_pending = FALSE;
2846 /* Wake up the wait queue thread. */
2847 NtSetEvent( bucket->update_event, NULL );
2850 RtlLeaveCriticalSection( &waitqueue.cs );
2852 if (submit_wait)
2853 tp_object_submit( this, FALSE );
2856 /***********************************************************************
2857 * TpSimpleTryPost (NTDLL.@)
2859 NTSTATUS WINAPI TpSimpleTryPost( PTP_SIMPLE_CALLBACK callback, PVOID userdata,
2860 TP_CALLBACK_ENVIRON *environment )
2862 struct threadpool_object *object;
2863 struct threadpool *pool;
2864 NTSTATUS status;
2866 TRACE( "%p %p %p\n", callback, userdata, environment );
2868 object = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*object) );
2869 if (!object)
2870 return STATUS_NO_MEMORY;
2872 status = tp_threadpool_lock( &pool, environment );
2873 if (status)
2875 RtlFreeHeap( GetProcessHeap(), 0, object );
2876 return status;
2879 object->type = TP_OBJECT_TYPE_SIMPLE;
2880 object->u.simple.callback = callback;
2881 tp_object_initialize( object, pool, userdata, environment );
2883 return STATUS_SUCCESS;
2886 /***********************************************************************
2887 * TpWaitForTimer (NTDLL.@)
2889 VOID WINAPI TpWaitForTimer( TP_TIMER *timer, BOOL cancel_pending )
2891 struct threadpool_object *this = impl_from_TP_TIMER( timer );
2893 TRACE( "%p %d\n", timer, cancel_pending );
2895 if (cancel_pending)
2896 tp_object_cancel( this );
2897 tp_object_wait( this, FALSE );
2900 /***********************************************************************
2901 * TpWaitForWait (NTDLL.@)
2903 VOID WINAPI TpWaitForWait( TP_WAIT *wait, BOOL cancel_pending )
2905 struct threadpool_object *this = impl_from_TP_WAIT( wait );
2907 TRACE( "%p %d\n", wait, cancel_pending );
2909 if (cancel_pending)
2910 tp_object_cancel( this );
2911 tp_object_wait( this, FALSE );
2914 /***********************************************************************
2915 * TpWaitForWork (NTDLL.@)
2917 VOID WINAPI TpWaitForWork( TP_WORK *work, BOOL cancel_pending )
2919 struct threadpool_object *this = impl_from_TP_WORK( work );
2921 TRACE( "%p %u\n", work, cancel_pending );
2923 if (cancel_pending)
2924 tp_object_cancel( this );
2925 tp_object_wait( this, FALSE );