kernel32/tests/pipe: Enable compilation with long types.
[wine.git] / dlls / ntdll / threadpool.c
blob827232b21435c792b2646317b2abdd3cd6a03514
1 /*
2 * Thread pooling
4 * Copyright (c) 2006 Robert Shearman
5 * Copyright (c) 2014-2016 Sebastian Lackner
7 * This library is free software; you can redistribute it and/or
8 * modify it under the terms of the GNU Lesser General Public
9 * License as published by the Free Software Foundation; either
10 * version 2.1 of the License, or (at your option) any later version.
12 * This library is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Lesser General Public License for more details.
17 * You should have received a copy of the GNU Lesser General Public
18 * License along with this library; if not, write to the Free Software
19 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
22 #include <assert.h>
23 #include <stdarg.h>
24 #include <limits.h>
26 #define NONAMELESSUNION
27 #include "ntstatus.h"
28 #define WIN32_NO_STATUS
29 #include "winternl.h"
31 #include "wine/debug.h"
32 #include "wine/list.h"
34 #include "ntdll_misc.h"
36 WINE_DEFAULT_DEBUG_CHANNEL(threadpool);
39 * Old thread pooling API
42 struct rtl_work_item
44 PRTL_WORK_ITEM_ROUTINE function;
45 PVOID context;
48 #define EXPIRE_NEVER (~(ULONGLONG)0)
49 #define TIMER_QUEUE_MAGIC 0x516d6954 /* TimQ */
51 static RTL_CRITICAL_SECTION_DEBUG critsect_compl_debug;
53 static struct
55 HANDLE compl_port;
56 RTL_CRITICAL_SECTION threadpool_compl_cs;
58 old_threadpool =
60 NULL, /* compl_port */
61 { &critsect_compl_debug, -1, 0, 0, 0, 0 }, /* threadpool_compl_cs */
64 static RTL_CRITICAL_SECTION_DEBUG critsect_compl_debug =
66 0, 0, &old_threadpool.threadpool_compl_cs,
67 { &critsect_compl_debug.ProcessLocksList, &critsect_compl_debug.ProcessLocksList },
68 0, 0, { (DWORD_PTR)(__FILE__ ": threadpool_compl_cs") }
71 struct timer_queue;
72 struct queue_timer
74 struct timer_queue *q;
75 struct list entry;
76 ULONG runcount; /* number of callbacks pending execution */
77 RTL_WAITORTIMERCALLBACKFUNC callback;
78 PVOID param;
79 DWORD period;
80 ULONG flags;
81 ULONGLONG expire;
82 BOOL destroy; /* timer should be deleted; once set, never unset */
83 HANDLE event; /* removal event */
86 struct timer_queue
88 DWORD magic;
89 RTL_CRITICAL_SECTION cs;
90 struct list timers; /* sorted by expiration time */
91 BOOL quit; /* queue should be deleted; once set, never unset */
92 HANDLE event;
93 HANDLE thread;
97 * Object-oriented thread pooling API
100 #define THREADPOOL_WORKER_TIMEOUT 5000
101 #define MAXIMUM_WAITQUEUE_OBJECTS (MAXIMUM_WAIT_OBJECTS - 1)
103 /* internal threadpool representation */
104 struct threadpool
106 LONG refcount;
107 LONG objcount;
108 BOOL shutdown;
109 CRITICAL_SECTION cs;
110 /* Pools of work items, locked via .cs, order matches TP_CALLBACK_PRIORITY - high, normal, low. */
111 struct list pools[3];
112 RTL_CONDITION_VARIABLE update_event;
113 /* information about worker threads, locked via .cs */
114 int max_workers;
115 int min_workers;
116 int num_workers;
117 int num_busy_workers;
118 HANDLE compl_port;
119 TP_POOL_STACK_INFORMATION stack_info;
122 enum threadpool_objtype
124 TP_OBJECT_TYPE_SIMPLE,
125 TP_OBJECT_TYPE_WORK,
126 TP_OBJECT_TYPE_TIMER,
127 TP_OBJECT_TYPE_WAIT,
128 TP_OBJECT_TYPE_IO,
131 struct io_completion
133 IO_STATUS_BLOCK iosb;
134 ULONG_PTR cvalue;
137 /* internal threadpool object representation */
138 struct threadpool_object
140 void *win32_callback; /* leave space for kernelbase to store win32 callback */
141 LONG refcount;
142 BOOL shutdown;
143 /* read-only information */
144 enum threadpool_objtype type;
145 struct threadpool *pool;
146 struct threadpool_group *group;
147 PVOID userdata;
148 PTP_CLEANUP_GROUP_CANCEL_CALLBACK group_cancel_callback;
149 PTP_SIMPLE_CALLBACK finalization_callback;
150 BOOL may_run_long;
151 HMODULE race_dll;
152 TP_CALLBACK_PRIORITY priority;
153 /* information about the group, locked via .group->cs */
154 struct list group_entry;
155 BOOL is_group_member;
156 /* information about the pool, locked via .pool->cs */
157 struct list pool_entry;
158 RTL_CONDITION_VARIABLE finished_event;
159 RTL_CONDITION_VARIABLE group_finished_event;
160 HANDLE completed_event;
161 LONG num_pending_callbacks;
162 LONG num_running_callbacks;
163 LONG num_associated_callbacks;
164 /* arguments for callback */
165 union
167 struct
169 PTP_SIMPLE_CALLBACK callback;
170 } simple;
171 struct
173 PTP_WORK_CALLBACK callback;
174 } work;
175 struct
177 PTP_TIMER_CALLBACK callback;
178 /* information about the timer, locked via timerqueue.cs */
179 BOOL timer_initialized;
180 BOOL timer_pending;
181 struct list timer_entry;
182 BOOL timer_set;
183 ULONGLONG timeout;
184 LONG period;
185 LONG window_length;
186 } timer;
187 struct
189 PTP_WAIT_CALLBACK callback;
190 LONG signaled;
191 /* information about the wait object, locked via waitqueue.cs */
192 struct waitqueue_bucket *bucket;
193 BOOL wait_pending;
194 struct list wait_entry;
195 ULONGLONG timeout;
196 HANDLE handle;
197 DWORD flags;
198 RTL_WAITORTIMERCALLBACKFUNC rtl_callback;
199 } wait;
200 struct
202 PTP_IO_CALLBACK callback;
203 /* locked via .pool->cs */
204 unsigned int pending_count, skipped_count, completion_count, completion_max;
205 BOOL shutting_down;
206 struct io_completion *completions;
207 } io;
208 } u;
211 /* internal threadpool instance representation */
212 struct threadpool_instance
214 struct threadpool_object *object;
215 DWORD threadid;
216 BOOL associated;
217 BOOL may_run_long;
218 struct
220 CRITICAL_SECTION *critical_section;
221 HANDLE mutex;
222 HANDLE semaphore;
223 LONG semaphore_count;
224 HANDLE event;
225 HMODULE library;
226 } cleanup;
229 /* internal threadpool group representation */
230 struct threadpool_group
232 LONG refcount;
233 BOOL shutdown;
234 CRITICAL_SECTION cs;
235 /* list of group members, locked via .cs */
236 struct list members;
239 /* global timerqueue object */
240 static RTL_CRITICAL_SECTION_DEBUG timerqueue_debug;
242 static struct
244 CRITICAL_SECTION cs;
245 LONG objcount;
246 BOOL thread_running;
247 struct list pending_timers;
248 RTL_CONDITION_VARIABLE update_event;
250 timerqueue =
252 { &timerqueue_debug, -1, 0, 0, 0, 0 }, /* cs */
253 0, /* objcount */
254 FALSE, /* thread_running */
255 LIST_INIT( timerqueue.pending_timers ), /* pending_timers */
256 RTL_CONDITION_VARIABLE_INIT /* update_event */
259 static RTL_CRITICAL_SECTION_DEBUG timerqueue_debug =
261 0, 0, &timerqueue.cs,
262 { &timerqueue_debug.ProcessLocksList, &timerqueue_debug.ProcessLocksList },
263 0, 0, { (DWORD_PTR)(__FILE__ ": timerqueue.cs") }
266 /* global waitqueue object */
267 static RTL_CRITICAL_SECTION_DEBUG waitqueue_debug;
269 static struct
271 CRITICAL_SECTION cs;
272 LONG num_buckets;
273 struct list buckets;
275 waitqueue =
277 { &waitqueue_debug, -1, 0, 0, 0, 0 }, /* cs */
278 0, /* num_buckets */
279 LIST_INIT( waitqueue.buckets ) /* buckets */
282 static RTL_CRITICAL_SECTION_DEBUG waitqueue_debug =
284 0, 0, &waitqueue.cs,
285 { &waitqueue_debug.ProcessLocksList, &waitqueue_debug.ProcessLocksList },
286 0, 0, { (DWORD_PTR)(__FILE__ ": waitqueue.cs") }
289 struct waitqueue_bucket
291 struct list bucket_entry;
292 LONG objcount;
293 struct list reserved;
294 struct list waiting;
295 HANDLE update_event;
296 BOOL alertable;
299 /* global I/O completion queue object */
300 static RTL_CRITICAL_SECTION_DEBUG ioqueue_debug;
302 static struct
304 CRITICAL_SECTION cs;
305 LONG objcount;
306 BOOL thread_running;
307 HANDLE port;
308 RTL_CONDITION_VARIABLE update_event;
310 ioqueue =
312 .cs = { &ioqueue_debug, -1, 0, 0, 0, 0 },
315 static RTL_CRITICAL_SECTION_DEBUG ioqueue_debug =
317 0, 0, &ioqueue.cs,
318 { &ioqueue_debug.ProcessLocksList, &ioqueue_debug.ProcessLocksList },
319 0, 0, { (DWORD_PTR)(__FILE__ ": ioqueue.cs") }
322 static inline struct threadpool *impl_from_TP_POOL( TP_POOL *pool )
324 return (struct threadpool *)pool;
327 static inline struct threadpool_object *impl_from_TP_WORK( TP_WORK *work )
329 struct threadpool_object *object = (struct threadpool_object *)work;
330 assert( object->type == TP_OBJECT_TYPE_WORK );
331 return object;
334 static inline struct threadpool_object *impl_from_TP_TIMER( TP_TIMER *timer )
336 struct threadpool_object *object = (struct threadpool_object *)timer;
337 assert( object->type == TP_OBJECT_TYPE_TIMER );
338 return object;
341 static inline struct threadpool_object *impl_from_TP_WAIT( TP_WAIT *wait )
343 struct threadpool_object *object = (struct threadpool_object *)wait;
344 assert( object->type == TP_OBJECT_TYPE_WAIT );
345 return object;
348 static inline struct threadpool_object *impl_from_TP_IO( TP_IO *io )
350 struct threadpool_object *object = (struct threadpool_object *)io;
351 assert( object->type == TP_OBJECT_TYPE_IO );
352 return object;
355 static inline struct threadpool_group *impl_from_TP_CLEANUP_GROUP( TP_CLEANUP_GROUP *group )
357 return (struct threadpool_group *)group;
360 static inline struct threadpool_instance *impl_from_TP_CALLBACK_INSTANCE( TP_CALLBACK_INSTANCE *instance )
362 return (struct threadpool_instance *)instance;
365 static void CALLBACK threadpool_worker_proc( void *param );
366 static void tp_object_submit( struct threadpool_object *object, BOOL signaled );
367 static void tp_object_execute( struct threadpool_object *object, BOOL wait_thread );
368 static void tp_object_prepare_shutdown( struct threadpool_object *object );
369 static BOOL tp_object_release( struct threadpool_object *object );
370 static struct threadpool *default_threadpool = NULL;
372 static BOOL array_reserve(void **elements, unsigned int *capacity, unsigned int count, unsigned int size)
374 unsigned int new_capacity, max_capacity;
375 void *new_elements;
377 if (count <= *capacity)
378 return TRUE;
380 max_capacity = ~(SIZE_T)0 / size;
381 if (count > max_capacity)
382 return FALSE;
384 new_capacity = max(4, *capacity);
385 while (new_capacity < count && new_capacity <= max_capacity / 2)
386 new_capacity *= 2;
387 if (new_capacity < count)
388 new_capacity = max_capacity;
390 if (!(new_elements = RtlReAllocateHeap( GetProcessHeap(), 0, *elements, new_capacity * size )))
391 return FALSE;
393 *elements = new_elements;
394 *capacity = new_capacity;
396 return TRUE;
399 static void CALLBACK process_rtl_work_item( TP_CALLBACK_INSTANCE *instance, void *userdata )
401 struct rtl_work_item *item = userdata;
403 TRACE("executing %p(%p)\n", item->function, item->context);
404 item->function( item->context );
406 RtlFreeHeap( GetProcessHeap(), 0, item );
409 /***********************************************************************
410 * RtlQueueWorkItem (NTDLL.@)
412 * Queues a work item into a thread in the thread pool.
414 * PARAMS
415 * function [I] Work function to execute.
416 * context [I] Context to pass to the work function when it is executed.
417 * flags [I] Flags. See notes.
419 * RETURNS
420 * Success: STATUS_SUCCESS.
421 * Failure: Any NTSTATUS code.
423 * NOTES
424 * Flags can be one or more of the following:
425 *|WT_EXECUTEDEFAULT - Executes the work item in a non-I/O worker thread.
426 *|WT_EXECUTEINIOTHREAD - Executes the work item in an I/O worker thread.
427 *|WT_EXECUTEINPERSISTENTTHREAD - Executes the work item in a thread that is persistent.
428 *|WT_EXECUTELONGFUNCTION - Hints that the execution can take a long time.
429 *|WT_TRANSFER_IMPERSONATION - Executes the function with the current access token.
431 NTSTATUS WINAPI RtlQueueWorkItem( PRTL_WORK_ITEM_ROUTINE function, PVOID context, ULONG flags )
433 TP_CALLBACK_ENVIRON environment;
434 struct rtl_work_item *item;
435 NTSTATUS status;
437 TRACE( "%p %p %u\n", function, context, flags );
439 item = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*item) );
440 if (!item)
441 return STATUS_NO_MEMORY;
443 memset( &environment, 0, sizeof(environment) );
444 environment.Version = 1;
445 environment.u.s.LongFunction = (flags & WT_EXECUTELONGFUNCTION) != 0;
446 environment.u.s.Persistent = (flags & WT_EXECUTEINPERSISTENTTHREAD) != 0;
448 item->function = function;
449 item->context = context;
451 status = TpSimpleTryPost( process_rtl_work_item, item, &environment );
452 if (status) RtlFreeHeap( GetProcessHeap(), 0, item );
453 return status;
456 /***********************************************************************
457 * iocp_poller - get completion events and run callbacks
459 static DWORD CALLBACK iocp_poller(LPVOID Arg)
461 HANDLE cport = Arg;
463 while( TRUE )
465 PRTL_OVERLAPPED_COMPLETION_ROUTINE callback;
466 LPVOID overlapped;
467 IO_STATUS_BLOCK iosb;
468 NTSTATUS res = NtRemoveIoCompletion( cport, (PULONG_PTR)&callback, (PULONG_PTR)&overlapped, &iosb, NULL );
469 if (res)
471 ERR("NtRemoveIoCompletion failed: 0x%x\n", res);
473 else
475 DWORD transferred = 0;
476 DWORD err = 0;
478 if (iosb.u.Status == STATUS_SUCCESS)
479 transferred = iosb.Information;
480 else
481 err = RtlNtStatusToDosError(iosb.u.Status);
483 callback( err, transferred, overlapped );
486 return 0;
489 /***********************************************************************
490 * RtlSetIoCompletionCallback (NTDLL.@)
492 * Binds a handle to a thread pool's completion port, and possibly
493 * starts a non-I/O thread to monitor this port and call functions back.
495 * PARAMS
496 * FileHandle [I] Handle to bind to a completion port.
497 * Function [I] Callback function to call on I/O completions.
498 * Flags [I] Not used.
500 * RETURNS
501 * Success: STATUS_SUCCESS.
502 * Failure: Any NTSTATUS code.
505 NTSTATUS WINAPI RtlSetIoCompletionCallback(HANDLE FileHandle, PRTL_OVERLAPPED_COMPLETION_ROUTINE Function, ULONG Flags)
507 IO_STATUS_BLOCK iosb;
508 FILE_COMPLETION_INFORMATION info;
510 if (Flags) FIXME("Unknown value Flags=0x%x\n", Flags);
512 if (!old_threadpool.compl_port)
514 NTSTATUS res = STATUS_SUCCESS;
516 RtlEnterCriticalSection(&old_threadpool.threadpool_compl_cs);
517 if (!old_threadpool.compl_port)
519 HANDLE cport;
521 res = NtCreateIoCompletion( &cport, IO_COMPLETION_ALL_ACCESS, NULL, 0 );
522 if (!res)
524 /* FIXME native can start additional threads in case of e.g. hung callback function. */
525 res = RtlQueueWorkItem( iocp_poller, cport, WT_EXECUTEDEFAULT );
526 if (!res)
527 old_threadpool.compl_port = cport;
528 else
529 NtClose( cport );
532 RtlLeaveCriticalSection(&old_threadpool.threadpool_compl_cs);
533 if (res) return res;
536 info.CompletionPort = old_threadpool.compl_port;
537 info.CompletionKey = (ULONG_PTR)Function;
539 return NtSetInformationFile( FileHandle, &iosb, &info, sizeof(info), FileCompletionInformation );
542 static inline PLARGE_INTEGER get_nt_timeout( PLARGE_INTEGER pTime, ULONG timeout )
544 if (timeout == INFINITE) return NULL;
545 pTime->QuadPart = (ULONGLONG)timeout * -10000;
546 return pTime;
550 /************************** Timer Queue Impl **************************/
552 static void queue_remove_timer(struct queue_timer *t)
554 /* We MUST hold the queue cs while calling this function. This ensures
555 that we cannot queue another callback for this timer. The runcount
556 being zero makes sure we don't have any already queued. */
557 struct timer_queue *q = t->q;
559 assert(t->runcount == 0);
560 assert(t->destroy);
562 list_remove(&t->entry);
563 if (t->event)
564 NtSetEvent(t->event, NULL);
565 RtlFreeHeap(GetProcessHeap(), 0, t);
567 if (q->quit && list_empty(&q->timers))
568 NtSetEvent(q->event, NULL);
571 static void timer_cleanup_callback(struct queue_timer *t)
573 struct timer_queue *q = t->q;
574 RtlEnterCriticalSection(&q->cs);
576 assert(0 < t->runcount);
577 --t->runcount;
579 if (t->destroy && t->runcount == 0)
580 queue_remove_timer(t);
582 RtlLeaveCriticalSection(&q->cs);
585 static DWORD WINAPI timer_callback_wrapper(LPVOID p)
587 struct queue_timer *t = p;
588 t->callback(t->param, TRUE);
589 timer_cleanup_callback(t);
590 return 0;
593 static inline ULONGLONG queue_current_time(void)
595 LARGE_INTEGER now, freq;
596 NtQueryPerformanceCounter(&now, &freq);
597 return now.QuadPart * 1000 / freq.QuadPart;
600 static void queue_add_timer(struct queue_timer *t, ULONGLONG time,
601 BOOL set_event)
603 /* We MUST hold the queue cs while calling this function. */
604 struct timer_queue *q = t->q;
605 struct list *ptr = &q->timers;
607 assert(!q->quit || (t->destroy && time == EXPIRE_NEVER));
609 if (time != EXPIRE_NEVER)
610 LIST_FOR_EACH(ptr, &q->timers)
612 struct queue_timer *cur = LIST_ENTRY(ptr, struct queue_timer, entry);
613 if (time < cur->expire)
614 break;
616 list_add_before(ptr, &t->entry);
618 t->expire = time;
620 /* If we insert at the head of the list, we need to expire sooner
621 than expected. */
622 if (set_event && &t->entry == list_head(&q->timers))
623 NtSetEvent(q->event, NULL);
626 static inline void queue_move_timer(struct queue_timer *t, ULONGLONG time,
627 BOOL set_event)
629 /* We MUST hold the queue cs while calling this function. */
630 list_remove(&t->entry);
631 queue_add_timer(t, time, set_event);
634 static void queue_timer_expire(struct timer_queue *q)
636 struct queue_timer *t = NULL;
638 RtlEnterCriticalSection(&q->cs);
639 if (list_head(&q->timers))
641 ULONGLONG now, next;
642 t = LIST_ENTRY(list_head(&q->timers), struct queue_timer, entry);
643 if (!t->destroy && t->expire <= ((now = queue_current_time())))
645 ++t->runcount;
646 if (t->period)
648 next = t->expire + t->period;
649 /* avoid trigger cascade if overloaded / hibernated */
650 if (next < now)
651 next = now + t->period;
653 else
654 next = EXPIRE_NEVER;
655 queue_move_timer(t, next, FALSE);
657 else
658 t = NULL;
660 RtlLeaveCriticalSection(&q->cs);
662 if (t)
664 if (t->flags & WT_EXECUTEINTIMERTHREAD)
665 timer_callback_wrapper(t);
666 else
668 ULONG flags
669 = (t->flags
670 & (WT_EXECUTEINIOTHREAD | WT_EXECUTEINPERSISTENTTHREAD
671 | WT_EXECUTELONGFUNCTION | WT_TRANSFER_IMPERSONATION));
672 NTSTATUS status = RtlQueueWorkItem(timer_callback_wrapper, t, flags);
673 if (status != STATUS_SUCCESS)
674 timer_cleanup_callback(t);
679 static ULONG queue_get_timeout(struct timer_queue *q)
681 struct queue_timer *t;
682 ULONG timeout = INFINITE;
684 RtlEnterCriticalSection(&q->cs);
685 if (list_head(&q->timers))
687 t = LIST_ENTRY(list_head(&q->timers), struct queue_timer, entry);
688 assert(!t->destroy || t->expire == EXPIRE_NEVER);
690 if (t->expire != EXPIRE_NEVER)
692 ULONGLONG time = queue_current_time();
693 timeout = t->expire < time ? 0 : t->expire - time;
696 RtlLeaveCriticalSection(&q->cs);
698 return timeout;
701 static void WINAPI timer_queue_thread_proc(LPVOID p)
703 struct timer_queue *q = p;
704 ULONG timeout_ms;
706 timeout_ms = INFINITE;
707 for (;;)
709 LARGE_INTEGER timeout;
710 NTSTATUS status;
711 BOOL done = FALSE;
713 status = NtWaitForSingleObject(
714 q->event, FALSE, get_nt_timeout(&timeout, timeout_ms));
716 if (status == STATUS_WAIT_0)
718 /* There are two possible ways to trigger the event. Either
719 we are quitting and the last timer got removed, or a new
720 timer got put at the head of the list so we need to adjust
721 our timeout. */
722 RtlEnterCriticalSection(&q->cs);
723 if (q->quit && list_empty(&q->timers))
724 done = TRUE;
725 RtlLeaveCriticalSection(&q->cs);
727 else if (status == STATUS_TIMEOUT)
728 queue_timer_expire(q);
730 if (done)
731 break;
733 timeout_ms = queue_get_timeout(q);
736 NtClose(q->event);
737 RtlDeleteCriticalSection(&q->cs);
738 q->magic = 0;
739 RtlFreeHeap(GetProcessHeap(), 0, q);
740 RtlExitUserThread( 0 );
743 static void queue_destroy_timer(struct queue_timer *t)
745 /* We MUST hold the queue cs while calling this function. */
746 t->destroy = TRUE;
747 if (t->runcount == 0)
748 /* Ensure a timer is promptly removed. If callbacks are pending,
749 it will be removed after the last one finishes by the callback
750 cleanup wrapper. */
751 queue_remove_timer(t);
752 else
753 /* Make sure no destroyed timer masks an active timer at the head
754 of the sorted list. */
755 queue_move_timer(t, EXPIRE_NEVER, FALSE);
758 /***********************************************************************
759 * RtlCreateTimerQueue (NTDLL.@)
761 * Creates a timer queue object and returns a handle to it.
763 * PARAMS
764 * NewTimerQueue [O] The newly created queue.
766 * RETURNS
767 * Success: STATUS_SUCCESS.
768 * Failure: Any NTSTATUS code.
770 NTSTATUS WINAPI RtlCreateTimerQueue(PHANDLE NewTimerQueue)
772 NTSTATUS status;
773 struct timer_queue *q = RtlAllocateHeap(GetProcessHeap(), 0, sizeof *q);
774 if (!q)
775 return STATUS_NO_MEMORY;
777 RtlInitializeCriticalSection(&q->cs);
778 list_init(&q->timers);
779 q->quit = FALSE;
780 q->magic = TIMER_QUEUE_MAGIC;
781 status = NtCreateEvent(&q->event, EVENT_ALL_ACCESS, NULL, SynchronizationEvent, FALSE);
782 if (status != STATUS_SUCCESS)
784 RtlFreeHeap(GetProcessHeap(), 0, q);
785 return status;
787 status = RtlCreateUserThread(GetCurrentProcess(), NULL, FALSE, 0, 0, 0,
788 timer_queue_thread_proc, q, &q->thread, NULL);
789 if (status != STATUS_SUCCESS)
791 NtClose(q->event);
792 RtlFreeHeap(GetProcessHeap(), 0, q);
793 return status;
796 *NewTimerQueue = q;
797 return STATUS_SUCCESS;
800 /***********************************************************************
801 * RtlDeleteTimerQueueEx (NTDLL.@)
803 * Deletes a timer queue object.
805 * PARAMS
806 * TimerQueue [I] The timer queue to destroy.
807 * CompletionEvent [I] If NULL, return immediately. If INVALID_HANDLE_VALUE,
808 * wait until all timers are finished firing before
809 * returning. Otherwise, return immediately and set the
810 * event when all timers are done.
812 * RETURNS
813 * Success: STATUS_SUCCESS if synchronous, STATUS_PENDING if not.
814 * Failure: Any NTSTATUS code.
816 NTSTATUS WINAPI RtlDeleteTimerQueueEx(HANDLE TimerQueue, HANDLE CompletionEvent)
818 struct timer_queue *q = TimerQueue;
819 struct queue_timer *t, *temp;
820 HANDLE thread;
821 NTSTATUS status;
823 if (!q || q->magic != TIMER_QUEUE_MAGIC)
824 return STATUS_INVALID_HANDLE;
826 thread = q->thread;
828 RtlEnterCriticalSection(&q->cs);
829 q->quit = TRUE;
830 if (list_head(&q->timers))
831 /* When the last timer is removed, it will signal the timer thread to
832 exit... */
833 LIST_FOR_EACH_ENTRY_SAFE(t, temp, &q->timers, struct queue_timer, entry)
834 queue_destroy_timer(t);
835 else
836 /* However if we have none, we must do it ourselves. */
837 NtSetEvent(q->event, NULL);
838 RtlLeaveCriticalSection(&q->cs);
840 if (CompletionEvent == INVALID_HANDLE_VALUE)
842 NtWaitForSingleObject(thread, FALSE, NULL);
843 status = STATUS_SUCCESS;
845 else
847 if (CompletionEvent)
849 FIXME("asynchronous return on completion event unimplemented\n");
850 NtWaitForSingleObject(thread, FALSE, NULL);
851 NtSetEvent(CompletionEvent, NULL);
853 status = STATUS_PENDING;
856 NtClose(thread);
857 return status;
860 static struct timer_queue *get_timer_queue(HANDLE TimerQueue)
862 static struct timer_queue *default_timer_queue;
864 if (TimerQueue)
865 return TimerQueue;
866 else
868 if (!default_timer_queue)
870 HANDLE q;
871 NTSTATUS status = RtlCreateTimerQueue(&q);
872 if (status == STATUS_SUCCESS)
874 PVOID p = InterlockedCompareExchangePointer( (void **) &default_timer_queue, q, NULL );
875 if (p)
876 /* Got beat to the punch. */
877 RtlDeleteTimerQueueEx(q, NULL);
880 return default_timer_queue;
884 /***********************************************************************
885 * RtlCreateTimer (NTDLL.@)
887 * Creates a new timer associated with the given queue.
889 * PARAMS
890 * TimerQueue [I] The queue to hold the timer.
891 * NewTimer [O] The newly created timer.
892 * Callback [I] The callback to fire.
893 * Parameter [I] The argument for the callback.
894 * DueTime [I] The delay, in milliseconds, before first firing the
895 * timer.
896 * Period [I] The period, in milliseconds, at which to fire the timer
897 * after the first callback. If zero, the timer will only
898 * fire once. It still needs to be deleted with
899 * RtlDeleteTimer.
900 * Flags [I] Flags controlling the execution of the callback. In
901 * addition to the WT_* thread pool flags (see
902 * RtlQueueWorkItem), WT_EXECUTEINTIMERTHREAD and
903 * WT_EXECUTEONLYONCE are supported.
905 * RETURNS
906 * Success: STATUS_SUCCESS.
907 * Failure: Any NTSTATUS code.
909 NTSTATUS WINAPI RtlCreateTimer(HANDLE TimerQueue, HANDLE *NewTimer,
910 RTL_WAITORTIMERCALLBACKFUNC Callback,
911 PVOID Parameter, DWORD DueTime, DWORD Period,
912 ULONG Flags)
914 NTSTATUS status;
915 struct queue_timer *t;
916 struct timer_queue *q = get_timer_queue(TimerQueue);
918 if (!q) return STATUS_NO_MEMORY;
919 if (q->magic != TIMER_QUEUE_MAGIC) return STATUS_INVALID_HANDLE;
921 t = RtlAllocateHeap(GetProcessHeap(), 0, sizeof *t);
922 if (!t)
923 return STATUS_NO_MEMORY;
925 t->q = q;
926 t->runcount = 0;
927 t->callback = Callback;
928 t->param = Parameter;
929 t->period = Period;
930 t->flags = Flags;
931 t->destroy = FALSE;
932 t->event = NULL;
934 status = STATUS_SUCCESS;
935 RtlEnterCriticalSection(&q->cs);
936 if (q->quit)
937 status = STATUS_INVALID_HANDLE;
938 else
939 queue_add_timer(t, queue_current_time() + DueTime, TRUE);
940 RtlLeaveCriticalSection(&q->cs);
942 if (status == STATUS_SUCCESS)
943 *NewTimer = t;
944 else
945 RtlFreeHeap(GetProcessHeap(), 0, t);
947 return status;
950 /***********************************************************************
951 * RtlUpdateTimer (NTDLL.@)
953 * Changes the time at which a timer expires.
955 * PARAMS
956 * TimerQueue [I] The queue that holds the timer.
957 * Timer [I] The timer to update.
958 * DueTime [I] The delay, in milliseconds, before next firing the timer.
959 * Period [I] The period, in milliseconds, at which to fire the timer
960 * after the first callback. If zero, the timer will not
961 * refire once. It still needs to be deleted with
962 * RtlDeleteTimer.
964 * RETURNS
965 * Success: STATUS_SUCCESS.
966 * Failure: Any NTSTATUS code.
968 NTSTATUS WINAPI RtlUpdateTimer(HANDLE TimerQueue, HANDLE Timer,
969 DWORD DueTime, DWORD Period)
971 struct queue_timer *t = Timer;
972 struct timer_queue *q = t->q;
974 RtlEnterCriticalSection(&q->cs);
975 /* Can't change a timer if it was once-only or destroyed. */
976 if (t->expire != EXPIRE_NEVER)
978 t->period = Period;
979 queue_move_timer(t, queue_current_time() + DueTime, TRUE);
981 RtlLeaveCriticalSection(&q->cs);
983 return STATUS_SUCCESS;
986 /***********************************************************************
987 * RtlDeleteTimer (NTDLL.@)
989 * Cancels a timer-queue timer.
991 * PARAMS
992 * TimerQueue [I] The queue that holds the timer.
993 * Timer [I] The timer to update.
994 * CompletionEvent [I] If NULL, return immediately. If INVALID_HANDLE_VALUE,
995 * wait until the timer is finished firing all pending
996 * callbacks before returning. Otherwise, return
997 * immediately and set the timer is done.
999 * RETURNS
1000 * Success: STATUS_SUCCESS if the timer is done, STATUS_PENDING if not,
1001 or if the completion event is NULL.
1002 * Failure: Any NTSTATUS code.
1004 NTSTATUS WINAPI RtlDeleteTimer(HANDLE TimerQueue, HANDLE Timer,
1005 HANDLE CompletionEvent)
1007 struct queue_timer *t = Timer;
1008 struct timer_queue *q;
1009 NTSTATUS status = STATUS_PENDING;
1010 HANDLE event = NULL;
1012 if (!Timer)
1013 return STATUS_INVALID_PARAMETER_1;
1014 q = t->q;
1015 if (CompletionEvent == INVALID_HANDLE_VALUE)
1017 status = NtCreateEvent(&event, EVENT_ALL_ACCESS, NULL, SynchronizationEvent, FALSE);
1018 if (status == STATUS_SUCCESS)
1019 status = STATUS_PENDING;
1021 else if (CompletionEvent)
1022 event = CompletionEvent;
1024 RtlEnterCriticalSection(&q->cs);
1025 t->event = event;
1026 if (t->runcount == 0 && event)
1027 status = STATUS_SUCCESS;
1028 queue_destroy_timer(t);
1029 RtlLeaveCriticalSection(&q->cs);
1031 if (CompletionEvent == INVALID_HANDLE_VALUE && event)
1033 if (status == STATUS_PENDING)
1035 NtWaitForSingleObject(event, FALSE, NULL);
1036 status = STATUS_SUCCESS;
1038 NtClose(event);
1041 return status;
1044 /***********************************************************************
1045 * timerqueue_thread_proc (internal)
1047 static void CALLBACK timerqueue_thread_proc( void *param )
1049 ULONGLONG timeout_lower, timeout_upper, new_timeout;
1050 struct threadpool_object *other_timer;
1051 LARGE_INTEGER now, timeout;
1052 struct list *ptr;
1054 TRACE( "starting timer queue thread\n" );
1056 RtlEnterCriticalSection( &timerqueue.cs );
1057 for (;;)
1059 NtQuerySystemTime( &now );
1061 /* Check for expired timers. */
1062 while ((ptr = list_head( &timerqueue.pending_timers )))
1064 struct threadpool_object *timer = LIST_ENTRY( ptr, struct threadpool_object, u.timer.timer_entry );
1065 assert( timer->type == TP_OBJECT_TYPE_TIMER );
1066 assert( timer->u.timer.timer_pending );
1067 if (timer->u.timer.timeout > now.QuadPart)
1068 break;
1070 /* Queue a new callback in one of the worker threads. */
1071 list_remove( &timer->u.timer.timer_entry );
1072 timer->u.timer.timer_pending = FALSE;
1073 tp_object_submit( timer, FALSE );
1075 /* Insert the timer back into the queue, except it's marked for shutdown. */
1076 if (timer->u.timer.period && !timer->shutdown)
1078 timer->u.timer.timeout += (ULONGLONG)timer->u.timer.period * 10000;
1079 if (timer->u.timer.timeout <= now.QuadPart)
1080 timer->u.timer.timeout = now.QuadPart + 1;
1082 LIST_FOR_EACH_ENTRY( other_timer, &timerqueue.pending_timers,
1083 struct threadpool_object, u.timer.timer_entry )
1085 assert( other_timer->type == TP_OBJECT_TYPE_TIMER );
1086 if (timer->u.timer.timeout < other_timer->u.timer.timeout)
1087 break;
1089 list_add_before( &other_timer->u.timer.timer_entry, &timer->u.timer.timer_entry );
1090 timer->u.timer.timer_pending = TRUE;
1094 timeout_lower = timeout_upper = MAXLONGLONG;
1096 /* Determine next timeout and use the window length to optimize wakeup times. */
1097 LIST_FOR_EACH_ENTRY( other_timer, &timerqueue.pending_timers,
1098 struct threadpool_object, u.timer.timer_entry )
1100 assert( other_timer->type == TP_OBJECT_TYPE_TIMER );
1101 if (other_timer->u.timer.timeout >= timeout_upper)
1102 break;
1104 timeout_lower = other_timer->u.timer.timeout;
1105 new_timeout = timeout_lower + (ULONGLONG)other_timer->u.timer.window_length * 10000;
1106 if (new_timeout < timeout_upper)
1107 timeout_upper = new_timeout;
1110 /* Wait for timer update events or until the next timer expires. */
1111 if (timerqueue.objcount)
1113 timeout.QuadPart = timeout_lower;
1114 RtlSleepConditionVariableCS( &timerqueue.update_event, &timerqueue.cs, &timeout );
1115 continue;
1118 /* All timers have been destroyed, if no new timers are created
1119 * within some amount of time, then we can shutdown this thread. */
1120 timeout.QuadPart = (ULONGLONG)THREADPOOL_WORKER_TIMEOUT * -10000;
1121 if (RtlSleepConditionVariableCS( &timerqueue.update_event, &timerqueue.cs,
1122 &timeout ) == STATUS_TIMEOUT && !timerqueue.objcount)
1124 break;
1128 timerqueue.thread_running = FALSE;
1129 RtlLeaveCriticalSection( &timerqueue.cs );
1131 TRACE( "terminating timer queue thread\n" );
1132 RtlExitUserThread( 0 );
1135 /***********************************************************************
1136 * tp_new_worker_thread (internal)
1138 * Create and account a new worker thread for the desired pool.
1140 static NTSTATUS tp_new_worker_thread( struct threadpool *pool )
1142 HANDLE thread;
1143 NTSTATUS status;
1145 status = RtlCreateUserThread( GetCurrentProcess(), NULL, FALSE, 0, 0, 0,
1146 threadpool_worker_proc, pool, &thread, NULL );
1147 if (status == STATUS_SUCCESS)
1149 InterlockedIncrement( &pool->refcount );
1150 pool->num_workers++;
1151 NtClose( thread );
1153 return status;
1156 /***********************************************************************
1157 * tp_timerqueue_lock (internal)
1159 * Acquires a lock on the global timerqueue. When the lock is acquired
1160 * successfully, it is guaranteed that the timer thread is running.
1162 static NTSTATUS tp_timerqueue_lock( struct threadpool_object *timer )
1164 NTSTATUS status = STATUS_SUCCESS;
1165 assert( timer->type == TP_OBJECT_TYPE_TIMER );
1167 timer->u.timer.timer_initialized = FALSE;
1168 timer->u.timer.timer_pending = FALSE;
1169 timer->u.timer.timer_set = FALSE;
1170 timer->u.timer.timeout = 0;
1171 timer->u.timer.period = 0;
1172 timer->u.timer.window_length = 0;
1174 RtlEnterCriticalSection( &timerqueue.cs );
1176 /* Make sure that the timerqueue thread is running. */
1177 if (!timerqueue.thread_running)
1179 HANDLE thread;
1180 status = RtlCreateUserThread( GetCurrentProcess(), NULL, FALSE, 0, 0, 0,
1181 timerqueue_thread_proc, NULL, &thread, NULL );
1182 if (status == STATUS_SUCCESS)
1184 timerqueue.thread_running = TRUE;
1185 NtClose( thread );
1189 if (status == STATUS_SUCCESS)
1191 timer->u.timer.timer_initialized = TRUE;
1192 timerqueue.objcount++;
1195 RtlLeaveCriticalSection( &timerqueue.cs );
1196 return status;
1199 /***********************************************************************
1200 * tp_timerqueue_unlock (internal)
1202 * Releases a lock on the global timerqueue.
1204 static void tp_timerqueue_unlock( struct threadpool_object *timer )
1206 assert( timer->type == TP_OBJECT_TYPE_TIMER );
1208 RtlEnterCriticalSection( &timerqueue.cs );
1209 if (timer->u.timer.timer_initialized)
1211 /* If timer was pending, remove it. */
1212 if (timer->u.timer.timer_pending)
1214 list_remove( &timer->u.timer.timer_entry );
1215 timer->u.timer.timer_pending = FALSE;
1218 /* If the last timer object was destroyed, then wake up the thread. */
1219 if (!--timerqueue.objcount)
1221 assert( list_empty( &timerqueue.pending_timers ) );
1222 RtlWakeAllConditionVariable( &timerqueue.update_event );
1225 timer->u.timer.timer_initialized = FALSE;
1227 RtlLeaveCriticalSection( &timerqueue.cs );
1230 /***********************************************************************
1231 * waitqueue_thread_proc (internal)
1233 static void CALLBACK waitqueue_thread_proc( void *param )
1235 struct threadpool_object *objects[MAXIMUM_WAITQUEUE_OBJECTS];
1236 HANDLE handles[MAXIMUM_WAITQUEUE_OBJECTS + 1];
1237 struct waitqueue_bucket *bucket = param;
1238 struct threadpool_object *wait, *next;
1239 LARGE_INTEGER now, timeout;
1240 DWORD num_handles;
1241 NTSTATUS status;
1243 TRACE( "starting wait queue thread\n" );
1245 RtlEnterCriticalSection( &waitqueue.cs );
1247 for (;;)
1249 NtQuerySystemTime( &now );
1250 timeout.QuadPart = MAXLONGLONG;
1251 num_handles = 0;
1253 LIST_FOR_EACH_ENTRY_SAFE( wait, next, &bucket->waiting, struct threadpool_object,
1254 u.wait.wait_entry )
1256 assert( wait->type == TP_OBJECT_TYPE_WAIT );
1257 if (wait->u.wait.timeout <= now.QuadPart)
1259 /* Wait object timed out. */
1260 if ((wait->u.wait.flags & WT_EXECUTEONLYONCE))
1262 list_remove( &wait->u.wait.wait_entry );
1263 list_add_tail( &bucket->reserved, &wait->u.wait.wait_entry );
1265 if ((wait->u.wait.flags & (WT_EXECUTEINWAITTHREAD | WT_EXECUTEINIOTHREAD)))
1267 InterlockedIncrement( &wait->refcount );
1268 wait->num_pending_callbacks++;
1269 RtlEnterCriticalSection( &wait->pool->cs );
1270 tp_object_execute( wait, TRUE );
1271 RtlLeaveCriticalSection( &wait->pool->cs );
1272 tp_object_release( wait );
1274 else tp_object_submit( wait, FALSE );
1276 else
1278 if (wait->u.wait.timeout < timeout.QuadPart)
1279 timeout.QuadPart = wait->u.wait.timeout;
1281 assert( num_handles < MAXIMUM_WAITQUEUE_OBJECTS );
1282 InterlockedIncrement( &wait->refcount );
1283 objects[num_handles] = wait;
1284 handles[num_handles] = wait->u.wait.handle;
1285 num_handles++;
1289 if (!bucket->objcount)
1291 /* All wait objects have been destroyed, if no new wait objects are created
1292 * within some amount of time, then we can shutdown this thread. */
1293 assert( num_handles == 0 );
1294 RtlLeaveCriticalSection( &waitqueue.cs );
1295 timeout.QuadPart = (ULONGLONG)THREADPOOL_WORKER_TIMEOUT * -10000;
1296 status = NtWaitForMultipleObjects( 1, &bucket->update_event, TRUE, bucket->alertable, &timeout );
1297 RtlEnterCriticalSection( &waitqueue.cs );
1299 if (status == STATUS_TIMEOUT && !bucket->objcount)
1300 break;
1302 else
1304 handles[num_handles] = bucket->update_event;
1305 RtlLeaveCriticalSection( &waitqueue.cs );
1306 status = NtWaitForMultipleObjects( num_handles + 1, handles, TRUE, bucket->alertable, &timeout );
1307 RtlEnterCriticalSection( &waitqueue.cs );
1309 if (status >= STATUS_WAIT_0 && status < STATUS_WAIT_0 + num_handles)
1311 wait = objects[status - STATUS_WAIT_0];
1312 assert( wait->type == TP_OBJECT_TYPE_WAIT );
1313 if (wait->u.wait.bucket)
1315 /* Wait object signaled. */
1316 assert( wait->u.wait.bucket == bucket );
1317 if ((wait->u.wait.flags & WT_EXECUTEONLYONCE))
1319 list_remove( &wait->u.wait.wait_entry );
1320 list_add_tail( &bucket->reserved, &wait->u.wait.wait_entry );
1322 if ((wait->u.wait.flags & (WT_EXECUTEINWAITTHREAD | WT_EXECUTEINIOTHREAD)))
1324 wait->u.wait.signaled++;
1325 wait->num_pending_callbacks++;
1326 RtlEnterCriticalSection( &wait->pool->cs );
1327 tp_object_execute( wait, TRUE );
1328 RtlLeaveCriticalSection( &wait->pool->cs );
1330 else tp_object_submit( wait, TRUE );
1332 else
1333 WARN("wait object %p triggered while object was destroyed\n", wait);
1336 /* Release temporary references to wait objects. */
1337 while (num_handles)
1339 wait = objects[--num_handles];
1340 assert( wait->type == TP_OBJECT_TYPE_WAIT );
1341 tp_object_release( wait );
1345 /* Try to merge bucket with other threads. */
1346 if (waitqueue.num_buckets > 1 && bucket->objcount &&
1347 bucket->objcount <= MAXIMUM_WAITQUEUE_OBJECTS * 1 / 3)
1349 struct waitqueue_bucket *other_bucket;
1350 LIST_FOR_EACH_ENTRY( other_bucket, &waitqueue.buckets, struct waitqueue_bucket, bucket_entry )
1352 if (other_bucket != bucket && other_bucket->objcount && other_bucket->alertable == bucket->alertable &&
1353 other_bucket->objcount + bucket->objcount <= MAXIMUM_WAITQUEUE_OBJECTS * 2 / 3)
1355 other_bucket->objcount += bucket->objcount;
1356 bucket->objcount = 0;
1358 /* Update reserved list. */
1359 LIST_FOR_EACH_ENTRY( wait, &bucket->reserved, struct threadpool_object, u.wait.wait_entry )
1361 assert( wait->type == TP_OBJECT_TYPE_WAIT );
1362 wait->u.wait.bucket = other_bucket;
1364 list_move_tail( &other_bucket->reserved, &bucket->reserved );
1366 /* Update waiting list. */
1367 LIST_FOR_EACH_ENTRY( wait, &bucket->waiting, struct threadpool_object, u.wait.wait_entry )
1369 assert( wait->type == TP_OBJECT_TYPE_WAIT );
1370 wait->u.wait.bucket = other_bucket;
1372 list_move_tail( &other_bucket->waiting, &bucket->waiting );
1374 /* Move bucket to the end, to keep the probability of
1375 * newly added wait objects as small as possible. */
1376 list_remove( &bucket->bucket_entry );
1377 list_add_tail( &waitqueue.buckets, &bucket->bucket_entry );
1379 NtSetEvent( other_bucket->update_event, NULL );
1380 break;
1386 /* Remove this bucket from the list. */
1387 list_remove( &bucket->bucket_entry );
1388 if (!--waitqueue.num_buckets)
1389 assert( list_empty( &waitqueue.buckets ) );
1391 RtlLeaveCriticalSection( &waitqueue.cs );
1393 TRACE( "terminating wait queue thread\n" );
1395 assert( bucket->objcount == 0 );
1396 assert( list_empty( &bucket->reserved ) );
1397 assert( list_empty( &bucket->waiting ) );
1398 NtClose( bucket->update_event );
1400 RtlFreeHeap( GetProcessHeap(), 0, bucket );
1401 RtlExitUserThread( 0 );
1404 /***********************************************************************
1405 * tp_waitqueue_lock (internal)
1407 static NTSTATUS tp_waitqueue_lock( struct threadpool_object *wait )
1409 struct waitqueue_bucket *bucket;
1410 NTSTATUS status;
1411 HANDLE thread;
1412 BOOL alertable = (wait->u.wait.flags & WT_EXECUTEINIOTHREAD) != 0;
1413 assert( wait->type == TP_OBJECT_TYPE_WAIT );
1415 wait->u.wait.signaled = 0;
1416 wait->u.wait.bucket = NULL;
1417 wait->u.wait.wait_pending = FALSE;
1418 wait->u.wait.timeout = 0;
1419 wait->u.wait.handle = INVALID_HANDLE_VALUE;
1421 RtlEnterCriticalSection( &waitqueue.cs );
1423 /* Try to assign to existing bucket if possible. */
1424 LIST_FOR_EACH_ENTRY( bucket, &waitqueue.buckets, struct waitqueue_bucket, bucket_entry )
1426 if (bucket->objcount < MAXIMUM_WAITQUEUE_OBJECTS && bucket->alertable == alertable)
1428 list_add_tail( &bucket->reserved, &wait->u.wait.wait_entry );
1429 wait->u.wait.bucket = bucket;
1430 bucket->objcount++;
1432 status = STATUS_SUCCESS;
1433 goto out;
1437 /* Create a new bucket and corresponding worker thread. */
1438 bucket = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*bucket) );
1439 if (!bucket)
1441 status = STATUS_NO_MEMORY;
1442 goto out;
1445 bucket->objcount = 0;
1446 bucket->alertable = alertable;
1447 list_init( &bucket->reserved );
1448 list_init( &bucket->waiting );
1450 status = NtCreateEvent( &bucket->update_event, EVENT_ALL_ACCESS,
1451 NULL, SynchronizationEvent, FALSE );
1452 if (status)
1454 RtlFreeHeap( GetProcessHeap(), 0, bucket );
1455 goto out;
1458 status = RtlCreateUserThread( GetCurrentProcess(), NULL, FALSE, 0, 0, 0,
1459 waitqueue_thread_proc, bucket, &thread, NULL );
1460 if (status == STATUS_SUCCESS)
1462 list_add_tail( &waitqueue.buckets, &bucket->bucket_entry );
1463 waitqueue.num_buckets++;
1465 list_add_tail( &bucket->reserved, &wait->u.wait.wait_entry );
1466 wait->u.wait.bucket = bucket;
1467 bucket->objcount++;
1469 NtClose( thread );
1471 else
1473 NtClose( bucket->update_event );
1474 RtlFreeHeap( GetProcessHeap(), 0, bucket );
1477 out:
1478 RtlLeaveCriticalSection( &waitqueue.cs );
1479 return status;
1482 /***********************************************************************
1483 * tp_waitqueue_unlock (internal)
1485 static void tp_waitqueue_unlock( struct threadpool_object *wait )
1487 assert( wait->type == TP_OBJECT_TYPE_WAIT );
1489 RtlEnterCriticalSection( &waitqueue.cs );
1490 if (wait->u.wait.bucket)
1492 struct waitqueue_bucket *bucket = wait->u.wait.bucket;
1493 assert( bucket->objcount > 0 );
1495 list_remove( &wait->u.wait.wait_entry );
1496 wait->u.wait.bucket = NULL;
1497 bucket->objcount--;
1499 NtSetEvent( bucket->update_event, NULL );
1501 RtlLeaveCriticalSection( &waitqueue.cs );
1504 static void CALLBACK ioqueue_thread_proc( void *param )
1506 struct io_completion *completion;
1507 struct threadpool_object *io;
1508 IO_STATUS_BLOCK iosb;
1509 ULONG_PTR key, value;
1510 BOOL destroy, skip;
1511 NTSTATUS status;
1513 TRACE( "starting I/O completion thread\n" );
1515 RtlEnterCriticalSection( &ioqueue.cs );
1517 for (;;)
1519 RtlLeaveCriticalSection( &ioqueue.cs );
1520 if ((status = NtRemoveIoCompletion( ioqueue.port, &key, &value, &iosb, NULL )))
1521 ERR("NtRemoveIoCompletion failed, status %#x.\n", status);
1522 RtlEnterCriticalSection( &ioqueue.cs );
1524 destroy = skip = FALSE;
1525 io = (struct threadpool_object *)key;
1527 TRACE( "io %p, iosb.Status %#x.\n", io, iosb.u.Status );
1529 if (io && (io->shutdown || io->u.io.shutting_down))
1531 RtlEnterCriticalSection( &io->pool->cs );
1532 if (!io->u.io.pending_count)
1534 if (io->u.io.skipped_count)
1535 --io->u.io.skipped_count;
1537 if (io->u.io.skipped_count)
1538 skip = TRUE;
1539 else
1540 destroy = TRUE;
1542 RtlLeaveCriticalSection( &io->pool->cs );
1543 if (skip) continue;
1546 if (destroy)
1548 --ioqueue.objcount;
1549 TRACE( "Releasing io %p.\n", io );
1550 io->shutdown = TRUE;
1551 tp_object_release( io );
1553 else if (io)
1555 RtlEnterCriticalSection( &io->pool->cs );
1557 TRACE( "pending_count %u.\n", io->u.io.pending_count );
1559 if (io->u.io.pending_count)
1561 --io->u.io.pending_count;
1562 if (!array_reserve((void **)&io->u.io.completions, &io->u.io.completion_max,
1563 io->u.io.completion_count + 1, sizeof(*io->u.io.completions)))
1565 ERR( "Failed to allocate memory.\n" );
1566 RtlLeaveCriticalSection( &io->pool->cs );
1567 continue;
1570 completion = &io->u.io.completions[io->u.io.completion_count++];
1571 completion->iosb = iosb;
1572 completion->cvalue = value;
1574 tp_object_submit( io, FALSE );
1576 RtlLeaveCriticalSection( &io->pool->cs );
1579 if (!ioqueue.objcount)
1581 /* All I/O objects have been destroyed; if no new objects are
1582 * created within some amount of time, then we can shutdown this
1583 * thread. */
1584 LARGE_INTEGER timeout = {.QuadPart = (ULONGLONG)THREADPOOL_WORKER_TIMEOUT * -10000};
1585 if (RtlSleepConditionVariableCS( &ioqueue.update_event, &ioqueue.cs,
1586 &timeout) == STATUS_TIMEOUT && !ioqueue.objcount)
1587 break;
1591 ioqueue.thread_running = FALSE;
1592 RtlLeaveCriticalSection( &ioqueue.cs );
1594 TRACE( "terminating I/O completion thread\n" );
1596 RtlExitUserThread( 0 );
1599 static NTSTATUS tp_ioqueue_lock( struct threadpool_object *io, HANDLE file )
1601 NTSTATUS status = STATUS_SUCCESS;
1603 assert( io->type == TP_OBJECT_TYPE_IO );
1605 RtlEnterCriticalSection( &ioqueue.cs );
1607 if (!ioqueue.port && (status = NtCreateIoCompletion( &ioqueue.port,
1608 IO_COMPLETION_ALL_ACCESS, NULL, 0 )))
1610 RtlLeaveCriticalSection( &ioqueue.cs );
1611 return status;
1614 if (!ioqueue.thread_running)
1616 HANDLE thread;
1618 if (!(status = RtlCreateUserThread( GetCurrentProcess(), NULL, FALSE,
1619 0, 0, 0, ioqueue_thread_proc, NULL, &thread, NULL )))
1621 ioqueue.thread_running = TRUE;
1622 NtClose( thread );
1626 if (status == STATUS_SUCCESS)
1628 FILE_COMPLETION_INFORMATION info;
1629 IO_STATUS_BLOCK iosb;
1631 info.CompletionPort = ioqueue.port;
1632 info.CompletionKey = (ULONG_PTR)io;
1634 status = NtSetInformationFile( file, &iosb, &info, sizeof(info), FileCompletionInformation );
1637 if (status == STATUS_SUCCESS)
1639 if (!ioqueue.objcount++)
1640 RtlWakeConditionVariable( &ioqueue.update_event );
1643 RtlLeaveCriticalSection( &ioqueue.cs );
1644 return status;
1647 /***********************************************************************
1648 * tp_threadpool_alloc (internal)
1650 * Allocates a new threadpool object.
1652 static NTSTATUS tp_threadpool_alloc( struct threadpool **out )
1654 IMAGE_NT_HEADERS *nt = RtlImageNtHeader( NtCurrentTeb()->Peb->ImageBaseAddress );
1655 struct threadpool *pool;
1656 unsigned int i;
1658 pool = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*pool) );
1659 if (!pool)
1660 return STATUS_NO_MEMORY;
1662 pool->refcount = 1;
1663 pool->objcount = 0;
1664 pool->shutdown = FALSE;
1666 RtlInitializeCriticalSection( &pool->cs );
1667 pool->cs.DebugInfo->Spare[0] = (DWORD_PTR)(__FILE__ ": threadpool.cs");
1669 for (i = 0; i < ARRAY_SIZE(pool->pools); ++i)
1670 list_init( &pool->pools[i] );
1671 RtlInitializeConditionVariable( &pool->update_event );
1673 pool->max_workers = 500;
1674 pool->min_workers = 0;
1675 pool->num_workers = 0;
1676 pool->num_busy_workers = 0;
1677 pool->stack_info.StackReserve = nt->OptionalHeader.SizeOfStackReserve;
1678 pool->stack_info.StackCommit = nt->OptionalHeader.SizeOfStackCommit;
1680 TRACE( "allocated threadpool %p\n", pool );
1682 *out = pool;
1683 return STATUS_SUCCESS;
1686 /***********************************************************************
1687 * tp_threadpool_shutdown (internal)
1689 * Prepares the shutdown of a threadpool object and notifies all worker
1690 * threads to terminate (after all remaining work items have been
1691 * processed).
1693 static void tp_threadpool_shutdown( struct threadpool *pool )
1695 assert( pool != default_threadpool );
1697 pool->shutdown = TRUE;
1698 RtlWakeAllConditionVariable( &pool->update_event );
1701 /***********************************************************************
1702 * tp_threadpool_release (internal)
1704 * Releases a reference to a threadpool object.
1706 static BOOL tp_threadpool_release( struct threadpool *pool )
1708 unsigned int i;
1710 if (InterlockedDecrement( &pool->refcount ))
1711 return FALSE;
1713 TRACE( "destroying threadpool %p\n", pool );
1715 assert( pool->shutdown );
1716 assert( !pool->objcount );
1717 for (i = 0; i < ARRAY_SIZE(pool->pools); ++i)
1718 assert( list_empty( &pool->pools[i] ) );
1720 pool->cs.DebugInfo->Spare[0] = 0;
1721 RtlDeleteCriticalSection( &pool->cs );
1723 RtlFreeHeap( GetProcessHeap(), 0, pool );
1724 return TRUE;
1727 /***********************************************************************
1728 * tp_threadpool_lock (internal)
1730 * Acquires a lock on a threadpool, specified with an TP_CALLBACK_ENVIRON
1731 * block. When the lock is acquired successfully, it is guaranteed that
1732 * there is at least one worker thread to process tasks.
1734 static NTSTATUS tp_threadpool_lock( struct threadpool **out, TP_CALLBACK_ENVIRON *environment )
1736 struct threadpool *pool = NULL;
1737 NTSTATUS status = STATUS_SUCCESS;
1739 if (environment)
1741 /* Validate environment parameters. */
1742 if (environment->Version == 3)
1744 TP_CALLBACK_ENVIRON_V3 *environment3 = (TP_CALLBACK_ENVIRON_V3 *)environment;
1746 switch (environment3->CallbackPriority)
1748 case TP_CALLBACK_PRIORITY_HIGH:
1749 case TP_CALLBACK_PRIORITY_NORMAL:
1750 case TP_CALLBACK_PRIORITY_LOW:
1751 break;
1752 default:
1753 return STATUS_INVALID_PARAMETER;
1757 pool = (struct threadpool *)environment->Pool;
1760 if (!pool)
1762 if (!default_threadpool)
1764 status = tp_threadpool_alloc( &pool );
1765 if (status != STATUS_SUCCESS)
1766 return status;
1768 if (InterlockedCompareExchangePointer( (void *)&default_threadpool, pool, NULL ) != NULL)
1770 tp_threadpool_shutdown( pool );
1771 tp_threadpool_release( pool );
1775 pool = default_threadpool;
1778 RtlEnterCriticalSection( &pool->cs );
1780 /* Make sure that the threadpool has at least one thread. */
1781 if (!pool->num_workers)
1782 status = tp_new_worker_thread( pool );
1784 /* Keep a reference, and increment objcount to ensure that the
1785 * last thread doesn't terminate. */
1786 if (status == STATUS_SUCCESS)
1788 InterlockedIncrement( &pool->refcount );
1789 pool->objcount++;
1792 RtlLeaveCriticalSection( &pool->cs );
1794 if (status != STATUS_SUCCESS)
1795 return status;
1797 *out = pool;
1798 return STATUS_SUCCESS;
1801 /***********************************************************************
1802 * tp_threadpool_unlock (internal)
1804 * Releases a lock on a threadpool.
1806 static void tp_threadpool_unlock( struct threadpool *pool )
1808 RtlEnterCriticalSection( &pool->cs );
1809 pool->objcount--;
1810 RtlLeaveCriticalSection( &pool->cs );
1811 tp_threadpool_release( pool );
1814 /***********************************************************************
1815 * tp_group_alloc (internal)
1817 * Allocates a new threadpool group object.
1819 static NTSTATUS tp_group_alloc( struct threadpool_group **out )
1821 struct threadpool_group *group;
1823 group = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*group) );
1824 if (!group)
1825 return STATUS_NO_MEMORY;
1827 group->refcount = 1;
1828 group->shutdown = FALSE;
1830 RtlInitializeCriticalSection( &group->cs );
1831 group->cs.DebugInfo->Spare[0] = (DWORD_PTR)(__FILE__ ": threadpool_group.cs");
1833 list_init( &group->members );
1835 TRACE( "allocated group %p\n", group );
1837 *out = group;
1838 return STATUS_SUCCESS;
1841 /***********************************************************************
1842 * tp_group_shutdown (internal)
1844 * Marks the group object for shutdown.
1846 static void tp_group_shutdown( struct threadpool_group *group )
1848 group->shutdown = TRUE;
1851 /***********************************************************************
1852 * tp_group_release (internal)
1854 * Releases a reference to a group object.
1856 static BOOL tp_group_release( struct threadpool_group *group )
1858 if (InterlockedDecrement( &group->refcount ))
1859 return FALSE;
1861 TRACE( "destroying group %p\n", group );
1863 assert( group->shutdown );
1864 assert( list_empty( &group->members ) );
1866 group->cs.DebugInfo->Spare[0] = 0;
1867 RtlDeleteCriticalSection( &group->cs );
1869 RtlFreeHeap( GetProcessHeap(), 0, group );
1870 return TRUE;
1873 /***********************************************************************
1874 * tp_object_initialize (internal)
1876 * Initializes members of a threadpool object.
1878 static void tp_object_initialize( struct threadpool_object *object, struct threadpool *pool,
1879 PVOID userdata, TP_CALLBACK_ENVIRON *environment )
1881 BOOL is_simple_callback = (object->type == TP_OBJECT_TYPE_SIMPLE);
1883 object->refcount = 1;
1884 object->shutdown = FALSE;
1886 object->pool = pool;
1887 object->group = NULL;
1888 object->userdata = userdata;
1889 object->group_cancel_callback = NULL;
1890 object->finalization_callback = NULL;
1891 object->may_run_long = 0;
1892 object->race_dll = NULL;
1893 object->priority = TP_CALLBACK_PRIORITY_NORMAL;
1895 memset( &object->group_entry, 0, sizeof(object->group_entry) );
1896 object->is_group_member = FALSE;
1898 memset( &object->pool_entry, 0, sizeof(object->pool_entry) );
1899 RtlInitializeConditionVariable( &object->finished_event );
1900 RtlInitializeConditionVariable( &object->group_finished_event );
1901 object->completed_event = NULL;
1902 object->num_pending_callbacks = 0;
1903 object->num_running_callbacks = 0;
1904 object->num_associated_callbacks = 0;
1906 if (environment)
1908 if (environment->Version != 1 && environment->Version != 3)
1909 FIXME( "unsupported environment version %u\n", environment->Version );
1911 object->group = impl_from_TP_CLEANUP_GROUP( environment->CleanupGroup );
1912 object->group_cancel_callback = environment->CleanupGroupCancelCallback;
1913 object->finalization_callback = environment->FinalizationCallback;
1914 object->may_run_long = environment->u.s.LongFunction != 0;
1915 object->race_dll = environment->RaceDll;
1916 if (environment->Version == 3)
1918 TP_CALLBACK_ENVIRON_V3 *environment_v3 = (TP_CALLBACK_ENVIRON_V3 *)environment;
1920 object->priority = environment_v3->CallbackPriority;
1921 assert( object->priority < ARRAY_SIZE(pool->pools) );
1924 if (environment->ActivationContext)
1925 FIXME( "activation context not supported yet\n" );
1927 if (environment->u.s.Persistent)
1928 FIXME( "persistent threads not supported yet\n" );
1931 if (object->race_dll)
1932 LdrAddRefDll( 0, object->race_dll );
1934 TRACE( "allocated object %p of type %u\n", object, object->type );
1936 /* For simple callbacks we have to run tp_object_submit before adding this object
1937 * to the cleanup group. As soon as the cleanup group members are released ->shutdown
1938 * will be set, and tp_object_submit would fail with an assertion. */
1940 if (is_simple_callback)
1941 tp_object_submit( object, FALSE );
1943 if (object->group)
1945 struct threadpool_group *group = object->group;
1946 InterlockedIncrement( &group->refcount );
1948 RtlEnterCriticalSection( &group->cs );
1949 list_add_tail( &group->members, &object->group_entry );
1950 object->is_group_member = TRUE;
1951 RtlLeaveCriticalSection( &group->cs );
1954 if (is_simple_callback)
1955 tp_object_release( object );
1958 static void tp_object_prio_queue( struct threadpool_object *object )
1960 ++object->pool->num_busy_workers;
1961 list_add_tail( &object->pool->pools[object->priority], &object->pool_entry );
1964 /***********************************************************************
1965 * tp_object_submit (internal)
1967 * Submits a threadpool object to the associated threadpool. This
1968 * function has to be VOID because TpPostWork can never fail on Windows.
1970 static void tp_object_submit( struct threadpool_object *object, BOOL signaled )
1972 struct threadpool *pool = object->pool;
1973 NTSTATUS status = STATUS_UNSUCCESSFUL;
1975 assert( !object->shutdown );
1976 assert( !pool->shutdown );
1978 RtlEnterCriticalSection( &pool->cs );
1980 /* Start new worker threads if required. */
1981 if (pool->num_busy_workers >= pool->num_workers &&
1982 pool->num_workers < pool->max_workers)
1983 status = tp_new_worker_thread( pool );
1985 /* Queue work item and increment refcount. */
1986 InterlockedIncrement( &object->refcount );
1987 if (!object->num_pending_callbacks++)
1988 tp_object_prio_queue( object );
1990 /* Count how often the object was signaled. */
1991 if (object->type == TP_OBJECT_TYPE_WAIT && signaled)
1992 object->u.wait.signaled++;
1994 /* No new thread started - wake up one existing thread. */
1995 if (status != STATUS_SUCCESS)
1997 assert( pool->num_workers > 0 );
1998 RtlWakeConditionVariable( &pool->update_event );
2001 RtlLeaveCriticalSection( &pool->cs );
2004 /***********************************************************************
2005 * tp_object_cancel (internal)
2007 * Cancels all currently pending callbacks for a specific object.
2009 static void tp_object_cancel( struct threadpool_object *object )
2011 struct threadpool *pool = object->pool;
2012 LONG pending_callbacks = 0;
2014 RtlEnterCriticalSection( &pool->cs );
2015 if (object->num_pending_callbacks)
2017 pending_callbacks = object->num_pending_callbacks;
2018 object->num_pending_callbacks = 0;
2019 list_remove( &object->pool_entry );
2021 if (object->type == TP_OBJECT_TYPE_WAIT)
2022 object->u.wait.signaled = 0;
2024 if (object->type == TP_OBJECT_TYPE_IO)
2026 object->u.io.skipped_count += object->u.io.pending_count;
2027 object->u.io.pending_count = 0;
2029 RtlLeaveCriticalSection( &pool->cs );
2031 while (pending_callbacks--)
2032 tp_object_release( object );
2035 static BOOL object_is_finished( struct threadpool_object *object, BOOL group )
2037 if (object->num_pending_callbacks)
2038 return FALSE;
2039 if (object->type == TP_OBJECT_TYPE_IO && object->u.io.pending_count)
2040 return FALSE;
2042 if (group)
2043 return !object->num_running_callbacks;
2044 else
2045 return !object->num_associated_callbacks;
2048 /***********************************************************************
2049 * tp_object_wait (internal)
2051 * Waits until all pending and running callbacks of a specific object
2052 * have been processed.
2054 static void tp_object_wait( struct threadpool_object *object, BOOL group_wait )
2056 struct threadpool *pool = object->pool;
2058 RtlEnterCriticalSection( &pool->cs );
2059 while (!object_is_finished( object, group_wait ))
2061 if (group_wait)
2062 RtlSleepConditionVariableCS( &object->group_finished_event, &pool->cs, NULL );
2063 else
2064 RtlSleepConditionVariableCS( &object->finished_event, &pool->cs, NULL );
2066 RtlLeaveCriticalSection( &pool->cs );
2069 static void tp_ioqueue_unlock( struct threadpool_object *io )
2071 assert( io->type == TP_OBJECT_TYPE_IO );
2073 RtlEnterCriticalSection( &ioqueue.cs );
2075 assert(ioqueue.objcount);
2077 if (!io->shutdown && !--ioqueue.objcount)
2078 NtSetIoCompletion( ioqueue.port, 0, 0, STATUS_SUCCESS, 0 );
2080 RtlLeaveCriticalSection( &ioqueue.cs );
2083 /***********************************************************************
2084 * tp_object_prepare_shutdown (internal)
2086 * Prepares a threadpool object for shutdown.
2088 static void tp_object_prepare_shutdown( struct threadpool_object *object )
2090 if (object->type == TP_OBJECT_TYPE_TIMER)
2091 tp_timerqueue_unlock( object );
2092 else if (object->type == TP_OBJECT_TYPE_WAIT)
2093 tp_waitqueue_unlock( object );
2094 else if (object->type == TP_OBJECT_TYPE_IO)
2095 tp_ioqueue_unlock( object );
2098 /***********************************************************************
2099 * tp_object_release (internal)
2101 * Releases a reference to a threadpool object.
2103 static BOOL tp_object_release( struct threadpool_object *object )
2105 if (InterlockedDecrement( &object->refcount ))
2106 return FALSE;
2108 TRACE( "destroying object %p of type %u\n", object, object->type );
2110 assert( object->shutdown );
2111 assert( !object->num_pending_callbacks );
2112 assert( !object->num_running_callbacks );
2113 assert( !object->num_associated_callbacks );
2115 /* release reference to the group */
2116 if (object->group)
2118 struct threadpool_group *group = object->group;
2120 RtlEnterCriticalSection( &group->cs );
2121 if (object->is_group_member)
2123 list_remove( &object->group_entry );
2124 object->is_group_member = FALSE;
2126 RtlLeaveCriticalSection( &group->cs );
2128 tp_group_release( group );
2131 tp_threadpool_unlock( object->pool );
2133 if (object->race_dll)
2134 LdrUnloadDll( object->race_dll );
2136 if (object->completed_event && object->completed_event != INVALID_HANDLE_VALUE)
2137 NtSetEvent( object->completed_event, NULL );
2139 RtlFreeHeap( GetProcessHeap(), 0, object );
2140 return TRUE;
2143 static struct list *threadpool_get_next_item( const struct threadpool *pool )
2145 struct list *ptr;
2146 unsigned int i;
2148 for (i = 0; i < ARRAY_SIZE(pool->pools); ++i)
2150 if ((ptr = list_head( &pool->pools[i] )))
2151 break;
2154 return ptr;
2157 /***********************************************************************
2158 * tp_object_execute (internal)
2160 * Executes a threadpool object callback, object->pool->cs has to be
2161 * held.
2163 static void tp_object_execute( struct threadpool_object *object, BOOL wait_thread )
2165 TP_CALLBACK_INSTANCE *callback_instance;
2166 struct threadpool_instance instance;
2167 struct io_completion completion;
2168 struct threadpool *pool = object->pool;
2169 TP_WAIT_RESULT wait_result = 0;
2170 NTSTATUS status;
2172 object->num_pending_callbacks--;
2174 /* For wait objects check if they were signaled or have timed out. */
2175 if (object->type == TP_OBJECT_TYPE_WAIT)
2177 wait_result = object->u.wait.signaled ? WAIT_OBJECT_0 : WAIT_TIMEOUT;
2178 if (wait_result == WAIT_OBJECT_0) object->u.wait.signaled--;
2180 else if (object->type == TP_OBJECT_TYPE_IO)
2182 assert( object->u.io.completion_count );
2183 completion = object->u.io.completions[--object->u.io.completion_count];
2186 /* Leave critical section and do the actual callback. */
2187 object->num_associated_callbacks++;
2188 object->num_running_callbacks++;
2189 RtlLeaveCriticalSection( &pool->cs );
2190 if (wait_thread) RtlLeaveCriticalSection( &waitqueue.cs );
2192 /* Initialize threadpool instance struct. */
2193 callback_instance = (TP_CALLBACK_INSTANCE *)&instance;
2194 instance.object = object;
2195 instance.threadid = GetCurrentThreadId();
2196 instance.associated = TRUE;
2197 instance.may_run_long = object->may_run_long;
2198 instance.cleanup.critical_section = NULL;
2199 instance.cleanup.mutex = NULL;
2200 instance.cleanup.semaphore = NULL;
2201 instance.cleanup.semaphore_count = 0;
2202 instance.cleanup.event = NULL;
2203 instance.cleanup.library = NULL;
2205 switch (object->type)
2207 case TP_OBJECT_TYPE_SIMPLE:
2209 TRACE( "executing simple callback %p(%p, %p)\n",
2210 object->u.simple.callback, callback_instance, object->userdata );
2211 object->u.simple.callback( callback_instance, object->userdata );
2212 TRACE( "callback %p returned\n", object->u.simple.callback );
2213 break;
2216 case TP_OBJECT_TYPE_WORK:
2218 TRACE( "executing work callback %p(%p, %p, %p)\n",
2219 object->u.work.callback, callback_instance, object->userdata, object );
2220 object->u.work.callback( callback_instance, object->userdata, (TP_WORK *)object );
2221 TRACE( "callback %p returned\n", object->u.work.callback );
2222 break;
2225 case TP_OBJECT_TYPE_TIMER:
2227 TRACE( "executing timer callback %p(%p, %p, %p)\n",
2228 object->u.timer.callback, callback_instance, object->userdata, object );
2229 object->u.timer.callback( callback_instance, object->userdata, (TP_TIMER *)object );
2230 TRACE( "callback %p returned\n", object->u.timer.callback );
2231 break;
2234 case TP_OBJECT_TYPE_WAIT:
2236 TRACE( "executing wait callback %p(%p, %p, %p, %u)\n",
2237 object->u.wait.callback, callback_instance, object->userdata, object, wait_result );
2238 object->u.wait.callback( callback_instance, object->userdata, (TP_WAIT *)object, wait_result );
2239 TRACE( "callback %p returned\n", object->u.wait.callback );
2240 break;
2243 case TP_OBJECT_TYPE_IO:
2245 TRACE( "executing I/O callback %p(%p, %p, %#lx, %p, %p)\n",
2246 object->u.io.callback, callback_instance, object->userdata,
2247 completion.cvalue, &completion.iosb, (TP_IO *)object );
2248 object->u.io.callback( callback_instance, object->userdata,
2249 (void *)completion.cvalue, &completion.iosb, (TP_IO *)object );
2250 TRACE( "callback %p returned\n", object->u.io.callback );
2251 break;
2254 default:
2255 assert(0);
2256 break;
2259 /* Execute finalization callback. */
2260 if (object->finalization_callback)
2262 TRACE( "executing finalization callback %p(%p, %p)\n",
2263 object->finalization_callback, callback_instance, object->userdata );
2264 object->finalization_callback( callback_instance, object->userdata );
2265 TRACE( "callback %p returned\n", object->finalization_callback );
2268 /* Execute cleanup tasks. */
2269 if (instance.cleanup.critical_section)
2271 RtlLeaveCriticalSection( instance.cleanup.critical_section );
2273 if (instance.cleanup.mutex)
2275 status = NtReleaseMutant( instance.cleanup.mutex, NULL );
2276 if (status != STATUS_SUCCESS) goto skip_cleanup;
2278 if (instance.cleanup.semaphore)
2280 status = NtReleaseSemaphore( instance.cleanup.semaphore, instance.cleanup.semaphore_count, NULL );
2281 if (status != STATUS_SUCCESS) goto skip_cleanup;
2283 if (instance.cleanup.event)
2285 status = NtSetEvent( instance.cleanup.event, NULL );
2286 if (status != STATUS_SUCCESS) goto skip_cleanup;
2288 if (instance.cleanup.library)
2290 LdrUnloadDll( instance.cleanup.library );
2293 skip_cleanup:
2294 if (wait_thread) RtlEnterCriticalSection( &waitqueue.cs );
2295 RtlEnterCriticalSection( &pool->cs );
2297 /* Simple callbacks are automatically shutdown after execution. */
2298 if (object->type == TP_OBJECT_TYPE_SIMPLE)
2300 tp_object_prepare_shutdown( object );
2301 object->shutdown = TRUE;
2304 object->num_running_callbacks--;
2305 if (object_is_finished( object, TRUE ))
2306 RtlWakeAllConditionVariable( &object->group_finished_event );
2308 if (instance.associated)
2310 object->num_associated_callbacks--;
2311 if (object_is_finished( object, FALSE ))
2312 RtlWakeAllConditionVariable( &object->finished_event );
2316 /***********************************************************************
2317 * threadpool_worker_proc (internal)
2319 static void CALLBACK threadpool_worker_proc( void *param )
2321 struct threadpool *pool = param;
2322 LARGE_INTEGER timeout;
2323 struct list *ptr;
2325 TRACE( "starting worker thread for pool %p\n", pool );
2327 RtlEnterCriticalSection( &pool->cs );
2328 for (;;)
2330 while ((ptr = threadpool_get_next_item( pool )))
2332 struct threadpool_object *object = LIST_ENTRY( ptr, struct threadpool_object, pool_entry );
2333 assert( object->num_pending_callbacks > 0 );
2335 /* If further pending callbacks are queued, move the work item to
2336 * the end of the pool list. Otherwise remove it from the pool. */
2337 list_remove( &object->pool_entry );
2338 if (object->num_pending_callbacks > 1)
2339 tp_object_prio_queue( object );
2341 tp_object_execute( object, FALSE );
2343 assert(pool->num_busy_workers);
2344 pool->num_busy_workers--;
2346 tp_object_release( object );
2349 /* Shutdown worker thread if requested. */
2350 if (pool->shutdown)
2351 break;
2353 /* Wait for new tasks or until the timeout expires. A thread only terminates
2354 * when no new tasks are available, and the number of threads can be
2355 * decreased without violating the min_workers limit. An exception is when
2356 * min_workers == 0, then objcount is used to detect if the last thread
2357 * can be terminated. */
2358 timeout.QuadPart = (ULONGLONG)THREADPOOL_WORKER_TIMEOUT * -10000;
2359 if (RtlSleepConditionVariableCS( &pool->update_event, &pool->cs, &timeout ) == STATUS_TIMEOUT &&
2360 !threadpool_get_next_item( pool ) && (pool->num_workers > max( pool->min_workers, 1 ) ||
2361 (!pool->min_workers && !pool->objcount)))
2363 break;
2366 pool->num_workers--;
2367 RtlLeaveCriticalSection( &pool->cs );
2369 TRACE( "terminating worker thread for pool %p\n", pool );
2370 tp_threadpool_release( pool );
2371 RtlExitUserThread( 0 );
2374 /***********************************************************************
2375 * TpAllocCleanupGroup (NTDLL.@)
2377 NTSTATUS WINAPI TpAllocCleanupGroup( TP_CLEANUP_GROUP **out )
2379 TRACE( "%p\n", out );
2381 return tp_group_alloc( (struct threadpool_group **)out );
2384 /***********************************************************************
2385 * TpAllocIoCompletion (NTDLL.@)
2387 NTSTATUS WINAPI TpAllocIoCompletion( TP_IO **out, HANDLE file, PTP_IO_CALLBACK callback,
2388 void *userdata, TP_CALLBACK_ENVIRON *environment )
2390 struct threadpool_object *object;
2391 struct threadpool *pool;
2392 NTSTATUS status;
2394 TRACE( "%p %p %p %p %p\n", out, file, callback, userdata, environment );
2396 if (!(object = RtlAllocateHeap( GetProcessHeap(), HEAP_ZERO_MEMORY, sizeof(*object) )))
2397 return STATUS_NO_MEMORY;
2399 if ((status = tp_threadpool_lock( &pool, environment )))
2401 RtlFreeHeap( GetProcessHeap(), 0, object );
2402 return status;
2405 object->type = TP_OBJECT_TYPE_IO;
2406 object->u.io.callback = callback;
2407 if (!(object->u.io.completions = RtlAllocateHeap( GetProcessHeap(), 0, 8 * sizeof(*object->u.io.completions) )))
2409 tp_threadpool_unlock( pool );
2410 RtlFreeHeap( GetProcessHeap(), 0, object );
2411 return status;
2414 if ((status = tp_ioqueue_lock( object, file )))
2416 tp_threadpool_unlock( pool );
2417 RtlFreeHeap( GetProcessHeap(), 0, object->u.io.completions );
2418 RtlFreeHeap( GetProcessHeap(), 0, object );
2419 return status;
2422 tp_object_initialize( object, pool, userdata, environment );
2424 *out = (TP_IO *)object;
2425 return STATUS_SUCCESS;
2428 /***********************************************************************
2429 * TpAllocPool (NTDLL.@)
2431 NTSTATUS WINAPI TpAllocPool( TP_POOL **out, PVOID reserved )
2433 TRACE( "%p %p\n", out, reserved );
2435 if (reserved)
2436 FIXME( "reserved argument is nonzero (%p)\n", reserved );
2438 return tp_threadpool_alloc( (struct threadpool **)out );
2441 /***********************************************************************
2442 * TpAllocTimer (NTDLL.@)
2444 NTSTATUS WINAPI TpAllocTimer( TP_TIMER **out, PTP_TIMER_CALLBACK callback, PVOID userdata,
2445 TP_CALLBACK_ENVIRON *environment )
2447 struct threadpool_object *object;
2448 struct threadpool *pool;
2449 NTSTATUS status;
2451 TRACE( "%p %p %p %p\n", out, callback, userdata, environment );
2453 object = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*object) );
2454 if (!object)
2455 return STATUS_NO_MEMORY;
2457 status = tp_threadpool_lock( &pool, environment );
2458 if (status)
2460 RtlFreeHeap( GetProcessHeap(), 0, object );
2461 return status;
2464 object->type = TP_OBJECT_TYPE_TIMER;
2465 object->u.timer.callback = callback;
2467 status = tp_timerqueue_lock( object );
2468 if (status)
2470 tp_threadpool_unlock( pool );
2471 RtlFreeHeap( GetProcessHeap(), 0, object );
2472 return status;
2475 tp_object_initialize( object, pool, userdata, environment );
2477 *out = (TP_TIMER *)object;
2478 return STATUS_SUCCESS;
2481 static NTSTATUS tp_alloc_wait( TP_WAIT **out, PTP_WAIT_CALLBACK callback, PVOID userdata,
2482 TP_CALLBACK_ENVIRON *environment, DWORD flags )
2484 struct threadpool_object *object;
2485 struct threadpool *pool;
2486 NTSTATUS status;
2488 object = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*object) );
2489 if (!object)
2490 return STATUS_NO_MEMORY;
2492 status = tp_threadpool_lock( &pool, environment );
2493 if (status)
2495 RtlFreeHeap( GetProcessHeap(), 0, object );
2496 return status;
2499 object->type = TP_OBJECT_TYPE_WAIT;
2500 object->u.wait.callback = callback;
2501 object->u.wait.flags = flags;
2503 status = tp_waitqueue_lock( object );
2504 if (status)
2506 tp_threadpool_unlock( pool );
2507 RtlFreeHeap( GetProcessHeap(), 0, object );
2508 return status;
2511 tp_object_initialize( object, pool, userdata, environment );
2513 *out = (TP_WAIT *)object;
2514 return STATUS_SUCCESS;
2517 /***********************************************************************
2518 * TpAllocWait (NTDLL.@)
2520 NTSTATUS WINAPI TpAllocWait( TP_WAIT **out, PTP_WAIT_CALLBACK callback, PVOID userdata,
2521 TP_CALLBACK_ENVIRON *environment )
2523 TRACE( "%p %p %p %p\n", out, callback, userdata, environment );
2524 return tp_alloc_wait( out, callback, userdata, environment, WT_EXECUTEONLYONCE );
2527 /***********************************************************************
2528 * TpAllocWork (NTDLL.@)
2530 NTSTATUS WINAPI TpAllocWork( TP_WORK **out, PTP_WORK_CALLBACK callback, PVOID userdata,
2531 TP_CALLBACK_ENVIRON *environment )
2533 struct threadpool_object *object;
2534 struct threadpool *pool;
2535 NTSTATUS status;
2537 TRACE( "%p %p %p %p\n", out, callback, userdata, environment );
2539 object = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*object) );
2540 if (!object)
2541 return STATUS_NO_MEMORY;
2543 status = tp_threadpool_lock( &pool, environment );
2544 if (status)
2546 RtlFreeHeap( GetProcessHeap(), 0, object );
2547 return status;
2550 object->type = TP_OBJECT_TYPE_WORK;
2551 object->u.work.callback = callback;
2552 tp_object_initialize( object, pool, userdata, environment );
2554 *out = (TP_WORK *)object;
2555 return STATUS_SUCCESS;
2558 /***********************************************************************
2559 * TpCancelAsyncIoOperation (NTDLL.@)
2561 void WINAPI TpCancelAsyncIoOperation( TP_IO *io )
2563 struct threadpool_object *this = impl_from_TP_IO( io );
2565 TRACE( "%p\n", io );
2567 RtlEnterCriticalSection( &this->pool->cs );
2569 TRACE("pending_count %u.\n", this->u.io.pending_count);
2571 this->u.io.pending_count--;
2572 if (object_is_finished( this, TRUE ))
2573 RtlWakeAllConditionVariable( &this->group_finished_event );
2574 if (object_is_finished( this, FALSE ))
2575 RtlWakeAllConditionVariable( &this->finished_event );
2577 RtlLeaveCriticalSection( &this->pool->cs );
2580 /***********************************************************************
2581 * TpCallbackLeaveCriticalSectionOnCompletion (NTDLL.@)
2583 VOID WINAPI TpCallbackLeaveCriticalSectionOnCompletion( TP_CALLBACK_INSTANCE *instance, CRITICAL_SECTION *crit )
2585 struct threadpool_instance *this = impl_from_TP_CALLBACK_INSTANCE( instance );
2587 TRACE( "%p %p\n", instance, crit );
2589 if (!this->cleanup.critical_section)
2590 this->cleanup.critical_section = crit;
2593 /***********************************************************************
2594 * TpCallbackMayRunLong (NTDLL.@)
2596 NTSTATUS WINAPI TpCallbackMayRunLong( TP_CALLBACK_INSTANCE *instance )
2598 struct threadpool_instance *this = impl_from_TP_CALLBACK_INSTANCE( instance );
2599 struct threadpool_object *object = this->object;
2600 struct threadpool *pool;
2601 NTSTATUS status = STATUS_SUCCESS;
2603 TRACE( "%p\n", instance );
2605 if (this->threadid != GetCurrentThreadId())
2607 ERR("called from wrong thread, ignoring\n");
2608 return STATUS_UNSUCCESSFUL; /* FIXME */
2611 if (this->may_run_long)
2612 return STATUS_SUCCESS;
2614 pool = object->pool;
2615 RtlEnterCriticalSection( &pool->cs );
2617 /* Start new worker threads if required. */
2618 if (pool->num_busy_workers >= pool->num_workers)
2620 if (pool->num_workers < pool->max_workers)
2622 status = tp_new_worker_thread( pool );
2624 else
2626 status = STATUS_TOO_MANY_THREADS;
2630 RtlLeaveCriticalSection( &pool->cs );
2631 this->may_run_long = TRUE;
2632 return status;
2635 /***********************************************************************
2636 * TpCallbackReleaseMutexOnCompletion (NTDLL.@)
2638 VOID WINAPI TpCallbackReleaseMutexOnCompletion( TP_CALLBACK_INSTANCE *instance, HANDLE mutex )
2640 struct threadpool_instance *this = impl_from_TP_CALLBACK_INSTANCE( instance );
2642 TRACE( "%p %p\n", instance, mutex );
2644 if (!this->cleanup.mutex)
2645 this->cleanup.mutex = mutex;
2648 /***********************************************************************
2649 * TpCallbackReleaseSemaphoreOnCompletion (NTDLL.@)
2651 VOID WINAPI TpCallbackReleaseSemaphoreOnCompletion( TP_CALLBACK_INSTANCE *instance, HANDLE semaphore, DWORD count )
2653 struct threadpool_instance *this = impl_from_TP_CALLBACK_INSTANCE( instance );
2655 TRACE( "%p %p %u\n", instance, semaphore, count );
2657 if (!this->cleanup.semaphore)
2659 this->cleanup.semaphore = semaphore;
2660 this->cleanup.semaphore_count = count;
2664 /***********************************************************************
2665 * TpCallbackSetEventOnCompletion (NTDLL.@)
2667 VOID WINAPI TpCallbackSetEventOnCompletion( TP_CALLBACK_INSTANCE *instance, HANDLE event )
2669 struct threadpool_instance *this = impl_from_TP_CALLBACK_INSTANCE( instance );
2671 TRACE( "%p %p\n", instance, event );
2673 if (!this->cleanup.event)
2674 this->cleanup.event = event;
2677 /***********************************************************************
2678 * TpCallbackUnloadDllOnCompletion (NTDLL.@)
2680 VOID WINAPI TpCallbackUnloadDllOnCompletion( TP_CALLBACK_INSTANCE *instance, HMODULE module )
2682 struct threadpool_instance *this = impl_from_TP_CALLBACK_INSTANCE( instance );
2684 TRACE( "%p %p\n", instance, module );
2686 if (!this->cleanup.library)
2687 this->cleanup.library = module;
2690 /***********************************************************************
2691 * TpDisassociateCallback (NTDLL.@)
2693 VOID WINAPI TpDisassociateCallback( TP_CALLBACK_INSTANCE *instance )
2695 struct threadpool_instance *this = impl_from_TP_CALLBACK_INSTANCE( instance );
2696 struct threadpool_object *object = this->object;
2697 struct threadpool *pool;
2699 TRACE( "%p\n", instance );
2701 if (this->threadid != GetCurrentThreadId())
2703 ERR("called from wrong thread, ignoring\n");
2704 return;
2707 if (!this->associated)
2708 return;
2710 pool = object->pool;
2711 RtlEnterCriticalSection( &pool->cs );
2713 object->num_associated_callbacks--;
2714 if (object_is_finished( object, FALSE ))
2715 RtlWakeAllConditionVariable( &object->finished_event );
2717 RtlLeaveCriticalSection( &pool->cs );
2718 this->associated = FALSE;
2721 /***********************************************************************
2722 * TpIsTimerSet (NTDLL.@)
2724 BOOL WINAPI TpIsTimerSet( TP_TIMER *timer )
2726 struct threadpool_object *this = impl_from_TP_TIMER( timer );
2728 TRACE( "%p\n", timer );
2730 return this->u.timer.timer_set;
2733 /***********************************************************************
2734 * TpPostWork (NTDLL.@)
2736 VOID WINAPI TpPostWork( TP_WORK *work )
2738 struct threadpool_object *this = impl_from_TP_WORK( work );
2740 TRACE( "%p\n", work );
2742 tp_object_submit( this, FALSE );
2745 /***********************************************************************
2746 * TpReleaseCleanupGroup (NTDLL.@)
2748 VOID WINAPI TpReleaseCleanupGroup( TP_CLEANUP_GROUP *group )
2750 struct threadpool_group *this = impl_from_TP_CLEANUP_GROUP( group );
2752 TRACE( "%p\n", group );
2754 tp_group_shutdown( this );
2755 tp_group_release( this );
2758 /***********************************************************************
2759 * TpReleaseCleanupGroupMembers (NTDLL.@)
2761 VOID WINAPI TpReleaseCleanupGroupMembers( TP_CLEANUP_GROUP *group, BOOL cancel_pending, PVOID userdata )
2763 struct threadpool_group *this = impl_from_TP_CLEANUP_GROUP( group );
2764 struct threadpool_object *object, *next;
2765 struct list members;
2767 TRACE( "%p %u %p\n", group, cancel_pending, userdata );
2769 RtlEnterCriticalSection( &this->cs );
2771 /* Unset group, increase references, and mark objects for shutdown */
2772 LIST_FOR_EACH_ENTRY_SAFE( object, next, &this->members, struct threadpool_object, group_entry )
2774 assert( object->group == this );
2775 assert( object->is_group_member );
2777 if (InterlockedIncrement( &object->refcount ) == 1)
2779 /* Object is basically already destroyed, but group reference
2780 * was not deleted yet. We can safely ignore this object. */
2781 InterlockedDecrement( &object->refcount );
2782 list_remove( &object->group_entry );
2783 object->is_group_member = FALSE;
2784 continue;
2787 object->is_group_member = FALSE;
2788 tp_object_prepare_shutdown( object );
2791 /* Move members to a new temporary list */
2792 list_init( &members );
2793 list_move_tail( &members, &this->members );
2795 RtlLeaveCriticalSection( &this->cs );
2797 /* Cancel pending callbacks if requested */
2798 if (cancel_pending)
2800 LIST_FOR_EACH_ENTRY( object, &members, struct threadpool_object, group_entry )
2802 tp_object_cancel( object );
2806 /* Wait for remaining callbacks to finish */
2807 LIST_FOR_EACH_ENTRY_SAFE( object, next, &members, struct threadpool_object, group_entry )
2809 tp_object_wait( object, TRUE );
2811 if (!object->shutdown)
2813 /* Execute group cancellation callback if defined, and if this was actually a group cancel. */
2814 if (cancel_pending && object->group_cancel_callback)
2816 TRACE( "executing group cancel callback %p(%p, %p)\n",
2817 object->group_cancel_callback, object->userdata, userdata );
2818 object->group_cancel_callback( object->userdata, userdata );
2819 TRACE( "callback %p returned\n", object->group_cancel_callback );
2822 if (object->type != TP_OBJECT_TYPE_SIMPLE)
2823 tp_object_release( object );
2826 object->shutdown = TRUE;
2827 tp_object_release( object );
2831 /***********************************************************************
2832 * TpReleaseIoCompletion (NTDLL.@)
2834 void WINAPI TpReleaseIoCompletion( TP_IO *io )
2836 struct threadpool_object *this = impl_from_TP_IO( io );
2837 BOOL can_destroy;
2839 TRACE( "%p\n", io );
2841 RtlEnterCriticalSection( &this->pool->cs );
2842 this->u.io.shutting_down = TRUE;
2843 can_destroy = !this->u.io.pending_count && !this->u.io.skipped_count;
2844 RtlLeaveCriticalSection( &this->pool->cs );
2846 if (can_destroy)
2848 tp_object_prepare_shutdown( this );
2849 this->shutdown = TRUE;
2850 tp_object_release( this );
2854 /***********************************************************************
2855 * TpReleasePool (NTDLL.@)
2857 VOID WINAPI TpReleasePool( TP_POOL *pool )
2859 struct threadpool *this = impl_from_TP_POOL( pool );
2861 TRACE( "%p\n", pool );
2863 tp_threadpool_shutdown( this );
2864 tp_threadpool_release( this );
2867 /***********************************************************************
2868 * TpReleaseTimer (NTDLL.@)
2870 VOID WINAPI TpReleaseTimer( TP_TIMER *timer )
2872 struct threadpool_object *this = impl_from_TP_TIMER( timer );
2874 TRACE( "%p\n", timer );
2876 tp_object_prepare_shutdown( this );
2877 this->shutdown = TRUE;
2878 tp_object_release( this );
2881 /***********************************************************************
2882 * TpReleaseWait (NTDLL.@)
2884 VOID WINAPI TpReleaseWait( TP_WAIT *wait )
2886 struct threadpool_object *this = impl_from_TP_WAIT( wait );
2888 TRACE( "%p\n", wait );
2890 tp_object_prepare_shutdown( this );
2891 this->shutdown = TRUE;
2892 tp_object_release( this );
2895 /***********************************************************************
2896 * TpReleaseWork (NTDLL.@)
2898 VOID WINAPI TpReleaseWork( TP_WORK *work )
2900 struct threadpool_object *this = impl_from_TP_WORK( work );
2902 TRACE( "%p\n", work );
2904 tp_object_prepare_shutdown( this );
2905 this->shutdown = TRUE;
2906 tp_object_release( this );
2909 /***********************************************************************
2910 * TpSetPoolMaxThreads (NTDLL.@)
2912 VOID WINAPI TpSetPoolMaxThreads( TP_POOL *pool, DWORD maximum )
2914 struct threadpool *this = impl_from_TP_POOL( pool );
2916 TRACE( "%p %u\n", pool, maximum );
2918 RtlEnterCriticalSection( &this->cs );
2919 this->max_workers = max( maximum, 1 );
2920 this->min_workers = min( this->min_workers, this->max_workers );
2921 RtlLeaveCriticalSection( &this->cs );
2924 /***********************************************************************
2925 * TpSetPoolMinThreads (NTDLL.@)
2927 BOOL WINAPI TpSetPoolMinThreads( TP_POOL *pool, DWORD minimum )
2929 struct threadpool *this = impl_from_TP_POOL( pool );
2930 NTSTATUS status = STATUS_SUCCESS;
2932 TRACE( "%p %u\n", pool, minimum );
2934 RtlEnterCriticalSection( &this->cs );
2936 while (this->num_workers < minimum)
2938 status = tp_new_worker_thread( this );
2939 if (status != STATUS_SUCCESS)
2940 break;
2943 if (status == STATUS_SUCCESS)
2945 this->min_workers = minimum;
2946 this->max_workers = max( this->min_workers, this->max_workers );
2949 RtlLeaveCriticalSection( &this->cs );
2950 return !status;
2953 /***********************************************************************
2954 * TpSetTimer (NTDLL.@)
2956 VOID WINAPI TpSetTimer( TP_TIMER *timer, LARGE_INTEGER *timeout, LONG period, LONG window_length )
2958 struct threadpool_object *this = impl_from_TP_TIMER( timer );
2959 struct threadpool_object *other_timer;
2960 BOOL submit_timer = FALSE;
2961 ULONGLONG timestamp;
2963 TRACE( "%p %p %u %u\n", timer, timeout, period, window_length );
2965 RtlEnterCriticalSection( &timerqueue.cs );
2967 assert( this->u.timer.timer_initialized );
2968 this->u.timer.timer_set = timeout != NULL;
2970 /* Convert relative timeout to absolute timestamp and handle a timeout
2971 * of zero, which means that the timer is submitted immediately. */
2972 if (timeout)
2974 timestamp = timeout->QuadPart;
2975 if ((LONGLONG)timestamp < 0)
2977 LARGE_INTEGER now;
2978 NtQuerySystemTime( &now );
2979 timestamp = now.QuadPart - timestamp;
2981 else if (!timestamp)
2983 if (!period)
2984 timeout = NULL;
2985 else
2987 LARGE_INTEGER now;
2988 NtQuerySystemTime( &now );
2989 timestamp = now.QuadPart + (ULONGLONG)period * 10000;
2991 submit_timer = TRUE;
2995 /* First remove existing timeout. */
2996 if (this->u.timer.timer_pending)
2998 list_remove( &this->u.timer.timer_entry );
2999 this->u.timer.timer_pending = FALSE;
3002 /* If the timer was enabled, then add it back to the queue. */
3003 if (timeout)
3005 this->u.timer.timeout = timestamp;
3006 this->u.timer.period = period;
3007 this->u.timer.window_length = window_length;
3009 LIST_FOR_EACH_ENTRY( other_timer, &timerqueue.pending_timers,
3010 struct threadpool_object, u.timer.timer_entry )
3012 assert( other_timer->type == TP_OBJECT_TYPE_TIMER );
3013 if (this->u.timer.timeout < other_timer->u.timer.timeout)
3014 break;
3016 list_add_before( &other_timer->u.timer.timer_entry, &this->u.timer.timer_entry );
3018 /* Wake up the timer thread when the timeout has to be updated. */
3019 if (list_head( &timerqueue.pending_timers ) == &this->u.timer.timer_entry )
3020 RtlWakeAllConditionVariable( &timerqueue.update_event );
3022 this->u.timer.timer_pending = TRUE;
3025 RtlLeaveCriticalSection( &timerqueue.cs );
3027 if (submit_timer)
3028 tp_object_submit( this, FALSE );
3031 /***********************************************************************
3032 * TpSetWait (NTDLL.@)
3034 VOID WINAPI TpSetWait( TP_WAIT *wait, HANDLE handle, LARGE_INTEGER *timeout )
3036 struct threadpool_object *this = impl_from_TP_WAIT( wait );
3037 ULONGLONG timestamp = MAXLONGLONG;
3039 TRACE( "%p %p %p\n", wait, handle, timeout );
3041 RtlEnterCriticalSection( &waitqueue.cs );
3043 assert( this->u.wait.bucket );
3044 this->u.wait.handle = handle;
3046 if (handle || this->u.wait.wait_pending)
3048 struct waitqueue_bucket *bucket = this->u.wait.bucket;
3049 list_remove( &this->u.wait.wait_entry );
3051 /* Convert relative timeout to absolute timestamp. */
3052 if (handle && timeout)
3054 timestamp = timeout->QuadPart;
3055 if ((LONGLONG)timestamp < 0)
3057 LARGE_INTEGER now;
3058 NtQuerySystemTime( &now );
3059 timestamp = now.QuadPart - timestamp;
3063 /* Add wait object back into one of the queues. */
3064 if (handle)
3066 list_add_tail( &bucket->waiting, &this->u.wait.wait_entry );
3067 this->u.wait.wait_pending = TRUE;
3068 this->u.wait.timeout = timestamp;
3070 else
3072 list_add_tail( &bucket->reserved, &this->u.wait.wait_entry );
3073 this->u.wait.wait_pending = FALSE;
3076 /* Wake up the wait queue thread. */
3077 NtSetEvent( bucket->update_event, NULL );
3080 RtlLeaveCriticalSection( &waitqueue.cs );
3083 /***********************************************************************
3084 * TpSimpleTryPost (NTDLL.@)
3086 NTSTATUS WINAPI TpSimpleTryPost( PTP_SIMPLE_CALLBACK callback, PVOID userdata,
3087 TP_CALLBACK_ENVIRON *environment )
3089 struct threadpool_object *object;
3090 struct threadpool *pool;
3091 NTSTATUS status;
3093 TRACE( "%p %p %p\n", callback, userdata, environment );
3095 object = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*object) );
3096 if (!object)
3097 return STATUS_NO_MEMORY;
3099 status = tp_threadpool_lock( &pool, environment );
3100 if (status)
3102 RtlFreeHeap( GetProcessHeap(), 0, object );
3103 return status;
3106 object->type = TP_OBJECT_TYPE_SIMPLE;
3107 object->u.simple.callback = callback;
3108 tp_object_initialize( object, pool, userdata, environment );
3110 return STATUS_SUCCESS;
3113 /***********************************************************************
3114 * TpStartAsyncIoOperation (NTDLL.@)
3116 void WINAPI TpStartAsyncIoOperation( TP_IO *io )
3118 struct threadpool_object *this = impl_from_TP_IO( io );
3120 TRACE( "%p\n", io );
3122 RtlEnterCriticalSection( &this->pool->cs );
3124 this->u.io.pending_count++;
3126 RtlLeaveCriticalSection( &this->pool->cs );
3129 /***********************************************************************
3130 * TpWaitForIoCompletion (NTDLL.@)
3132 void WINAPI TpWaitForIoCompletion( TP_IO *io, BOOL cancel_pending )
3134 struct threadpool_object *this = impl_from_TP_IO( io );
3136 TRACE( "%p %d\n", io, cancel_pending );
3138 if (cancel_pending)
3139 tp_object_cancel( this );
3140 tp_object_wait( this, FALSE );
3143 /***********************************************************************
3144 * TpWaitForTimer (NTDLL.@)
3146 VOID WINAPI TpWaitForTimer( TP_TIMER *timer, BOOL cancel_pending )
3148 struct threadpool_object *this = impl_from_TP_TIMER( timer );
3150 TRACE( "%p %d\n", timer, cancel_pending );
3152 if (cancel_pending)
3153 tp_object_cancel( this );
3154 tp_object_wait( this, FALSE );
3157 /***********************************************************************
3158 * TpWaitForWait (NTDLL.@)
3160 VOID WINAPI TpWaitForWait( TP_WAIT *wait, BOOL cancel_pending )
3162 struct threadpool_object *this = impl_from_TP_WAIT( wait );
3164 TRACE( "%p %d\n", wait, cancel_pending );
3166 if (cancel_pending)
3167 tp_object_cancel( this );
3168 tp_object_wait( this, FALSE );
3171 /***********************************************************************
3172 * TpWaitForWork (NTDLL.@)
3174 VOID WINAPI TpWaitForWork( TP_WORK *work, BOOL cancel_pending )
3176 struct threadpool_object *this = impl_from_TP_WORK( work );
3178 TRACE( "%p %u\n", work, cancel_pending );
3180 if (cancel_pending)
3181 tp_object_cancel( this );
3182 tp_object_wait( this, FALSE );
3185 /***********************************************************************
3186 * TpSetPoolStackInformation (NTDLL.@)
3188 NTSTATUS WINAPI TpSetPoolStackInformation( TP_POOL *pool, TP_POOL_STACK_INFORMATION *stack_info )
3190 struct threadpool *this = impl_from_TP_POOL( pool );
3192 TRACE( "%p %p\n", pool, stack_info );
3194 if (!stack_info)
3195 return STATUS_INVALID_PARAMETER;
3197 RtlEnterCriticalSection( &this->cs );
3198 this->stack_info = *stack_info;
3199 RtlLeaveCriticalSection( &this->cs );
3201 return STATUS_SUCCESS;
3204 /***********************************************************************
3205 * TpQueryPoolStackInformation (NTDLL.@)
3207 NTSTATUS WINAPI TpQueryPoolStackInformation( TP_POOL *pool, TP_POOL_STACK_INFORMATION *stack_info )
3209 struct threadpool *this = impl_from_TP_POOL( pool );
3211 TRACE( "%p %p\n", pool, stack_info );
3213 if (!stack_info)
3214 return STATUS_INVALID_PARAMETER;
3216 RtlEnterCriticalSection( &this->cs );
3217 *stack_info = this->stack_info;
3218 RtlLeaveCriticalSection( &this->cs );
3220 return STATUS_SUCCESS;
3223 static void CALLBACK rtl_wait_callback( TP_CALLBACK_INSTANCE *instance, void *userdata, TP_WAIT *wait, TP_WAIT_RESULT result )
3225 struct threadpool_object *object = impl_from_TP_WAIT(wait);
3226 object->u.wait.rtl_callback( userdata, result != STATUS_WAIT_0 );
3229 /***********************************************************************
3230 * RtlRegisterWait (NTDLL.@)
3232 * Registers a wait for a handle to become signaled.
3234 * PARAMS
3235 * NewWaitObject [I] Handle to the new wait object. Use RtlDeregisterWait() to free it.
3236 * Object [I] Object to wait to become signaled.
3237 * Callback [I] Callback function to execute when the wait times out or the handle is signaled.
3238 * Context [I] Context to pass to the callback function when it is executed.
3239 * Milliseconds [I] Number of milliseconds to wait before timing out.
3240 * Flags [I] Flags. See notes.
3242 * RETURNS
3243 * Success: STATUS_SUCCESS.
3244 * Failure: Any NTSTATUS code.
3246 * NOTES
3247 * Flags can be one or more of the following:
3248 *|WT_EXECUTEDEFAULT - Executes the work item in a non-I/O worker thread.
3249 *|WT_EXECUTEINIOTHREAD - Executes the work item in an I/O worker thread.
3250 *|WT_EXECUTEINPERSISTENTTHREAD - Executes the work item in a thread that is persistent.
3251 *|WT_EXECUTELONGFUNCTION - Hints that the execution can take a long time.
3252 *|WT_TRANSFER_IMPERSONATION - Executes the function with the current access token.
3254 NTSTATUS WINAPI RtlRegisterWait( HANDLE *out, HANDLE handle, RTL_WAITORTIMERCALLBACKFUNC callback,
3255 void *context, ULONG milliseconds, ULONG flags )
3257 struct threadpool_object *object;
3258 TP_CALLBACK_ENVIRON environment;
3259 LARGE_INTEGER timeout;
3260 NTSTATUS status;
3261 TP_WAIT *wait;
3263 TRACE( "out %p, handle %p, callback %p, context %p, milliseconds %u, flags %x\n",
3264 out, handle, callback, context, milliseconds, flags );
3266 memset( &environment, 0, sizeof(environment) );
3267 environment.Version = 1;
3268 environment.u.s.LongFunction = (flags & WT_EXECUTELONGFUNCTION) != 0;
3269 environment.u.s.Persistent = (flags & WT_EXECUTEINPERSISTENTTHREAD) != 0;
3271 flags &= (WT_EXECUTEONLYONCE | WT_EXECUTEINWAITTHREAD | WT_EXECUTEINIOTHREAD);
3272 if ((status = tp_alloc_wait( &wait, rtl_wait_callback, context, &environment, flags )))
3273 return status;
3275 object = impl_from_TP_WAIT(wait);
3276 object->u.wait.rtl_callback = callback;
3278 RtlEnterCriticalSection( &waitqueue.cs );
3279 TpSetWait( (TP_WAIT *)object, handle, get_nt_timeout( &timeout, milliseconds ) );
3281 *out = object;
3282 RtlLeaveCriticalSection( &waitqueue.cs );
3284 return STATUS_SUCCESS;
3287 /***********************************************************************
3288 * RtlDeregisterWaitEx (NTDLL.@)
3290 * Cancels a wait operation and frees the resources associated with calling
3291 * RtlRegisterWait().
3293 * PARAMS
3294 * WaitObject [I] Handle to the wait object to free.
3296 * RETURNS
3297 * Success: STATUS_SUCCESS.
3298 * Failure: Any NTSTATUS code.
3300 NTSTATUS WINAPI RtlDeregisterWaitEx( HANDLE handle, HANDLE event )
3302 struct threadpool_object *object = handle;
3303 NTSTATUS status;
3305 TRACE( "handle %p, event %p\n", handle, event );
3307 if (!object) return STATUS_INVALID_HANDLE;
3309 TpSetWait( (TP_WAIT *)object, NULL, NULL );
3311 if (event == INVALID_HANDLE_VALUE) TpWaitForWait( (TP_WAIT *)object, TRUE );
3312 else
3314 assert( object->completed_event == NULL );
3315 object->completed_event = event;
3318 RtlEnterCriticalSection( &object->pool->cs );
3319 if (object->num_pending_callbacks + object->num_running_callbacks
3320 + object->num_associated_callbacks) status = STATUS_PENDING;
3321 else status = STATUS_SUCCESS;
3322 RtlLeaveCriticalSection( &object->pool->cs );
3324 TpReleaseWait( (TP_WAIT *)object );
3325 return status;
3328 /***********************************************************************
3329 * RtlDeregisterWait (NTDLL.@)
3331 * Cancels a wait operation and frees the resources associated with calling
3332 * RtlRegisterWait().
3334 * PARAMS
3335 * WaitObject [I] Handle to the wait object to free.
3337 * RETURNS
3338 * Success: STATUS_SUCCESS.
3339 * Failure: Any NTSTATUS code.
3341 NTSTATUS WINAPI RtlDeregisterWait(HANDLE WaitHandle)
3343 return RtlDeregisterWaitEx(WaitHandle, NULL);