wow64: Add thunks for the I/O completion syscalls.
[wine.git] / dlls / ntdll / threadpool.c
blob9e99398bdeea21cf0b2c84cb3c9f9622d113bce6
1 /*
2 * Thread pooling
4 * Copyright (c) 2006 Robert Shearman
5 * Copyright (c) 2014-2016 Sebastian Lackner
7 * This library is free software; you can redistribute it and/or
8 * modify it under the terms of the GNU Lesser General Public
9 * License as published by the Free Software Foundation; either
10 * version 2.1 of the License, or (at your option) any later version.
12 * This library is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Lesser General Public License for more details.
17 * You should have received a copy of the GNU Lesser General Public
18 * License along with this library; if not, write to the Free Software
19 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
22 #include <assert.h>
23 #include <stdarg.h>
24 #include <limits.h>
26 #define NONAMELESSUNION
27 #include "ntstatus.h"
28 #define WIN32_NO_STATUS
29 #include "winternl.h"
31 #include "wine/debug.h"
32 #include "wine/list.h"
34 #include "ntdll_misc.h"
36 WINE_DEFAULT_DEBUG_CHANNEL(threadpool);
39 * Old thread pooling API
42 struct rtl_work_item
44 PRTL_WORK_ITEM_ROUTINE function;
45 PVOID context;
48 #define EXPIRE_NEVER (~(ULONGLONG)0)
49 #define TIMER_QUEUE_MAGIC 0x516d6954 /* TimQ */
51 static RTL_CRITICAL_SECTION_DEBUG critsect_compl_debug;
53 static struct
55 HANDLE compl_port;
56 RTL_CRITICAL_SECTION threadpool_compl_cs;
58 old_threadpool =
60 NULL, /* compl_port */
61 { &critsect_compl_debug, -1, 0, 0, 0, 0 }, /* threadpool_compl_cs */
64 static RTL_CRITICAL_SECTION_DEBUG critsect_compl_debug =
66 0, 0, &old_threadpool.threadpool_compl_cs,
67 { &critsect_compl_debug.ProcessLocksList, &critsect_compl_debug.ProcessLocksList },
68 0, 0, { (DWORD_PTR)(__FILE__ ": threadpool_compl_cs") }
71 struct timer_queue;
72 struct queue_timer
74 struct timer_queue *q;
75 struct list entry;
76 ULONG runcount; /* number of callbacks pending execution */
77 RTL_WAITORTIMERCALLBACKFUNC callback;
78 PVOID param;
79 DWORD period;
80 ULONG flags;
81 ULONGLONG expire;
82 BOOL destroy; /* timer should be deleted; once set, never unset */
83 HANDLE event; /* removal event */
86 struct timer_queue
88 DWORD magic;
89 RTL_CRITICAL_SECTION cs;
90 struct list timers; /* sorted by expiration time */
91 BOOL quit; /* queue should be deleted; once set, never unset */
92 HANDLE event;
93 HANDLE thread;
97 * Object-oriented thread pooling API
100 #define THREADPOOL_WORKER_TIMEOUT 5000
101 #define MAXIMUM_WAITQUEUE_OBJECTS (MAXIMUM_WAIT_OBJECTS - 1)
103 /* internal threadpool representation */
104 struct threadpool
106 LONG refcount;
107 LONG objcount;
108 BOOL shutdown;
109 CRITICAL_SECTION cs;
110 /* Pools of work items, locked via .cs, order matches TP_CALLBACK_PRIORITY - high, normal, low. */
111 struct list pools[3];
112 RTL_CONDITION_VARIABLE update_event;
113 /* information about worker threads, locked via .cs */
114 int max_workers;
115 int min_workers;
116 int num_workers;
117 int num_busy_workers;
118 HANDLE compl_port;
119 TP_POOL_STACK_INFORMATION stack_info;
122 enum threadpool_objtype
124 TP_OBJECT_TYPE_SIMPLE,
125 TP_OBJECT_TYPE_WORK,
126 TP_OBJECT_TYPE_TIMER,
127 TP_OBJECT_TYPE_WAIT,
128 TP_OBJECT_TYPE_IO,
131 struct io_completion
133 IO_STATUS_BLOCK iosb;
134 ULONG_PTR cvalue;
137 /* internal threadpool object representation */
138 struct threadpool_object
140 void *win32_callback; /* leave space for kernelbase to store win32 callback */
141 LONG refcount;
142 BOOL shutdown;
143 /* read-only information */
144 enum threadpool_objtype type;
145 struct threadpool *pool;
146 struct threadpool_group *group;
147 PVOID userdata;
148 PTP_CLEANUP_GROUP_CANCEL_CALLBACK group_cancel_callback;
149 PTP_SIMPLE_CALLBACK finalization_callback;
150 BOOL may_run_long;
151 HMODULE race_dll;
152 TP_CALLBACK_PRIORITY priority;
153 /* information about the group, locked via .group->cs */
154 struct list group_entry;
155 BOOL is_group_member;
156 /* information about the pool, locked via .pool->cs */
157 struct list pool_entry;
158 RTL_CONDITION_VARIABLE finished_event;
159 RTL_CONDITION_VARIABLE group_finished_event;
160 HANDLE completed_event;
161 LONG num_pending_callbacks;
162 LONG num_running_callbacks;
163 LONG num_associated_callbacks;
164 /* arguments for callback */
165 union
167 struct
169 PTP_SIMPLE_CALLBACK callback;
170 } simple;
171 struct
173 PTP_WORK_CALLBACK callback;
174 } work;
175 struct
177 PTP_TIMER_CALLBACK callback;
178 /* information about the timer, locked via timerqueue.cs */
179 BOOL timer_initialized;
180 BOOL timer_pending;
181 struct list timer_entry;
182 BOOL timer_set;
183 ULONGLONG timeout;
184 LONG period;
185 LONG window_length;
186 } timer;
187 struct
189 PTP_WAIT_CALLBACK callback;
190 LONG signaled;
191 /* information about the wait object, locked via waitqueue.cs */
192 struct waitqueue_bucket *bucket;
193 BOOL wait_pending;
194 struct list wait_entry;
195 ULONGLONG timeout;
196 HANDLE handle;
197 DWORD flags;
198 RTL_WAITORTIMERCALLBACKFUNC rtl_callback;
199 } wait;
200 struct
202 PTP_IO_CALLBACK callback;
203 /* locked via .pool->cs */
204 unsigned int pending_count, completion_count, completion_max;
205 struct io_completion *completions;
206 } io;
207 } u;
210 /* internal threadpool instance representation */
211 struct threadpool_instance
213 struct threadpool_object *object;
214 DWORD threadid;
215 BOOL associated;
216 BOOL may_run_long;
217 struct
219 CRITICAL_SECTION *critical_section;
220 HANDLE mutex;
221 HANDLE semaphore;
222 LONG semaphore_count;
223 HANDLE event;
224 HMODULE library;
225 } cleanup;
228 /* internal threadpool group representation */
229 struct threadpool_group
231 LONG refcount;
232 BOOL shutdown;
233 CRITICAL_SECTION cs;
234 /* list of group members, locked via .cs */
235 struct list members;
238 /* global timerqueue object */
239 static RTL_CRITICAL_SECTION_DEBUG timerqueue_debug;
241 static struct
243 CRITICAL_SECTION cs;
244 LONG objcount;
245 BOOL thread_running;
246 struct list pending_timers;
247 RTL_CONDITION_VARIABLE update_event;
249 timerqueue =
251 { &timerqueue_debug, -1, 0, 0, 0, 0 }, /* cs */
252 0, /* objcount */
253 FALSE, /* thread_running */
254 LIST_INIT( timerqueue.pending_timers ), /* pending_timers */
255 RTL_CONDITION_VARIABLE_INIT /* update_event */
258 static RTL_CRITICAL_SECTION_DEBUG timerqueue_debug =
260 0, 0, &timerqueue.cs,
261 { &timerqueue_debug.ProcessLocksList, &timerqueue_debug.ProcessLocksList },
262 0, 0, { (DWORD_PTR)(__FILE__ ": timerqueue.cs") }
265 /* global waitqueue object */
266 static RTL_CRITICAL_SECTION_DEBUG waitqueue_debug;
268 static struct
270 CRITICAL_SECTION cs;
271 LONG num_buckets;
272 struct list buckets;
274 waitqueue =
276 { &waitqueue_debug, -1, 0, 0, 0, 0 }, /* cs */
277 0, /* num_buckets */
278 LIST_INIT( waitqueue.buckets ) /* buckets */
281 static RTL_CRITICAL_SECTION_DEBUG waitqueue_debug =
283 0, 0, &waitqueue.cs,
284 { &waitqueue_debug.ProcessLocksList, &waitqueue_debug.ProcessLocksList },
285 0, 0, { (DWORD_PTR)(__FILE__ ": waitqueue.cs") }
288 struct waitqueue_bucket
290 struct list bucket_entry;
291 LONG objcount;
292 struct list reserved;
293 struct list waiting;
294 HANDLE update_event;
295 BOOL alertable;
298 /* global I/O completion queue object */
299 static RTL_CRITICAL_SECTION_DEBUG ioqueue_debug;
301 static struct
303 CRITICAL_SECTION cs;
304 LONG objcount;
305 BOOL thread_running;
306 HANDLE port;
307 RTL_CONDITION_VARIABLE update_event;
309 ioqueue =
311 .cs = { &ioqueue_debug, -1, 0, 0, 0, 0 },
314 static RTL_CRITICAL_SECTION_DEBUG ioqueue_debug =
316 0, 0, &ioqueue.cs,
317 { &ioqueue_debug.ProcessLocksList, &ioqueue_debug.ProcessLocksList },
318 0, 0, { (DWORD_PTR)(__FILE__ ": ioqueue.cs") }
321 static inline struct threadpool *impl_from_TP_POOL( TP_POOL *pool )
323 return (struct threadpool *)pool;
326 static inline struct threadpool_object *impl_from_TP_WORK( TP_WORK *work )
328 struct threadpool_object *object = (struct threadpool_object *)work;
329 assert( object->type == TP_OBJECT_TYPE_WORK );
330 return object;
333 static inline struct threadpool_object *impl_from_TP_TIMER( TP_TIMER *timer )
335 struct threadpool_object *object = (struct threadpool_object *)timer;
336 assert( object->type == TP_OBJECT_TYPE_TIMER );
337 return object;
340 static inline struct threadpool_object *impl_from_TP_WAIT( TP_WAIT *wait )
342 struct threadpool_object *object = (struct threadpool_object *)wait;
343 assert( object->type == TP_OBJECT_TYPE_WAIT );
344 return object;
347 static inline struct threadpool_object *impl_from_TP_IO( TP_IO *io )
349 struct threadpool_object *object = (struct threadpool_object *)io;
350 assert( object->type == TP_OBJECT_TYPE_IO );
351 return object;
354 static inline struct threadpool_group *impl_from_TP_CLEANUP_GROUP( TP_CLEANUP_GROUP *group )
356 return (struct threadpool_group *)group;
359 static inline struct threadpool_instance *impl_from_TP_CALLBACK_INSTANCE( TP_CALLBACK_INSTANCE *instance )
361 return (struct threadpool_instance *)instance;
364 static void CALLBACK threadpool_worker_proc( void *param );
365 static void tp_object_submit( struct threadpool_object *object, BOOL signaled );
366 static void tp_object_execute( struct threadpool_object *object, BOOL wait_thread );
367 static void tp_object_prepare_shutdown( struct threadpool_object *object );
368 static BOOL tp_object_release( struct threadpool_object *object );
369 static struct threadpool *default_threadpool = NULL;
371 static BOOL array_reserve(void **elements, unsigned int *capacity, unsigned int count, unsigned int size)
373 unsigned int new_capacity, max_capacity;
374 void *new_elements;
376 if (count <= *capacity)
377 return TRUE;
379 max_capacity = ~(SIZE_T)0 / size;
380 if (count > max_capacity)
381 return FALSE;
383 new_capacity = max(4, *capacity);
384 while (new_capacity < count && new_capacity <= max_capacity / 2)
385 new_capacity *= 2;
386 if (new_capacity < count)
387 new_capacity = max_capacity;
389 if (!(new_elements = RtlReAllocateHeap( GetProcessHeap(), 0, *elements, new_capacity * size )))
390 return FALSE;
392 *elements = new_elements;
393 *capacity = new_capacity;
395 return TRUE;
398 static void CALLBACK process_rtl_work_item( TP_CALLBACK_INSTANCE *instance, void *userdata )
400 struct rtl_work_item *item = userdata;
402 TRACE("executing %p(%p)\n", item->function, item->context);
403 item->function( item->context );
405 RtlFreeHeap( GetProcessHeap(), 0, item );
408 /***********************************************************************
409 * RtlQueueWorkItem (NTDLL.@)
411 * Queues a work item into a thread in the thread pool.
413 * PARAMS
414 * function [I] Work function to execute.
415 * context [I] Context to pass to the work function when it is executed.
416 * flags [I] Flags. See notes.
418 * RETURNS
419 * Success: STATUS_SUCCESS.
420 * Failure: Any NTSTATUS code.
422 * NOTES
423 * Flags can be one or more of the following:
424 *|WT_EXECUTEDEFAULT - Executes the work item in a non-I/O worker thread.
425 *|WT_EXECUTEINIOTHREAD - Executes the work item in an I/O worker thread.
426 *|WT_EXECUTEINPERSISTENTTHREAD - Executes the work item in a thread that is persistent.
427 *|WT_EXECUTELONGFUNCTION - Hints that the execution can take a long time.
428 *|WT_TRANSFER_IMPERSONATION - Executes the function with the current access token.
430 NTSTATUS WINAPI RtlQueueWorkItem( PRTL_WORK_ITEM_ROUTINE function, PVOID context, ULONG flags )
432 TP_CALLBACK_ENVIRON environment;
433 struct rtl_work_item *item;
434 NTSTATUS status;
436 TRACE( "%p %p %u\n", function, context, flags );
438 item = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*item) );
439 if (!item)
440 return STATUS_NO_MEMORY;
442 memset( &environment, 0, sizeof(environment) );
443 environment.Version = 1;
444 environment.u.s.LongFunction = (flags & WT_EXECUTELONGFUNCTION) != 0;
445 environment.u.s.Persistent = (flags & WT_EXECUTEINPERSISTENTTHREAD) != 0;
447 item->function = function;
448 item->context = context;
450 status = TpSimpleTryPost( process_rtl_work_item, item, &environment );
451 if (status) RtlFreeHeap( GetProcessHeap(), 0, item );
452 return status;
455 /***********************************************************************
456 * iocp_poller - get completion events and run callbacks
458 static DWORD CALLBACK iocp_poller(LPVOID Arg)
460 HANDLE cport = Arg;
462 while( TRUE )
464 PRTL_OVERLAPPED_COMPLETION_ROUTINE callback;
465 LPVOID overlapped;
466 IO_STATUS_BLOCK iosb;
467 NTSTATUS res = NtRemoveIoCompletion( cport, (PULONG_PTR)&callback, (PULONG_PTR)&overlapped, &iosb, NULL );
468 if (res)
470 ERR("NtRemoveIoCompletion failed: 0x%x\n", res);
472 else
474 DWORD transferred = 0;
475 DWORD err = 0;
477 if (iosb.u.Status == STATUS_SUCCESS)
478 transferred = iosb.Information;
479 else
480 err = RtlNtStatusToDosError(iosb.u.Status);
482 callback( err, transferred, overlapped );
485 return 0;
488 /***********************************************************************
489 * RtlSetIoCompletionCallback (NTDLL.@)
491 * Binds a handle to a thread pool's completion port, and possibly
492 * starts a non-I/O thread to monitor this port and call functions back.
494 * PARAMS
495 * FileHandle [I] Handle to bind to a completion port.
496 * Function [I] Callback function to call on I/O completions.
497 * Flags [I] Not used.
499 * RETURNS
500 * Success: STATUS_SUCCESS.
501 * Failure: Any NTSTATUS code.
504 NTSTATUS WINAPI RtlSetIoCompletionCallback(HANDLE FileHandle, PRTL_OVERLAPPED_COMPLETION_ROUTINE Function, ULONG Flags)
506 IO_STATUS_BLOCK iosb;
507 FILE_COMPLETION_INFORMATION info;
509 if (Flags) FIXME("Unknown value Flags=0x%x\n", Flags);
511 if (!old_threadpool.compl_port)
513 NTSTATUS res = STATUS_SUCCESS;
515 RtlEnterCriticalSection(&old_threadpool.threadpool_compl_cs);
516 if (!old_threadpool.compl_port)
518 HANDLE cport;
520 res = NtCreateIoCompletion( &cport, IO_COMPLETION_ALL_ACCESS, NULL, 0 );
521 if (!res)
523 /* FIXME native can start additional threads in case of e.g. hung callback function. */
524 res = RtlQueueWorkItem( iocp_poller, cport, WT_EXECUTEDEFAULT );
525 if (!res)
526 old_threadpool.compl_port = cport;
527 else
528 NtClose( cport );
531 RtlLeaveCriticalSection(&old_threadpool.threadpool_compl_cs);
532 if (res) return res;
535 info.CompletionPort = old_threadpool.compl_port;
536 info.CompletionKey = (ULONG_PTR)Function;
538 return NtSetInformationFile( FileHandle, &iosb, &info, sizeof(info), FileCompletionInformation );
541 static inline PLARGE_INTEGER get_nt_timeout( PLARGE_INTEGER pTime, ULONG timeout )
543 if (timeout == INFINITE) return NULL;
544 pTime->QuadPart = (ULONGLONG)timeout * -10000;
545 return pTime;
549 /************************** Timer Queue Impl **************************/
551 static void queue_remove_timer(struct queue_timer *t)
553 /* We MUST hold the queue cs while calling this function. This ensures
554 that we cannot queue another callback for this timer. The runcount
555 being zero makes sure we don't have any already queued. */
556 struct timer_queue *q = t->q;
558 assert(t->runcount == 0);
559 assert(t->destroy);
561 list_remove(&t->entry);
562 if (t->event)
563 NtSetEvent(t->event, NULL);
564 RtlFreeHeap(GetProcessHeap(), 0, t);
566 if (q->quit && list_empty(&q->timers))
567 NtSetEvent(q->event, NULL);
570 static void timer_cleanup_callback(struct queue_timer *t)
572 struct timer_queue *q = t->q;
573 RtlEnterCriticalSection(&q->cs);
575 assert(0 < t->runcount);
576 --t->runcount;
578 if (t->destroy && t->runcount == 0)
579 queue_remove_timer(t);
581 RtlLeaveCriticalSection(&q->cs);
584 static DWORD WINAPI timer_callback_wrapper(LPVOID p)
586 struct queue_timer *t = p;
587 t->callback(t->param, TRUE);
588 timer_cleanup_callback(t);
589 return 0;
592 static inline ULONGLONG queue_current_time(void)
594 LARGE_INTEGER now, freq;
595 NtQueryPerformanceCounter(&now, &freq);
596 return now.QuadPart * 1000 / freq.QuadPart;
599 static void queue_add_timer(struct queue_timer *t, ULONGLONG time,
600 BOOL set_event)
602 /* We MUST hold the queue cs while calling this function. */
603 struct timer_queue *q = t->q;
604 struct list *ptr = &q->timers;
606 assert(!q->quit || (t->destroy && time == EXPIRE_NEVER));
608 if (time != EXPIRE_NEVER)
609 LIST_FOR_EACH(ptr, &q->timers)
611 struct queue_timer *cur = LIST_ENTRY(ptr, struct queue_timer, entry);
612 if (time < cur->expire)
613 break;
615 list_add_before(ptr, &t->entry);
617 t->expire = time;
619 /* If we insert at the head of the list, we need to expire sooner
620 than expected. */
621 if (set_event && &t->entry == list_head(&q->timers))
622 NtSetEvent(q->event, NULL);
625 static inline void queue_move_timer(struct queue_timer *t, ULONGLONG time,
626 BOOL set_event)
628 /* We MUST hold the queue cs while calling this function. */
629 list_remove(&t->entry);
630 queue_add_timer(t, time, set_event);
633 static void queue_timer_expire(struct timer_queue *q)
635 struct queue_timer *t = NULL;
637 RtlEnterCriticalSection(&q->cs);
638 if (list_head(&q->timers))
640 ULONGLONG now, next;
641 t = LIST_ENTRY(list_head(&q->timers), struct queue_timer, entry);
642 if (!t->destroy && t->expire <= ((now = queue_current_time())))
644 ++t->runcount;
645 if (t->period)
647 next = t->expire + t->period;
648 /* avoid trigger cascade if overloaded / hibernated */
649 if (next < now)
650 next = now + t->period;
652 else
653 next = EXPIRE_NEVER;
654 queue_move_timer(t, next, FALSE);
656 else
657 t = NULL;
659 RtlLeaveCriticalSection(&q->cs);
661 if (t)
663 if (t->flags & WT_EXECUTEINTIMERTHREAD)
664 timer_callback_wrapper(t);
665 else
667 ULONG flags
668 = (t->flags
669 & (WT_EXECUTEINIOTHREAD | WT_EXECUTEINPERSISTENTTHREAD
670 | WT_EXECUTELONGFUNCTION | WT_TRANSFER_IMPERSONATION));
671 NTSTATUS status = RtlQueueWorkItem(timer_callback_wrapper, t, flags);
672 if (status != STATUS_SUCCESS)
673 timer_cleanup_callback(t);
678 static ULONG queue_get_timeout(struct timer_queue *q)
680 struct queue_timer *t;
681 ULONG timeout = INFINITE;
683 RtlEnterCriticalSection(&q->cs);
684 if (list_head(&q->timers))
686 t = LIST_ENTRY(list_head(&q->timers), struct queue_timer, entry);
687 assert(!t->destroy || t->expire == EXPIRE_NEVER);
689 if (t->expire != EXPIRE_NEVER)
691 ULONGLONG time = queue_current_time();
692 timeout = t->expire < time ? 0 : t->expire - time;
695 RtlLeaveCriticalSection(&q->cs);
697 return timeout;
700 static void WINAPI timer_queue_thread_proc(LPVOID p)
702 struct timer_queue *q = p;
703 ULONG timeout_ms;
705 timeout_ms = INFINITE;
706 for (;;)
708 LARGE_INTEGER timeout;
709 NTSTATUS status;
710 BOOL done = FALSE;
712 status = NtWaitForSingleObject(
713 q->event, FALSE, get_nt_timeout(&timeout, timeout_ms));
715 if (status == STATUS_WAIT_0)
717 /* There are two possible ways to trigger the event. Either
718 we are quitting and the last timer got removed, or a new
719 timer got put at the head of the list so we need to adjust
720 our timeout. */
721 RtlEnterCriticalSection(&q->cs);
722 if (q->quit && list_empty(&q->timers))
723 done = TRUE;
724 RtlLeaveCriticalSection(&q->cs);
726 else if (status == STATUS_TIMEOUT)
727 queue_timer_expire(q);
729 if (done)
730 break;
732 timeout_ms = queue_get_timeout(q);
735 NtClose(q->event);
736 RtlDeleteCriticalSection(&q->cs);
737 q->magic = 0;
738 RtlFreeHeap(GetProcessHeap(), 0, q);
739 RtlExitUserThread( 0 );
742 static void queue_destroy_timer(struct queue_timer *t)
744 /* We MUST hold the queue cs while calling this function. */
745 t->destroy = TRUE;
746 if (t->runcount == 0)
747 /* Ensure a timer is promptly removed. If callbacks are pending,
748 it will be removed after the last one finishes by the callback
749 cleanup wrapper. */
750 queue_remove_timer(t);
751 else
752 /* Make sure no destroyed timer masks an active timer at the head
753 of the sorted list. */
754 queue_move_timer(t, EXPIRE_NEVER, FALSE);
757 /***********************************************************************
758 * RtlCreateTimerQueue (NTDLL.@)
760 * Creates a timer queue object and returns a handle to it.
762 * PARAMS
763 * NewTimerQueue [O] The newly created queue.
765 * RETURNS
766 * Success: STATUS_SUCCESS.
767 * Failure: Any NTSTATUS code.
769 NTSTATUS WINAPI RtlCreateTimerQueue(PHANDLE NewTimerQueue)
771 NTSTATUS status;
772 struct timer_queue *q = RtlAllocateHeap(GetProcessHeap(), 0, sizeof *q);
773 if (!q)
774 return STATUS_NO_MEMORY;
776 RtlInitializeCriticalSection(&q->cs);
777 list_init(&q->timers);
778 q->quit = FALSE;
779 q->magic = TIMER_QUEUE_MAGIC;
780 status = NtCreateEvent(&q->event, EVENT_ALL_ACCESS, NULL, SynchronizationEvent, FALSE);
781 if (status != STATUS_SUCCESS)
783 RtlFreeHeap(GetProcessHeap(), 0, q);
784 return status;
786 status = RtlCreateUserThread(GetCurrentProcess(), NULL, FALSE, 0, 0, 0,
787 timer_queue_thread_proc, q, &q->thread, NULL);
788 if (status != STATUS_SUCCESS)
790 NtClose(q->event);
791 RtlFreeHeap(GetProcessHeap(), 0, q);
792 return status;
795 *NewTimerQueue = q;
796 return STATUS_SUCCESS;
799 /***********************************************************************
800 * RtlDeleteTimerQueueEx (NTDLL.@)
802 * Deletes a timer queue object.
804 * PARAMS
805 * TimerQueue [I] The timer queue to destroy.
806 * CompletionEvent [I] If NULL, return immediately. If INVALID_HANDLE_VALUE,
807 * wait until all timers are finished firing before
808 * returning. Otherwise, return immediately and set the
809 * event when all timers are done.
811 * RETURNS
812 * Success: STATUS_SUCCESS if synchronous, STATUS_PENDING if not.
813 * Failure: Any NTSTATUS code.
815 NTSTATUS WINAPI RtlDeleteTimerQueueEx(HANDLE TimerQueue, HANDLE CompletionEvent)
817 struct timer_queue *q = TimerQueue;
818 struct queue_timer *t, *temp;
819 HANDLE thread;
820 NTSTATUS status;
822 if (!q || q->magic != TIMER_QUEUE_MAGIC)
823 return STATUS_INVALID_HANDLE;
825 thread = q->thread;
827 RtlEnterCriticalSection(&q->cs);
828 q->quit = TRUE;
829 if (list_head(&q->timers))
830 /* When the last timer is removed, it will signal the timer thread to
831 exit... */
832 LIST_FOR_EACH_ENTRY_SAFE(t, temp, &q->timers, struct queue_timer, entry)
833 queue_destroy_timer(t);
834 else
835 /* However if we have none, we must do it ourselves. */
836 NtSetEvent(q->event, NULL);
837 RtlLeaveCriticalSection(&q->cs);
839 if (CompletionEvent == INVALID_HANDLE_VALUE)
841 NtWaitForSingleObject(thread, FALSE, NULL);
842 status = STATUS_SUCCESS;
844 else
846 if (CompletionEvent)
848 FIXME("asynchronous return on completion event unimplemented\n");
849 NtWaitForSingleObject(thread, FALSE, NULL);
850 NtSetEvent(CompletionEvent, NULL);
852 status = STATUS_PENDING;
855 NtClose(thread);
856 return status;
859 static struct timer_queue *get_timer_queue(HANDLE TimerQueue)
861 static struct timer_queue *default_timer_queue;
863 if (TimerQueue)
864 return TimerQueue;
865 else
867 if (!default_timer_queue)
869 HANDLE q;
870 NTSTATUS status = RtlCreateTimerQueue(&q);
871 if (status == STATUS_SUCCESS)
873 PVOID p = InterlockedCompareExchangePointer( (void **) &default_timer_queue, q, NULL );
874 if (p)
875 /* Got beat to the punch. */
876 RtlDeleteTimerQueueEx(q, NULL);
879 return default_timer_queue;
883 /***********************************************************************
884 * RtlCreateTimer (NTDLL.@)
886 * Creates a new timer associated with the given queue.
888 * PARAMS
889 * NewTimer [O] The newly created timer.
890 * TimerQueue [I] The queue to hold the timer.
891 * Callback [I] The callback to fire.
892 * Parameter [I] The argument for the callback.
893 * DueTime [I] The delay, in milliseconds, before first firing the
894 * timer.
895 * Period [I] The period, in milliseconds, at which to fire the timer
896 * after the first callback. If zero, the timer will only
897 * fire once. It still needs to be deleted with
898 * RtlDeleteTimer.
899 * Flags [I] Flags controlling the execution of the callback. In
900 * addition to the WT_* thread pool flags (see
901 * RtlQueueWorkItem), WT_EXECUTEINTIMERTHREAD and
902 * WT_EXECUTEONLYONCE are supported.
904 * RETURNS
905 * Success: STATUS_SUCCESS.
906 * Failure: Any NTSTATUS code.
908 NTSTATUS WINAPI RtlCreateTimer(PHANDLE NewTimer, HANDLE TimerQueue,
909 RTL_WAITORTIMERCALLBACKFUNC Callback,
910 PVOID Parameter, DWORD DueTime, DWORD Period,
911 ULONG Flags)
913 NTSTATUS status;
914 struct queue_timer *t;
915 struct timer_queue *q = get_timer_queue(TimerQueue);
917 if (!q) return STATUS_NO_MEMORY;
918 if (q->magic != TIMER_QUEUE_MAGIC) return STATUS_INVALID_HANDLE;
920 t = RtlAllocateHeap(GetProcessHeap(), 0, sizeof *t);
921 if (!t)
922 return STATUS_NO_MEMORY;
924 t->q = q;
925 t->runcount = 0;
926 t->callback = Callback;
927 t->param = Parameter;
928 t->period = Period;
929 t->flags = Flags;
930 t->destroy = FALSE;
931 t->event = NULL;
933 status = STATUS_SUCCESS;
934 RtlEnterCriticalSection(&q->cs);
935 if (q->quit)
936 status = STATUS_INVALID_HANDLE;
937 else
938 queue_add_timer(t, queue_current_time() + DueTime, TRUE);
939 RtlLeaveCriticalSection(&q->cs);
941 if (status == STATUS_SUCCESS)
942 *NewTimer = t;
943 else
944 RtlFreeHeap(GetProcessHeap(), 0, t);
946 return status;
949 /***********************************************************************
950 * RtlUpdateTimer (NTDLL.@)
952 * Changes the time at which a timer expires.
954 * PARAMS
955 * TimerQueue [I] The queue that holds the timer.
956 * Timer [I] The timer to update.
957 * DueTime [I] The delay, in milliseconds, before next firing the timer.
958 * Period [I] The period, in milliseconds, at which to fire the timer
959 * after the first callback. If zero, the timer will not
960 * refire once. It still needs to be deleted with
961 * RtlDeleteTimer.
963 * RETURNS
964 * Success: STATUS_SUCCESS.
965 * Failure: Any NTSTATUS code.
967 NTSTATUS WINAPI RtlUpdateTimer(HANDLE TimerQueue, HANDLE Timer,
968 DWORD DueTime, DWORD Period)
970 struct queue_timer *t = Timer;
971 struct timer_queue *q = t->q;
973 RtlEnterCriticalSection(&q->cs);
974 /* Can't change a timer if it was once-only or destroyed. */
975 if (t->expire != EXPIRE_NEVER)
977 t->period = Period;
978 queue_move_timer(t, queue_current_time() + DueTime, TRUE);
980 RtlLeaveCriticalSection(&q->cs);
982 return STATUS_SUCCESS;
985 /***********************************************************************
986 * RtlDeleteTimer (NTDLL.@)
988 * Cancels a timer-queue timer.
990 * PARAMS
991 * TimerQueue [I] The queue that holds the timer.
992 * Timer [I] The timer to update.
993 * CompletionEvent [I] If NULL, return immediately. If INVALID_HANDLE_VALUE,
994 * wait until the timer is finished firing all pending
995 * callbacks before returning. Otherwise, return
996 * immediately and set the timer is done.
998 * RETURNS
999 * Success: STATUS_SUCCESS if the timer is done, STATUS_PENDING if not,
1000 or if the completion event is NULL.
1001 * Failure: Any NTSTATUS code.
1003 NTSTATUS WINAPI RtlDeleteTimer(HANDLE TimerQueue, HANDLE Timer,
1004 HANDLE CompletionEvent)
1006 struct queue_timer *t = Timer;
1007 struct timer_queue *q;
1008 NTSTATUS status = STATUS_PENDING;
1009 HANDLE event = NULL;
1011 if (!Timer)
1012 return STATUS_INVALID_PARAMETER_1;
1013 q = t->q;
1014 if (CompletionEvent == INVALID_HANDLE_VALUE)
1016 status = NtCreateEvent(&event, EVENT_ALL_ACCESS, NULL, SynchronizationEvent, FALSE);
1017 if (status == STATUS_SUCCESS)
1018 status = STATUS_PENDING;
1020 else if (CompletionEvent)
1021 event = CompletionEvent;
1023 RtlEnterCriticalSection(&q->cs);
1024 t->event = event;
1025 if (t->runcount == 0 && event)
1026 status = STATUS_SUCCESS;
1027 queue_destroy_timer(t);
1028 RtlLeaveCriticalSection(&q->cs);
1030 if (CompletionEvent == INVALID_HANDLE_VALUE && event)
1032 if (status == STATUS_PENDING)
1034 NtWaitForSingleObject(event, FALSE, NULL);
1035 status = STATUS_SUCCESS;
1037 NtClose(event);
1040 return status;
1043 /***********************************************************************
1044 * timerqueue_thread_proc (internal)
1046 static void CALLBACK timerqueue_thread_proc( void *param )
1048 ULONGLONG timeout_lower, timeout_upper, new_timeout;
1049 struct threadpool_object *other_timer;
1050 LARGE_INTEGER now, timeout;
1051 struct list *ptr;
1053 TRACE( "starting timer queue thread\n" );
1055 RtlEnterCriticalSection( &timerqueue.cs );
1056 for (;;)
1058 NtQuerySystemTime( &now );
1060 /* Check for expired timers. */
1061 while ((ptr = list_head( &timerqueue.pending_timers )))
1063 struct threadpool_object *timer = LIST_ENTRY( ptr, struct threadpool_object, u.timer.timer_entry );
1064 assert( timer->type == TP_OBJECT_TYPE_TIMER );
1065 assert( timer->u.timer.timer_pending );
1066 if (timer->u.timer.timeout > now.QuadPart)
1067 break;
1069 /* Queue a new callback in one of the worker threads. */
1070 list_remove( &timer->u.timer.timer_entry );
1071 timer->u.timer.timer_pending = FALSE;
1072 tp_object_submit( timer, FALSE );
1074 /* Insert the timer back into the queue, except it's marked for shutdown. */
1075 if (timer->u.timer.period && !timer->shutdown)
1077 timer->u.timer.timeout += (ULONGLONG)timer->u.timer.period * 10000;
1078 if (timer->u.timer.timeout <= now.QuadPart)
1079 timer->u.timer.timeout = now.QuadPart + 1;
1081 LIST_FOR_EACH_ENTRY( other_timer, &timerqueue.pending_timers,
1082 struct threadpool_object, u.timer.timer_entry )
1084 assert( other_timer->type == TP_OBJECT_TYPE_TIMER );
1085 if (timer->u.timer.timeout < other_timer->u.timer.timeout)
1086 break;
1088 list_add_before( &other_timer->u.timer.timer_entry, &timer->u.timer.timer_entry );
1089 timer->u.timer.timer_pending = TRUE;
1093 timeout_lower = timeout_upper = MAXLONGLONG;
1095 /* Determine next timeout and use the window length to optimize wakeup times. */
1096 LIST_FOR_EACH_ENTRY( other_timer, &timerqueue.pending_timers,
1097 struct threadpool_object, u.timer.timer_entry )
1099 assert( other_timer->type == TP_OBJECT_TYPE_TIMER );
1100 if (other_timer->u.timer.timeout >= timeout_upper)
1101 break;
1103 timeout_lower = other_timer->u.timer.timeout;
1104 new_timeout = timeout_lower + (ULONGLONG)other_timer->u.timer.window_length * 10000;
1105 if (new_timeout < timeout_upper)
1106 timeout_upper = new_timeout;
1109 /* Wait for timer update events or until the next timer expires. */
1110 if (timerqueue.objcount)
1112 timeout.QuadPart = timeout_lower;
1113 RtlSleepConditionVariableCS( &timerqueue.update_event, &timerqueue.cs, &timeout );
1114 continue;
1117 /* All timers have been destroyed, if no new timers are created
1118 * within some amount of time, then we can shutdown this thread. */
1119 timeout.QuadPart = (ULONGLONG)THREADPOOL_WORKER_TIMEOUT * -10000;
1120 if (RtlSleepConditionVariableCS( &timerqueue.update_event, &timerqueue.cs,
1121 &timeout ) == STATUS_TIMEOUT && !timerqueue.objcount)
1123 break;
1127 timerqueue.thread_running = FALSE;
1128 RtlLeaveCriticalSection( &timerqueue.cs );
1130 TRACE( "terminating timer queue thread\n" );
1131 RtlExitUserThread( 0 );
1134 /***********************************************************************
1135 * tp_new_worker_thread (internal)
1137 * Create and account a new worker thread for the desired pool.
1139 static NTSTATUS tp_new_worker_thread( struct threadpool *pool )
1141 HANDLE thread;
1142 NTSTATUS status;
1144 status = RtlCreateUserThread( GetCurrentProcess(), NULL, FALSE, 0, 0, 0,
1145 threadpool_worker_proc, pool, &thread, NULL );
1146 if (status == STATUS_SUCCESS)
1148 InterlockedIncrement( &pool->refcount );
1149 pool->num_workers++;
1150 NtClose( thread );
1152 return status;
1155 /***********************************************************************
1156 * tp_timerqueue_lock (internal)
1158 * Acquires a lock on the global timerqueue. When the lock is acquired
1159 * successfully, it is guaranteed that the timer thread is running.
1161 static NTSTATUS tp_timerqueue_lock( struct threadpool_object *timer )
1163 NTSTATUS status = STATUS_SUCCESS;
1164 assert( timer->type == TP_OBJECT_TYPE_TIMER );
1166 timer->u.timer.timer_initialized = FALSE;
1167 timer->u.timer.timer_pending = FALSE;
1168 timer->u.timer.timer_set = FALSE;
1169 timer->u.timer.timeout = 0;
1170 timer->u.timer.period = 0;
1171 timer->u.timer.window_length = 0;
1173 RtlEnterCriticalSection( &timerqueue.cs );
1175 /* Make sure that the timerqueue thread is running. */
1176 if (!timerqueue.thread_running)
1178 HANDLE thread;
1179 status = RtlCreateUserThread( GetCurrentProcess(), NULL, FALSE, 0, 0, 0,
1180 timerqueue_thread_proc, NULL, &thread, NULL );
1181 if (status == STATUS_SUCCESS)
1183 timerqueue.thread_running = TRUE;
1184 NtClose( thread );
1188 if (status == STATUS_SUCCESS)
1190 timer->u.timer.timer_initialized = TRUE;
1191 timerqueue.objcount++;
1194 RtlLeaveCriticalSection( &timerqueue.cs );
1195 return status;
1198 /***********************************************************************
1199 * tp_timerqueue_unlock (internal)
1201 * Releases a lock on the global timerqueue.
1203 static void tp_timerqueue_unlock( struct threadpool_object *timer )
1205 assert( timer->type == TP_OBJECT_TYPE_TIMER );
1207 RtlEnterCriticalSection( &timerqueue.cs );
1208 if (timer->u.timer.timer_initialized)
1210 /* If timer was pending, remove it. */
1211 if (timer->u.timer.timer_pending)
1213 list_remove( &timer->u.timer.timer_entry );
1214 timer->u.timer.timer_pending = FALSE;
1217 /* If the last timer object was destroyed, then wake up the thread. */
1218 if (!--timerqueue.objcount)
1220 assert( list_empty( &timerqueue.pending_timers ) );
1221 RtlWakeAllConditionVariable( &timerqueue.update_event );
1224 timer->u.timer.timer_initialized = FALSE;
1226 RtlLeaveCriticalSection( &timerqueue.cs );
1229 /***********************************************************************
1230 * waitqueue_thread_proc (internal)
1232 static void CALLBACK waitqueue_thread_proc( void *param )
1234 struct threadpool_object *objects[MAXIMUM_WAITQUEUE_OBJECTS];
1235 HANDLE handles[MAXIMUM_WAITQUEUE_OBJECTS + 1];
1236 struct waitqueue_bucket *bucket = param;
1237 struct threadpool_object *wait, *next;
1238 LARGE_INTEGER now, timeout;
1239 DWORD num_handles;
1240 NTSTATUS status;
1242 TRACE( "starting wait queue thread\n" );
1244 RtlEnterCriticalSection( &waitqueue.cs );
1246 for (;;)
1248 NtQuerySystemTime( &now );
1249 timeout.QuadPart = MAXLONGLONG;
1250 num_handles = 0;
1252 LIST_FOR_EACH_ENTRY_SAFE( wait, next, &bucket->waiting, struct threadpool_object,
1253 u.wait.wait_entry )
1255 assert( wait->type == TP_OBJECT_TYPE_WAIT );
1256 if (wait->u.wait.timeout <= now.QuadPart)
1258 /* Wait object timed out. */
1259 if ((wait->u.wait.flags & WT_EXECUTEONLYONCE))
1261 list_remove( &wait->u.wait.wait_entry );
1262 list_add_tail( &bucket->reserved, &wait->u.wait.wait_entry );
1264 if ((wait->u.wait.flags & (WT_EXECUTEINWAITTHREAD | WT_EXECUTEINIOTHREAD)))
1266 InterlockedIncrement( &wait->refcount );
1267 wait->num_pending_callbacks++;
1268 RtlEnterCriticalSection( &wait->pool->cs );
1269 tp_object_execute( wait, TRUE );
1270 RtlLeaveCriticalSection( &wait->pool->cs );
1271 tp_object_release( wait );
1273 else tp_object_submit( wait, FALSE );
1275 else
1277 if (wait->u.wait.timeout < timeout.QuadPart)
1278 timeout.QuadPart = wait->u.wait.timeout;
1280 assert( num_handles < MAXIMUM_WAITQUEUE_OBJECTS );
1281 InterlockedIncrement( &wait->refcount );
1282 objects[num_handles] = wait;
1283 handles[num_handles] = wait->u.wait.handle;
1284 num_handles++;
1288 if (!bucket->objcount)
1290 /* All wait objects have been destroyed, if no new wait objects are created
1291 * within some amount of time, then we can shutdown this thread. */
1292 assert( num_handles == 0 );
1293 RtlLeaveCriticalSection( &waitqueue.cs );
1294 timeout.QuadPart = (ULONGLONG)THREADPOOL_WORKER_TIMEOUT * -10000;
1295 status = NtWaitForMultipleObjects( 1, &bucket->update_event, TRUE, bucket->alertable, &timeout );
1296 RtlEnterCriticalSection( &waitqueue.cs );
1298 if (status == STATUS_TIMEOUT && !bucket->objcount)
1299 break;
1301 else
1303 handles[num_handles] = bucket->update_event;
1304 RtlLeaveCriticalSection( &waitqueue.cs );
1305 status = NtWaitForMultipleObjects( num_handles + 1, handles, TRUE, bucket->alertable, &timeout );
1306 RtlEnterCriticalSection( &waitqueue.cs );
1308 if (status >= STATUS_WAIT_0 && status < STATUS_WAIT_0 + num_handles)
1310 wait = objects[status - STATUS_WAIT_0];
1311 assert( wait->type == TP_OBJECT_TYPE_WAIT );
1312 if (wait->u.wait.bucket)
1314 /* Wait object signaled. */
1315 assert( wait->u.wait.bucket == bucket );
1316 if ((wait->u.wait.flags & WT_EXECUTEONLYONCE))
1318 list_remove( &wait->u.wait.wait_entry );
1319 list_add_tail( &bucket->reserved, &wait->u.wait.wait_entry );
1321 if ((wait->u.wait.flags & (WT_EXECUTEINWAITTHREAD | WT_EXECUTEINIOTHREAD)))
1323 wait->u.wait.signaled++;
1324 wait->num_pending_callbacks++;
1325 RtlEnterCriticalSection( &wait->pool->cs );
1326 tp_object_execute( wait, TRUE );
1327 RtlLeaveCriticalSection( &wait->pool->cs );
1329 else tp_object_submit( wait, TRUE );
1331 else
1332 WARN("wait object %p triggered while object was destroyed\n", wait);
1335 /* Release temporary references to wait objects. */
1336 while (num_handles)
1338 wait = objects[--num_handles];
1339 assert( wait->type == TP_OBJECT_TYPE_WAIT );
1340 tp_object_release( wait );
1344 /* Try to merge bucket with other threads. */
1345 if (waitqueue.num_buckets > 1 && bucket->objcount &&
1346 bucket->objcount <= MAXIMUM_WAITQUEUE_OBJECTS * 1 / 3)
1348 struct waitqueue_bucket *other_bucket;
1349 LIST_FOR_EACH_ENTRY( other_bucket, &waitqueue.buckets, struct waitqueue_bucket, bucket_entry )
1351 if (other_bucket != bucket && other_bucket->objcount && other_bucket->alertable == bucket->alertable &&
1352 other_bucket->objcount + bucket->objcount <= MAXIMUM_WAITQUEUE_OBJECTS * 2 / 3)
1354 other_bucket->objcount += bucket->objcount;
1355 bucket->objcount = 0;
1357 /* Update reserved list. */
1358 LIST_FOR_EACH_ENTRY( wait, &bucket->reserved, struct threadpool_object, u.wait.wait_entry )
1360 assert( wait->type == TP_OBJECT_TYPE_WAIT );
1361 wait->u.wait.bucket = other_bucket;
1363 list_move_tail( &other_bucket->reserved, &bucket->reserved );
1365 /* Update waiting list. */
1366 LIST_FOR_EACH_ENTRY( wait, &bucket->waiting, struct threadpool_object, u.wait.wait_entry )
1368 assert( wait->type == TP_OBJECT_TYPE_WAIT );
1369 wait->u.wait.bucket = other_bucket;
1371 list_move_tail( &other_bucket->waiting, &bucket->waiting );
1373 /* Move bucket to the end, to keep the probability of
1374 * newly added wait objects as small as possible. */
1375 list_remove( &bucket->bucket_entry );
1376 list_add_tail( &waitqueue.buckets, &bucket->bucket_entry );
1378 NtSetEvent( other_bucket->update_event, NULL );
1379 break;
1385 /* Remove this bucket from the list. */
1386 list_remove( &bucket->bucket_entry );
1387 if (!--waitqueue.num_buckets)
1388 assert( list_empty( &waitqueue.buckets ) );
1390 RtlLeaveCriticalSection( &waitqueue.cs );
1392 TRACE( "terminating wait queue thread\n" );
1394 assert( bucket->objcount == 0 );
1395 assert( list_empty( &bucket->reserved ) );
1396 assert( list_empty( &bucket->waiting ) );
1397 NtClose( bucket->update_event );
1399 RtlFreeHeap( GetProcessHeap(), 0, bucket );
1400 RtlExitUserThread( 0 );
1403 /***********************************************************************
1404 * tp_waitqueue_lock (internal)
1406 static NTSTATUS tp_waitqueue_lock( struct threadpool_object *wait )
1408 struct waitqueue_bucket *bucket;
1409 NTSTATUS status;
1410 HANDLE thread;
1411 BOOL alertable = (wait->u.wait.flags & WT_EXECUTEINIOTHREAD) != 0;
1412 assert( wait->type == TP_OBJECT_TYPE_WAIT );
1414 wait->u.wait.signaled = 0;
1415 wait->u.wait.bucket = NULL;
1416 wait->u.wait.wait_pending = FALSE;
1417 wait->u.wait.timeout = 0;
1418 wait->u.wait.handle = INVALID_HANDLE_VALUE;
1420 RtlEnterCriticalSection( &waitqueue.cs );
1422 /* Try to assign to existing bucket if possible. */
1423 LIST_FOR_EACH_ENTRY( bucket, &waitqueue.buckets, struct waitqueue_bucket, bucket_entry )
1425 if (bucket->objcount < MAXIMUM_WAITQUEUE_OBJECTS && bucket->alertable == alertable)
1427 list_add_tail( &bucket->reserved, &wait->u.wait.wait_entry );
1428 wait->u.wait.bucket = bucket;
1429 bucket->objcount++;
1431 status = STATUS_SUCCESS;
1432 goto out;
1436 /* Create a new bucket and corresponding worker thread. */
1437 bucket = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*bucket) );
1438 if (!bucket)
1440 status = STATUS_NO_MEMORY;
1441 goto out;
1444 bucket->objcount = 0;
1445 bucket->alertable = alertable;
1446 list_init( &bucket->reserved );
1447 list_init( &bucket->waiting );
1449 status = NtCreateEvent( &bucket->update_event, EVENT_ALL_ACCESS,
1450 NULL, SynchronizationEvent, FALSE );
1451 if (status)
1453 RtlFreeHeap( GetProcessHeap(), 0, bucket );
1454 goto out;
1457 status = RtlCreateUserThread( GetCurrentProcess(), NULL, FALSE, 0, 0, 0,
1458 waitqueue_thread_proc, bucket, &thread, NULL );
1459 if (status == STATUS_SUCCESS)
1461 list_add_tail( &waitqueue.buckets, &bucket->bucket_entry );
1462 waitqueue.num_buckets++;
1464 list_add_tail( &bucket->reserved, &wait->u.wait.wait_entry );
1465 wait->u.wait.bucket = bucket;
1466 bucket->objcount++;
1468 NtClose( thread );
1470 else
1472 NtClose( bucket->update_event );
1473 RtlFreeHeap( GetProcessHeap(), 0, bucket );
1476 out:
1477 RtlLeaveCriticalSection( &waitqueue.cs );
1478 return status;
1481 /***********************************************************************
1482 * tp_waitqueue_unlock (internal)
1484 static void tp_waitqueue_unlock( struct threadpool_object *wait )
1486 assert( wait->type == TP_OBJECT_TYPE_WAIT );
1488 RtlEnterCriticalSection( &waitqueue.cs );
1489 if (wait->u.wait.bucket)
1491 struct waitqueue_bucket *bucket = wait->u.wait.bucket;
1492 assert( bucket->objcount > 0 );
1494 list_remove( &wait->u.wait.wait_entry );
1495 wait->u.wait.bucket = NULL;
1496 bucket->objcount--;
1498 NtSetEvent( bucket->update_event, NULL );
1500 RtlLeaveCriticalSection( &waitqueue.cs );
1503 static void CALLBACK ioqueue_thread_proc( void *param )
1505 struct io_completion *completion;
1506 struct threadpool_object *io;
1507 IO_STATUS_BLOCK iosb;
1508 ULONG_PTR key, value;
1509 NTSTATUS status;
1511 TRACE( "starting I/O completion thread\n" );
1513 RtlEnterCriticalSection( &ioqueue.cs );
1515 for (;;)
1517 RtlLeaveCriticalSection( &ioqueue.cs );
1518 if ((status = NtRemoveIoCompletion( ioqueue.port, &key, &value, &iosb, NULL )))
1519 ERR("NtRemoveIoCompletion failed, status %#x.\n", status);
1520 RtlEnterCriticalSection( &ioqueue.cs );
1522 io = (struct threadpool_object *)key;
1524 if (io && io->shutdown)
1526 if (iosb.u.Status != STATUS_THREADPOOL_RELEASED_DURING_OPERATION)
1528 /* Skip remaining completions until the final one. */
1529 continue;
1531 --ioqueue.objcount;
1532 TRACE( "Releasing io %p.\n", io );
1533 tp_object_release( io );
1535 else if (io)
1537 RtlEnterCriticalSection( &io->pool->cs );
1539 if (!array_reserve((void **)&io->u.io.completions, &io->u.io.completion_max,
1540 io->u.io.completion_count + 1, sizeof(*io->u.io.completions)))
1542 ERR("Failed to allocate memory.\n");
1543 RtlLeaveCriticalSection( &io->pool->cs );
1544 continue;
1547 completion = &io->u.io.completions[io->u.io.completion_count++];
1548 completion->iosb = iosb;
1549 completion->cvalue = value;
1551 tp_object_submit( io, FALSE );
1553 RtlLeaveCriticalSection( &io->pool->cs );
1556 if (!ioqueue.objcount)
1558 /* All I/O objects have been destroyed; if no new objects are
1559 * created within some amount of time, then we can shutdown this
1560 * thread. */
1561 LARGE_INTEGER timeout = {.QuadPart = (ULONGLONG)THREADPOOL_WORKER_TIMEOUT * -10000};
1562 if (RtlSleepConditionVariableCS( &ioqueue.update_event, &ioqueue.cs,
1563 &timeout) == STATUS_TIMEOUT && !ioqueue.objcount)
1564 break;
1568 RtlLeaveCriticalSection( &ioqueue.cs );
1570 TRACE( "terminating I/O completion thread\n" );
1572 RtlExitUserThread( 0 );
1575 static NTSTATUS tp_ioqueue_lock( struct threadpool_object *io, HANDLE file )
1577 NTSTATUS status = STATUS_SUCCESS;
1579 assert( io->type == TP_OBJECT_TYPE_IO );
1581 RtlEnterCriticalSection( &ioqueue.cs );
1583 if (!ioqueue.port && (status = NtCreateIoCompletion( &ioqueue.port,
1584 IO_COMPLETION_ALL_ACCESS, NULL, 0 )))
1586 RtlLeaveCriticalSection( &ioqueue.cs );
1587 return status;
1590 if (!ioqueue.thread_running)
1592 HANDLE thread;
1594 if (!(status = RtlCreateUserThread( GetCurrentProcess(), NULL, FALSE,
1595 0, 0, 0, ioqueue_thread_proc, NULL, &thread, NULL )))
1597 ioqueue.thread_running = TRUE;
1598 NtClose( thread );
1602 if (status == STATUS_SUCCESS)
1604 FILE_COMPLETION_INFORMATION info;
1605 IO_STATUS_BLOCK iosb;
1607 info.CompletionPort = ioqueue.port;
1608 info.CompletionKey = (ULONG_PTR)io;
1610 status = NtSetInformationFile( file, &iosb, &info, sizeof(info), FileCompletionInformation );
1613 if (status == STATUS_SUCCESS)
1615 if (!ioqueue.objcount++)
1616 RtlWakeConditionVariable( &ioqueue.update_event );
1619 RtlLeaveCriticalSection( &ioqueue.cs );
1620 return status;
1623 /***********************************************************************
1624 * tp_threadpool_alloc (internal)
1626 * Allocates a new threadpool object.
1628 static NTSTATUS tp_threadpool_alloc( struct threadpool **out )
1630 IMAGE_NT_HEADERS *nt = RtlImageNtHeader( NtCurrentTeb()->Peb->ImageBaseAddress );
1631 struct threadpool *pool;
1632 unsigned int i;
1634 pool = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*pool) );
1635 if (!pool)
1636 return STATUS_NO_MEMORY;
1638 pool->refcount = 1;
1639 pool->objcount = 0;
1640 pool->shutdown = FALSE;
1642 RtlInitializeCriticalSection( &pool->cs );
1643 pool->cs.DebugInfo->Spare[0] = (DWORD_PTR)(__FILE__ ": threadpool.cs");
1645 for (i = 0; i < ARRAY_SIZE(pool->pools); ++i)
1646 list_init( &pool->pools[i] );
1647 RtlInitializeConditionVariable( &pool->update_event );
1649 pool->max_workers = 500;
1650 pool->min_workers = 0;
1651 pool->num_workers = 0;
1652 pool->num_busy_workers = 0;
1653 pool->stack_info.StackReserve = nt->OptionalHeader.SizeOfStackReserve;
1654 pool->stack_info.StackCommit = nt->OptionalHeader.SizeOfStackCommit;
1656 TRACE( "allocated threadpool %p\n", pool );
1658 *out = pool;
1659 return STATUS_SUCCESS;
1662 /***********************************************************************
1663 * tp_threadpool_shutdown (internal)
1665 * Prepares the shutdown of a threadpool object and notifies all worker
1666 * threads to terminate (after all remaining work items have been
1667 * processed).
1669 static void tp_threadpool_shutdown( struct threadpool *pool )
1671 assert( pool != default_threadpool );
1673 pool->shutdown = TRUE;
1674 RtlWakeAllConditionVariable( &pool->update_event );
1677 /***********************************************************************
1678 * tp_threadpool_release (internal)
1680 * Releases a reference to a threadpool object.
1682 static BOOL tp_threadpool_release( struct threadpool *pool )
1684 unsigned int i;
1686 if (InterlockedDecrement( &pool->refcount ))
1687 return FALSE;
1689 TRACE( "destroying threadpool %p\n", pool );
1691 assert( pool->shutdown );
1692 assert( !pool->objcount );
1693 for (i = 0; i < ARRAY_SIZE(pool->pools); ++i)
1694 assert( list_empty( &pool->pools[i] ) );
1696 pool->cs.DebugInfo->Spare[0] = 0;
1697 RtlDeleteCriticalSection( &pool->cs );
1699 RtlFreeHeap( GetProcessHeap(), 0, pool );
1700 return TRUE;
1703 /***********************************************************************
1704 * tp_threadpool_lock (internal)
1706 * Acquires a lock on a threadpool, specified with an TP_CALLBACK_ENVIRON
1707 * block. When the lock is acquired successfully, it is guaranteed that
1708 * there is at least one worker thread to process tasks.
1710 static NTSTATUS tp_threadpool_lock( struct threadpool **out, TP_CALLBACK_ENVIRON *environment )
1712 struct threadpool *pool = NULL;
1713 NTSTATUS status = STATUS_SUCCESS;
1715 if (environment)
1717 /* Validate environment parameters. */
1718 if (environment->Version == 3)
1720 TP_CALLBACK_ENVIRON_V3 *environment3 = (TP_CALLBACK_ENVIRON_V3 *)environment;
1722 switch (environment3->CallbackPriority)
1724 case TP_CALLBACK_PRIORITY_HIGH:
1725 case TP_CALLBACK_PRIORITY_NORMAL:
1726 case TP_CALLBACK_PRIORITY_LOW:
1727 break;
1728 default:
1729 return STATUS_INVALID_PARAMETER;
1733 pool = (struct threadpool *)environment->Pool;
1736 if (!pool)
1738 if (!default_threadpool)
1740 status = tp_threadpool_alloc( &pool );
1741 if (status != STATUS_SUCCESS)
1742 return status;
1744 if (InterlockedCompareExchangePointer( (void *)&default_threadpool, pool, NULL ) != NULL)
1746 tp_threadpool_shutdown( pool );
1747 tp_threadpool_release( pool );
1751 pool = default_threadpool;
1754 RtlEnterCriticalSection( &pool->cs );
1756 /* Make sure that the threadpool has at least one thread. */
1757 if (!pool->num_workers)
1758 status = tp_new_worker_thread( pool );
1760 /* Keep a reference, and increment objcount to ensure that the
1761 * last thread doesn't terminate. */
1762 if (status == STATUS_SUCCESS)
1764 InterlockedIncrement( &pool->refcount );
1765 pool->objcount++;
1768 RtlLeaveCriticalSection( &pool->cs );
1770 if (status != STATUS_SUCCESS)
1771 return status;
1773 *out = pool;
1774 return STATUS_SUCCESS;
1777 /***********************************************************************
1778 * tp_threadpool_unlock (internal)
1780 * Releases a lock on a threadpool.
1782 static void tp_threadpool_unlock( struct threadpool *pool )
1784 RtlEnterCriticalSection( &pool->cs );
1785 pool->objcount--;
1786 RtlLeaveCriticalSection( &pool->cs );
1787 tp_threadpool_release( pool );
1790 /***********************************************************************
1791 * tp_group_alloc (internal)
1793 * Allocates a new threadpool group object.
1795 static NTSTATUS tp_group_alloc( struct threadpool_group **out )
1797 struct threadpool_group *group;
1799 group = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*group) );
1800 if (!group)
1801 return STATUS_NO_MEMORY;
1803 group->refcount = 1;
1804 group->shutdown = FALSE;
1806 RtlInitializeCriticalSection( &group->cs );
1807 group->cs.DebugInfo->Spare[0] = (DWORD_PTR)(__FILE__ ": threadpool_group.cs");
1809 list_init( &group->members );
1811 TRACE( "allocated group %p\n", group );
1813 *out = group;
1814 return STATUS_SUCCESS;
1817 /***********************************************************************
1818 * tp_group_shutdown (internal)
1820 * Marks the group object for shutdown.
1822 static void tp_group_shutdown( struct threadpool_group *group )
1824 group->shutdown = TRUE;
1827 /***********************************************************************
1828 * tp_group_release (internal)
1830 * Releases a reference to a group object.
1832 static BOOL tp_group_release( struct threadpool_group *group )
1834 if (InterlockedDecrement( &group->refcount ))
1835 return FALSE;
1837 TRACE( "destroying group %p\n", group );
1839 assert( group->shutdown );
1840 assert( list_empty( &group->members ) );
1842 group->cs.DebugInfo->Spare[0] = 0;
1843 RtlDeleteCriticalSection( &group->cs );
1845 RtlFreeHeap( GetProcessHeap(), 0, group );
1846 return TRUE;
1849 /***********************************************************************
1850 * tp_object_initialize (internal)
1852 * Initializes members of a threadpool object.
1854 static void tp_object_initialize( struct threadpool_object *object, struct threadpool *pool,
1855 PVOID userdata, TP_CALLBACK_ENVIRON *environment )
1857 BOOL is_simple_callback = (object->type == TP_OBJECT_TYPE_SIMPLE);
1859 object->refcount = 1;
1860 object->shutdown = FALSE;
1862 object->pool = pool;
1863 object->group = NULL;
1864 object->userdata = userdata;
1865 object->group_cancel_callback = NULL;
1866 object->finalization_callback = NULL;
1867 object->may_run_long = 0;
1868 object->race_dll = NULL;
1869 object->priority = TP_CALLBACK_PRIORITY_NORMAL;
1871 memset( &object->group_entry, 0, sizeof(object->group_entry) );
1872 object->is_group_member = FALSE;
1874 memset( &object->pool_entry, 0, sizeof(object->pool_entry) );
1875 RtlInitializeConditionVariable( &object->finished_event );
1876 RtlInitializeConditionVariable( &object->group_finished_event );
1877 object->completed_event = NULL;
1878 object->num_pending_callbacks = 0;
1879 object->num_running_callbacks = 0;
1880 object->num_associated_callbacks = 0;
1882 if (environment)
1884 if (environment->Version != 1 && environment->Version != 3)
1885 FIXME( "unsupported environment version %u\n", environment->Version );
1887 object->group = impl_from_TP_CLEANUP_GROUP( environment->CleanupGroup );
1888 object->group_cancel_callback = environment->CleanupGroupCancelCallback;
1889 object->finalization_callback = environment->FinalizationCallback;
1890 object->may_run_long = environment->u.s.LongFunction != 0;
1891 object->race_dll = environment->RaceDll;
1892 if (environment->Version == 3)
1894 TP_CALLBACK_ENVIRON_V3 *environment_v3 = (TP_CALLBACK_ENVIRON_V3 *)environment;
1896 object->priority = environment_v3->CallbackPriority;
1897 assert( object->priority < ARRAY_SIZE(pool->pools) );
1900 if (environment->ActivationContext)
1901 FIXME( "activation context not supported yet\n" );
1903 if (environment->u.s.Persistent)
1904 FIXME( "persistent threads not supported yet\n" );
1907 if (object->race_dll)
1908 LdrAddRefDll( 0, object->race_dll );
1910 TRACE( "allocated object %p of type %u\n", object, object->type );
1912 /* For simple callbacks we have to run tp_object_submit before adding this object
1913 * to the cleanup group. As soon as the cleanup group members are released ->shutdown
1914 * will be set, and tp_object_submit would fail with an assertion. */
1916 if (is_simple_callback)
1917 tp_object_submit( object, FALSE );
1919 if (object->group)
1921 struct threadpool_group *group = object->group;
1922 InterlockedIncrement( &group->refcount );
1924 RtlEnterCriticalSection( &group->cs );
1925 list_add_tail( &group->members, &object->group_entry );
1926 object->is_group_member = TRUE;
1927 RtlLeaveCriticalSection( &group->cs );
1930 if (is_simple_callback)
1931 tp_object_release( object );
1934 static void tp_object_prio_queue( struct threadpool_object *object )
1936 ++object->pool->num_busy_workers;
1937 list_add_tail( &object->pool->pools[object->priority], &object->pool_entry );
1940 /***********************************************************************
1941 * tp_object_submit (internal)
1943 * Submits a threadpool object to the associated threadpool. This
1944 * function has to be VOID because TpPostWork can never fail on Windows.
1946 static void tp_object_submit( struct threadpool_object *object, BOOL signaled )
1948 struct threadpool *pool = object->pool;
1949 NTSTATUS status = STATUS_UNSUCCESSFUL;
1951 assert( !object->shutdown );
1952 assert( !pool->shutdown );
1954 RtlEnterCriticalSection( &pool->cs );
1956 /* Start new worker threads if required. */
1957 if (pool->num_busy_workers >= pool->num_workers &&
1958 pool->num_workers < pool->max_workers)
1959 status = tp_new_worker_thread( pool );
1961 /* Queue work item and increment refcount. */
1962 InterlockedIncrement( &object->refcount );
1963 if (!object->num_pending_callbacks++)
1964 tp_object_prio_queue( object );
1966 /* Count how often the object was signaled. */
1967 if (object->type == TP_OBJECT_TYPE_WAIT && signaled)
1968 object->u.wait.signaled++;
1970 /* No new thread started - wake up one existing thread. */
1971 if (status != STATUS_SUCCESS)
1973 assert( pool->num_workers > 0 );
1974 RtlWakeConditionVariable( &pool->update_event );
1977 RtlLeaveCriticalSection( &pool->cs );
1980 /***********************************************************************
1981 * tp_object_cancel (internal)
1983 * Cancels all currently pending callbacks for a specific object.
1985 static void tp_object_cancel( struct threadpool_object *object )
1987 struct threadpool *pool = object->pool;
1988 LONG pending_callbacks = 0;
1990 RtlEnterCriticalSection( &pool->cs );
1991 if (object->num_pending_callbacks)
1993 pending_callbacks = object->num_pending_callbacks;
1994 object->num_pending_callbacks = 0;
1995 list_remove( &object->pool_entry );
1997 if (object->type == TP_OBJECT_TYPE_WAIT)
1998 object->u.wait.signaled = 0;
2000 if (object->type == TP_OBJECT_TYPE_IO)
2001 object->u.io.pending_count = 0;
2002 RtlLeaveCriticalSection( &pool->cs );
2004 while (pending_callbacks--)
2005 tp_object_release( object );
2008 static BOOL object_is_finished( struct threadpool_object *object, BOOL group )
2010 if (object->num_pending_callbacks)
2011 return FALSE;
2012 if (object->type == TP_OBJECT_TYPE_IO && object->u.io.pending_count)
2013 return FALSE;
2015 if (group)
2016 return !object->num_running_callbacks;
2017 else
2018 return !object->num_associated_callbacks;
2021 /***********************************************************************
2022 * tp_object_wait (internal)
2024 * Waits until all pending and running callbacks of a specific object
2025 * have been processed.
2027 static void tp_object_wait( struct threadpool_object *object, BOOL group_wait )
2029 struct threadpool *pool = object->pool;
2031 RtlEnterCriticalSection( &pool->cs );
2032 while (!object_is_finished( object, group_wait ))
2034 if (group_wait)
2035 RtlSleepConditionVariableCS( &object->group_finished_event, &pool->cs, NULL );
2036 else
2037 RtlSleepConditionVariableCS( &object->finished_event, &pool->cs, NULL );
2039 RtlLeaveCriticalSection( &pool->cs );
2042 /***********************************************************************
2043 * tp_object_prepare_shutdown (internal)
2045 * Prepares a threadpool object for shutdown.
2047 static void tp_object_prepare_shutdown( struct threadpool_object *object )
2049 if (object->type == TP_OBJECT_TYPE_TIMER)
2050 tp_timerqueue_unlock( object );
2051 else if (object->type == TP_OBJECT_TYPE_WAIT)
2052 tp_waitqueue_unlock( object );
2055 /***********************************************************************
2056 * tp_object_release (internal)
2058 * Releases a reference to a threadpool object.
2060 static BOOL tp_object_release( struct threadpool_object *object )
2062 if (InterlockedDecrement( &object->refcount ))
2063 return FALSE;
2065 TRACE( "destroying object %p of type %u\n", object, object->type );
2067 assert( object->shutdown );
2068 assert( !object->num_pending_callbacks );
2069 assert( !object->num_running_callbacks );
2070 assert( !object->num_associated_callbacks );
2072 /* release reference to the group */
2073 if (object->group)
2075 struct threadpool_group *group = object->group;
2077 RtlEnterCriticalSection( &group->cs );
2078 if (object->is_group_member)
2080 list_remove( &object->group_entry );
2081 object->is_group_member = FALSE;
2083 RtlLeaveCriticalSection( &group->cs );
2085 tp_group_release( group );
2088 tp_threadpool_unlock( object->pool );
2090 if (object->race_dll)
2091 LdrUnloadDll( object->race_dll );
2093 if (object->completed_event && object->completed_event != INVALID_HANDLE_VALUE)
2094 NtSetEvent( object->completed_event, NULL );
2096 RtlFreeHeap( GetProcessHeap(), 0, object );
2097 return TRUE;
2100 static struct list *threadpool_get_next_item( const struct threadpool *pool )
2102 struct list *ptr;
2103 unsigned int i;
2105 for (i = 0; i < ARRAY_SIZE(pool->pools); ++i)
2107 if ((ptr = list_head( &pool->pools[i] )))
2108 break;
2111 return ptr;
2114 /***********************************************************************
2115 * tp_object_execute (internal)
2117 * Executes a threadpool object callback, object->pool->cs has to be
2118 * held.
2120 static void tp_object_execute( struct threadpool_object *object, BOOL wait_thread )
2122 TP_CALLBACK_INSTANCE *callback_instance;
2123 struct threadpool_instance instance;
2124 struct io_completion completion;
2125 struct threadpool *pool = object->pool;
2126 TP_WAIT_RESULT wait_result = 0;
2127 NTSTATUS status;
2129 object->num_pending_callbacks--;
2131 /* For wait objects check if they were signaled or have timed out. */
2132 if (object->type == TP_OBJECT_TYPE_WAIT)
2134 wait_result = object->u.wait.signaled ? WAIT_OBJECT_0 : WAIT_TIMEOUT;
2135 if (wait_result == WAIT_OBJECT_0) object->u.wait.signaled--;
2137 else if (object->type == TP_OBJECT_TYPE_IO)
2139 assert( object->u.io.completion_count );
2140 completion = object->u.io.completions[--object->u.io.completion_count];
2141 object->u.io.pending_count--;
2144 /* Leave critical section and do the actual callback. */
2145 object->num_associated_callbacks++;
2146 object->num_running_callbacks++;
2147 RtlLeaveCriticalSection( &pool->cs );
2148 if (wait_thread) RtlLeaveCriticalSection( &waitqueue.cs );
2150 /* Initialize threadpool instance struct. */
2151 callback_instance = (TP_CALLBACK_INSTANCE *)&instance;
2152 instance.object = object;
2153 instance.threadid = GetCurrentThreadId();
2154 instance.associated = TRUE;
2155 instance.may_run_long = object->may_run_long;
2156 instance.cleanup.critical_section = NULL;
2157 instance.cleanup.mutex = NULL;
2158 instance.cleanup.semaphore = NULL;
2159 instance.cleanup.semaphore_count = 0;
2160 instance.cleanup.event = NULL;
2161 instance.cleanup.library = NULL;
2163 switch (object->type)
2165 case TP_OBJECT_TYPE_SIMPLE:
2167 TRACE( "executing simple callback %p(%p, %p)\n",
2168 object->u.simple.callback, callback_instance, object->userdata );
2169 object->u.simple.callback( callback_instance, object->userdata );
2170 TRACE( "callback %p returned\n", object->u.simple.callback );
2171 break;
2174 case TP_OBJECT_TYPE_WORK:
2176 TRACE( "executing work callback %p(%p, %p, %p)\n",
2177 object->u.work.callback, callback_instance, object->userdata, object );
2178 object->u.work.callback( callback_instance, object->userdata, (TP_WORK *)object );
2179 TRACE( "callback %p returned\n", object->u.work.callback );
2180 break;
2183 case TP_OBJECT_TYPE_TIMER:
2185 TRACE( "executing timer callback %p(%p, %p, %p)\n",
2186 object->u.timer.callback, callback_instance, object->userdata, object );
2187 object->u.timer.callback( callback_instance, object->userdata, (TP_TIMER *)object );
2188 TRACE( "callback %p returned\n", object->u.timer.callback );
2189 break;
2192 case TP_OBJECT_TYPE_WAIT:
2194 TRACE( "executing wait callback %p(%p, %p, %p, %u)\n",
2195 object->u.wait.callback, callback_instance, object->userdata, object, wait_result );
2196 object->u.wait.callback( callback_instance, object->userdata, (TP_WAIT *)object, wait_result );
2197 TRACE( "callback %p returned\n", object->u.wait.callback );
2198 break;
2201 case TP_OBJECT_TYPE_IO:
2203 TRACE( "executing I/O callback %p(%p, %p, %#lx, %p, %p)\n",
2204 object->u.io.callback, callback_instance, object->userdata,
2205 completion.cvalue, &completion.iosb, (TP_IO *)object );
2206 object->u.io.callback( callback_instance, object->userdata,
2207 (void *)completion.cvalue, &completion.iosb, (TP_IO *)object );
2208 TRACE( "callback %p returned\n", object->u.io.callback );
2209 break;
2212 default:
2213 assert(0);
2214 break;
2217 /* Execute finalization callback. */
2218 if (object->finalization_callback)
2220 TRACE( "executing finalization callback %p(%p, %p)\n",
2221 object->finalization_callback, callback_instance, object->userdata );
2222 object->finalization_callback( callback_instance, object->userdata );
2223 TRACE( "callback %p returned\n", object->finalization_callback );
2226 /* Execute cleanup tasks. */
2227 if (instance.cleanup.critical_section)
2229 RtlLeaveCriticalSection( instance.cleanup.critical_section );
2231 if (instance.cleanup.mutex)
2233 status = NtReleaseMutant( instance.cleanup.mutex, NULL );
2234 if (status != STATUS_SUCCESS) goto skip_cleanup;
2236 if (instance.cleanup.semaphore)
2238 status = NtReleaseSemaphore( instance.cleanup.semaphore, instance.cleanup.semaphore_count, NULL );
2239 if (status != STATUS_SUCCESS) goto skip_cleanup;
2241 if (instance.cleanup.event)
2243 status = NtSetEvent( instance.cleanup.event, NULL );
2244 if (status != STATUS_SUCCESS) goto skip_cleanup;
2246 if (instance.cleanup.library)
2248 LdrUnloadDll( instance.cleanup.library );
2251 skip_cleanup:
2252 if (wait_thread) RtlEnterCriticalSection( &waitqueue.cs );
2253 RtlEnterCriticalSection( &pool->cs );
2255 /* Simple callbacks are automatically shutdown after execution. */
2256 if (object->type == TP_OBJECT_TYPE_SIMPLE)
2258 tp_object_prepare_shutdown( object );
2259 object->shutdown = TRUE;
2262 object->num_running_callbacks--;
2263 if (object_is_finished( object, TRUE ))
2264 RtlWakeAllConditionVariable( &object->group_finished_event );
2266 if (instance.associated)
2268 object->num_associated_callbacks--;
2269 if (object_is_finished( object, FALSE ))
2270 RtlWakeAllConditionVariable( &object->finished_event );
2274 /***********************************************************************
2275 * threadpool_worker_proc (internal)
2277 static void CALLBACK threadpool_worker_proc( void *param )
2279 struct threadpool *pool = param;
2280 LARGE_INTEGER timeout;
2281 struct list *ptr;
2283 TRACE( "starting worker thread for pool %p\n", pool );
2285 RtlEnterCriticalSection( &pool->cs );
2286 for (;;)
2288 while ((ptr = threadpool_get_next_item( pool )))
2290 struct threadpool_object *object = LIST_ENTRY( ptr, struct threadpool_object, pool_entry );
2291 assert( object->num_pending_callbacks > 0 );
2293 /* If further pending callbacks are queued, move the work item to
2294 * the end of the pool list. Otherwise remove it from the pool. */
2295 list_remove( &object->pool_entry );
2296 if (object->num_pending_callbacks > 1)
2297 tp_object_prio_queue( object );
2299 tp_object_execute( object, FALSE );
2301 assert(pool->num_busy_workers);
2302 pool->num_busy_workers--;
2304 tp_object_release( object );
2307 /* Shutdown worker thread if requested. */
2308 if (pool->shutdown)
2309 break;
2311 /* Wait for new tasks or until the timeout expires. A thread only terminates
2312 * when no new tasks are available, and the number of threads can be
2313 * decreased without violating the min_workers limit. An exception is when
2314 * min_workers == 0, then objcount is used to detect if the last thread
2315 * can be terminated. */
2316 timeout.QuadPart = (ULONGLONG)THREADPOOL_WORKER_TIMEOUT * -10000;
2317 if (RtlSleepConditionVariableCS( &pool->update_event, &pool->cs, &timeout ) == STATUS_TIMEOUT &&
2318 !threadpool_get_next_item( pool ) && (pool->num_workers > max( pool->min_workers, 1 ) ||
2319 (!pool->min_workers && !pool->objcount)))
2321 break;
2324 pool->num_workers--;
2325 RtlLeaveCriticalSection( &pool->cs );
2327 TRACE( "terminating worker thread for pool %p\n", pool );
2328 tp_threadpool_release( pool );
2329 RtlExitUserThread( 0 );
2332 /***********************************************************************
2333 * TpAllocCleanupGroup (NTDLL.@)
2335 NTSTATUS WINAPI TpAllocCleanupGroup( TP_CLEANUP_GROUP **out )
2337 TRACE( "%p\n", out );
2339 return tp_group_alloc( (struct threadpool_group **)out );
2342 /***********************************************************************
2343 * TpAllocIoCompletion (NTDLL.@)
2345 NTSTATUS WINAPI TpAllocIoCompletion( TP_IO **out, HANDLE file, PTP_IO_CALLBACK callback,
2346 void *userdata, TP_CALLBACK_ENVIRON *environment )
2348 struct threadpool_object *object;
2349 struct threadpool *pool;
2350 NTSTATUS status;
2352 TRACE( "%p %p %p %p %p\n", out, file, callback, userdata, environment );
2354 if (!(object = RtlAllocateHeap( GetProcessHeap(), HEAP_ZERO_MEMORY, sizeof(*object) )))
2355 return STATUS_NO_MEMORY;
2357 if ((status = tp_threadpool_lock( &pool, environment )))
2359 RtlFreeHeap( GetProcessHeap(), 0, object );
2360 return status;
2363 object->type = TP_OBJECT_TYPE_IO;
2364 object->u.io.callback = callback;
2365 if (!(object->u.io.completions = RtlAllocateHeap( GetProcessHeap(), 0, 8 * sizeof(*object->u.io.completions) )))
2367 tp_threadpool_unlock( pool );
2368 RtlFreeHeap( GetProcessHeap(), 0, object );
2369 return status;
2372 if ((status = tp_ioqueue_lock( object, file )))
2374 tp_threadpool_unlock( pool );
2375 RtlFreeHeap( GetProcessHeap(), 0, object->u.io.completions );
2376 RtlFreeHeap( GetProcessHeap(), 0, object );
2377 return status;
2380 tp_object_initialize( object, pool, userdata, environment );
2382 *out = (TP_IO *)object;
2383 return STATUS_SUCCESS;
2386 /***********************************************************************
2387 * TpAllocPool (NTDLL.@)
2389 NTSTATUS WINAPI TpAllocPool( TP_POOL **out, PVOID reserved )
2391 TRACE( "%p %p\n", out, reserved );
2393 if (reserved)
2394 FIXME( "reserved argument is nonzero (%p)\n", reserved );
2396 return tp_threadpool_alloc( (struct threadpool **)out );
2399 /***********************************************************************
2400 * TpAllocTimer (NTDLL.@)
2402 NTSTATUS WINAPI TpAllocTimer( TP_TIMER **out, PTP_TIMER_CALLBACK callback, PVOID userdata,
2403 TP_CALLBACK_ENVIRON *environment )
2405 struct threadpool_object *object;
2406 struct threadpool *pool;
2407 NTSTATUS status;
2409 TRACE( "%p %p %p %p\n", out, callback, userdata, environment );
2411 object = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*object) );
2412 if (!object)
2413 return STATUS_NO_MEMORY;
2415 status = tp_threadpool_lock( &pool, environment );
2416 if (status)
2418 RtlFreeHeap( GetProcessHeap(), 0, object );
2419 return status;
2422 object->type = TP_OBJECT_TYPE_TIMER;
2423 object->u.timer.callback = callback;
2425 status = tp_timerqueue_lock( object );
2426 if (status)
2428 tp_threadpool_unlock( pool );
2429 RtlFreeHeap( GetProcessHeap(), 0, object );
2430 return status;
2433 tp_object_initialize( object, pool, userdata, environment );
2435 *out = (TP_TIMER *)object;
2436 return STATUS_SUCCESS;
2439 static NTSTATUS tp_alloc_wait( TP_WAIT **out, PTP_WAIT_CALLBACK callback, PVOID userdata,
2440 TP_CALLBACK_ENVIRON *environment, DWORD flags )
2442 struct threadpool_object *object;
2443 struct threadpool *pool;
2444 NTSTATUS status;
2446 object = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*object) );
2447 if (!object)
2448 return STATUS_NO_MEMORY;
2450 status = tp_threadpool_lock( &pool, environment );
2451 if (status)
2453 RtlFreeHeap( GetProcessHeap(), 0, object );
2454 return status;
2457 object->type = TP_OBJECT_TYPE_WAIT;
2458 object->u.wait.callback = callback;
2459 object->u.wait.flags = flags;
2461 status = tp_waitqueue_lock( object );
2462 if (status)
2464 tp_threadpool_unlock( pool );
2465 RtlFreeHeap( GetProcessHeap(), 0, object );
2466 return status;
2469 tp_object_initialize( object, pool, userdata, environment );
2471 *out = (TP_WAIT *)object;
2472 return STATUS_SUCCESS;
2475 /***********************************************************************
2476 * TpAllocWait (NTDLL.@)
2478 NTSTATUS WINAPI TpAllocWait( TP_WAIT **out, PTP_WAIT_CALLBACK callback, PVOID userdata,
2479 TP_CALLBACK_ENVIRON *environment )
2481 TRACE( "%p %p %p %p\n", out, callback, userdata, environment );
2482 return tp_alloc_wait( out, callback, userdata, environment, WT_EXECUTEONLYONCE );
2485 /***********************************************************************
2486 * TpAllocWork (NTDLL.@)
2488 NTSTATUS WINAPI TpAllocWork( TP_WORK **out, PTP_WORK_CALLBACK callback, PVOID userdata,
2489 TP_CALLBACK_ENVIRON *environment )
2491 struct threadpool_object *object;
2492 struct threadpool *pool;
2493 NTSTATUS status;
2495 TRACE( "%p %p %p %p\n", out, callback, userdata, environment );
2497 object = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*object) );
2498 if (!object)
2499 return STATUS_NO_MEMORY;
2501 status = tp_threadpool_lock( &pool, environment );
2502 if (status)
2504 RtlFreeHeap( GetProcessHeap(), 0, object );
2505 return status;
2508 object->type = TP_OBJECT_TYPE_WORK;
2509 object->u.work.callback = callback;
2510 tp_object_initialize( object, pool, userdata, environment );
2512 *out = (TP_WORK *)object;
2513 return STATUS_SUCCESS;
2516 /***********************************************************************
2517 * TpCancelAsyncIoOperation (NTDLL.@)
2519 void WINAPI TpCancelAsyncIoOperation( TP_IO *io )
2521 struct threadpool_object *this = impl_from_TP_IO( io );
2523 TRACE( "%p\n", io );
2525 RtlEnterCriticalSection( &this->pool->cs );
2527 this->u.io.pending_count--;
2528 if (object_is_finished( this, TRUE ))
2529 RtlWakeAllConditionVariable( &this->group_finished_event );
2530 if (object_is_finished( this, FALSE ))
2531 RtlWakeAllConditionVariable( &this->finished_event );
2533 RtlLeaveCriticalSection( &this->pool->cs );
2536 /***********************************************************************
2537 * TpCallbackLeaveCriticalSectionOnCompletion (NTDLL.@)
2539 VOID WINAPI TpCallbackLeaveCriticalSectionOnCompletion( TP_CALLBACK_INSTANCE *instance, CRITICAL_SECTION *crit )
2541 struct threadpool_instance *this = impl_from_TP_CALLBACK_INSTANCE( instance );
2543 TRACE( "%p %p\n", instance, crit );
2545 if (!this->cleanup.critical_section)
2546 this->cleanup.critical_section = crit;
2549 /***********************************************************************
2550 * TpCallbackMayRunLong (NTDLL.@)
2552 NTSTATUS WINAPI TpCallbackMayRunLong( TP_CALLBACK_INSTANCE *instance )
2554 struct threadpool_instance *this = impl_from_TP_CALLBACK_INSTANCE( instance );
2555 struct threadpool_object *object = this->object;
2556 struct threadpool *pool;
2557 NTSTATUS status = STATUS_SUCCESS;
2559 TRACE( "%p\n", instance );
2561 if (this->threadid != GetCurrentThreadId())
2563 ERR("called from wrong thread, ignoring\n");
2564 return STATUS_UNSUCCESSFUL; /* FIXME */
2567 if (this->may_run_long)
2568 return STATUS_SUCCESS;
2570 pool = object->pool;
2571 RtlEnterCriticalSection( &pool->cs );
2573 /* Start new worker threads if required. */
2574 if (pool->num_busy_workers >= pool->num_workers)
2576 if (pool->num_workers < pool->max_workers)
2578 status = tp_new_worker_thread( pool );
2580 else
2582 status = STATUS_TOO_MANY_THREADS;
2586 RtlLeaveCriticalSection( &pool->cs );
2587 this->may_run_long = TRUE;
2588 return status;
2591 /***********************************************************************
2592 * TpCallbackReleaseMutexOnCompletion (NTDLL.@)
2594 VOID WINAPI TpCallbackReleaseMutexOnCompletion( TP_CALLBACK_INSTANCE *instance, HANDLE mutex )
2596 struct threadpool_instance *this = impl_from_TP_CALLBACK_INSTANCE( instance );
2598 TRACE( "%p %p\n", instance, mutex );
2600 if (!this->cleanup.mutex)
2601 this->cleanup.mutex = mutex;
2604 /***********************************************************************
2605 * TpCallbackReleaseSemaphoreOnCompletion (NTDLL.@)
2607 VOID WINAPI TpCallbackReleaseSemaphoreOnCompletion( TP_CALLBACK_INSTANCE *instance, HANDLE semaphore, DWORD count )
2609 struct threadpool_instance *this = impl_from_TP_CALLBACK_INSTANCE( instance );
2611 TRACE( "%p %p %u\n", instance, semaphore, count );
2613 if (!this->cleanup.semaphore)
2615 this->cleanup.semaphore = semaphore;
2616 this->cleanup.semaphore_count = count;
2620 /***********************************************************************
2621 * TpCallbackSetEventOnCompletion (NTDLL.@)
2623 VOID WINAPI TpCallbackSetEventOnCompletion( TP_CALLBACK_INSTANCE *instance, HANDLE event )
2625 struct threadpool_instance *this = impl_from_TP_CALLBACK_INSTANCE( instance );
2627 TRACE( "%p %p\n", instance, event );
2629 if (!this->cleanup.event)
2630 this->cleanup.event = event;
2633 /***********************************************************************
2634 * TpCallbackUnloadDllOnCompletion (NTDLL.@)
2636 VOID WINAPI TpCallbackUnloadDllOnCompletion( TP_CALLBACK_INSTANCE *instance, HMODULE module )
2638 struct threadpool_instance *this = impl_from_TP_CALLBACK_INSTANCE( instance );
2640 TRACE( "%p %p\n", instance, module );
2642 if (!this->cleanup.library)
2643 this->cleanup.library = module;
2646 /***********************************************************************
2647 * TpDisassociateCallback (NTDLL.@)
2649 VOID WINAPI TpDisassociateCallback( TP_CALLBACK_INSTANCE *instance )
2651 struct threadpool_instance *this = impl_from_TP_CALLBACK_INSTANCE( instance );
2652 struct threadpool_object *object = this->object;
2653 struct threadpool *pool;
2655 TRACE( "%p\n", instance );
2657 if (this->threadid != GetCurrentThreadId())
2659 ERR("called from wrong thread, ignoring\n");
2660 return;
2663 if (!this->associated)
2664 return;
2666 pool = object->pool;
2667 RtlEnterCriticalSection( &pool->cs );
2669 object->num_associated_callbacks--;
2670 if (object_is_finished( object, FALSE ))
2671 RtlWakeAllConditionVariable( &object->finished_event );
2673 RtlLeaveCriticalSection( &pool->cs );
2674 this->associated = FALSE;
2677 /***********************************************************************
2678 * TpIsTimerSet (NTDLL.@)
2680 BOOL WINAPI TpIsTimerSet( TP_TIMER *timer )
2682 struct threadpool_object *this = impl_from_TP_TIMER( timer );
2684 TRACE( "%p\n", timer );
2686 return this->u.timer.timer_set;
2689 /***********************************************************************
2690 * TpPostWork (NTDLL.@)
2692 VOID WINAPI TpPostWork( TP_WORK *work )
2694 struct threadpool_object *this = impl_from_TP_WORK( work );
2696 TRACE( "%p\n", work );
2698 tp_object_submit( this, FALSE );
2701 /***********************************************************************
2702 * TpReleaseCleanupGroup (NTDLL.@)
2704 VOID WINAPI TpReleaseCleanupGroup( TP_CLEANUP_GROUP *group )
2706 struct threadpool_group *this = impl_from_TP_CLEANUP_GROUP( group );
2708 TRACE( "%p\n", group );
2710 tp_group_shutdown( this );
2711 tp_group_release( this );
2714 /***********************************************************************
2715 * TpReleaseCleanupGroupMembers (NTDLL.@)
2717 VOID WINAPI TpReleaseCleanupGroupMembers( TP_CLEANUP_GROUP *group, BOOL cancel_pending, PVOID userdata )
2719 struct threadpool_group *this = impl_from_TP_CLEANUP_GROUP( group );
2720 struct threadpool_object *object, *next;
2721 struct list members;
2723 TRACE( "%p %u %p\n", group, cancel_pending, userdata );
2725 RtlEnterCriticalSection( &this->cs );
2727 /* Unset group, increase references, and mark objects for shutdown */
2728 LIST_FOR_EACH_ENTRY_SAFE( object, next, &this->members, struct threadpool_object, group_entry )
2730 assert( object->group == this );
2731 assert( object->is_group_member );
2733 if (InterlockedIncrement( &object->refcount ) == 1)
2735 /* Object is basically already destroyed, but group reference
2736 * was not deleted yet. We can safely ignore this object. */
2737 InterlockedDecrement( &object->refcount );
2738 list_remove( &object->group_entry );
2739 object->is_group_member = FALSE;
2740 continue;
2743 object->is_group_member = FALSE;
2744 tp_object_prepare_shutdown( object );
2747 /* Move members to a new temporary list */
2748 list_init( &members );
2749 list_move_tail( &members, &this->members );
2751 RtlLeaveCriticalSection( &this->cs );
2753 /* Cancel pending callbacks if requested */
2754 if (cancel_pending)
2756 LIST_FOR_EACH_ENTRY( object, &members, struct threadpool_object, group_entry )
2758 tp_object_cancel( object );
2762 /* Wait for remaining callbacks to finish */
2763 LIST_FOR_EACH_ENTRY_SAFE( object, next, &members, struct threadpool_object, group_entry )
2765 tp_object_wait( object, TRUE );
2767 if (!object->shutdown)
2769 /* Execute group cancellation callback if defined, and if this was actually a group cancel. */
2770 if (cancel_pending && object->group_cancel_callback)
2772 TRACE( "executing group cancel callback %p(%p, %p)\n",
2773 object->group_cancel_callback, object->userdata, userdata );
2774 object->group_cancel_callback( object->userdata, userdata );
2775 TRACE( "callback %p returned\n", object->group_cancel_callback );
2778 if (object->type != TP_OBJECT_TYPE_SIMPLE)
2779 tp_object_release( object );
2782 object->shutdown = TRUE;
2783 tp_object_release( object );
2787 /***********************************************************************
2788 * TpReleaseIoCompletion (NTDLL.@)
2790 void WINAPI TpReleaseIoCompletion( TP_IO *io )
2792 struct threadpool_object *this = impl_from_TP_IO( io );
2794 TRACE( "%p\n", io );
2796 RtlEnterCriticalSection( &ioqueue.cs );
2798 assert( ioqueue.objcount );
2799 this->shutdown = TRUE;
2800 NtSetIoCompletion( ioqueue.port, (ULONG_PTR)this, 0, STATUS_THREADPOOL_RELEASED_DURING_OPERATION, 1 );
2801 RtlLeaveCriticalSection( &ioqueue.cs );
2804 /***********************************************************************
2805 * TpReleasePool (NTDLL.@)
2807 VOID WINAPI TpReleasePool( TP_POOL *pool )
2809 struct threadpool *this = impl_from_TP_POOL( pool );
2811 TRACE( "%p\n", pool );
2813 tp_threadpool_shutdown( this );
2814 tp_threadpool_release( this );
2817 /***********************************************************************
2818 * TpReleaseTimer (NTDLL.@)
2820 VOID WINAPI TpReleaseTimer( TP_TIMER *timer )
2822 struct threadpool_object *this = impl_from_TP_TIMER( timer );
2824 TRACE( "%p\n", timer );
2826 tp_object_prepare_shutdown( this );
2827 this->shutdown = TRUE;
2828 tp_object_release( this );
2831 /***********************************************************************
2832 * TpReleaseWait (NTDLL.@)
2834 VOID WINAPI TpReleaseWait( TP_WAIT *wait )
2836 struct threadpool_object *this = impl_from_TP_WAIT( wait );
2838 TRACE( "%p\n", wait );
2840 tp_object_prepare_shutdown( this );
2841 this->shutdown = TRUE;
2842 tp_object_release( this );
2845 /***********************************************************************
2846 * TpReleaseWork (NTDLL.@)
2848 VOID WINAPI TpReleaseWork( TP_WORK *work )
2850 struct threadpool_object *this = impl_from_TP_WORK( work );
2852 TRACE( "%p\n", work );
2854 tp_object_prepare_shutdown( this );
2855 this->shutdown = TRUE;
2856 tp_object_release( this );
2859 /***********************************************************************
2860 * TpSetPoolMaxThreads (NTDLL.@)
2862 VOID WINAPI TpSetPoolMaxThreads( TP_POOL *pool, DWORD maximum )
2864 struct threadpool *this = impl_from_TP_POOL( pool );
2866 TRACE( "%p %u\n", pool, maximum );
2868 RtlEnterCriticalSection( &this->cs );
2869 this->max_workers = max( maximum, 1 );
2870 this->min_workers = min( this->min_workers, this->max_workers );
2871 RtlLeaveCriticalSection( &this->cs );
2874 /***********************************************************************
2875 * TpSetPoolMinThreads (NTDLL.@)
2877 BOOL WINAPI TpSetPoolMinThreads( TP_POOL *pool, DWORD minimum )
2879 struct threadpool *this = impl_from_TP_POOL( pool );
2880 NTSTATUS status = STATUS_SUCCESS;
2882 TRACE( "%p %u\n", pool, minimum );
2884 RtlEnterCriticalSection( &this->cs );
2886 while (this->num_workers < minimum)
2888 status = tp_new_worker_thread( this );
2889 if (status != STATUS_SUCCESS)
2890 break;
2893 if (status == STATUS_SUCCESS)
2895 this->min_workers = minimum;
2896 this->max_workers = max( this->min_workers, this->max_workers );
2899 RtlLeaveCriticalSection( &this->cs );
2900 return !status;
2903 /***********************************************************************
2904 * TpSetTimer (NTDLL.@)
2906 VOID WINAPI TpSetTimer( TP_TIMER *timer, LARGE_INTEGER *timeout, LONG period, LONG window_length )
2908 struct threadpool_object *this = impl_from_TP_TIMER( timer );
2909 struct threadpool_object *other_timer;
2910 BOOL submit_timer = FALSE;
2911 ULONGLONG timestamp;
2913 TRACE( "%p %p %u %u\n", timer, timeout, period, window_length );
2915 RtlEnterCriticalSection( &timerqueue.cs );
2917 assert( this->u.timer.timer_initialized );
2918 this->u.timer.timer_set = timeout != NULL;
2920 /* Convert relative timeout to absolute timestamp and handle a timeout
2921 * of zero, which means that the timer is submitted immediately. */
2922 if (timeout)
2924 timestamp = timeout->QuadPart;
2925 if ((LONGLONG)timestamp < 0)
2927 LARGE_INTEGER now;
2928 NtQuerySystemTime( &now );
2929 timestamp = now.QuadPart - timestamp;
2931 else if (!timestamp)
2933 if (!period)
2934 timeout = NULL;
2935 else
2937 LARGE_INTEGER now;
2938 NtQuerySystemTime( &now );
2939 timestamp = now.QuadPart + (ULONGLONG)period * 10000;
2941 submit_timer = TRUE;
2945 /* First remove existing timeout. */
2946 if (this->u.timer.timer_pending)
2948 list_remove( &this->u.timer.timer_entry );
2949 this->u.timer.timer_pending = FALSE;
2952 /* If the timer was enabled, then add it back to the queue. */
2953 if (timeout)
2955 this->u.timer.timeout = timestamp;
2956 this->u.timer.period = period;
2957 this->u.timer.window_length = window_length;
2959 LIST_FOR_EACH_ENTRY( other_timer, &timerqueue.pending_timers,
2960 struct threadpool_object, u.timer.timer_entry )
2962 assert( other_timer->type == TP_OBJECT_TYPE_TIMER );
2963 if (this->u.timer.timeout < other_timer->u.timer.timeout)
2964 break;
2966 list_add_before( &other_timer->u.timer.timer_entry, &this->u.timer.timer_entry );
2968 /* Wake up the timer thread when the timeout has to be updated. */
2969 if (list_head( &timerqueue.pending_timers ) == &this->u.timer.timer_entry )
2970 RtlWakeAllConditionVariable( &timerqueue.update_event );
2972 this->u.timer.timer_pending = TRUE;
2975 RtlLeaveCriticalSection( &timerqueue.cs );
2977 if (submit_timer)
2978 tp_object_submit( this, FALSE );
2981 /***********************************************************************
2982 * TpSetWait (NTDLL.@)
2984 VOID WINAPI TpSetWait( TP_WAIT *wait, HANDLE handle, LARGE_INTEGER *timeout )
2986 struct threadpool_object *this = impl_from_TP_WAIT( wait );
2987 ULONGLONG timestamp = MAXLONGLONG;
2989 TRACE( "%p %p %p\n", wait, handle, timeout );
2991 RtlEnterCriticalSection( &waitqueue.cs );
2993 assert( this->u.wait.bucket );
2994 this->u.wait.handle = handle;
2996 if (handle || this->u.wait.wait_pending)
2998 struct waitqueue_bucket *bucket = this->u.wait.bucket;
2999 list_remove( &this->u.wait.wait_entry );
3001 /* Convert relative timeout to absolute timestamp. */
3002 if (handle && timeout)
3004 timestamp = timeout->QuadPart;
3005 if ((LONGLONG)timestamp < 0)
3007 LARGE_INTEGER now;
3008 NtQuerySystemTime( &now );
3009 timestamp = now.QuadPart - timestamp;
3013 /* Add wait object back into one of the queues. */
3014 if (handle)
3016 list_add_tail( &bucket->waiting, &this->u.wait.wait_entry );
3017 this->u.wait.wait_pending = TRUE;
3018 this->u.wait.timeout = timestamp;
3020 else
3022 list_add_tail( &bucket->reserved, &this->u.wait.wait_entry );
3023 this->u.wait.wait_pending = FALSE;
3026 /* Wake up the wait queue thread. */
3027 NtSetEvent( bucket->update_event, NULL );
3030 RtlLeaveCriticalSection( &waitqueue.cs );
3033 /***********************************************************************
3034 * TpSimpleTryPost (NTDLL.@)
3036 NTSTATUS WINAPI TpSimpleTryPost( PTP_SIMPLE_CALLBACK callback, PVOID userdata,
3037 TP_CALLBACK_ENVIRON *environment )
3039 struct threadpool_object *object;
3040 struct threadpool *pool;
3041 NTSTATUS status;
3043 TRACE( "%p %p %p\n", callback, userdata, environment );
3045 object = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*object) );
3046 if (!object)
3047 return STATUS_NO_MEMORY;
3049 status = tp_threadpool_lock( &pool, environment );
3050 if (status)
3052 RtlFreeHeap( GetProcessHeap(), 0, object );
3053 return status;
3056 object->type = TP_OBJECT_TYPE_SIMPLE;
3057 object->u.simple.callback = callback;
3058 tp_object_initialize( object, pool, userdata, environment );
3060 return STATUS_SUCCESS;
3063 /***********************************************************************
3064 * TpStartAsyncIoOperation (NTDLL.@)
3066 void WINAPI TpStartAsyncIoOperation( TP_IO *io )
3068 struct threadpool_object *this = impl_from_TP_IO( io );
3070 TRACE( "%p\n", io );
3072 RtlEnterCriticalSection( &this->pool->cs );
3074 this->u.io.pending_count++;
3076 RtlLeaveCriticalSection( &this->pool->cs );
3079 /***********************************************************************
3080 * TpWaitForIoCompletion (NTDLL.@)
3082 void WINAPI TpWaitForIoCompletion( TP_IO *io, BOOL cancel_pending )
3084 struct threadpool_object *this = impl_from_TP_IO( io );
3086 TRACE( "%p %d\n", io, cancel_pending );
3088 if (cancel_pending)
3089 tp_object_cancel( this );
3090 tp_object_wait( this, FALSE );
3093 /***********************************************************************
3094 * TpWaitForTimer (NTDLL.@)
3096 VOID WINAPI TpWaitForTimer( TP_TIMER *timer, BOOL cancel_pending )
3098 struct threadpool_object *this = impl_from_TP_TIMER( timer );
3100 TRACE( "%p %d\n", timer, cancel_pending );
3102 if (cancel_pending)
3103 tp_object_cancel( this );
3104 tp_object_wait( this, FALSE );
3107 /***********************************************************************
3108 * TpWaitForWait (NTDLL.@)
3110 VOID WINAPI TpWaitForWait( TP_WAIT *wait, BOOL cancel_pending )
3112 struct threadpool_object *this = impl_from_TP_WAIT( wait );
3114 TRACE( "%p %d\n", wait, cancel_pending );
3116 if (cancel_pending)
3117 tp_object_cancel( this );
3118 tp_object_wait( this, FALSE );
3121 /***********************************************************************
3122 * TpWaitForWork (NTDLL.@)
3124 VOID WINAPI TpWaitForWork( TP_WORK *work, BOOL cancel_pending )
3126 struct threadpool_object *this = impl_from_TP_WORK( work );
3128 TRACE( "%p %u\n", work, cancel_pending );
3130 if (cancel_pending)
3131 tp_object_cancel( this );
3132 tp_object_wait( this, FALSE );
3135 /***********************************************************************
3136 * TpSetPoolStackInformation (NTDLL.@)
3138 NTSTATUS WINAPI TpSetPoolStackInformation( TP_POOL *pool, TP_POOL_STACK_INFORMATION *stack_info )
3140 struct threadpool *this = impl_from_TP_POOL( pool );
3142 TRACE( "%p %p\n", pool, stack_info );
3144 if (!stack_info)
3145 return STATUS_INVALID_PARAMETER;
3147 RtlEnterCriticalSection( &this->cs );
3148 this->stack_info = *stack_info;
3149 RtlLeaveCriticalSection( &this->cs );
3151 return STATUS_SUCCESS;
3154 /***********************************************************************
3155 * TpQueryPoolStackInformation (NTDLL.@)
3157 NTSTATUS WINAPI TpQueryPoolStackInformation( TP_POOL *pool, TP_POOL_STACK_INFORMATION *stack_info )
3159 struct threadpool *this = impl_from_TP_POOL( pool );
3161 TRACE( "%p %p\n", pool, stack_info );
3163 if (!stack_info)
3164 return STATUS_INVALID_PARAMETER;
3166 RtlEnterCriticalSection( &this->cs );
3167 *stack_info = this->stack_info;
3168 RtlLeaveCriticalSection( &this->cs );
3170 return STATUS_SUCCESS;
3173 static void CALLBACK rtl_wait_callback( TP_CALLBACK_INSTANCE *instance, void *userdata, TP_WAIT *wait, TP_WAIT_RESULT result )
3175 struct threadpool_object *object = impl_from_TP_WAIT(wait);
3176 object->u.wait.rtl_callback( userdata, result != STATUS_WAIT_0 );
3179 /***********************************************************************
3180 * RtlRegisterWait (NTDLL.@)
3182 * Registers a wait for a handle to become signaled.
3184 * PARAMS
3185 * NewWaitObject [I] Handle to the new wait object. Use RtlDeregisterWait() to free it.
3186 * Object [I] Object to wait to become signaled.
3187 * Callback [I] Callback function to execute when the wait times out or the handle is signaled.
3188 * Context [I] Context to pass to the callback function when it is executed.
3189 * Milliseconds [I] Number of milliseconds to wait before timing out.
3190 * Flags [I] Flags. See notes.
3192 * RETURNS
3193 * Success: STATUS_SUCCESS.
3194 * Failure: Any NTSTATUS code.
3196 * NOTES
3197 * Flags can be one or more of the following:
3198 *|WT_EXECUTEDEFAULT - Executes the work item in a non-I/O worker thread.
3199 *|WT_EXECUTEINIOTHREAD - Executes the work item in an I/O worker thread.
3200 *|WT_EXECUTEINPERSISTENTTHREAD - Executes the work item in a thread that is persistent.
3201 *|WT_EXECUTELONGFUNCTION - Hints that the execution can take a long time.
3202 *|WT_TRANSFER_IMPERSONATION - Executes the function with the current access token.
3204 NTSTATUS WINAPI RtlRegisterWait( HANDLE *out, HANDLE handle, RTL_WAITORTIMERCALLBACKFUNC callback,
3205 void *context, ULONG milliseconds, ULONG flags )
3207 struct threadpool_object *object;
3208 TP_CALLBACK_ENVIRON environment;
3209 LARGE_INTEGER timeout;
3210 NTSTATUS status;
3211 TP_WAIT *wait;
3213 TRACE( "out %p, handle %p, callback %p, context %p, milliseconds %u, flags %x\n",
3214 out, handle, callback, context, milliseconds, flags );
3216 memset( &environment, 0, sizeof(environment) );
3217 environment.Version = 1;
3218 environment.u.s.LongFunction = (flags & WT_EXECUTELONGFUNCTION) != 0;
3219 environment.u.s.Persistent = (flags & WT_EXECUTEINPERSISTENTTHREAD) != 0;
3221 flags &= (WT_EXECUTEONLYONCE | WT_EXECUTEINWAITTHREAD | WT_EXECUTEINIOTHREAD);
3222 if ((status = tp_alloc_wait( &wait, rtl_wait_callback, context, &environment, flags )))
3223 return status;
3225 object = impl_from_TP_WAIT(wait);
3226 object->u.wait.rtl_callback = callback;
3228 RtlEnterCriticalSection( &waitqueue.cs );
3229 TpSetWait( (TP_WAIT *)object, handle, get_nt_timeout( &timeout, milliseconds ) );
3231 *out = object;
3232 RtlLeaveCriticalSection( &waitqueue.cs );
3234 return STATUS_SUCCESS;
3237 /***********************************************************************
3238 * RtlDeregisterWaitEx (NTDLL.@)
3240 * Cancels a wait operation and frees the resources associated with calling
3241 * RtlRegisterWait().
3243 * PARAMS
3244 * WaitObject [I] Handle to the wait object to free.
3246 * RETURNS
3247 * Success: STATUS_SUCCESS.
3248 * Failure: Any NTSTATUS code.
3250 NTSTATUS WINAPI RtlDeregisterWaitEx( HANDLE handle, HANDLE event )
3252 struct threadpool_object *object = handle;
3253 NTSTATUS status;
3255 TRACE( "handle %p, event %p\n", handle, event );
3257 if (!object) return STATUS_INVALID_HANDLE;
3259 TpSetWait( (TP_WAIT *)object, NULL, NULL );
3261 if (event == INVALID_HANDLE_VALUE) TpWaitForWait( (TP_WAIT *)object, TRUE );
3262 else
3264 assert( object->completed_event == NULL );
3265 object->completed_event = event;
3268 RtlEnterCriticalSection( &object->pool->cs );
3269 if (object->num_pending_callbacks + object->num_running_callbacks
3270 + object->num_associated_callbacks) status = STATUS_PENDING;
3271 else status = STATUS_SUCCESS;
3272 RtlLeaveCriticalSection( &object->pool->cs );
3274 TpReleaseWait( (TP_WAIT *)object );
3275 return status;
3278 /***********************************************************************
3279 * RtlDeregisterWait (NTDLL.@)
3281 * Cancels a wait operation and frees the resources associated with calling
3282 * RtlRegisterWait().
3284 * PARAMS
3285 * WaitObject [I] Handle to the wait object to free.
3287 * RETURNS
3288 * Success: STATUS_SUCCESS.
3289 * Failure: Any NTSTATUS code.
3291 NTSTATUS WINAPI RtlDeregisterWait(HANDLE WaitHandle)
3293 return RtlDeregisterWaitEx(WaitHandle, NULL);