4 * Copyright (c) 2006 Robert Shearman
5 * Copyright (c) 2014-2015 Sebastian Lackner
7 * This library is free software; you can redistribute it and/or
8 * modify it under the terms of the GNU Lesser General Public
9 * License as published by the Free Software Foundation; either
10 * version 2.1 of the License, or (at your option) any later version.
12 * This library is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Lesser General Public License for more details.
17 * You should have received a copy of the GNU Lesser General Public
18 * License along with this library; if not, write to the Free Software
19 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
23 #include "wine/port.h"
29 #define NONAMELESSUNION
31 #define WIN32_NO_STATUS
34 #include "wine/debug.h"
35 #include "wine/list.h"
37 #include "ntdll_misc.h"
39 WINE_DEFAULT_DEBUG_CHANNEL(threadpool
);
42 * Old thread pooling API
45 #define OLD_WORKER_TIMEOUT 30000 /* 30 seconds */
46 #define EXPIRE_NEVER (~(ULONGLONG)0)
47 #define TIMER_QUEUE_MAGIC 0x516d6954 /* TimQ */
49 static RTL_CRITICAL_SECTION_DEBUG critsect_debug
;
50 static RTL_CRITICAL_SECTION_DEBUG critsect_compl_debug
;
54 /* threadpool_cs must be held while modifying the following four elements */
55 struct list work_item_list
;
57 LONG num_busy_workers
;
58 LONG num_items_processed
;
59 RTL_CONDITION_VARIABLE threadpool_cond
;
60 RTL_CRITICAL_SECTION threadpool_cs
;
62 RTL_CRITICAL_SECTION threadpool_compl_cs
;
66 LIST_INIT(old_threadpool
.work_item_list
), /* work_item_list */
68 0, /* num_busy_workers */
69 0, /* num_items_processed */
70 RTL_CONDITION_VARIABLE_INIT
, /* threadpool_cond */
71 { &critsect_debug
, -1, 0, 0, 0, 0 }, /* threadpool_cs */
72 NULL
, /* compl_port */
73 { &critsect_compl_debug
, -1, 0, 0, 0, 0 }, /* threadpool_compl_cs */
76 static RTL_CRITICAL_SECTION_DEBUG critsect_debug
=
78 0, 0, &old_threadpool
.threadpool_cs
,
79 { &critsect_debug
.ProcessLocksList
, &critsect_debug
.ProcessLocksList
},
80 0, 0, { (DWORD_PTR
)(__FILE__
": threadpool_cs") }
83 static RTL_CRITICAL_SECTION_DEBUG critsect_compl_debug
=
85 0, 0, &old_threadpool
.threadpool_compl_cs
,
86 { &critsect_compl_debug
.ProcessLocksList
, &critsect_compl_debug
.ProcessLocksList
},
87 0, 0, { (DWORD_PTR
)(__FILE__
": threadpool_compl_cs") }
93 PRTL_WORK_ITEM_ROUTINE function
;
101 WAITORTIMERCALLBACK Callback
;
105 HANDLE CompletionEvent
;
107 BOOLEAN CallbackInProgress
;
113 struct timer_queue
*q
;
115 ULONG runcount
; /* number of callbacks pending execution */
116 RTL_WAITORTIMERCALLBACKFUNC callback
;
121 BOOL destroy
; /* timer should be deleted; once set, never unset */
122 HANDLE event
; /* removal event */
128 RTL_CRITICAL_SECTION cs
;
129 struct list timers
; /* sorted by expiration time */
130 BOOL quit
; /* queue should be deleted; once set, never unset */
136 * Object-oriented thread pooling API
139 #define THREADPOOL_WORKER_TIMEOUT 5000
140 #define MAXIMUM_WAITQUEUE_OBJECTS (MAXIMUM_WAIT_OBJECTS - 1)
142 /* internal threadpool representation */
149 /* pool of work items, locked via .cs */
151 RTL_CONDITION_VARIABLE update_event
;
152 /* information about worker threads, locked via .cs */
156 int num_busy_workers
;
159 enum threadpool_objtype
161 TP_OBJECT_TYPE_SIMPLE
,
163 TP_OBJECT_TYPE_TIMER
,
167 /* internal threadpool object representation */
168 struct threadpool_object
172 /* read-only information */
173 enum threadpool_objtype type
;
174 struct threadpool
*pool
;
175 struct threadpool_group
*group
;
177 PTP_CLEANUP_GROUP_CANCEL_CALLBACK group_cancel_callback
;
178 PTP_SIMPLE_CALLBACK finalization_callback
;
181 /* information about the group, locked via .group->cs */
182 struct list group_entry
;
183 BOOL is_group_member
;
184 /* information about the pool, locked via .pool->cs */
185 struct list pool_entry
;
186 RTL_CONDITION_VARIABLE finished_event
;
187 RTL_CONDITION_VARIABLE group_finished_event
;
188 LONG num_pending_callbacks
;
189 LONG num_running_callbacks
;
190 LONG num_associated_callbacks
;
191 /* arguments for callback */
196 PTP_SIMPLE_CALLBACK callback
;
200 PTP_WORK_CALLBACK callback
;
204 PTP_TIMER_CALLBACK callback
;
205 /* information about the timer, locked via timerqueue.cs */
206 BOOL timer_initialized
;
208 struct list timer_entry
;
216 PTP_WAIT_CALLBACK callback
;
218 /* information about the wait object, locked via waitqueue.cs */
219 struct waitqueue_bucket
*bucket
;
221 struct list wait_entry
;
228 /* internal threadpool instance representation */
229 struct threadpool_instance
231 struct threadpool_object
*object
;
237 CRITICAL_SECTION
*critical_section
;
240 LONG semaphore_count
;
246 /* internal threadpool group representation */
247 struct threadpool_group
252 /* list of group members, locked via .cs */
256 /* global timerqueue object */
257 static RTL_CRITICAL_SECTION_DEBUG timerqueue_debug
;
264 struct list pending_timers
;
265 RTL_CONDITION_VARIABLE update_event
;
269 { &timerqueue_debug
, -1, 0, 0, 0, 0 }, /* cs */
271 FALSE
, /* thread_running */
272 LIST_INIT( timerqueue
.pending_timers
), /* pending_timers */
273 RTL_CONDITION_VARIABLE_INIT
/* update_event */
276 static RTL_CRITICAL_SECTION_DEBUG timerqueue_debug
=
278 0, 0, &timerqueue
.cs
,
279 { &timerqueue_debug
.ProcessLocksList
, &timerqueue_debug
.ProcessLocksList
},
280 0, 0, { (DWORD_PTR
)(__FILE__
": timerqueue.cs") }
283 /* global waitqueue object */
284 static RTL_CRITICAL_SECTION_DEBUG waitqueue_debug
;
294 { &waitqueue_debug
, -1, 0, 0, 0, 0 }, /* cs */
296 LIST_INIT( waitqueue
.buckets
) /* buckets */
299 static RTL_CRITICAL_SECTION_DEBUG waitqueue_debug
=
302 { &waitqueue_debug
.ProcessLocksList
, &waitqueue_debug
.ProcessLocksList
},
303 0, 0, { (DWORD_PTR
)(__FILE__
": waitqueue.cs") }
306 struct waitqueue_bucket
308 struct list bucket_entry
;
310 struct list reserved
;
315 static inline struct threadpool
*impl_from_TP_POOL( TP_POOL
*pool
)
317 return (struct threadpool
*)pool
;
320 static inline struct threadpool_object
*impl_from_TP_WORK( TP_WORK
*work
)
322 struct threadpool_object
*object
= (struct threadpool_object
*)work
;
323 assert( object
->type
== TP_OBJECT_TYPE_WORK
);
327 static inline struct threadpool_object
*impl_from_TP_TIMER( TP_TIMER
*timer
)
329 struct threadpool_object
*object
= (struct threadpool_object
*)timer
;
330 assert( object
->type
== TP_OBJECT_TYPE_TIMER
);
334 static inline struct threadpool_object
*impl_from_TP_WAIT( TP_WAIT
*wait
)
336 struct threadpool_object
*object
= (struct threadpool_object
*)wait
;
337 assert( object
->type
== TP_OBJECT_TYPE_WAIT
);
341 static inline struct threadpool_group
*impl_from_TP_CLEANUP_GROUP( TP_CLEANUP_GROUP
*group
)
343 return (struct threadpool_group
*)group
;
346 static inline struct threadpool_instance
*impl_from_TP_CALLBACK_INSTANCE( TP_CALLBACK_INSTANCE
*instance
)
348 return (struct threadpool_instance
*)instance
;
351 static void CALLBACK
threadpool_worker_proc( void *param
);
352 static void tp_object_submit( struct threadpool_object
*object
, BOOL signaled
);
353 static void tp_object_shutdown( struct threadpool_object
*object
);
354 static BOOL
tp_object_release( struct threadpool_object
*object
);
355 static struct threadpool
*default_threadpool
= NULL
;
357 static inline LONG
interlocked_inc( PLONG dest
)
359 return interlocked_xchg_add( dest
, 1 ) + 1;
362 static inline LONG
interlocked_dec( PLONG dest
)
364 return interlocked_xchg_add( dest
, -1 ) - 1;
367 static void WINAPI
worker_thread_proc(void * param
)
370 struct work_item
*work_item_ptr
, work_item
;
371 LARGE_INTEGER timeout
;
372 timeout
.QuadPart
= -(OLD_WORKER_TIMEOUT
* (ULONGLONG
)10000);
374 RtlEnterCriticalSection( &old_threadpool
.threadpool_cs
);
375 old_threadpool
.num_workers
++;
379 if ((item
= list_head( &old_threadpool
.work_item_list
)))
381 work_item_ptr
= LIST_ENTRY( item
, struct work_item
, entry
);
382 list_remove( &work_item_ptr
->entry
);
383 old_threadpool
.num_busy_workers
++;
384 old_threadpool
.num_items_processed
++;
385 RtlLeaveCriticalSection( &old_threadpool
.threadpool_cs
);
387 /* copy item to stack and do the work */
388 work_item
= *work_item_ptr
;
389 RtlFreeHeap( GetProcessHeap(), 0, work_item_ptr
);
390 TRACE("executing %p(%p)\n", work_item
.function
, work_item
.context
);
391 work_item
.function( work_item
.context
);
393 RtlEnterCriticalSection( &old_threadpool
.threadpool_cs
);
394 old_threadpool
.num_busy_workers
--;
396 else if (RtlSleepConditionVariableCS( &old_threadpool
.threadpool_cond
,
397 &old_threadpool
.threadpool_cs
, &timeout
) != STATUS_SUCCESS
)
403 old_threadpool
.num_workers
--;
404 RtlLeaveCriticalSection( &old_threadpool
.threadpool_cs
);
405 RtlExitUserThread( 0 );
410 /***********************************************************************
411 * RtlQueueWorkItem (NTDLL.@)
413 * Queues a work item into a thread in the thread pool.
416 * Function [I] Work function to execute.
417 * Context [I] Context to pass to the work function when it is executed.
418 * Flags [I] Flags. See notes.
421 * Success: STATUS_SUCCESS.
422 * Failure: Any NTSTATUS code.
425 * Flags can be one or more of the following:
426 *|WT_EXECUTEDEFAULT - Executes the work item in a non-I/O worker thread.
427 *|WT_EXECUTEINIOTHREAD - Executes the work item in an I/O worker thread.
428 *|WT_EXECUTEINPERSISTENTTHREAD - Executes the work item in a thread that is persistent.
429 *|WT_EXECUTELONGFUNCTION - Hints that the execution can take a long time.
430 *|WT_TRANSFER_IMPERSONATION - Executes the function with the current access token.
432 NTSTATUS WINAPI
RtlQueueWorkItem(PRTL_WORK_ITEM_ROUTINE Function
, PVOID Context
, ULONG Flags
)
436 LONG items_processed
;
437 struct work_item
*work_item
= RtlAllocateHeap(GetProcessHeap(), 0, sizeof(struct work_item
));
440 return STATUS_NO_MEMORY
;
442 work_item
->function
= Function
;
443 work_item
->context
= Context
;
445 if (Flags
& ~WT_EXECUTELONGFUNCTION
)
446 FIXME("Flags 0x%x not supported\n", Flags
);
448 RtlEnterCriticalSection( &old_threadpool
.threadpool_cs
);
449 list_add_tail( &old_threadpool
.work_item_list
, &work_item
->entry
);
450 status
= (old_threadpool
.num_workers
> old_threadpool
.num_busy_workers
) ?
451 STATUS_SUCCESS
: STATUS_UNSUCCESSFUL
;
452 items_processed
= old_threadpool
.num_items_processed
;
453 RtlLeaveCriticalSection( &old_threadpool
.threadpool_cs
);
455 /* FIXME: tune this algorithm to not be as aggressive with creating threads
456 * if WT_EXECUTELONGFUNCTION isn't specified */
457 if (status
== STATUS_SUCCESS
)
458 RtlWakeConditionVariable( &old_threadpool
.threadpool_cond
);
461 status
= RtlCreateUserThread( GetCurrentProcess(), NULL
, FALSE
, NULL
, 0, 0,
462 worker_thread_proc
, NULL
, &thread
, NULL
);
464 /* NOTE: we don't care if we couldn't create the thread if there is at
465 * least one other available to process the request */
466 if (status
== STATUS_SUCCESS
)
470 RtlEnterCriticalSection( &old_threadpool
.threadpool_cs
);
471 if (old_threadpool
.num_workers
> 0 ||
472 old_threadpool
.num_items_processed
!= items_processed
)
474 status
= STATUS_SUCCESS
;
477 list_remove( &work_item
->entry
);
478 RtlLeaveCriticalSection( &old_threadpool
.threadpool_cs
);
480 if (status
!= STATUS_SUCCESS
)
481 RtlFreeHeap( GetProcessHeap(), 0, work_item
);
488 /***********************************************************************
489 * iocp_poller - get completion events and run callbacks
491 static DWORD CALLBACK
iocp_poller(LPVOID Arg
)
497 PRTL_OVERLAPPED_COMPLETION_ROUTINE callback
;
499 IO_STATUS_BLOCK iosb
;
500 NTSTATUS res
= NtRemoveIoCompletion( cport
, (PULONG_PTR
)&callback
, (PULONG_PTR
)&overlapped
, &iosb
, NULL
);
503 ERR("NtRemoveIoCompletion failed: 0x%x\n", res
);
507 DWORD transferred
= 0;
510 if (iosb
.u
.Status
== STATUS_SUCCESS
)
511 transferred
= iosb
.Information
;
513 err
= RtlNtStatusToDosError(iosb
.u
.Status
);
515 callback( err
, transferred
, overlapped
);
521 /***********************************************************************
522 * RtlSetIoCompletionCallback (NTDLL.@)
524 * Binds a handle to a thread pool's completion port, and possibly
525 * starts a non-I/O thread to monitor this port and call functions back.
528 * FileHandle [I] Handle to bind to a completion port.
529 * Function [I] Callback function to call on I/O completions.
530 * Flags [I] Not used.
533 * Success: STATUS_SUCCESS.
534 * Failure: Any NTSTATUS code.
537 NTSTATUS WINAPI
RtlSetIoCompletionCallback(HANDLE FileHandle
, PRTL_OVERLAPPED_COMPLETION_ROUTINE Function
, ULONG Flags
)
539 IO_STATUS_BLOCK iosb
;
540 FILE_COMPLETION_INFORMATION info
;
542 if (Flags
) FIXME("Unknown value Flags=0x%x\n", Flags
);
544 if (!old_threadpool
.compl_port
)
546 NTSTATUS res
= STATUS_SUCCESS
;
548 RtlEnterCriticalSection(&old_threadpool
.threadpool_compl_cs
);
549 if (!old_threadpool
.compl_port
)
553 res
= NtCreateIoCompletion( &cport
, IO_COMPLETION_ALL_ACCESS
, NULL
, 0 );
556 /* FIXME native can start additional threads in case of e.g. hung callback function. */
557 res
= RtlQueueWorkItem( iocp_poller
, cport
, WT_EXECUTEDEFAULT
);
559 old_threadpool
.compl_port
= cport
;
564 RtlLeaveCriticalSection(&old_threadpool
.threadpool_compl_cs
);
568 info
.CompletionPort
= old_threadpool
.compl_port
;
569 info
.CompletionKey
= (ULONG_PTR
)Function
;
571 return NtSetInformationFile( FileHandle
, &iosb
, &info
, sizeof(info
), FileCompletionInformation
);
574 static inline PLARGE_INTEGER
get_nt_timeout( PLARGE_INTEGER pTime
, ULONG timeout
)
576 if (timeout
== INFINITE
) return NULL
;
577 pTime
->QuadPart
= (ULONGLONG
)timeout
* -10000;
581 static void delete_wait_work_item(struct wait_work_item
*wait_work_item
)
583 NtClose( wait_work_item
->CancelEvent
);
584 RtlFreeHeap( GetProcessHeap(), 0, wait_work_item
);
587 static DWORD CALLBACK
wait_thread_proc(LPVOID Arg
)
589 struct wait_work_item
*wait_work_item
= Arg
;
591 BOOLEAN alertable
= (wait_work_item
->Flags
& WT_EXECUTEINIOTHREAD
) != 0;
592 HANDLE handles
[2] = { wait_work_item
->Object
, wait_work_item
->CancelEvent
};
593 LARGE_INTEGER timeout
;
594 HANDLE completion_event
;
600 status
= NtWaitForMultipleObjects( 2, handles
, TRUE
, alertable
,
601 get_nt_timeout( &timeout
, wait_work_item
->Milliseconds
) );
602 if (status
== STATUS_WAIT_0
|| status
== STATUS_TIMEOUT
)
604 BOOLEAN TimerOrWaitFired
;
606 if (status
== STATUS_WAIT_0
)
608 TRACE( "object %p signaled, calling callback %p with context %p\n",
609 wait_work_item
->Object
, wait_work_item
->Callback
,
610 wait_work_item
->Context
);
611 TimerOrWaitFired
= FALSE
;
615 TRACE( "wait for object %p timed out, calling callback %p with context %p\n",
616 wait_work_item
->Object
, wait_work_item
->Callback
,
617 wait_work_item
->Context
);
618 TimerOrWaitFired
= TRUE
;
620 wait_work_item
->CallbackInProgress
= TRUE
;
621 wait_work_item
->Callback( wait_work_item
->Context
, TimerOrWaitFired
);
622 wait_work_item
->CallbackInProgress
= FALSE
;
624 if (wait_work_item
->Flags
& WT_EXECUTEONLYONCE
)
631 completion_event
= wait_work_item
->CompletionEvent
;
632 if (completion_event
) NtSetEvent( completion_event
, NULL
);
634 if (interlocked_inc( &wait_work_item
->DeleteCount
) == 2 )
635 delete_wait_work_item( wait_work_item
);
640 /***********************************************************************
641 * RtlRegisterWait (NTDLL.@)
643 * Registers a wait for a handle to become signaled.
646 * NewWaitObject [O] Handle to the new wait object. Use RtlDeregisterWait() to free it.
647 * Object [I] Object to wait to become signaled.
648 * Callback [I] Callback function to execute when the wait times out or the handle is signaled.
649 * Context [I] Context to pass to the callback function when it is executed.
650 * Milliseconds [I] Number of milliseconds to wait before timing out.
651 * Flags [I] Flags. See notes.
654 * Success: STATUS_SUCCESS.
655 * Failure: Any NTSTATUS code.
658 * Flags can be one or more of the following:
659 *|WT_EXECUTEDEFAULT - Executes the work item in a non-I/O worker thread.
660 *|WT_EXECUTEINIOTHREAD - Executes the work item in an I/O worker thread.
661 *|WT_EXECUTEINPERSISTENTTHREAD - Executes the work item in a thread that is persistent.
662 *|WT_EXECUTELONGFUNCTION - Hints that the execution can take a long time.
663 *|WT_TRANSFER_IMPERSONATION - Executes the function with the current access token.
665 NTSTATUS WINAPI
RtlRegisterWait(PHANDLE NewWaitObject
, HANDLE Object
,
666 RTL_WAITORTIMERCALLBACKFUNC Callback
,
667 PVOID Context
, ULONG Milliseconds
, ULONG Flags
)
669 struct wait_work_item
*wait_work_item
;
672 TRACE( "(%p, %p, %p, %p, %d, 0x%x)\n", NewWaitObject
, Object
, Callback
, Context
, Milliseconds
, Flags
);
674 wait_work_item
= RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*wait_work_item
) );
676 return STATUS_NO_MEMORY
;
678 wait_work_item
->Object
= Object
;
679 wait_work_item
->Callback
= Callback
;
680 wait_work_item
->Context
= Context
;
681 wait_work_item
->Milliseconds
= Milliseconds
;
682 wait_work_item
->Flags
= Flags
;
683 wait_work_item
->CallbackInProgress
= FALSE
;
684 wait_work_item
->DeleteCount
= 0;
685 wait_work_item
->CompletionEvent
= NULL
;
687 status
= NtCreateEvent( &wait_work_item
->CancelEvent
, EVENT_ALL_ACCESS
, NULL
, NotificationEvent
, FALSE
);
688 if (status
!= STATUS_SUCCESS
)
690 RtlFreeHeap( GetProcessHeap(), 0, wait_work_item
);
694 Flags
= Flags
& (WT_EXECUTEINIOTHREAD
| WT_EXECUTEINPERSISTENTTHREAD
|
695 WT_EXECUTELONGFUNCTION
| WT_TRANSFER_IMPERSONATION
);
696 status
= RtlQueueWorkItem( wait_thread_proc
, wait_work_item
, Flags
);
697 if (status
!= STATUS_SUCCESS
)
699 delete_wait_work_item( wait_work_item
);
703 *NewWaitObject
= wait_work_item
;
707 /***********************************************************************
708 * RtlDeregisterWaitEx (NTDLL.@)
710 * Cancels a wait operation and frees the resources associated with calling
714 * WaitHandle [I] Handle to the wait object to free.
717 * Success: STATUS_SUCCESS.
718 * Failure: Any NTSTATUS code.
720 NTSTATUS WINAPI
RtlDeregisterWaitEx(HANDLE WaitHandle
, HANDLE CompletionEvent
)
722 struct wait_work_item
*wait_work_item
= WaitHandle
;
723 NTSTATUS status
= STATUS_SUCCESS
;
725 TRACE( "(%p)\n", WaitHandle
);
727 NtSetEvent( wait_work_item
->CancelEvent
, NULL
);
728 if (wait_work_item
->CallbackInProgress
)
730 if (CompletionEvent
!= NULL
)
732 if (CompletionEvent
== INVALID_HANDLE_VALUE
)
734 status
= NtCreateEvent( &CompletionEvent
, EVENT_ALL_ACCESS
, NULL
, NotificationEvent
, FALSE
);
735 if (status
!= STATUS_SUCCESS
)
737 interlocked_xchg_ptr( &wait_work_item
->CompletionEvent
, CompletionEvent
);
738 if (wait_work_item
->CallbackInProgress
)
739 NtWaitForSingleObject( CompletionEvent
, FALSE
, NULL
);
740 NtClose( CompletionEvent
);
744 interlocked_xchg_ptr( &wait_work_item
->CompletionEvent
, CompletionEvent
);
745 if (wait_work_item
->CallbackInProgress
)
746 status
= STATUS_PENDING
;
750 status
= STATUS_PENDING
;
753 if (interlocked_inc( &wait_work_item
->DeleteCount
) == 2 )
755 status
= STATUS_SUCCESS
;
756 delete_wait_work_item( wait_work_item
);
762 /***********************************************************************
763 * RtlDeregisterWait (NTDLL.@)
765 * Cancels a wait operation and frees the resources associated with calling
769 * WaitHandle [I] Handle to the wait object to free.
772 * Success: STATUS_SUCCESS.
773 * Failure: Any NTSTATUS code.
775 NTSTATUS WINAPI
RtlDeregisterWait(HANDLE WaitHandle
)
777 return RtlDeregisterWaitEx(WaitHandle
, NULL
);
781 /************************** Timer Queue Impl **************************/
783 static void queue_remove_timer(struct queue_timer
*t
)
785 /* We MUST hold the queue cs while calling this function. This ensures
786 that we cannot queue another callback for this timer. The runcount
787 being zero makes sure we don't have any already queued. */
788 struct timer_queue
*q
= t
->q
;
790 assert(t
->runcount
== 0);
793 list_remove(&t
->entry
);
795 NtSetEvent(t
->event
, NULL
);
796 RtlFreeHeap(GetProcessHeap(), 0, t
);
798 if (q
->quit
&& list_empty(&q
->timers
))
799 NtSetEvent(q
->event
, NULL
);
802 static void timer_cleanup_callback(struct queue_timer
*t
)
804 struct timer_queue
*q
= t
->q
;
805 RtlEnterCriticalSection(&q
->cs
);
807 assert(0 < t
->runcount
);
810 if (t
->destroy
&& t
->runcount
== 0)
811 queue_remove_timer(t
);
813 RtlLeaveCriticalSection(&q
->cs
);
816 static DWORD WINAPI
timer_callback_wrapper(LPVOID p
)
818 struct queue_timer
*t
= p
;
819 t
->callback(t
->param
, TRUE
);
820 timer_cleanup_callback(t
);
824 static inline ULONGLONG
queue_current_time(void)
826 LARGE_INTEGER now
, freq
;
827 NtQueryPerformanceCounter(&now
, &freq
);
828 return now
.QuadPart
* 1000 / freq
.QuadPart
;
831 static void queue_add_timer(struct queue_timer
*t
, ULONGLONG time
,
834 /* We MUST hold the queue cs while calling this function. */
835 struct timer_queue
*q
= t
->q
;
836 struct list
*ptr
= &q
->timers
;
838 assert(!q
->quit
|| (t
->destroy
&& time
== EXPIRE_NEVER
));
840 if (time
!= EXPIRE_NEVER
)
841 LIST_FOR_EACH(ptr
, &q
->timers
)
843 struct queue_timer
*cur
= LIST_ENTRY(ptr
, struct queue_timer
, entry
);
844 if (time
< cur
->expire
)
847 list_add_before(ptr
, &t
->entry
);
851 /* If we insert at the head of the list, we need to expire sooner
853 if (set_event
&& &t
->entry
== list_head(&q
->timers
))
854 NtSetEvent(q
->event
, NULL
);
857 static inline void queue_move_timer(struct queue_timer
*t
, ULONGLONG time
,
860 /* We MUST hold the queue cs while calling this function. */
861 list_remove(&t
->entry
);
862 queue_add_timer(t
, time
, set_event
);
865 static void queue_timer_expire(struct timer_queue
*q
)
867 struct queue_timer
*t
= NULL
;
869 RtlEnterCriticalSection(&q
->cs
);
870 if (list_head(&q
->timers
))
873 t
= LIST_ENTRY(list_head(&q
->timers
), struct queue_timer
, entry
);
874 if (!t
->destroy
&& t
->expire
<= ((now
= queue_current_time())))
879 next
= t
->expire
+ t
->period
;
880 /* avoid trigger cascade if overloaded / hibernated */
882 next
= now
+ t
->period
;
886 queue_move_timer(t
, next
, FALSE
);
891 RtlLeaveCriticalSection(&q
->cs
);
895 if (t
->flags
& WT_EXECUTEINTIMERTHREAD
)
896 timer_callback_wrapper(t
);
901 & (WT_EXECUTEINIOTHREAD
| WT_EXECUTEINPERSISTENTTHREAD
902 | WT_EXECUTELONGFUNCTION
| WT_TRANSFER_IMPERSONATION
));
903 NTSTATUS status
= RtlQueueWorkItem(timer_callback_wrapper
, t
, flags
);
904 if (status
!= STATUS_SUCCESS
)
905 timer_cleanup_callback(t
);
910 static ULONG
queue_get_timeout(struct timer_queue
*q
)
912 struct queue_timer
*t
;
913 ULONG timeout
= INFINITE
;
915 RtlEnterCriticalSection(&q
->cs
);
916 if (list_head(&q
->timers
))
918 t
= LIST_ENTRY(list_head(&q
->timers
), struct queue_timer
, entry
);
919 assert(!t
->destroy
|| t
->expire
== EXPIRE_NEVER
);
921 if (t
->expire
!= EXPIRE_NEVER
)
923 ULONGLONG time
= queue_current_time();
924 timeout
= t
->expire
< time
? 0 : t
->expire
- time
;
927 RtlLeaveCriticalSection(&q
->cs
);
932 static void WINAPI
timer_queue_thread_proc(LPVOID p
)
934 struct timer_queue
*q
= p
;
937 timeout_ms
= INFINITE
;
940 LARGE_INTEGER timeout
;
944 status
= NtWaitForSingleObject(
945 q
->event
, FALSE
, get_nt_timeout(&timeout
, timeout_ms
));
947 if (status
== STATUS_WAIT_0
)
949 /* There are two possible ways to trigger the event. Either
950 we are quitting and the last timer got removed, or a new
951 timer got put at the head of the list so we need to adjust
953 RtlEnterCriticalSection(&q
->cs
);
954 if (q
->quit
&& list_empty(&q
->timers
))
956 RtlLeaveCriticalSection(&q
->cs
);
958 else if (status
== STATUS_TIMEOUT
)
959 queue_timer_expire(q
);
964 timeout_ms
= queue_get_timeout(q
);
968 RtlDeleteCriticalSection(&q
->cs
);
970 RtlFreeHeap(GetProcessHeap(), 0, q
);
973 static void queue_destroy_timer(struct queue_timer
*t
)
975 /* We MUST hold the queue cs while calling this function. */
977 if (t
->runcount
== 0)
978 /* Ensure a timer is promptly removed. If callbacks are pending,
979 it will be removed after the last one finishes by the callback
981 queue_remove_timer(t
);
983 /* Make sure no destroyed timer masks an active timer at the head
984 of the sorted list. */
985 queue_move_timer(t
, EXPIRE_NEVER
, FALSE
);
988 /***********************************************************************
989 * RtlCreateTimerQueue (NTDLL.@)
991 * Creates a timer queue object and returns a handle to it.
994 * NewTimerQueue [O] The newly created queue.
997 * Success: STATUS_SUCCESS.
998 * Failure: Any NTSTATUS code.
1000 NTSTATUS WINAPI
RtlCreateTimerQueue(PHANDLE NewTimerQueue
)
1003 struct timer_queue
*q
= RtlAllocateHeap(GetProcessHeap(), 0, sizeof *q
);
1005 return STATUS_NO_MEMORY
;
1007 RtlInitializeCriticalSection(&q
->cs
);
1008 list_init(&q
->timers
);
1010 q
->magic
= TIMER_QUEUE_MAGIC
;
1011 status
= NtCreateEvent(&q
->event
, EVENT_ALL_ACCESS
, NULL
, SynchronizationEvent
, FALSE
);
1012 if (status
!= STATUS_SUCCESS
)
1014 RtlFreeHeap(GetProcessHeap(), 0, q
);
1017 status
= RtlCreateUserThread(GetCurrentProcess(), NULL
, FALSE
, NULL
, 0, 0,
1018 timer_queue_thread_proc
, q
, &q
->thread
, NULL
);
1019 if (status
!= STATUS_SUCCESS
)
1022 RtlFreeHeap(GetProcessHeap(), 0, q
);
1027 return STATUS_SUCCESS
;
1030 /***********************************************************************
1031 * RtlDeleteTimerQueueEx (NTDLL.@)
1033 * Deletes a timer queue object.
1036 * TimerQueue [I] The timer queue to destroy.
1037 * CompletionEvent [I] If NULL, return immediately. If INVALID_HANDLE_VALUE,
1038 * wait until all timers are finished firing before
1039 * returning. Otherwise, return immediately and set the
1040 * event when all timers are done.
1043 * Success: STATUS_SUCCESS if synchronous, STATUS_PENDING if not.
1044 * Failure: Any NTSTATUS code.
1046 NTSTATUS WINAPI
RtlDeleteTimerQueueEx(HANDLE TimerQueue
, HANDLE CompletionEvent
)
1048 struct timer_queue
*q
= TimerQueue
;
1049 struct queue_timer
*t
, *temp
;
1053 if (!q
|| q
->magic
!= TIMER_QUEUE_MAGIC
)
1054 return STATUS_INVALID_HANDLE
;
1058 RtlEnterCriticalSection(&q
->cs
);
1060 if (list_head(&q
->timers
))
1061 /* When the last timer is removed, it will signal the timer thread to
1063 LIST_FOR_EACH_ENTRY_SAFE(t
, temp
, &q
->timers
, struct queue_timer
, entry
)
1064 queue_destroy_timer(t
);
1066 /* However if we have none, we must do it ourselves. */
1067 NtSetEvent(q
->event
, NULL
);
1068 RtlLeaveCriticalSection(&q
->cs
);
1070 if (CompletionEvent
== INVALID_HANDLE_VALUE
)
1072 NtWaitForSingleObject(thread
, FALSE
, NULL
);
1073 status
= STATUS_SUCCESS
;
1077 if (CompletionEvent
)
1079 FIXME("asynchronous return on completion event unimplemented\n");
1080 NtWaitForSingleObject(thread
, FALSE
, NULL
);
1081 NtSetEvent(CompletionEvent
, NULL
);
1083 status
= STATUS_PENDING
;
1090 static struct timer_queue
*get_timer_queue(HANDLE TimerQueue
)
1092 static struct timer_queue
*default_timer_queue
;
1098 if (!default_timer_queue
)
1101 NTSTATUS status
= RtlCreateTimerQueue(&q
);
1102 if (status
== STATUS_SUCCESS
)
1104 PVOID p
= interlocked_cmpxchg_ptr(
1105 (void **) &default_timer_queue
, q
, NULL
);
1107 /* Got beat to the punch. */
1108 RtlDeleteTimerQueueEx(q
, NULL
);
1111 return default_timer_queue
;
1115 /***********************************************************************
1116 * RtlCreateTimer (NTDLL.@)
1118 * Creates a new timer associated with the given queue.
1121 * NewTimer [O] The newly created timer.
1122 * TimerQueue [I] The queue to hold the timer.
1123 * Callback [I] The callback to fire.
1124 * Parameter [I] The argument for the callback.
1125 * DueTime [I] The delay, in milliseconds, before first firing the
1127 * Period [I] The period, in milliseconds, at which to fire the timer
1128 * after the first callback. If zero, the timer will only
1129 * fire once. It still needs to be deleted with
1131 * Flags [I] Flags controlling the execution of the callback. In
1132 * addition to the WT_* thread pool flags (see
1133 * RtlQueueWorkItem), WT_EXECUTEINTIMERTHREAD and
1134 * WT_EXECUTEONLYONCE are supported.
1137 * Success: STATUS_SUCCESS.
1138 * Failure: Any NTSTATUS code.
1140 NTSTATUS WINAPI
RtlCreateTimer(PHANDLE NewTimer
, HANDLE TimerQueue
,
1141 RTL_WAITORTIMERCALLBACKFUNC Callback
,
1142 PVOID Parameter
, DWORD DueTime
, DWORD Period
,
1146 struct queue_timer
*t
;
1147 struct timer_queue
*q
= get_timer_queue(TimerQueue
);
1149 if (!q
) return STATUS_NO_MEMORY
;
1150 if (q
->magic
!= TIMER_QUEUE_MAGIC
) return STATUS_INVALID_HANDLE
;
1152 t
= RtlAllocateHeap(GetProcessHeap(), 0, sizeof *t
);
1154 return STATUS_NO_MEMORY
;
1158 t
->callback
= Callback
;
1159 t
->param
= Parameter
;
1165 status
= STATUS_SUCCESS
;
1166 RtlEnterCriticalSection(&q
->cs
);
1168 status
= STATUS_INVALID_HANDLE
;
1170 queue_add_timer(t
, queue_current_time() + DueTime
, TRUE
);
1171 RtlLeaveCriticalSection(&q
->cs
);
1173 if (status
== STATUS_SUCCESS
)
1176 RtlFreeHeap(GetProcessHeap(), 0, t
);
1181 /***********************************************************************
1182 * RtlUpdateTimer (NTDLL.@)
1184 * Changes the time at which a timer expires.
1187 * TimerQueue [I] The queue that holds the timer.
1188 * Timer [I] The timer to update.
1189 * DueTime [I] The delay, in milliseconds, before next firing the timer.
1190 * Period [I] The period, in milliseconds, at which to fire the timer
1191 * after the first callback. If zero, the timer will not
1192 * fire again. It still needs to be deleted with
1196 * Success: STATUS_SUCCESS.
1197 * Failure: Any NTSTATUS code.
1199 NTSTATUS WINAPI
RtlUpdateTimer(HANDLE TimerQueue
, HANDLE Timer
,
1200 DWORD DueTime
, DWORD Period
)
1202 struct queue_timer
*t
= Timer
;
1203 struct timer_queue
*q
= t
->q
;
1205 RtlEnterCriticalSection(&q
->cs
);
1206 /* Can't change a timer if it was once-only or destroyed. */
1207 if (t
->expire
!= EXPIRE_NEVER
)
1210 queue_move_timer(t
, queue_current_time() + DueTime
, TRUE
);
1212 RtlLeaveCriticalSection(&q
->cs
);
1214 return STATUS_SUCCESS
;
1217 /***********************************************************************
1218 * RtlDeleteTimer (NTDLL.@)
1220 * Cancels a timer-queue timer.
1223 * TimerQueue [I] The queue that holds the timer.
1224 * Timer [I] The timer to delete.
1225 * CompletionEvent [I] If NULL, return immediately. If INVALID_HANDLE_VALUE,
1226 * wait until the timer is finished firing all pending
1227 * callbacks before returning. Otherwise, return
1228 * immediately and set the event when the timer is done.
1231 * Success: STATUS_SUCCESS if the timer is done, STATUS_PENDING if not,
1232 or if the completion event is NULL.
1233 * Failure: Any NTSTATUS code.
1235 NTSTATUS WINAPI
RtlDeleteTimer(HANDLE TimerQueue
, HANDLE Timer
,
1236 HANDLE CompletionEvent
)
1238 struct queue_timer
*t
= Timer
;
1239 struct timer_queue
*q
;
1240 NTSTATUS status
= STATUS_PENDING
;
1241 HANDLE event
= NULL
;
1244 return STATUS_INVALID_PARAMETER_1
;
1246 if (CompletionEvent
== INVALID_HANDLE_VALUE
)
1248 status
= NtCreateEvent(&event
, EVENT_ALL_ACCESS
, NULL
, SynchronizationEvent
, FALSE
);
1249 if (status
== STATUS_SUCCESS
)
1250 status
= STATUS_PENDING
;
1252 else if (CompletionEvent
)
1253 event
= CompletionEvent
;
1255 RtlEnterCriticalSection(&q
->cs
);
1257 if (t
->runcount
== 0 && event
)
1258 status
= STATUS_SUCCESS
;
1259 queue_destroy_timer(t
);
1260 RtlLeaveCriticalSection(&q
->cs
);
1262 if (CompletionEvent
== INVALID_HANDLE_VALUE
&& event
)
1264 if (status
== STATUS_PENDING
)
1266 NtWaitForSingleObject(event
, FALSE
, NULL
);
1267 status
= STATUS_SUCCESS
;
1275 /***********************************************************************
1276 * timerqueue_thread_proc (internal)
1278 static void CALLBACK
timerqueue_thread_proc( void *param
)
1280 ULONGLONG timeout_lower
, timeout_upper
, new_timeout
;
1281 struct threadpool_object
*other_timer
;
1282 LARGE_INTEGER now
, timeout
;
1285 TRACE( "starting timer queue thread\n" );
1287 RtlEnterCriticalSection( &timerqueue
.cs
);
1290 NtQuerySystemTime( &now
);
1292 /* Check for expired timers. */
1293 while ((ptr
= list_head( &timerqueue
.pending_timers
)))
1295 struct threadpool_object
*timer
= LIST_ENTRY( ptr
, struct threadpool_object
, u
.timer
.timer_entry
);
1296 assert( timer
->type
== TP_OBJECT_TYPE_TIMER
);
1297 assert( timer
->u
.timer
.timer_pending
);
1298 if (timer
->u
.timer
.timeout
> now
.QuadPart
)
1301 /* Queue a new callback in one of the worker threads. */
1302 list_remove( &timer
->u
.timer
.timer_entry
);
1303 timer
->u
.timer
.timer_pending
= FALSE
;
1304 tp_object_submit( timer
, FALSE
);
1306 /* Insert the timer back into the queue, except its marked for shutdown. */
1307 if (timer
->u
.timer
.period
&& !timer
->shutdown
)
1309 timer
->u
.timer
.timeout
+= (ULONGLONG
)timer
->u
.timer
.period
* 10000;
1310 if (timer
->u
.timer
.timeout
<= now
.QuadPart
)
1311 timer
->u
.timer
.timeout
= now
.QuadPart
+ 1;
1313 LIST_FOR_EACH_ENTRY( other_timer
, &timerqueue
.pending_timers
,
1314 struct threadpool_object
, u
.timer
.timer_entry
)
1316 assert( other_timer
->type
== TP_OBJECT_TYPE_TIMER
);
1317 if (timer
->u
.timer
.timeout
< other_timer
->u
.timer
.timeout
)
1320 list_add_before( &other_timer
->u
.timer
.timer_entry
, &timer
->u
.timer
.timer_entry
);
1321 timer
->u
.timer
.timer_pending
= TRUE
;
1325 timeout_lower
= TIMEOUT_INFINITE
;
1326 timeout_upper
= TIMEOUT_INFINITE
;
1328 /* Determine next timeout and use the window length to optimize wakeup times. */
1329 LIST_FOR_EACH_ENTRY( other_timer
, &timerqueue
.pending_timers
,
1330 struct threadpool_object
, u
.timer
.timer_entry
)
1332 assert( other_timer
->type
== TP_OBJECT_TYPE_TIMER
);
1333 if (other_timer
->u
.timer
.timeout
>= timeout_upper
)
1336 timeout_lower
= other_timer
->u
.timer
.timeout
;
1337 new_timeout
= timeout_lower
+ (ULONGLONG
)other_timer
->u
.timer
.window_length
* 10000;
1338 if (new_timeout
< timeout_upper
)
1339 timeout_upper
= new_timeout
;
1342 /* Wait for timer update events or until the next timer expires. */
1343 if (timerqueue
.objcount
)
1345 timeout
.QuadPart
= timeout_lower
;
1346 RtlSleepConditionVariableCS( &timerqueue
.update_event
, &timerqueue
.cs
, &timeout
);
1350 /* All timers have been destroyed, if no new timers are created
1351 * within some amount of time, then we can shutdown this thread. */
1352 timeout
.QuadPart
= (ULONGLONG
)THREADPOOL_WORKER_TIMEOUT
* -10000;
1353 if (RtlSleepConditionVariableCS( &timerqueue
.update_event
, &timerqueue
.cs
,
1354 &timeout
) == STATUS_TIMEOUT
&& !timerqueue
.objcount
)
1360 timerqueue
.thread_running
= FALSE
;
1361 RtlLeaveCriticalSection( &timerqueue
.cs
);
1363 TRACE( "terminating timer queue thread\n" );
1366 /***********************************************************************
1367 * tp_timerqueue_lock (internal)
1369 * Acquires a lock on the global timerqueue. When the lock is acquired
1370 * successfully, it is guaranteed that the timer thread is running.
1372 static NTSTATUS
tp_timerqueue_lock( struct threadpool_object
*timer
)
1374 NTSTATUS status
= STATUS_SUCCESS
;
1375 assert( timer
->type
== TP_OBJECT_TYPE_TIMER
);
1377 timer
->u
.timer
.timer_initialized
= FALSE
;
1378 timer
->u
.timer
.timer_pending
= FALSE
;
1379 timer
->u
.timer
.timer_set
= FALSE
;
1380 timer
->u
.timer
.timeout
= 0;
1381 timer
->u
.timer
.period
= 0;
1382 timer
->u
.timer
.window_length
= 0;
1384 RtlEnterCriticalSection( &timerqueue
.cs
);
1386 /* Make sure that the timerqueue thread is running. */
1387 if (!timerqueue
.thread_running
)
1390 status
= RtlCreateUserThread( GetCurrentProcess(), NULL
, FALSE
, NULL
, 0, 0,
1391 timerqueue_thread_proc
, NULL
, &thread
, NULL
);
1392 if (status
== STATUS_SUCCESS
)
1394 timerqueue
.thread_running
= TRUE
;
1399 if (status
== STATUS_SUCCESS
)
1401 timer
->u
.timer
.timer_initialized
= TRUE
;
1402 timerqueue
.objcount
++;
1405 RtlLeaveCriticalSection( &timerqueue
.cs
);
1409 /***********************************************************************
1410 * tp_timerqueue_unlock (internal)
1412 * Releases a lock on the global timerqueue.
1414 static void tp_timerqueue_unlock( struct threadpool_object
*timer
)
1416 assert( timer
->type
== TP_OBJECT_TYPE_TIMER
);
1418 RtlEnterCriticalSection( &timerqueue
.cs
);
1419 if (timer
->u
.timer
.timer_initialized
)
1421 /* If timer was pending, remove it. */
1422 if (timer
->u
.timer
.timer_pending
)
1424 list_remove( &timer
->u
.timer
.timer_entry
);
1425 timer
->u
.timer
.timer_pending
= FALSE
;
1428 /* If the last timer object was destroyed, then wake up the thread. */
1429 if (!--timerqueue
.objcount
)
1431 assert( list_empty( &timerqueue
.pending_timers
) );
1432 RtlWakeAllConditionVariable( &timerqueue
.update_event
);
1435 timer
->u
.timer
.timer_initialized
= FALSE
;
1437 RtlLeaveCriticalSection( &timerqueue
.cs
);
1440 /***********************************************************************
1441 * waitqueue_thread_proc (internal)
1443 static void CALLBACK
waitqueue_thread_proc( void *param
)
1445 struct threadpool_object
*objects
[MAXIMUM_WAITQUEUE_OBJECTS
];
1446 HANDLE handles
[MAXIMUM_WAITQUEUE_OBJECTS
+ 1];
1447 struct waitqueue_bucket
*bucket
= param
;
1448 struct threadpool_object
*wait
, *next
;
1449 LARGE_INTEGER now
, timeout
;
1453 TRACE( "starting wait queue thread\n" );
1455 RtlEnterCriticalSection( &waitqueue
.cs
);
1459 NtQuerySystemTime( &now
);
1460 timeout
.QuadPart
= TIMEOUT_INFINITE
;
1463 LIST_FOR_EACH_ENTRY_SAFE( wait
, next
, &bucket
->waiting
, struct threadpool_object
,
1466 assert( wait
->type
== TP_OBJECT_TYPE_WAIT
);
1467 if (wait
->u
.wait
.timeout
<= now
.QuadPart
)
1469 /* Wait object timed out. */
1470 list_remove( &wait
->u
.wait
.wait_entry
);
1471 list_add_tail( &bucket
->reserved
, &wait
->u
.wait
.wait_entry
);
1472 tp_object_submit( wait
, FALSE
);
1476 if (wait
->u
.wait
.timeout
< timeout
.QuadPart
)
1477 timeout
.QuadPart
= wait
->u
.wait
.timeout
;
1479 assert( num_handles
< MAXIMUM_WAITQUEUE_OBJECTS
);
1480 interlocked_inc( &wait
->refcount
);
1481 objects
[num_handles
] = wait
;
1482 handles
[num_handles
] = wait
->u
.wait
.handle
;
1487 if (!bucket
->objcount
)
1489 /* All wait objects have been destroyed, if no new wait objects are created
1490 * within some amount of time, then we can shutdown this thread. */
1491 assert( num_handles
== 0 );
1492 RtlLeaveCriticalSection( &waitqueue
.cs
);
1493 timeout
.QuadPart
= (ULONGLONG
)THREADPOOL_WORKER_TIMEOUT
* -10000;
1494 status
= NtWaitForMultipleObjects( 1, &bucket
->update_event
, TRUE
, FALSE
, &timeout
);
1495 RtlEnterCriticalSection( &waitqueue
.cs
);
1497 if (status
== STATUS_TIMEOUT
&& !bucket
->objcount
)
1502 handles
[num_handles
] = bucket
->update_event
;
1503 RtlLeaveCriticalSection( &waitqueue
.cs
);
1504 status
= NtWaitForMultipleObjects( num_handles
+ 1, handles
, TRUE
, FALSE
, &timeout
);
1505 RtlEnterCriticalSection( &waitqueue
.cs
);
1507 if (status
>= STATUS_WAIT_0
&& status
< STATUS_WAIT_0
+ num_handles
)
1509 wait
= objects
[status
- STATUS_WAIT_0
];
1510 assert( wait
->type
== TP_OBJECT_TYPE_WAIT
);
1511 if (wait
->u
.wait
.bucket
)
1513 /* Wait object signaled. */
1514 assert( wait
->u
.wait
.bucket
== bucket
);
1515 list_remove( &wait
->u
.wait
.wait_entry
);
1516 list_add_tail( &bucket
->reserved
, &wait
->u
.wait
.wait_entry
);
1517 tp_object_submit( wait
, TRUE
);
1520 ERR("wait object %p triggered while object was destroyed\n", wait
);
1523 /* Release temporary references to wait objects. */
1526 wait
= objects
[--num_handles
];
1527 assert( wait
->type
== TP_OBJECT_TYPE_WAIT
);
1528 tp_object_release( wait
);
1532 /* Try to merge bucket with other threads. */
1533 if (waitqueue
.num_buckets
> 1 && bucket
->objcount
&&
1534 bucket
->objcount
<= MAXIMUM_WAITQUEUE_OBJECTS
* 1 / 3)
1536 struct waitqueue_bucket
*other_bucket
;
1537 LIST_FOR_EACH_ENTRY( other_bucket
, &waitqueue
.buckets
, struct waitqueue_bucket
, bucket_entry
)
1539 if (other_bucket
!= bucket
&& other_bucket
->objcount
&&
1540 other_bucket
->objcount
+ bucket
->objcount
<= MAXIMUM_WAITQUEUE_OBJECTS
* 2 / 3)
1542 other_bucket
->objcount
+= bucket
->objcount
;
1543 bucket
->objcount
= 0;
1545 /* Update reserved list. */
1546 LIST_FOR_EACH_ENTRY( wait
, &bucket
->reserved
, struct threadpool_object
, u
.wait
.wait_entry
)
1548 assert( wait
->type
== TP_OBJECT_TYPE_WAIT
);
1549 wait
->u
.wait
.bucket
= other_bucket
;
1551 list_move_tail( &other_bucket
->reserved
, &bucket
->reserved
);
1553 /* Update waiting list. */
1554 LIST_FOR_EACH_ENTRY( wait
, &bucket
->waiting
, struct threadpool_object
, u
.wait
.wait_entry
)
1556 assert( wait
->type
== TP_OBJECT_TYPE_WAIT
);
1557 wait
->u
.wait
.bucket
= other_bucket
;
1559 list_move_tail( &other_bucket
->waiting
, &bucket
->waiting
);
1561 /* Move bucket to the end, to keep the probability of
1562 * newly added wait objects as small as possible. */
1563 list_remove( &bucket
->bucket_entry
);
1564 list_add_tail( &waitqueue
.buckets
, &bucket
->bucket_entry
);
1566 NtSetEvent( other_bucket
->update_event
, NULL
);
1573 /* Remove this bucket from the list. */
1574 list_remove( &bucket
->bucket_entry
);
1575 if (!--waitqueue
.num_buckets
)
1576 assert( list_empty( &waitqueue
.buckets
) );
1578 RtlLeaveCriticalSection( &waitqueue
.cs
);
1580 TRACE( "terminating wait queue thread\n" );
1582 assert( bucket
->objcount
== 0 );
1583 assert( list_empty( &bucket
->reserved
) );
1584 assert( list_empty( &bucket
->waiting
) );
1585 NtClose( bucket
->update_event
);
1587 RtlFreeHeap( GetProcessHeap(), 0, bucket
);
1590 /***********************************************************************
1591 * tp_waitqueue_lock (internal)
1593 static NTSTATUS
tp_waitqueue_lock( struct threadpool_object
*wait
)
1595 struct waitqueue_bucket
*bucket
;
1598 assert( wait
->type
= TP_OBJECT_TYPE_WAIT
);
1600 wait
->u
.wait
.signaled
= 0;
1601 wait
->u
.wait
.bucket
= NULL
;
1602 wait
->u
.wait
.wait_pending
= FALSE
;
1603 wait
->u
.wait
.timeout
= 0;
1604 wait
->u
.wait
.handle
= INVALID_HANDLE_VALUE
;
1606 RtlEnterCriticalSection( &waitqueue
.cs
);
1608 /* Try to assign to existing bucket if possible. */
1609 LIST_FOR_EACH_ENTRY( bucket
, &waitqueue
.buckets
, struct waitqueue_bucket
, bucket_entry
)
1611 if (bucket
->objcount
< MAXIMUM_WAITQUEUE_OBJECTS
)
1613 list_add_tail( &bucket
->reserved
, &wait
->u
.wait
.wait_entry
);
1614 wait
->u
.wait
.bucket
= bucket
;
1617 status
= STATUS_SUCCESS
;
1622 /* Create a new bucket and corresponding worker thread. */
1623 bucket
= RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*bucket
) );
1626 status
= STATUS_NO_MEMORY
;
1630 bucket
->objcount
= 0;
1631 list_init( &bucket
->reserved
);
1632 list_init( &bucket
->waiting
);
1634 status
= NtCreateEvent( &bucket
->update_event
, EVENT_ALL_ACCESS
,
1635 NULL
, SynchronizationEvent
, FALSE
);
1638 RtlFreeHeap( GetProcessHeap(), 0, bucket
);
1642 status
= RtlCreateUserThread( GetCurrentProcess(), NULL
, FALSE
, NULL
, 0, 0,
1643 waitqueue_thread_proc
, bucket
, &thread
, NULL
);
1644 if (status
== STATUS_SUCCESS
)
1646 list_add_tail( &waitqueue
.buckets
, &bucket
->bucket_entry
);
1647 waitqueue
.num_buckets
++;
1649 list_add_tail( &bucket
->reserved
, &wait
->u
.wait
.wait_entry
);
1650 wait
->u
.wait
.bucket
= bucket
;
1657 NtClose( bucket
->update_event
);
1658 RtlFreeHeap( GetProcessHeap(), 0, bucket
);
1662 RtlLeaveCriticalSection( &waitqueue
.cs
);
1666 /***********************************************************************
1667 * tp_waitqueue_unlock (internal)
1669 static void tp_waitqueue_unlock( struct threadpool_object
*wait
)
1671 assert( wait
->type
== TP_OBJECT_TYPE_WAIT
);
1673 RtlEnterCriticalSection( &waitqueue
.cs
);
1674 if (wait
->u
.wait
.bucket
)
1676 struct waitqueue_bucket
*bucket
= wait
->u
.wait
.bucket
;
1677 assert( bucket
->objcount
> 0 );
1679 list_remove( &wait
->u
.wait
.wait_entry
);
1680 wait
->u
.wait
.bucket
= NULL
;
1683 NtSetEvent( bucket
->update_event
, NULL
);
1685 RtlLeaveCriticalSection( &waitqueue
.cs
);
1688 /***********************************************************************
1689 * tp_threadpool_alloc (internal)
1691 * Allocates a new threadpool object.
1693 static NTSTATUS
tp_threadpool_alloc( struct threadpool
**out
)
1695 struct threadpool
*pool
;
1697 pool
= RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*pool
) );
1699 return STATUS_NO_MEMORY
;
1703 pool
->shutdown
= FALSE
;
1705 RtlInitializeCriticalSection( &pool
->cs
);
1706 pool
->cs
.DebugInfo
->Spare
[0] = (DWORD_PTR
)(__FILE__
": threadpool.cs");
1708 list_init( &pool
->pool
);
1709 RtlInitializeConditionVariable( &pool
->update_event
);
1711 pool
->max_workers
= 500;
1712 pool
->min_workers
= 0;
1713 pool
->num_workers
= 0;
1714 pool
->num_busy_workers
= 0;
1716 TRACE( "allocated threadpool %p\n", pool
);
1719 return STATUS_SUCCESS
;
1722 /***********************************************************************
1723 * tp_threadpool_shutdown (internal)
1725 * Prepares the shutdown of a threadpool object and notifies all worker
1726 * threads to terminate (after all remaining work items have been
1729 static void tp_threadpool_shutdown( struct threadpool
*pool
)
1731 assert( pool
!= default_threadpool
);
1733 pool
->shutdown
= TRUE
;
1734 RtlWakeAllConditionVariable( &pool
->update_event
);
1737 /***********************************************************************
1738 * tp_threadpool_release (internal)
1740 * Releases a reference to a threadpool object.
1742 static BOOL
tp_threadpool_release( struct threadpool
*pool
)
1744 if (interlocked_dec( &pool
->refcount
))
1747 TRACE( "destroying threadpool %p\n", pool
);
1749 assert( pool
->shutdown
);
1750 assert( !pool
->objcount
);
1751 assert( list_empty( &pool
->pool
) );
1753 pool
->cs
.DebugInfo
->Spare
[0] = 0;
1754 RtlDeleteCriticalSection( &pool
->cs
);
1756 RtlFreeHeap( GetProcessHeap(), 0, pool
);
1760 /***********************************************************************
1761 * tp_threadpool_lock (internal)
1763 * Acquires a lock on a threadpool, specified with an TP_CALLBACK_ENVIRON
1764 * block. When the lock is acquired successfully, it is guaranteed that
1765 * there is at least one worker thread to process tasks.
1767 static NTSTATUS
tp_threadpool_lock( struct threadpool
**out
, TP_CALLBACK_ENVIRON
*environment
)
1769 struct threadpool
*pool
= NULL
;
1770 NTSTATUS status
= STATUS_SUCCESS
;
1773 pool
= (struct threadpool
*)environment
->Pool
;
1777 if (!default_threadpool
)
1779 status
= tp_threadpool_alloc( &pool
);
1780 if (status
!= STATUS_SUCCESS
)
1783 if (interlocked_cmpxchg_ptr( (void *)&default_threadpool
, pool
, NULL
) != NULL
)
1785 tp_threadpool_shutdown( pool
);
1786 tp_threadpool_release( pool
);
1790 pool
= default_threadpool
;
1793 RtlEnterCriticalSection( &pool
->cs
);
1795 /* Make sure that the threadpool has at least one thread. */
1796 if (!pool
->num_workers
)
1799 status
= RtlCreateUserThread( GetCurrentProcess(), NULL
, FALSE
, NULL
, 0, 0,
1800 threadpool_worker_proc
, pool
, &thread
, NULL
);
1801 if (status
== STATUS_SUCCESS
)
1803 interlocked_inc( &pool
->refcount
);
1804 pool
->num_workers
++;
1809 /* Keep a reference, and increment objcount to ensure that the
1810 * last thread doesn't terminate. */
1811 if (status
== STATUS_SUCCESS
)
1813 interlocked_inc( &pool
->refcount
);
1817 RtlLeaveCriticalSection( &pool
->cs
);
1819 if (status
!= STATUS_SUCCESS
)
1823 return STATUS_SUCCESS
;
1826 /***********************************************************************
1827 * tp_threadpool_unlock (internal)
1829 * Releases a lock on a threadpool.
1831 static void tp_threadpool_unlock( struct threadpool
*pool
)
1833 RtlEnterCriticalSection( &pool
->cs
);
1835 RtlLeaveCriticalSection( &pool
->cs
);
1836 tp_threadpool_release( pool
);
1839 /***********************************************************************
1840 * tp_group_alloc (internal)
1842 * Allocates a new threadpool group object.
1844 static NTSTATUS
tp_group_alloc( struct threadpool_group
**out
)
1846 struct threadpool_group
*group
;
1848 group
= RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*group
) );
1850 return STATUS_NO_MEMORY
;
1852 group
->refcount
= 1;
1853 group
->shutdown
= FALSE
;
1855 RtlInitializeCriticalSection( &group
->cs
);
1856 group
->cs
.DebugInfo
->Spare
[0] = (DWORD_PTR
)(__FILE__
": threadpool_group.cs");
1858 list_init( &group
->members
);
1860 TRACE( "allocated group %p\n", group
);
1863 return STATUS_SUCCESS
;
1866 /***********************************************************************
1867 * tp_group_shutdown (internal)
1869 * Marks the group object for shutdown.
1871 static void tp_group_shutdown( struct threadpool_group
*group
)
1873 group
->shutdown
= TRUE
;
1876 /***********************************************************************
1877 * tp_group_release (internal)
1879 * Releases a reference to a group object.
1881 static BOOL
tp_group_release( struct threadpool_group
*group
)
1883 if (interlocked_dec( &group
->refcount
))
1886 TRACE( "destroying group %p\n", group
);
1888 assert( group
->shutdown
);
1889 assert( list_empty( &group
->members
) );
1891 group
->cs
.DebugInfo
->Spare
[0] = 0;
1892 RtlDeleteCriticalSection( &group
->cs
);
1894 RtlFreeHeap( GetProcessHeap(), 0, group
);
1898 /***********************************************************************
1899 * tp_object_initialize (internal)
1901 * Initializes members of a threadpool object.
1903 static void tp_object_initialize( struct threadpool_object
*object
, struct threadpool
*pool
,
1904 PVOID userdata
, TP_CALLBACK_ENVIRON
*environment
)
1906 BOOL is_simple_callback
= (object
->type
== TP_OBJECT_TYPE_SIMPLE
);
1908 object
->refcount
= 1;
1909 object
->shutdown
= FALSE
;
1911 object
->pool
= pool
;
1912 object
->group
= NULL
;
1913 object
->userdata
= userdata
;
1914 object
->group_cancel_callback
= NULL
;
1915 object
->finalization_callback
= NULL
;
1916 object
->may_run_long
= 0;
1917 object
->race_dll
= NULL
;
1919 memset( &object
->group_entry
, 0, sizeof(object
->group_entry
) );
1920 object
->is_group_member
= FALSE
;
1922 memset( &object
->pool_entry
, 0, sizeof(object
->pool_entry
) );
1923 RtlInitializeConditionVariable( &object
->finished_event
);
1924 RtlInitializeConditionVariable( &object
->group_finished_event
);
1925 object
->num_pending_callbacks
= 0;
1926 object
->num_running_callbacks
= 0;
1927 object
->num_associated_callbacks
= 0;
1931 if (environment
->Version
!= 1)
1932 FIXME( "unsupported environment version %u\n", environment
->Version
);
1934 object
->group
= impl_from_TP_CLEANUP_GROUP( environment
->CleanupGroup
);
1935 object
->group_cancel_callback
= environment
->CleanupGroupCancelCallback
;
1936 object
->finalization_callback
= environment
->FinalizationCallback
;
1937 object
->may_run_long
= environment
->u
.s
.LongFunction
!= 0;
1938 object
->race_dll
= environment
->RaceDll
;
1940 if (environment
->ActivationContext
)
1941 FIXME( "activation context not supported yet\n" );
1943 if (environment
->u
.s
.Persistent
)
1944 FIXME( "persistent threads not supported yet\n" );
1947 if (object
->race_dll
)
1948 LdrAddRefDll( 0, object
->race_dll
);
1950 TRACE( "allocated object %p of type %u\n", object
, object
->type
);
1952 /* For simple callbacks we have to run tp_object_submit before adding this object
1953 * to the cleanup group. As soon as the cleanup group members are released ->shutdown
1954 * will be set, and tp_object_submit would fail with an assertion. */
1956 if (is_simple_callback
)
1957 tp_object_submit( object
, FALSE
);
1961 struct threadpool_group
*group
= object
->group
;
1962 interlocked_inc( &group
->refcount
);
1964 RtlEnterCriticalSection( &group
->cs
);
1965 list_add_tail( &group
->members
, &object
->group_entry
);
1966 object
->is_group_member
= TRUE
;
1967 RtlLeaveCriticalSection( &group
->cs
);
1970 if (is_simple_callback
)
1972 tp_object_shutdown( object
);
1973 tp_object_release( object
);
1977 /***********************************************************************
1978 * tp_object_submit (internal)
1980 * Submits a threadpool object to the associcated threadpool. This
1981 * function has to be VOID because TpPostWork can never fail on Windows.
1983 static void tp_object_submit( struct threadpool_object
*object
, BOOL signaled
)
1985 struct threadpool
*pool
= object
->pool
;
1986 NTSTATUS status
= STATUS_UNSUCCESSFUL
;
1988 assert( !object
->shutdown
);
1989 assert( !pool
->shutdown
);
1991 RtlEnterCriticalSection( &pool
->cs
);
1993 /* Start new worker threads if required. */
1994 if (pool
->num_busy_workers
>= pool
->num_workers
&&
1995 pool
->num_workers
< pool
->max_workers
)
1998 status
= RtlCreateUserThread( GetCurrentProcess(), NULL
, FALSE
, NULL
, 0, 0,
1999 threadpool_worker_proc
, pool
, &thread
, NULL
);
2000 if (status
== STATUS_SUCCESS
)
2002 interlocked_inc( &pool
->refcount
);
2003 pool
->num_workers
++;
2008 /* Queue work item and increment refcount. */
2009 interlocked_inc( &object
->refcount
);
2010 if (!object
->num_pending_callbacks
++)
2011 list_add_tail( &pool
->pool
, &object
->pool_entry
);
2013 /* Count how often the object was signaled. */
2014 if (object
->type
== TP_OBJECT_TYPE_WAIT
&& signaled
)
2015 object
->u
.wait
.signaled
++;
2017 /* No new thread started - wake up one existing thread. */
2018 if (status
!= STATUS_SUCCESS
)
2020 assert( pool
->num_workers
> 0 );
2021 RtlWakeConditionVariable( &pool
->update_event
);
2024 RtlLeaveCriticalSection( &pool
->cs
);
2027 /***********************************************************************
2028 * tp_object_cancel (internal)
2030 * Cancels all currently pending callbacks for a specific object.
2032 static void tp_object_cancel( struct threadpool_object
*object
, BOOL group_cancel
, PVOID userdata
)
2034 struct threadpool
*pool
= object
->pool
;
2035 LONG pending_callbacks
= 0;
2037 RtlEnterCriticalSection( &pool
->cs
);
2038 if (object
->num_pending_callbacks
)
2040 pending_callbacks
= object
->num_pending_callbacks
;
2041 object
->num_pending_callbacks
= 0;
2042 list_remove( &object
->pool_entry
);
2044 if (object
->type
== TP_OBJECT_TYPE_WAIT
)
2045 object
->u
.wait
.signaled
= 0;
2047 RtlLeaveCriticalSection( &pool
->cs
);
2049 /* Execute group cancellation callback if defined, and if this was actually a group cancel. */
2050 if (pending_callbacks
&& group_cancel
&& object
->group_cancel_callback
)
2052 TRACE( "executing group cancel callback %p(%p, %p)\n", object
->group_cancel_callback
, object
, userdata
);
2053 object
->group_cancel_callback( object
, userdata
);
2054 TRACE( "callback %p returned\n", object
->group_cancel_callback
);
2057 while (pending_callbacks
--)
2058 tp_object_release( object
);
2061 /***********************************************************************
2062 * tp_object_wait (internal)
2064 * Waits until all pending and running callbacks of a specific object
2065 * have been processed.
2067 static void tp_object_wait( struct threadpool_object
*object
, BOOL group_wait
)
2069 struct threadpool
*pool
= object
->pool
;
2071 RtlEnterCriticalSection( &pool
->cs
);
2074 while (object
->num_pending_callbacks
|| object
->num_running_callbacks
)
2075 RtlSleepConditionVariableCS( &object
->group_finished_event
, &pool
->cs
, NULL
);
2079 while (object
->num_pending_callbacks
|| object
->num_associated_callbacks
)
2080 RtlSleepConditionVariableCS( &object
->finished_event
, &pool
->cs
, NULL
);
2082 RtlLeaveCriticalSection( &pool
->cs
);
2085 /***********************************************************************
2086 * tp_object_shutdown (internal)
2088 * Marks a threadpool object for shutdown (which means that no further
2089 * tasks can be submitted).
2091 static void tp_object_shutdown( struct threadpool_object
*object
)
2093 if (object
->type
== TP_OBJECT_TYPE_TIMER
)
2094 tp_timerqueue_unlock( object
);
2095 else if (object
->type
== TP_OBJECT_TYPE_WAIT
)
2096 tp_waitqueue_unlock( object
);
2098 object
->shutdown
= TRUE
;
2101 /***********************************************************************
2102 * tp_object_release (internal)
2104 * Releases a reference to a threadpool object.
2106 static BOOL
tp_object_release( struct threadpool_object
*object
)
2108 if (interlocked_dec( &object
->refcount
))
2111 TRACE( "destroying object %p of type %u\n", object
, object
->type
);
2113 assert( object
->shutdown
);
2114 assert( !object
->num_pending_callbacks
);
2115 assert( !object
->num_running_callbacks
);
2116 assert( !object
->num_associated_callbacks
);
2118 /* release reference to the group */
2121 struct threadpool_group
*group
= object
->group
;
2123 RtlEnterCriticalSection( &group
->cs
);
2124 if (object
->is_group_member
)
2126 list_remove( &object
->group_entry
);
2127 object
->is_group_member
= FALSE
;
2129 RtlLeaveCriticalSection( &group
->cs
);
2131 tp_group_release( group
);
2134 tp_threadpool_unlock( object
->pool
);
2136 if (object
->race_dll
)
2137 LdrUnloadDll( object
->race_dll
);
2139 RtlFreeHeap( GetProcessHeap(), 0, object
);
2143 /***********************************************************************
2144 * threadpool_worker_proc (internal)
2146 static void CALLBACK
threadpool_worker_proc( void *param
)
2148 TP_CALLBACK_INSTANCE
*callback_instance
;
2149 struct threadpool_instance instance
;
2150 struct threadpool
*pool
= param
;
2151 TP_WAIT_RESULT wait_result
= 0;
2152 LARGE_INTEGER timeout
;
2156 TRACE( "starting worker thread for pool %p\n", pool
);
2158 RtlEnterCriticalSection( &pool
->cs
);
2161 while ((ptr
= list_head( &pool
->pool
)))
2163 struct threadpool_object
*object
= LIST_ENTRY( ptr
, struct threadpool_object
, pool_entry
);
2164 assert( object
->num_pending_callbacks
> 0 );
2166 /* If further pending callbacks are queued, move the work item to
2167 * the end of the pool list. Otherwise remove it from the pool. */
2168 list_remove( &object
->pool_entry
);
2169 if (--object
->num_pending_callbacks
)
2170 list_add_tail( &pool
->pool
, &object
->pool_entry
);
2172 /* For wait objects check if they were signaled or have timed out. */
2173 if (object
->type
== TP_OBJECT_TYPE_WAIT
)
2175 wait_result
= object
->u
.wait
.signaled
? WAIT_OBJECT_0
: WAIT_TIMEOUT
;
2176 if (wait_result
== WAIT_OBJECT_0
) object
->u
.wait
.signaled
--;
2179 /* Leave critical section and do the actual callback. */
2180 object
->num_associated_callbacks
++;
2181 object
->num_running_callbacks
++;
2182 pool
->num_busy_workers
++;
2183 RtlLeaveCriticalSection( &pool
->cs
);
2185 /* Initialize threadpool instance struct. */
2186 callback_instance
= (TP_CALLBACK_INSTANCE
*)&instance
;
2187 instance
.object
= object
;
2188 instance
.threadid
= GetCurrentThreadId();
2189 instance
.associated
= TRUE
;
2190 instance
.may_run_long
= object
->may_run_long
;
2191 instance
.cleanup
.critical_section
= NULL
;
2192 instance
.cleanup
.mutex
= NULL
;
2193 instance
.cleanup
.semaphore
= NULL
;
2194 instance
.cleanup
.semaphore_count
= 0;
2195 instance
.cleanup
.event
= NULL
;
2196 instance
.cleanup
.library
= NULL
;
2198 switch (object
->type
)
2200 case TP_OBJECT_TYPE_SIMPLE
:
2202 TRACE( "executing simple callback %p(%p, %p)\n",
2203 object
->u
.simple
.callback
, callback_instance
, object
->userdata
);
2204 object
->u
.simple
.callback( callback_instance
, object
->userdata
);
2205 TRACE( "callback %p returned\n", object
->u
.simple
.callback
);
2209 case TP_OBJECT_TYPE_WORK
:
2211 TRACE( "executing work callback %p(%p, %p, %p)\n",
2212 object
->u
.work
.callback
, callback_instance
, object
->userdata
, object
);
2213 object
->u
.work
.callback( callback_instance
, object
->userdata
, (TP_WORK
*)object
);
2214 TRACE( "callback %p returned\n", object
->u
.work
.callback
);
2218 case TP_OBJECT_TYPE_TIMER
:
2220 TRACE( "executing timer callback %p(%p, %p, %p)\n",
2221 object
->u
.timer
.callback
, callback_instance
, object
->userdata
, object
);
2222 object
->u
.timer
.callback( callback_instance
, object
->userdata
, (TP_TIMER
*)object
);
2223 TRACE( "callback %p returned\n", object
->u
.timer
.callback
);
2227 case TP_OBJECT_TYPE_WAIT
:
2229 TRACE( "executing wait callback %p(%p, %p, %p, %u)\n",
2230 object
->u
.wait
.callback
, callback_instance
, object
->userdata
, object
, wait_result
);
2231 object
->u
.wait
.callback( callback_instance
, object
->userdata
, (TP_WAIT
*)object
, wait_result
);
2232 TRACE( "callback %p returned\n", object
->u
.wait
.callback
);
2241 /* Execute finalization callback. */
2242 if (object
->finalization_callback
)
2244 TRACE( "executing finalization callback %p(%p, %p)\n",
2245 object
->finalization_callback
, callback_instance
, object
->userdata
);
2246 object
->finalization_callback( callback_instance
, object
->userdata
);
2247 TRACE( "callback %p returned\n", object
->finalization_callback
);
2250 /* Execute cleanup tasks. */
2251 if (instance
.cleanup
.critical_section
)
2253 RtlLeaveCriticalSection( instance
.cleanup
.critical_section
);
2255 if (instance
.cleanup
.mutex
)
2257 status
= NtReleaseMutant( instance
.cleanup
.mutex
, NULL
);
2258 if (status
!= STATUS_SUCCESS
) goto skip_cleanup
;
2260 if (instance
.cleanup
.semaphore
)
2262 status
= NtReleaseSemaphore( instance
.cleanup
.semaphore
, instance
.cleanup
.semaphore_count
, NULL
);
2263 if (status
!= STATUS_SUCCESS
) goto skip_cleanup
;
2265 if (instance
.cleanup
.event
)
2267 status
= NtSetEvent( instance
.cleanup
.event
, NULL
);
2268 if (status
!= STATUS_SUCCESS
) goto skip_cleanup
;
2270 if (instance
.cleanup
.library
)
2272 LdrUnloadDll( instance
.cleanup
.library
);
2276 RtlEnterCriticalSection( &pool
->cs
);
2277 pool
->num_busy_workers
--;
2279 object
->num_running_callbacks
--;
2280 if (!object
->num_pending_callbacks
&& !object
->num_running_callbacks
)
2281 RtlWakeAllConditionVariable( &object
->group_finished_event
);
2283 if (instance
.associated
)
2285 object
->num_associated_callbacks
--;
2286 if (!object
->num_pending_callbacks
&& !object
->num_associated_callbacks
)
2287 RtlWakeAllConditionVariable( &object
->finished_event
);
2290 tp_object_release( object
);
2293 /* Shutdown worker thread if requested. */
2297 /* Wait for new tasks or until the timeout expires. A thread only terminates
2298 * when no new tasks are available, and the number of threads can be
2299 * decreased without violating the min_workers limit. An exception is when
2300 * min_workers == 0, then objcount is used to detect if the last thread
2301 * can be terminated. */
2302 timeout
.QuadPart
= (ULONGLONG
)THREADPOOL_WORKER_TIMEOUT
* -10000;
2303 if (RtlSleepConditionVariableCS( &pool
->update_event
, &pool
->cs
, &timeout
) == STATUS_TIMEOUT
&&
2304 !list_head( &pool
->pool
) && (pool
->num_workers
> max( pool
->min_workers
, 1 ) ||
2305 (!pool
->min_workers
&& !pool
->objcount
)))
2310 pool
->num_workers
--;
2311 RtlLeaveCriticalSection( &pool
->cs
);
2313 TRACE( "terminating worker thread for pool %p\n", pool
);
2314 tp_threadpool_release( pool
);
2317 /***********************************************************************
2318 * TpAllocCleanupGroup (NTDLL.@)
2320 NTSTATUS WINAPI
TpAllocCleanupGroup( TP_CLEANUP_GROUP
**out
)
2322 TRACE( "%p\n", out
);
2324 return tp_group_alloc( (struct threadpool_group
**)out
);
2327 /***********************************************************************
2328 * TpAllocPool (NTDLL.@)
2330 NTSTATUS WINAPI
TpAllocPool( TP_POOL
**out
, PVOID reserved
)
2332 TRACE( "%p %p\n", out
, reserved
);
2335 FIXME( "reserved argument is nonzero (%p)", reserved
);
2337 return tp_threadpool_alloc( (struct threadpool
**)out
);
2340 /***********************************************************************
2341 * TpAllocTimer (NTDLL.@)
2343 NTSTATUS WINAPI
TpAllocTimer( TP_TIMER
**out
, PTP_TIMER_CALLBACK callback
, PVOID userdata
,
2344 TP_CALLBACK_ENVIRON
*environment
)
2346 struct threadpool_object
*object
;
2347 struct threadpool
*pool
;
2350 TRACE( "%p %p %p %p\n", out
, callback
, userdata
, environment
);
2352 object
= RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*object
) );
2354 return STATUS_NO_MEMORY
;
2356 status
= tp_threadpool_lock( &pool
, environment
);
2359 RtlFreeHeap( GetProcessHeap(), 0, object
);
2363 object
->type
= TP_OBJECT_TYPE_TIMER
;
2364 object
->u
.timer
.callback
= callback
;
2366 status
= tp_timerqueue_lock( object
);
2369 tp_threadpool_unlock( pool
);
2370 RtlFreeHeap( GetProcessHeap(), 0, object
);
2374 tp_object_initialize( object
, pool
, userdata
, environment
);
2376 *out
= (TP_TIMER
*)object
;
2377 return STATUS_SUCCESS
;
2380 /***********************************************************************
2381 * TpAllocWait (NTDLL.@)
2383 NTSTATUS WINAPI
TpAllocWait( TP_WAIT
**out
, PTP_WAIT_CALLBACK callback
, PVOID userdata
,
2384 TP_CALLBACK_ENVIRON
*environment
)
2386 struct threadpool_object
*object
;
2387 struct threadpool
*pool
;
2390 TRACE( "%p %p %p %p\n", out
, callback
, userdata
, environment
);
2392 object
= RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*object
) );
2394 return STATUS_NO_MEMORY
;
2396 status
= tp_threadpool_lock( &pool
, environment
);
2399 RtlFreeHeap( GetProcessHeap(), 0, object
);
2403 object
->type
= TP_OBJECT_TYPE_WAIT
;
2404 object
->u
.wait
.callback
= callback
;
2406 status
= tp_waitqueue_lock( object
);
2409 tp_threadpool_unlock( pool
);
2410 RtlFreeHeap( GetProcessHeap(), 0, object
);
2414 tp_object_initialize( object
, pool
, userdata
, environment
);
2416 *out
= (TP_WAIT
*)object
;
2417 return STATUS_SUCCESS
;
2420 /***********************************************************************
2421 * TpAllocWork (NTDLL.@)
2423 NTSTATUS WINAPI
TpAllocWork( TP_WORK
**out
, PTP_WORK_CALLBACK callback
, PVOID userdata
,
2424 TP_CALLBACK_ENVIRON
*environment
)
2426 struct threadpool_object
*object
;
2427 struct threadpool
*pool
;
2430 TRACE( "%p %p %p %p\n", out
, callback
, userdata
, environment
);
2432 object
= RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*object
) );
2434 return STATUS_NO_MEMORY
;
2436 status
= tp_threadpool_lock( &pool
, environment
);
2439 RtlFreeHeap( GetProcessHeap(), 0, object
);
2443 object
->type
= TP_OBJECT_TYPE_WORK
;
2444 object
->u
.work
.callback
= callback
;
2445 tp_object_initialize( object
, pool
, userdata
, environment
);
2447 *out
= (TP_WORK
*)object
;
2448 return STATUS_SUCCESS
;
2451 /***********************************************************************
2452 * TpCallbackLeaveCriticalSectionOnCompletion (NTDLL.@)
2454 VOID WINAPI
TpCallbackLeaveCriticalSectionOnCompletion( TP_CALLBACK_INSTANCE
*instance
, CRITICAL_SECTION
*crit
)
2456 struct threadpool_instance
*this = impl_from_TP_CALLBACK_INSTANCE( instance
);
2458 TRACE( "%p %p\n", instance
, crit
);
2460 if (!this->cleanup
.critical_section
)
2461 this->cleanup
.critical_section
= crit
;
2464 /***********************************************************************
2465 * TpCallbackMayRunLong (NTDLL.@)
2467 NTSTATUS WINAPI
TpCallbackMayRunLong( TP_CALLBACK_INSTANCE
*instance
)
2469 struct threadpool_instance
*this = impl_from_TP_CALLBACK_INSTANCE( instance
);
2470 struct threadpool_object
*object
= this->object
;
2471 struct threadpool
*pool
;
2472 NTSTATUS status
= STATUS_SUCCESS
;
2474 TRACE( "%p\n", instance
);
2476 if (this->threadid
!= GetCurrentThreadId())
2478 ERR("called from wrong thread, ignoring\n");
2479 return STATUS_UNSUCCESSFUL
; /* FIXME */
2482 if (this->may_run_long
)
2483 return STATUS_SUCCESS
;
2485 pool
= object
->pool
;
2486 RtlEnterCriticalSection( &pool
->cs
);
2488 /* Start new worker threads if required. */
2489 if (pool
->num_busy_workers
>= pool
->num_workers
)
2491 if (pool
->num_workers
< pool
->max_workers
)
2494 status
= RtlCreateUserThread( GetCurrentProcess(), NULL
, FALSE
, NULL
, 0, 0,
2495 threadpool_worker_proc
, pool
, &thread
, NULL
);
2496 if (status
== STATUS_SUCCESS
)
2498 interlocked_inc( &pool
->refcount
);
2499 pool
->num_workers
++;
2505 status
= STATUS_TOO_MANY_THREADS
;
2509 RtlLeaveCriticalSection( &pool
->cs
);
2510 this->may_run_long
= TRUE
;
2514 /***********************************************************************
2515 * TpCallbackReleaseMutexOnCompletion (NTDLL.@)
2517 VOID WINAPI
TpCallbackReleaseMutexOnCompletion( TP_CALLBACK_INSTANCE
*instance
, HANDLE mutex
)
2519 struct threadpool_instance
*this = impl_from_TP_CALLBACK_INSTANCE( instance
);
2521 TRACE( "%p %p\n", instance
, mutex
);
2523 if (!this->cleanup
.mutex
)
2524 this->cleanup
.mutex
= mutex
;
2527 /***********************************************************************
2528 * TpCallbackReleaseSemaphoreOnCompletion (NTDLL.@)
2530 VOID WINAPI
TpCallbackReleaseSemaphoreOnCompletion( TP_CALLBACK_INSTANCE
*instance
, HANDLE semaphore
, DWORD count
)
2532 struct threadpool_instance
*this = impl_from_TP_CALLBACK_INSTANCE( instance
);
2534 TRACE( "%p %p %u\n", instance
, semaphore
, count
);
2536 if (!this->cleanup
.semaphore
)
2538 this->cleanup
.semaphore
= semaphore
;
2539 this->cleanup
.semaphore_count
= count
;
2543 /***********************************************************************
2544 * TpCallbackSetEventOnCompletion (NTDLL.@)
2546 VOID WINAPI
TpCallbackSetEventOnCompletion( TP_CALLBACK_INSTANCE
*instance
, HANDLE event
)
2548 struct threadpool_instance
*this = impl_from_TP_CALLBACK_INSTANCE( instance
);
2550 TRACE( "%p %p\n", instance
, event
);
2552 if (!this->cleanup
.event
)
2553 this->cleanup
.event
= event
;
2556 /***********************************************************************
2557 * TpCallbackUnloadDllOnCompletion (NTDLL.@)
2559 VOID WINAPI
TpCallbackUnloadDllOnCompletion( TP_CALLBACK_INSTANCE
*instance
, HMODULE module
)
2561 struct threadpool_instance
*this = impl_from_TP_CALLBACK_INSTANCE( instance
);
2563 TRACE( "%p %p\n", instance
, module
);
2565 if (!this->cleanup
.library
)
2566 this->cleanup
.library
= module
;
2569 /***********************************************************************
2570 * TpDisassociateCallback (NTDLL.@)
2572 VOID WINAPI
TpDisassociateCallback( TP_CALLBACK_INSTANCE
*instance
)
2574 struct threadpool_instance
*this = impl_from_TP_CALLBACK_INSTANCE( instance
);
2575 struct threadpool_object
*object
= this->object
;
2576 struct threadpool
*pool
;
2578 TRACE( "%p\n", instance
);
2580 if (this->threadid
!= GetCurrentThreadId())
2582 ERR("called from wrong thread, ignoring\n");
2586 if (!this->associated
)
2589 pool
= object
->pool
;
2590 RtlEnterCriticalSection( &pool
->cs
);
2592 object
->num_associated_callbacks
--;
2593 if (!object
->num_pending_callbacks
&& !object
->num_associated_callbacks
)
2594 RtlWakeAllConditionVariable( &object
->finished_event
);
2596 RtlLeaveCriticalSection( &pool
->cs
);
2597 this->associated
= FALSE
;
2600 /***********************************************************************
2601 * TpIsTimerSet (NTDLL.@)
2603 BOOL WINAPI
TpIsTimerSet( TP_TIMER
*timer
)
2605 struct threadpool_object
*this = impl_from_TP_TIMER( timer
);
2607 TRACE( "%p\n", timer
);
2609 return this->u
.timer
.timer_set
;
2612 /***********************************************************************
2613 * TpPostWork (NTDLL.@)
2615 VOID WINAPI
TpPostWork( TP_WORK
*work
)
2617 struct threadpool_object
*this = impl_from_TP_WORK( work
);
2619 TRACE( "%p\n", work
);
2621 tp_object_submit( this, FALSE
);
2624 /***********************************************************************
2625 * TpReleaseCleanupGroup (NTDLL.@)
2627 VOID WINAPI
TpReleaseCleanupGroup( TP_CLEANUP_GROUP
*group
)
2629 struct threadpool_group
*this = impl_from_TP_CLEANUP_GROUP( group
);
2631 TRACE( "%p\n", group
);
2633 tp_group_shutdown( this );
2634 tp_group_release( this );
2637 /***********************************************************************
2638 * TpReleaseCleanupGroupMembers (NTDLL.@)
2640 VOID WINAPI
TpReleaseCleanupGroupMembers( TP_CLEANUP_GROUP
*group
, BOOL cancel_pending
, PVOID userdata
)
2642 struct threadpool_group
*this = impl_from_TP_CLEANUP_GROUP( group
);
2643 struct threadpool_object
*object
, *next
;
2644 struct list members
;
2646 TRACE( "%p %u %p\n", group
, cancel_pending
, userdata
);
2648 RtlEnterCriticalSection( &this->cs
);
2650 /* Unset group, increase references, and mark objects for shutdown */
2651 LIST_FOR_EACH_ENTRY_SAFE( object
, next
, &this->members
, struct threadpool_object
, group_entry
)
2653 assert( object
->group
== this );
2654 assert( object
->is_group_member
);
2656 /* Simple callbacks are very special. The user doesn't hold any reference, so
2657 * they would be released too early. Add one additional temporary reference. */
2658 if (object
->type
== TP_OBJECT_TYPE_SIMPLE
)
2660 if (interlocked_inc( &object
->refcount
) == 1)
2662 /* Object is basically already destroyed, but group reference
2663 * was not deleted yet. We can safely ignore this object. */
2664 interlocked_dec( &object
->refcount
);
2665 list_remove( &object
->group_entry
);
2666 object
->is_group_member
= FALSE
;
2671 object
->is_group_member
= FALSE
;
2672 tp_object_shutdown( object
);
2675 /* Move members to a new temporary list */
2676 list_init( &members
);
2677 list_move_tail( &members
, &this->members
);
2679 RtlLeaveCriticalSection( &this->cs
);
2681 /* Cancel pending callbacks if requested */
2684 LIST_FOR_EACH_ENTRY( object
, &members
, struct threadpool_object
, group_entry
)
2686 tp_object_cancel( object
, TRUE
, userdata
);
2690 /* Wait for remaining callbacks to finish */
2691 LIST_FOR_EACH_ENTRY_SAFE( object
, next
, &members
, struct threadpool_object
, group_entry
)
2693 tp_object_wait( object
, TRUE
);
2694 tp_object_release( object
);
2698 /***********************************************************************
2699 * TpReleasePool (NTDLL.@)
2701 VOID WINAPI
TpReleasePool( TP_POOL
*pool
)
2703 struct threadpool
*this = impl_from_TP_POOL( pool
);
2705 TRACE( "%p\n", pool
);
2707 tp_threadpool_shutdown( this );
2708 tp_threadpool_release( this );
2711 /***********************************************************************
2712 * TpReleaseTimer (NTDLL.@)
2714 VOID WINAPI
TpReleaseTimer( TP_TIMER
*timer
)
2716 struct threadpool_object
*this = impl_from_TP_TIMER( timer
);
2718 TRACE( "%p\n", timer
);
2720 tp_object_shutdown( this );
2721 tp_object_release( this );
2724 /***********************************************************************
2725 * TpReleaseWait (NTDLL.@)
2727 VOID WINAPI
TpReleaseWait( TP_WAIT
*wait
)
2729 struct threadpool_object
*this = impl_from_TP_WAIT( wait
);
2731 TRACE( "%p\n", wait
);
2733 tp_object_shutdown( this );
2734 tp_object_release( this );
2737 /***********************************************************************
2738 * TpReleaseWork (NTDLL.@)
2740 VOID WINAPI
TpReleaseWork( TP_WORK
*work
)
2742 struct threadpool_object
*this = impl_from_TP_WORK( work
);
2744 TRACE( "%p\n", work
);
2746 tp_object_shutdown( this );
2747 tp_object_release( this );
2750 /***********************************************************************
2751 * TpSetPoolMaxThreads (NTDLL.@)
2753 VOID WINAPI
TpSetPoolMaxThreads( TP_POOL
*pool
, DWORD maximum
)
2755 struct threadpool
*this = impl_from_TP_POOL( pool
);
2757 TRACE( "%p %u\n", pool
, maximum
);
2759 RtlEnterCriticalSection( &this->cs
);
2760 this->max_workers
= max( maximum
, 1 );
2761 this->min_workers
= min( this->min_workers
, this->max_workers
);
2762 RtlLeaveCriticalSection( &this->cs
);
2765 /***********************************************************************
2766 * TpSetPoolMinThreads (NTDLL.@)
2768 BOOL WINAPI
TpSetPoolMinThreads( TP_POOL
*pool
, DWORD minimum
)
2770 struct threadpool
*this = impl_from_TP_POOL( pool
);
2771 NTSTATUS status
= STATUS_SUCCESS
;
2773 TRACE( "%p %u\n", pool
, minimum
);
2775 RtlEnterCriticalSection( &this->cs
);
2777 while (this->num_workers
< minimum
)
2780 status
= RtlCreateUserThread( GetCurrentProcess(), NULL
, FALSE
, NULL
, 0, 0,
2781 threadpool_worker_proc
, this, &thread
, NULL
);
2782 if (status
!= STATUS_SUCCESS
)
2785 interlocked_inc( &this->refcount
);
2786 this->num_workers
++;
2790 if (status
== STATUS_SUCCESS
)
2792 this->min_workers
= minimum
;
2793 this->max_workers
= max( this->min_workers
, this->max_workers
);
2796 RtlLeaveCriticalSection( &this->cs
);
2800 /***********************************************************************
2801 * TpSetTimer (NTDLL.@)
2803 VOID WINAPI
TpSetTimer( TP_TIMER
*timer
, LARGE_INTEGER
*timeout
, LONG period
, LONG window_length
)
2805 struct threadpool_object
*this = impl_from_TP_TIMER( timer
);
2806 struct threadpool_object
*other_timer
;
2807 BOOL submit_timer
= FALSE
;
2808 ULONGLONG timestamp
;
2810 TRACE( "%p %p %u %u\n", timer
, timeout
, period
, window_length
);
2812 RtlEnterCriticalSection( &timerqueue
.cs
);
2814 assert( this->u
.timer
.timer_initialized
);
2815 this->u
.timer
.timer_set
= timeout
!= NULL
;
2817 /* Convert relative timeout to absolute timestamp and handle a timeout
2818 * of zero, which means that the timer is submitted immediately. */
2821 timestamp
= timeout
->QuadPart
;
2822 if ((LONGLONG
)timestamp
< 0)
2825 NtQuerySystemTime( &now
);
2826 timestamp
= now
.QuadPart
- timestamp
;
2828 else if (!timestamp
)
2835 NtQuerySystemTime( &now
);
2836 timestamp
= now
.QuadPart
+ (ULONGLONG
)period
* 10000;
2838 submit_timer
= TRUE
;
2842 /* First remove existing timeout. */
2843 if (this->u
.timer
.timer_pending
)
2845 list_remove( &this->u
.timer
.timer_entry
);
2846 this->u
.timer
.timer_pending
= FALSE
;
2849 /* If the timer was enabled, then add it back to the queue. */
2852 this->u
.timer
.timeout
= timestamp
;
2853 this->u
.timer
.period
= period
;
2854 this->u
.timer
.window_length
= window_length
;
2856 LIST_FOR_EACH_ENTRY( other_timer
, &timerqueue
.pending_timers
,
2857 struct threadpool_object
, u
.timer
.timer_entry
)
2859 assert( other_timer
->type
== TP_OBJECT_TYPE_TIMER
);
2860 if (this->u
.timer
.timeout
< other_timer
->u
.timer
.timeout
)
2863 list_add_before( &other_timer
->u
.timer
.timer_entry
, &this->u
.timer
.timer_entry
);
2865 /* Wake up the timer thread when the timeout has to be updated. */
2866 if (list_head( &timerqueue
.pending_timers
) == &this->u
.timer
.timer_entry
)
2867 RtlWakeAllConditionVariable( &timerqueue
.update_event
);
2869 this->u
.timer
.timer_pending
= TRUE
;
2872 RtlLeaveCriticalSection( &timerqueue
.cs
);
2875 tp_object_submit( this, FALSE
);
2878 /***********************************************************************
2879 * TpSetWait (NTDLL.@)
2881 VOID WINAPI
TpSetWait( TP_WAIT
*wait
, HANDLE handle
, LARGE_INTEGER
*timeout
)
2883 struct threadpool_object
*this = impl_from_TP_WAIT( wait
);
2884 ULONGLONG timestamp
= TIMEOUT_INFINITE
;
2885 BOOL submit_wait
= FALSE
;
2887 TRACE( "%p %p %p\n", wait
, handle
, timeout
);
2889 RtlEnterCriticalSection( &waitqueue
.cs
);
2891 assert( this->u
.wait
.bucket
);
2892 this->u
.wait
.handle
= handle
;
2894 if (handle
|| this->u
.wait
.wait_pending
)
2896 struct waitqueue_bucket
*bucket
= this->u
.wait
.bucket
;
2897 list_remove( &this->u
.wait
.wait_entry
);
2899 /* Convert relative timeout to absolute timestamp. */
2900 if (handle
&& timeout
)
2902 timestamp
= timeout
->QuadPart
;
2903 if ((LONGLONG
)timestamp
< 0)
2906 NtQuerySystemTime( &now
);
2907 timestamp
= now
.QuadPart
- timestamp
;
2909 else if (!timestamp
)
2916 /* Add wait object back into one of the queues. */
2919 list_add_tail( &bucket
->waiting
, &this->u
.wait
.wait_entry
);
2920 this->u
.wait
.wait_pending
= TRUE
;
2921 this->u
.wait
.timeout
= timestamp
;
2925 list_add_tail( &bucket
->reserved
, &this->u
.wait
.wait_entry
);
2926 this->u
.wait
.wait_pending
= FALSE
;
2929 /* Wake up the wait queue thread. */
2930 NtSetEvent( bucket
->update_event
, NULL
);
2933 RtlLeaveCriticalSection( &waitqueue
.cs
);
2936 tp_object_submit( this, FALSE
);
2939 /***********************************************************************
2940 * TpSimpleTryPost (NTDLL.@)
2942 NTSTATUS WINAPI
TpSimpleTryPost( PTP_SIMPLE_CALLBACK callback
, PVOID userdata
,
2943 TP_CALLBACK_ENVIRON
*environment
)
2945 struct threadpool_object
*object
;
2946 struct threadpool
*pool
;
2949 TRACE( "%p %p %p\n", callback
, userdata
, environment
);
2951 object
= RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*object
) );
2953 return STATUS_NO_MEMORY
;
2955 status
= tp_threadpool_lock( &pool
, environment
);
2958 RtlFreeHeap( GetProcessHeap(), 0, object
);
2962 object
->type
= TP_OBJECT_TYPE_SIMPLE
;
2963 object
->u
.simple
.callback
= callback
;
2964 tp_object_initialize( object
, pool
, userdata
, environment
);
2966 return STATUS_SUCCESS
;
2969 /***********************************************************************
2970 * TpWaitForTimer (NTDLL.@)
2972 VOID WINAPI
TpWaitForTimer( TP_TIMER
*timer
, BOOL cancel_pending
)
2974 struct threadpool_object
*this = impl_from_TP_TIMER( timer
);
2976 TRACE( "%p %d\n", timer
, cancel_pending
);
2979 tp_object_cancel( this, FALSE
, NULL
);
2980 tp_object_wait( this, FALSE
);
2983 /***********************************************************************
2984 * TpWaitForWait (NTDLL.@)
2986 VOID WINAPI
TpWaitForWait( TP_WAIT
*wait
, BOOL cancel_pending
)
2988 struct threadpool_object
*this = impl_from_TP_WAIT( wait
);
2990 TRACE( "%p %d\n", wait
, cancel_pending
);
2993 tp_object_cancel( this, FALSE
, NULL
);
2994 tp_object_wait( this, FALSE
);
2997 /***********************************************************************
2998 * TpWaitForWork (NTDLL.@)
3000 VOID WINAPI
TpWaitForWork( TP_WORK
*work
, BOOL cancel_pending
)
3002 struct threadpool_object
*this = impl_from_TP_WORK( work
);
3004 TRACE( "%p %u\n", work
, cancel_pending
);
3007 tp_object_cancel( this, FALSE
, NULL
);
3008 tp_object_wait( this, FALSE
);