user32: Hook drawing menu buttons.
[wine.git] / dlls / ntdll / threadpool.c
blob421d1ade133f4a0c7a9e80e938ae92e6816bff97
1 /*
2 * Thread pooling
4 * Copyright (c) 2006 Robert Shearman
5 * Copyright (c) 2014-2016 Sebastian Lackner
7 * This library is free software; you can redistribute it and/or
8 * modify it under the terms of the GNU Lesser General Public
9 * License as published by the Free Software Foundation; either
10 * version 2.1 of the License, or (at your option) any later version.
12 * This library is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Lesser General Public License for more details.
17 * You should have received a copy of the GNU Lesser General Public
18 * License along with this library; if not, write to the Free Software
19 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
22 #include <assert.h>
23 #include <stdarg.h>
24 #include <limits.h>
26 #define NONAMELESSUNION
27 #include "ntstatus.h"
28 #define WIN32_NO_STATUS
29 #include "winternl.h"
31 #include "wine/debug.h"
32 #include "wine/list.h"
34 #include "ntdll_misc.h"
36 WINE_DEFAULT_DEBUG_CHANNEL(threadpool);
39 * Old thread pooling API
42 struct rtl_work_item
44 PRTL_WORK_ITEM_ROUTINE function;
45 PVOID context;
/* Expiration value meaning "this timer never fires" (all bits set). */
#define EXPIRE_NEVER        (~(ULONGLONG)0)

/* Tag kept in timer_queue.magic; compared to validate queue handles. */
#define TIMER_QUEUE_MAGIC   0x516d6954  /* TimQ */
51 static RTL_CRITICAL_SECTION_DEBUG critsect_compl_debug;
53 static struct
55 HANDLE compl_port;
56 RTL_CRITICAL_SECTION threadpool_compl_cs;
58 old_threadpool =
60 NULL, /* compl_port */
61 { &critsect_compl_debug, -1, 0, 0, 0, 0 }, /* threadpool_compl_cs */
64 static RTL_CRITICAL_SECTION_DEBUG critsect_compl_debug =
66 0, 0, &old_threadpool.threadpool_compl_cs,
67 { &critsect_compl_debug.ProcessLocksList, &critsect_compl_debug.ProcessLocksList },
68 0, 0, { (DWORD_PTR)(__FILE__ ": threadpool_compl_cs") }
71 struct timer_queue;
72 struct queue_timer
74 struct timer_queue *q;
75 struct list entry;
76 ULONG runcount; /* number of callbacks pending execution */
77 RTL_WAITORTIMERCALLBACKFUNC callback;
78 PVOID param;
79 DWORD period;
80 ULONG flags;
81 ULONGLONG expire;
82 BOOL destroy; /* timer should be deleted; once set, never unset */
83 HANDLE event; /* removal event */
86 struct timer_queue
88 DWORD magic;
89 RTL_CRITICAL_SECTION cs;
90 struct list timers; /* sorted by expiration time */
91 BOOL quit; /* queue should be deleted; once set, never unset */
92 HANDLE event;
93 HANDLE thread;
/*
 * Object-oriented thread pooling API
 */

/* Idle time in milliseconds after which the timer queue thread shuts down. */
#define THREADPOOL_WORKER_TIMEOUT 5000
/* One slot fewer than MAXIMUM_WAIT_OBJECTS (presumably kept free for a
 * bucket's own update event - confirm against the wait queue code). */
