msvcr100: Use Context to store critical_section owner.
[wine.git] / dlls / ntdll / threadpool.c
blob4f22114a55ef7a98381235710290d56a4570c1ad
1 /*
2 * Thread pooling
4 * Copyright (c) 2006 Robert Shearman
5 * Copyright (c) 2014-2016 Sebastian Lackner
7 * This library is free software; you can redistribute it and/or
8 * modify it under the terms of the GNU Lesser General Public
9 * License as published by the Free Software Foundation; either
10 * version 2.1 of the License, or (at your option) any later version.
12 * This library is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Lesser General Public License for more details.
17 * You should have received a copy of the GNU Lesser General Public
18 * License along with this library; if not, write to the Free Software
19 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
22 #include <assert.h>
23 #include <stdarg.h>
24 #include <limits.h>
26 #include "ntstatus.h"
27 #define WIN32_NO_STATUS
28 #include "winternl.h"
30 #include "wine/debug.h"
31 #include "wine/list.h"
33 #include "ntdll_misc.h"
35 WINE_DEFAULT_DEBUG_CHANNEL(threadpool);
38 * Old thread pooling API
/* Heap-allocated pairing of a legacy work routine with its argument.
 * RtlQueueWorkItem() fills it in and process_rtl_work_item() runs and frees it. */
41 struct rtl_work_item
43 PRTL_WORK_ITEM_ROUTINE function; /* routine to invoke */
44 PVOID context; /* argument handed to the routine */
/* Expiry value meaning "never": queue_get_timeout() yields INFINITE when the
 * head timer carries it. */
47 #define EXPIRE_NEVER (~(ULONGLONG)0)
/* Stored in timer_queue.magic and checked before a queue handle is used. */
48 #define TIMER_QUEUE_MAGIC 0x516d6954 /* TimQ */
/* Debug record for threadpool_compl_cs; declared ahead of its definition so
 * the two initializers below can refer to each other. */
50 static RTL_CRITICAL_SECTION_DEBUG critsect_compl_debug;
/* State of the legacy completion port support: the port is created lazily by
 * RtlSetIoCompletionCallback() while holding threadpool_compl_cs. */
52 static struct
54 HANDLE compl_port;
55 RTL_CRITICAL_SECTION threadpool_compl_cs;
57 old_threadpool =
59 NULL, /* compl_port */
60 { &critsect_compl_debug, -1, 0, 0, 0, 0 }, /* threadpool_compl_cs */
63 static RTL_CRITICAL_SECTION_DEBUG critsect_compl_debug =
65 0, 0, &old_threadpool.threadpool_compl_cs,
66 { &critsect_compl_debug.ProcessLocksList, &critsect_compl_debug.ProcessLocksList },
67 0, 0, { (DWORD_PTR)(__FILE__ ": threadpool_compl_cs") }
70 struct timer_queue;
/* One timer belonging to a legacy timer queue (RtlCreateTimer and friends). */
71 struct queue_timer
73 struct timer_queue *q; /* queue this timer belongs to */
74 struct list entry; /* link in q->timers */
75 ULONG runcount; /* number of callbacks pending execution */
76 RTL_WAITORTIMERCALLBACKFUNC callback; /* invoked as callback(param, TRUE) */
77 PVOID param; /* first argument passed to callback */
78 DWORD period; /* re-arm interval; 0 means the timer is not re-armed after firing */
79 ULONG flags; /* WT_* flags supplied at creation */
80 ULONGLONG expire; /* next expiry on the queue_current_time() scale, or EXPIRE_NEVER */
81 BOOL destroy; /* timer should be deleted; once set, never unset */
82 HANDLE event; /* removal event */
/* A legacy timer queue; each queue is serviced by its own thread running
 * timer_queue_thread_proc(). */
85 struct timer_queue
87 DWORD magic; /* TIMER_QUEUE_MAGIC while the queue is valid, cleared just before it is freed */
88 RTL_CRITICAL_SECTION cs; /* guards the timer list and the quit flag */
89 struct list timers; /* sorted by expiration time */
90 BOOL quit; /* queue should be deleted; once set, never unset */
91 HANDLE event; /* wakes the queue thread when the head timer changes or the queue drains while quitting */
92 HANDLE thread; /* handle of the queue thread, waited on and closed by RtlDeleteTimerQueueEx() */
96 * Object-oriented thread pooling API
/* Idle timeout in milliseconds; timerqueue_thread_proc() waits this long for
 * new timer objects before exiting. Other users may exist outside this view. */
99 #define THREADPOOL_WORKER_TIMEOUT 5000
/* NOTE(review): presumably the number of handles one wait bucket can hold,
 * leaving one wait slot free for the bucket's own event - confirm against the
 * wait queue code. */
100 #define MAXIMUM_WAITQUEUE_OBJECTS (MAXIMUM_WAIT_OBJECTS - 1)
102 /* internal threadpool representation; TP_POOL pointers are cast to this by impl_from_TP_POOL() */
103 struct threadpool
105 LONG refcount; /* incremented with InterlockedIncrement() for every worker thread started */
106 LONG objcount;
107 BOOL shutdown;
108 CRITICAL_SECTION cs;
109 /* Pools of work items, locked via .cs, order matches TP_CALLBACK_PRIORITY - high, normal, low. */
110 struct list pools[3];
111 RTL_CONDITION_VARIABLE update_event;
112 /* information about worker threads, locked via .cs */
113 int max_workers;
114 int min_workers;
115 int num_workers; /* bumped by tp_new_worker_thread() on successful thread creation */
116 int num_busy_workers;
117 HANDLE compl_port;
118 TP_POOL_STACK_INFORMATION stack_info; /* StackReserve/StackCommit are passed to RtlCreateUserThread() for new workers */
/* Kind tag stored in threadpool_object.type; the impl_from_TP_*() helpers
 * assert on it before handing out the object. */
121 enum threadpool_objtype
123 TP_OBJECT_TYPE_SIMPLE,
124 TP_OBJECT_TYPE_WORK,
125 TP_OBJECT_TYPE_TIMER,
126 TP_OBJECT_TYPE_WAIT,
127 TP_OBJECT_TYPE_IO,
/* Element type of threadpool_object.u.io.completions.
 * NOTE(review): presumably one record per dequeued I/O completion - confirm
 * against the I/O queue code, which is outside this view. */
130 struct io_completion
132 IO_STATUS_BLOCK iosb;
133 ULONG_PTR cvalue;
136 /* internal threadpool object representation; TP_WORK, TP_TIMER, TP_WAIT and
 * TP_IO pointers are cast to this type by the impl_from_TP_*() helpers, and
 * the member of union u that is valid is selected by .type */
137 struct threadpool_object
139 void *win32_callback; /* leave space for kernelbase to store win32 callback */
140 LONG refcount;
141 BOOL shutdown; /* checked by the timer queue thread before re-arming a periodic timer */
142 /* read-only information */
143 enum threadpool_objtype type;
144 struct threadpool *pool;
145 struct threadpool_group *group;
146 PVOID userdata;
147 PTP_CLEANUP_GROUP_CANCEL_CALLBACK group_cancel_callback;
148 PTP_SIMPLE_CALLBACK finalization_callback;
149 BOOL may_run_long;
150 HMODULE race_dll;
151 TP_CALLBACK_PRIORITY priority;
152 /* information about the group, locked via .group->cs */
153 struct list group_entry;
154 BOOL is_group_member;
155 /* information about the pool, locked via .pool->cs */
156 struct list pool_entry;
157 RTL_CONDITION_VARIABLE finished_event;
158 RTL_CONDITION_VARIABLE group_finished_event;
159 HANDLE completed_event;
160 LONG num_pending_callbacks;
161 LONG num_running_callbacks;
162 LONG num_associated_callbacks;
163 /* arguments for callback */
164 union
166 struct
168 PTP_SIMPLE_CALLBACK callback;
169 } simple;
170 struct
172 PTP_WORK_CALLBACK callback;
173 } work;
174 struct
176 PTP_TIMER_CALLBACK callback;
177 /* information about the timer, locked via timerqueue.cs */
178 BOOL timer_initialized; /* set once tp_timerqueue_lock() has counted this timer in timerqueue.objcount */
179 BOOL timer_pending; /* TRUE while timer_entry is linked into timerqueue.pending_timers */
180 struct list timer_entry;
181 BOOL timer_set;
182 ULONGLONG timeout; /* next expiry, compared against NtQuerySystemTime() */
183 LONG period; /* multiplied by 10000 when re-arming, i.e. milliseconds */
184 LONG window_length; /* multiplied by 10000 when computing the wakeup window, i.e. milliseconds */
185 } timer;
186 struct
188 PTP_WAIT_CALLBACK callback;
189 LONG signaled;
190 /* information about the wait object, locked via waitqueue.cs */
191 struct waitqueue_bucket *bucket;
192 BOOL wait_pending;
193 struct list wait_entry;
194 ULONGLONG timeout;
195 HANDLE handle;
196 DWORD flags;
197 RTL_WAITORTIMERCALLBACKFUNC rtl_callback;
198 } wait;
199 struct
201 PTP_IO_CALLBACK callback;
202 /* locked via .pool->cs */
203 unsigned int pending_count, skipped_count, completion_count, completion_max;
204 BOOL shutting_down;
205 struct io_completion *completions;
206 } io;
207 } u;
210 /* internal threadpool instance representation; TP_CALLBACK_INSTANCE
 * pointers are cast to this by impl_from_TP_CALLBACK_INSTANCE() */
211 struct threadpool_instance
213 struct threadpool_object *object;
214 DWORD threadid;
215 BOOL associated;
216 BOOL may_run_long;
/* NOTE(review): presumably resources to release once the callback returns -
 * the code consuming these fields is outside this view, confirm there. */
217 struct
219 CRITICAL_SECTION *critical_section;
220 HANDLE mutex;
221 HANDLE semaphore;
222 LONG semaphore_count;
223 HANDLE event;
224 HMODULE library;
225 } cleanup;
228 /* internal threadpool group representation; TP_CLEANUP_GROUP pointers are
 * cast to this by impl_from_TP_CLEANUP_GROUP() */
229 struct threadpool_group
231 LONG refcount;
232 BOOL shutdown;
233 CRITICAL_SECTION cs;
234 /* list of group members, locked via .cs */
235 struct list members;
238 /* global timerqueue object, serviced by a single thread running
 * timerqueue_thread_proc(); every field is accessed with .cs held */
239 static RTL_CRITICAL_SECTION_DEBUG timerqueue_debug;
241 static struct
243 CRITICAL_SECTION cs;
244 LONG objcount; /* number of timer objects registered through tp_timerqueue_lock() */
245 BOOL thread_running; /* TRUE from thread creation until the thread decides to exit */
246 struct list pending_timers; /* armed timers; the thread only ever inspects the head for expiry */
247 RTL_CONDITION_VARIABLE update_event; /* the timer thread sleeps on this between expirations */
249 timerqueue =
251 { &timerqueue_debug, -1, 0, 0, 0, 0 }, /* cs */
252 0, /* objcount */
253 FALSE, /* thread_running */
254 LIST_INIT( timerqueue.pending_timers ), /* pending_timers */
255 RTL_CONDITION_VARIABLE_INIT /* update_event */
258 static RTL_CRITICAL_SECTION_DEBUG timerqueue_debug =
260 0, 0, &timerqueue.cs,
261 { &timerqueue_debug.ProcessLocksList, &timerqueue_debug.ProcessLocksList },
262 0, 0, { (DWORD_PTR)(__FILE__ ": timerqueue.cs") }
265 /* global waitqueue object: a list of wait buckets plus their count.
 * NOTE(review): presumably both fields are guarded by .cs - the code using
 * them is outside this view. */
266 static RTL_CRITICAL_SECTION_DEBUG waitqueue_debug;
268 static struct
270 CRITICAL_SECTION cs;
271 LONG num_buckets;
272 struct list buckets;
274 waitqueue =
276 { &waitqueue_debug, -1, 0, 0, 0, 0 }, /* cs */
277 0, /* num_buckets */
278 LIST_INIT( waitqueue.buckets ) /* buckets */
281 static RTL_CRITICAL_SECTION_DEBUG waitqueue_debug =
283 0, 0, &waitqueue.cs,
284 { &waitqueue_debug.ProcessLocksList, &waitqueue_debug.ProcessLocksList },
285 0, 0, { (DWORD_PTR)(__FILE__ ": waitqueue.cs") }
/* One bucket of wait objects; referenced from threadpool_object.u.wait.bucket.
 * NOTE(review): presumably each bucket is serviced by its own waiting thread -
 * confirm against the wait queue code, which is outside this view. */
288 struct waitqueue_bucket
290 struct list bucket_entry; /* NOTE(review): presumably the link in waitqueue.buckets - confirm */
291 LONG objcount;
292 struct list reserved;
293 struct list waiting;
294 HANDLE update_event;
295 BOOL alertable;
298 /* global I/O completion queue object; only .cs is initialized explicitly,
 * so the remaining members start out zeroed as for any static object */
299 static RTL_CRITICAL_SECTION_DEBUG ioqueue_debug;
301 static struct
303 CRITICAL_SECTION cs;
304 LONG objcount;
305 BOOL thread_running;
306 HANDLE port;
307 RTL_CONDITION_VARIABLE update_event;
309 ioqueue =
311 .cs = { &ioqueue_debug, -1, 0, 0, 0, 0 },
314 static RTL_CRITICAL_SECTION_DEBUG ioqueue_debug =
316 0, 0, &ioqueue.cs,
317 { &ioqueue_debug.ProcessLocksList, &ioqueue_debug.ProcessLocksList },
318 0, 0, { (DWORD_PTR)(__FILE__ ": ioqueue.cs") }
321 static inline struct threadpool *impl_from_TP_POOL( TP_POOL *pool )
323 return (struct threadpool *)pool;
326 static inline struct threadpool_object *impl_from_TP_WORK( TP_WORK *work )
328 struct threadpool_object *object = (struct threadpool_object *)work;
329 assert( object->type == TP_OBJECT_TYPE_WORK );
330 return object;
333 static inline struct threadpool_object *impl_from_TP_TIMER( TP_TIMER *timer )
335 struct threadpool_object *object = (struct threadpool_object *)timer;
336 assert( object->type == TP_OBJECT_TYPE_TIMER );
337 return object;
340 static inline struct threadpool_object *impl_from_TP_WAIT( TP_WAIT *wait )
342 struct threadpool_object *object = (struct threadpool_object *)wait;
343 assert( object->type == TP_OBJECT_TYPE_WAIT );
344 return object;
347 static inline struct threadpool_object *impl_from_TP_IO( TP_IO *io )
349 struct threadpool_object *object = (struct threadpool_object *)io;
350 assert( object->type == TP_OBJECT_TYPE_IO );
351 return object;
354 static inline struct threadpool_group *impl_from_TP_CLEANUP_GROUP( TP_CLEANUP_GROUP *group )
356 return (struct threadpool_group *)group;
359 static inline struct threadpool_instance *impl_from_TP_CALLBACK_INSTANCE( TP_CALLBACK_INSTANCE *instance )
361 return (struct threadpool_instance *)instance;
/* Forward declarations for routines defined further down in this file. */
364 static void CALLBACK threadpool_worker_proc( void *param );
365 static void tp_object_submit( struct threadpool_object *object, BOOL signaled );
366 static void tp_object_execute( struct threadpool_object *object, BOOL wait_thread );
367 static void tp_object_prepare_shutdown( struct threadpool_object *object );
368 static BOOL tp_object_release( struct threadpool_object *object );
/* NOTE(review): presumably the pool used when a caller supplies none, created
 * on demand - the code that sets it is outside this view. */
