ntdll: Try to merge threadpool wait queue buckets if possible.
[wine/multimedia.git] / dlls / ntdll / threadpool.c
blob3f54208bf43181a72ca797a195d4049524f22c08
1 /*
2 * Thread pooling
4 * Copyright (c) 2006 Robert Shearman
5 * Copyright (c) 2014-2015 Sebastian Lackner
7 * This library is free software; you can redistribute it and/or
8 * modify it under the terms of the GNU Lesser General Public
9 * License as published by the Free Software Foundation; either
10 * version 2.1 of the License, or (at your option) any later version.
12 * This library is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Lesser General Public License for more details.
17 * You should have received a copy of the GNU Lesser General Public
18 * License along with this library; if not, write to the Free Software
19 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
22 #include "config.h"
23 #include "wine/port.h"
25 #include <assert.h>
26 #include <stdarg.h>
27 #include <limits.h>
29 #define NONAMELESSUNION
30 #include "ntstatus.h"
31 #define WIN32_NO_STATUS
32 #include "winternl.h"
34 #include "wine/debug.h"
35 #include "wine/list.h"
37 #include "ntdll_misc.h"
39 WINE_DEFAULT_DEBUG_CHANNEL(threadpool);
42 * Old thread pooling API
45 #define OLD_WORKER_TIMEOUT 30000 /* 30 seconds */
46 #define EXPIRE_NEVER (~(ULONGLONG)0)
47 #define TIMER_QUEUE_MAGIC 0x516d6954 /* TimQ */
49 static RTL_CRITICAL_SECTION_DEBUG critsect_debug;
50 static RTL_CRITICAL_SECTION_DEBUG critsect_compl_debug;
52 static struct
54 /* threadpool_cs must be held while modifying the following four elements */
55 struct list work_item_list;
56 LONG num_workers;
57 LONG num_busy_workers;
58 LONG num_items_processed;
59 RTL_CONDITION_VARIABLE threadpool_cond;
60 RTL_CRITICAL_SECTION threadpool_cs;
61 HANDLE compl_port;
62 RTL_CRITICAL_SECTION threadpool_compl_cs;
64 old_threadpool =
66 LIST_INIT(old_threadpool.work_item_list), /* work_item_list */
67 0, /* num_workers */
68 0, /* num_busy_workers */
69 0, /* num_items_processed */
70 RTL_CONDITION_VARIABLE_INIT, /* threadpool_cond */
71 { &critsect_debug, -1, 0, 0, 0, 0 }, /* threadpool_cs */
72 NULL, /* compl_port */
73 { &critsect_compl_debug, -1, 0, 0, 0, 0 }, /* threadpool_compl_cs */
76 static RTL_CRITICAL_SECTION_DEBUG critsect_debug =
78 0, 0, &old_threadpool.threadpool_cs,
79 { &critsect_debug.ProcessLocksList, &critsect_debug.ProcessLocksList },
80 0, 0, { (DWORD_PTR)(__FILE__ ": threadpool_cs") }
83 static RTL_CRITICAL_SECTION_DEBUG critsect_compl_debug =
85 0, 0, &old_threadpool.threadpool_compl_cs,
86 { &critsect_compl_debug.ProcessLocksList, &critsect_compl_debug.ProcessLocksList },
87 0, 0, { (DWORD_PTR)(__FILE__ ": threadpool_compl_cs") }
90 struct work_item
92 struct list entry;
93 PRTL_WORK_ITEM_ROUTINE function;
94 PVOID context;
97 struct wait_work_item
99 HANDLE Object;
100 HANDLE CancelEvent;
101 WAITORTIMERCALLBACK Callback;
102 PVOID Context;
103 ULONG Milliseconds;
104 ULONG Flags;
105 HANDLE CompletionEvent;
106 LONG DeleteCount;
107 BOOLEAN CallbackInProgress;
110 struct timer_queue;
111 struct queue_timer
113 struct timer_queue *q;
114 struct list entry;
115 ULONG runcount; /* number of callbacks pending execution */
116 RTL_WAITORTIMERCALLBACKFUNC callback;
117 PVOID param;
118 DWORD period;
119 ULONG flags;
120 ULONGLONG expire;
121 BOOL destroy; /* timer should be deleted; once set, never unset */
122 HANDLE event; /* removal event */
125 struct timer_queue
127 DWORD magic;
128 RTL_CRITICAL_SECTION cs;
129 struct list timers; /* sorted by expiration time */
130 BOOL quit; /* queue should be deleted; once set, never unset */
131 HANDLE event;
132 HANDLE thread;
/*
 * Object-oriented thread pooling API
 */