#define MAXIMUM_WAITQUEUE_OBJECTS (MAXIMUM_WAIT_OBJECTS - 1)
103 /* internal threadpool representation */
104 struct threadpool
106 LONG refcount;
107 LONG objcount;
108 BOOL shutdown;
109 CRITICAL_SECTION cs;
110 /* Pools of work items, locked via .cs, order matches TP_CALLBACK_PRIORITY - high, normal, low. */
111 struct list pools[3];
112 RTL_CONDITION_VARIABLE update_event;
113 /* information about worker threads, locked via .cs */
114 int max_workers;
115 int min_workers;
116 int num_workers;
117 int num_busy_workers;
118 HANDLE compl_port;
119 TP_POOL_STACK_INFORMATION stack_info;
/* Discriminator for struct threadpool_object; selects the active member of its union. */
enum threadpool_objtype
{
    TP_OBJECT_TYPE_SIMPLE,  /* u.simple */
    TP_OBJECT_TYPE_WORK,    /* u.work   */
    TP_OBJECT_TYPE_TIMER,   /* u.timer  */
    TP_OBJECT_TYPE_WAIT,    /* u.wait   */
    TP_OBJECT_TYPE_IO,      /* u.io     */
};
131 struct io_completion
133 IO_STATUS_BLOCK iosb;
134 ULONG_PTR cvalue;
137 /* internal threadpool object representation */
138 struct threadpool_object
140 void *win32_callback; /* leave space for kernelbase to store win32 callback */
141 LONG refcount;
142 BOOL shutdown;
143 /* read-only information */
144 enum threadpool_objtype type;
145 struct threadpool *pool;
146 struct threadpool_group *group;
147 PVOID userdata;
148 PTP_CLEANUP_GROUP_CANCEL_CALLBACK group_cancel_callback;
149 PTP_SIMPLE_CALLBACK finalization_callback;
150 BOOL may_run_long;
151 HMODULE race_dll;
152 TP_CALLBACK_PRIORITY priority;
153 /* information about the group, locked via .group->cs */
154 struct list group_entry;
155 BOOL is_group_member;
156 /* information about the pool, locked via .pool->cs */
157 struct list pool_entry;
158 RTL_CONDITION_VARIABLE finished_event;
159 RTL_CONDITION_VARIABLE group_finished_event;
160 HANDLE completed_event;
161 LONG num_pending_callbacks;
162 LONG num_running_callbacks;
163 LONG num_associated_callbacks;
164 /* arguments for callback */
165 union
167 struct
169 PTP_SIMPLE_CALLBACK callback;
170 } simple;
171 struct
173 PTP_WORK_CALLBACK callback;
174 } work;
175 struct
177 PTP_TIMER_CALLBACK callback;
178 /* information about the timer, locked via timerqueue.cs */
179 BOOL timer_initialized;
180 BOOL timer_pending;
181 struct list timer_entry;
182 BOOL timer_set;
183 ULONGLONG timeout;
184 LONG period;
185 LONG window_length;
186 } timer;
187 struct
189 PTP_WAIT_CALLBACK callback;
190 LONG signaled;
191 /* information about the wait object, locked via waitqueue.cs */
192 struct waitqueue_bucket *bucket;
193 BOOL wait_pending;
194 struct list wait_entry;
195 ULONGLONG timeout;
196 HANDLE handle;
197 DWORD flags;
198 RTL_WAITORTIMERCALLBACKFUNC rtl_callback;
199 } wait;
200 struct
202 PTP_IO_CALLBACK callback;
203 /* locked via .pool->cs */
204 unsigned int pending_count, skipped_count, completion_count, completion_max;
205 BOOL shutting_down;
206 struct io_completion *completions;
207 } io;
208 } u;
211 /* internal threadpool instance representation */
212 struct threadpool_instance
214 struct threadpool_object *object;
215 DWORD threadid;
216 BOOL associated;
217 BOOL may_run_long;
218 struct
220 CRITICAL_SECTION *critical_section;
221 HANDLE mutex;
222 HANDLE semaphore;
223 LONG semaphore_count;
224 HANDLE event;
225 HMODULE library;
226 } cleanup;
229 /* internal threadpool group representation */
230 struct threadpool_group
232 LONG refcount;
233 BOOL shutdown;
234 CRITICAL_SECTION cs;
235 /* list of group members, locked via .cs */
236 struct list members;
239 /* global timerqueue object */
240 static RTL_CRITICAL_SECTION_DEBUG timerqueue_debug;
242 static struct
244 CRITICAL_SECTION cs;
245 LONG objcount;
246 BOOL thread_running;
247 struct list pending_timers;
248 RTL_CONDITION_VARIABLE update_event;
250 timerqueue =
252 { &timerqueue_debug, -1, 0, 0, 0, 0 }, /* cs */
253 0, /* objcount */
254 FALSE, /* thread_running */
255 LIST_INIT( timerqueue.pending_timers ), /* pending_timers */
256 RTL_CONDITION_VARIABLE_INIT /* update_event */
259 static RTL_CRITICAL_SECTION_DEBUG timerqueue_debug =
261 0, 0, &timerqueue.cs,
262 { &timerqueue_debug.ProcessLocksList, &timerqueue_debug.ProcessLocksList },
263 0, 0, { (DWORD_PTR)(__FILE__ ": timerqueue.cs") }
266 /* global waitqueue object */
267 static RTL_CRITICAL_SECTION_DEBUG waitqueue_debug;
269 static struct
271 CRITICAL_SECTION cs;
272 LONG num_buckets;
273 struct list buckets;
275 waitqueue =
277 { &waitqueue_debug, -1, 0, 0, 0, 0 }, /* cs */
278 0, /* num_buckets */
279 LIST_INIT( waitqueue.buckets ) /* buckets */
282 static RTL_CRITICAL_SECTION_DEBUG waitqueue_debug =
284 0, 0, &waitqueue.cs,
285 { &waitqueue_debug.ProcessLocksList, &waitqueue_debug.ProcessLocksList },
286 0, 0, { (DWORD_PTR)(__FILE__ ": waitqueue.cs") }
289 struct waitqueue_bucket
291 struct list bucket_entry;
292 LONG objcount;
293 struct list reserved;
294 struct list waiting;
295 HANDLE update_event;
296 BOOL alertable;
299 /* global I/O completion queue object */
300 static RTL_CRITICAL_SECTION_DEBUG ioqueue_debug;
302 static struct
304 CRITICAL_SECTION cs;
305 LONG objcount;
306 BOOL thread_running;
307 HANDLE port;
308 RTL_CONDITION_VARIABLE update_event;
310 ioqueue =
312 .cs = { &ioqueue_debug, -1, 0, 0, 0, 0 },
315 static RTL_CRITICAL_SECTION_DEBUG ioqueue_debug =
317 0, 0, &ioqueue.cs,
318 { &ioqueue_debug.ProcessLocksList, &ioqueue_debug.ProcessLocksList },
319 0, 0, { (DWORD_PTR)(__FILE__ ": ioqueue.cs") }
322 static inline struct threadpool *impl_from_TP_POOL( TP_POOL *pool )
324 return (struct threadpool *)pool;
327 static inline struct threadpool_object *impl_from_TP_WORK( TP_WORK *work )
329 struct threadpool_object *object = (struct threadpool_object *)work;
330 assert( object->type == TP_OBJECT_TYPE_WORK );
331 return object;
334 static inline struct threadpool_object *impl_from_TP_TIMER( TP_TIMER *timer )
336 struct threadpool_object *object = (struct threadpool_object *)timer;
337 assert( object->type == TP_OBJECT_TYPE_TIMER );
338 return object;
341 static inline struct threadpool_object *impl_from_TP_WAIT( TP_WAIT *wait )
343 struct threadpool_object *object = (struct threadpool_object *)wait;
344 assert( object->type == TP_OBJECT_TYPE_WAIT );
345 return object;
348 static inline struct threadpool_object *impl_from_TP_IO( TP_IO *io )
350 struct threadpool_object *object = (struct threadpool_object *)io;
351 assert( object->type == TP_OBJECT_TYPE_IO );
352 return object;
355 static inline struct threadpool_group *impl_from_TP_CLEANUP_GROUP( TP_CLEANUP_GROUP *group )
357 return (struct threadpool_group *)group;
360 static inline struct threadpool_instance *impl_from_TP_CALLBACK_INSTANCE( TP_CALLBACK_INSTANCE *instance )
362 return (struct threadpool_instance *)instance;
365 static void CALLBACK threadpool_worker_proc( void *param );
366 static void tp_object_submit( struct threadpool_object *object, BOOL signaled );
367 static void tp_object_execute( struct threadpool_object *object, BOOL wait_thread );
368 static void tp_object_prepare_shutdown( struct threadpool_object *object );
369 static BOOL tp_object_release( struct threadpool_object *object );
370 static struct threadpool *default_threadpool = NULL;
372 static BOOL array_reserve(void **elements, unsigned int *capacity, unsigned int count, unsigned int size)
374 unsigned int new_capacity, max_capacity;
375 void *new_elements;
377 if (count <= *capacity)
378 return TRUE;
380 max_capacity = ~(SIZE_T)0 / size;
381 if (count > max_capacity)
382 return FALSE;
384 new_capacity = max(4, *capacity);
385 while (new_capacity < count && new_capacity <= max_capacity / 2)
386 new_capacity *= 2;
387 if (new_capacity < count)
388 new_capacity = max_capacity;
390 if (!(new_elements = RtlReAllocateHeap( GetProcessHeap(), 0, *elements, new_capacity * size )))
391 return FALSE;
393 *elements = new_elements;
394 *capacity = new_capacity;
396 return TRUE;
399 static void set_thread_name(const WCHAR *name)
401 THREAD_NAME_INFORMATION info;
403 RtlInitUnicodeString(&info.ThreadName, name);
404 NtSetInformationThread(GetCurrentThread(), ThreadNameInformation, &info, sizeof(info));
407 static void CALLBACK process_rtl_work_item( TP_CALLBACK_INSTANCE *instance, void *userdata )
409 struct rtl_work_item *item = userdata;
411 TRACE("executing %p(%p)\n", item->function, item->context);
412 item->function( item->context );
414 RtlFreeHeap( GetProcessHeap(), 0, item );
417 /***********************************************************************
418 * RtlQueueWorkItem (NTDLL.@)
420 * Queues a work item into a thread in the thread pool.
422 * PARAMS
423 * function [I] Work function to execute.
424 * context [I] Context to pass to the work function when it is executed.
425 * flags [I] Flags. See notes.
427 * RETURNS
428 * Success: STATUS_SUCCESS.
429 * Failure: Any NTSTATUS code.
431 * NOTES
432 * Flags can be one or more of the following:
433 *|WT_EXECUTEDEFAULT - Executes the work item in a non-I/O worker thread.
434 *|WT_EXECUTEINIOTHREAD - Executes the work item in an I/O worker thread.
435 *|WT_EXECUTEINPERSISTENTTHREAD - Executes the work item in a thread that is persistent.
436 *|WT_EXECUTELONGFUNCTION - Hints that the execution can take a long time.
437 *|WT_TRANSFER_IMPERSONATION - Executes the function with the current access token.
439 NTSTATUS WINAPI RtlQueueWorkItem( PRTL_WORK_ITEM_ROUTINE function, PVOID context, ULONG flags )
441 TP_CALLBACK_ENVIRON environment;
442 struct rtl_work_item *item;
443 NTSTATUS status;
445 TRACE( "%p %p %u\n", function, context, flags );
447 item = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*item) );
448 if (!item)
449 return STATUS_NO_MEMORY;
451 memset( &environment, 0, sizeof(environment) );
452 environment.Version = 1;
453 environment.u.s.LongFunction = (flags & WT_EXECUTELONGFUNCTION) != 0;
454 environment.u.s.Persistent = (flags & WT_EXECUTEINPERSISTENTTHREAD) != 0;
456 item->function = function;
457 item->context = context;
459 status = TpSimpleTryPost( process_rtl_work_item, item, &environment );
460 if (status) RtlFreeHeap( GetProcessHeap(), 0, item );
461 return status;
464 /***********************************************************************
465 * iocp_poller - get completion events and run callbacks
467 static DWORD CALLBACK iocp_poller(LPVOID Arg)
469 HANDLE cport = Arg;
471 while( TRUE )
473 PRTL_OVERLAPPED_COMPLETION_ROUTINE callback;
474 LPVOID overlapped;
475 IO_STATUS_BLOCK iosb;
476 NTSTATUS res = NtRemoveIoCompletion( cport, (PULONG_PTR)&callback, (PULONG_PTR)&overlapped, &iosb, NULL );
477 if (res)
479 ERR("NtRemoveIoCompletion failed: 0x%x\n", res);
481 else
483 DWORD transferred = 0;
484 DWORD err = 0;
486 if (iosb.u.Status == STATUS_SUCCESS)
487 transferred = iosb.Information;
488 else
489 err = RtlNtStatusToDosError(iosb.u.Status);
491 callback( err, transferred, overlapped );
494 return 0;
497 /***********************************************************************
498 * RtlSetIoCompletionCallback (NTDLL.@)
500 * Binds a handle to a thread pool's completion port, and possibly
501 * starts a non-I/O thread to monitor this port and call functions back.
503 * PARAMS
504 * FileHandle [I] Handle to bind to a completion port.
505 * Function [I] Callback function to call on I/O completions.
506 * Flags [I] Not used.
508 * RETURNS
509 * Success: STATUS_SUCCESS.
510 * Failure: Any NTSTATUS code.
513 NTSTATUS WINAPI RtlSetIoCompletionCallback(HANDLE FileHandle, PRTL_OVERLAPPED_COMPLETION_ROUTINE Function, ULONG Flags)
515 IO_STATUS_BLOCK iosb;
516 FILE_COMPLETION_INFORMATION info;
518 if (Flags) FIXME("Unknown value Flags=0x%x\n", Flags);
520 if (!old_threadpool.compl_port)
522 NTSTATUS res = STATUS_SUCCESS;
524 RtlEnterCriticalSection(&old_threadpool.threadpool_compl_cs);
525 if (!old_threadpool.compl_port)
527 HANDLE cport;
529 res = NtCreateIoCompletion( &cport, IO_COMPLETION_ALL_ACCESS, NULL, 0 );
530 if (!res)
532 /* FIXME native can start additional threads in case of e.g. hung callback function. */
533 res = RtlQueueWorkItem( iocp_poller, cport, WT_EXECUTEDEFAULT );
534 if (!res)
535 old_threadpool.compl_port = cport;
536 else
537 NtClose( cport );
540 RtlLeaveCriticalSection(&old_threadpool.threadpool_compl_cs);
541 if (res) return res;
544 info.CompletionPort = old_threadpool.compl_port;
545 info.CompletionKey = (ULONG_PTR)Function;
547 return NtSetInformationFile( FileHandle, &iosb, &info, sizeof(info), FileCompletionInformation );
550 static inline PLARGE_INTEGER get_nt_timeout( PLARGE_INTEGER pTime, ULONG timeout )
552 if (timeout == INFINITE) return NULL;
553 pTime->QuadPart = (ULONGLONG)timeout * -10000;
554 return pTime;
558 /************************** Timer Queue Impl **************************/
560 static void queue_remove_timer(struct queue_timer *t)
562 /* We MUST hold the queue cs while calling this function. This ensures
563 that we cannot queue another callback for this timer. The runcount
564 being zero makes sure we don't have any already queued. */
565 struct timer_queue *q = t->q;
567 assert(t->runcount == 0);
568 assert(t->destroy);
570 list_remove(&t->entry);
571 if (t->event)
572 NtSetEvent(t->event, NULL);
573 RtlFreeHeap(GetProcessHeap(), 0, t);
575 if (q->quit && list_empty(&q->timers))
576 NtSetEvent(q->event, NULL);
579 static void timer_cleanup_callback(struct queue_timer *t)
581 struct timer_queue *q = t->q;
582 RtlEnterCriticalSection(&q->cs);
584 assert(0 < t->runcount);
585 --t->runcount;
587 if (t->destroy && t->runcount == 0)
588 queue_remove_timer(t);
590 RtlLeaveCriticalSection(&q->cs);
593 static DWORD WINAPI timer_callback_wrapper(LPVOID p)
595 struct queue_timer *t = p;
596 t->callback(t->param, TRUE);
597 timer_cleanup_callback(t);
598 return 0;
601 static inline ULONGLONG queue_current_time(void)
603 LARGE_INTEGER now, freq;
604 NtQueryPerformanceCounter(&now, &freq);
605 return now.QuadPart * 1000 / freq.QuadPart;
608 static void queue_add_timer(struct queue_timer *t, ULONGLONG time,
609 BOOL set_event)
611 /* We MUST hold the queue cs while calling this function. */
612 struct timer_queue *q = t->q;
613 struct list *ptr = &q->timers;
615 assert(!q->quit || (t->destroy && time == EXPIRE_NEVER));
617 if (time != EXPIRE_NEVER)
618 LIST_FOR_EACH(ptr, &q->timers)
620 struct queue_timer *cur = LIST_ENTRY(ptr, struct queue_timer, entry);
621 if (time < cur->expire)
622 break;
624 list_add_before(ptr, &t->entry);
626 t->expire = time;
628 /* If we insert at the head of the list, we need to expire sooner
629 than expected. */
630 if (set_event && &t->entry == list_head(&q->timers))
631 NtSetEvent(q->event, NULL);
634 static inline void queue_move_timer(struct queue_timer *t, ULONGLONG time,
635 BOOL set_event)
637 /* We MUST hold the queue cs while calling this function. */
638 list_remove(&t->entry);
639 queue_add_timer(t, time, set_event);
642 static void queue_timer_expire(struct timer_queue *q)
644 struct queue_timer *t = NULL;
646 RtlEnterCriticalSection(&q->cs);
647 if (list_head(&q->timers))
649 ULONGLONG now, next;
650 t = LIST_ENTRY(list_head(&q->timers), struct queue_timer, entry);
651 if (!t->destroy && t->expire <= ((now = queue_current_time())))
653 ++t->runcount;
654 if (t->period)
656 next = t->expire + t->period;
657 /* avoid trigger cascade if overloaded / hibernated */
658 if (next < now)
659 next = now + t->period;
661 else
662 next = EXPIRE_NEVER;
663 queue_move_timer(t, next, FALSE);
665 else
666 t = NULL;
668 RtlLeaveCriticalSection(&q->cs);
670 if (t)
672 if (t->flags & WT_EXECUTEINTIMERTHREAD)
673 timer_callback_wrapper(t);
674 else
676 ULONG flags
677 = (t->flags
678 & (WT_EXECUTEINIOTHREAD | WT_EXECUTEINPERSISTENTTHREAD
679 | WT_EXECUTELONGFUNCTION | WT_TRANSFER_IMPERSONATION));
680 NTSTATUS status = RtlQueueWorkItem(timer_callback_wrapper, t, flags);
681 if (status != STATUS_SUCCESS)
682 timer_cleanup_callback(t);
687 static ULONG queue_get_timeout(struct timer_queue *q)
689 struct queue_timer *t;
690 ULONG timeout = INFINITE;
692 RtlEnterCriticalSection(&q->cs);
693 if (list_head(&q->timers))
695 t = LIST_ENTRY(list_head(&q->timers), struct queue_timer, entry);
696 assert(!t->destroy || t->expire == EXPIRE_NEVER);
698 if (t->expire != EXPIRE_NEVER)
700 ULONGLONG time = queue_current_time();
701 timeout = t->expire < time ? 0 : t->expire - time;
704 RtlLeaveCriticalSection(&q->cs);
706 return timeout;
709 static void WINAPI timer_queue_thread_proc(LPVOID p)
711 struct timer_queue *q = p;
712 ULONG timeout_ms;
714 set_thread_name(L"wine_threadpool_timer_queue");
715 timeout_ms = INFINITE;
716 for (;;)
718 LARGE_INTEGER timeout;
719 NTSTATUS status;
720 BOOL done = FALSE;
722 status = NtWaitForSingleObject(
723 q->event, FALSE, get_nt_timeout(&timeout, timeout_ms));
725 if (status == STATUS_WAIT_0)
727 /* There are two possible ways to trigger the event. Either
728 we are quitting and the last timer got removed, or a new
729 timer got put at the head of the list so we need to adjust
730 our timeout. */
731 RtlEnterCriticalSection(&q->cs);
732 if (q->quit && list_empty(&q->timers))
733 done = TRUE;
734 RtlLeaveCriticalSection(&q->cs);
736 else if (status == STATUS_TIMEOUT)
737 queue_timer_expire(q);
739 if (done)
740 break;
742 timeout_ms = queue_get_timeout(q);
745 NtClose(q->event);
746 RtlDeleteCriticalSection(&q->cs);
747 q->magic = 0;
748 RtlFreeHeap(GetProcessHeap(), 0, q);
749 RtlExitUserThread( 0 );
752 static void queue_destroy_timer(struct queue_timer *t)
754 /* We MUST hold the queue cs while calling this function. */
755 t->destroy = TRUE;
756 if (t->runcount == 0)
757 /* Ensure a timer is promptly removed. If callbacks are pending,
758 it will be removed after the last one finishes by the callback
759 cleanup wrapper. */
760 queue_remove_timer(t);
761 else
762 /* Make sure no destroyed timer masks an active timer at the head
763 of the sorted list. */
764 queue_move_timer(t, EXPIRE_NEVER, FALSE);
767 /***********************************************************************
768 * RtlCreateTimerQueue (NTDLL.@)
770 * Creates a timer queue object and returns a handle to it.
772 * PARAMS
773 * NewTimerQueue [O] The newly created queue.
775 * RETURNS
776 * Success: STATUS_SUCCESS.
777 * Failure: Any NTSTATUS code.
779 NTSTATUS WINAPI RtlCreateTimerQueue(PHANDLE NewTimerQueue)
781 NTSTATUS status;
782 struct timer_queue *q = RtlAllocateHeap(GetProcessHeap(), 0, sizeof *q);
783 if (!q)
784 return STATUS_NO_MEMORY;
786 RtlInitializeCriticalSection(&q->cs);
787 list_init(&q->timers);
788 q->quit = FALSE;
789 q->magic = TIMER_QUEUE_MAGIC;
790 status = NtCreateEvent(&q->event, EVENT_ALL_ACCESS, NULL, SynchronizationEvent, FALSE);
791 if (status != STATUS_SUCCESS)
793 RtlFreeHeap(GetProcessHeap(), 0, q);
794 return status;
796 status = RtlCreateUserThread(GetCurrentProcess(), NULL, FALSE, 0, 0, 0,
797 timer_queue_thread_proc, q, &q->thread, NULL);
798 if (status != STATUS_SUCCESS)
800 NtClose(q->event);
801 RtlFreeHeap(GetProcessHeap(), 0, q);
802 return status;
805 *NewTimerQueue = q;
806 return STATUS_SUCCESS;
809 /***********************************************************************
810 * RtlDeleteTimerQueueEx (NTDLL.@)
812 * Deletes a timer queue object.
814 * PARAMS
815 * TimerQueue [I] The timer queue to destroy.
816 * CompletionEvent [I] If NULL, return immediately. If INVALID_HANDLE_VALUE,
817 * wait until all timers are finished firing before
818 * returning. Otherwise, return immediately and set the
819 * event when all timers are done.
821 * RETURNS
822 * Success: STATUS_SUCCESS if synchronous, STATUS_PENDING if not.
823 * Failure: Any NTSTATUS code.
825 NTSTATUS WINAPI RtlDeleteTimerQueueEx(HANDLE TimerQueue, HANDLE CompletionEvent)
827 struct timer_queue *q = TimerQueue;
828 struct queue_timer *t, *temp;
829 HANDLE thread;
830 NTSTATUS status;
832 if (!q || q->magic != TIMER_QUEUE_MAGIC)
833 return STATUS_INVALID_HANDLE;
835 thread = q->thread;
837 RtlEnterCriticalSection(&q->cs);
838 q->quit = TRUE;
839 if (list_head(&q->timers))
840 /* When the last timer is removed, it will signal the timer thread to
841 exit... */
842 LIST_FOR_EACH_ENTRY_SAFE(t, temp, &q->timers, struct queue_timer, entry)
843 queue_destroy_timer(t);
844 else
845 /* However if we have none, we must do it ourselves. */
846 NtSetEvent(q->event, NULL);
847 RtlLeaveCriticalSection(&q->cs);
849 if (CompletionEvent == INVALID_HANDLE_VALUE)
851 NtWaitForSingleObject(thread, FALSE, NULL);
852 status = STATUS_SUCCESS;
854 else
856 if (CompletionEvent)
858 FIXME("asynchronous return on completion event unimplemented\n");
859 NtWaitForSingleObject(thread, FALSE, NULL);
860 NtSetEvent(CompletionEvent, NULL);
862 status = STATUS_PENDING;
865 NtClose(thread);
866 return status;
869 static struct timer_queue *get_timer_queue(HANDLE TimerQueue)
871 static struct timer_queue *default_timer_queue;
873 if (TimerQueue)
874 return TimerQueue;
875 else
877 if (!default_timer_queue)
879 HANDLE q;
880 NTSTATUS status = RtlCreateTimerQueue(&q);
881 if (status == STATUS_SUCCESS)
883 PVOID p = InterlockedCompareExchangePointer( (void **) &default_timer_queue, q, NULL );
884 if (p)
885 /* Got beat to the punch. */
886 RtlDeleteTimerQueueEx(q, NULL);
889 return default_timer_queue;
893 /***********************************************************************
894 * RtlCreateTimer (NTDLL.@)
896 * Creates a new timer associated with the given queue.
898 * PARAMS
899 * TimerQueue [I] The queue to hold the timer.
900 * NewTimer [O] The newly created timer.
901 * Callback [I] The callback to fire.
902 * Parameter [I] The argument for the callback.
903 * DueTime [I] The delay, in milliseconds, before first firing the
904 * timer.
905 * Period [I] The period, in milliseconds, at which to fire the timer
906 * after the first callback. If zero, the timer will only
907 * fire once. It still needs to be deleted with
908 * RtlDeleteTimer.
909 * Flags [I] Flags controlling the execution of the callback. In
910 * addition to the WT_* thread pool flags (see
911 * RtlQueueWorkItem), WT_EXECUTEINTIMERTHREAD and
912 * WT_EXECUTEONLYONCE are supported.
914 * RETURNS
915 * Success: STATUS_SUCCESS.
916 * Failure: Any NTSTATUS code.
918 NTSTATUS WINAPI RtlCreateTimer(HANDLE TimerQueue, HANDLE *NewTimer,
919 RTL_WAITORTIMERCALLBACKFUNC Callback,
920 PVOID Parameter, DWORD DueTime, DWORD Period,
921 ULONG Flags)
923 NTSTATUS status;
924 struct queue_timer *t;
925 struct timer_queue *q = get_timer_queue(TimerQueue);
927 if (!q) return STATUS_NO_MEMORY;
928 if (q->magic != TIMER_QUEUE_MAGIC) return STATUS_INVALID_HANDLE;
930 t = RtlAllocateHeap(GetProcessHeap(), 0, sizeof *t);
931 if (!t)
932 return STATUS_NO_MEMORY;
934 t->q = q;
935 t->runcount = 0;
936 t->callback = Callback;
937 t->param = Parameter;
938 t->period = Period;
939 t->flags = Flags;
940 t->destroy = FALSE;
941 t->event = NULL;
943 status = STATUS_SUCCESS;
944 RtlEnterCriticalSection(&q->cs);
945 if (q->quit)
946 status = STATUS_INVALID_HANDLE;
947 else
948 queue_add_timer(t, queue_current_time() + DueTime, TRUE);
949 RtlLeaveCriticalSection(&q->cs);
951 if (status == STATUS_SUCCESS)
952 *NewTimer = t;
953 else
954 RtlFreeHeap(GetProcessHeap(), 0, t);
956 return status;
959 /***********************************************************************
960 * RtlUpdateTimer (NTDLL.@)
962 * Changes the time at which a timer expires.
964 * PARAMS
965 * TimerQueue [I] The queue that holds the timer.
966 * Timer [I] The timer to update.
967 * DueTime [I] The delay, in milliseconds, before next firing the timer.
968 * Period [I] The period, in milliseconds, at which to fire the timer
969 * after the first callback. If zero, the timer will not
970 * refire once. It still needs to be deleted with
971 * RtlDeleteTimer.
973 * RETURNS
974 * Success: STATUS_SUCCESS.
975 * Failure: Any NTSTATUS code.
977 NTSTATUS WINAPI RtlUpdateTimer(HANDLE TimerQueue, HANDLE Timer,
978 DWORD DueTime, DWORD Period)
980 struct queue_timer *t = Timer;
981 struct timer_queue *q = t->q;
983 RtlEnterCriticalSection(&q->cs);
984 /* Can't change a timer if it was once-only or destroyed. */
985 if (t->expire != EXPIRE_NEVER)
987 t->period = Period;
988 queue_move_timer(t, queue_current_time() + DueTime, TRUE);
990 RtlLeaveCriticalSection(&q->cs);
992 return STATUS_SUCCESS;
995 /***********************************************************************
996 * RtlDeleteTimer (NTDLL.@)
998 * Cancels a timer-queue timer.
1000 * PARAMS
1001 * TimerQueue [I] The queue that holds the timer.
1002 * Timer [I] The timer to update.
1003 * CompletionEvent [I] If NULL, return immediately. If INVALID_HANDLE_VALUE,
1004 * wait until the timer is finished firing all pending
1005 * callbacks before returning. Otherwise, return
1006 * immediately and set the timer is done.
1008 * RETURNS
1009 * Success: STATUS_SUCCESS if the timer is done, STATUS_PENDING if not,
1010 or if the completion event is NULL.
1011 * Failure: Any NTSTATUS code.
1013 NTSTATUS WINAPI RtlDeleteTimer(HANDLE TimerQueue, HANDLE Timer,
1014 HANDLE CompletionEvent)
1016 struct queue_timer *t = Timer;
1017 struct timer_queue *q;
1018 NTSTATUS status = STATUS_PENDING;
1019 HANDLE event = NULL;
1021 if (!Timer)
1022 return STATUS_INVALID_PARAMETER_1;
1023 q = t->q;
1024 if (CompletionEvent == INVALID_HANDLE_VALUE)
1026 status = NtCreateEvent(&event, EVENT_ALL_ACCESS, NULL, SynchronizationEvent, FALSE);
1027 if (status == STATUS_SUCCESS)
1028 status = STATUS_PENDING;
1030 else if (CompletionEvent)
1031 event = CompletionEvent;
1033 RtlEnterCriticalSection(&q->cs);
1034 t->event = event;
1035 if (t->runcount == 0 && event)
1036 status = STATUS_SUCCESS;
1037 queue_destroy_timer(t);
1038 RtlLeaveCriticalSection(&q->cs);
1040 if (CompletionEvent == INVALID_HANDLE_VALUE && event)
1042 if (status == STATUS_PENDING)
1044 NtWaitForSingleObject(event, FALSE, NULL);
1045 status = STATUS_SUCCESS;
1047 NtClose(event);
1050 return status;
1053 /***********************************************************************
1054 * timerqueue_thread_proc (internal)
1056 static void CALLBACK timerqueue_thread_proc( void *param )
1058 ULONGLONG timeout_lower, timeout_upper, new_timeout;
1059 struct threadpool_object *other_timer;
1060 LARGE_INTEGER now, timeout;
1061 struct list *ptr;
1063 TRACE( "starting timer queue thread\n" );
1064 set_thread_name(L"wine_threadpool_timerqueue");
1066 RtlEnterCriticalSection( &timerqueue.cs );
1067 for (;;)
1069 NtQuerySystemTime( &now );
1071 /* Check for expired timers. */
1072 while ((ptr = list_head( &timerqueue.pending_timers )))
1074 struct threadpool_object *timer = LIST_ENTRY( ptr, struct threadpool_object, u.timer.timer_entry );
1075 assert( timer->type == TP_OBJECT_TYPE_TIMER );
1076 assert( timer->u.timer.timer_pending );
1077 if (timer->u.timer.timeout > now.QuadPart)
1078 break;
1080 /* Queue a new callback in one of the worker threads. */
1081 list_remove( &timer->u.timer.timer_entry );
1082 timer->u.timer.timer_pending = FALSE;
1083 tp_object_submit( timer, FALSE );
1085 /* Insert the timer back into the queue, except it's marked for shutdown. */
1086 if (timer->u.timer.period && !timer->shutdown)
1088 timer->u.timer.timeout += (ULONGLONG)timer->u.timer.period * 10000;
1089 if (timer->u.timer.timeout <= now.QuadPart)
1090 timer->u.timer.timeout = now.QuadPart + 1;
1092 LIST_FOR_EACH_ENTRY( other_timer, &timerqueue.pending_timers,
1093 struct threadpool_object, u.timer.timer_entry )
1095 assert( other_timer->type == TP_OBJECT_TYPE_TIMER );
1096 if (timer->u.timer.timeout < other_timer->u.timer.timeout)
1097 break;
1099 list_add_before( &other_timer->u.timer.timer_entry, &timer->u.timer.timer_entry );
1100 timer->u.timer.timer_pending = TRUE;
1104 timeout_lower = timeout_upper = MAXLONGLONG;
1106 /* Determine next timeout and use the window length to optimize wakeup times. */
1107 LIST_FOR_EACH_ENTRY( other_timer, &timerqueue.pending_timers,
1108 struct threadpool_object, u.timer.timer_entry )
1110 assert( other_timer->type == TP_OBJECT_TYPE_TIMER );
1111 if (other_timer->u.timer.timeout >= timeout_upper)
1112 break;
1114 timeout_lower = other_timer->u.timer.timeout;
1115 new_timeout = timeout_lower + (ULONGLONG)other_timer->u.timer.window_length * 10000;
1116 if (new_timeout < timeout_upper)
1117 timeout_upper = new_timeout;
1120 /* Wait for timer update events or until the next timer expires. */
1121 if (timerqueue.objcount)
1123 timeout.QuadPart = timeout_lower;
1124 RtlSleepConditionVariableCS( &timerqueue.update_event, &timerqueue.cs, &timeout );
1125 continue;
1128 /* All timers have been destroyed, if no new timers are created
1129 * within some amount of time, then we can shutdown this thread. */
1130 timeout.QuadPart = (ULONGLONG)THREADPOOL_WORKER_TIMEOUT * -10000;
1131 if (RtlSleepConditionVariableCS( &timerqueue.update_event, &timerqueue.cs,
1132 &timeout ) == STATUS_TIMEOUT && !timerqueue.objcount)
1134 break;
1138 timerqueue.thread_running = FALSE;
1139 RtlLeaveCriticalSection( &timerqueue.cs );
1141 TRACE( "terminating timer queue thread\n" );
1142 RtlExitUserThread( 0 );
1145 /***********************************************************************
1146 * tp_new_worker_thread (internal)
1148 * Create and account a new worker thread for the desired pool.
1150 static NTSTATUS tp_new_worker_thread( struct threadpool *pool )
1152 HANDLE thread;
1153 NTSTATUS status;
1155 status = RtlCreateUserThread( GetCurrentProcess(), NULL, FALSE, 0, 0, 0,
1156 threadpool_worker_proc, pool, &thread, NULL );
1157 if (status == STATUS_SUCCESS)
1159 InterlockedIncrement( &pool->refcount );
1160 pool->num_workers++;
1161 NtClose( thread );
1163 return status;
1166 /***********************************************************************
1167 * tp_timerqueue_lock (internal)
1169 * Acquires a lock on the global timerqueue. When the lock is acquired
1170 * successfully, it is guaranteed that the timer thread is running.
1172 static NTSTATUS tp_timerqueue_lock( struct threadpool_object *timer )
1174 NTSTATUS status = STATUS_SUCCESS;
1175 assert( timer->type == TP_OBJECT_TYPE_TIMER );
1177 timer->u.timer.timer_initialized = FALSE;
1178 timer->u.timer.timer_pending = FALSE;
1179 timer->u.timer.timer_set = FALSE;
1180 timer->u.timer.timeout = 0;
1181 timer->u.timer.period = 0;
1182 timer->u.timer.window_length = 0;
1184 RtlEnterCriticalSection( &timerqueue.cs );
1186 /* Make sure that the timerqueue thread is running. */
1187 if (!timerqueue.thread_running)
1189 HANDLE thread;
1190 status = RtlCreateUserThread( GetCurrentProcess(), NULL, FALSE, 0, 0, 0,
1191 timerqueue_thread_proc, NULL, &thread, NULL );
1192 if (status == STATUS_SUCCESS)
1194 timerqueue.thread_running = TRUE;
1195 NtClose( thread );
1199 if (status == STATUS_SUCCESS)
1201 timer->u.timer.timer_initialized = TRUE;
1202 timerqueue.objcount++;
1205 RtlLeaveCriticalSection( &timerqueue.cs );
1206 return status;
1209 /***********************************************************************
1210 * tp_timerqueue_unlock (internal)
1212 * Releases a lock on the global timerqueue.
1214 static void tp_timerqueue_unlock( struct threadpool_object *timer )
1216 assert( timer->type == TP_OBJECT_TYPE_TIMER );
1218 RtlEnterCriticalSection( &timerqueue.cs );
1219 if (timer->u.timer.timer_initialized)
1221 /* If timer was pending, remove it. */
1222 if (timer->u.timer.timer_pending)
1224 list_remove( &timer->u.timer.timer_entry );
1225 timer->u.timer.timer_pending = FALSE;
1228 /* If the last timer object was destroyed, then wake up the thread. */
1229 if (!--timerqueue.objcount)
1231 assert( list_empty( &timerqueue.pending_timers ) );
1232 RtlWakeAllConditionVariable( &timerqueue.update_event );
1235 timer->u.timer.timer_initialized = FALSE;
1237 RtlLeaveCriticalSection( &timerqueue.cs );
1240 /***********************************************************************
1241 * waitqueue_thread_proc (internal)
1243 static void CALLBACK waitqueue_thread_proc( void *param )
1245 struct threadpool_object *objects[MAXIMUM_WAITQUEUE_OBJECTS];
1246 HANDLE handles[MAXIMUM_WAITQUEUE_OBJECTS + 1];
1247 struct waitqueue_bucket *bucket = param;
1248 struct threadpool_object *wait, *next;
1249 LARGE_INTEGER now, timeout;
1250 DWORD num_handles;
1251 NTSTATUS status;
1253 TRACE( "starting wait queue thread\n" );
1254 set_thread_name(L"wine_threadpool_waitqueue");
1256 RtlEnterCriticalSection( &waitqueue.cs );
1258 for (;;)
1260 NtQuerySystemTime( &now );
1261 timeout.QuadPart = MAXLONGLONG;
1262 num_handles = 0;
1264 LIST_FOR_EACH_ENTRY_SAFE( wait, next, &bucket->waiting, struct threadpool_object,
1265 u.wait.wait_entry )
1267 assert( wait->type == TP_OBJECT_TYPE_WAIT );
1268 if (wait->u.wait.timeout <= now.QuadPart)
1270 /* Wait object timed out. */
1271 if ((wait->u.wait.flags & WT_EXECUTEONLYONCE))
1273 list_remove( &wait->u.wait.wait_entry );
1274 list_add_tail( &bucket->reserved, &wait->u.wait.wait_entry );
1276 if ((wait->u.wait.flags & (WT_EXECUTEINWAITTHREAD | WT_EXECUTEINIOTHREAD)))
1278 InterlockedIncrement( &wait->refcount );
1279 wait->num_pending_callbacks++;
1280 RtlEnterCriticalSection( &wait->pool->cs );
1281 tp_object_execute( wait, TRUE );
1282 RtlLeaveCriticalSection( &wait->pool->cs );
1283 tp_object_release( wait );
1285 else tp_object_submit( wait, FALSE );
1287 else
1289 if (wait->u.wait.timeout < timeout.QuadPart)
1290 timeout.QuadPart = wait->u.wait.timeout;
1292 assert( num_handles < MAXIMUM_WAITQUEUE_OBJECTS );
1293 InterlockedIncrement( &wait->refcount );
1294 objects[num_handles] = wait;
1295 handles[num_handles] = wait->u.wait.handle;
1296 num_handles++;
1300 if (!bucket->objcount)
1302 /* All wait objects have been destroyed, if no new wait objects are created
1303 * within some amount of time, then we can shutdown this thread. */
1304 assert( num_handles == 0 );
1305 RtlLeaveCriticalSection( &waitqueue.cs );
1306 timeout.QuadPart = (ULONGLONG)THREADPOOL_WORKER_TIMEOUT * -10000;
1307 status = NtWaitForMultipleObjects( 1, &bucket->update_event, TRUE, bucket->alertable, &timeout );
1308 RtlEnterCriticalSection( &waitqueue.cs );
1310 if (status == STATUS_TIMEOUT && !bucket->objcount)
1311 break;
1313 else
1315 handles[num_handles] = bucket->update_event;
1316 RtlLeaveCriticalSection( &waitqueue.cs );
1317 status = NtWaitForMultipleObjects( num_handles + 1, handles, TRUE, bucket->alertable, &timeout );
1318 RtlEnterCriticalSection( &waitqueue.cs );
1320 if (status >= STATUS_WAIT_0 && status < STATUS_WAIT_0 + num_handles)
1322 wait = objects[status - STATUS_WAIT_0];
1323 assert( wait->type == TP_OBJECT_TYPE_WAIT );
1324 if (wait->u.wait.bucket)
1326 /* Wait object signaled. */
1327 assert( wait->u.wait.bucket == bucket );
1328 if ((wait->u.wait.flags & WT_EXECUTEONLYONCE))
1330 list_remove( &wait->u.wait.wait_entry );
1331 list_add_tail( &bucket->reserved, &wait->u.wait.wait_entry );
1333 if ((wait->u.wait.flags & (WT_EXECUTEINWAITTHREAD | WT_EXECUTEINIOTHREAD)))
1335 wait->u.wait.signaled++;
1336 wait->num_pending_callbacks++;
1337 RtlEnterCriticalSection( &wait->pool->cs );
1338 tp_object_execute( wait, TRUE );
1339 RtlLeaveCriticalSection( &wait->pool->cs );
1341 else tp_object_submit( wait, TRUE );
1343 else
1344 WARN("wait object %p triggered while object was destroyed\n", wait);
1347 /* Release temporary references to wait objects. */
1348 while (num_handles)
1350 wait = objects[--num_handles];
1351 assert( wait->type == TP_OBJECT_TYPE_WAIT );
1352 tp_object_release( wait );
1356 /* Try to merge bucket with other threads. */
1357 if (waitqueue.num_buckets > 1 && bucket->objcount &&
1358 bucket->objcount <= MAXIMUM_WAITQUEUE_OBJECTS * 1 / 3)
1360 struct waitqueue_bucket *other_bucket;
1361 LIST_FOR_EACH_ENTRY( other_bucket, &waitqueue.buckets, struct waitqueue_bucket, bucket_entry )
1363 if (other_bucket != bucket && other_bucket->objcount && other_bucket->alertable == bucket->alertable &&
1364 other_bucket->objcount + bucket->objcount <= MAXIMUM_WAITQUEUE_OBJECTS * 2 / 3)
1366 other_bucket->objcount += bucket->objcount;
1367 bucket->objcount = 0;
1369 /* Update reserved list. */
1370 LIST_FOR_EACH_ENTRY( wait, &bucket->reserved, struct threadpool_object, u.wait.wait_entry )
1372 assert( wait->type == TP_OBJECT_TYPE_WAIT );
1373 wait->u.wait.bucket = other_bucket;
1375 list_move_tail( &other_bucket->reserved, &bucket->reserved );
1377 /* Update waiting list. */
1378 LIST_FOR_EACH_ENTRY( wait, &bucket->waiting, struct threadpool_object, u.wait.wait_entry )
1380 assert( wait->type == TP_OBJECT_TYPE_WAIT );
1381 wait->u.wait.bucket = other_bucket;
1383 list_move_tail( &other_bucket->waiting, &bucket->waiting );
1385 /* Move bucket to the end, to keep the probability of
1386 * newly added wait objects as small as possible. */
1387 list_remove( &bucket->bucket_entry );
1388 list_add_tail( &waitqueue.buckets, &bucket->bucket_entry );
1390 NtSetEvent( other_bucket->update_event, NULL );
1391 break;
1397 /* Remove this bucket from the list. */
1398 list_remove( &bucket->bucket_entry );
1399 if (!--waitqueue.num_buckets)
1400 assert( list_empty( &waitqueue.buckets ) );
1402 RtlLeaveCriticalSection( &waitqueue.cs );
1404 TRACE( "terminating wait queue thread\n" );
1406 assert( bucket->objcount == 0 );
1407 assert( list_empty( &bucket->reserved ) );
1408 assert( list_empty( &bucket->waiting ) );
1409 NtClose( bucket->update_event );
1411 RtlFreeHeap( GetProcessHeap(), 0, bucket );
1412 RtlExitUserThread( 0 );
1415 /***********************************************************************
1416 * tp_waitqueue_lock (internal)
1418 static NTSTATUS tp_waitqueue_lock( struct threadpool_object *wait )
1420 struct waitqueue_bucket *bucket;
1421 NTSTATUS status;
1422 HANDLE thread;
1423 BOOL alertable = (wait->u.wait.flags & WT_EXECUTEINIOTHREAD) != 0;
1424 assert( wait->type == TP_OBJECT_TYPE_WAIT );
1426 wait->u.wait.signaled = 0;
1427 wait->u.wait.bucket = NULL;
1428 wait->u.wait.wait_pending = FALSE;
1429 wait->u.wait.timeout = 0;
1430 wait->u.wait.handle = INVALID_HANDLE_VALUE;
1432 RtlEnterCriticalSection( &waitqueue.cs );
1434 /* Try to assign to existing bucket if possible. */
1435 LIST_FOR_EACH_ENTRY( bucket, &waitqueue.buckets, struct waitqueue_bucket, bucket_entry )
1437 if (bucket->objcount < MAXIMUM_WAITQUEUE_OBJECTS && bucket->alertable == alertable)
1439 list_add_tail( &bucket->reserved, &wait->u.wait.wait_entry );
1440 wait->u.wait.bucket = bucket;
1441 bucket->objcount++;
1443 status = STATUS_SUCCESS;
1444 goto out;
1448 /* Create a new bucket and corresponding worker thread. */
1449 bucket = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*bucket) );
1450 if (!bucket)
1452 status = STATUS_NO_MEMORY;
1453 goto out;
1456 bucket->objcount = 0;
1457 bucket->alertable = alertable;
1458 list_init( &bucket->reserved );
1459 list_init( &bucket->waiting );
1461 status = NtCreateEvent( &bucket->update_event, EVENT_ALL_ACCESS,
1462 NULL, SynchronizationEvent, FALSE );
1463 if (status)
1465 RtlFreeHeap( GetProcessHeap(), 0, bucket );
1466 goto out;
1469 status = RtlCreateUserThread( GetCurrentProcess(), NULL, FALSE, 0, 0, 0,
1470 waitqueue_thread_proc, bucket, &thread, NULL );
1471 if (status == STATUS_SUCCESS)
1473 list_add_tail( &waitqueue.buckets, &bucket->bucket_entry );
1474 waitqueue.num_buckets++;
1476 list_add_tail( &bucket->reserved, &wait->u.wait.wait_entry );
1477 wait->u.wait.bucket = bucket;
1478 bucket->objcount++;
1480 NtClose( thread );
1482 else
1484 NtClose( bucket->update_event );
1485 RtlFreeHeap( GetProcessHeap(), 0, bucket );
1488 out:
1489 RtlLeaveCriticalSection( &waitqueue.cs );
1490 return status;
1493 /***********************************************************************
1494 * tp_waitqueue_unlock (internal)
1496 static void tp_waitqueue_unlock( struct threadpool_object *wait )
1498 assert( wait->type == TP_OBJECT_TYPE_WAIT );
1500 RtlEnterCriticalSection( &waitqueue.cs );
1501 if (wait->u.wait.bucket)
1503 struct waitqueue_bucket *bucket = wait->u.wait.bucket;
1504 assert( bucket->objcount > 0 );
1506 list_remove( &wait->u.wait.wait_entry );
1507 wait->u.wait.bucket = NULL;
1508 bucket->objcount--;
1510 NtSetEvent( bucket->update_event, NULL );
1512 RtlLeaveCriticalSection( &waitqueue.cs );
1515 static void CALLBACK ioqueue_thread_proc( void *param )
1517 struct io_completion *completion;
1518 struct threadpool_object *io;
1519 IO_STATUS_BLOCK iosb;
1520 ULONG_PTR key, value;
1521 BOOL destroy, skip;
1522 NTSTATUS status;
1524 TRACE( "starting I/O completion thread\n" );
1525 set_thread_name(L"wine_threadpool_ioqueue");
1527 RtlEnterCriticalSection( &ioqueue.cs );
1529 for (;;)
1531 RtlLeaveCriticalSection( &ioqueue.cs );
1532 if ((status = NtRemoveIoCompletion( ioqueue.port, &key, &value, &iosb, NULL )))
1533 ERR("NtRemoveIoCompletion failed, status %#x.\n", status);
1534 RtlEnterCriticalSection( &ioqueue.cs );
1536 destroy = skip = FALSE;
1537 io = (struct threadpool_object *)key;
1539 TRACE( "io %p, iosb.Status %#x.\n", io, iosb.u.Status );
1541 if (io && (io->shutdown || io->u.io.shutting_down))
1543 RtlEnterCriticalSection( &io->pool->cs );
1544 if (!io->u.io.pending_count)
1546 if (io->u.io.skipped_count)
1547 --io->u.io.skipped_count;
1549 if (io->u.io.skipped_count)
1550 skip = TRUE;
1551 else
1552 destroy = TRUE;
1554 RtlLeaveCriticalSection( &io->pool->cs );
1555 if (skip) continue;
1558 if (destroy)
1560 --ioqueue.objcount;
1561 TRACE( "Releasing io %p.\n", io );
1562 io->shutdown = TRUE;
1563 tp_object_release( io );
1565 else if (io)
1567 RtlEnterCriticalSection( &io->pool->cs );
1569 TRACE( "pending_count %u.\n", io->u.io.pending_count );
1571 if (io->u.io.pending_count)
1573 --io->u.io.pending_count;
1574 if (!array_reserve((void **)&io->u.io.completions, &io->u.io.completion_max,
1575 io->u.io.completion_count + 1, sizeof(*io->u.io.completions)))
1577 ERR( "Failed to allocate memory.\n" );
1578 RtlLeaveCriticalSection( &io->pool->cs );
1579 continue;
1582 completion = &io->u.io.completions[io->u.io.completion_count++];
1583 completion->iosb = iosb;
1584 completion->cvalue = value;
1586 tp_object_submit( io, FALSE );
1588 RtlLeaveCriticalSection( &io->pool->cs );
1591 if (!ioqueue.objcount)
1593 /* All I/O objects have been destroyed; if no new objects are
1594 * created within some amount of time, then we can shutdown this
1595 * thread. */
1596 LARGE_INTEGER timeout = {.QuadPart = (ULONGLONG)THREADPOOL_WORKER_TIMEOUT * -10000};
1597 if (RtlSleepConditionVariableCS( &ioqueue.update_event, &ioqueue.cs,
1598 &timeout) == STATUS_TIMEOUT && !ioqueue.objcount)
1599 break;
1603 ioqueue.thread_running = FALSE;
1604 RtlLeaveCriticalSection( &ioqueue.cs );
1606 TRACE( "terminating I/O completion thread\n" );
1608 RtlExitUserThread( 0 );
1611 static NTSTATUS tp_ioqueue_lock( struct threadpool_object *io, HANDLE file )
1613 NTSTATUS status = STATUS_SUCCESS;
1615 assert( io->type == TP_OBJECT_TYPE_IO );
1617 RtlEnterCriticalSection( &ioqueue.cs );
1619 if (!ioqueue.port && (status = NtCreateIoCompletion( &ioqueue.port,
1620 IO_COMPLETION_ALL_ACCESS, NULL, 0 )))
1622 RtlLeaveCriticalSection( &ioqueue.cs );
1623 return status;
1626 if (!ioqueue.thread_running)
1628 HANDLE thread;
1630 if (!(status = RtlCreateUserThread( GetCurrentProcess(), NULL, FALSE,
1631 0, 0, 0, ioqueue_thread_proc, NULL, &thread, NULL )))
1633 ioqueue.thread_running = TRUE;
1634 NtClose( thread );
1638 if (status == STATUS_SUCCESS)
1640 FILE_COMPLETION_INFORMATION info;
1641 IO_STATUS_BLOCK iosb;
1643 info.CompletionPort = ioqueue.port;
1644 info.CompletionKey = (ULONG_PTR)io;
1646 status = NtSetInformationFile( file, &iosb, &info, sizeof(info), FileCompletionInformation );
1649 if (status == STATUS_SUCCESS)
1651 if (!ioqueue.objcount++)
1652 RtlWakeConditionVariable( &ioqueue.update_event );
1655 RtlLeaveCriticalSection( &ioqueue.cs );
1656 return status;
1659 /***********************************************************************
1660 * tp_threadpool_alloc (internal)
1662 * Allocates a new threadpool object.
1664 static NTSTATUS tp_threadpool_alloc( struct threadpool **out )
1666 IMAGE_NT_HEADERS *nt = RtlImageNtHeader( NtCurrentTeb()->Peb->ImageBaseAddress );
1667 struct threadpool *pool;
1668 unsigned int i;
1670 pool = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*pool) );
1671 if (!pool)
1672 return STATUS_NO_MEMORY;
1674 pool->refcount = 1;
1675 pool->objcount = 0;
1676 pool->shutdown = FALSE;
1678 RtlInitializeCriticalSection( &pool->cs );
1679 pool->cs.DebugInfo->Spare[0] = (DWORD_PTR)(__FILE__ ": threadpool.cs");
1681 for (i = 0; i < ARRAY_SIZE(pool->pools); ++i)
1682 list_init( &pool->pools[i] );
1683 RtlInitializeConditionVariable( &pool->update_event );
1685 pool->max_workers = 500;
1686 pool->min_workers = 0;
1687 pool->num_workers = 0;
1688 pool->num_busy_workers = 0;
1689 pool->stack_info.StackReserve = nt->OptionalHeader.SizeOfStackReserve;
1690 pool->stack_info.StackCommit = nt->OptionalHeader.SizeOfStackCommit;
1692 TRACE( "allocated threadpool %p\n", pool );
1694 *out = pool;
1695 return STATUS_SUCCESS;
1698 /***********************************************************************
1699 * tp_threadpool_shutdown (internal)
1701 * Prepares the shutdown of a threadpool object and notifies all worker
1702 * threads to terminate (after all remaining work items have been
1703 * processed).
1705 static void tp_threadpool_shutdown( struct threadpool *pool )
1707 assert( pool != default_threadpool );
1709 pool->shutdown = TRUE;
1710 RtlWakeAllConditionVariable( &pool->update_event );
1713 /***********************************************************************
1714 * tp_threadpool_release (internal)
1716 * Releases a reference to a threadpool object.
1718 static BOOL tp_threadpool_release( struct threadpool *pool )
1720 unsigned int i;
1722 if (InterlockedDecrement( &pool->refcount ))
1723 return FALSE;
1725 TRACE( "destroying threadpool %p\n", pool );
1727 assert( pool->shutdown );
1728 assert( !pool->objcount );
1729 for (i = 0; i < ARRAY_SIZE(pool->pools); ++i)
1730 assert( list_empty( &pool->pools[i] ) );
1732 pool->cs.DebugInfo->Spare[0] = 0;
1733 RtlDeleteCriticalSection( &pool->cs );
1735 RtlFreeHeap( GetProcessHeap(), 0, pool );
1736 return TRUE;
1739 /***********************************************************************
1740 * tp_threadpool_lock (internal)
1742 * Acquires a lock on a threadpool, specified with an TP_CALLBACK_ENVIRON
1743 * block. When the lock is acquired successfully, it is guaranteed that
1744 * there is at least one worker thread to process tasks.
1746 static NTSTATUS tp_threadpool_lock( struct threadpool **out, TP_CALLBACK_ENVIRON *environment )
1748 struct threadpool *pool = NULL;
1749 NTSTATUS status = STATUS_SUCCESS;
1751 if (environment)
1753 /* Validate environment parameters. */
1754 if (environment->Version == 3)
1756 TP_CALLBACK_ENVIRON_V3 *environment3 = (TP_CALLBACK_ENVIRON_V3 *)environment;
1758 switch (environment3->CallbackPriority)
1760 case TP_CALLBACK_PRIORITY_HIGH:
1761 case TP_CALLBACK_PRIORITY_NORMAL:
1762 case TP_CALLBACK_PRIORITY_LOW:
1763 break;
1764 default:
1765 return STATUS_INVALID_PARAMETER;
1769 pool = (struct threadpool *)environment->Pool;
1772 if (!pool)
1774 if (!default_threadpool)
1776 status = tp_threadpool_alloc( &pool );
1777 if (status != STATUS_SUCCESS)
1778 return status;
1780 if (InterlockedCompareExchangePointer( (void *)&default_threadpool, pool, NULL ) != NULL)
1782 tp_threadpool_shutdown( pool );
1783 tp_threadpool_release( pool );
1787 pool = default_threadpool;
1790 RtlEnterCriticalSection( &pool->cs );
1792 /* Make sure that the threadpool has at least one thread. */
1793 if (!pool->num_workers)
1794 status = tp_new_worker_thread( pool );
1796 /* Keep a reference, and increment objcount to ensure that the
1797 * last thread doesn't terminate. */
1798 if (status == STATUS_SUCCESS)
1800 InterlockedIncrement( &pool->refcount );
1801 pool->objcount++;
1804 RtlLeaveCriticalSection( &pool->cs );
1806 if (status != STATUS_SUCCESS)
1807 return status;
1809 *out = pool;
1810 return STATUS_SUCCESS;
1813 /***********************************************************************
1814 * tp_threadpool_unlock (internal)
1816 * Releases a lock on a threadpool.
1818 static void tp_threadpool_unlock( struct threadpool *pool )
1820 RtlEnterCriticalSection( &pool->cs );
1821 pool->objcount--;
1822 RtlLeaveCriticalSection( &pool->cs );
1823 tp_threadpool_release( pool );
1826 /***********************************************************************
1827 * tp_group_alloc (internal)
1829 * Allocates a new threadpool group object.
1831 static NTSTATUS tp_group_alloc( struct threadpool_group **out )
1833 struct threadpool_group *group;
1835 group = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*group) );
1836 if (!group)
1837 return STATUS_NO_MEMORY;
1839 group->refcount = 1;
1840 group->shutdown = FALSE;
1842 RtlInitializeCriticalSection( &group->cs );
1843 group->cs.DebugInfo->Spare[0] = (DWORD_PTR)(__FILE__ ": threadpool_group.cs");
1845 list_init( &group->members );
1847 TRACE( "allocated group %p\n", group );
1849 *out = group;
1850 return STATUS_SUCCESS;
1853 /***********************************************************************
1854 * tp_group_shutdown (internal)
1856 * Marks the group object for shutdown.
1858 static void tp_group_shutdown( struct threadpool_group *group )
1860 group->shutdown = TRUE;
1863 /***********************************************************************
1864 * tp_group_release (internal)
1866 * Releases a reference to a group object.
1868 static BOOL tp_group_release( struct threadpool_group *group )
1870 if (InterlockedDecrement( &group->refcount ))
1871 return FALSE;
1873 TRACE( "destroying group %p\n", group );
1875 assert( group->shutdown );
1876 assert( list_empty( &group->members ) );
1878 group->cs.DebugInfo->Spare[0] = 0;
1879 RtlDeleteCriticalSection( &group->cs );
1881 RtlFreeHeap( GetProcessHeap(), 0, group );
1882 return TRUE;
1885 /***********************************************************************
1886 * tp_object_initialize (internal)
1888 * Initializes members of a threadpool object.
1890 static void tp_object_initialize( struct threadpool_object *object, struct threadpool *pool,
1891 PVOID userdata, TP_CALLBACK_ENVIRON *environment )
1893 BOOL is_simple_callback = (object->type == TP_OBJECT_TYPE_SIMPLE);
1895 object->refcount = 1;
1896 object->shutdown = FALSE;
1898 object->pool = pool;
1899 object->group = NULL;
1900 object->userdata = userdata;
1901 object->group_cancel_callback = NULL;
1902 object->finalization_callback = NULL;
1903 object->may_run_long = 0;
1904 object->race_dll = NULL;
1905 object->priority = TP_CALLBACK_PRIORITY_NORMAL;
1907 memset( &object->group_entry, 0, sizeof(object->group_entry) );
1908 object->is_group_member = FALSE;
1910 memset( &object->pool_entry, 0, sizeof(object->pool_entry) );
1911 RtlInitializeConditionVariable( &object->finished_event );
1912 RtlInitializeConditionVariable( &object->group_finished_event );
1913 object->completed_event = NULL;
1914 object->num_pending_callbacks = 0;
1915 object->num_running_callbacks = 0;
1916 object->num_associated_callbacks = 0;
1918 if (environment)
1920 if (environment->Version != 1 && environment->Version != 3)
1921 FIXME( "unsupported environment version %u\n", environment->Version );
1923 object->group = impl_from_TP_CLEANUP_GROUP( environment->CleanupGroup );
1924 object->group_cancel_callback = environment->CleanupGroupCancelCallback;
1925 object->finalization_callback = environment->FinalizationCallback;
1926 object->may_run_long = environment->u.s.LongFunction != 0;
1927 object->race_dll = environment->RaceDll;
1928 if (environment->Version == 3)
1930 TP_CALLBACK_ENVIRON_V3 *environment_v3 = (TP_CALLBACK_ENVIRON_V3 *)environment;
1932 object->priority = environment_v3->CallbackPriority;
1933 assert( object->priority < ARRAY_SIZE(pool->pools) );
1936 if (environment->ActivationContext)
1937 FIXME( "activation context not supported yet\n" );
1939 if (environment->u.s.Persistent)
1940 FIXME( "persistent threads not supported yet\n" );
1943 if (object->race_dll)
1944 LdrAddRefDll( 0, object->race_dll );
1946 TRACE( "allocated object %p of type %u\n", object, object->type );
1948 /* For simple callbacks we have to run tp_object_submit before adding this object
1949 * to the cleanup group. As soon as the cleanup group members are released ->shutdown
1950 * will be set, and tp_object_submit would fail with an assertion. */
1952 if (is_simple_callback)
1953 tp_object_submit( object, FALSE );
1955 if (object->group)
1957 struct threadpool_group *group = object->group;
1958 InterlockedIncrement( &group->refcount );
1960 RtlEnterCriticalSection( &group->cs );
1961 list_add_tail( &group->members, &object->group_entry );
1962 object->is_group_member = TRUE;
1963 RtlLeaveCriticalSection( &group->cs );
1966 if (is_simple_callback)
1967 tp_object_release( object );
1970 static void tp_object_prio_queue( struct threadpool_object *object )
1972 ++object->pool->num_busy_workers;
1973 list_add_tail( &object->pool->pools[object->priority], &object->pool_entry );
1976 /***********************************************************************
1977 * tp_object_submit (internal)
1979 * Submits a threadpool object to the associated threadpool. This
1980 * function has to be VOID because TpPostWork can never fail on Windows.
1982 static void tp_object_submit( struct threadpool_object *object, BOOL signaled )
1984 struct threadpool *pool = object->pool;
1985 NTSTATUS status = STATUS_UNSUCCESSFUL;
1987 assert( !object->shutdown );
1988 assert( !pool->shutdown );
1990 RtlEnterCriticalSection( &pool->cs );
1992 /* Start new worker threads if required. */
1993 if (pool->num_busy_workers >= pool->num_workers &&
1994 pool->num_workers < pool->max_workers)
1995 status = tp_new_worker_thread( pool );
1997 /* Queue work item and increment refcount. */
1998 InterlockedIncrement( &object->refcount );
1999 if (!object->num_pending_callbacks++)
2000 tp_object_prio_queue( object );
2002 /* Count how often the object was signaled. */
2003 if (object->type == TP_OBJECT_TYPE_WAIT && signaled)
2004 object->u.wait.signaled++;
2006 /* No new thread started - wake up one existing thread. */
2007 if (status != STATUS_SUCCESS)
2009 assert( pool->num_workers > 0 );
2010 RtlWakeConditionVariable( &pool->update_event );
2013 RtlLeaveCriticalSection( &pool->cs );
2016 /***********************************************************************
2017 * tp_object_cancel (internal)
2019 * Cancels all currently pending callbacks for a specific object.
2021 static void tp_object_cancel( struct threadpool_object *object )
2023 struct threadpool *pool = object->pool;
2024 LONG pending_callbacks = 0;
2026 RtlEnterCriticalSection( &pool->cs );
2027 if (object->num_pending_callbacks)
2029 pending_callbacks = object->num_pending_callbacks;
2030 object->num_pending_callbacks = 0;
2031 list_remove( &object->pool_entry );
2033 if (object->type == TP_OBJECT_TYPE_WAIT)
2034 object->u.wait.signaled = 0;
2036 if (object->type == TP_OBJECT_TYPE_IO)
2038 object->u.io.skipped_count += object->u.io.pending_count;
2039 object->u.io.pending_count = 0;
2041 RtlLeaveCriticalSection( &pool->cs );
2043 while (pending_callbacks--)
2044 tp_object_release( object );
2047 static BOOL object_is_finished( struct threadpool_object *object, BOOL group )
2049 if (object->num_pending_callbacks)
2050 return FALSE;
2051 if (object->type == TP_OBJECT_TYPE_IO && object->u.io.pending_count)
2052 return FALSE;
2054 if (group)
2055 return !object->num_running_callbacks;
2056 else
2057 return !object->num_associated_callbacks;
2060 /***********************************************************************
2061 * tp_object_wait (internal)
2063 * Waits until all pending and running callbacks of a specific object
2064 * have been processed.
2066 static void tp_object_wait( struct threadpool_object *object, BOOL group_wait )
2068 struct threadpool *pool = object->pool;
2070 RtlEnterCriticalSection( &pool->cs );
2071 while (!object_is_finished( object, group_wait ))
2073 if (group_wait)
2074 RtlSleepConditionVariableCS( &object->group_finished_event, &pool->cs, NULL );
2075 else
2076 RtlSleepConditionVariableCS( &object->finished_event, &pool->cs, NULL );
2078 RtlLeaveCriticalSection( &pool->cs );
2081 static void tp_ioqueue_unlock( struct threadpool_object *io )
2083 assert( io->type == TP_OBJECT_TYPE_IO );
2085 RtlEnterCriticalSection( &ioqueue.cs );
2087 assert(ioqueue.objcount);
2089 if (!io->shutdown && !--ioqueue.objcount)
2090 NtSetIoCompletion( ioqueue.port, 0, 0, STATUS_SUCCESS, 0 );
2092 RtlLeaveCriticalSection( &ioqueue.cs );
2095 /***********************************************************************
2096 * tp_object_prepare_shutdown (internal)
2098 * Prepares a threadpool object for shutdown.
2100 static void tp_object_prepare_shutdown( struct threadpool_object *object )
2102 if (object->type == TP_OBJECT_TYPE_TIMER)
2103 tp_timerqueue_unlock( object );
2104 else if (object->type == TP_OBJECT_TYPE_WAIT)
2105 tp_waitqueue_unlock( object );
2106 else if (object->type == TP_OBJECT_TYPE_IO)
2107 tp_ioqueue_unlock( object );
2110 /***********************************************************************
2111 * tp_object_release (internal)
2113 * Releases a reference to a threadpool object.
2115 static BOOL tp_object_release( struct threadpool_object *object )
2117 if (InterlockedDecrement( &object->refcount ))
2118 return FALSE;
2120 TRACE( "destroying object %p of type %u\n", object, object->type );
2122 assert( object->shutdown );
2123 assert( !object->num_pending_callbacks );
2124 assert( !object->num_running_callbacks );
2125 assert( !object->num_associated_callbacks );
2127 /* release reference to the group */
2128 if (object->group)
2130 struct threadpool_group *group = object->group;
2132 RtlEnterCriticalSection( &group->cs );
2133 if (object->is_group_member)
2135 list_remove( &object->group_entry );
2136 object->is_group_member = FALSE;
2138 RtlLeaveCriticalSection( &group->cs );
2140 tp_group_release( group );
2143 tp_threadpool_unlock( object->pool );
2145 if (object->race_dll)
2146 LdrUnloadDll( object->race_dll );
2148 if (object->completed_event && object->completed_event != INVALID_HANDLE_VALUE)
2149 NtSetEvent( object->completed_event, NULL );
2151 RtlFreeHeap( GetProcessHeap(), 0, object );
2152 return TRUE;
2155 static struct list *threadpool_get_next_item( const struct threadpool *pool )
2157 struct list *ptr;
2158 unsigned int i;
2160 for (i = 0; i < ARRAY_SIZE(pool->pools); ++i)
2162 if ((ptr = list_head( &pool->pools[i] )))
2163 break;
2166 return ptr;
2169 /***********************************************************************
2170 * tp_object_execute (internal)
2172 * Executes a threadpool object callback, object->pool->cs has to be
2173 * held.
2175 static void tp_object_execute( struct threadpool_object *object, BOOL wait_thread )
2177 TP_CALLBACK_INSTANCE *callback_instance;
2178 struct threadpool_instance instance;
2179 struct io_completion completion;
2180 struct threadpool *pool = object->pool;
2181 TP_WAIT_RESULT wait_result = 0;
2182 NTSTATUS status;
2184 object->num_pending_callbacks--;
2186 /* For wait objects check if they were signaled or have timed out. */
2187 if (object->type == TP_OBJECT_TYPE_WAIT)
2189 wait_result = object->u.wait.signaled ? WAIT_OBJECT_0 : WAIT_TIMEOUT;
2190 if (wait_result == WAIT_OBJECT_0) object->u.wait.signaled--;
2192 else if (object->type == TP_OBJECT_TYPE_IO)
2194 assert( object->u.io.completion_count );
2195 completion = object->u.io.completions[--object->u.io.completion_count];
2198 /* Leave critical section and do the actual callback. */
2199 object->num_associated_callbacks++;
2200 object->num_running_callbacks++;
2201 RtlLeaveCriticalSection( &pool->cs );
2202 if (wait_thread) RtlLeaveCriticalSection( &waitqueue.cs );
2204 /* Initialize threadpool instance struct. */
2205 callback_instance = (TP_CALLBACK_INSTANCE *)&instance;
2206 instance.object = object;
2207 instance.threadid = GetCurrentThreadId();
2208 instance.associated = TRUE;
2209 instance.may_run_long = object->may_run_long;
2210 instance.cleanup.critical_section = NULL;
2211 instance.cleanup.mutex = NULL;
2212 instance.cleanup.semaphore = NULL;
2213 instance.cleanup.semaphore_count = 0;
2214 instance.cleanup.event = NULL;
2215 instance.cleanup.library = NULL;
2217 switch (object->type)
2219 case TP_OBJECT_TYPE_SIMPLE:
2221 TRACE( "executing simple callback %p(%p, %p)\n",
2222 object->u.simple.callback, callback_instance, object->userdata );
2223 object->u.simple.callback( callback_instance, object->userdata );
2224 TRACE( "callback %p returned\n", object->u.simple.callback );
2225 break;
2228 case TP_OBJECT_TYPE_WORK:
2230 TRACE( "executing work callback %p(%p, %p, %p)\n",
2231 object->u.work.callback, callback_instance, object->userdata, object );
2232 object->u.work.callback( callback_instance, object->userdata, (TP_WORK *)object );
2233 TRACE( "callback %p returned\n", object->u.work.callback );
2234 break;
2237 case TP_OBJECT_TYPE_TIMER:
2239 TRACE( "executing timer callback %p(%p, %p, %p)\n",
2240 object->u.timer.callback, callback_instance, object->userdata, object );
2241 object->u.timer.callback( callback_instance, object->userdata, (TP_TIMER *)object );
2242 TRACE( "callback %p returned\n", object->u.timer.callback );
2243 break;
2246 case TP_OBJECT_TYPE_WAIT:
2248 TRACE( "executing wait callback %p(%p, %p, %p, %u)\n",
2249 object->u.wait.callback, callback_instance, object->userdata, object, wait_result );
2250 object->u.wait.callback( callback_instance, object->userdata, (TP_WAIT *)object, wait_result );
2251 TRACE( "callback %p returned\n", object->u.wait.callback );
2252 break;
2255 case TP_OBJECT_TYPE_IO:
2257 TRACE( "executing I/O callback %p(%p, %p, %#lx, %p, %p)\n",
2258 object->u.io.callback, callback_instance, object->userdata,
2259 completion.cvalue, &completion.iosb, (TP_IO *)object );
2260 object->u.io.callback( callback_instance, object->userdata,
2261 (void *)completion.cvalue, &completion.iosb, (TP_IO *)object );
2262 TRACE( "callback %p returned\n", object->u.io.callback );
2263 break;
2266 default:
2267 assert(0);
2268 break;
2271 /* Execute finalization callback. */
2272 if (object->finalization_callback)
2274 TRACE( "executing finalization callback %p(%p, %p)\n",
2275 object->finalization_callback, callback_instance, object->userdata );
2276 object->finalization_callback( callback_instance, object->userdata );
2277 TRACE( "callback %p returned\n", object->finalization_callback );
2280 /* Execute cleanup tasks. */
2281 if (instance.cleanup.critical_section)
2283 RtlLeaveCriticalSection( instance.cleanup.critical_section );
2285 if (instance.cleanup.mutex)
2287 status = NtReleaseMutant( instance.cleanup.mutex, NULL );
2288 if (status != STATUS_SUCCESS) goto skip_cleanup;
2290 if (instance.cleanup.semaphore)
2292 status = NtReleaseSemaphore( instance.cleanup.semaphore, instance.cleanup.semaphore_count, NULL );
2293 if (status != STATUS_SUCCESS) goto skip_cleanup;
2295 if (instance.cleanup.event)
2297 status = NtSetEvent( instance.cleanup.event, NULL );
2298 if (status != STATUS_SUCCESS) goto skip_cleanup;
2300 if (instance.cleanup.library)
2302 LdrUnloadDll( instance.cleanup.library );
2305 skip_cleanup:
2306 if (wait_thread) RtlEnterCriticalSection( &waitqueue.cs );
2307 RtlEnterCriticalSection( &pool->cs );
2309 /* Simple callbacks are automatically shutdown after execution. */
2310 if (object->type == TP_OBJECT_TYPE_SIMPLE)
2312 tp_object_prepare_shutdown( object );
2313 object->shutdown = TRUE;
2316 object->num_running_callbacks--;
2317 if (object_is_finished( object, TRUE ))
2318 RtlWakeAllConditionVariable( &object->group_finished_event );
2320 if (instance.associated)
2322 object->num_associated_callbacks--;
2323 if (object_is_finished( object, FALSE ))
2324 RtlWakeAllConditionVariable( &object->finished_event );
2328 /***********************************************************************
2329 * threadpool_worker_proc (internal)
2331 static void CALLBACK threadpool_worker_proc( void *param )
2333 struct threadpool *pool = param;
2334 LARGE_INTEGER timeout;
2335 struct list *ptr;
2337 TRACE( "starting worker thread for pool %p\n", pool );
2338 set_thread_name(L"wine_threadpool_worker");
2340 RtlEnterCriticalSection( &pool->cs );
2341 for (;;)
2343 while ((ptr = threadpool_get_next_item( pool )))
2345 struct threadpool_object *object = LIST_ENTRY( ptr, struct threadpool_object, pool_entry );
2346 assert( object->num_pending_callbacks > 0 );
2348 /* If further pending callbacks are queued, move the work item to
2349 * the end of the pool list. Otherwise remove it from the pool. */
2350 list_remove( &object->pool_entry );
2351 if (object->num_pending_callbacks > 1)
2352 tp_object_prio_queue( object );
2354 tp_object_execute( object, FALSE );
2356 assert(pool->num_busy_workers);
2357 pool->num_busy_workers--;
2359 tp_object_release( object );
2362 /* Shutdown worker thread if requested. */
2363 if (pool->shutdown)
2364 break;
2366 /* Wait for new tasks or until the timeout expires. A thread only terminates
2367 * when no new tasks are available, and the number of threads can be
2368 * decreased without violating the min_workers limit. An exception is when
2369 * min_workers == 0, then objcount is used to detect if the last thread
2370 * can be terminated. */
2371 timeout.QuadPart = (ULONGLONG)THREADPOOL_WORKER_TIMEOUT * -10000;
2372 if (RtlSleepConditionVariableCS( &pool->update_event, &pool->cs, &timeout ) == STATUS_TIMEOUT &&
2373 !threadpool_get_next_item( pool ) && (pool->num_workers > max( pool->min_workers, 1 ) ||
2374 (!pool->min_workers && !pool->objcount)))
2376 break;
2379 pool->num_workers--;
2380 RtlLeaveCriticalSection( &pool->cs );
2382 TRACE( "terminating worker thread for pool %p\n", pool );
2383 tp_threadpool_release( pool );
2384 RtlExitUserThread( 0 );
2387 /***********************************************************************
2388 * TpAllocCleanupGroup (NTDLL.@)
2390 NTSTATUS WINAPI TpAllocCleanupGroup( TP_CLEANUP_GROUP **out )
2392 TRACE( "%p\n", out );
2394 return tp_group_alloc( (struct threadpool_group **)out );
2397 /***********************************************************************
2398 * TpAllocIoCompletion (NTDLL.@)
2400 NTSTATUS WINAPI TpAllocIoCompletion( TP_IO **out, HANDLE file, PTP_IO_CALLBACK callback,
2401 void *userdata, TP_CALLBACK_ENVIRON *environment )
2403 struct threadpool_object *object;
2404 struct threadpool *pool;
2405 NTSTATUS status;
2407 TRACE( "%p %p %p %p %p\n", out, file, callback, userdata, environment );
2409 if (!(object = RtlAllocateHeap( GetProcessHeap(), HEAP_ZERO_MEMORY, sizeof(*object) )))
2410 return STATUS_NO_MEMORY;
2412 if ((status = tp_threadpool_lock( &pool, environment )))
2414 RtlFreeHeap( GetProcessHeap(), 0, object );
2415 return status;
2418 object->type = TP_OBJECT_TYPE_IO;
2419 object->u.io.callback = callback;
2420 if (!(object->u.io.completions = RtlAllocateHeap( GetProcessHeap(), 0, 8 * sizeof(*object->u.io.completions) )))
2422 tp_threadpool_unlock( pool );
2423 RtlFreeHeap( GetProcessHeap(), 0, object );
2424 return status;
2427 if ((status = tp_ioqueue_lock( object, file )))
2429 tp_threadpool_unlock( pool );
2430 RtlFreeHeap( GetProcessHeap(), 0, object->u.io.completions );
2431 RtlFreeHeap( GetProcessHeap(), 0, object );
2432 return status;
2435 tp_object_initialize( object, pool, userdata, environment );
2437 *out = (TP_IO *)object;
2438 return STATUS_SUCCESS;
2441 /***********************************************************************
2442 * TpAllocPool (NTDLL.@)
2444 NTSTATUS WINAPI TpAllocPool( TP_POOL **out, PVOID reserved )
2446 TRACE( "%p %p\n", out, reserved );
2448 if (reserved)
2449 FIXME( "reserved argument is nonzero (%p)\n", reserved );
2451 return tp_threadpool_alloc( (struct threadpool **)out );
2454 /***********************************************************************
2455 * TpAllocTimer (NTDLL.@)
2457 NTSTATUS WINAPI TpAllocTimer( TP_TIMER **out, PTP_TIMER_CALLBACK callback, PVOID userdata,
2458 TP_CALLBACK_ENVIRON *environment )
2460 struct threadpool_object *object;
2461 struct threadpool *pool;
2462 NTSTATUS status;
2464 TRACE( "%p %p %p %p\n", out, callback, userdata, environment );
2466 object = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*object) );
2467 if (!object)
2468 return STATUS_NO_MEMORY;
2470 status = tp_threadpool_lock( &pool, environment );
2471 if (status)
2473 RtlFreeHeap( GetProcessHeap(), 0, object );
2474 return status;
2477 object->type = TP_OBJECT_TYPE_TIMER;
2478 object->u.timer.callback = callback;
2480 status = tp_timerqueue_lock( object );
2481 if (status)
2483 tp_threadpool_unlock( pool );
2484 RtlFreeHeap( GetProcessHeap(), 0, object );
2485 return status;
2488 tp_object_initialize( object, pool, userdata, environment );
2490 *out = (TP_TIMER *)object;
2491 return STATUS_SUCCESS;
2494 static NTSTATUS tp_alloc_wait( TP_WAIT **out, PTP_WAIT_CALLBACK callback, PVOID userdata,
2495 TP_CALLBACK_ENVIRON *environment, DWORD flags )
2497 struct threadpool_object *object;
2498 struct threadpool *pool;
2499 NTSTATUS status;
2501 object = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*object) );
2502 if (!object)
2503 return STATUS_NO_MEMORY;
2505 status = tp_threadpool_lock( &pool, environment );
2506 if (status)
2508 RtlFreeHeap( GetProcessHeap(), 0, object );
2509 return status;
2512 object->type = TP_OBJECT_TYPE_WAIT;
2513 object->u.wait.callback = callback;
2514 object->u.wait.flags = flags;
2516 status = tp_waitqueue_lock( object );
2517 if (status)
2519 tp_threadpool_unlock( pool );
2520 RtlFreeHeap( GetProcessHeap(), 0, object );
2521 return status;
2524 tp_object_initialize( object, pool, userdata, environment );
2526 *out = (TP_WAIT *)object;
2527 return STATUS_SUCCESS;
2530 /***********************************************************************
2531 * TpAllocWait (NTDLL.@)
2533 NTSTATUS WINAPI TpAllocWait( TP_WAIT **out, PTP_WAIT_CALLBACK callback, PVOID userdata,
2534 TP_CALLBACK_ENVIRON *environment )
2536 TRACE( "%p %p %p %p\n", out, callback, userdata, environment );
2537 return tp_alloc_wait( out, callback, userdata, environment, WT_EXECUTEONLYONCE );
2540 /***********************************************************************
2541 * TpAllocWork (NTDLL.@)
2543 NTSTATUS WINAPI TpAllocWork( TP_WORK **out, PTP_WORK_CALLBACK callback, PVOID userdata,
2544 TP_CALLBACK_ENVIRON *environment )
2546 struct threadpool_object *object;
2547 struct threadpool *pool;
2548 NTSTATUS status;
2550 TRACE( "%p %p %p %p\n", out, callback, userdata, environment );
2552 object = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*object) );
2553 if (!object)
2554 return STATUS_NO_MEMORY;
2556 status = tp_threadpool_lock( &pool, environment );
2557 if (status)
2559 RtlFreeHeap( GetProcessHeap(), 0, object );
2560 return status;
2563 object->type = TP_OBJECT_TYPE_WORK;
2564 object->u.work.callback = callback;
2565 tp_object_initialize( object, pool, userdata, environment );
2567 *out = (TP_WORK *)object;
2568 return STATUS_SUCCESS;
2571 /***********************************************************************
2572 * TpCancelAsyncIoOperation (NTDLL.@)
2574 void WINAPI TpCancelAsyncIoOperation( TP_IO *io )
2576 struct threadpool_object *this = impl_from_TP_IO( io );
2578 TRACE( "%p\n", io );
2580 RtlEnterCriticalSection( &this->pool->cs );
2582 TRACE("pending_count %u.\n", this->u.io.pending_count);
2584 this->u.io.pending_count--;
2585 if (object_is_finished( this, TRUE ))
2586 RtlWakeAllConditionVariable( &this->group_finished_event );
2587 if (object_is_finished( this, FALSE ))
2588 RtlWakeAllConditionVariable( &this->finished_event );
2590 RtlLeaveCriticalSection( &this->pool->cs );
2593 /***********************************************************************
2594 * TpCallbackLeaveCriticalSectionOnCompletion (NTDLL.@)
2596 VOID WINAPI TpCallbackLeaveCriticalSectionOnCompletion( TP_CALLBACK_INSTANCE *instance, CRITICAL_SECTION *crit )
2598 struct threadpool_instance *this = impl_from_TP_CALLBACK_INSTANCE( instance );
2600 TRACE( "%p %p\n", instance, crit );
2602 if (!this->cleanup.critical_section)
2603 this->cleanup.critical_section = crit;
2606 /***********************************************************************
2607 * TpCallbackMayRunLong (NTDLL.@)
2609 NTSTATUS WINAPI TpCallbackMayRunLong( TP_CALLBACK_INSTANCE *instance )
2611 struct threadpool_instance *this = impl_from_TP_CALLBACK_INSTANCE( instance );
2612 struct threadpool_object *object = this->object;
2613 struct threadpool *pool;
2614 NTSTATUS status = STATUS_SUCCESS;
2616 TRACE( "%p\n", instance );
2618 if (this->threadid != GetCurrentThreadId())
2620 ERR("called from wrong thread, ignoring\n");
2621 return STATUS_UNSUCCESSFUL; /* FIXME */
2624 if (this->may_run_long)
2625 return STATUS_SUCCESS;
2627 pool = object->pool;
2628 RtlEnterCriticalSection( &pool->cs );
2630 /* Start new worker threads if required. */
2631 if (pool->num_busy_workers >= pool->num_workers)
2633 if (pool->num_workers < pool->max_workers)
2635 status = tp_new_worker_thread( pool );
2637 else
2639 status = STATUS_TOO_MANY_THREADS;
2643 RtlLeaveCriticalSection( &pool->cs );
2644 this->may_run_long = TRUE;
2645 return status;
2648 /***********************************************************************
2649 * TpCallbackReleaseMutexOnCompletion (NTDLL.@)
2651 VOID WINAPI TpCallbackReleaseMutexOnCompletion( TP_CALLBACK_INSTANCE *instance, HANDLE mutex )
2653 struct threadpool_instance *this = impl_from_TP_CALLBACK_INSTANCE( instance );
2655 TRACE( "%p %p\n", instance, mutex );
2657 if (!this->cleanup.mutex)
2658 this->cleanup.mutex = mutex;
2661 /***********************************************************************
2662 * TpCallbackReleaseSemaphoreOnCompletion (NTDLL.@)
2664 VOID WINAPI TpCallbackReleaseSemaphoreOnCompletion( TP_CALLBACK_INSTANCE *instance, HANDLE semaphore, DWORD count )
2666 struct threadpool_instance *this = impl_from_TP_CALLBACK_INSTANCE( instance );
2668 TRACE( "%p %p %u\n", instance, semaphore, count );
2670 if (!this->cleanup.semaphore)
2672 this->cleanup.semaphore = semaphore;
2673 this->cleanup.semaphore_count = count;
2677 /***********************************************************************
2678 * TpCallbackSetEventOnCompletion (NTDLL.@)
2680 VOID WINAPI TpCallbackSetEventOnCompletion( TP_CALLBACK_INSTANCE *instance, HANDLE event )
2682 struct threadpool_instance *this = impl_from_TP_CALLBACK_INSTANCE( instance );
2684 TRACE( "%p %p\n", instance, event );
2686 if (!this->cleanup.event)
2687 this->cleanup.event = event;
2690 /***********************************************************************
2691 * TpCallbackUnloadDllOnCompletion (NTDLL.@)
2693 VOID WINAPI TpCallbackUnloadDllOnCompletion( TP_CALLBACK_INSTANCE *instance, HMODULE module )
2695 struct threadpool_instance *this = impl_from_TP_CALLBACK_INSTANCE( instance );
2697 TRACE( "%p %p\n", instance, module );
2699 if (!this->cleanup.library)
2700 this->cleanup.library = module;
2703 /***********************************************************************
2704 * TpDisassociateCallback (NTDLL.@)
2706 VOID WINAPI TpDisassociateCallback( TP_CALLBACK_INSTANCE *instance )
2708 struct threadpool_instance *this = impl_from_TP_CALLBACK_INSTANCE( instance );
2709 struct threadpool_object *object = this->object;
2710 struct threadpool *pool;
2712 TRACE( "%p\n", instance );
2714 if (this->threadid != GetCurrentThreadId())
2716 ERR("called from wrong thread, ignoring\n");
2717 return;
2720 if (!this->associated)
2721 return;
2723 pool = object->pool;
2724 RtlEnterCriticalSection( &pool->cs );
2726 object->num_associated_callbacks--;
2727 if (object_is_finished( object, FALSE ))
2728 RtlWakeAllConditionVariable( &object->finished_event );
2730 RtlLeaveCriticalSection( &pool->cs );
2731 this->associated = FALSE;
2734 /***********************************************************************
2735 * TpIsTimerSet (NTDLL.@)
2737 BOOL WINAPI TpIsTimerSet( TP_TIMER *timer )
2739 struct threadpool_object *this = impl_from_TP_TIMER( timer );
2741 TRACE( "%p\n", timer );
2743 return this->u.timer.timer_set;
2746 /***********************************************************************
2747 * TpPostWork (NTDLL.@)
2749 VOID WINAPI TpPostWork( TP_WORK *work )
2751 struct threadpool_object *this = impl_from_TP_WORK( work );
2753 TRACE( "%p\n", work );
2755 tp_object_submit( this, FALSE );
2758 /***********************************************************************
2759 * TpReleaseCleanupGroup (NTDLL.@)
2761 VOID WINAPI TpReleaseCleanupGroup( TP_CLEANUP_GROUP *group )
2763 struct threadpool_group *this = impl_from_TP_CLEANUP_GROUP( group );
2765 TRACE( "%p\n", group );
2767 tp_group_shutdown( this );
2768 tp_group_release( this );
2771 /***********************************************************************
2772 * TpReleaseCleanupGroupMembers (NTDLL.@)
2774 VOID WINAPI TpReleaseCleanupGroupMembers( TP_CLEANUP_GROUP *group, BOOL cancel_pending, PVOID userdata )
2776 struct threadpool_group *this = impl_from_TP_CLEANUP_GROUP( group );
2777 struct threadpool_object *object, *next;
2778 struct list members;
2780 TRACE( "%p %u %p\n", group, cancel_pending, userdata );
2782 RtlEnterCriticalSection( &this->cs );
2784 /* Unset group, increase references, and mark objects for shutdown */
2785 LIST_FOR_EACH_ENTRY_SAFE( object, next, &this->members, struct threadpool_object, group_entry )
2787 assert( object->group == this );
2788 assert( object->is_group_member );
2790 if (InterlockedIncrement( &object->refcount ) == 1)
2792 /* Object is basically already destroyed, but group reference
2793 * was not deleted yet. We can safely ignore this object. */
2794 InterlockedDecrement( &object->refcount );
2795 list_remove( &object->group_entry );
2796 object->is_group_member = FALSE;
2797 continue;
2800 object->is_group_member = FALSE;
2801 tp_object_prepare_shutdown( object );
2804 /* Move members to a new temporary list */
2805 list_init( &members );
2806 list_move_tail( &members, &this->members );
2808 RtlLeaveCriticalSection( &this->cs );
2810 /* Cancel pending callbacks if requested */
2811 if (cancel_pending)
2813 LIST_FOR_EACH_ENTRY( object, &members, struct threadpool_object, group_entry )
2815 tp_object_cancel( object );
2819 /* Wait for remaining callbacks to finish */
2820 LIST_FOR_EACH_ENTRY_SAFE( object, next, &members, struct threadpool_object, group_entry )
2822 tp_object_wait( object, TRUE );
2824 if (!object->shutdown)
2826 /* Execute group cancellation callback if defined, and if this was actually a group cancel. */
2827 if (cancel_pending && object->group_cancel_callback)
2829 TRACE( "executing group cancel callback %p(%p, %p)\n",
2830 object->group_cancel_callback, object->userdata, userdata );
2831 object->group_cancel_callback( object->userdata, userdata );
2832 TRACE( "callback %p returned\n", object->group_cancel_callback );
2835 if (object->type != TP_OBJECT_TYPE_SIMPLE)
2836 tp_object_release( object );
2839 object->shutdown = TRUE;
2840 tp_object_release( object );
2844 /***********************************************************************
2845 * TpReleaseIoCompletion (NTDLL.@)
2847 void WINAPI TpReleaseIoCompletion( TP_IO *io )
2849 struct threadpool_object *this = impl_from_TP_IO( io );
2850 BOOL can_destroy;
2852 TRACE( "%p\n", io );
2854 RtlEnterCriticalSection( &this->pool->cs );
2855 this->u.io.shutting_down = TRUE;
2856 can_destroy = !this->u.io.pending_count && !this->u.io.skipped_count;
2857 RtlLeaveCriticalSection( &this->pool->cs );
2859 if (can_destroy)
2861 tp_object_prepare_shutdown( this );
2862 this->shutdown = TRUE;
2863 tp_object_release( this );
2867 /***********************************************************************
2868 * TpReleasePool (NTDLL.@)
2870 VOID WINAPI TpReleasePool( TP_POOL *pool )
2872 struct threadpool *this = impl_from_TP_POOL( pool );
2874 TRACE( "%p\n", pool );
2876 tp_threadpool_shutdown( this );
2877 tp_threadpool_release( this );
2880 /***********************************************************************
2881 * TpReleaseTimer (NTDLL.@)
2883 VOID WINAPI TpReleaseTimer( TP_TIMER *timer )
2885 struct threadpool_object *this = impl_from_TP_TIMER( timer );
2887 TRACE( "%p\n", timer );
2889 tp_object_prepare_shutdown( this );
2890 this->shutdown = TRUE;
2891 tp_object_release( this );
2894 /***********************************************************************
2895 * TpReleaseWait (NTDLL.@)
2897 VOID WINAPI TpReleaseWait( TP_WAIT *wait )
2899 struct threadpool_object *this = impl_from_TP_WAIT( wait );
2901 TRACE( "%p\n", wait );
2903 tp_object_prepare_shutdown( this );
2904 this->shutdown = TRUE;
2905 tp_object_release( this );
2908 /***********************************************************************
2909 * TpReleaseWork (NTDLL.@)
2911 VOID WINAPI TpReleaseWork( TP_WORK *work )
2913 struct threadpool_object *this = impl_from_TP_WORK( work );
2915 TRACE( "%p\n", work );
2917 tp_object_prepare_shutdown( this );
2918 this->shutdown = TRUE;
2919 tp_object_release( this );
2922 /***********************************************************************
2923 * TpSetPoolMaxThreads (NTDLL.@)
2925 VOID WINAPI TpSetPoolMaxThreads( TP_POOL *pool, DWORD maximum )
2927 struct threadpool *this = impl_from_TP_POOL( pool );
2929 TRACE( "%p %u\n", pool, maximum );
2931 RtlEnterCriticalSection( &this->cs );
2932 this->max_workers = max( maximum, 1 );
2933 this->min_workers = min( this->min_workers, this->max_workers );
2934 RtlLeaveCriticalSection( &this->cs );
2937 /***********************************************************************
2938 * TpSetPoolMinThreads (NTDLL.@)
2940 BOOL WINAPI TpSetPoolMinThreads( TP_POOL *pool, DWORD minimum )
2942 struct threadpool *this = impl_from_TP_POOL( pool );
2943 NTSTATUS status = STATUS_SUCCESS;
2945 TRACE( "%p %u\n", pool, minimum );
2947 RtlEnterCriticalSection( &this->cs );
2949 while (this->num_workers < minimum)
2951 status = tp_new_worker_thread( this );
2952 if (status != STATUS_SUCCESS)
2953 break;
2956 if (status == STATUS_SUCCESS)
2958 this->min_workers = minimum;
2959 this->max_workers = max( this->min_workers, this->max_workers );
2962 RtlLeaveCriticalSection( &this->cs );
2963 return !status;
2966 /***********************************************************************
2967 * TpSetTimer (NTDLL.@)
2969 VOID WINAPI TpSetTimer( TP_TIMER *timer, LARGE_INTEGER *timeout, LONG period, LONG window_length )
2971 struct threadpool_object *this = impl_from_TP_TIMER( timer );
2972 struct threadpool_object *other_timer;
2973 BOOL submit_timer = FALSE;
2974 ULONGLONG timestamp;
2976 TRACE( "%p %p %u %u\n", timer, timeout, period, window_length );
2978 RtlEnterCriticalSection( &timerqueue.cs );
2980 assert( this->u.timer.timer_initialized );
2981 this->u.timer.timer_set = timeout != NULL;
2983 /* Convert relative timeout to absolute timestamp and handle a timeout
2984 * of zero, which means that the timer is submitted immediately. */
2985 if (timeout)
2987 timestamp = timeout->QuadPart;
2988 if ((LONGLONG)timestamp < 0)
2990 LARGE_INTEGER now;
2991 NtQuerySystemTime( &now );
2992 timestamp = now.QuadPart - timestamp;
2994 else if (!timestamp)
2996 if (!period)
2997 timeout = NULL;
2998 else
3000 LARGE_INTEGER now;
3001 NtQuerySystemTime( &now );
3002 timestamp = now.QuadPart + (ULONGLONG)period * 10000;
3004 submit_timer = TRUE;
3008 /* First remove existing timeout. */
3009 if (this->u.timer.timer_pending)
3011 list_remove( &this->u.timer.timer_entry );
3012 this->u.timer.timer_pending = FALSE;
3015 /* If the timer was enabled, then add it back to the queue. */
3016 if (timeout)
3018 this->u.timer.timeout = timestamp;
3019 this->u.timer.period = period;
3020 this->u.timer.window_length = window_length;
3022 LIST_FOR_EACH_ENTRY( other_timer, &timerqueue.pending_timers,
3023 struct threadpool_object, u.timer.timer_entry )
3025 assert( other_timer->type == TP_OBJECT_TYPE_TIMER );
3026 if (this->u.timer.timeout < other_timer->u.timer.timeout)
3027 break;
3029 list_add_before( &other_timer->u.timer.timer_entry, &this->u.timer.timer_entry );
3031 /* Wake up the timer thread when the timeout has to be updated. */
3032 if (list_head( &timerqueue.pending_timers ) == &this->u.timer.timer_entry )
3033 RtlWakeAllConditionVariable( &timerqueue.update_event );
3035 this->u.timer.timer_pending = TRUE;
3038 RtlLeaveCriticalSection( &timerqueue.cs );
3040 if (submit_timer)
3041 tp_object_submit( this, FALSE );
3044 /***********************************************************************
3045 * TpSetWait (NTDLL.@)
3047 VOID WINAPI TpSetWait( TP_WAIT *wait, HANDLE handle, LARGE_INTEGER *timeout )
3049 struct threadpool_object *this = impl_from_TP_WAIT( wait );
3050 ULONGLONG timestamp = MAXLONGLONG;
3052 TRACE( "%p %p %p\n", wait, handle, timeout );
3054 RtlEnterCriticalSection( &waitqueue.cs );
3056 assert( this->u.wait.bucket );
3057 this->u.wait.handle = handle;
3059 if (handle || this->u.wait.wait_pending)
3061 struct waitqueue_bucket *bucket = this->u.wait.bucket;
3062 list_remove( &this->u.wait.wait_entry );
3064 /* Convert relative timeout to absolute timestamp. */
3065 if (handle && timeout)
3067 timestamp = timeout->QuadPart;
3068 if ((LONGLONG)timestamp < 0)
3070 LARGE_INTEGER now;
3071 NtQuerySystemTime( &now );
3072 timestamp = now.QuadPart - timestamp;
3076 /* Add wait object back into one of the queues. */
3077 if (handle)
3079 list_add_tail( &bucket->waiting, &this->u.wait.wait_entry );
3080 this->u.wait.wait_pending = TRUE;
3081 this->u.wait.timeout = timestamp;
3083 else
3085 list_add_tail( &bucket->reserved, &this->u.wait.wait_entry );
3086 this->u.wait.wait_pending = FALSE;
3089 /* Wake up the wait queue thread. */
3090 NtSetEvent( bucket->update_event, NULL );
3093 RtlLeaveCriticalSection( &waitqueue.cs );
3096 /***********************************************************************
3097 * TpSimpleTryPost (NTDLL.@)
3099 NTSTATUS WINAPI TpSimpleTryPost( PTP_SIMPLE_CALLBACK callback, PVOID userdata,
3100 TP_CALLBACK_ENVIRON *environment )
3102 struct threadpool_object *object;
3103 struct threadpool *pool;
3104 NTSTATUS status;
3106 TRACE( "%p %p %p\n", callback, userdata, environment );
3108 object = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*object) );
3109 if (!object)
3110 return STATUS_NO_MEMORY;
3112 status = tp_threadpool_lock( &pool, environment );
3113 if (status)
3115 RtlFreeHeap( GetProcessHeap(), 0, object );
3116 return status;
3119 object->type = TP_OBJECT_TYPE_SIMPLE;
3120 object->u.simple.callback = callback;
3121 tp_object_initialize( object, pool, userdata, environment );
3123 return STATUS_SUCCESS;
3126 /***********************************************************************
3127 * TpStartAsyncIoOperation (NTDLL.@)
3129 void WINAPI TpStartAsyncIoOperation( TP_IO *io )
3131 struct threadpool_object *this = impl_from_TP_IO( io );
3133 TRACE( "%p\n", io );
3135 RtlEnterCriticalSection( &this->pool->cs );
3137 this->u.io.pending_count++;
3139 RtlLeaveCriticalSection( &this->pool->cs );
3142 /***********************************************************************
3143 * TpWaitForIoCompletion (NTDLL.@)
3145 void WINAPI TpWaitForIoCompletion( TP_IO *io, BOOL cancel_pending )
3147 struct threadpool_object *this = impl_from_TP_IO( io );
3149 TRACE( "%p %d\n", io, cancel_pending );
3151 if (cancel_pending)
3152 tp_object_cancel( this );
3153 tp_object_wait( this, FALSE );
3156 /***********************************************************************
3157 * TpWaitForTimer (NTDLL.@)
3159 VOID WINAPI TpWaitForTimer( TP_TIMER *timer, BOOL cancel_pending )
3161 struct threadpool_object *this = impl_from_TP_TIMER( timer );
3163 TRACE( "%p %d\n", timer, cancel_pending );
3165 if (cancel_pending)
3166 tp_object_cancel( this );
3167 tp_object_wait( this, FALSE );
3170 /***********************************************************************
3171 * TpWaitForWait (NTDLL.@)
3173 VOID WINAPI TpWaitForWait( TP_WAIT *wait, BOOL cancel_pending )
3175 struct threadpool_object *this = impl_from_TP_WAIT( wait );
3177 TRACE( "%p %d\n", wait, cancel_pending );
3179 if (cancel_pending)
3180 tp_object_cancel( this );
3181 tp_object_wait( this, FALSE );
3184 /***********************************************************************
3185 * TpWaitForWork (NTDLL.@)
3187 VOID WINAPI TpWaitForWork( TP_WORK *work, BOOL cancel_pending )
3189 struct threadpool_object *this = impl_from_TP_WORK( work );
3191 TRACE( "%p %u\n", work, cancel_pending );
3193 if (cancel_pending)
3194 tp_object_cancel( this );
3195 tp_object_wait( this, FALSE );
3198 /***********************************************************************
3199 * TpSetPoolStackInformation (NTDLL.@)
3201 NTSTATUS WINAPI TpSetPoolStackInformation( TP_POOL *pool, TP_POOL_STACK_INFORMATION *stack_info )
3203 struct threadpool *this = impl_from_TP_POOL( pool );
3205 TRACE( "%p %p\n", pool, stack_info );
3207 if (!stack_info)
3208 return STATUS_INVALID_PARAMETER;
3210 RtlEnterCriticalSection( &this->cs );
3211 this->stack_info = *stack_info;
3212 RtlLeaveCriticalSection( &this->cs );
3214 return STATUS_SUCCESS;
3217 /***********************************************************************
3218 * TpQueryPoolStackInformation (NTDLL.@)
3220 NTSTATUS WINAPI TpQueryPoolStackInformation( TP_POOL *pool, TP_POOL_STACK_INFORMATION *stack_info )
3222 struct threadpool *this = impl_from_TP_POOL( pool );
3224 TRACE( "%p %p\n", pool, stack_info );
3226 if (!stack_info)
3227 return STATUS_INVALID_PARAMETER;
3229 RtlEnterCriticalSection( &this->cs );
3230 *stack_info = this->stack_info;
3231 RtlLeaveCriticalSection( &this->cs );
3233 return STATUS_SUCCESS;
3236 static void CALLBACK rtl_wait_callback( TP_CALLBACK_INSTANCE *instance, void *userdata, TP_WAIT *wait, TP_WAIT_RESULT result )
3238 struct threadpool_object *object = impl_from_TP_WAIT(wait);
3239 object->u.wait.rtl_callback( userdata, result != STATUS_WAIT_0 );
3242 /***********************************************************************
3243 * RtlRegisterWait (NTDLL.@)
3245 * Registers a wait for a handle to become signaled.
3247 * PARAMS
3248 * NewWaitObject [I] Handle to the new wait object. Use RtlDeregisterWait() to free it.
3249 * Object [I] Object to wait to become signaled.
3250 * Callback [I] Callback function to execute when the wait times out or the handle is signaled.
3251 * Context [I] Context to pass to the callback function when it is executed.
3252 * Milliseconds [I] Number of milliseconds to wait before timing out.
3253 * Flags [I] Flags. See notes.
3255 * RETURNS
3256 * Success: STATUS_SUCCESS.
3257 * Failure: Any NTSTATUS code.
3259 * NOTES
3260 * Flags can be one or more of the following:
3261 *|WT_EXECUTEDEFAULT - Executes the work item in a non-I/O worker thread.
3262 *|WT_EXECUTEINIOTHREAD - Executes the work item in an I/O worker thread.
3263 *|WT_EXECUTEINPERSISTENTTHREAD - Executes the work item in a thread that is persistent.
3264 *|WT_EXECUTELONGFUNCTION - Hints that the execution can take a long time.
3265 *|WT_TRANSFER_IMPERSONATION - Executes the function with the current access token.
3267 NTSTATUS WINAPI RtlRegisterWait( HANDLE *out, HANDLE handle, RTL_WAITORTIMERCALLBACKFUNC callback,
3268 void *context, ULONG milliseconds, ULONG flags )
3270 struct threadpool_object *object;
3271 TP_CALLBACK_ENVIRON environment;
3272 LARGE_INTEGER timeout;
3273 NTSTATUS status;
3274 TP_WAIT *wait;
3276 TRACE( "out %p, handle %p, callback %p, context %p, milliseconds %u, flags %x\n",
3277 out, handle, callback, context, milliseconds, flags );
3279 memset( &environment, 0, sizeof(environment) );
3280 environment.Version = 1;
3281 environment.u.s.LongFunction = (flags & WT_EXECUTELONGFUNCTION) != 0;
3282 environment.u.s.Persistent = (flags & WT_EXECUTEINPERSISTENTTHREAD) != 0;
3284 flags &= (WT_EXECUTEONLYONCE | WT_EXECUTEINWAITTHREAD | WT_EXECUTEINIOTHREAD);
3285 if ((status = tp_alloc_wait( &wait, rtl_wait_callback, context, &environment, flags )))
3286 return status;
3288 object = impl_from_TP_WAIT(wait);
3289 object->u.wait.rtl_callback = callback;
3291 RtlEnterCriticalSection( &waitqueue.cs );
3292 TpSetWait( (TP_WAIT *)object, handle, get_nt_timeout( &timeout, milliseconds ) );
3294 *out = object;
3295 RtlLeaveCriticalSection( &waitqueue.cs );
3297 return STATUS_SUCCESS;
3300 /***********************************************************************
3301 * RtlDeregisterWaitEx (NTDLL.@)
3303 * Cancels a wait operation and frees the resources associated with calling
3304 * RtlRegisterWait().
3306 * PARAMS
3307 * WaitObject [I] Handle to the wait object to free.
3309 * RETURNS
3310 * Success: STATUS_SUCCESS.
3311 * Failure: Any NTSTATUS code.
3313 NTSTATUS WINAPI RtlDeregisterWaitEx( HANDLE handle, HANDLE event )
3315 struct threadpool_object *object = handle;
3316 NTSTATUS status;
3318 TRACE( "handle %p, event %p\n", handle, event );
3320 if (!object) return STATUS_INVALID_HANDLE;
3322 TpSetWait( (TP_WAIT *)object, NULL, NULL );
3324 if (event == INVALID_HANDLE_VALUE) TpWaitForWait( (TP_WAIT *)object, TRUE );
3325 else
3327 assert( object->completed_event == NULL );
3328 object->completed_event = event;
3331 RtlEnterCriticalSection( &object->pool->cs );
3332 if (object->num_pending_callbacks + object->num_running_callbacks
3333 + object->num_associated_callbacks) status = STATUS_PENDING;
3334 else status = STATUS_SUCCESS;
3335 RtlLeaveCriticalSection( &object->pool->cs );
3337 TpReleaseWait( (TP_WAIT *)object );
3338 return status;
3341 /***********************************************************************
3342 * RtlDeregisterWait (NTDLL.@)
3344 * Cancels a wait operation and frees the resources associated with calling
3345 * RtlRegisterWait().
3347 * PARAMS
3348 * WaitObject [I] Handle to the wait object to free.
3350 * RETURNS
3351 * Success: STATUS_SUCCESS.
3352 * Failure: Any NTSTATUS code.
3354 NTSTATUS WINAPI RtlDeregisterWait(HANDLE WaitHandle)
3356 return RtlDeregisterWaitEx(WaitHandle, NULL);