369 static struct threadpool *default_threadpool = NULL;
371 static BOOL array_reserve(void **elements, unsigned int *capacity, unsigned int count, unsigned int size)
373 unsigned int new_capacity, max_capacity;
374 void *new_elements;
376 if (count <= *capacity)
377 return TRUE;
379 max_capacity = ~(SIZE_T)0 / size;
380 if (count > max_capacity)
381 return FALSE;
383 new_capacity = max(4, *capacity);
384 while (new_capacity < count && new_capacity <= max_capacity / 2)
385 new_capacity *= 2;
386 if (new_capacity < count)
387 new_capacity = max_capacity;
389 if (!(new_elements = RtlReAllocateHeap( GetProcessHeap(), 0, *elements, new_capacity * size )))
390 return FALSE;
392 *elements = new_elements;
393 *capacity = new_capacity;
395 return TRUE;
398 static void set_thread_name(const WCHAR *name)
400 THREAD_NAME_INFORMATION info;
402 RtlInitUnicodeString(&info.ThreadName, name);
403 NtSetInformationThread(GetCurrentThread(), ThreadNameInformation, &info, sizeof(info));
406 static void CALLBACK process_rtl_work_item( TP_CALLBACK_INSTANCE *instance, void *userdata )
408 struct rtl_work_item *item = userdata;
410 TRACE("executing %p(%p)\n", item->function, item->context);
411 item->function( item->context );
413 RtlFreeHeap( GetProcessHeap(), 0, item );
416 /***********************************************************************
417 * RtlQueueWorkItem (NTDLL.@)
419 * Queues a work item into a thread in the thread pool.
421 * PARAMS
422 * function [I] Work function to execute.
423 * context [I] Context to pass to the work function when it is executed.
424 * flags [I] Flags. See notes.
426 * RETURNS
427 * Success: STATUS_SUCCESS.
428 * Failure: Any NTSTATUS code.
430 * NOTES
431 * Flags can be one or more of the following:
432 *|WT_EXECUTEDEFAULT - Executes the work item in a non-I/O worker thread.
433 *|WT_EXECUTEINIOTHREAD - Executes the work item in an I/O worker thread.
434 *|WT_EXECUTEINPERSISTENTTHREAD - Executes the work item in a thread that is persistent.
435 *|WT_EXECUTELONGFUNCTION - Hints that the execution can take a long time.
436 *|WT_TRANSFER_IMPERSONATION - Executes the function with the current access token.
438 NTSTATUS WINAPI RtlQueueWorkItem( PRTL_WORK_ITEM_ROUTINE function, PVOID context, ULONG flags )
440 TP_CALLBACK_ENVIRON environment;
441 struct rtl_work_item *item;
442 NTSTATUS status;
444 TRACE( "%p %p %lu\n", function, context, flags );
446 item = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*item) );
447 if (!item)
448 return STATUS_NO_MEMORY;
450 memset( &environment, 0, sizeof(environment) );
451 environment.Version = 1;
452 environment.u.s.LongFunction = (flags & WT_EXECUTELONGFUNCTION) != 0;
453 environment.u.s.Persistent = (flags & WT_EXECUTEINPERSISTENTTHREAD) != 0;
455 item->function = function;
456 item->context = context;
458 status = TpSimpleTryPost( process_rtl_work_item, item, &environment );
459 if (status) RtlFreeHeap( GetProcessHeap(), 0, item );
460 return status;
463 /***********************************************************************
464 * iocp_poller - get completion events and run callbacks
466 static DWORD CALLBACK iocp_poller(LPVOID Arg)
468 HANDLE cport = Arg;
470 while( TRUE )
472 PRTL_OVERLAPPED_COMPLETION_ROUTINE callback;
473 LPVOID overlapped;
474 IO_STATUS_BLOCK iosb;
475 NTSTATUS res = NtRemoveIoCompletion( cport, (PULONG_PTR)&callback, (PULONG_PTR)&overlapped, &iosb, NULL );
476 if (res)
478 ERR("NtRemoveIoCompletion failed: 0x%lx\n", res);
480 else
482 DWORD transferred = 0;
483 DWORD err = 0;
485 if (iosb.Status == STATUS_SUCCESS)
486 transferred = iosb.Information;
487 else
488 err = RtlNtStatusToDosError(iosb.Status);
490 callback( err, transferred, overlapped );
493 return 0;
496 /***********************************************************************
497 * RtlSetIoCompletionCallback (NTDLL.@)
499 * Binds a handle to a thread pool's completion port, and possibly
500 * starts a non-I/O thread to monitor this port and call functions back.
502 * PARAMS
503 * FileHandle [I] Handle to bind to a completion port.
504 * Function [I] Callback function to call on I/O completions.
505 * Flags [I] Not used.
507 * RETURNS
508 * Success: STATUS_SUCCESS.
509 * Failure: Any NTSTATUS code.
512 NTSTATUS WINAPI RtlSetIoCompletionCallback(HANDLE FileHandle, PRTL_OVERLAPPED_COMPLETION_ROUTINE Function, ULONG Flags)
514 IO_STATUS_BLOCK iosb;
515 FILE_COMPLETION_INFORMATION info;
517 if (Flags) FIXME("Unknown value Flags=0x%lx\n", Flags);
519 if (!old_threadpool.compl_port)
521 NTSTATUS res = STATUS_SUCCESS;
523 RtlEnterCriticalSection(&old_threadpool.threadpool_compl_cs);
524 if (!old_threadpool.compl_port)
526 HANDLE cport;
528 res = NtCreateIoCompletion( &cport, IO_COMPLETION_ALL_ACCESS, NULL, 0 );
529 if (!res)
531 /* FIXME native can start additional threads in case of e.g. hung callback function. */
532 res = RtlQueueWorkItem( iocp_poller, cport, WT_EXECUTEDEFAULT );
533 if (!res)
534 old_threadpool.compl_port = cport;
535 else
536 NtClose( cport );
539 RtlLeaveCriticalSection(&old_threadpool.threadpool_compl_cs);
540 if (res) return res;
543 info.CompletionPort = old_threadpool.compl_port;
544 info.CompletionKey = (ULONG_PTR)Function;
546 return NtSetInformationFile( FileHandle, &iosb, &info, sizeof(info), FileCompletionInformation );
549 static inline PLARGE_INTEGER get_nt_timeout( PLARGE_INTEGER pTime, ULONG timeout )
551 if (timeout == INFINITE) return NULL;
552 pTime->QuadPart = (ULONGLONG)timeout * -10000;
553 return pTime;
557 /************************** Timer Queue Impl **************************/
559 static void queue_remove_timer(struct queue_timer *t)
561 /* We MUST hold the queue cs while calling this function. This ensures
562 that we cannot queue another callback for this timer. The runcount
563 being zero makes sure we don't have any already queued. */
564 struct timer_queue *q = t->q;
566 assert(t->runcount == 0);
567 assert(t->destroy);
569 list_remove(&t->entry);
570 if (t->event)
571 NtSetEvent(t->event, NULL);
572 RtlFreeHeap(GetProcessHeap(), 0, t);
574 if (q->quit && list_empty(&q->timers))
575 NtSetEvent(q->event, NULL);
578 static void timer_cleanup_callback(struct queue_timer *t)
580 struct timer_queue *q = t->q;
581 RtlEnterCriticalSection(&q->cs);
583 assert(0 < t->runcount);
584 --t->runcount;
586 if (t->destroy && t->runcount == 0)
587 queue_remove_timer(t);
589 RtlLeaveCriticalSection(&q->cs);
592 static DWORD WINAPI timer_callback_wrapper(LPVOID p)
594 struct queue_timer *t = p;
595 t->callback(t->param, TRUE);
596 timer_cleanup_callback(t);
597 return 0;
600 static inline ULONGLONG queue_current_time(void)
602 LARGE_INTEGER now, freq;
603 NtQueryPerformanceCounter(&now, &freq);
604 return now.QuadPart * 1000 / freq.QuadPart;
607 static void queue_add_timer(struct queue_timer *t, ULONGLONG time,
608 BOOL set_event)
610 /* We MUST hold the queue cs while calling this function. */
611 struct timer_queue *q = t->q;
612 struct list *ptr = &q->timers;
614 assert(!q->quit || (t->destroy && time == EXPIRE_NEVER));
616 if (time != EXPIRE_NEVER)
617 LIST_FOR_EACH(ptr, &q->timers)
619 struct queue_timer *cur = LIST_ENTRY(ptr, struct queue_timer, entry);
620 if (time < cur->expire)
621 break;
623 list_add_before(ptr, &t->entry);
625 t->expire = time;
627 /* If we insert at the head of the list, we need to expire sooner
628 than expected. */
629 if (set_event && &t->entry == list_head(&q->timers))
630 NtSetEvent(q->event, NULL);
633 static inline void queue_move_timer(struct queue_timer *t, ULONGLONG time,
634 BOOL set_event)
636 /* We MUST hold the queue cs while calling this function. */
637 list_remove(&t->entry);
638 queue_add_timer(t, time, set_event);
641 static void queue_timer_expire(struct timer_queue *q)
643 struct queue_timer *t = NULL;
645 RtlEnterCriticalSection(&q->cs);
646 if (list_head(&q->timers))
648 ULONGLONG now, next;
649 t = LIST_ENTRY(list_head(&q->timers), struct queue_timer, entry);
650 if (!t->destroy && t->expire <= ((now = queue_current_time())))
652 ++t->runcount;
653 if (t->period)
655 next = t->expire + t->period;
656 /* avoid trigger cascade if overloaded / hibernated */
657 if (next < now)
658 next = now + t->period;
660 else
661 next = EXPIRE_NEVER;
662 queue_move_timer(t, next, FALSE);
664 else
665 t = NULL;
667 RtlLeaveCriticalSection(&q->cs);
669 if (t)
671 if (t->flags & WT_EXECUTEINTIMERTHREAD)
672 timer_callback_wrapper(t);
673 else
675 ULONG flags
676 = (t->flags
677 & (WT_EXECUTEINIOTHREAD | WT_EXECUTEINPERSISTENTTHREAD
678 | WT_EXECUTELONGFUNCTION | WT_TRANSFER_IMPERSONATION));
679 NTSTATUS status = RtlQueueWorkItem(timer_callback_wrapper, t, flags);
680 if (status != STATUS_SUCCESS)
681 timer_cleanup_callback(t);
686 static ULONG queue_get_timeout(struct timer_queue *q)
688 struct queue_timer *t;
689 ULONG timeout = INFINITE;
691 RtlEnterCriticalSection(&q->cs);
692 if (list_head(&q->timers))
694 t = LIST_ENTRY(list_head(&q->timers), struct queue_timer, entry);
695 assert(!t->destroy || t->expire == EXPIRE_NEVER);
697 if (t->expire != EXPIRE_NEVER)
699 ULONGLONG time = queue_current_time();
700 timeout = t->expire < time ? 0 : t->expire - time;
703 RtlLeaveCriticalSection(&q->cs);
705 return timeout;
708 static void WINAPI timer_queue_thread_proc(LPVOID p)
710 struct timer_queue *q = p;
711 ULONG timeout_ms;
713 set_thread_name(L"wine_threadpool_timer_queue");
714 timeout_ms = INFINITE;
715 for (;;)
717 LARGE_INTEGER timeout;
718 NTSTATUS status;
719 BOOL done = FALSE;
721 status = NtWaitForSingleObject(
722 q->event, FALSE, get_nt_timeout(&timeout, timeout_ms));
724 if (status == STATUS_WAIT_0)
726 /* There are two possible ways to trigger the event. Either
727 we are quitting and the last timer got removed, or a new
728 timer got put at the head of the list so we need to adjust
729 our timeout. */
730 RtlEnterCriticalSection(&q->cs);
731 if (q->quit && list_empty(&q->timers))
732 done = TRUE;
733 RtlLeaveCriticalSection(&q->cs);
735 else if (status == STATUS_TIMEOUT)
736 queue_timer_expire(q);
738 if (done)
739 break;
741 timeout_ms = queue_get_timeout(q);
744 NtClose(q->event);
745 RtlDeleteCriticalSection(&q->cs);
746 q->magic = 0;
747 RtlFreeHeap(GetProcessHeap(), 0, q);
748 RtlExitUserThread( 0 );
751 static void queue_destroy_timer(struct queue_timer *t)
753 /* We MUST hold the queue cs while calling this function. */
754 t->destroy = TRUE;
755 if (t->runcount == 0)
756 /* Ensure a timer is promptly removed. If callbacks are pending,
757 it will be removed after the last one finishes by the callback
758 cleanup wrapper. */
759 queue_remove_timer(t);
760 else
761 /* Make sure no destroyed timer masks an active timer at the head
762 of the sorted list. */
763 queue_move_timer(t, EXPIRE_NEVER, FALSE);
766 /***********************************************************************
767 * RtlCreateTimerQueue (NTDLL.@)
769 * Creates a timer queue object and returns a handle to it.
771 * PARAMS
772 * NewTimerQueue [O] The newly created queue.
774 * RETURNS
775 * Success: STATUS_SUCCESS.
776 * Failure: Any NTSTATUS code.
778 NTSTATUS WINAPI RtlCreateTimerQueue(PHANDLE NewTimerQueue)
780 NTSTATUS status;
781 struct timer_queue *q = RtlAllocateHeap(GetProcessHeap(), 0, sizeof *q);
782 if (!q)
783 return STATUS_NO_MEMORY;
785 RtlInitializeCriticalSection(&q->cs);
786 list_init(&q->timers);
787 q->quit = FALSE;
788 q->magic = TIMER_QUEUE_MAGIC;
789 status = NtCreateEvent(&q->event, EVENT_ALL_ACCESS, NULL, SynchronizationEvent, FALSE);
790 if (status != STATUS_SUCCESS)
792 RtlFreeHeap(GetProcessHeap(), 0, q);
793 return status;
795 status = RtlCreateUserThread(GetCurrentProcess(), NULL, FALSE, 0, 0, 0,
796 timer_queue_thread_proc, q, &q->thread, NULL);
797 if (status != STATUS_SUCCESS)
799 NtClose(q->event);
800 RtlFreeHeap(GetProcessHeap(), 0, q);
801 return status;
804 *NewTimerQueue = q;
805 return STATUS_SUCCESS;
808 /***********************************************************************
809 * RtlDeleteTimerQueueEx (NTDLL.@)
811 * Deletes a timer queue object.
813 * PARAMS
814 * TimerQueue [I] The timer queue to destroy.
815 * CompletionEvent [I] If NULL, return immediately. If INVALID_HANDLE_VALUE,
816 * wait until all timers are finished firing before
817 * returning. Otherwise, return immediately and set the
818 * event when all timers are done.
820 * RETURNS
821 * Success: STATUS_SUCCESS if synchronous, STATUS_PENDING if not.
822 * Failure: Any NTSTATUS code.
824 NTSTATUS WINAPI RtlDeleteTimerQueueEx(HANDLE TimerQueue, HANDLE CompletionEvent)
826 struct timer_queue *q = TimerQueue;
827 struct queue_timer *t, *temp;
828 HANDLE thread;
829 NTSTATUS status;
831 if (!q || q->magic != TIMER_QUEUE_MAGIC)
832 return STATUS_INVALID_HANDLE;
834 thread = q->thread;
836 RtlEnterCriticalSection(&q->cs);
837 q->quit = TRUE;
838 if (list_head(&q->timers))
839 /* When the last timer is removed, it will signal the timer thread to
840 exit... */
841 LIST_FOR_EACH_ENTRY_SAFE(t, temp, &q->timers, struct queue_timer, entry)
842 queue_destroy_timer(t);
843 else
844 /* However if we have none, we must do it ourselves. */
845 NtSetEvent(q->event, NULL);
846 RtlLeaveCriticalSection(&q->cs);
848 if (CompletionEvent == INVALID_HANDLE_VALUE)
850 NtWaitForSingleObject(thread, FALSE, NULL);
851 status = STATUS_SUCCESS;
853 else
855 if (CompletionEvent)
857 FIXME("asynchronous return on completion event unimplemented\n");
858 NtWaitForSingleObject(thread, FALSE, NULL);
859 NtSetEvent(CompletionEvent, NULL);
861 status = STATUS_PENDING;
864 NtClose(thread);
865 return status;
868 static struct timer_queue *get_timer_queue(HANDLE TimerQueue)
870 static struct timer_queue *default_timer_queue;
872 if (TimerQueue)
873 return TimerQueue;
874 else
876 if (!default_timer_queue)
878 HANDLE q;
879 NTSTATUS status = RtlCreateTimerQueue(&q);
880 if (status == STATUS_SUCCESS)
882 PVOID p = InterlockedCompareExchangePointer( (void **) &default_timer_queue, q, NULL );
883 if (p)
884 /* Got beat to the punch. */
885 RtlDeleteTimerQueueEx(q, NULL);
888 return default_timer_queue;
892 /***********************************************************************
893 * RtlCreateTimer (NTDLL.@)
895 * Creates a new timer associated with the given queue.
897 * PARAMS
898 * TimerQueue [I] The queue to hold the timer.
899 * NewTimer [O] The newly created timer.
900 * Callback [I] The callback to fire.
901 * Parameter [I] The argument for the callback.
902 * DueTime [I] The delay, in milliseconds, before first firing the
903 * timer.
904 * Period [I] The period, in milliseconds, at which to fire the timer
905 * after the first callback. If zero, the timer will only
906 * fire once. It still needs to be deleted with
907 * RtlDeleteTimer.
908 * Flags [I] Flags controlling the execution of the callback. In
909 * addition to the WT_* thread pool flags (see
910 * RtlQueueWorkItem), WT_EXECUTEINTIMERTHREAD and
911 * WT_EXECUTEONLYONCE are supported.
913 * RETURNS
914 * Success: STATUS_SUCCESS.
915 * Failure: Any NTSTATUS code.
917 NTSTATUS WINAPI RtlCreateTimer(HANDLE TimerQueue, HANDLE *NewTimer,
918 RTL_WAITORTIMERCALLBACKFUNC Callback,
919 PVOID Parameter, DWORD DueTime, DWORD Period,
920 ULONG Flags)
922 NTSTATUS status;
923 struct queue_timer *t;
924 struct timer_queue *q = get_timer_queue(TimerQueue);
926 if (!q) return STATUS_NO_MEMORY;
927 if (q->magic != TIMER_QUEUE_MAGIC) return STATUS_INVALID_HANDLE;
929 t = RtlAllocateHeap(GetProcessHeap(), 0, sizeof *t);
930 if (!t)
931 return STATUS_NO_MEMORY;
933 t->q = q;
934 t->runcount = 0;
935 t->callback = Callback;
936 t->param = Parameter;
937 t->period = Period;
938 t->flags = Flags;
939 t->destroy = FALSE;
940 t->event = NULL;
942 status = STATUS_SUCCESS;
943 RtlEnterCriticalSection(&q->cs);
944 if (q->quit)
945 status = STATUS_INVALID_HANDLE;
946 else
947 queue_add_timer(t, queue_current_time() + DueTime, TRUE);
948 RtlLeaveCriticalSection(&q->cs);
950 if (status == STATUS_SUCCESS)
951 *NewTimer = t;
952 else
953 RtlFreeHeap(GetProcessHeap(), 0, t);
955 return status;
958 /***********************************************************************
959 * RtlUpdateTimer (NTDLL.@)
961 * Changes the time at which a timer expires.
963 * PARAMS
964 * TimerQueue [I] The queue that holds the timer.
965 * Timer [I] The timer to update.
966 * DueTime [I] The delay, in milliseconds, before next firing the timer.
967 * Period [I] The period, in milliseconds, at which to fire the timer
968 * after the first callback. If zero, the timer will not
969 * fire again. It still needs to be deleted with
970 * RtlDeleteTimer.
972 * RETURNS
973 * Success: STATUS_SUCCESS.
974 * Failure: Any NTSTATUS code.
976 NTSTATUS WINAPI RtlUpdateTimer(HANDLE TimerQueue, HANDLE Timer,
977 DWORD DueTime, DWORD Period)
979 struct queue_timer *t = Timer;
980 struct timer_queue *q = t->q;
982 RtlEnterCriticalSection(&q->cs);
983 /* Can't change a timer if it was once-only or destroyed. */
984 if (t->expire != EXPIRE_NEVER)
986 t->period = Period;
987 queue_move_timer(t, queue_current_time() + DueTime, TRUE);
989 RtlLeaveCriticalSection(&q->cs);
991 return STATUS_SUCCESS;
994 /***********************************************************************
995 * RtlDeleteTimer (NTDLL.@)
997 * Cancels a timer-queue timer.
999 * PARAMS
1000 * TimerQueue [I] The queue that holds the timer.
1001 * Timer [I] The timer to delete.
1002 * CompletionEvent [I] If NULL, return immediately. If INVALID_HANDLE_VALUE,
1003 * wait until the timer is finished firing all pending
1004 * callbacks before returning. Otherwise, return
1005 * immediately and set the event when the timer is done.
1007 * RETURNS
1008 * Success: STATUS_SUCCESS if the timer is done, STATUS_PENDING if not,
1009 or if the completion event is NULL.
1010 * Failure: Any NTSTATUS code.
1012 NTSTATUS WINAPI RtlDeleteTimer(HANDLE TimerQueue, HANDLE Timer,
1013 HANDLE CompletionEvent)
1015 struct queue_timer *t = Timer;
1016 struct timer_queue *q;
1017 NTSTATUS status = STATUS_PENDING;
1018 HANDLE event = NULL;
1020 if (!Timer)
1021 return STATUS_INVALID_PARAMETER_1;
1022 q = t->q;
1023 if (CompletionEvent == INVALID_HANDLE_VALUE)
1025 status = NtCreateEvent(&event, EVENT_ALL_ACCESS, NULL, SynchronizationEvent, FALSE);
1026 if (status == STATUS_SUCCESS)
1027 status = STATUS_PENDING;
1029 else if (CompletionEvent)
1030 event = CompletionEvent;
1032 RtlEnterCriticalSection(&q->cs);
1033 t->event = event;
1034 if (t->runcount == 0 && event)
1035 status = STATUS_SUCCESS;
1036 queue_destroy_timer(t);
1037 RtlLeaveCriticalSection(&q->cs);
1039 if (CompletionEvent == INVALID_HANDLE_VALUE && event)
1041 if (status == STATUS_PENDING)
1043 NtWaitForSingleObject(event, FALSE, NULL);
1044 status = STATUS_SUCCESS;
1046 NtClose(event);
1049 return status;
1052 /***********************************************************************
1053 * timerqueue_thread_proc (internal)
1055 static void CALLBACK timerqueue_thread_proc( void *param )
1057 ULONGLONG timeout_lower, timeout_upper, new_timeout;
1058 struct threadpool_object *other_timer;
1059 LARGE_INTEGER now, timeout;
1060 struct list *ptr;
1062 TRACE( "starting timer queue thread\n" );
1063 set_thread_name(L"wine_threadpool_timerqueue");
1065 RtlEnterCriticalSection( &timerqueue.cs );
1066 for (;;)
1068 NtQuerySystemTime( &now );
1070 /* Check for expired timers. */
1071 while ((ptr = list_head( &timerqueue.pending_timers )))
1073 struct threadpool_object *timer = LIST_ENTRY( ptr, struct threadpool_object, u.timer.timer_entry );
1074 assert( timer->type == TP_OBJECT_TYPE_TIMER );
1075 assert( timer->u.timer.timer_pending );
1076 if (timer->u.timer.timeout > now.QuadPart)
1077 break;
1079 /* Queue a new callback in one of the worker threads. */
1080 list_remove( &timer->u.timer.timer_entry );
1081 timer->u.timer.timer_pending = FALSE;
1082 tp_object_submit( timer, FALSE );
1084 /* Insert the timer back into the queue, except it's marked for shutdown. */
1085 if (timer->u.timer.period && !timer->shutdown)
1087 timer->u.timer.timeout += (ULONGLONG)timer->u.timer.period * 10000;
1088 if (timer->u.timer.timeout <= now.QuadPart)
1089 timer->u.timer.timeout = now.QuadPart + 1;
1091 LIST_FOR_EACH_ENTRY( other_timer, &timerqueue.pending_timers,
1092 struct threadpool_object, u.timer.timer_entry )
1094 assert( other_timer->type == TP_OBJECT_TYPE_TIMER );
1095 if (timer->u.timer.timeout < other_timer->u.timer.timeout)
1096 break;
1098 list_add_before( &other_timer->u.timer.timer_entry, &timer->u.timer.timer_entry );
1099 timer->u.timer.timer_pending = TRUE;
1103 timeout_lower = timeout_upper = MAXLONGLONG;
1105 /* Determine next timeout and use the window length to optimize wakeup times. */
1106 LIST_FOR_EACH_ENTRY( other_timer, &timerqueue.pending_timers,
1107 struct threadpool_object, u.timer.timer_entry )
1109 assert( other_timer->type == TP_OBJECT_TYPE_TIMER );
1110 if (other_timer->u.timer.timeout >= timeout_upper)
1111 break;
1113 timeout_lower = other_timer->u.timer.timeout;
1114 new_timeout = timeout_lower + (ULONGLONG)other_timer->u.timer.window_length * 10000;
1115 if (new_timeout < timeout_upper)
1116 timeout_upper = new_timeout;
1119 /* Wait for timer update events or until the next timer expires. */
1120 if (timerqueue.objcount)
1122 timeout.QuadPart = timeout_lower;
1123 RtlSleepConditionVariableCS( &timerqueue.update_event, &timerqueue.cs, &timeout );
1124 continue;
1127 /* All timers have been destroyed, if no new timers are created
1128 * within some amount of time, then we can shutdown this thread. */
1129 timeout.QuadPart = (ULONGLONG)THREADPOOL_WORKER_TIMEOUT * -10000;
1130 if (RtlSleepConditionVariableCS( &timerqueue.update_event, &timerqueue.cs,
1131 &timeout ) == STATUS_TIMEOUT && !timerqueue.objcount)
1133 break;
1137 timerqueue.thread_running = FALSE;
1138 RtlLeaveCriticalSection( &timerqueue.cs );
1140 TRACE( "terminating timer queue thread\n" );
1141 RtlExitUserThread( 0 );
1144 /***********************************************************************
1145 * tp_new_worker_thread (internal)
1147 * Create and account a new worker thread for the desired pool.
1149 static NTSTATUS tp_new_worker_thread( struct threadpool *pool )
1151 HANDLE thread;
1152 NTSTATUS status;
1154 status = RtlCreateUserThread( GetCurrentProcess(), NULL, FALSE, 0,
1155 pool->stack_info.StackReserve, pool->stack_info.StackCommit,
1156 threadpool_worker_proc, pool, &thread, NULL );
1157 if (status == STATUS_SUCCESS)
1159 InterlockedIncrement( &pool->refcount );
1160 pool->num_workers++;
1161 NtClose( thread );
1163 return status;
1166 /***********************************************************************
1167 * tp_timerqueue_lock (internal)
1169 * Acquires a lock on the global timerqueue. When the lock is acquired
1170 * successfully, it is guaranteed that the timer thread is running.
1172 static NTSTATUS tp_timerqueue_lock( struct threadpool_object *timer )
1174 NTSTATUS status = STATUS_SUCCESS;
1175 assert( timer->type == TP_OBJECT_TYPE_TIMER );
1177 timer->u.timer.timer_initialized = FALSE;
1178 timer->u.timer.timer_pending = FALSE;
1179 timer->u.timer.timer_set = FALSE;
1180 timer->u.timer.timeout = 0;
1181 timer->u.timer.period = 0;
1182 timer->u.timer.window_length = 0;
1184 RtlEnterCriticalSection( &timerqueue.cs );
1186 /* Make sure that the timerqueue thread is running. */
1187 if (!timerqueue.thread_running)
1189 HANDLE thread;
1190 status = RtlCreateUserThread( GetCurrentProcess(), NULL, FALSE, 0, 0, 0,
1191 timerqueue_thread_proc, NULL, &thread, NULL );
1192 if (status == STATUS_SUCCESS)
1194 timerqueue.thread_running = TRUE;
1195 NtClose( thread );
1199 if (status == STATUS_SUCCESS)
1201 timer->u.timer.timer_initialized = TRUE;
1202 timerqueue.objcount++;
1205 RtlLeaveCriticalSection( &timerqueue.cs );
1206 return status;
1209 /***********************************************************************
1210 * tp_timerqueue_unlock (internal)
1212 * Releases a lock on the global timerqueue.
1214 static void tp_timerqueue_unlock( struct threadpool_object *timer )
1216 assert( timer->type == TP_OBJECT_TYPE_TIMER );
1218 RtlEnterCriticalSection( &timerqueue.cs );
1219 if (timer->u.timer.timer_initialized)
1221 /* If timer was pending, remove it. */
1222 if (timer->u.timer.timer_pending)
1224 list_remove( &timer->u.timer.timer_entry );
1225 timer->u.timer.timer_pending = FALSE;
1228 /* If the last timer object was destroyed, then wake up the thread. */
1229 if (!--timerqueue.objcount)
1231 assert( list_empty( &timerqueue.pending_timers ) );
1232 RtlWakeAllConditionVariable( &timerqueue.update_event );
1235 timer->u.timer.timer_initialized = FALSE;
1237 RtlLeaveCriticalSection( &timerqueue.cs );
1240 /***********************************************************************
1241 * waitqueue_thread_proc (internal)
1243 static void CALLBACK waitqueue_thread_proc( void *param )
1245 struct threadpool_object *objects[MAXIMUM_WAITQUEUE_OBJECTS];
1246 HANDLE handles[MAXIMUM_WAITQUEUE_OBJECTS + 1];
1247 struct waitqueue_bucket *bucket = param;
1248 struct threadpool_object *wait, *next;
1249 LARGE_INTEGER now, timeout;
1250 DWORD num_handles;
1251 NTSTATUS status;
1253 TRACE( "starting wait queue thread\n" );
1254 set_thread_name(L"wine_threadpool_waitqueue");
1256 RtlEnterCriticalSection( &waitqueue.cs );
1258 for (;;)
1260 NtQuerySystemTime( &now );
1261 timeout.QuadPart = MAXLONGLONG;
1262 num_handles = 0;
1264 LIST_FOR_EACH_ENTRY_SAFE( wait, next, &bucket->waiting, struct threadpool_object,
1265 u.wait.wait_entry )
1267 assert( wait->type == TP_OBJECT_TYPE_WAIT );
1268 if (wait->u.wait.timeout <= now.QuadPart)
1270 /* Wait object timed out. */
1271 if ((wait->u.wait.flags & WT_EXECUTEONLYONCE))
1273 list_remove( &wait->u.wait.wait_entry );
1274 list_add_tail( &bucket->reserved, &wait->u.wait.wait_entry );
1276 if ((wait->u.wait.flags & (WT_EXECUTEINWAITTHREAD | WT_EXECUTEINIOTHREAD)))
1278 InterlockedIncrement( &wait->refcount );
1279 wait->num_pending_callbacks++;
1280 RtlEnterCriticalSection( &wait->pool->cs );
1281 tp_object_execute( wait, TRUE );
1282 RtlLeaveCriticalSection( &wait->pool->cs );
1283 tp_object_release( wait );
1285 else tp_object_submit( wait, FALSE );
1287 else
1289 if (wait->u.wait.timeout < timeout.QuadPart)
1290 timeout.QuadPart = wait->u.wait.timeout;
1292 assert( num_handles < MAXIMUM_WAITQUEUE_OBJECTS );
1293 InterlockedIncrement( &wait->refcount );
1294 objects[num_handles] = wait;
1295 handles[num_handles] = wait->u.wait.handle;
1296 num_handles++;
1300 if (!bucket->objcount)
1302 /* All wait objects have been destroyed, if no new wait objects are created
1303 * within some amount of time, then we can shutdown this thread. */
1304 assert( num_handles == 0 );
1305 RtlLeaveCriticalSection( &waitqueue.cs );
1306 timeout.QuadPart = (ULONGLONG)THREADPOOL_WORKER_TIMEOUT * -10000;
1307 status = NtWaitForMultipleObjects( 1, &bucket->update_event, TRUE, bucket->alertable, &timeout );
1308 RtlEnterCriticalSection( &waitqueue.cs );
1310 if (status == STATUS_TIMEOUT && !bucket->objcount)
1311 break;
1313 else
1315 handles[num_handles] = bucket->update_event;
1316 RtlLeaveCriticalSection( &waitqueue.cs );
1317 status = NtWaitForMultipleObjects( num_handles + 1, handles, TRUE, bucket->alertable, &timeout );
1318 RtlEnterCriticalSection( &waitqueue.cs );
1320 if (status >= STATUS_WAIT_0 && status < STATUS_WAIT_0 + num_handles)
1322 wait = objects[status - STATUS_WAIT_0];
1323 assert( wait->type == TP_OBJECT_TYPE_WAIT );
1324 if (wait->u.wait.bucket)
1326 /* Wait object signaled. */
1327 assert( wait->u.wait.bucket == bucket );
1328 if ((wait->u.wait.flags & WT_EXECUTEONLYONCE))
1330 list_remove( &wait->u.wait.wait_entry );
1331 list_add_tail( &bucket->reserved, &wait->u.wait.wait_entry );
1333 if ((wait->u.wait.flags & (WT_EXECUTEINWAITTHREAD | WT_EXECUTEINIOTHREAD)))
1335 wait->u.wait.signaled++;
1336 wait->num_pending_callbacks++;
1337 RtlEnterCriticalSection( &wait->pool->cs );
1338 tp_object_execute( wait, TRUE );
1339 RtlLeaveCriticalSection( &wait->pool->cs );
1341 else tp_object_submit( wait, TRUE );
1343 else
1344 WARN("wait object %p triggered while object was destroyed\n", wait);
1347 /* Release temporary references to wait objects. */
1348 while (num_handles)
1350 wait = objects[--num_handles];
1351 assert( wait->type == TP_OBJECT_TYPE_WAIT );
1352 tp_object_release( wait );
1356 /* Try to merge bucket with other threads. */
1357 if (waitqueue.num_buckets > 1 && bucket->objcount &&
1358 bucket->objcount <= MAXIMUM_WAITQUEUE_OBJECTS * 1 / 3)
1360 struct waitqueue_bucket *other_bucket;
1361 LIST_FOR_EACH_ENTRY( other_bucket, &waitqueue.buckets, struct waitqueue_bucket, bucket_entry )
1363 if (other_bucket != bucket && other_bucket->objcount && other_bucket->alertable == bucket->alertable &&
1364 other_bucket->objcount + bucket->objcount <= MAXIMUM_WAITQUEUE_OBJECTS * 2 / 3)
1366 other_bucket->objcount += bucket->objcount;
1367 bucket->objcount = 0;
1369 /* Update reserved list. */
1370 LIST_FOR_EACH_ENTRY( wait, &bucket->reserved, struct threadpool_object, u.wait.wait_entry )
1372 assert( wait->type == TP_OBJECT_TYPE_WAIT );
1373 wait->u.wait.bucket = other_bucket;
1375 list_move_tail( &other_bucket->reserved, &bucket->reserved );
1377 /* Update waiting list. */
1378 LIST_FOR_EACH_ENTRY( wait, &bucket->waiting, struct threadpool_object, u.wait.wait_entry )
1380 assert( wait->type == TP_OBJECT_TYPE_WAIT );
1381 wait->u.wait.bucket = other_bucket;
1383 list_move_tail( &other_bucket->waiting, &bucket->waiting );
1385 /* Move bucket to the end, to keep the probability of
1386 * newly added wait objects as small as possible. */
1387 list_remove( &bucket->bucket_entry );
1388 list_add_tail( &waitqueue.buckets, &bucket->bucket_entry );
1390 NtSetEvent( other_bucket->update_event, NULL );
1391 break;
1397 /* Remove this bucket from the list. */
1398 list_remove( &bucket->bucket_entry );
1399 if (!--waitqueue.num_buckets)
1400 assert( list_empty( &waitqueue.buckets ) );
1402 RtlLeaveCriticalSection( &waitqueue.cs );
1404 TRACE( "terminating wait queue thread\n" );
1406 assert( bucket->objcount == 0 );
1407 assert( list_empty( &bucket->reserved ) );
1408 assert( list_empty( &bucket->waiting ) );
1409 NtClose( bucket->update_event );
1411 RtlFreeHeap( GetProcessHeap(), 0, bucket );
1412 RtlExitUserThread( 0 );
1415 /***********************************************************************
1416 * tp_waitqueue_lock (internal)
1418 static NTSTATUS tp_waitqueue_lock( struct threadpool_object *wait )
1420 struct waitqueue_bucket *bucket;
1421 NTSTATUS status;
1422 HANDLE thread;
1423 BOOL alertable = (wait->u.wait.flags & WT_EXECUTEINIOTHREAD) != 0;
1424 assert( wait->type == TP_OBJECT_TYPE_WAIT );
1426 wait->u.wait.signaled = 0;
1427 wait->u.wait.bucket = NULL;
1428 wait->u.wait.wait_pending = FALSE;
1429 wait->u.wait.timeout = 0;
1430 wait->u.wait.handle = INVALID_HANDLE_VALUE;
1432 RtlEnterCriticalSection( &waitqueue.cs );
1434 /* Try to assign to existing bucket if possible. */
1435 LIST_FOR_EACH_ENTRY( bucket, &waitqueue.buckets, struct waitqueue_bucket, bucket_entry )
1437 if (bucket->objcount < MAXIMUM_WAITQUEUE_OBJECTS && bucket->alertable == alertable)
1439 list_add_tail( &bucket->reserved, &wait->u.wait.wait_entry );
1440 wait->u.wait.bucket = bucket;
1441 bucket->objcount++;
1443 status = STATUS_SUCCESS;
1444 goto out;
1448 /* Create a new bucket and corresponding worker thread. */
1449 bucket = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*bucket) );
1450 if (!bucket)
1452 status = STATUS_NO_MEMORY;
1453 goto out;
1456 bucket->objcount = 0;
1457 bucket->alertable = alertable;
1458 list_init( &bucket->reserved );
1459 list_init( &bucket->waiting );
1461 status = NtCreateEvent( &bucket->update_event, EVENT_ALL_ACCESS,
1462 NULL, SynchronizationEvent, FALSE );
1463 if (status)
1465 RtlFreeHeap( GetProcessHeap(), 0, bucket );
1466 goto out;
1469 status = RtlCreateUserThread( GetCurrentProcess(), NULL, FALSE, 0, 0, 0,
1470 waitqueue_thread_proc, bucket, &thread, NULL );
1471 if (status == STATUS_SUCCESS)
1473 list_add_tail( &waitqueue.buckets, &bucket->bucket_entry );
1474 waitqueue.num_buckets++;
1476 list_add_tail( &bucket->reserved, &wait->u.wait.wait_entry );
1477 wait->u.wait.bucket = bucket;
1478 bucket->objcount++;
1480 NtClose( thread );
1482 else
1484 NtClose( bucket->update_event );
1485 RtlFreeHeap( GetProcessHeap(), 0, bucket );
1488 out:
1489 RtlLeaveCriticalSection( &waitqueue.cs );
1490 return status;
1493 /***********************************************************************
1494 * tp_waitqueue_unlock (internal)
1496 static void tp_waitqueue_unlock( struct threadpool_object *wait )
1498 assert( wait->type == TP_OBJECT_TYPE_WAIT );
1500 RtlEnterCriticalSection( &waitqueue.cs );
1501 if (wait->u.wait.bucket)
1503 struct waitqueue_bucket *bucket = wait->u.wait.bucket;
1504 assert( bucket->objcount > 0 );
1506 list_remove( &wait->u.wait.wait_entry );
1507 wait->u.wait.bucket = NULL;
1508 bucket->objcount--;
1510 NtSetEvent( bucket->update_event, NULL );
1512 RtlLeaveCriticalSection( &waitqueue.cs );
1515 static void CALLBACK ioqueue_thread_proc( void *param )
1517 struct io_completion *completion;
1518 struct threadpool_object *io;
1519 IO_STATUS_BLOCK iosb;
1520 ULONG_PTR key, value;
1521 BOOL destroy, skip;
1522 NTSTATUS status;
1524 TRACE( "starting I/O completion thread\n" );
1525 set_thread_name(L"wine_threadpool_ioqueue");
1527 RtlEnterCriticalSection( &ioqueue.cs );
1529 for (;;)
1531 RtlLeaveCriticalSection( &ioqueue.cs );
1532 if ((status = NtRemoveIoCompletion( ioqueue.port, &key, &value, &iosb, NULL )))
1533 ERR("NtRemoveIoCompletion failed, status %#lx.\n", status);
1534 RtlEnterCriticalSection( &ioqueue.cs );
1536 destroy = skip = FALSE;
1537 io = (struct threadpool_object *)key;
1539 TRACE( "io %p, iosb.Status %#lx.\n", io, iosb.Status );
1541 if (io && (io->shutdown || io->u.io.shutting_down))
1543 RtlEnterCriticalSection( &io->pool->cs );
1544 if (!io->u.io.pending_count)
1546 if (io->u.io.skipped_count)
1547 --io->u.io.skipped_count;
1549 if (io->u.io.skipped_count)
1550 skip = TRUE;
1551 else
1552 destroy = TRUE;
1554 RtlLeaveCriticalSection( &io->pool->cs );
1555 if (skip) continue;
1558 if (destroy)
1560 --ioqueue.objcount;
1561 TRACE( "Releasing io %p.\n", io );
1562 io->shutdown = TRUE;
1563 tp_object_release( io );
1565 else if (io)
1567 RtlEnterCriticalSection( &io->pool->cs );
1569 TRACE( "pending_count %u.\n", io->u.io.pending_count );
1571 if (io->u.io.pending_count)
1573 --io->u.io.pending_count;
1574 if (!array_reserve((void **)&io->u.io.completions, &io->u.io.completion_max,
1575 io->u.io.completion_count + 1, sizeof(*io->u.io.completions)))
1577 ERR( "Failed to allocate memory.\n" );
1578 RtlLeaveCriticalSection( &io->pool->cs );
1579 continue;
1582 completion = &io->u.io.completions[io->u.io.completion_count++];
1583 completion->iosb = iosb;
1584 completion->cvalue = value;
1586 tp_object_submit( io, FALSE );
1588 RtlLeaveCriticalSection( &io->pool->cs );
1591 if (!ioqueue.objcount)
1593 /* All I/O objects have been destroyed; if no new objects are
1594 * created within some amount of time, then we can shutdown this
1595 * thread. */
1596 LARGE_INTEGER timeout = {.QuadPart = (ULONGLONG)THREADPOOL_WORKER_TIMEOUT * -10000};
1597 if (RtlSleepConditionVariableCS( &ioqueue.update_event, &ioqueue.cs,
1598 &timeout) == STATUS_TIMEOUT && !ioqueue.objcount)
1599 break;
1603 ioqueue.thread_running = FALSE;
1604 RtlLeaveCriticalSection( &ioqueue.cs );
1606 TRACE( "terminating I/O completion thread\n" );
1608 RtlExitUserThread( 0 );
1611 static NTSTATUS tp_ioqueue_lock( struct threadpool_object *io, HANDLE file )
1613 NTSTATUS status = STATUS_SUCCESS;
1615 assert( io->type == TP_OBJECT_TYPE_IO );
1617 RtlEnterCriticalSection( &ioqueue.cs );
1619 if (!ioqueue.port && (status = NtCreateIoCompletion( &ioqueue.port,
1620 IO_COMPLETION_ALL_ACCESS, NULL, 0 )))
1622 RtlLeaveCriticalSection( &ioqueue.cs );
1623 return status;
1626 if (!ioqueue.thread_running)
1628 HANDLE thread;
1630 if (!(status = RtlCreateUserThread( GetCurrentProcess(), NULL, FALSE,
1631 0, 0, 0, ioqueue_thread_proc, NULL, &thread, NULL )))
1633 ioqueue.thread_running = TRUE;
1634 NtClose( thread );
1638 if (status == STATUS_SUCCESS)
1640 FILE_COMPLETION_INFORMATION info;
1641 IO_STATUS_BLOCK iosb;
1643 info.CompletionPort = ioqueue.port;
1644 info.CompletionKey = (ULONG_PTR)io;
1646 status = NtSetInformationFile( file, &iosb, &info, sizeof(info), FileCompletionInformation );
1649 if (status == STATUS_SUCCESS)
1651 if (!ioqueue.objcount++)
1652 RtlWakeConditionVariable( &ioqueue.update_event );
1655 RtlLeaveCriticalSection( &ioqueue.cs );
1656 return status;
1659 /***********************************************************************
1660 * tp_threadpool_alloc (internal)
1662 * Allocates a new threadpool object.
1664 static NTSTATUS tp_threadpool_alloc( struct threadpool **out )
1666 IMAGE_NT_HEADERS *nt = RtlImageNtHeader( NtCurrentTeb()->Peb->ImageBaseAddress );
1667 struct threadpool *pool;
1668 unsigned int i;
1670 pool = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*pool) );
1671 if (!pool)
1672 return STATUS_NO_MEMORY;
1674 pool->refcount = 1;
1675 pool->objcount = 0;
1676 pool->shutdown = FALSE;
1678 RtlInitializeCriticalSection( &pool->cs );
1679 pool->cs.DebugInfo->Spare[0] = (DWORD_PTR)(__FILE__ ": threadpool.cs");
1681 for (i = 0; i < ARRAY_SIZE(pool->pools); ++i)
1682 list_init( &pool->pools[i] );
1683 RtlInitializeConditionVariable( &pool->update_event );
1685 pool->max_workers = 500;
1686 pool->min_workers = 0;
1687 pool->num_workers = 0;
1688 pool->num_busy_workers = 0;
1689 pool->stack_info.StackReserve = nt->OptionalHeader.SizeOfStackReserve;
1690 pool->stack_info.StackCommit = nt->OptionalHeader.SizeOfStackCommit;
1692 TRACE( "allocated threadpool %p\n", pool );
1694 *out = pool;
1695 return STATUS_SUCCESS;
1698 /***********************************************************************
1699 * tp_threadpool_shutdown (internal)
1701 * Prepares the shutdown of a threadpool object and notifies all worker
1702 * threads to terminate (after all remaining work items have been
1703 * processed).
1705 static void tp_threadpool_shutdown( struct threadpool *pool )
1707 assert( pool != default_threadpool );
1709 pool->shutdown = TRUE;
1710 RtlWakeAllConditionVariable( &pool->update_event );
1713 /***********************************************************************
1714 * tp_threadpool_release (internal)
1716 * Releases a reference to a threadpool object.
1718 static BOOL tp_threadpool_release( struct threadpool *pool )
1720 unsigned int i;
1722 if (InterlockedDecrement( &pool->refcount ))
1723 return FALSE;
1725 TRACE( "destroying threadpool %p\n", pool );
1727 assert( pool->shutdown );
1728 assert( !pool->objcount );
1729 for (i = 0; i < ARRAY_SIZE(pool->pools); ++i)
1730 assert( list_empty( &pool->pools[i] ) );
1732 pool->cs.DebugInfo->Spare[0] = 0;
1733 RtlDeleteCriticalSection( &pool->cs );
1735 RtlFreeHeap( GetProcessHeap(), 0, pool );
1736 return TRUE;
1739 /***********************************************************************
1740 * tp_threadpool_lock (internal)
1742 * Acquires a lock on a threadpool, specified with an TP_CALLBACK_ENVIRON
1743 * block. When the lock is acquired successfully, it is guaranteed that
1744 * there is at least one worker thread to process tasks.
1746 static NTSTATUS tp_threadpool_lock( struct threadpool **out, TP_CALLBACK_ENVIRON *environment )
1748 struct threadpool *pool = NULL;
1749 NTSTATUS status = STATUS_SUCCESS;
1751 if (environment)
1753 /* Validate environment parameters. */
1754 if (environment->Version == 3)
1756 TP_CALLBACK_ENVIRON_V3 *environment3 = (TP_CALLBACK_ENVIRON_V3 *)environment;
1758 switch (environment3->CallbackPriority)
1760 case TP_CALLBACK_PRIORITY_HIGH:
1761 case TP_CALLBACK_PRIORITY_NORMAL:
1762 case TP_CALLBACK_PRIORITY_LOW:
1763 break;
1764 default:
1765 return STATUS_INVALID_PARAMETER;
1769 pool = (struct threadpool *)environment->Pool;
1772 if (!pool)
1774 if (!default_threadpool)
1776 status = tp_threadpool_alloc( &pool );
1777 if (status != STATUS_SUCCESS)
1778 return status;
1780 if (InterlockedCompareExchangePointer( (void *)&default_threadpool, pool, NULL ) != NULL)
1782 tp_threadpool_shutdown( pool );
1783 tp_threadpool_release( pool );
1787 pool = default_threadpool;
1790 RtlEnterCriticalSection( &pool->cs );
1792 /* Make sure that the threadpool has at least one thread. */
1793 if (!pool->num_workers)
1794 status = tp_new_worker_thread( pool );
1796 /* Keep a reference, and increment objcount to ensure that the
1797 * last thread doesn't terminate. */
1798 if (status == STATUS_SUCCESS)
1800 InterlockedIncrement( &pool->refcount );
1801 pool->objcount++;
1804 RtlLeaveCriticalSection( &pool->cs );
1806 if (status != STATUS_SUCCESS)
1807 return status;
1809 *out = pool;
1810 return STATUS_SUCCESS;
1813 /***********************************************************************
1814 * tp_threadpool_unlock (internal)
1816 * Releases a lock on a threadpool.
1818 static void tp_threadpool_unlock( struct threadpool *pool )
1820 RtlEnterCriticalSection( &pool->cs );
1821 pool->objcount--;
1822 RtlLeaveCriticalSection( &pool->cs );
1823 tp_threadpool_release( pool );
1826 /***********************************************************************
1827 * tp_group_alloc (internal)
1829 * Allocates a new threadpool group object.
1831 static NTSTATUS tp_group_alloc( struct threadpool_group **out )
1833 struct threadpool_group *group;
1835 group = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*group) );
1836 if (!group)
1837 return STATUS_NO_MEMORY;
1839 group->refcount = 1;
1840 group->shutdown = FALSE;
1842 RtlInitializeCriticalSection( &group->cs );
1843 group->cs.DebugInfo->Spare[0] = (DWORD_PTR)(__FILE__ ": threadpool_group.cs");
1845 list_init( &group->members );
1847 TRACE( "allocated group %p\n", group );
1849 *out = group;
1850 return STATUS_SUCCESS;
1853 /***********************************************************************
1854 * tp_group_shutdown (internal)
1856 * Marks the group object for shutdown.
1858 static void tp_group_shutdown( struct threadpool_group *group )
1860 group->shutdown = TRUE;
1863 /***********************************************************************
1864 * tp_group_release (internal)
1866 * Releases a reference to a group object.
1868 static BOOL tp_group_release( struct threadpool_group *group )
1870 if (InterlockedDecrement( &group->refcount ))
1871 return FALSE;
1873 TRACE( "destroying group %p\n", group );
1875 assert( group->shutdown );
1876 assert( list_empty( &group->members ) );
1878 group->cs.DebugInfo->Spare[0] = 0;
1879 RtlDeleteCriticalSection( &group->cs );
1881 RtlFreeHeap( GetProcessHeap(), 0, group );
1882 return TRUE;
1885 /***********************************************************************
1886 * tp_object_initialize (internal)
1888 * Initializes members of a threadpool object.
1890 static void tp_object_initialize( struct threadpool_object *object, struct threadpool *pool,
1891 PVOID userdata, TP_CALLBACK_ENVIRON *environment )
1893 BOOL is_simple_callback = (object->type == TP_OBJECT_TYPE_SIMPLE);
1895 object->refcount = 1;
1896 object->shutdown = FALSE;
1898 object->pool = pool;
1899 object->group = NULL;
1900 object->userdata = userdata;
1901 object->group_cancel_callback = NULL;
1902 object->finalization_callback = NULL;
1903 object->may_run_long = 0;
1904 object->race_dll = NULL;
1905 object->priority = TP_CALLBACK_PRIORITY_NORMAL;
1907 memset( &object->group_entry, 0, sizeof(object->group_entry) );
1908 object->is_group_member = FALSE;
1910 memset( &object->pool_entry, 0, sizeof(object->pool_entry) );
1911 RtlInitializeConditionVariable( &object->finished_event );
1912 RtlInitializeConditionVariable( &object->group_finished_event );
1913 object->completed_event = NULL;
1914 object->num_pending_callbacks = 0;
1915 object->num_running_callbacks = 0;
1916 object->num_associated_callbacks = 0;
1918 if (environment)
1920 if (environment->Version != 1 && environment->Version != 3)
1921 FIXME( "unsupported environment version %lu\n", environment->Version );
1923 object->group = impl_from_TP_CLEANUP_GROUP( environment->CleanupGroup );
1924 object->group_cancel_callback = environment->CleanupGroupCancelCallback;
1925 object->finalization_callback = environment->FinalizationCallback;
1926 object->may_run_long = environment->u.s.LongFunction != 0;
1927 object->race_dll = environment->RaceDll;
1928 if (environment->Version == 3)
1930 TP_CALLBACK_ENVIRON_V3 *environment_v3 = (TP_CALLBACK_ENVIRON_V3 *)environment;
1932 object->priority = environment_v3->CallbackPriority;
1933 assert( object->priority < ARRAY_SIZE(pool->pools) );
1936 if (environment->ActivationContext)
1937 FIXME( "activation context not supported yet\n" );
1939 if (environment->u.s.Persistent)
1940 FIXME( "persistent threads not supported yet\n" );
1943 if (object->race_dll)
1944 LdrAddRefDll( 0, object->race_dll );
1946 TRACE( "allocated object %p of type %u\n", object, object->type );
1948 /* For simple callbacks we have to run tp_object_submit before adding this object
1949 * to the cleanup group. As soon as the cleanup group members are released ->shutdown
1950 * will be set, and tp_object_submit would fail with an assertion. */
1952 if (is_simple_callback)
1953 tp_object_submit( object, FALSE );
1955 if (object->group)
1957 struct threadpool_group *group = object->group;
1958 InterlockedIncrement( &group->refcount );
1960 RtlEnterCriticalSection( &group->cs );
1961 list_add_tail( &group->members, &object->group_entry );
1962 object->is_group_member = TRUE;
1963 RtlLeaveCriticalSection( &group->cs );
1966 if (is_simple_callback)
1967 tp_object_release( object );
1970 static void tp_object_prio_queue( struct threadpool_object *object )
1972 ++object->pool->num_busy_workers;
1973 list_add_tail( &object->pool->pools[object->priority], &object->pool_entry );
1976 /***********************************************************************
1977 * tp_object_submit (internal)
1979 * Submits a threadpool object to the associated threadpool. This
1980 * function has to be VOID because TpPostWork can never fail on Windows.
1982 static void tp_object_submit( struct threadpool_object *object, BOOL signaled )
1984 struct threadpool *pool = object->pool;
1985 NTSTATUS status = STATUS_UNSUCCESSFUL;
1987 assert( !object->shutdown );
1988 assert( !pool->shutdown );
1990 RtlEnterCriticalSection( &pool->cs );
1992 /* Start new worker threads if required. */
1993 if (pool->num_busy_workers >= pool->num_workers &&
1994 pool->num_workers < pool->max_workers)
1995 status = tp_new_worker_thread( pool );
1997 /* Queue work item and increment refcount. */
1998 InterlockedIncrement( &object->refcount );
1999 if (!object->num_pending_callbacks++)
2000 tp_object_prio_queue( object );
2002 /* Count how often the object was signaled. */
2003 if (object->type == TP_OBJECT_TYPE_WAIT && signaled)
2004 object->u.wait.signaled++;
2006 /* No new thread started - wake up one existing thread. */
2007 if (status != STATUS_SUCCESS)
2009 assert( pool->num_workers > 0 );
2010 RtlWakeConditionVariable( &pool->update_event );
2013 RtlLeaveCriticalSection( &pool->cs );
2016 /***********************************************************************
2017 * tp_object_cancel (internal)
2019 * Cancels all currently pending callbacks for a specific object.
2021 static void tp_object_cancel( struct threadpool_object *object )
2023 struct threadpool *pool = object->pool;
2024 LONG pending_callbacks = 0;
2026 RtlEnterCriticalSection( &pool->cs );
2027 if (object->num_pending_callbacks)
2029 pending_callbacks = object->num_pending_callbacks;
2030 object->num_pending_callbacks = 0;
2031 list_remove( &object->pool_entry );
2033 if (object->type == TP_OBJECT_TYPE_WAIT)
2034 object->u.wait.signaled = 0;
2036 if (object->type == TP_OBJECT_TYPE_IO)
2038 object->u.io.skipped_count += object->u.io.pending_count;
2039 object->u.io.pending_count = 0;
2041 RtlLeaveCriticalSection( &pool->cs );
2043 while (pending_callbacks--)
2044 tp_object_release( object );
2047 static BOOL object_is_finished( struct threadpool_object *object, BOOL group )
2049 if (object->num_pending_callbacks)
2050 return FALSE;
2051 if (object->type == TP_OBJECT_TYPE_IO && object->u.io.pending_count)
2052 return FALSE;
2054 if (group)
2055 return !object->num_running_callbacks;
2056 else
2057 return !object->num_associated_callbacks;
2060 /***********************************************************************
2061 * tp_object_wait (internal)
2063 * Waits until all pending and running callbacks of a specific object
2064 * have been processed.
2066 static void tp_object_wait( struct threadpool_object *object, BOOL group_wait )
2068 struct threadpool *pool = object->pool;
2070 RtlEnterCriticalSection( &pool->cs );
2071 while (!object_is_finished( object, group_wait ))
2073 if (group_wait)
2074 RtlSleepConditionVariableCS( &object->group_finished_event, &pool->cs, NULL );
2075 else
2076 RtlSleepConditionVariableCS( &object->finished_event, &pool->cs, NULL );
2078 RtlLeaveCriticalSection( &pool->cs );
2081 static void tp_ioqueue_unlock( struct threadpool_object *io )
2083 assert( io->type == TP_OBJECT_TYPE_IO );
2085 RtlEnterCriticalSection( &ioqueue.cs );
2087 assert(ioqueue.objcount);
2089 if (!io->shutdown && !--ioqueue.objcount)
2090 NtSetIoCompletion( ioqueue.port, 0, 0, STATUS_SUCCESS, 0 );
2092 RtlLeaveCriticalSection( &ioqueue.cs );
2095 /***********************************************************************
2096 * tp_object_prepare_shutdown (internal)
2098 * Prepares a threadpool object for shutdown.
2100 static void tp_object_prepare_shutdown( struct threadpool_object *object )
2102 if (object->type == TP_OBJECT_TYPE_TIMER)
2103 tp_timerqueue_unlock( object );
2104 else if (object->type == TP_OBJECT_TYPE_WAIT)
2105 tp_waitqueue_unlock( object );
2106 else if (object->type == TP_OBJECT_TYPE_IO)
2107 tp_ioqueue_unlock( object );
2110 /***********************************************************************
2111 * tp_object_release (internal)
2113 * Releases a reference to a threadpool object.
2115 static BOOL tp_object_release( struct threadpool_object *object )
2117 if (InterlockedDecrement( &object->refcount ))
2118 return FALSE;
2120 TRACE( "destroying object %p of type %u\n", object, object->type );
2122 assert( object->shutdown );
2123 assert( !object->num_pending_callbacks );
2124 assert( !object->num_running_callbacks );
2125 assert( !object->num_associated_callbacks );
2127 /* release reference to the group */
2128 if (object->group)
2130 struct threadpool_group *group = object->group;
2132 RtlEnterCriticalSection( &group->cs );
2133 if (object->is_group_member)
2135 list_remove( &object->group_entry );
2136 object->is_group_member = FALSE;
2138 RtlLeaveCriticalSection( &group->cs );
2140 tp_group_release( group );
2143 tp_threadpool_unlock( object->pool );
2145 if (object->race_dll)
2146 LdrUnloadDll( object->race_dll );
2148 if (object->completed_event && object->completed_event != INVALID_HANDLE_VALUE)
2149 NtSetEvent( object->completed_event, NULL );
2151 RtlFreeHeap( GetProcessHeap(), 0, object );
2152 return TRUE;
2155 static struct list *threadpool_get_next_item( const struct threadpool *pool )
2157 struct list *ptr;
2158 unsigned int i;
2160 for (i = 0; i < ARRAY_SIZE(pool->pools); ++i)
2162 if ((ptr = list_head( &pool->pools[i] )))
2163 break;
2166 return ptr;
2169 /***********************************************************************
2170 * tp_object_execute (internal)
2172 * Executes a threadpool object callback, object->pool->cs has to be
2173 * held.
2175 static void tp_object_execute( struct threadpool_object *object, BOOL wait_thread )
2177 TP_CALLBACK_INSTANCE *callback_instance;
2178 struct threadpool_instance instance;
2179 struct io_completion completion;
2180 struct threadpool *pool = object->pool;
2181 TP_WAIT_RESULT wait_result = 0;
2182 NTSTATUS status;
2184 object->num_pending_callbacks--;
2186 /* For wait objects check if they were signaled or have timed out. */
2187 if (object->type == TP_OBJECT_TYPE_WAIT)
2189 wait_result = object->u.wait.signaled ? WAIT_OBJECT_0 : WAIT_TIMEOUT;
2190 if (wait_result == WAIT_OBJECT_0) object->u.wait.signaled--;
2192 else if (object->type == TP_OBJECT_TYPE_IO)
2194 assert( object->u.io.completion_count );
2195 completion = object->u.io.completions[--object->u.io.completion_count];
2198 /* Leave critical section and do the actual callback. */
2199 object->num_associated_callbacks++;
2200 object->num_running_callbacks++;
2201 RtlLeaveCriticalSection( &pool->cs );
2202 if (wait_thread) RtlLeaveCriticalSection( &waitqueue.cs );
2204 /* Initialize threadpool instance struct. */
2205 callback_instance = (TP_CALLBACK_INSTANCE *)&instance;
2206 instance.object = object;
2207 instance.threadid = GetCurrentThreadId();
2208 instance.associated = TRUE;
2209 instance.may_run_long = object->may_run_long;
2210 instance.cleanup.critical_section = NULL;
2211 instance.cleanup.mutex = NULL;
2212 instance.cleanup.semaphore = NULL;
2213 instance.cleanup.semaphore_count = 0;
2214 instance.cleanup.event = NULL;
2215 instance.cleanup.library = NULL;
2217 switch (object->type)
2219 case TP_OBJECT_TYPE_SIMPLE:
2221 TRACE( "executing simple callback %p(%p, %p)\n",
2222 object->u.simple.callback, callback_instance, object->userdata );
2223 object->u.simple.callback( callback_instance, object->userdata );
2224 TRACE( "callback %p returned\n", object->u.simple.callback );
2225 break;
2228 case TP_OBJECT_TYPE_WORK:
2230 TRACE( "executing work callback %p(%p, %p, %p)\n",
2231 object->u.work.callback, callback_instance, object->userdata, object );
2232 object->u.work.callback( callback_instance, object->userdata, (TP_WORK *)object );
2233 TRACE( "callback %p returned\n", object->u.work.callback );
2234 break;
2237 case TP_OBJECT_TYPE_TIMER:
2239 TRACE( "executing timer callback %p(%p, %p, %p)\n",
2240 object->u.timer.callback, callback_instance, object->userdata, object );
2241 object->u.timer.callback( callback_instance, object->userdata, (TP_TIMER *)object );
2242 TRACE( "callback %p returned\n", object->u.timer.callback );
2243 break;
2246 case TP_OBJECT_TYPE_WAIT:
2248 TRACE( "executing wait callback %p(%p, %p, %p, %lu)\n",
2249 object->u.wait.callback, callback_instance, object->userdata, object, wait_result );
2250 object->u.wait.callback( callback_instance, object->userdata, (TP_WAIT *)object, wait_result );
2251 TRACE( "callback %p returned\n", object->u.wait.callback );
2252 break;
2255 case TP_OBJECT_TYPE_IO:
2257 TRACE( "executing I/O callback %p(%p, %p, %#Ix, %p, %p)\n",
2258 object->u.io.callback, callback_instance, object->userdata,
2259 completion.cvalue, &completion.iosb, (TP_IO *)object );
2260 object->u.io.callback( callback_instance, object->userdata,
2261 (void *)completion.cvalue, &completion.iosb, (TP_IO *)object );
2262 TRACE( "callback %p returned\n", object->u.io.callback );
2263 break;
2266 default:
2267 assert(0);
2268 break;
2271 /* Execute finalization callback. */
2272 if (object->finalization_callback)
2274 TRACE( "executing finalization callback %p(%p, %p)\n",
2275 object->finalization_callback, callback_instance, object->userdata );
2276 object->finalization_callback( callback_instance, object->userdata );
2277 TRACE( "callback %p returned\n", object->finalization_callback );
2280 /* Execute cleanup tasks. */
2281 if (instance.cleanup.critical_section)
2283 RtlLeaveCriticalSection( instance.cleanup.critical_section );
2285 if (instance.cleanup.mutex)
2287 status = NtReleaseMutant( instance.cleanup.mutex, NULL );
2288 if (status != STATUS_SUCCESS) goto skip_cleanup;
2290 if (instance.cleanup.semaphore)
2292 status = NtReleaseSemaphore( instance.cleanup.semaphore, instance.cleanup.semaphore_count, NULL );
2293 if (status != STATUS_SUCCESS) goto skip_cleanup;
2295 if (instance.cleanup.event)
2297 status = NtSetEvent( instance.cleanup.event, NULL );
2298 if (status != STATUS_SUCCESS) goto skip_cleanup;
2300 if (instance.cleanup.library)
2302 LdrUnloadDll( instance.cleanup.library );
2305 skip_cleanup:
2306 if (wait_thread) RtlEnterCriticalSection( &waitqueue.cs );
2307 RtlEnterCriticalSection( &pool->cs );
2309 /* Simple callbacks are automatically shutdown after execution. */
2310 if (object->type == TP_OBJECT_TYPE_SIMPLE)
2312 tp_object_prepare_shutdown( object );
2313 object->shutdown = TRUE;
2316 object->num_running_callbacks--;
2317 if (object_is_finished( object, TRUE ))
2318 RtlWakeAllConditionVariable( &object->group_finished_event );
2320 if (instance.associated)
2322 object->num_associated_callbacks--;
2323 if (object_is_finished( object, FALSE ))
2324 RtlWakeAllConditionVariable( &object->finished_event );
2328 /***********************************************************************
2329 * threadpool_worker_proc (internal)
2331 static void CALLBACK threadpool_worker_proc( void *param )
2333 struct threadpool *pool = param;
2334 LARGE_INTEGER timeout;
2335 struct list *ptr;
2337 TRACE( "starting worker thread for pool %p\n", pool );
2338 set_thread_name(L"wine_threadpool_worker");
2340 RtlEnterCriticalSection( &pool->cs );
2341 for (;;)
2343 while ((ptr = threadpool_get_next_item( pool )))
2345 struct threadpool_object *object = LIST_ENTRY( ptr, struct threadpool_object, pool_entry );
2346 assert( object->num_pending_callbacks > 0 );
2348 /* If further pending callbacks are queued, move the work item to
2349 * the end of the pool list. Otherwise remove it from the pool. */
2350 list_remove( &object->pool_entry );
2351 if (object->num_pending_callbacks > 1)
2352 tp_object_prio_queue( object );
2354 tp_object_execute( object, FALSE );
2356 assert(pool->num_busy_workers);
2357 pool->num_busy_workers--;
2359 tp_object_release( object );
2362 /* Shutdown worker thread if requested. */
2363 if (pool->shutdown)
2364 break;
2366 /* Wait for new tasks or until the timeout expires. A thread only terminates
2367 * when no new tasks are available, and the number of threads can be
2368 * decreased without violating the min_workers limit. An exception is when
2369 * min_workers == 0, then objcount is used to detect if the last thread
2370 * can be terminated. */
2371 timeout.QuadPart = (ULONGLONG)THREADPOOL_WORKER_TIMEOUT * -10000;
2372 if (RtlSleepConditionVariableCS( &pool->update_event, &pool->cs, &timeout ) == STATUS_TIMEOUT &&
2373 !threadpool_get_next_item( pool ) && (pool->num_workers > max( pool->min_workers, 1 ) ||
2374 (!pool->min_workers && !pool->objcount)))
2376 break;
2379 pool->num_workers--;
2380 RtlLeaveCriticalSection( &pool->cs );
2382 TRACE( "terminating worker thread for pool %p\n", pool );
2383 tp_threadpool_release( pool );
2384 RtlExitUserThread( 0 );
2387 /***********************************************************************
2388 * TpAllocCleanupGroup (NTDLL.@)
2390 NTSTATUS WINAPI TpAllocCleanupGroup( TP_CLEANUP_GROUP **out )
2392 TRACE( "%p\n", out );
2394 return tp_group_alloc( (struct threadpool_group **)out );
2397 /***********************************************************************
2398 * TpAllocIoCompletion (NTDLL.@)
2400 NTSTATUS WINAPI TpAllocIoCompletion( TP_IO **out, HANDLE file, PTP_IO_CALLBACK callback,
2401 void *userdata, TP_CALLBACK_ENVIRON *environment )
2403 struct threadpool_object *object;
2404 struct threadpool *pool;
2405 NTSTATUS status;
2407 TRACE( "%p %p %p %p %p\n", out, file, callback, userdata, environment );
2409 if (!(object = RtlAllocateHeap( GetProcessHeap(), HEAP_ZERO_MEMORY, sizeof(*object) )))
2410 return STATUS_NO_MEMORY;
2412 if ((status = tp_threadpool_lock( &pool, environment )))
2414 RtlFreeHeap( GetProcessHeap(), 0, object );
2415 return status;
2418 object->type = TP_OBJECT_TYPE_IO;
2419 object->u.io.callback = callback;
2420 if (!(object->u.io.completions = RtlAllocateHeap( GetProcessHeap(), 0, 8 * sizeof(*object->u.io.completions) )))
2422 tp_threadpool_unlock( pool );
2423 RtlFreeHeap( GetProcessHeap(), 0, object );
2424 return status;
2427 if ((status = tp_ioqueue_lock( object, file )))
2429 tp_threadpool_unlock( pool );
2430 RtlFreeHeap( GetProcessHeap(), 0, object->u.io.completions );
2431 RtlFreeHeap( GetProcessHeap(), 0, object );
2432 return status;
2435 tp_object_initialize( object, pool, userdata, environment );
2437 *out = (TP_IO *)object;
2438 return STATUS_SUCCESS;
2441 /***********************************************************************
2442 * TpAllocPool (NTDLL.@)
2444 NTSTATUS WINAPI TpAllocPool( TP_POOL **out, PVOID reserved )
2446 TRACE( "%p %p\n", out, reserved );
2448 if (reserved)
2449 FIXME( "reserved argument is nonzero (%p)\n", reserved );
2451 return tp_threadpool_alloc( (struct threadpool **)out );
2454 /***********************************************************************
2455 * TpAllocTimer (NTDLL.@)
2457 NTSTATUS WINAPI TpAllocTimer( TP_TIMER **out, PTP_TIMER_CALLBACK callback, PVOID userdata,
2458 TP_CALLBACK_ENVIRON *environment )
2460 struct threadpool_object *object;
2461 struct threadpool *pool;
2462 NTSTATUS status;
2464 TRACE( "%p %p %p %p\n", out, callback, userdata, environment );
2466 object = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*object) );
2467 if (!object)
2468 return STATUS_NO_MEMORY;
2470 status = tp_threadpool_lock( &pool, environment );
2471 if (status)
2473 RtlFreeHeap( GetProcessHeap(), 0, object );
2474 return status;
2477 object->type = TP_OBJECT_TYPE_TIMER;
2478 object->u.timer.callback = callback;
2480 status = tp_timerqueue_lock( object );
2481 if (status)
2483 tp_threadpool_unlock( pool );
2484 RtlFreeHeap( GetProcessHeap(), 0, object );
2485 return status;
2488 tp_object_initialize( object, pool, userdata, environment );
2490 *out = (TP_TIMER *)object;
2491 return STATUS_SUCCESS;
2494 static NTSTATUS tp_alloc_wait( TP_WAIT **out, PTP_WAIT_CALLBACK callback, PVOID userdata,
2495 TP_CALLBACK_ENVIRON *environment, DWORD flags )
2497 struct threadpool_object *object;
2498 struct threadpool *pool;
2499 NTSTATUS status;
2501 object = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*object) );
2502 if (!object)
2503 return STATUS_NO_MEMORY;
2505 status = tp_threadpool_lock( &pool, environment );
2506 if (status)
2508 RtlFreeHeap( GetProcessHeap(), 0, object );
2509 return status;
2512 object->type = TP_OBJECT_TYPE_WAIT;
2513 object->u.wait.callback = callback;
2514 object->u.wait.flags = flags;
2516 status = tp_waitqueue_lock( object );
2517 if (status)
2519 tp_threadpool_unlock( pool );
2520 RtlFreeHeap( GetProcessHeap(), 0, object );
2521 return status;
2524 tp_object_initialize( object, pool, userdata, environment );
2526 *out = (TP_WAIT *)object;
2527 return STATUS_SUCCESS;
2530 /***********************************************************************
2531 * TpAllocWait (NTDLL.@)
2533 NTSTATUS WINAPI TpAllocWait( TP_WAIT **out, PTP_WAIT_CALLBACK callback, PVOID userdata,
2534 TP_CALLBACK_ENVIRON *environment )
2536 TRACE( "%p %p %p %p\n", out, callback, userdata, environment );
2537 return tp_alloc_wait( out, callback, userdata, environment, WT_EXECUTEONLYONCE );
2540 /***********************************************************************
2541 * TpAllocWork (NTDLL.@)
2543 NTSTATUS WINAPI TpAllocWork( TP_WORK **out, PTP_WORK_CALLBACK callback, PVOID userdata,
2544 TP_CALLBACK_ENVIRON *environment )
2546 struct threadpool_object *object;
2547 struct threadpool *pool;
2548 NTSTATUS status;
2550 TRACE( "%p %p %p %p\n", out, callback, userdata, environment );
2552 object = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*object) );
2553 if (!object)
2554 return STATUS_NO_MEMORY;
2556 status = tp_threadpool_lock( &pool, environment );
2557 if (status)
2559 RtlFreeHeap( GetProcessHeap(), 0, object );
2560 return status;
2563 object->type = TP_OBJECT_TYPE_WORK;
2564 object->u.work.callback = callback;
2565 tp_object_initialize( object, pool, userdata, environment );
2567 *out = (TP_WORK *)object;
2568 return STATUS_SUCCESS;
2571 /***********************************************************************
2572 * TpCancelAsyncIoOperation (NTDLL.@)
2574 void WINAPI TpCancelAsyncIoOperation( TP_IO *io )
2576 struct threadpool_object *this = impl_from_TP_IO( io );
2578 TRACE( "%p\n", io );
2580 RtlEnterCriticalSection( &this->pool->cs );
2582 TRACE("pending_count %u.\n", this->u.io.pending_count);
2584 this->u.io.pending_count--;
2585 if (object_is_finished( this, TRUE ))
2586 RtlWakeAllConditionVariable( &this->group_finished_event );
2587 if (object_is_finished( this, FALSE ))
2588 RtlWakeAllConditionVariable( &this->finished_event );
2590 RtlLeaveCriticalSection( &this->pool->cs );
2593 /***********************************************************************
2594 * TpCallbackLeaveCriticalSectionOnCompletion (NTDLL.@)
2596 VOID WINAPI TpCallbackLeaveCriticalSectionOnCompletion( TP_CALLBACK_INSTANCE *instance, CRITICAL_SECTION *crit )
2598 struct threadpool_instance *this = impl_from_TP_CALLBACK_INSTANCE( instance );
2600 TRACE( "%p %p\n", instance, crit );
2602 if (!this->cleanup.critical_section)
2603 this->cleanup.critical_section = crit;
2606 /***********************************************************************
2607 * TpCallbackMayRunLong (NTDLL.@)
2609 NTSTATUS WINAPI TpCallbackMayRunLong( TP_CALLBACK_INSTANCE *instance )
2611 struct threadpool_instance *this = impl_from_TP_CALLBACK_INSTANCE( instance );
2612 struct threadpool_object *object = this->object;
2613 struct threadpool *pool;
2614 NTSTATUS status = STATUS_SUCCESS;
2616 TRACE( "%p\n", instance );
2618 if (this->threadid != GetCurrentThreadId())
2620 ERR("called from wrong thread, ignoring\n");
2621 return STATUS_UNSUCCESSFUL; /* FIXME */
2624 if (this->may_run_long)
2625 return STATUS_SUCCESS;
2627 pool = object->pool;
2628 RtlEnterCriticalSection( &pool->cs );
2630 /* Start new worker threads if required. */
2631 if (pool->num_busy_workers >= pool->num_workers)
2633 if (pool->num_workers < pool->max_workers)
2635 status = tp_new_worker_thread( pool );
2637 else
2639 status = STATUS_TOO_MANY_THREADS;
2643 RtlLeaveCriticalSection( &pool->cs );
2644 this->may_run_long = TRUE;
2645 return status;
2648 /***********************************************************************
2649 * TpCallbackReleaseMutexOnCompletion (NTDLL.@)
2651 VOID WINAPI TpCallbackReleaseMutexOnCompletion( TP_CALLBACK_INSTANCE *instance, HANDLE mutex )
2653 struct threadpool_instance *this = impl_from_TP_CALLBACK_INSTANCE( instance );
2655 TRACE( "%p %p\n", instance, mutex );
2657 if (!this->cleanup.mutex)
2658 this->cleanup.mutex = mutex;
2661 /***********************************************************************
2662 * TpCallbackReleaseSemaphoreOnCompletion (NTDLL.@)
2664 VOID WINAPI TpCallbackReleaseSemaphoreOnCompletion( TP_CALLBACK_INSTANCE *instance, HANDLE semaphore, DWORD count )
2666 struct threadpool_instance *this = impl_from_TP_CALLBACK_INSTANCE( instance );
2668 TRACE( "%p %p %lu\n", instance, semaphore, count );
2670 if (!this->cleanup.semaphore)
2672 this->cleanup.semaphore = semaphore;
2673 this->cleanup.semaphore_count = count;
2677 /***********************************************************************
2678 * TpCallbackSetEventOnCompletion (NTDLL.@)
2680 VOID WINAPI TpCallbackSetEventOnCompletion( TP_CALLBACK_INSTANCE *instance, HANDLE event )
2682 struct threadpool_instance *this = impl_from_TP_CALLBACK_INSTANCE( instance );
2684 TRACE( "%p %p\n", instance, event );
2686 if (!this->cleanup.event)
2687 this->cleanup.event = event;
2690 /***********************************************************************
2691 * TpCallbackUnloadDllOnCompletion (NTDLL.@)
2693 VOID WINAPI TpCallbackUnloadDllOnCompletion( TP_CALLBACK_INSTANCE *instance, HMODULE module )
2695 struct threadpool_instance *this = impl_from_TP_CALLBACK_INSTANCE( instance );
2697 TRACE( "%p %p\n", instance, module );
2699 if (!this->cleanup.library)
2700 this->cleanup.library = module;
2703 /***********************************************************************
2704 * TpDisassociateCallback (NTDLL.@)
2706 VOID WINAPI TpDisassociateCallback( TP_CALLBACK_INSTANCE *instance )
2708 struct threadpool_instance *this = impl_from_TP_CALLBACK_INSTANCE( instance );
2709 struct threadpool_object *object = this->object;
2710 struct threadpool *pool;
2712 TRACE( "%p\n", instance );
2714 if (this->threadid != GetCurrentThreadId())
2716 ERR("called from wrong thread, ignoring\n");
2717 return;
2720 if (!this->associated)
2721 return;
2723 pool = object->pool;
2724 RtlEnterCriticalSection( &pool->cs );
2726 object->num_associated_callbacks--;
2727 if (object_is_finished( object, FALSE ))
2728 RtlWakeAllConditionVariable( &object->finished_event );
2730 RtlLeaveCriticalSection( &pool->cs );
2731 this->associated = FALSE;
2734 /***********************************************************************
2735 * TpIsTimerSet (NTDLL.@)
2737 BOOL WINAPI TpIsTimerSet( TP_TIMER *timer )
2739 struct threadpool_object *this = impl_from_TP_TIMER( timer );
2741 TRACE( "%p\n", timer );
2743 return this->u.timer.timer_set;
2746 /***********************************************************************
2747 * TpPostWork (NTDLL.@)
2749 VOID WINAPI TpPostWork( TP_WORK *work )
2751 struct threadpool_object *this = impl_from_TP_WORK( work );
2753 TRACE( "%p\n", work );
2755 tp_object_submit( this, FALSE );
2758 /***********************************************************************
2759 * TpReleaseCleanupGroup (NTDLL.@)
2761 VOID WINAPI TpReleaseCleanupGroup( TP_CLEANUP_GROUP *group )
2763 struct threadpool_group *this = impl_from_TP_CLEANUP_GROUP( group );
2765 TRACE( "%p\n", group );
2767 tp_group_shutdown( this );
2768 tp_group_release( this );
2771 /***********************************************************************
2772 * TpReleaseCleanupGroupMembers (NTDLL.@)
2774 VOID WINAPI TpReleaseCleanupGroupMembers( TP_CLEANUP_GROUP *group, BOOL cancel_pending, PVOID userdata )
2776 struct threadpool_group *this = impl_from_TP_CLEANUP_GROUP( group );
2777 struct threadpool_object *object, *next;
2778 struct list members;
2780 TRACE( "%p %u %p\n", group, cancel_pending, userdata );
2782 RtlEnterCriticalSection( &this->cs );
2784 /* Unset group, increase references, and mark objects for shutdown */
2785 LIST_FOR_EACH_ENTRY_SAFE( object, next, &this->members, struct threadpool_object, group_entry )
2787 assert( object->group == this );
2788 assert( object->is_group_member );
2790 if (InterlockedIncrement( &object->refcount ) == 1)
2792 /* Object is basically already destroyed, but group reference
2793 * was not deleted yet. We can safely ignore this object. */
2794 InterlockedDecrement( &object->refcount );
2795 list_remove( &object->group_entry );
2796 object->is_group_member = FALSE;
2797 continue;
2800 object->is_group_member = FALSE;
2801 tp_object_prepare_shutdown( object );
2804 /* Move members to a new temporary list */
2805 list_init( &members );
2806 list_move_tail( &members, &this->members );
2808 RtlLeaveCriticalSection( &this->cs );
2810 /* Cancel pending callbacks if requested */
2811 if (cancel_pending)
2813 LIST_FOR_EACH_ENTRY( object, &members, struct threadpool_object, group_entry )
2815 tp_object_cancel( object );
2819 /* Wait for remaining callbacks to finish */
2820 LIST_FOR_EACH_ENTRY_SAFE( object, next, &members, struct threadpool_object, group_entry )
2822 tp_object_wait( object, TRUE );
2824 if (!object->shutdown)
2826 /* Execute group cancellation callback if defined, and if this was actually a group cancel. */
2827 if (cancel_pending && object->group_cancel_callback)
2829 TRACE( "executing group cancel callback %p(%p, %p)\n",
2830 object->group_cancel_callback, object->userdata, userdata );
2831 object->group_cancel_callback( object->userdata, userdata );
2832 TRACE( "callback %p returned\n", object->group_cancel_callback );
2835 if (object->type != TP_OBJECT_TYPE_SIMPLE)
2836 tp_object_release( object );
2839 object->shutdown = TRUE;
2840 tp_object_release( object );
2844 /***********************************************************************
2845 * TpReleaseIoCompletion (NTDLL.@)
2847 void WINAPI TpReleaseIoCompletion( TP_IO *io )
2849 struct threadpool_object *this = impl_from_TP_IO( io );
2850 BOOL can_destroy;
2852 TRACE( "%p\n", io );
2854 RtlEnterCriticalSection( &this->pool->cs );
2855 this->u.io.shutting_down = TRUE;
2856 can_destroy = !this->u.io.pending_count && !this->u.io.skipped_count;
2857 RtlLeaveCriticalSection( &this->pool->cs );
2859 if (can_destroy)
2861 tp_object_prepare_shutdown( this );
2862 this->shutdown = TRUE;
2863 tp_object_release( this );
2867 /***********************************************************************
2868 * TpReleasePool (NTDLL.@)
2870 VOID WINAPI TpReleasePool( TP_POOL *pool )
2872 struct threadpool *this = impl_from_TP_POOL( pool );
2874 TRACE( "%p\n", pool );
2876 tp_threadpool_shutdown( this );
2877 tp_threadpool_release( this );
2880 /***********************************************************************
2881 * TpReleaseTimer (NTDLL.@)
2883 VOID WINAPI TpReleaseTimer( TP_TIMER *timer )
2885 struct threadpool_object *this = impl_from_TP_TIMER( timer );
2887 TRACE( "%p\n", timer );
2889 tp_object_prepare_shutdown( this );
2890 this->shutdown = TRUE;
2891 tp_object_release( this );
2894 /***********************************************************************
2895 * TpReleaseWait (NTDLL.@)
2897 VOID WINAPI TpReleaseWait( TP_WAIT *wait )
2899 struct threadpool_object *this = impl_from_TP_WAIT( wait );
2901 TRACE( "%p\n", wait );
2903 tp_object_prepare_shutdown( this );
2904 this->shutdown = TRUE;
2905 tp_object_release( this );
2908 /***********************************************************************
2909 * TpReleaseWork (NTDLL.@)
2911 VOID WINAPI TpReleaseWork( TP_WORK *work )
2913 struct threadpool_object *this = impl_from_TP_WORK( work );
2915 TRACE( "%p\n", work );
2917 tp_object_prepare_shutdown( this );
2918 this->shutdown = TRUE;
2919 tp_object_release( this );
2922 /***********************************************************************
2923 * TpSetPoolMaxThreads (NTDLL.@)
2925 VOID WINAPI TpSetPoolMaxThreads( TP_POOL *pool, DWORD maximum )
2927 struct threadpool *this = impl_from_TP_POOL( pool );
2929 TRACE( "%p %lu\n", pool, maximum );
2931 RtlEnterCriticalSection( &this->cs );
2932 this->max_workers = max( maximum, 1 );
2933 this->min_workers = min( this->min_workers, this->max_workers );
2934 RtlLeaveCriticalSection( &this->cs );
2937 /***********************************************************************
2938 * TpSetPoolMinThreads (NTDLL.@)
2940 BOOL WINAPI TpSetPoolMinThreads( TP_POOL *pool, DWORD minimum )
2942 struct threadpool *this = impl_from_TP_POOL( pool );
2943 NTSTATUS status = STATUS_SUCCESS;
2945 TRACE( "%p %lu\n", pool, minimum );
2947 RtlEnterCriticalSection( &this->cs );
2949 while (this->num_workers < minimum)
2951 status = tp_new_worker_thread( this );
2952 if (status != STATUS_SUCCESS)
2953 break;
2956 if (status == STATUS_SUCCESS)
2958 this->min_workers = minimum;
2959 this->max_workers = max( this->min_workers, this->max_workers );
2962 RtlLeaveCriticalSection( &this->cs );
2963 return !status;
2966 /***********************************************************************
2967 * TpSetTimer (NTDLL.@)
2969 VOID WINAPI TpSetTimer( TP_TIMER *timer, LARGE_INTEGER *timeout, LONG period, LONG window_length )
2971 struct threadpool_object *this = impl_from_TP_TIMER( timer );
2972 struct threadpool_object *other_timer;
2973 BOOL submit_timer = FALSE;
2974 ULONGLONG timestamp;
2976 TRACE( "%p %p %lu %lu\n", timer, timeout, period, window_length );
2978 RtlEnterCriticalSection( &timerqueue.cs );
2980 assert( this->u.timer.timer_initialized );
2981 this->u.timer.timer_set = timeout != NULL;
2983 /* Convert relative timeout to absolute timestamp and handle a timeout
2984 * of zero, which means that the timer is submitted immediately. */
2985 if (timeout)
2987 timestamp = timeout->QuadPart;
2988 if ((LONGLONG)timestamp < 0)
2990 LARGE_INTEGER now;
2991 NtQuerySystemTime( &now );
2992 timestamp = now.QuadPart - timestamp;
2994 else if (!timestamp)
2996 if (!period)
2997 timeout = NULL;
2998 else
3000 LARGE_INTEGER now;
3001 NtQuerySystemTime( &now );
3002 timestamp = now.QuadPart + (ULONGLONG)period * 10000;
3004 submit_timer = TRUE;
3008 /* First remove existing timeout. */
3009 if (this->u.timer.timer_pending)
3011 list_remove( &this->u.timer.timer_entry );
3012 this->u.timer.timer_pending = FALSE;
3015 /* If the timer was enabled, then add it back to the queue. */
3016 if (timeout)
3018 this->u.timer.timeout = timestamp;
3019 this->u.timer.period = period;
3020 this->u.timer.window_length = window_length;
3022 LIST_FOR_EACH_ENTRY( other_timer, &timerqueue.pending_timers,
3023 struct threadpool_object, u.timer.timer_entry )
3025 assert( other_timer->type == TP_OBJECT_TYPE_TIMER );
3026 if (this->u.timer.timeout < other_timer->u.timer.timeout)
3027 break;
3029 list_add_before( &other_timer->u.timer.timer_entry, &this->u.timer.timer_entry );
3031 /* Wake up the timer thread when the timeout has to be updated. */
3032 if (list_head( &timerqueue.pending_timers ) == &this->u.timer.timer_entry )
3033 RtlWakeAllConditionVariable( &timerqueue.update_event );
3035 this->u.timer.timer_pending = TRUE;
3038 RtlLeaveCriticalSection( &timerqueue.cs );
3040 if (submit_timer)
3041 tp_object_submit( this, FALSE );
3044 /***********************************************************************
3045 * TpSetWait (NTDLL.@)
3047 VOID WINAPI TpSetWait( TP_WAIT *wait, HANDLE handle, LARGE_INTEGER *timeout )
3049 struct threadpool_object *this = impl_from_TP_WAIT( wait );
3050 ULONGLONG timestamp = MAXLONGLONG;
3052 TRACE( "%p %p %p\n", wait, handle, timeout );
3054 RtlEnterCriticalSection( &waitqueue.cs );
3056 assert( this->u.wait.bucket );
3057 this->u.wait.handle = handle;
3059 if (handle || this->u.wait.wait_pending)
3061 struct waitqueue_bucket *bucket = this->u.wait.bucket;
3062 list_remove( &this->u.wait.wait_entry );
3064 /* Convert relative timeout to absolute timestamp. */
3065 if (handle && timeout)
3067 timestamp = timeout->QuadPart;
3068 if ((LONGLONG)timestamp < 0)
3070 LARGE_INTEGER now;
3071 NtQuerySystemTime( &now );
3072 timestamp = now.QuadPart - timestamp;
3076 /* Add wait object back into one of the queues. */
3077 if (handle)
3079 list_add_tail( &bucket->waiting, &this->u.wait.wait_entry );
3080 this->u.wait.wait_pending = TRUE;
3081 this->u.wait.timeout = timestamp;
3083 else
3085 list_add_tail( &bucket->reserved, &this->u.wait.wait_entry );
3086 this->u.wait.wait_pending = FALSE;
3089 /* Wake up the wait queue thread. */
3090 NtSetEvent( bucket->update_event, NULL );
3093 RtlLeaveCriticalSection( &waitqueue.cs );
3096 /***********************************************************************
3097 * TpSimpleTryPost (NTDLL.@)
3099 NTSTATUS WINAPI TpSimpleTryPost( PTP_SIMPLE_CALLBACK callback, PVOID userdata,
3100 TP_CALLBACK_ENVIRON *environment )
3102 struct threadpool_object *object;
3103 struct threadpool *pool;
3104 NTSTATUS status;
3106 TRACE( "%p %p %p\n", callback, userdata, environment );
3108 object = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*object) );
3109 if (!object)
3110 return STATUS_NO_MEMORY;
3112 status = tp_threadpool_lock( &pool, environment );
3113 if (status)
3115 RtlFreeHeap( GetProcessHeap(), 0, object );
3116 return status;
3119 object->type = TP_OBJECT_TYPE_SIMPLE;
3120 object->u.simple.callback = callback;
3121 tp_object_initialize( object, pool, userdata, environment );
3123 return STATUS_SUCCESS;
3126 /***********************************************************************
3127 * TpStartAsyncIoOperation (NTDLL.@)
3129 void WINAPI TpStartAsyncIoOperation( TP_IO *io )
3131 struct threadpool_object *this = impl_from_TP_IO( io );
3133 TRACE( "%p\n", io );
3135 RtlEnterCriticalSection( &this->pool->cs );
3137 this->u.io.pending_count++;
3139 RtlLeaveCriticalSection( &this->pool->cs );
3142 /***********************************************************************
3143 * TpWaitForIoCompletion (NTDLL.@)
3145 void WINAPI TpWaitForIoCompletion( TP_IO *io, BOOL cancel_pending )
3147 struct threadpool_object *this = impl_from_TP_IO( io );
3149 TRACE( "%p %d\n", io, cancel_pending );
3151 if (cancel_pending)
3152 tp_object_cancel( this );
3153 tp_object_wait( this, FALSE );
3156 /***********************************************************************
3157 * TpWaitForTimer (NTDLL.@)
3159 VOID WINAPI TpWaitForTimer( TP_TIMER *timer, BOOL cancel_pending )
3161 struct threadpool_object *this = impl_from_TP_TIMER( timer );
3163 TRACE( "%p %d\n", timer, cancel_pending );
3165 if (cancel_pending)
3166 tp_object_cancel( this );
3167 tp_object_wait( this, FALSE );
3170 /***********************************************************************
3171 * TpWaitForWait (NTDLL.@)
3173 VOID WINAPI TpWaitForWait( TP_WAIT *wait, BOOL cancel_pending )
3175 struct threadpool_object *this = impl_from_TP_WAIT( wait );
3177 TRACE( "%p %d\n", wait, cancel_pending );
3179 if (cancel_pending)
3180 tp_object_cancel( this );
3181 tp_object_wait( this, FALSE );
3184 /***********************************************************************
3185 * TpWaitForWork (NTDLL.@)
3187 VOID WINAPI TpWaitForWork( TP_WORK *work, BOOL cancel_pending )
3189 struct threadpool_object *this = impl_from_TP_WORK( work );
3191 TRACE( "%p %u\n", work, cancel_pending );
3193 if (cancel_pending)
3194 tp_object_cancel( this );
3195 tp_object_wait( this, FALSE );
3198 /***********************************************************************
3199 * TpSetPoolStackInformation (NTDLL.@)
3201 NTSTATUS WINAPI TpSetPoolStackInformation( TP_POOL *pool, TP_POOL_STACK_INFORMATION *stack_info )
3203 struct threadpool *this = impl_from_TP_POOL( pool );
3205 TRACE( "%p %p\n", pool, stack_info );
3207 if (!stack_info)
3208 return STATUS_INVALID_PARAMETER;
3210 RtlEnterCriticalSection( &this->cs );
3211 this->stack_info = *stack_info;
3212 RtlLeaveCriticalSection( &this->cs );
3214 return STATUS_SUCCESS;
3217 /***********************************************************************
3218 * TpQueryPoolStackInformation (NTDLL.@)
3220 NTSTATUS WINAPI TpQueryPoolStackInformation( TP_POOL *pool, TP_POOL_STACK_INFORMATION *stack_info )
3222 struct threadpool *this = impl_from_TP_POOL( pool );
3224 TRACE( "%p %p\n", pool, stack_info );
3226 if (!stack_info)
3227 return STATUS_INVALID_PARAMETER;
3229 RtlEnterCriticalSection( &this->cs );
3230 *stack_info = this->stack_info;
3231 RtlLeaveCriticalSection( &this->cs );
3233 return STATUS_SUCCESS;
3236 static void CALLBACK rtl_wait_callback( TP_CALLBACK_INSTANCE *instance, void *userdata, TP_WAIT *wait, TP_WAIT_RESULT result )
3238 struct threadpool_object *object = impl_from_TP_WAIT(wait);
3239 object->u.wait.rtl_callback( userdata, result != STATUS_WAIT_0 );
3242 /***********************************************************************
3243 * RtlRegisterWait (NTDLL.@)
3245 * Registers a wait for a handle to become signaled.
3247 * PARAMS
3248 * NewWaitObject [I] Handle to the new wait object. Use RtlDeregisterWait() to free it.
3249 * Object [I] Object to wait to become signaled.
3250 * Callback [I] Callback function to execute when the wait times out or the handle is signaled.
3251 * Context [I] Context to pass to the callback function when it is executed.
3252 * Milliseconds [I] Number of milliseconds to wait before timing out.
3253 * Flags [I] Flags. See notes.
3255 * RETURNS
3256 * Success: STATUS_SUCCESS.
3257 * Failure: Any NTSTATUS code.
3259 * NOTES
3260 * Flags can be one or more of the following:
3261 *|WT_EXECUTEDEFAULT - Executes the work item in a non-I/O worker thread.
3262 *|WT_EXECUTEINIOTHREAD - Executes the work item in an I/O worker thread.
3263 *|WT_EXECUTEINPERSISTENTTHREAD - Executes the work item in a thread that is persistent.
3264 *|WT_EXECUTELONGFUNCTION - Hints that the execution can take a long time.
3265 *|WT_TRANSFER_IMPERSONATION - Executes the function with the current access token.
3267 NTSTATUS WINAPI RtlRegisterWait( HANDLE *out, HANDLE handle, RTL_WAITORTIMERCALLBACKFUNC callback,
3268 void *context, ULONG milliseconds, ULONG flags )
3270 struct threadpool_object *object;
3271 TP_CALLBACK_ENVIRON environment;
3272 LARGE_INTEGER timeout;
3273 NTSTATUS status;
3274 TP_WAIT *wait;
3276 TRACE( "out %p, handle %p, callback %p, context %p, milliseconds %lu, flags %lx\n",
3277 out, handle, callback, context, milliseconds, flags );
3279 memset( &environment, 0, sizeof(environment) );
3280 environment.Version = 1;
3281 environment.u.s.LongFunction = (flags & WT_EXECUTELONGFUNCTION) != 0;
3282 environment.u.s.Persistent = (flags & WT_EXECUTEINPERSISTENTTHREAD) != 0;
3284 flags &= (WT_EXECUTEONLYONCE | WT_EXECUTEINWAITTHREAD | WT_EXECUTEINIOTHREAD);
3285 if ((status = tp_alloc_wait( &wait, rtl_wait_callback, context, &environment, flags )))
3286 return status;
3288 object = impl_from_TP_WAIT(wait);
3289 object->u.wait.rtl_callback = callback;
3291 RtlEnterCriticalSection( &waitqueue.cs );
3292 TpSetWait( (TP_WAIT *)object, handle, get_nt_timeout( &timeout, milliseconds ) );
3294 *out = object;
3295 RtlLeaveCriticalSection( &waitqueue.cs );
3297 return STATUS_SUCCESS;
3300 /***********************************************************************
3301 * RtlDeregisterWaitEx (NTDLL.@)
3303 * Cancels a wait operation and frees the resources associated with calling
3304 * RtlRegisterWait().
3306 * PARAMS
3307 * WaitObject [I] Handle to the wait object to free.
3309 * RETURNS
3310 * Success: STATUS_SUCCESS.
3311 * Failure: Any NTSTATUS code.
3313 NTSTATUS WINAPI RtlDeregisterWaitEx( HANDLE handle, HANDLE event )
3315 struct threadpool_object *object = handle;
3316 NTSTATUS status;
3318 TRACE( "handle %p, event %p\n", handle, event );
3320 if (!object) return STATUS_INVALID_HANDLE;
3322 TpSetWait( (TP_WAIT *)object, NULL, NULL );
3324 if (event == INVALID_HANDLE_VALUE) TpWaitForWait( (TP_WAIT *)object, TRUE );
3325 else
3327 assert( object->completed_event == NULL );
3328 object->completed_event = event;
3331 RtlEnterCriticalSection( &object->pool->cs );
3332 if (object->num_pending_callbacks + object->num_running_callbacks
3333 + object->num_associated_callbacks) status = STATUS_PENDING;
3334 else status = STATUS_SUCCESS;
3335 RtlLeaveCriticalSection( &object->pool->cs );
3337 TpReleaseWait( (TP_WAIT *)object );
3338 return status;
3341 /***********************************************************************
3342 * RtlDeregisterWait (NTDLL.@)
3344 * Cancels a wait operation and frees the resources associated with calling
3345 * RtlRegisterWait().
3347 * PARAMS
3348 * WaitObject [I] Handle to the wait object to free.
3350 * RETURNS
3351 * Success: STATUS_SUCCESS.
3352 * Failure: Any NTSTATUS code.
3354 NTSTATUS WINAPI RtlDeregisterWait(HANDLE WaitHandle)
3356 return RtlDeregisterWaitEx(WaitHandle, NULL);