#define THREADPOOL_WORKER_TIMEOUT 5000
/* one slot less than the maximum number of objects a single wait accepts */
#define MAXIMUM_WAITQUEUE_OBJECTS (MAXIMUM_WAIT_OBJECTS - 1)
142 /* internal threadpool representation */
143 struct threadpool
145 LONG refcount;
146 LONG objcount;
147 BOOL shutdown;
148 CRITICAL_SECTION cs;
149 /* pool of work items, locked via .cs */
150 struct list pool;
151 RTL_CONDITION_VARIABLE update_event;
152 /* information about worker threads, locked via .cs */
153 int max_workers;
154 int min_workers;
155 int num_workers;
156 int num_busy_workers;
/* Discriminator for struct threadpool_object (selects the member of its union). */
enum threadpool_objtype
{
    TP_OBJECT_TYPE_SIMPLE,
    TP_OBJECT_TYPE_WORK,
    TP_OBJECT_TYPE_TIMER,
    TP_OBJECT_TYPE_WAIT
};
167 /* internal threadpool object representation */
168 struct threadpool_object
170 LONG refcount;
171 BOOL shutdown;
172 /* read-only information */
173 enum threadpool_objtype type;
174 struct threadpool *pool;
175 struct threadpool_group *group;
176 PVOID userdata;
177 PTP_CLEANUP_GROUP_CANCEL_CALLBACK group_cancel_callback;
178 PTP_SIMPLE_CALLBACK finalization_callback;
179 BOOL may_run_long;
180 HMODULE race_dll;
181 /* information about the group, locked via .group->cs */
182 struct list group_entry;
183 BOOL is_group_member;
184 /* information about the pool, locked via .pool->cs */
185 struct list pool_entry;
186 RTL_CONDITION_VARIABLE finished_event;
187 RTL_CONDITION_VARIABLE group_finished_event;
188 LONG num_pending_callbacks;
189 LONG num_running_callbacks;
190 LONG num_associated_callbacks;
191 /* arguments for callback */
192 union
194 struct
196 PTP_SIMPLE_CALLBACK callback;
197 } simple;
198 struct
200 PTP_WORK_CALLBACK callback;
201 } work;
202 struct
204 PTP_TIMER_CALLBACK callback;
205 /* information about the timer, locked via timerqueue.cs */
206 BOOL timer_initialized;
207 BOOL timer_pending;
208 struct list timer_entry;
209 BOOL timer_set;
210 ULONGLONG timeout;
211 LONG period;
212 LONG window_length;
213 } timer;
214 struct
216 PTP_WAIT_CALLBACK callback;
217 LONG signaled;
218 /* information about the wait object, locked via waitqueue.cs */
219 struct waitqueue_bucket *bucket;
220 BOOL wait_pending;
221 struct list wait_entry;
222 ULONGLONG timeout;
223 HANDLE handle;
224 } wait;
225 } u;
228 /* internal threadpool instance representation */
229 struct threadpool_instance
231 struct threadpool_object *object;
232 DWORD threadid;
233 BOOL associated;
234 BOOL may_run_long;
235 struct
237 CRITICAL_SECTION *critical_section;
238 HANDLE mutex;
239 HANDLE semaphore;
240 LONG semaphore_count;
241 HANDLE event;
242 HMODULE library;
243 } cleanup;
246 /* internal threadpool group representation */
247 struct threadpool_group
249 LONG refcount;
250 BOOL shutdown;
251 CRITICAL_SECTION cs;
252 /* list of group members, locked via .cs */
253 struct list members;
256 /* global timerqueue object */
257 static RTL_CRITICAL_SECTION_DEBUG timerqueue_debug;
259 static struct
261 CRITICAL_SECTION cs;
262 LONG objcount;
263 BOOL thread_running;
264 struct list pending_timers;
265 RTL_CONDITION_VARIABLE update_event;
267 timerqueue =
269 { &timerqueue_debug, -1, 0, 0, 0, 0 }, /* cs */
270 0, /* objcount */
271 FALSE, /* thread_running */
272 LIST_INIT( timerqueue.pending_timers ), /* pending_timers */
273 RTL_CONDITION_VARIABLE_INIT /* update_event */
276 static RTL_CRITICAL_SECTION_DEBUG timerqueue_debug =
278 0, 0, &timerqueue.cs,
279 { &timerqueue_debug.ProcessLocksList, &timerqueue_debug.ProcessLocksList },
280 0, 0, { (DWORD_PTR)(__FILE__ ": timerqueue.cs") }
283 /* global waitqueue object */
284 static RTL_CRITICAL_SECTION_DEBUG waitqueue_debug;
286 static struct
288 CRITICAL_SECTION cs;
289 LONG num_buckets;
290 struct list buckets;
292 waitqueue =
294 { &waitqueue_debug, -1, 0, 0, 0, 0 }, /* cs */
295 0, /* num_buckets */
296 LIST_INIT( waitqueue.buckets ) /* buckets */
299 static RTL_CRITICAL_SECTION_DEBUG waitqueue_debug =
301 0, 0, &waitqueue.cs,
302 { &waitqueue_debug.ProcessLocksList, &waitqueue_debug.ProcessLocksList },
303 0, 0, { (DWORD_PTR)(__FILE__ ": waitqueue.cs") }
306 struct waitqueue_bucket
308 struct list bucket_entry;
309 LONG objcount;
310 struct list reserved;
311 struct list waiting;
312 HANDLE update_event;
315 static inline struct threadpool *impl_from_TP_POOL( TP_POOL *pool )
317 return (struct threadpool *)pool;
320 static inline struct threadpool_object *impl_from_TP_WORK( TP_WORK *work )
322 struct threadpool_object *object = (struct threadpool_object *)work;
323 assert( object->type == TP_OBJECT_TYPE_WORK );
324 return object;
327 static inline struct threadpool_object *impl_from_TP_TIMER( TP_TIMER *timer )
329 struct threadpool_object *object = (struct threadpool_object *)timer;
330 assert( object->type == TP_OBJECT_TYPE_TIMER );
331 return object;
334 static inline struct threadpool_object *impl_from_TP_WAIT( TP_WAIT *wait )
336 struct threadpool_object *object = (struct threadpool_object *)wait;
337 assert( object->type == TP_OBJECT_TYPE_WAIT );
338 return object;
341 static inline struct threadpool_group *impl_from_TP_CLEANUP_GROUP( TP_CLEANUP_GROUP *group )
343 return (struct threadpool_group *)group;
346 static inline struct threadpool_instance *impl_from_TP_CALLBACK_INSTANCE( TP_CALLBACK_INSTANCE *instance )
348 return (struct threadpool_instance *)instance;
351 static void CALLBACK threadpool_worker_proc( void *param );
352 static void tp_object_submit( struct threadpool_object *object, BOOL signaled );
353 static void tp_object_shutdown( struct threadpool_object *object );
354 static BOOL tp_object_release( struct threadpool_object *object );
355 static struct threadpool *default_threadpool = NULL;
357 static inline LONG interlocked_inc( PLONG dest )
359 return interlocked_xchg_add( dest, 1 ) + 1;
362 static inline LONG interlocked_dec( PLONG dest )
364 return interlocked_xchg_add( dest, -1 ) - 1;
367 static void WINAPI worker_thread_proc(void * param)
369 struct list *item;
370 struct work_item *work_item_ptr, work_item;
371 LARGE_INTEGER timeout;
372 timeout.QuadPart = -(OLD_WORKER_TIMEOUT * (ULONGLONG)10000);
374 RtlEnterCriticalSection( &old_threadpool.threadpool_cs );
375 old_threadpool.num_workers++;
377 for (;;)
379 if ((item = list_head( &old_threadpool.work_item_list )))
381 work_item_ptr = LIST_ENTRY( item, struct work_item, entry );
382 list_remove( &work_item_ptr->entry );
383 old_threadpool.num_busy_workers++;
384 old_threadpool.num_items_processed++;
385 RtlLeaveCriticalSection( &old_threadpool.threadpool_cs );
387 /* copy item to stack and do the work */
388 work_item = *work_item_ptr;
389 RtlFreeHeap( GetProcessHeap(), 0, work_item_ptr );
390 TRACE("executing %p(%p)\n", work_item.function, work_item.context);
391 work_item.function( work_item.context );
393 RtlEnterCriticalSection( &old_threadpool.threadpool_cs );
394 old_threadpool.num_busy_workers--;
396 else if (RtlSleepConditionVariableCS( &old_threadpool.threadpool_cond,
397 &old_threadpool.threadpool_cs, &timeout ) != STATUS_SUCCESS)
399 break;
403 old_threadpool.num_workers--;
404 RtlLeaveCriticalSection( &old_threadpool.threadpool_cs );
405 RtlExitUserThread( 0 );
407 /* never reached */
410 /***********************************************************************
411 * RtlQueueWorkItem (NTDLL.@)
413 * Queues a work item into a thread in the thread pool.
415 * PARAMS
416 * Function [I] Work function to execute.
417 * Context [I] Context to pass to the work function when it is executed.
418 * Flags [I] Flags. See notes.
420 * RETURNS
421 * Success: STATUS_SUCCESS.
422 * Failure: Any NTSTATUS code.
424 * NOTES
425 * Flags can be one or more of the following:
426 *|WT_EXECUTEDEFAULT - Executes the work item in a non-I/O worker thread.
427 *|WT_EXECUTEINIOTHREAD - Executes the work item in an I/O worker thread.
428 *|WT_EXECUTEINPERSISTENTTHREAD - Executes the work item in a thread that is persistent.
429 *|WT_EXECUTELONGFUNCTION - Hints that the execution can take a long time.
430 *|WT_TRANSFER_IMPERSONATION - Executes the function with the current access token.
432 NTSTATUS WINAPI RtlQueueWorkItem(PRTL_WORK_ITEM_ROUTINE Function, PVOID Context, ULONG Flags)
434 HANDLE thread;
435 NTSTATUS status;
436 LONG items_processed;
437 struct work_item *work_item = RtlAllocateHeap(GetProcessHeap(), 0, sizeof(struct work_item));
439 if (!work_item)
440 return STATUS_NO_MEMORY;
442 work_item->function = Function;
443 work_item->context = Context;
445 if (Flags & ~WT_EXECUTELONGFUNCTION)
446 FIXME("Flags 0x%x not supported\n", Flags);
448 RtlEnterCriticalSection( &old_threadpool.threadpool_cs );
449 list_add_tail( &old_threadpool.work_item_list, &work_item->entry );
450 status = (old_threadpool.num_workers > old_threadpool.num_busy_workers) ?
451 STATUS_SUCCESS : STATUS_UNSUCCESSFUL;
452 items_processed = old_threadpool.num_items_processed;
453 RtlLeaveCriticalSection( &old_threadpool.threadpool_cs );
455 /* FIXME: tune this algorithm to not be as aggressive with creating threads
456 * if WT_EXECUTELONGFUNCTION isn't specified */
457 if (status == STATUS_SUCCESS)
458 RtlWakeConditionVariable( &old_threadpool.threadpool_cond );
459 else
461 status = RtlCreateUserThread( GetCurrentProcess(), NULL, FALSE, NULL, 0, 0,
462 worker_thread_proc, NULL, &thread, NULL );
464 /* NOTE: we don't care if we couldn't create the thread if there is at
465 * least one other available to process the request */
466 if (status == STATUS_SUCCESS)
467 NtClose( thread );
468 else
470 RtlEnterCriticalSection( &old_threadpool.threadpool_cs );
471 if (old_threadpool.num_workers > 0 ||
472 old_threadpool.num_items_processed != items_processed)
474 status = STATUS_SUCCESS;
476 else
477 list_remove( &work_item->entry );
478 RtlLeaveCriticalSection( &old_threadpool.threadpool_cs );
480 if (status != STATUS_SUCCESS)
481 RtlFreeHeap( GetProcessHeap(), 0, work_item );
485 return status;
488 /***********************************************************************
489 * iocp_poller - get completion events and run callbacks
491 static DWORD CALLBACK iocp_poller(LPVOID Arg)
493 HANDLE cport = Arg;
495 while( TRUE )
497 PRTL_OVERLAPPED_COMPLETION_ROUTINE callback;
498 LPVOID overlapped;
499 IO_STATUS_BLOCK iosb;
500 NTSTATUS res = NtRemoveIoCompletion( cport, (PULONG_PTR)&callback, (PULONG_PTR)&overlapped, &iosb, NULL );
501 if (res)
503 ERR("NtRemoveIoCompletion failed: 0x%x\n", res);
505 else
507 DWORD transferred = 0;
508 DWORD err = 0;
510 if (iosb.u.Status == STATUS_SUCCESS)
511 transferred = iosb.Information;
512 else
513 err = RtlNtStatusToDosError(iosb.u.Status);
515 callback( err, transferred, overlapped );
518 return 0;
521 /***********************************************************************
522 * RtlSetIoCompletionCallback (NTDLL.@)
524 * Binds a handle to a thread pool's completion port, and possibly
525 * starts a non-I/O thread to monitor this port and call functions back.
527 * PARAMS
528 * FileHandle [I] Handle to bind to a completion port.
529 * Function [I] Callback function to call on I/O completions.
530 * Flags [I] Not used.
532 * RETURNS
533 * Success: STATUS_SUCCESS.
534 * Failure: Any NTSTATUS code.
537 NTSTATUS WINAPI RtlSetIoCompletionCallback(HANDLE FileHandle, PRTL_OVERLAPPED_COMPLETION_ROUTINE Function, ULONG Flags)
539 IO_STATUS_BLOCK iosb;
540 FILE_COMPLETION_INFORMATION info;
542 if (Flags) FIXME("Unknown value Flags=0x%x\n", Flags);
544 if (!old_threadpool.compl_port)
546 NTSTATUS res = STATUS_SUCCESS;
548 RtlEnterCriticalSection(&old_threadpool.threadpool_compl_cs);
549 if (!old_threadpool.compl_port)
551 HANDLE cport;
553 res = NtCreateIoCompletion( &cport, IO_COMPLETION_ALL_ACCESS, NULL, 0 );
554 if (!res)
556 /* FIXME native can start additional threads in case of e.g. hung callback function. */
557 res = RtlQueueWorkItem( iocp_poller, cport, WT_EXECUTEDEFAULT );
558 if (!res)
559 old_threadpool.compl_port = cport;
560 else
561 NtClose( cport );
564 RtlLeaveCriticalSection(&old_threadpool.threadpool_compl_cs);
565 if (res) return res;
568 info.CompletionPort = old_threadpool.compl_port;
569 info.CompletionKey = (ULONG_PTR)Function;
571 return NtSetInformationFile( FileHandle, &iosb, &info, sizeof(info), FileCompletionInformation );
574 static inline PLARGE_INTEGER get_nt_timeout( PLARGE_INTEGER pTime, ULONG timeout )
576 if (timeout == INFINITE) return NULL;
577 pTime->QuadPart = (ULONGLONG)timeout * -10000;
578 return pTime;
581 static void delete_wait_work_item(struct wait_work_item *wait_work_item)
583 NtClose( wait_work_item->CancelEvent );
584 RtlFreeHeap( GetProcessHeap(), 0, wait_work_item );
587 static DWORD CALLBACK wait_thread_proc(LPVOID Arg)
589 struct wait_work_item *wait_work_item = Arg;
590 NTSTATUS status;
591 BOOLEAN alertable = (wait_work_item->Flags & WT_EXECUTEINIOTHREAD) != 0;
592 HANDLE handles[2] = { wait_work_item->Object, wait_work_item->CancelEvent };
593 LARGE_INTEGER timeout;
594 HANDLE completion_event;
596 TRACE("\n");
598 while (TRUE)
600 status = NtWaitForMultipleObjects( 2, handles, TRUE, alertable,
601 get_nt_timeout( &timeout, wait_work_item->Milliseconds ) );
602 if (status == STATUS_WAIT_0 || status == STATUS_TIMEOUT)
604 BOOLEAN TimerOrWaitFired;
606 if (status == STATUS_WAIT_0)
608 TRACE( "object %p signaled, calling callback %p with context %p\n",
609 wait_work_item->Object, wait_work_item->Callback,
610 wait_work_item->Context );
611 TimerOrWaitFired = FALSE;
613 else
615 TRACE( "wait for object %p timed out, calling callback %p with context %p\n",
616 wait_work_item->Object, wait_work_item->Callback,
617 wait_work_item->Context );
618 TimerOrWaitFired = TRUE;
620 wait_work_item->CallbackInProgress = TRUE;
621 wait_work_item->Callback( wait_work_item->Context, TimerOrWaitFired );
622 wait_work_item->CallbackInProgress = FALSE;
624 if (wait_work_item->Flags & WT_EXECUTEONLYONCE)
625 break;
627 else
628 break;
631 completion_event = wait_work_item->CompletionEvent;
632 if (completion_event) NtSetEvent( completion_event, NULL );
634 if (interlocked_inc( &wait_work_item->DeleteCount ) == 2 )
635 delete_wait_work_item( wait_work_item );
637 return 0;
640 /***********************************************************************
641 * RtlRegisterWait (NTDLL.@)
643 * Registers a wait for a handle to become signaled.
645 * PARAMS
646 * NewWaitObject [I] Handle to the new wait object. Use RtlDeregisterWait() to free it.
647 * Object [I] Object to wait to become signaled.
648 * Callback [I] Callback function to execute when the wait times out or the handle is signaled.
649 * Context [I] Context to pass to the callback function when it is executed.
650 * Milliseconds [I] Number of milliseconds to wait before timing out.
651 * Flags [I] Flags. See notes.
653 * RETURNS
654 * Success: STATUS_SUCCESS.
655 * Failure: Any NTSTATUS code.
657 * NOTES
658 * Flags can be one or more of the following:
659 *|WT_EXECUTEDEFAULT - Executes the work item in a non-I/O worker thread.
660 *|WT_EXECUTEINIOTHREAD - Executes the work item in an I/O worker thread.
661 *|WT_EXECUTEINPERSISTENTTHREAD - Executes the work item in a thread that is persistent.
662 *|WT_EXECUTELONGFUNCTION - Hints that the execution can take a long time.
663 *|WT_TRANSFER_IMPERSONATION - Executes the function with the current access token.
665 NTSTATUS WINAPI RtlRegisterWait(PHANDLE NewWaitObject, HANDLE Object,
666 RTL_WAITORTIMERCALLBACKFUNC Callback,
667 PVOID Context, ULONG Milliseconds, ULONG Flags)
669 struct wait_work_item *wait_work_item;
670 NTSTATUS status;
672 TRACE( "(%p, %p, %p, %p, %d, 0x%x)\n", NewWaitObject, Object, Callback, Context, Milliseconds, Flags );
674 wait_work_item = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*wait_work_item) );
675 if (!wait_work_item)
676 return STATUS_NO_MEMORY;
678 wait_work_item->Object = Object;
679 wait_work_item->Callback = Callback;
680 wait_work_item->Context = Context;
681 wait_work_item->Milliseconds = Milliseconds;
682 wait_work_item->Flags = Flags;
683 wait_work_item->CallbackInProgress = FALSE;
684 wait_work_item->DeleteCount = 0;
685 wait_work_item->CompletionEvent = NULL;
687 status = NtCreateEvent( &wait_work_item->CancelEvent, EVENT_ALL_ACCESS, NULL, NotificationEvent, FALSE );
688 if (status != STATUS_SUCCESS)
690 RtlFreeHeap( GetProcessHeap(), 0, wait_work_item );
691 return status;
694 Flags = Flags & (WT_EXECUTEINIOTHREAD | WT_EXECUTEINPERSISTENTTHREAD |
695 WT_EXECUTELONGFUNCTION | WT_TRANSFER_IMPERSONATION);
696 status = RtlQueueWorkItem( wait_thread_proc, wait_work_item, Flags );
697 if (status != STATUS_SUCCESS)
699 delete_wait_work_item( wait_work_item );
700 return status;
703 *NewWaitObject = wait_work_item;
704 return status;
707 /***********************************************************************
708 * RtlDeregisterWaitEx (NTDLL.@)
710 * Cancels a wait operation and frees the resources associated with calling
711 * RtlRegisterWait().
713 * PARAMS
714 * WaitObject [I] Handle to the wait object to free.
716 * RETURNS
717 * Success: STATUS_SUCCESS.
718 * Failure: Any NTSTATUS code.
720 NTSTATUS WINAPI RtlDeregisterWaitEx(HANDLE WaitHandle, HANDLE CompletionEvent)
722 struct wait_work_item *wait_work_item = WaitHandle;
723 NTSTATUS status = STATUS_SUCCESS;
725 TRACE( "(%p)\n", WaitHandle );
727 NtSetEvent( wait_work_item->CancelEvent, NULL );
728 if (wait_work_item->CallbackInProgress)
730 if (CompletionEvent != NULL)
732 if (CompletionEvent == INVALID_HANDLE_VALUE)
734 status = NtCreateEvent( &CompletionEvent, EVENT_ALL_ACCESS, NULL, NotificationEvent, FALSE );
735 if (status != STATUS_SUCCESS)
736 return status;
737 interlocked_xchg_ptr( &wait_work_item->CompletionEvent, CompletionEvent );
738 if (wait_work_item->CallbackInProgress)
739 NtWaitForSingleObject( CompletionEvent, FALSE, NULL );
740 NtClose( CompletionEvent );
742 else
744 interlocked_xchg_ptr( &wait_work_item->CompletionEvent, CompletionEvent );
745 if (wait_work_item->CallbackInProgress)
746 status = STATUS_PENDING;
749 else
750 status = STATUS_PENDING;
753 if (interlocked_inc( &wait_work_item->DeleteCount ) == 2 )
755 status = STATUS_SUCCESS;
756 delete_wait_work_item( wait_work_item );
759 return status;
762 /***********************************************************************
763 * RtlDeregisterWait (NTDLL.@)
765 * Cancels a wait operation and frees the resources associated with calling
766 * RtlRegisterWait().
768 * PARAMS
769 * WaitObject [I] Handle to the wait object to free.
771 * RETURNS
772 * Success: STATUS_SUCCESS.
773 * Failure: Any NTSTATUS code.
775 NTSTATUS WINAPI RtlDeregisterWait(HANDLE WaitHandle)
777 return RtlDeregisterWaitEx(WaitHandle, NULL);
781 /************************** Timer Queue Impl **************************/
783 static void queue_remove_timer(struct queue_timer *t)
785 /* We MUST hold the queue cs while calling this function. This ensures
786 that we cannot queue another callback for this timer. The runcount
787 being zero makes sure we don't have any already queued. */
788 struct timer_queue *q = t->q;
790 assert(t->runcount == 0);
791 assert(t->destroy);
793 list_remove(&t->entry);
794 if (t->event)
795 NtSetEvent(t->event, NULL);
796 RtlFreeHeap(GetProcessHeap(), 0, t);
798 if (q->quit && list_empty(&q->timers))
799 NtSetEvent(q->event, NULL);
802 static void timer_cleanup_callback(struct queue_timer *t)
804 struct timer_queue *q = t->q;
805 RtlEnterCriticalSection(&q->cs);
807 assert(0 < t->runcount);
808 --t->runcount;
810 if (t->destroy && t->runcount == 0)
811 queue_remove_timer(t);
813 RtlLeaveCriticalSection(&q->cs);
816 static DWORD WINAPI timer_callback_wrapper(LPVOID p)
818 struct queue_timer *t = p;
819 t->callback(t->param, TRUE);
820 timer_cleanup_callback(t);
821 return 0;
824 static inline ULONGLONG queue_current_time(void)
826 LARGE_INTEGER now, freq;
827 NtQueryPerformanceCounter(&now, &freq);
828 return now.QuadPart * 1000 / freq.QuadPart;
831 static void queue_add_timer(struct queue_timer *t, ULONGLONG time,
832 BOOL set_event)
834 /* We MUST hold the queue cs while calling this function. */
835 struct timer_queue *q = t->q;
836 struct list *ptr = &q->timers;
838 assert(!q->quit || (t->destroy && time == EXPIRE_NEVER));
840 if (time != EXPIRE_NEVER)
841 LIST_FOR_EACH(ptr, &q->timers)
843 struct queue_timer *cur = LIST_ENTRY(ptr, struct queue_timer, entry);
844 if (time < cur->expire)
845 break;
847 list_add_before(ptr, &t->entry);
849 t->expire = time;
851 /* If we insert at the head of the list, we need to expire sooner
852 than expected. */
853 if (set_event && &t->entry == list_head(&q->timers))
854 NtSetEvent(q->event, NULL);
857 static inline void queue_move_timer(struct queue_timer *t, ULONGLONG time,
858 BOOL set_event)
860 /* We MUST hold the queue cs while calling this function. */
861 list_remove(&t->entry);
862 queue_add_timer(t, time, set_event);
865 static void queue_timer_expire(struct timer_queue *q)
867 struct queue_timer *t = NULL;
869 RtlEnterCriticalSection(&q->cs);
870 if (list_head(&q->timers))
872 ULONGLONG now, next;
873 t = LIST_ENTRY(list_head(&q->timers), struct queue_timer, entry);
874 if (!t->destroy && t->expire <= ((now = queue_current_time())))
876 ++t->runcount;
877 if (t->period)
879 next = t->expire + t->period;
880 /* avoid trigger cascade if overloaded / hibernated */
881 if (next < now)
882 next = now + t->period;
884 else
885 next = EXPIRE_NEVER;
886 queue_move_timer(t, next, FALSE);
888 else
889 t = NULL;
891 RtlLeaveCriticalSection(&q->cs);
893 if (t)
895 if (t->flags & WT_EXECUTEINTIMERTHREAD)
896 timer_callback_wrapper(t);
897 else
899 ULONG flags
900 = (t->flags
901 & (WT_EXECUTEINIOTHREAD | WT_EXECUTEINPERSISTENTTHREAD
902 | WT_EXECUTELONGFUNCTION | WT_TRANSFER_IMPERSONATION));
903 NTSTATUS status = RtlQueueWorkItem(timer_callback_wrapper, t, flags);
904 if (status != STATUS_SUCCESS)
905 timer_cleanup_callback(t);
910 static ULONG queue_get_timeout(struct timer_queue *q)
912 struct queue_timer *t;
913 ULONG timeout = INFINITE;
915 RtlEnterCriticalSection(&q->cs);
916 if (list_head(&q->timers))
918 t = LIST_ENTRY(list_head(&q->timers), struct queue_timer, entry);
919 assert(!t->destroy || t->expire == EXPIRE_NEVER);
921 if (t->expire != EXPIRE_NEVER)
923 ULONGLONG time = queue_current_time();
924 timeout = t->expire < time ? 0 : t->expire - time;
927 RtlLeaveCriticalSection(&q->cs);
929 return timeout;
932 static void WINAPI timer_queue_thread_proc(LPVOID p)
934 struct timer_queue *q = p;
935 ULONG timeout_ms;
937 timeout_ms = INFINITE;
938 for (;;)
940 LARGE_INTEGER timeout;
941 NTSTATUS status;
942 BOOL done = FALSE;
944 status = NtWaitForSingleObject(
945 q->event, FALSE, get_nt_timeout(&timeout, timeout_ms));
947 if (status == STATUS_WAIT_0)
949 /* There are two possible ways to trigger the event. Either
950 we are quitting and the last timer got removed, or a new
951 timer got put at the head of the list so we need to adjust
952 our timeout. */
953 RtlEnterCriticalSection(&q->cs);
954 if (q->quit && list_empty(&q->timers))
955 done = TRUE;
956 RtlLeaveCriticalSection(&q->cs);
958 else if (status == STATUS_TIMEOUT)
959 queue_timer_expire(q);
961 if (done)
962 break;
964 timeout_ms = queue_get_timeout(q);
967 NtClose(q->event);
968 RtlDeleteCriticalSection(&q->cs);
969 q->magic = 0;
970 RtlFreeHeap(GetProcessHeap(), 0, q);
973 static void queue_destroy_timer(struct queue_timer *t)
975 /* We MUST hold the queue cs while calling this function. */
976 t->destroy = TRUE;
977 if (t->runcount == 0)
978 /* Ensure a timer is promptly removed. If callbacks are pending,
979 it will be removed after the last one finishes by the callback
980 cleanup wrapper. */
981 queue_remove_timer(t);
982 else
983 /* Make sure no destroyed timer masks an active timer at the head
984 of the sorted list. */
985 queue_move_timer(t, EXPIRE_NEVER, FALSE);
988 /***********************************************************************
989 * RtlCreateTimerQueue (NTDLL.@)
991 * Creates a timer queue object and returns a handle to it.
993 * PARAMS
994 * NewTimerQueue [O] The newly created queue.
996 * RETURNS
997 * Success: STATUS_SUCCESS.
998 * Failure: Any NTSTATUS code.
1000 NTSTATUS WINAPI RtlCreateTimerQueue(PHANDLE NewTimerQueue)
1002 NTSTATUS status;
1003 struct timer_queue *q = RtlAllocateHeap(GetProcessHeap(), 0, sizeof *q);
1004 if (!q)
1005 return STATUS_NO_MEMORY;
1007 RtlInitializeCriticalSection(&q->cs);
1008 list_init(&q->timers);
1009 q->quit = FALSE;
1010 q->magic = TIMER_QUEUE_MAGIC;
1011 status = NtCreateEvent(&q->event, EVENT_ALL_ACCESS, NULL, SynchronizationEvent, FALSE);
1012 if (status != STATUS_SUCCESS)
1014 RtlFreeHeap(GetProcessHeap(), 0, q);
1015 return status;
1017 status = RtlCreateUserThread(GetCurrentProcess(), NULL, FALSE, NULL, 0, 0,
1018 timer_queue_thread_proc, q, &q->thread, NULL);
1019 if (status != STATUS_SUCCESS)
1021 NtClose(q->event);
1022 RtlFreeHeap(GetProcessHeap(), 0, q);
1023 return status;
1026 *NewTimerQueue = q;
1027 return STATUS_SUCCESS;
1030 /***********************************************************************
1031 * RtlDeleteTimerQueueEx (NTDLL.@)
1033 * Deletes a timer queue object.
1035 * PARAMS
1036 * TimerQueue [I] The timer queue to destroy.
1037 * CompletionEvent [I] If NULL, return immediately. If INVALID_HANDLE_VALUE,
1038 * wait until all timers are finished firing before
1039 * returning. Otherwise, return immediately and set the
1040 * event when all timers are done.
1042 * RETURNS
1043 * Success: STATUS_SUCCESS if synchronous, STATUS_PENDING if not.
1044 * Failure: Any NTSTATUS code.
1046 NTSTATUS WINAPI RtlDeleteTimerQueueEx(HANDLE TimerQueue, HANDLE CompletionEvent)
1048 struct timer_queue *q = TimerQueue;
1049 struct queue_timer *t, *temp;
1050 HANDLE thread;
1051 NTSTATUS status;
1053 if (!q || q->magic != TIMER_QUEUE_MAGIC)
1054 return STATUS_INVALID_HANDLE;
1056 thread = q->thread;
1058 RtlEnterCriticalSection(&q->cs);
1059 q->quit = TRUE;
1060 if (list_head(&q->timers))
1061 /* When the last timer is removed, it will signal the timer thread to
1062 exit... */
1063 LIST_FOR_EACH_ENTRY_SAFE(t, temp, &q->timers, struct queue_timer, entry)
1064 queue_destroy_timer(t);
1065 else
1066 /* However if we have none, we must do it ourselves. */
1067 NtSetEvent(q->event, NULL);
1068 RtlLeaveCriticalSection(&q->cs);
1070 if (CompletionEvent == INVALID_HANDLE_VALUE)
1072 NtWaitForSingleObject(thread, FALSE, NULL);
1073 status = STATUS_SUCCESS;
1075 else
1077 if (CompletionEvent)
1079 FIXME("asynchronous return on completion event unimplemented\n");
1080 NtWaitForSingleObject(thread, FALSE, NULL);
1081 NtSetEvent(CompletionEvent, NULL);
1083 status = STATUS_PENDING;
1086 NtClose(thread);
1087 return status;
1090 static struct timer_queue *get_timer_queue(HANDLE TimerQueue)
1092 static struct timer_queue *default_timer_queue;
1094 if (TimerQueue)
1095 return TimerQueue;
1096 else
1098 if (!default_timer_queue)
1100 HANDLE q;
1101 NTSTATUS status = RtlCreateTimerQueue(&q);
1102 if (status == STATUS_SUCCESS)
1104 PVOID p = interlocked_cmpxchg_ptr(
1105 (void **) &default_timer_queue, q, NULL);
1106 if (p)
1107 /* Got beat to the punch. */
1108 RtlDeleteTimerQueueEx(q, NULL);
1111 return default_timer_queue;
1115 /***********************************************************************
1116 * RtlCreateTimer (NTDLL.@)
1118 * Creates a new timer associated with the given queue.
1120 * PARAMS
1121 * NewTimer [O] The newly created timer.
1122 * TimerQueue [I] The queue to hold the timer.
1123 * Callback [I] The callback to fire.
1124 * Parameter [I] The argument for the callback.
1125 * DueTime [I] The delay, in milliseconds, before first firing the
1126 * timer.
1127 * Period [I] The period, in milliseconds, at which to fire the timer
1128 * after the first callback. If zero, the timer will only
1129 * fire once. It still needs to be deleted with
1130 * RtlDeleteTimer.
1131 * Flags [I] Flags controlling the execution of the callback. In
1132 * addition to the WT_* thread pool flags (see
1133 * RtlQueueWorkItem), WT_EXECUTEINTIMERTHREAD and
1134 * WT_EXECUTEONLYONCE are supported.
1136 * RETURNS
1137 * Success: STATUS_SUCCESS.
1138 * Failure: Any NTSTATUS code.
1140 NTSTATUS WINAPI RtlCreateTimer(PHANDLE NewTimer, HANDLE TimerQueue,
1141 RTL_WAITORTIMERCALLBACKFUNC Callback,
1142 PVOID Parameter, DWORD DueTime, DWORD Period,
1143 ULONG Flags)
1145 NTSTATUS status;
1146 struct queue_timer *t;
1147 struct timer_queue *q = get_timer_queue(TimerQueue);
1149 if (!q) return STATUS_NO_MEMORY;
1150 if (q->magic != TIMER_QUEUE_MAGIC) return STATUS_INVALID_HANDLE;
1152 t = RtlAllocateHeap(GetProcessHeap(), 0, sizeof *t);
1153 if (!t)
1154 return STATUS_NO_MEMORY;
1156 t->q = q;
1157 t->runcount = 0;
1158 t->callback = Callback;
1159 t->param = Parameter;
1160 t->period = Period;
1161 t->flags = Flags;
1162 t->destroy = FALSE;
1163 t->event = NULL;
1165 status = STATUS_SUCCESS;
1166 RtlEnterCriticalSection(&q->cs);
1167 if (q->quit)
1168 status = STATUS_INVALID_HANDLE;
1169 else
1170 queue_add_timer(t, queue_current_time() + DueTime, TRUE);
1171 RtlLeaveCriticalSection(&q->cs);
1173 if (status == STATUS_SUCCESS)
1174 *NewTimer = t;
1175 else
1176 RtlFreeHeap(GetProcessHeap(), 0, t);
1178 return status;
1181 /***********************************************************************
1182 * RtlUpdateTimer (NTDLL.@)
1184 * Changes the time at which a timer expires.
1186 * PARAMS
1187 * TimerQueue [I] The queue that holds the timer.
1188 * Timer [I] The timer to update.
1189 * DueTime [I] The delay, in milliseconds, before next firing the timer.
1190 * Period [I] The period, in milliseconds, at which to fire the timer
1191 * after the first callback. If zero, the timer will not
1192 * refire once. It still needs to be deleted with
1193 * RtlDeleteTimer.
1195 * RETURNS
1196 * Success: STATUS_SUCCESS.
1197 * Failure: Any NTSTATUS code.
1199 NTSTATUS WINAPI RtlUpdateTimer(HANDLE TimerQueue, HANDLE Timer,
1200 DWORD DueTime, DWORD Period)
1202 struct queue_timer *t = Timer;
1203 struct timer_queue *q = t->q;
1205 RtlEnterCriticalSection(&q->cs);
1206 /* Can't change a timer if it was once-only or destroyed. */
1207 if (t->expire != EXPIRE_NEVER)
1209 t->period = Period;
1210 queue_move_timer(t, queue_current_time() + DueTime, TRUE);
1212 RtlLeaveCriticalSection(&q->cs);
1214 return STATUS_SUCCESS;
1217 /***********************************************************************
1218 * RtlDeleteTimer (NTDLL.@)
1220 * Cancels a timer-queue timer.
1222 * PARAMS
1223 * TimerQueue [I] The queue that holds the timer.
1224 * Timer [I] The timer to update.
1225 * CompletionEvent [I] If NULL, return immediately. If INVALID_HANDLE_VALUE,
1226 * wait until the timer is finished firing all pending
1227 * callbacks before returning. Otherwise, return
1228 * immediately and set the timer is done.
1230 * RETURNS
1231 * Success: STATUS_SUCCESS if the timer is done, STATUS_PENDING if not,
1232 or if the completion event is NULL.
1233 * Failure: Any NTSTATUS code.
1235 NTSTATUS WINAPI RtlDeleteTimer(HANDLE TimerQueue, HANDLE Timer,
1236 HANDLE CompletionEvent)
1238 struct queue_timer *t = Timer;
1239 struct timer_queue *q;
1240 NTSTATUS status = STATUS_PENDING;
1241 HANDLE event = NULL;
1243 if (!Timer)
1244 return STATUS_INVALID_PARAMETER_1;
1245 q = t->q;
1246 if (CompletionEvent == INVALID_HANDLE_VALUE)
1248 status = NtCreateEvent(&event, EVENT_ALL_ACCESS, NULL, SynchronizationEvent, FALSE);
1249 if (status == STATUS_SUCCESS)
1250 status = STATUS_PENDING;
1252 else if (CompletionEvent)
1253 event = CompletionEvent;
1255 RtlEnterCriticalSection(&q->cs);
1256 t->event = event;
1257 if (t->runcount == 0 && event)
1258 status = STATUS_SUCCESS;
1259 queue_destroy_timer(t);
1260 RtlLeaveCriticalSection(&q->cs);
1262 if (CompletionEvent == INVALID_HANDLE_VALUE && event)
1264 if (status == STATUS_PENDING)
1266 NtWaitForSingleObject(event, FALSE, NULL);
1267 status = STATUS_SUCCESS;
1269 NtClose(event);
1272 return status;
1275 /***********************************************************************
1276 * timerqueue_thread_proc (internal)
1278 static void CALLBACK timerqueue_thread_proc( void *param )
1280 ULONGLONG timeout_lower, timeout_upper, new_timeout;
1281 struct threadpool_object *other_timer;
1282 LARGE_INTEGER now, timeout;
1283 struct list *ptr;
1285 TRACE( "starting timer queue thread\n" );
1287 RtlEnterCriticalSection( &timerqueue.cs );
1288 for (;;)
1290 NtQuerySystemTime( &now );
1292 /* Check for expired timers. */
1293 while ((ptr = list_head( &timerqueue.pending_timers )))
1295 struct threadpool_object *timer = LIST_ENTRY( ptr, struct threadpool_object, u.timer.timer_entry );
1296 assert( timer->type == TP_OBJECT_TYPE_TIMER );
1297 assert( timer->u.timer.timer_pending );
1298 if (timer->u.timer.timeout > now.QuadPart)
1299 break;
1301 /* Queue a new callback in one of the worker threads. */
1302 list_remove( &timer->u.timer.timer_entry );
1303 timer->u.timer.timer_pending = FALSE;
1304 tp_object_submit( timer, FALSE );
1306 /* Insert the timer back into the queue, except its marked for shutdown. */
1307 if (timer->u.timer.period && !timer->shutdown)
1309 timer->u.timer.timeout += (ULONGLONG)timer->u.timer.period * 10000;
1310 if (timer->u.timer.timeout <= now.QuadPart)
1311 timer->u.timer.timeout = now.QuadPart + 1;
1313 LIST_FOR_EACH_ENTRY( other_timer, &timerqueue.pending_timers,
1314 struct threadpool_object, u.timer.timer_entry )
1316 assert( other_timer->type == TP_OBJECT_TYPE_TIMER );
1317 if (timer->u.timer.timeout < other_timer->u.timer.timeout)
1318 break;
1320 list_add_before( &other_timer->u.timer.timer_entry, &timer->u.timer.timer_entry );
1321 timer->u.timer.timer_pending = TRUE;
1325 timeout_lower = TIMEOUT_INFINITE;
1326 timeout_upper = TIMEOUT_INFINITE;
1328 /* Determine next timeout and use the window length to optimize wakeup times. */
1329 LIST_FOR_EACH_ENTRY( other_timer, &timerqueue.pending_timers,
1330 struct threadpool_object, u.timer.timer_entry )
1332 assert( other_timer->type == TP_OBJECT_TYPE_TIMER );
1333 if (other_timer->u.timer.timeout >= timeout_upper)
1334 break;
1336 timeout_lower = other_timer->u.timer.timeout;
1337 new_timeout = timeout_lower + (ULONGLONG)other_timer->u.timer.window_length * 10000;
1338 if (new_timeout < timeout_upper)
1339 timeout_upper = new_timeout;
1342 /* Wait for timer update events or until the next timer expires. */
1343 if (timerqueue.objcount)
1345 timeout.QuadPart = timeout_lower;
1346 RtlSleepConditionVariableCS( &timerqueue.update_event, &timerqueue.cs, &timeout );
1347 continue;
1350 /* All timers have been destroyed, if no new timers are created
1351 * within some amount of time, then we can shutdown this thread. */
1352 timeout.QuadPart = (ULONGLONG)THREADPOOL_WORKER_TIMEOUT * -10000;
1353 if (RtlSleepConditionVariableCS( &timerqueue.update_event, &timerqueue.cs,
1354 &timeout ) == STATUS_TIMEOUT && !timerqueue.objcount)
1356 break;
1360 timerqueue.thread_running = FALSE;
1361 RtlLeaveCriticalSection( &timerqueue.cs );
1363 TRACE( "terminating timer queue thread\n" );
1366 /***********************************************************************
1367 * tp_timerqueue_lock (internal)
1369 * Acquires a lock on the global timerqueue. When the lock is acquired
1370 * successfully, it is guaranteed that the timer thread is running.
1372 static NTSTATUS tp_timerqueue_lock( struct threadpool_object *timer )
1374 NTSTATUS status = STATUS_SUCCESS;
1375 assert( timer->type == TP_OBJECT_TYPE_TIMER );
1377 timer->u.timer.timer_initialized = FALSE;
1378 timer->u.timer.timer_pending = FALSE;
1379 timer->u.timer.timer_set = FALSE;
1380 timer->u.timer.timeout = 0;
1381 timer->u.timer.period = 0;
1382 timer->u.timer.window_length = 0;
1384 RtlEnterCriticalSection( &timerqueue.cs );
1386 /* Make sure that the timerqueue thread is running. */
1387 if (!timerqueue.thread_running)
1389 HANDLE thread;
1390 status = RtlCreateUserThread( GetCurrentProcess(), NULL, FALSE, NULL, 0, 0,
1391 timerqueue_thread_proc, NULL, &thread, NULL );
1392 if (status == STATUS_SUCCESS)
1394 timerqueue.thread_running = TRUE;
1395 NtClose( thread );
1399 if (status == STATUS_SUCCESS)
1401 timer->u.timer.timer_initialized = TRUE;
1402 timerqueue.objcount++;
1405 RtlLeaveCriticalSection( &timerqueue.cs );
1406 return status;
1409 /***********************************************************************
1410 * tp_timerqueue_unlock (internal)
1412 * Releases a lock on the global timerqueue.
1414 static void tp_timerqueue_unlock( struct threadpool_object *timer )
1416 assert( timer->type == TP_OBJECT_TYPE_TIMER );
1418 RtlEnterCriticalSection( &timerqueue.cs );
1419 if (timer->u.timer.timer_initialized)
1421 /* If timer was pending, remove it. */
1422 if (timer->u.timer.timer_pending)
1424 list_remove( &timer->u.timer.timer_entry );
1425 timer->u.timer.timer_pending = FALSE;
1428 /* If the last timer object was destroyed, then wake up the thread. */
1429 if (!--timerqueue.objcount)
1431 assert( list_empty( &timerqueue.pending_timers ) );
1432 RtlWakeAllConditionVariable( &timerqueue.update_event );
1435 timer->u.timer.timer_initialized = FALSE;
1437 RtlLeaveCriticalSection( &timerqueue.cs );
1440 /***********************************************************************
1441 * waitqueue_thread_proc (internal)
1443 static void CALLBACK waitqueue_thread_proc( void *param )
1445 struct threadpool_object *objects[MAXIMUM_WAITQUEUE_OBJECTS];
1446 HANDLE handles[MAXIMUM_WAITQUEUE_OBJECTS + 1];
1447 struct waitqueue_bucket *bucket = param;
1448 struct threadpool_object *wait, *next;
1449 LARGE_INTEGER now, timeout;
1450 DWORD num_handles;
1451 NTSTATUS status;
1453 TRACE( "starting wait queue thread\n" );
1455 RtlEnterCriticalSection( &waitqueue.cs );
1457 for (;;)
1459 NtQuerySystemTime( &now );
1460 timeout.QuadPart = TIMEOUT_INFINITE;
1461 num_handles = 0;
1463 LIST_FOR_EACH_ENTRY_SAFE( wait, next, &bucket->waiting, struct threadpool_object,
1464 u.wait.wait_entry )
1466 assert( wait->type == TP_OBJECT_TYPE_WAIT );
1467 if (wait->u.wait.timeout <= now.QuadPart)
1469 /* Wait object timed out. */
1470 list_remove( &wait->u.wait.wait_entry );
1471 list_add_tail( &bucket->reserved, &wait->u.wait.wait_entry );
1472 tp_object_submit( wait, FALSE );
1474 else
1476 if (wait->u.wait.timeout < timeout.QuadPart)
1477 timeout.QuadPart = wait->u.wait.timeout;
1479 assert( num_handles < MAXIMUM_WAITQUEUE_OBJECTS );
1480 interlocked_inc( &wait->refcount );
1481 objects[num_handles] = wait;
1482 handles[num_handles] = wait->u.wait.handle;
1483 num_handles++;
1487 if (!bucket->objcount)
1489 /* All wait objects have been destroyed, if no new wait objects are created
1490 * within some amount of time, then we can shutdown this thread. */
1491 assert( num_handles == 0 );
1492 RtlLeaveCriticalSection( &waitqueue.cs );
1493 timeout.QuadPart = (ULONGLONG)THREADPOOL_WORKER_TIMEOUT * -10000;
1494 status = NtWaitForMultipleObjects( 1, &bucket->update_event, TRUE, FALSE, &timeout );
1495 RtlEnterCriticalSection( &waitqueue.cs );
1497 if (status == STATUS_TIMEOUT && !bucket->objcount)
1498 break;
1500 else
1502 handles[num_handles] = bucket->update_event;
1503 RtlLeaveCriticalSection( &waitqueue.cs );
1504 status = NtWaitForMultipleObjects( num_handles + 1, handles, TRUE, FALSE, &timeout );
1505 RtlEnterCriticalSection( &waitqueue.cs );
1507 if (status >= STATUS_WAIT_0 && status < STATUS_WAIT_0 + num_handles)
1509 wait = objects[status - STATUS_WAIT_0];
1510 assert( wait->type == TP_OBJECT_TYPE_WAIT );
1511 if (wait->u.wait.bucket)
1513 /* Wait object signaled. */
1514 assert( wait->u.wait.bucket == bucket );
1515 list_remove( &wait->u.wait.wait_entry );
1516 list_add_tail( &bucket->reserved, &wait->u.wait.wait_entry );
1517 tp_object_submit( wait, TRUE );
1519 else
1520 ERR("wait object %p triggered while object was destroyed\n", wait);
1523 /* Release temporary references to wait objects. */
1524 while (num_handles)
1526 wait = objects[--num_handles];
1527 assert( wait->type == TP_OBJECT_TYPE_WAIT );
1528 tp_object_release( wait );
1532 /* Try to merge bucket with other threads. */
1533 if (waitqueue.num_buckets > 1 && bucket->objcount &&
1534 bucket->objcount <= MAXIMUM_WAITQUEUE_OBJECTS * 1 / 3)
1536 struct waitqueue_bucket *other_bucket;
1537 LIST_FOR_EACH_ENTRY( other_bucket, &waitqueue.buckets, struct waitqueue_bucket, bucket_entry )
1539 if (other_bucket != bucket && other_bucket->objcount &&
1540 other_bucket->objcount + bucket->objcount <= MAXIMUM_WAITQUEUE_OBJECTS * 2 / 3)
1542 other_bucket->objcount += bucket->objcount;
1543 bucket->objcount = 0;
1545 /* Update reserved list. */
1546 LIST_FOR_EACH_ENTRY( wait, &bucket->reserved, struct threadpool_object, u.wait.wait_entry )
1548 assert( wait->type == TP_OBJECT_TYPE_WAIT );
1549 wait->u.wait.bucket = other_bucket;
1551 list_move_tail( &other_bucket->reserved, &bucket->reserved );
1553 /* Update waiting list. */
1554 LIST_FOR_EACH_ENTRY( wait, &bucket->waiting, struct threadpool_object, u.wait.wait_entry )
1556 assert( wait->type == TP_OBJECT_TYPE_WAIT );
1557 wait->u.wait.bucket = other_bucket;
1559 list_move_tail( &other_bucket->waiting, &bucket->waiting );
1561 /* Move bucket to the end, to keep the probability of
1562 * newly added wait objects as small as possible. */
1563 list_remove( &bucket->bucket_entry );
1564 list_add_tail( &waitqueue.buckets, &bucket->bucket_entry );
1566 NtSetEvent( other_bucket->update_event, NULL );
1567 break;
1573 /* Remove this bucket from the list. */
1574 list_remove( &bucket->bucket_entry );
1575 if (!--waitqueue.num_buckets)
1576 assert( list_empty( &waitqueue.buckets ) );
1578 RtlLeaveCriticalSection( &waitqueue.cs );
1580 TRACE( "terminating wait queue thread\n" );
1582 assert( bucket->objcount == 0 );
1583 assert( list_empty( &bucket->reserved ) );
1584 assert( list_empty( &bucket->waiting ) );
1585 NtClose( bucket->update_event );
1587 RtlFreeHeap( GetProcessHeap(), 0, bucket );
1590 /***********************************************************************
1591 * tp_waitqueue_lock (internal)
1593 static NTSTATUS tp_waitqueue_lock( struct threadpool_object *wait )
1595 struct waitqueue_bucket *bucket;
1596 NTSTATUS status;
1597 HANDLE thread;
1598 assert( wait->type = TP_OBJECT_TYPE_WAIT );
1600 wait->u.wait.signaled = 0;
1601 wait->u.wait.bucket = NULL;
1602 wait->u.wait.wait_pending = FALSE;
1603 wait->u.wait.timeout = 0;
1604 wait->u.wait.handle = INVALID_HANDLE_VALUE;
1606 RtlEnterCriticalSection( &waitqueue.cs );
1608 /* Try to assign to existing bucket if possible. */
1609 LIST_FOR_EACH_ENTRY( bucket, &waitqueue.buckets, struct waitqueue_bucket, bucket_entry )
1611 if (bucket->objcount < MAXIMUM_WAITQUEUE_OBJECTS)
1613 list_add_tail( &bucket->reserved, &wait->u.wait.wait_entry );
1614 wait->u.wait.bucket = bucket;
1615 bucket->objcount++;
1617 status = STATUS_SUCCESS;
1618 goto out;
1622 /* Create a new bucket and corresponding worker thread. */
1623 bucket = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*bucket) );
1624 if (!bucket)
1626 status = STATUS_NO_MEMORY;
1627 goto out;
1630 bucket->objcount = 0;
1631 list_init( &bucket->reserved );
1632 list_init( &bucket->waiting );
1634 status = NtCreateEvent( &bucket->update_event, EVENT_ALL_ACCESS,
1635 NULL, SynchronizationEvent, FALSE );
1636 if (status)
1638 RtlFreeHeap( GetProcessHeap(), 0, bucket );
1639 goto out;
1642 status = RtlCreateUserThread( GetCurrentProcess(), NULL, FALSE, NULL, 0, 0,
1643 waitqueue_thread_proc, bucket, &thread, NULL );
1644 if (status == STATUS_SUCCESS)
1646 list_add_tail( &waitqueue.buckets, &bucket->bucket_entry );
1647 waitqueue.num_buckets++;
1649 list_add_tail( &bucket->reserved, &wait->u.wait.wait_entry );
1650 wait->u.wait.bucket = bucket;
1651 bucket->objcount++;
1653 NtClose( thread );
1655 else
1657 NtClose( bucket->update_event );
1658 RtlFreeHeap( GetProcessHeap(), 0, bucket );
1661 out:
1662 RtlLeaveCriticalSection( &waitqueue.cs );
1663 return status;
1666 /***********************************************************************
1667 * tp_waitqueue_unlock (internal)
1669 static void tp_waitqueue_unlock( struct threadpool_object *wait )
1671 assert( wait->type == TP_OBJECT_TYPE_WAIT );
1673 RtlEnterCriticalSection( &waitqueue.cs );
1674 if (wait->u.wait.bucket)
1676 struct waitqueue_bucket *bucket = wait->u.wait.bucket;
1677 assert( bucket->objcount > 0 );
1679 list_remove( &wait->u.wait.wait_entry );
1680 wait->u.wait.bucket = NULL;
1681 bucket->objcount--;
1683 NtSetEvent( bucket->update_event, NULL );
1685 RtlLeaveCriticalSection( &waitqueue.cs );
1688 /***********************************************************************
1689 * tp_threadpool_alloc (internal)
1691 * Allocates a new threadpool object.
1693 static NTSTATUS tp_threadpool_alloc( struct threadpool **out )
1695 struct threadpool *pool;
1697 pool = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*pool) );
1698 if (!pool)
1699 return STATUS_NO_MEMORY;
1701 pool->refcount = 1;
1702 pool->objcount = 0;
1703 pool->shutdown = FALSE;
1705 RtlInitializeCriticalSection( &pool->cs );
1706 pool->cs.DebugInfo->Spare[0] = (DWORD_PTR)(__FILE__ ": threadpool.cs");
1708 list_init( &pool->pool );
1709 RtlInitializeConditionVariable( &pool->update_event );
1711 pool->max_workers = 500;
1712 pool->min_workers = 0;
1713 pool->num_workers = 0;
1714 pool->num_busy_workers = 0;
1716 TRACE( "allocated threadpool %p\n", pool );
1718 *out = pool;
1719 return STATUS_SUCCESS;
1722 /***********************************************************************
1723 * tp_threadpool_shutdown (internal)
1725 * Prepares the shutdown of a threadpool object and notifies all worker
1726 * threads to terminate (after all remaining work items have been
1727 * processed).
1729 static void tp_threadpool_shutdown( struct threadpool *pool )
1731 assert( pool != default_threadpool );
1733 pool->shutdown = TRUE;
1734 RtlWakeAllConditionVariable( &pool->update_event );
1737 /***********************************************************************
1738 * tp_threadpool_release (internal)
1740 * Releases a reference to a threadpool object.
1742 static BOOL tp_threadpool_release( struct threadpool *pool )
1744 if (interlocked_dec( &pool->refcount ))
1745 return FALSE;
1747 TRACE( "destroying threadpool %p\n", pool );
1749 assert( pool->shutdown );
1750 assert( !pool->objcount );
1751 assert( list_empty( &pool->pool ) );
1753 pool->cs.DebugInfo->Spare[0] = 0;
1754 RtlDeleteCriticalSection( &pool->cs );
1756 RtlFreeHeap( GetProcessHeap(), 0, pool );
1757 return TRUE;
1760 /***********************************************************************
1761 * tp_threadpool_lock (internal)
1763 * Acquires a lock on a threadpool, specified with an TP_CALLBACK_ENVIRON
1764 * block. When the lock is acquired successfully, it is guaranteed that
1765 * there is at least one worker thread to process tasks.
1767 static NTSTATUS tp_threadpool_lock( struct threadpool **out, TP_CALLBACK_ENVIRON *environment )
1769 struct threadpool *pool = NULL;
1770 NTSTATUS status = STATUS_SUCCESS;
1772 if (environment)
1773 pool = (struct threadpool *)environment->Pool;
1775 if (!pool)
1777 if (!default_threadpool)
1779 status = tp_threadpool_alloc( &pool );
1780 if (status != STATUS_SUCCESS)
1781 return status;
1783 if (interlocked_cmpxchg_ptr( (void *)&default_threadpool, pool, NULL ) != NULL)
1785 tp_threadpool_shutdown( pool );
1786 tp_threadpool_release( pool );
1790 pool = default_threadpool;
1793 RtlEnterCriticalSection( &pool->cs );
1795 /* Make sure that the threadpool has at least one thread. */
1796 if (!pool->num_workers)
1798 HANDLE thread;
1799 status = RtlCreateUserThread( GetCurrentProcess(), NULL, FALSE, NULL, 0, 0,
1800 threadpool_worker_proc, pool, &thread, NULL );
1801 if (status == STATUS_SUCCESS)
1803 interlocked_inc( &pool->refcount );
1804 pool->num_workers++;
1805 NtClose( thread );
1809 /* Keep a reference, and increment objcount to ensure that the
1810 * last thread doesn't terminate. */
1811 if (status == STATUS_SUCCESS)
1813 interlocked_inc( &pool->refcount );
1814 pool->objcount++;
1817 RtlLeaveCriticalSection( &pool->cs );
1819 if (status != STATUS_SUCCESS)
1820 return status;
1822 *out = pool;
1823 return STATUS_SUCCESS;
1826 /***********************************************************************
1827 * tp_threadpool_unlock (internal)
1829 * Releases a lock on a threadpool.
1831 static void tp_threadpool_unlock( struct threadpool *pool )
1833 RtlEnterCriticalSection( &pool->cs );
1834 pool->objcount--;
1835 RtlLeaveCriticalSection( &pool->cs );
1836 tp_threadpool_release( pool );
1839 /***********************************************************************
1840 * tp_group_alloc (internal)
1842 * Allocates a new threadpool group object.
1844 static NTSTATUS tp_group_alloc( struct threadpool_group **out )
1846 struct threadpool_group *group;
1848 group = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*group) );
1849 if (!group)
1850 return STATUS_NO_MEMORY;
1852 group->refcount = 1;
1853 group->shutdown = FALSE;
1855 RtlInitializeCriticalSection( &group->cs );
1856 group->cs.DebugInfo->Spare[0] = (DWORD_PTR)(__FILE__ ": threadpool_group.cs");
1858 list_init( &group->members );
1860 TRACE( "allocated group %p\n", group );
1862 *out = group;
1863 return STATUS_SUCCESS;
1866 /***********************************************************************
1867 * tp_group_shutdown (internal)
1869 * Marks the group object for shutdown.
1871 static void tp_group_shutdown( struct threadpool_group *group )
1873 group->shutdown = TRUE;
1876 /***********************************************************************
1877 * tp_group_release (internal)
1879 * Releases a reference to a group object.
1881 static BOOL tp_group_release( struct threadpool_group *group )
1883 if (interlocked_dec( &group->refcount ))
1884 return FALSE;
1886 TRACE( "destroying group %p\n", group );
1888 assert( group->shutdown );
1889 assert( list_empty( &group->members ) );
1891 group->cs.DebugInfo->Spare[0] = 0;
1892 RtlDeleteCriticalSection( &group->cs );
1894 RtlFreeHeap( GetProcessHeap(), 0, group );
1895 return TRUE;
1898 /***********************************************************************
1899 * tp_object_initialize (internal)
1901 * Initializes members of a threadpool object.
1903 static void tp_object_initialize( struct threadpool_object *object, struct threadpool *pool,
1904 PVOID userdata, TP_CALLBACK_ENVIRON *environment )
1906 BOOL is_simple_callback = (object->type == TP_OBJECT_TYPE_SIMPLE);
1908 object->refcount = 1;
1909 object->shutdown = FALSE;
1911 object->pool = pool;
1912 object->group = NULL;
1913 object->userdata = userdata;
1914 object->group_cancel_callback = NULL;
1915 object->finalization_callback = NULL;
1916 object->may_run_long = 0;
1917 object->race_dll = NULL;
1919 memset( &object->group_entry, 0, sizeof(object->group_entry) );
1920 object->is_group_member = FALSE;
1922 memset( &object->pool_entry, 0, sizeof(object->pool_entry) );
1923 RtlInitializeConditionVariable( &object->finished_event );
1924 RtlInitializeConditionVariable( &object->group_finished_event );
1925 object->num_pending_callbacks = 0;
1926 object->num_running_callbacks = 0;
1927 object->num_associated_callbacks = 0;
1929 if (environment)
1931 if (environment->Version != 1)
1932 FIXME( "unsupported environment version %u\n", environment->Version );
1934 object->group = impl_from_TP_CLEANUP_GROUP( environment->CleanupGroup );
1935 object->group_cancel_callback = environment->CleanupGroupCancelCallback;
1936 object->finalization_callback = environment->FinalizationCallback;
1937 object->may_run_long = environment->u.s.LongFunction != 0;
1938 object->race_dll = environment->RaceDll;
1940 if (environment->ActivationContext)
1941 FIXME( "activation context not supported yet\n" );
1943 if (environment->u.s.Persistent)
1944 FIXME( "persistent threads not supported yet\n" );
1947 if (object->race_dll)
1948 LdrAddRefDll( 0, object->race_dll );
1950 TRACE( "allocated object %p of type %u\n", object, object->type );
1952 /* For simple callbacks we have to run tp_object_submit before adding this object
1953 * to the cleanup group. As soon as the cleanup group members are released ->shutdown
1954 * will be set, and tp_object_submit would fail with an assertion. */
1956 if (is_simple_callback)
1957 tp_object_submit( object, FALSE );
1959 if (object->group)
1961 struct threadpool_group *group = object->group;
1962 interlocked_inc( &group->refcount );
1964 RtlEnterCriticalSection( &group->cs );
1965 list_add_tail( &group->members, &object->group_entry );
1966 object->is_group_member = TRUE;
1967 RtlLeaveCriticalSection( &group->cs );
1970 if (is_simple_callback)
1972 tp_object_shutdown( object );
1973 tp_object_release( object );
1977 /***********************************************************************
1978 * tp_object_submit (internal)
1980 * Submits a threadpool object to the associcated threadpool. This
1981 * function has to be VOID because TpPostWork can never fail on Windows.
1983 static void tp_object_submit( struct threadpool_object *object, BOOL signaled )
1985 struct threadpool *pool = object->pool;
1986 NTSTATUS status = STATUS_UNSUCCESSFUL;
1988 assert( !object->shutdown );
1989 assert( !pool->shutdown );
1991 RtlEnterCriticalSection( &pool->cs );
1993 /* Start new worker threads if required. */
1994 if (pool->num_busy_workers >= pool->num_workers &&
1995 pool->num_workers < pool->max_workers)
1997 HANDLE thread;
1998 status = RtlCreateUserThread( GetCurrentProcess(), NULL, FALSE, NULL, 0, 0,
1999 threadpool_worker_proc, pool, &thread, NULL );
2000 if (status == STATUS_SUCCESS)
2002 interlocked_inc( &pool->refcount );
2003 pool->num_workers++;
2004 NtClose( thread );
2008 /* Queue work item and increment refcount. */
2009 interlocked_inc( &object->refcount );
2010 if (!object->num_pending_callbacks++)
2011 list_add_tail( &pool->pool, &object->pool_entry );
2013 /* Count how often the object was signaled. */
2014 if (object->type == TP_OBJECT_TYPE_WAIT && signaled)
2015 object->u.wait.signaled++;
2017 /* No new thread started - wake up one existing thread. */
2018 if (status != STATUS_SUCCESS)
2020 assert( pool->num_workers > 0 );
2021 RtlWakeConditionVariable( &pool->update_event );
2024 RtlLeaveCriticalSection( &pool->cs );
2027 /***********************************************************************
2028 * tp_object_cancel (internal)
2030 * Cancels all currently pending callbacks for a specific object.
2032 static void tp_object_cancel( struct threadpool_object *object, BOOL group_cancel, PVOID userdata )
2034 struct threadpool *pool = object->pool;
2035 LONG pending_callbacks = 0;
2037 RtlEnterCriticalSection( &pool->cs );
2038 if (object->num_pending_callbacks)
2040 pending_callbacks = object->num_pending_callbacks;
2041 object->num_pending_callbacks = 0;
2042 list_remove( &object->pool_entry );
2044 if (object->type == TP_OBJECT_TYPE_WAIT)
2045 object->u.wait.signaled = 0;
2047 RtlLeaveCriticalSection( &pool->cs );
2049 /* Execute group cancellation callback if defined, and if this was actually a group cancel. */
2050 if (pending_callbacks && group_cancel && object->group_cancel_callback)
2052 TRACE( "executing group cancel callback %p(%p, %p)\n", object->group_cancel_callback, object, userdata );
2053 object->group_cancel_callback( object, userdata );
2054 TRACE( "callback %p returned\n", object->group_cancel_callback );
2057 while (pending_callbacks--)
2058 tp_object_release( object );
2061 /***********************************************************************
2062 * tp_object_wait (internal)
2064 * Waits until all pending and running callbacks of a specific object
2065 * have been processed.
2067 static void tp_object_wait( struct threadpool_object *object, BOOL group_wait )
2069 struct threadpool *pool = object->pool;
2071 RtlEnterCriticalSection( &pool->cs );
2072 if (group_wait)
2074 while (object->num_pending_callbacks || object->num_running_callbacks)
2075 RtlSleepConditionVariableCS( &object->group_finished_event, &pool->cs, NULL );
2077 else
2079 while (object->num_pending_callbacks || object->num_associated_callbacks)
2080 RtlSleepConditionVariableCS( &object->finished_event, &pool->cs, NULL );
2082 RtlLeaveCriticalSection( &pool->cs );
2085 /***********************************************************************
2086 * tp_object_shutdown (internal)
2088 * Marks a threadpool object for shutdown (which means that no further
2089 * tasks can be submitted).
2091 static void tp_object_shutdown( struct threadpool_object *object )
2093 if (object->type == TP_OBJECT_TYPE_TIMER)
2094 tp_timerqueue_unlock( object );
2095 else if (object->type == TP_OBJECT_TYPE_WAIT)
2096 tp_waitqueue_unlock( object );
2098 object->shutdown = TRUE;
2101 /***********************************************************************
2102 * tp_object_release (internal)
2104 * Releases a reference to a threadpool object.
2106 static BOOL tp_object_release( struct threadpool_object *object )
2108 if (interlocked_dec( &object->refcount ))
2109 return FALSE;
2111 TRACE( "destroying object %p of type %u\n", object, object->type );
2113 assert( object->shutdown );
2114 assert( !object->num_pending_callbacks );
2115 assert( !object->num_running_callbacks );
2116 assert( !object->num_associated_callbacks );
2118 /* release reference to the group */
2119 if (object->group)
2121 struct threadpool_group *group = object->group;
2123 RtlEnterCriticalSection( &group->cs );
2124 if (object->is_group_member)
2126 list_remove( &object->group_entry );
2127 object->is_group_member = FALSE;
2129 RtlLeaveCriticalSection( &group->cs );
2131 tp_group_release( group );
2134 tp_threadpool_unlock( object->pool );
2136 if (object->race_dll)
2137 LdrUnloadDll( object->race_dll );
2139 RtlFreeHeap( GetProcessHeap(), 0, object );
2140 return TRUE;
2143 /***********************************************************************
2144 * threadpool_worker_proc (internal)
2146 static void CALLBACK threadpool_worker_proc( void *param )
2148 TP_CALLBACK_INSTANCE *callback_instance;
2149 struct threadpool_instance instance;
2150 struct threadpool *pool = param;
2151 TP_WAIT_RESULT wait_result = 0;
2152 LARGE_INTEGER timeout;
2153 struct list *ptr;
2154 NTSTATUS status;
2156 TRACE( "starting worker thread for pool %p\n", pool );
2158 RtlEnterCriticalSection( &pool->cs );
2159 for (;;)
2161 while ((ptr = list_head( &pool->pool )))
2163 struct threadpool_object *object = LIST_ENTRY( ptr, struct threadpool_object, pool_entry );
2164 assert( object->num_pending_callbacks > 0 );
2166 /* If further pending callbacks are queued, move the work item to
2167 * the end of the pool list. Otherwise remove it from the pool. */
2168 list_remove( &object->pool_entry );
2169 if (--object->num_pending_callbacks)
2170 list_add_tail( &pool->pool, &object->pool_entry );
2172 /* For wait objects check if they were signaled or have timed out. */
2173 if (object->type == TP_OBJECT_TYPE_WAIT)
2175 wait_result = object->u.wait.signaled ? WAIT_OBJECT_0 : WAIT_TIMEOUT;
2176 if (wait_result == WAIT_OBJECT_0) object->u.wait.signaled--;
2179 /* Leave critical section and do the actual callback. */
2180 object->num_associated_callbacks++;
2181 object->num_running_callbacks++;
2182 pool->num_busy_workers++;
2183 RtlLeaveCriticalSection( &pool->cs );
2185 /* Initialize threadpool instance struct. */
2186 callback_instance = (TP_CALLBACK_INSTANCE *)&instance;
2187 instance.object = object;
2188 instance.threadid = GetCurrentThreadId();
2189 instance.associated = TRUE;
2190 instance.may_run_long = object->may_run_long;
2191 instance.cleanup.critical_section = NULL;
2192 instance.cleanup.mutex = NULL;
2193 instance.cleanup.semaphore = NULL;
2194 instance.cleanup.semaphore_count = 0;
2195 instance.cleanup.event = NULL;
2196 instance.cleanup.library = NULL;
2198 switch (object->type)
2200 case TP_OBJECT_TYPE_SIMPLE:
2202 TRACE( "executing simple callback %p(%p, %p)\n",
2203 object->u.simple.callback, callback_instance, object->userdata );
2204 object->u.simple.callback( callback_instance, object->userdata );
2205 TRACE( "callback %p returned\n", object->u.simple.callback );
2206 break;
2209 case TP_OBJECT_TYPE_WORK:
2211 TRACE( "executing work callback %p(%p, %p, %p)\n",
2212 object->u.work.callback, callback_instance, object->userdata, object );
2213 object->u.work.callback( callback_instance, object->userdata, (TP_WORK *)object );
2214 TRACE( "callback %p returned\n", object->u.work.callback );
2215 break;
2218 case TP_OBJECT_TYPE_TIMER:
2220 TRACE( "executing timer callback %p(%p, %p, %p)\n",
2221 object->u.timer.callback, callback_instance, object->userdata, object );
2222 object->u.timer.callback( callback_instance, object->userdata, (TP_TIMER *)object );
2223 TRACE( "callback %p returned\n", object->u.timer.callback );
2224 break;
2227 case TP_OBJECT_TYPE_WAIT:
2229 TRACE( "executing wait callback %p(%p, %p, %p, %u)\n",
2230 object->u.wait.callback, callback_instance, object->userdata, object, wait_result );
2231 object->u.wait.callback( callback_instance, object->userdata, (TP_WAIT *)object, wait_result );
2232 TRACE( "callback %p returned\n", object->u.wait.callback );
2233 break;
2236 default:
2237 assert(0);
2238 break;
2241 /* Execute finalization callback. */
2242 if (object->finalization_callback)
2244 TRACE( "executing finalization callback %p(%p, %p)\n",
2245 object->finalization_callback, callback_instance, object->userdata );
2246 object->finalization_callback( callback_instance, object->userdata );
2247 TRACE( "callback %p returned\n", object->finalization_callback );
2250 /* Execute cleanup tasks. */
2251 if (instance.cleanup.critical_section)
2253 RtlLeaveCriticalSection( instance.cleanup.critical_section );
2255 if (instance.cleanup.mutex)
2257 status = NtReleaseMutant( instance.cleanup.mutex, NULL );
2258 if (status != STATUS_SUCCESS) goto skip_cleanup;
2260 if (instance.cleanup.semaphore)
2262 status = NtReleaseSemaphore( instance.cleanup.semaphore, instance.cleanup.semaphore_count, NULL );
2263 if (status != STATUS_SUCCESS) goto skip_cleanup;
2265 if (instance.cleanup.event)
2267 status = NtSetEvent( instance.cleanup.event, NULL );
2268 if (status != STATUS_SUCCESS) goto skip_cleanup;
2270 if (instance.cleanup.library)
2272 LdrUnloadDll( instance.cleanup.library );
2275 skip_cleanup:
2276 RtlEnterCriticalSection( &pool->cs );
2277 pool->num_busy_workers--;
2279 object->num_running_callbacks--;
2280 if (!object->num_pending_callbacks && !object->num_running_callbacks)
2281 RtlWakeAllConditionVariable( &object->group_finished_event );
2283 if (instance.associated)
2285 object->num_associated_callbacks--;
2286 if (!object->num_pending_callbacks && !object->num_associated_callbacks)
2287 RtlWakeAllConditionVariable( &object->finished_event );
2290 tp_object_release( object );
2293 /* Shutdown worker thread if requested. */
2294 if (pool->shutdown)
2295 break;
2297 /* Wait for new tasks or until the timeout expires. A thread only terminates
2298 * when no new tasks are available, and the number of threads can be
2299 * decreased without violating the min_workers limit. An exception is when
2300 * min_workers == 0, then objcount is used to detect if the last thread
2301 * can be terminated. */
2302 timeout.QuadPart = (ULONGLONG)THREADPOOL_WORKER_TIMEOUT * -10000;
2303 if (RtlSleepConditionVariableCS( &pool->update_event, &pool->cs, &timeout ) == STATUS_TIMEOUT &&
2304 !list_head( &pool->pool ) && (pool->num_workers > max( pool->min_workers, 1 ) ||
2305 (!pool->min_workers && !pool->objcount)))
2307 break;
2310 pool->num_workers--;
2311 RtlLeaveCriticalSection( &pool->cs );
2313 TRACE( "terminating worker thread for pool %p\n", pool );
2314 tp_threadpool_release( pool );
2317 /***********************************************************************
2318 * TpAllocCleanupGroup (NTDLL.@)
2320 NTSTATUS WINAPI TpAllocCleanupGroup( TP_CLEANUP_GROUP **out )
2322 TRACE( "%p\n", out );
2324 return tp_group_alloc( (struct threadpool_group **)out );
2327 /***********************************************************************
2328 * TpAllocPool (NTDLL.@)
2330 NTSTATUS WINAPI TpAllocPool( TP_POOL **out, PVOID reserved )
2332 TRACE( "%p %p\n", out, reserved );
2334 if (reserved)
2335 FIXME( "reserved argument is nonzero (%p)", reserved );
2337 return tp_threadpool_alloc( (struct threadpool **)out );
2340 /***********************************************************************
2341 * TpAllocTimer (NTDLL.@)
2343 NTSTATUS WINAPI TpAllocTimer( TP_TIMER **out, PTP_TIMER_CALLBACK callback, PVOID userdata,
2344 TP_CALLBACK_ENVIRON *environment )
2346 struct threadpool_object *object;
2347 struct threadpool *pool;
2348 NTSTATUS status;
2350 TRACE( "%p %p %p %p\n", out, callback, userdata, environment );
2352 object = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*object) );
2353 if (!object)
2354 return STATUS_NO_MEMORY;
2356 status = tp_threadpool_lock( &pool, environment );
2357 if (status)
2359 RtlFreeHeap( GetProcessHeap(), 0, object );
2360 return status;
2363 object->type = TP_OBJECT_TYPE_TIMER;
2364 object->u.timer.callback = callback;
2366 status = tp_timerqueue_lock( object );
2367 if (status)
2369 tp_threadpool_unlock( pool );
2370 RtlFreeHeap( GetProcessHeap(), 0, object );
2371 return status;
2374 tp_object_initialize( object, pool, userdata, environment );
2376 *out = (TP_TIMER *)object;
2377 return STATUS_SUCCESS;
2380 /***********************************************************************
2381 * TpAllocWait (NTDLL.@)
2383 NTSTATUS WINAPI TpAllocWait( TP_WAIT **out, PTP_WAIT_CALLBACK callback, PVOID userdata,
2384 TP_CALLBACK_ENVIRON *environment )
2386 struct threadpool_object *object;
2387 struct threadpool *pool;
2388 NTSTATUS status;
2390 TRACE( "%p %p %p %p\n", out, callback, userdata, environment );
2392 object = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*object) );
2393 if (!object)
2394 return STATUS_NO_MEMORY;
2396 status = tp_threadpool_lock( &pool, environment );
2397 if (status)
2399 RtlFreeHeap( GetProcessHeap(), 0, object );
2400 return status;
2403 object->type = TP_OBJECT_TYPE_WAIT;
2404 object->u.wait.callback = callback;
2406 status = tp_waitqueue_lock( object );
2407 if (status)
2409 tp_threadpool_unlock( pool );
2410 RtlFreeHeap( GetProcessHeap(), 0, object );
2411 return status;
2414 tp_object_initialize( object, pool, userdata, environment );
2416 *out = (TP_WAIT *)object;
2417 return STATUS_SUCCESS;
2420 /***********************************************************************
2421 * TpAllocWork (NTDLL.@)
2423 NTSTATUS WINAPI TpAllocWork( TP_WORK **out, PTP_WORK_CALLBACK callback, PVOID userdata,
2424 TP_CALLBACK_ENVIRON *environment )
2426 struct threadpool_object *object;
2427 struct threadpool *pool;
2428 NTSTATUS status;
2430 TRACE( "%p %p %p %p\n", out, callback, userdata, environment );
2432 object = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*object) );
2433 if (!object)
2434 return STATUS_NO_MEMORY;
2436 status = tp_threadpool_lock( &pool, environment );
2437 if (status)
2439 RtlFreeHeap( GetProcessHeap(), 0, object );
2440 return status;
2443 object->type = TP_OBJECT_TYPE_WORK;
2444 object->u.work.callback = callback;
2445 tp_object_initialize( object, pool, userdata, environment );
2447 *out = (TP_WORK *)object;
2448 return STATUS_SUCCESS;
2451 /***********************************************************************
2452 * TpCallbackLeaveCriticalSectionOnCompletion (NTDLL.@)
2454 VOID WINAPI TpCallbackLeaveCriticalSectionOnCompletion( TP_CALLBACK_INSTANCE *instance, CRITICAL_SECTION *crit )
2456 struct threadpool_instance *this = impl_from_TP_CALLBACK_INSTANCE( instance );
2458 TRACE( "%p %p\n", instance, crit );
2460 if (!this->cleanup.critical_section)
2461 this->cleanup.critical_section = crit;
2464 /***********************************************************************
2465 * TpCallbackMayRunLong (NTDLL.@)
2467 NTSTATUS WINAPI TpCallbackMayRunLong( TP_CALLBACK_INSTANCE *instance )
2469 struct threadpool_instance *this = impl_from_TP_CALLBACK_INSTANCE( instance );
2470 struct threadpool_object *object = this->object;
2471 struct threadpool *pool;
2472 NTSTATUS status = STATUS_SUCCESS;
2474 TRACE( "%p\n", instance );
2476 if (this->threadid != GetCurrentThreadId())
2478 ERR("called from wrong thread, ignoring\n");
2479 return STATUS_UNSUCCESSFUL; /* FIXME */
2482 if (this->may_run_long)
2483 return STATUS_SUCCESS;
2485 pool = object->pool;
2486 RtlEnterCriticalSection( &pool->cs );
2488 /* Start new worker threads if required. */
2489 if (pool->num_busy_workers >= pool->num_workers)
2491 if (pool->num_workers < pool->max_workers)
2493 HANDLE thread;
2494 status = RtlCreateUserThread( GetCurrentProcess(), NULL, FALSE, NULL, 0, 0,
2495 threadpool_worker_proc, pool, &thread, NULL );
2496 if (status == STATUS_SUCCESS)
2498 interlocked_inc( &pool->refcount );
2499 pool->num_workers++;
2500 NtClose( thread );
2503 else
2505 status = STATUS_TOO_MANY_THREADS;
2509 RtlLeaveCriticalSection( &pool->cs );
2510 this->may_run_long = TRUE;
2511 return status;
2514 /***********************************************************************
2515 * TpCallbackReleaseMutexOnCompletion (NTDLL.@)
2517 VOID WINAPI TpCallbackReleaseMutexOnCompletion( TP_CALLBACK_INSTANCE *instance, HANDLE mutex )
2519 struct threadpool_instance *this = impl_from_TP_CALLBACK_INSTANCE( instance );
2521 TRACE( "%p %p\n", instance, mutex );
2523 if (!this->cleanup.mutex)
2524 this->cleanup.mutex = mutex;
2527 /***********************************************************************
2528 * TpCallbackReleaseSemaphoreOnCompletion (NTDLL.@)
2530 VOID WINAPI TpCallbackReleaseSemaphoreOnCompletion( TP_CALLBACK_INSTANCE *instance, HANDLE semaphore, DWORD count )
2532 struct threadpool_instance *this = impl_from_TP_CALLBACK_INSTANCE( instance );
2534 TRACE( "%p %p %u\n", instance, semaphore, count );
2536 if (!this->cleanup.semaphore)
2538 this->cleanup.semaphore = semaphore;
2539 this->cleanup.semaphore_count = count;
2543 /***********************************************************************
2544 * TpCallbackSetEventOnCompletion (NTDLL.@)
2546 VOID WINAPI TpCallbackSetEventOnCompletion( TP_CALLBACK_INSTANCE *instance, HANDLE event )
2548 struct threadpool_instance *this = impl_from_TP_CALLBACK_INSTANCE( instance );
2550 TRACE( "%p %p\n", instance, event );
2552 if (!this->cleanup.event)
2553 this->cleanup.event = event;
2556 /***********************************************************************
2557 * TpCallbackUnloadDllOnCompletion (NTDLL.@)
2559 VOID WINAPI TpCallbackUnloadDllOnCompletion( TP_CALLBACK_INSTANCE *instance, HMODULE module )
2561 struct threadpool_instance *this = impl_from_TP_CALLBACK_INSTANCE( instance );
2563 TRACE( "%p %p\n", instance, module );
2565 if (!this->cleanup.library)
2566 this->cleanup.library = module;
2569 /***********************************************************************
2570 * TpDisassociateCallback (NTDLL.@)
2572 VOID WINAPI TpDisassociateCallback( TP_CALLBACK_INSTANCE *instance )
2574 struct threadpool_instance *this = impl_from_TP_CALLBACK_INSTANCE( instance );
2575 struct threadpool_object *object = this->object;
2576 struct threadpool *pool;
2578 TRACE( "%p\n", instance );
2580 if (this->threadid != GetCurrentThreadId())
2582 ERR("called from wrong thread, ignoring\n");
2583 return;
2586 if (!this->associated)
2587 return;
2589 pool = object->pool;
2590 RtlEnterCriticalSection( &pool->cs );
2592 object->num_associated_callbacks--;
2593 if (!object->num_pending_callbacks && !object->num_associated_callbacks)
2594 RtlWakeAllConditionVariable( &object->finished_event );
2596 RtlLeaveCriticalSection( &pool->cs );
2597 this->associated = FALSE;
2600 /***********************************************************************
2601 * TpIsTimerSet (NTDLL.@)
2603 BOOL WINAPI TpIsTimerSet( TP_TIMER *timer )
2605 struct threadpool_object *this = impl_from_TP_TIMER( timer );
2607 TRACE( "%p\n", timer );
2609 return this->u.timer.timer_set;
2612 /***********************************************************************
2613 * TpPostWork (NTDLL.@)
2615 VOID WINAPI TpPostWork( TP_WORK *work )
2617 struct threadpool_object *this = impl_from_TP_WORK( work );
2619 TRACE( "%p\n", work );
2621 tp_object_submit( this, FALSE );
2624 /***********************************************************************
2625 * TpReleaseCleanupGroup (NTDLL.@)
2627 VOID WINAPI TpReleaseCleanupGroup( TP_CLEANUP_GROUP *group )
2629 struct threadpool_group *this = impl_from_TP_CLEANUP_GROUP( group );
2631 TRACE( "%p\n", group );
2633 tp_group_shutdown( this );
2634 tp_group_release( this );
2637 /***********************************************************************
2638 * TpReleaseCleanupGroupMembers (NTDLL.@)
2640 VOID WINAPI TpReleaseCleanupGroupMembers( TP_CLEANUP_GROUP *group, BOOL cancel_pending, PVOID userdata )
2642 struct threadpool_group *this = impl_from_TP_CLEANUP_GROUP( group );
2643 struct threadpool_object *object, *next;
2644 struct list members;
2646 TRACE( "%p %u %p\n", group, cancel_pending, userdata );
2648 RtlEnterCriticalSection( &this->cs );
2650 /* Unset group, increase references, and mark objects for shutdown */
2651 LIST_FOR_EACH_ENTRY_SAFE( object, next, &this->members, struct threadpool_object, group_entry )
2653 assert( object->group == this );
2654 assert( object->is_group_member );
2656 /* Simple callbacks are very special. The user doesn't hold any reference, so
2657 * they would be released too early. Add one additional temporary reference. */
2658 if (object->type == TP_OBJECT_TYPE_SIMPLE)
2660 if (interlocked_inc( &object->refcount ) == 1)
2662 /* Object is basically already destroyed, but group reference
2663 * was not deleted yet. We can safely ignore this object. */
2664 interlocked_dec( &object->refcount );
2665 list_remove( &object->group_entry );
2666 object->is_group_member = FALSE;
2667 continue;
2671 object->is_group_member = FALSE;
2672 tp_object_shutdown( object );
2675 /* Move members to a new temporary list */
2676 list_init( &members );
2677 list_move_tail( &members, &this->members );
2679 RtlLeaveCriticalSection( &this->cs );
2681 /* Cancel pending callbacks if requested */
2682 if (cancel_pending)
2684 LIST_FOR_EACH_ENTRY( object, &members, struct threadpool_object, group_entry )
2686 tp_object_cancel( object, TRUE, userdata );
2690 /* Wait for remaining callbacks to finish */
2691 LIST_FOR_EACH_ENTRY_SAFE( object, next, &members, struct threadpool_object, group_entry )
2693 tp_object_wait( object, TRUE );
2694 tp_object_release( object );
2698 /***********************************************************************
2699 * TpReleasePool (NTDLL.@)
2701 VOID WINAPI TpReleasePool( TP_POOL *pool )
2703 struct threadpool *this = impl_from_TP_POOL( pool );
2705 TRACE( "%p\n", pool );
2707 tp_threadpool_shutdown( this );
2708 tp_threadpool_release( this );
2711 /***********************************************************************
2712 * TpReleaseTimer (NTDLL.@)
2714 VOID WINAPI TpReleaseTimer( TP_TIMER *timer )
2716 struct threadpool_object *this = impl_from_TP_TIMER( timer );
2718 TRACE( "%p\n", timer );
2720 tp_object_shutdown( this );
2721 tp_object_release( this );
2724 /***********************************************************************
2725 * TpReleaseWait (NTDLL.@)
2727 VOID WINAPI TpReleaseWait( TP_WAIT *wait )
2729 struct threadpool_object *this = impl_from_TP_WAIT( wait );
2731 TRACE( "%p\n", wait );
2733 tp_object_shutdown( this );
2734 tp_object_release( this );
2737 /***********************************************************************
2738 * TpReleaseWork (NTDLL.@)
2740 VOID WINAPI TpReleaseWork( TP_WORK *work )
2742 struct threadpool_object *this = impl_from_TP_WORK( work );
2744 TRACE( "%p\n", work );
2746 tp_object_shutdown( this );
2747 tp_object_release( this );
2750 /***********************************************************************
2751 * TpSetPoolMaxThreads (NTDLL.@)
2753 VOID WINAPI TpSetPoolMaxThreads( TP_POOL *pool, DWORD maximum )
2755 struct threadpool *this = impl_from_TP_POOL( pool );
2757 TRACE( "%p %u\n", pool, maximum );
2759 RtlEnterCriticalSection( &this->cs );
2760 this->max_workers = max( maximum, 1 );
2761 this->min_workers = min( this->min_workers, this->max_workers );
2762 RtlLeaveCriticalSection( &this->cs );
2765 /***********************************************************************
2766 * TpSetPoolMinThreads (NTDLL.@)
2768 BOOL WINAPI TpSetPoolMinThreads( TP_POOL *pool, DWORD minimum )
2770 struct threadpool *this = impl_from_TP_POOL( pool );
2771 NTSTATUS status = STATUS_SUCCESS;
2773 TRACE( "%p %u\n", pool, minimum );
2775 RtlEnterCriticalSection( &this->cs );
2777 while (this->num_workers < minimum)
2779 HANDLE thread;
2780 status = RtlCreateUserThread( GetCurrentProcess(), NULL, FALSE, NULL, 0, 0,
2781 threadpool_worker_proc, this, &thread, NULL );
2782 if (status != STATUS_SUCCESS)
2783 break;
2785 interlocked_inc( &this->refcount );
2786 this->num_workers++;
2787 NtClose( thread );
2790 if (status == STATUS_SUCCESS)
2792 this->min_workers = minimum;
2793 this->max_workers = max( this->min_workers, this->max_workers );
2796 RtlLeaveCriticalSection( &this->cs );
2797 return !status;
2800 /***********************************************************************
2801 * TpSetTimer (NTDLL.@)
2803 VOID WINAPI TpSetTimer( TP_TIMER *timer, LARGE_INTEGER *timeout, LONG period, LONG window_length )
2805 struct threadpool_object *this = impl_from_TP_TIMER( timer );
2806 struct threadpool_object *other_timer;
2807 BOOL submit_timer = FALSE;
2808 ULONGLONG timestamp;
2810 TRACE( "%p %p %u %u\n", timer, timeout, period, window_length );
2812 RtlEnterCriticalSection( &timerqueue.cs );
2814 assert( this->u.timer.timer_initialized );
2815 this->u.timer.timer_set = timeout != NULL;
2817 /* Convert relative timeout to absolute timestamp and handle a timeout
2818 * of zero, which means that the timer is submitted immediately. */
2819 if (timeout)
2821 timestamp = timeout->QuadPart;
2822 if ((LONGLONG)timestamp < 0)
2824 LARGE_INTEGER now;
2825 NtQuerySystemTime( &now );
2826 timestamp = now.QuadPart - timestamp;
2828 else if (!timestamp)
2830 if (!period)
2831 timeout = NULL;
2832 else
2834 LARGE_INTEGER now;
2835 NtQuerySystemTime( &now );
2836 timestamp = now.QuadPart + (ULONGLONG)period * 10000;
2838 submit_timer = TRUE;
2842 /* First remove existing timeout. */
2843 if (this->u.timer.timer_pending)
2845 list_remove( &this->u.timer.timer_entry );
2846 this->u.timer.timer_pending = FALSE;
2849 /* If the timer was enabled, then add it back to the queue. */
2850 if (timeout)
2852 this->u.timer.timeout = timestamp;
2853 this->u.timer.period = period;
2854 this->u.timer.window_length = window_length;
2856 LIST_FOR_EACH_ENTRY( other_timer, &timerqueue.pending_timers,
2857 struct threadpool_object, u.timer.timer_entry )
2859 assert( other_timer->type == TP_OBJECT_TYPE_TIMER );
2860 if (this->u.timer.timeout < other_timer->u.timer.timeout)
2861 break;
2863 list_add_before( &other_timer->u.timer.timer_entry, &this->u.timer.timer_entry );
2865 /* Wake up the timer thread when the timeout has to be updated. */
2866 if (list_head( &timerqueue.pending_timers ) == &this->u.timer.timer_entry )
2867 RtlWakeAllConditionVariable( &timerqueue.update_event );
2869 this->u.timer.timer_pending = TRUE;
2872 RtlLeaveCriticalSection( &timerqueue.cs );
2874 if (submit_timer)
2875 tp_object_submit( this, FALSE );
2878 /***********************************************************************
2879 * TpSetWait (NTDLL.@)
2881 VOID WINAPI TpSetWait( TP_WAIT *wait, HANDLE handle, LARGE_INTEGER *timeout )
2883 struct threadpool_object *this = impl_from_TP_WAIT( wait );
2884 ULONGLONG timestamp = TIMEOUT_INFINITE;
2885 BOOL submit_wait = FALSE;
2887 TRACE( "%p %p %p\n", wait, handle, timeout );
2889 RtlEnterCriticalSection( &waitqueue.cs );
2891 assert( this->u.wait.bucket );
2892 this->u.wait.handle = handle;
2894 if (handle || this->u.wait.wait_pending)
2896 struct waitqueue_bucket *bucket = this->u.wait.bucket;
2897 list_remove( &this->u.wait.wait_entry );
2899 /* Convert relative timeout to absolute timestamp. */
2900 if (handle && timeout)
2902 timestamp = timeout->QuadPart;
2903 if ((LONGLONG)timestamp < 0)
2905 LARGE_INTEGER now;
2906 NtQuerySystemTime( &now );
2907 timestamp = now.QuadPart - timestamp;
2909 else if (!timestamp)
2911 submit_wait = TRUE;
2912 handle = NULL;
2916 /* Add wait object back into one of the queues. */
2917 if (handle)
2919 list_add_tail( &bucket->waiting, &this->u.wait.wait_entry );
2920 this->u.wait.wait_pending = TRUE;
2921 this->u.wait.timeout = timestamp;
2923 else
2925 list_add_tail( &bucket->reserved, &this->u.wait.wait_entry );
2926 this->u.wait.wait_pending = FALSE;
2929 /* Wake up the wait queue thread. */
2930 NtSetEvent( bucket->update_event, NULL );
2933 RtlLeaveCriticalSection( &waitqueue.cs );
2935 if (submit_wait)
2936 tp_object_submit( this, FALSE );
2939 /***********************************************************************
2940 * TpSimpleTryPost (NTDLL.@)
2942 NTSTATUS WINAPI TpSimpleTryPost( PTP_SIMPLE_CALLBACK callback, PVOID userdata,
2943 TP_CALLBACK_ENVIRON *environment )
2945 struct threadpool_object *object;
2946 struct threadpool *pool;
2947 NTSTATUS status;
2949 TRACE( "%p %p %p\n", callback, userdata, environment );
2951 object = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*object) );
2952 if (!object)
2953 return STATUS_NO_MEMORY;
2955 status = tp_threadpool_lock( &pool, environment );
2956 if (status)
2958 RtlFreeHeap( GetProcessHeap(), 0, object );
2959 return status;
2962 object->type = TP_OBJECT_TYPE_SIMPLE;
2963 object->u.simple.callback = callback;
2964 tp_object_initialize( object, pool, userdata, environment );
2966 return STATUS_SUCCESS;
2969 /***********************************************************************
2970 * TpWaitForTimer (NTDLL.@)
2972 VOID WINAPI TpWaitForTimer( TP_TIMER *timer, BOOL cancel_pending )
2974 struct threadpool_object *this = impl_from_TP_TIMER( timer );
2976 TRACE( "%p %d\n", timer, cancel_pending );
2978 if (cancel_pending)
2979 tp_object_cancel( this, FALSE, NULL );
2980 tp_object_wait( this, FALSE );
2983 /***********************************************************************
2984 * TpWaitForWait (NTDLL.@)
2986 VOID WINAPI TpWaitForWait( TP_WAIT *wait, BOOL cancel_pending )
2988 struct threadpool_object *this = impl_from_TP_WAIT( wait );
2990 TRACE( "%p %d\n", wait, cancel_pending );
2992 if (cancel_pending)
2993 tp_object_cancel( this, FALSE, NULL );
2994 tp_object_wait( this, FALSE );
2997 /***********************************************************************
2998 * TpWaitForWork (NTDLL.@)
3000 VOID WINAPI TpWaitForWork( TP_WORK *work, BOOL cancel_pending )
3002 struct threadpool_object *this = impl_from_TP_WORK( work );
3004 TRACE( "%p %u\n", work, cancel_pending );
3006 if (cancel_pending)
3007 tp_object_cancel( this, FALSE, NULL );
3008 tp_object_wait( this, FALSE );