4 * Copyright (c) 2006 Robert Shearman
5 * Copyright (c) 2014-2016 Sebastian Lackner
7 * This library is free software; you can redistribute it and/or
8 * modify it under the terms of the GNU Lesser General Public
9 * License as published by the Free Software Foundation; either
10 * version 2.1 of the License, or (at your option) any later version.
12 * This library is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Lesser General Public License for more details.
17 * You should have received a copy of the GNU Lesser General Public
18 * License along with this library; if not, write to the Free Software
19 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
23 #include "wine/port.h"
29 #define NONAMELESSUNION
31 #define WIN32_NO_STATUS
34 #include "wine/debug.h"
35 #include "wine/list.h"
37 #include "ntdll_misc.h"
39 WINE_DEFAULT_DEBUG_CHANNEL(threadpool
);
42 * Old thread pooling API
47 PRTL_WORK_ITEM_ROUTINE function
;
51 #define EXPIRE_NEVER (~(ULONGLONG)0)
52 #define TIMER_QUEUE_MAGIC 0x516d6954 /* TimQ */
54 static RTL_CRITICAL_SECTION_DEBUG critsect_compl_debug
;
59 RTL_CRITICAL_SECTION threadpool_compl_cs
;
63 NULL
, /* compl_port */
64 { &critsect_compl_debug
, -1, 0, 0, 0, 0 }, /* threadpool_compl_cs */
67 static RTL_CRITICAL_SECTION_DEBUG critsect_compl_debug
=
69 0, 0, &old_threadpool
.threadpool_compl_cs
,
70 { &critsect_compl_debug
.ProcessLocksList
, &critsect_compl_debug
.ProcessLocksList
},
71 0, 0, { (DWORD_PTR
)(__FILE__
": threadpool_compl_cs") }
78 WAITORTIMERCALLBACK Callback
;
82 HANDLE CompletionEvent
;
84 int CallbackInProgress
;
90 struct timer_queue
*q
;
92 ULONG runcount
; /* number of callbacks pending execution */
93 RTL_WAITORTIMERCALLBACKFUNC callback
;
98 BOOL destroy
; /* timer should be deleted; once set, never unset */
99 HANDLE event
; /* removal event */
105 RTL_CRITICAL_SECTION cs
;
106 struct list timers
; /* sorted by expiration time */
107 BOOL quit
; /* queue should be deleted; once set, never unset */
113 * Object-oriented thread pooling API
116 #define THREADPOOL_WORKER_TIMEOUT 5000
117 #define MAXIMUM_WAITQUEUE_OBJECTS (MAXIMUM_WAIT_OBJECTS - 1)
119 /* internal threadpool representation */
126 /* Pools of work items, locked via .cs, order matches TP_CALLBACK_PRIORITY - high, normal, low. */
127 struct list pools
[3];
128 RTL_CONDITION_VARIABLE update_event
;
129 /* information about worker threads, locked via .cs */
133 int num_busy_workers
;
136 enum threadpool_objtype
138 TP_OBJECT_TYPE_SIMPLE
,
140 TP_OBJECT_TYPE_TIMER
,
144 /* internal threadpool object representation */
145 struct threadpool_object
149 /* read-only information */
150 enum threadpool_objtype type
;
151 struct threadpool
*pool
;
152 struct threadpool_group
*group
;
154 PTP_CLEANUP_GROUP_CANCEL_CALLBACK group_cancel_callback
;
155 PTP_SIMPLE_CALLBACK finalization_callback
;
158 TP_CALLBACK_PRIORITY priority
;
159 /* information about the group, locked via .group->cs */
160 struct list group_entry
;
161 BOOL is_group_member
;
162 /* information about the pool, locked via .pool->cs */
163 struct list pool_entry
;
164 RTL_CONDITION_VARIABLE finished_event
;
165 RTL_CONDITION_VARIABLE group_finished_event
;
166 LONG num_pending_callbacks
;
167 LONG num_running_callbacks
;
168 LONG num_associated_callbacks
;
169 /* arguments for callback */
174 PTP_SIMPLE_CALLBACK callback
;
178 PTP_WORK_CALLBACK callback
;
182 PTP_TIMER_CALLBACK callback
;
183 /* information about the timer, locked via timerqueue.cs */
184 BOOL timer_initialized
;
186 struct list timer_entry
;
194 PTP_WAIT_CALLBACK callback
;
196 /* information about the wait object, locked via waitqueue.cs */
197 struct waitqueue_bucket
*bucket
;
199 struct list wait_entry
;
206 /* internal threadpool instance representation */
207 struct threadpool_instance
209 struct threadpool_object
*object
;
215 CRITICAL_SECTION
*critical_section
;
218 LONG semaphore_count
;
224 /* internal threadpool group representation */
225 struct threadpool_group
230 /* list of group members, locked via .cs */
234 /* global timerqueue object */
235 static RTL_CRITICAL_SECTION_DEBUG timerqueue_debug
;
242 struct list pending_timers
;
243 RTL_CONDITION_VARIABLE update_event
;
247 { &timerqueue_debug
, -1, 0, 0, 0, 0 }, /* cs */
249 FALSE
, /* thread_running */
250 LIST_INIT( timerqueue
.pending_timers
), /* pending_timers */
251 RTL_CONDITION_VARIABLE_INIT
/* update_event */
254 static RTL_CRITICAL_SECTION_DEBUG timerqueue_debug
=
256 0, 0, &timerqueue
.cs
,
257 { &timerqueue_debug
.ProcessLocksList
, &timerqueue_debug
.ProcessLocksList
},
258 0, 0, { (DWORD_PTR
)(__FILE__
": timerqueue.cs") }
261 /* global waitqueue object */
262 static RTL_CRITICAL_SECTION_DEBUG waitqueue_debug
;
272 { &waitqueue_debug
, -1, 0, 0, 0, 0 }, /* cs */
274 LIST_INIT( waitqueue
.buckets
) /* buckets */
277 static RTL_CRITICAL_SECTION_DEBUG waitqueue_debug
=
280 { &waitqueue_debug
.ProcessLocksList
, &waitqueue_debug
.ProcessLocksList
},
281 0, 0, { (DWORD_PTR
)(__FILE__
": waitqueue.cs") }
284 struct waitqueue_bucket
286 struct list bucket_entry
;
288 struct list reserved
;
293 static inline struct threadpool
*impl_from_TP_POOL( TP_POOL
*pool
)
295 return (struct threadpool
*)pool
;
298 static inline struct threadpool_object
*impl_from_TP_WORK( TP_WORK
*work
)
300 struct threadpool_object
*object
= (struct threadpool_object
*)work
;
301 assert( object
->type
== TP_OBJECT_TYPE_WORK
);
305 static inline struct threadpool_object
*impl_from_TP_TIMER( TP_TIMER
*timer
)
307 struct threadpool_object
*object
= (struct threadpool_object
*)timer
;
308 assert( object
->type
== TP_OBJECT_TYPE_TIMER
);
312 static inline struct threadpool_object
*impl_from_TP_WAIT( TP_WAIT
*wait
)
314 struct threadpool_object
*object
= (struct threadpool_object
*)wait
;
315 assert( object
->type
== TP_OBJECT_TYPE_WAIT
);
319 static inline struct threadpool_group
*impl_from_TP_CLEANUP_GROUP( TP_CLEANUP_GROUP
*group
)
321 return (struct threadpool_group
*)group
;
324 static inline struct threadpool_instance
*impl_from_TP_CALLBACK_INSTANCE( TP_CALLBACK_INSTANCE
*instance
)
326 return (struct threadpool_instance
*)instance
;
329 static void CALLBACK
threadpool_worker_proc( void *param
);
330 static void tp_object_submit( struct threadpool_object
*object
, BOOL signaled
);
331 static void tp_object_prepare_shutdown( struct threadpool_object
*object
);
332 static BOOL
tp_object_release( struct threadpool_object
*object
);
333 static struct threadpool
*default_threadpool
= NULL
;
335 static inline LONG
interlocked_inc( PLONG dest
)
337 return interlocked_xchg_add( dest
, 1 ) + 1;
340 static inline LONG
interlocked_dec( PLONG dest
)
342 return interlocked_xchg_add( dest
, -1 ) - 1;
345 static void CALLBACK
process_rtl_work_item( TP_CALLBACK_INSTANCE
*instance
, void *userdata
)
347 struct rtl_work_item
*item
= userdata
;
349 TRACE("executing %p(%p)\n", item
->function
, item
->context
);
350 item
->function( item
->context
);
352 RtlFreeHeap( GetProcessHeap(), 0, item
);
355 /***********************************************************************
356 * RtlQueueWorkItem (NTDLL.@)
358 * Queues a work item into a thread in the thread pool.
361 * function [I] Work function to execute.
362 * context [I] Context to pass to the work function when it is executed.
363 * flags [I] Flags. See notes.
366 * Success: STATUS_SUCCESS.
367 * Failure: Any NTSTATUS code.
370 * Flags can be one or more of the following:
371 *|WT_EXECUTEDEFAULT - Executes the work item in a non-I/O worker thread.
372 *|WT_EXECUTEINIOTHREAD - Executes the work item in an I/O worker thread.
373 *|WT_EXECUTEINPERSISTENTTHREAD - Executes the work item in a thread that is persistent.
374 *|WT_EXECUTELONGFUNCTION - Hints that the execution can take a long time.
375 *|WT_TRANSFER_IMPERSONATION - Executes the function with the current access token.
377 NTSTATUS WINAPI
RtlQueueWorkItem( PRTL_WORK_ITEM_ROUTINE function
, PVOID context
, ULONG flags
)
379 TP_CALLBACK_ENVIRON environment
;
380 struct rtl_work_item
*item
;
383 TRACE( "%p %p %u\n", function
, context
, flags
);
385 item
= RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*item
) );
387 return STATUS_NO_MEMORY
;
389 memset( &environment
, 0, sizeof(environment
) );
390 environment
.Version
= 1;
391 environment
.u
.s
.LongFunction
= (flags
& WT_EXECUTELONGFUNCTION
) != 0;
392 environment
.u
.s
.Persistent
= (flags
& WT_EXECUTEINPERSISTENTTHREAD
) != 0;
394 item
->function
= function
;
395 item
->context
= context
;
397 status
= TpSimpleTryPost( process_rtl_work_item
, item
, &environment
);
398 if (status
) RtlFreeHeap( GetProcessHeap(), 0, item
);
402 /***********************************************************************
403 * iocp_poller - get completion events and run callbacks
405 static DWORD CALLBACK
iocp_poller(LPVOID Arg
)
411 PRTL_OVERLAPPED_COMPLETION_ROUTINE callback
;
413 IO_STATUS_BLOCK iosb
;
414 NTSTATUS res
= NtRemoveIoCompletion( cport
, (PULONG_PTR
)&callback
, (PULONG_PTR
)&overlapped
, &iosb
, NULL
);
417 ERR("NtRemoveIoCompletion failed: 0x%x\n", res
);
421 DWORD transferred
= 0;
424 if (iosb
.u
.Status
== STATUS_SUCCESS
)
425 transferred
= iosb
.Information
;
427 err
= RtlNtStatusToDosError(iosb
.u
.Status
);
429 callback( err
, transferred
, overlapped
);
435 /***********************************************************************
436 * RtlSetIoCompletionCallback (NTDLL.@)
438 * Binds a handle to a thread pool's completion port, and possibly
439 * starts a non-I/O thread to monitor this port and call functions back.
442 * FileHandle [I] Handle to bind to a completion port.
443 * Function [I] Callback function to call on I/O completions.
444 * Flags [I] Not used.
447 * Success: STATUS_SUCCESS.
448 * Failure: Any NTSTATUS code.
451 NTSTATUS WINAPI
RtlSetIoCompletionCallback(HANDLE FileHandle
, PRTL_OVERLAPPED_COMPLETION_ROUTINE Function
, ULONG Flags
)
453 IO_STATUS_BLOCK iosb
;
454 FILE_COMPLETION_INFORMATION info
;
456 if (Flags
) FIXME("Unknown value Flags=0x%x\n", Flags
);
458 if (!old_threadpool
.compl_port
)
460 NTSTATUS res
= STATUS_SUCCESS
;
462 RtlEnterCriticalSection(&old_threadpool
.threadpool_compl_cs
);
463 if (!old_threadpool
.compl_port
)
467 res
= NtCreateIoCompletion( &cport
, IO_COMPLETION_ALL_ACCESS
, NULL
, 0 );
470 /* FIXME native can start additional threads in case of e.g. hung callback function. */
471 res
= RtlQueueWorkItem( iocp_poller
, cport
, WT_EXECUTEDEFAULT
);
473 old_threadpool
.compl_port
= cport
;
478 RtlLeaveCriticalSection(&old_threadpool
.threadpool_compl_cs
);
482 info
.CompletionPort
= old_threadpool
.compl_port
;
483 info
.CompletionKey
= (ULONG_PTR
)Function
;
485 return NtSetInformationFile( FileHandle
, &iosb
, &info
, sizeof(info
), FileCompletionInformation
);
488 static inline PLARGE_INTEGER
get_nt_timeout( PLARGE_INTEGER pTime
, ULONG timeout
)
490 if (timeout
== INFINITE
) return NULL
;
491 pTime
->QuadPart
= (ULONGLONG
)timeout
* -10000;
495 static void delete_wait_work_item(struct wait_work_item
*wait_work_item
)
497 NtClose( wait_work_item
->CancelEvent
);
498 RtlFreeHeap( GetProcessHeap(), 0, wait_work_item
);
501 static DWORD CALLBACK
wait_thread_proc(LPVOID Arg
)
503 struct wait_work_item
*wait_work_item
= Arg
;
505 BOOLEAN alertable
= (wait_work_item
->Flags
& WT_EXECUTEINIOTHREAD
) != 0;
506 HANDLE handles
[2] = { wait_work_item
->Object
, wait_work_item
->CancelEvent
};
507 LARGE_INTEGER timeout
;
508 HANDLE completion_event
;
514 status
= NtWaitForMultipleObjects( 2, handles
, TRUE
, alertable
,
515 get_nt_timeout( &timeout
, wait_work_item
->Milliseconds
) );
516 if (status
== STATUS_WAIT_0
|| status
== STATUS_TIMEOUT
)
518 BOOLEAN TimerOrWaitFired
;
520 if (status
== STATUS_WAIT_0
)
522 TRACE( "object %p signaled, calling callback %p with context %p\n",
523 wait_work_item
->Object
, wait_work_item
->Callback
,
524 wait_work_item
->Context
);
525 TimerOrWaitFired
= FALSE
;
529 TRACE( "wait for object %p timed out, calling callback %p with context %p\n",
530 wait_work_item
->Object
, wait_work_item
->Callback
,
531 wait_work_item
->Context
);
532 TimerOrWaitFired
= TRUE
;
534 interlocked_xchg( &wait_work_item
->CallbackInProgress
, TRUE
);
535 if (wait_work_item
->CompletionEvent
)
537 TRACE( "Work has been canceled.\n" );
540 wait_work_item
->Callback( wait_work_item
->Context
, TimerOrWaitFired
);
541 interlocked_xchg( &wait_work_item
->CallbackInProgress
, FALSE
);
543 if (wait_work_item
->Flags
& WT_EXECUTEONLYONCE
)
546 else if (status
!= STATUS_USER_APC
)
551 if (interlocked_inc( &wait_work_item
->DeleteCount
) == 2 )
553 completion_event
= wait_work_item
->CompletionEvent
;
554 delete_wait_work_item( wait_work_item
);
555 if (completion_event
&& completion_event
!= INVALID_HANDLE_VALUE
)
556 NtSetEvent( completion_event
, NULL
);
562 /***********************************************************************
563 * RtlRegisterWait (NTDLL.@)
565 * Registers a wait for a handle to become signaled.
568 * NewWaitObject [I] Handle to the new wait object. Use RtlDeregisterWait() to free it.
569 * Object [I] Object to wait to become signaled.
570 * Callback [I] Callback function to execute when the wait times out or the handle is signaled.
571 * Context [I] Context to pass to the callback function when it is executed.
572 * Milliseconds [I] Number of milliseconds to wait before timing out.
573 * Flags [I] Flags. See notes.
576 * Success: STATUS_SUCCESS.
577 * Failure: Any NTSTATUS code.
580 * Flags can be one or more of the following:
581 *|WT_EXECUTEDEFAULT - Executes the work item in a non-I/O worker thread.
582 *|WT_EXECUTEINIOTHREAD - Executes the work item in an I/O worker thread.
583 *|WT_EXECUTEINPERSISTENTTHREAD - Executes the work item in a thread that is persistent.
584 *|WT_EXECUTELONGFUNCTION - Hints that the execution can take a long time.
585 *|WT_TRANSFER_IMPERSONATION - Executes the function with the current access token.
587 NTSTATUS WINAPI
RtlRegisterWait(PHANDLE NewWaitObject
, HANDLE Object
,
588 RTL_WAITORTIMERCALLBACKFUNC Callback
,
589 PVOID Context
, ULONG Milliseconds
, ULONG Flags
)
591 struct wait_work_item
*wait_work_item
;
594 TRACE( "(%p, %p, %p, %p, %d, 0x%x)\n", NewWaitObject
, Object
, Callback
, Context
, Milliseconds
, Flags
);
596 wait_work_item
= RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*wait_work_item
) );
598 return STATUS_NO_MEMORY
;
600 wait_work_item
->Object
= Object
;
601 wait_work_item
->Callback
= Callback
;
602 wait_work_item
->Context
= Context
;
603 wait_work_item
->Milliseconds
= Milliseconds
;
604 wait_work_item
->Flags
= Flags
;
605 wait_work_item
->CallbackInProgress
= FALSE
;
606 wait_work_item
->DeleteCount
= 0;
607 wait_work_item
->CompletionEvent
= NULL
;
609 status
= NtCreateEvent( &wait_work_item
->CancelEvent
, EVENT_ALL_ACCESS
, NULL
, NotificationEvent
, FALSE
);
610 if (status
!= STATUS_SUCCESS
)
612 RtlFreeHeap( GetProcessHeap(), 0, wait_work_item
);
616 Flags
= Flags
& (WT_EXECUTEINIOTHREAD
| WT_EXECUTEINPERSISTENTTHREAD
|
617 WT_EXECUTELONGFUNCTION
| WT_TRANSFER_IMPERSONATION
);
618 status
= RtlQueueWorkItem( wait_thread_proc
, wait_work_item
, Flags
);
619 if (status
!= STATUS_SUCCESS
)
621 delete_wait_work_item( wait_work_item
);
625 *NewWaitObject
= wait_work_item
;
629 /***********************************************************************
630 * RtlDeregisterWaitEx (NTDLL.@)
632 * Cancels a wait operation and frees the resources associated with calling
636 * WaitObject [I] Handle to the wait object to free.
639 * Success: STATUS_SUCCESS.
640 * Failure: Any NTSTATUS code.
642 NTSTATUS WINAPI
RtlDeregisterWaitEx(HANDLE WaitHandle
, HANDLE CompletionEvent
)
644 struct wait_work_item
*wait_work_item
= WaitHandle
;
646 HANDLE LocalEvent
= NULL
;
647 int CallbackInProgress
;
649 TRACE( "(%p %p)\n", WaitHandle
, CompletionEvent
);
651 if (WaitHandle
== NULL
)
652 return STATUS_INVALID_HANDLE
;
654 interlocked_xchg_ptr( &wait_work_item
->CompletionEvent
, INVALID_HANDLE_VALUE
);
655 CallbackInProgress
= wait_work_item
->CallbackInProgress
;
656 TRACE( "callback in progress %u\n", CallbackInProgress
);
657 if (CompletionEvent
== INVALID_HANDLE_VALUE
|| !CallbackInProgress
)
659 status
= NtCreateEvent( &LocalEvent
, EVENT_ALL_ACCESS
, NULL
, NotificationEvent
, FALSE
);
660 if (status
!= STATUS_SUCCESS
)
662 interlocked_xchg_ptr( &wait_work_item
->CompletionEvent
, LocalEvent
);
664 else if (CompletionEvent
!= NULL
)
666 interlocked_xchg_ptr( &wait_work_item
->CompletionEvent
, CompletionEvent
);
669 NtSetEvent( wait_work_item
->CancelEvent
, NULL
);
671 if (interlocked_inc( &wait_work_item
->DeleteCount
) == 2 )
673 status
= STATUS_SUCCESS
;
674 delete_wait_work_item( wait_work_item
);
678 TRACE( "Waiting for completion event\n" );
679 NtWaitForSingleObject( LocalEvent
, FALSE
, NULL
);
680 status
= STATUS_SUCCESS
;
684 status
= STATUS_PENDING
;
688 NtClose( LocalEvent
);
693 /***********************************************************************
694 * RtlDeregisterWait (NTDLL.@)
696 * Cancels a wait operation and frees the resources associated with calling
700 * WaitObject [I] Handle to the wait object to free.
703 * Success: STATUS_SUCCESS.
704 * Failure: Any NTSTATUS code.
706 NTSTATUS WINAPI
RtlDeregisterWait(HANDLE WaitHandle
)
708 return RtlDeregisterWaitEx(WaitHandle
, NULL
);
712 /************************** Timer Queue Impl **************************/
714 static void queue_remove_timer(struct queue_timer
*t
)
716 /* We MUST hold the queue cs while calling this function. This ensures
717 that we cannot queue another callback for this timer. The runcount
718 being zero makes sure we don't have any already queued. */
719 struct timer_queue
*q
= t
->q
;
721 assert(t
->runcount
== 0);
724 list_remove(&t
->entry
);
726 NtSetEvent(t
->event
, NULL
);
727 RtlFreeHeap(GetProcessHeap(), 0, t
);
729 if (q
->quit
&& list_empty(&q
->timers
))
730 NtSetEvent(q
->event
, NULL
);
733 static void timer_cleanup_callback(struct queue_timer
*t
)
735 struct timer_queue
*q
= t
->q
;
736 RtlEnterCriticalSection(&q
->cs
);
738 assert(0 < t
->runcount
);
741 if (t
->destroy
&& t
->runcount
== 0)
742 queue_remove_timer(t
);
744 RtlLeaveCriticalSection(&q
->cs
);
747 static DWORD WINAPI
timer_callback_wrapper(LPVOID p
)
749 struct queue_timer
*t
= p
;
750 t
->callback(t
->param
, TRUE
);
751 timer_cleanup_callback(t
);
755 static inline ULONGLONG
queue_current_time(void)
757 LARGE_INTEGER now
, freq
;
758 NtQueryPerformanceCounter(&now
, &freq
);
759 return now
.QuadPart
* 1000 / freq
.QuadPart
;
762 static void queue_add_timer(struct queue_timer
*t
, ULONGLONG time
,
765 /* We MUST hold the queue cs while calling this function. */
766 struct timer_queue
*q
= t
->q
;
767 struct list
*ptr
= &q
->timers
;
769 assert(!q
->quit
|| (t
->destroy
&& time
== EXPIRE_NEVER
));
771 if (time
!= EXPIRE_NEVER
)
772 LIST_FOR_EACH(ptr
, &q
->timers
)
774 struct queue_timer
*cur
= LIST_ENTRY(ptr
, struct queue_timer
, entry
);
775 if (time
< cur
->expire
)
778 list_add_before(ptr
, &t
->entry
);
782 /* If we insert at the head of the list, we need to expire sooner
784 if (set_event
&& &t
->entry
== list_head(&q
->timers
))
785 NtSetEvent(q
->event
, NULL
);
788 static inline void queue_move_timer(struct queue_timer
*t
, ULONGLONG time
,
791 /* We MUST hold the queue cs while calling this function. */
792 list_remove(&t
->entry
);
793 queue_add_timer(t
, time
, set_event
);
796 static void queue_timer_expire(struct timer_queue
*q
)
798 struct queue_timer
*t
= NULL
;
800 RtlEnterCriticalSection(&q
->cs
);
801 if (list_head(&q
->timers
))
804 t
= LIST_ENTRY(list_head(&q
->timers
), struct queue_timer
, entry
);
805 if (!t
->destroy
&& t
->expire
<= ((now
= queue_current_time())))
810 next
= t
->expire
+ t
->period
;
811 /* avoid trigger cascade if overloaded / hibernated */
813 next
= now
+ t
->period
;
817 queue_move_timer(t
, next
, FALSE
);
822 RtlLeaveCriticalSection(&q
->cs
);
826 if (t
->flags
& WT_EXECUTEINTIMERTHREAD
)
827 timer_callback_wrapper(t
);
832 & (WT_EXECUTEINIOTHREAD
| WT_EXECUTEINPERSISTENTTHREAD
833 | WT_EXECUTELONGFUNCTION
| WT_TRANSFER_IMPERSONATION
));
834 NTSTATUS status
= RtlQueueWorkItem(timer_callback_wrapper
, t
, flags
);
835 if (status
!= STATUS_SUCCESS
)
836 timer_cleanup_callback(t
);
841 static ULONG
queue_get_timeout(struct timer_queue
*q
)
843 struct queue_timer
*t
;
844 ULONG timeout
= INFINITE
;
846 RtlEnterCriticalSection(&q
->cs
);
847 if (list_head(&q
->timers
))
849 t
= LIST_ENTRY(list_head(&q
->timers
), struct queue_timer
, entry
);
850 assert(!t
->destroy
|| t
->expire
== EXPIRE_NEVER
);
852 if (t
->expire
!= EXPIRE_NEVER
)
854 ULONGLONG time
= queue_current_time();
855 timeout
= t
->expire
< time
? 0 : t
->expire
- time
;
858 RtlLeaveCriticalSection(&q
->cs
);
863 static void WINAPI
timer_queue_thread_proc(LPVOID p
)
865 struct timer_queue
*q
= p
;
868 timeout_ms
= INFINITE
;
871 LARGE_INTEGER timeout
;
875 status
= NtWaitForSingleObject(
876 q
->event
, FALSE
, get_nt_timeout(&timeout
, timeout_ms
));
878 if (status
== STATUS_WAIT_0
)
880 /* There are two possible ways to trigger the event. Either
881 we are quitting and the last timer got removed, or a new
882 timer got put at the head of the list so we need to adjust
884 RtlEnterCriticalSection(&q
->cs
);
885 if (q
->quit
&& list_empty(&q
->timers
))
887 RtlLeaveCriticalSection(&q
->cs
);
889 else if (status
== STATUS_TIMEOUT
)
890 queue_timer_expire(q
);
895 timeout_ms
= queue_get_timeout(q
);
899 RtlDeleteCriticalSection(&q
->cs
);
901 RtlFreeHeap(GetProcessHeap(), 0, q
);
902 RtlExitUserThread( 0 );
905 static void queue_destroy_timer(struct queue_timer
*t
)
907 /* We MUST hold the queue cs while calling this function. */
909 if (t
->runcount
== 0)
910 /* Ensure a timer is promptly removed. If callbacks are pending,
911 it will be removed after the last one finishes by the callback
913 queue_remove_timer(t
);
915 /* Make sure no destroyed timer masks an active timer at the head
916 of the sorted list. */
917 queue_move_timer(t
, EXPIRE_NEVER
, FALSE
);
920 /***********************************************************************
921 * RtlCreateTimerQueue (NTDLL.@)
923 * Creates a timer queue object and returns a handle to it.
926 * NewTimerQueue [O] The newly created queue.
929 * Success: STATUS_SUCCESS.
930 * Failure: Any NTSTATUS code.
932 NTSTATUS WINAPI
RtlCreateTimerQueue(PHANDLE NewTimerQueue
)
935 struct timer_queue
*q
= RtlAllocateHeap(GetProcessHeap(), 0, sizeof *q
);
937 return STATUS_NO_MEMORY
;
939 RtlInitializeCriticalSection(&q
->cs
);
940 list_init(&q
->timers
);
942 q
->magic
= TIMER_QUEUE_MAGIC
;
943 status
= NtCreateEvent(&q
->event
, EVENT_ALL_ACCESS
, NULL
, SynchronizationEvent
, FALSE
);
944 if (status
!= STATUS_SUCCESS
)
946 RtlFreeHeap(GetProcessHeap(), 0, q
);
949 status
= RtlCreateUserThread(GetCurrentProcess(), NULL
, FALSE
, NULL
, 0, 0,
950 timer_queue_thread_proc
, q
, &q
->thread
, NULL
);
951 if (status
!= STATUS_SUCCESS
)
954 RtlFreeHeap(GetProcessHeap(), 0, q
);
959 return STATUS_SUCCESS
;
962 /***********************************************************************
963 * RtlDeleteTimerQueueEx (NTDLL.@)
965 * Deletes a timer queue object.
968 * TimerQueue [I] The timer queue to destroy.
969 * CompletionEvent [I] If NULL, return immediately. If INVALID_HANDLE_VALUE,
970 * wait until all timers are finished firing before
971 * returning. Otherwise, return immediately and set the
972 * event when all timers are done.
975 * Success: STATUS_SUCCESS if synchronous, STATUS_PENDING if not.
976 * Failure: Any NTSTATUS code.
978 NTSTATUS WINAPI
RtlDeleteTimerQueueEx(HANDLE TimerQueue
, HANDLE CompletionEvent
)
980 struct timer_queue
*q
= TimerQueue
;
981 struct queue_timer
*t
, *temp
;
985 if (!q
|| q
->magic
!= TIMER_QUEUE_MAGIC
)
986 return STATUS_INVALID_HANDLE
;
990 RtlEnterCriticalSection(&q
->cs
);
992 if (list_head(&q
->timers
))
993 /* When the last timer is removed, it will signal the timer thread to
995 LIST_FOR_EACH_ENTRY_SAFE(t
, temp
, &q
->timers
, struct queue_timer
, entry
)
996 queue_destroy_timer(t
);
998 /* However if we have none, we must do it ourselves. */
999 NtSetEvent(q
->event
, NULL
);
1000 RtlLeaveCriticalSection(&q
->cs
);
1002 if (CompletionEvent
== INVALID_HANDLE_VALUE
)
1004 NtWaitForSingleObject(thread
, FALSE
, NULL
);
1005 status
= STATUS_SUCCESS
;
1009 if (CompletionEvent
)
1011 FIXME("asynchronous return on completion event unimplemented\n");
1012 NtWaitForSingleObject(thread
, FALSE
, NULL
);
1013 NtSetEvent(CompletionEvent
, NULL
);
1015 status
= STATUS_PENDING
;
1022 static struct timer_queue
*get_timer_queue(HANDLE TimerQueue
)
1024 static struct timer_queue
*default_timer_queue
;
1030 if (!default_timer_queue
)
1033 NTSTATUS status
= RtlCreateTimerQueue(&q
);
1034 if (status
== STATUS_SUCCESS
)
1036 PVOID p
= interlocked_cmpxchg_ptr(
1037 (void **) &default_timer_queue
, q
, NULL
);
1039 /* Got beat to the punch. */
1040 RtlDeleteTimerQueueEx(q
, NULL
);
1043 return default_timer_queue
;
1047 /***********************************************************************
1048 * RtlCreateTimer (NTDLL.@)
1050 * Creates a new timer associated with the given queue.
1053 * NewTimer [O] The newly created timer.
1054 * TimerQueue [I] The queue to hold the timer.
1055 * Callback [I] The callback to fire.
1056 * Parameter [I] The argument for the callback.
1057 * DueTime [I] The delay, in milliseconds, before first firing the
1059 * Period [I] The period, in milliseconds, at which to fire the timer
1060 * after the first callback. If zero, the timer will only
1061 * fire once. It still needs to be deleted with
1063 * Flags [I] Flags controlling the execution of the callback. In
1064 * addition to the WT_* thread pool flags (see
1065 * RtlQueueWorkItem), WT_EXECUTEINTIMERTHREAD and
1066 * WT_EXECUTEONLYONCE are supported.
1069 * Success: STATUS_SUCCESS.
1070 * Failure: Any NTSTATUS code.
1072 NTSTATUS WINAPI
RtlCreateTimer(PHANDLE NewTimer
, HANDLE TimerQueue
,
1073 RTL_WAITORTIMERCALLBACKFUNC Callback
,
1074 PVOID Parameter
, DWORD DueTime
, DWORD Period
,
1078 struct queue_timer
*t
;
1079 struct timer_queue
*q
= get_timer_queue(TimerQueue
);
1081 if (!q
) return STATUS_NO_MEMORY
;
1082 if (q
->magic
!= TIMER_QUEUE_MAGIC
) return STATUS_INVALID_HANDLE
;
1084 t
= RtlAllocateHeap(GetProcessHeap(), 0, sizeof *t
);
1086 return STATUS_NO_MEMORY
;
1090 t
->callback
= Callback
;
1091 t
->param
= Parameter
;
1097 status
= STATUS_SUCCESS
;
1098 RtlEnterCriticalSection(&q
->cs
);
1100 status
= STATUS_INVALID_HANDLE
;
1102 queue_add_timer(t
, queue_current_time() + DueTime
, TRUE
);
1103 RtlLeaveCriticalSection(&q
->cs
);
1105 if (status
== STATUS_SUCCESS
)
1108 RtlFreeHeap(GetProcessHeap(), 0, t
);
1113 /***********************************************************************
1114 * RtlUpdateTimer (NTDLL.@)
1116 * Changes the time at which a timer expires.
1119 * TimerQueue [I] The queue that holds the timer.
1120 * Timer [I] The timer to update.
1121 * DueTime [I] The delay, in milliseconds, before next firing the timer.
1122 * Period [I] The period, in milliseconds, at which to fire the timer
1123 * after the first callback. If zero, the timer will not
1124 * refire once. It still needs to be deleted with
1128 * Success: STATUS_SUCCESS.
1129 * Failure: Any NTSTATUS code.
1131 NTSTATUS WINAPI
RtlUpdateTimer(HANDLE TimerQueue
, HANDLE Timer
,
1132 DWORD DueTime
, DWORD Period
)
1134 struct queue_timer
*t
= Timer
;
1135 struct timer_queue
*q
= t
->q
;
1137 RtlEnterCriticalSection(&q
->cs
);
1138 /* Can't change a timer if it was once-only or destroyed. */
1139 if (t
->expire
!= EXPIRE_NEVER
)
1142 queue_move_timer(t
, queue_current_time() + DueTime
, TRUE
);
1144 RtlLeaveCriticalSection(&q
->cs
);
1146 return STATUS_SUCCESS
;
1149 /***********************************************************************
1150 * RtlDeleteTimer (NTDLL.@)
1152 * Cancels a timer-queue timer.
1155 * TimerQueue [I] The queue that holds the timer.
1156 * Timer [I] The timer to update.
1157 * CompletionEvent [I] If NULL, return immediately. If INVALID_HANDLE_VALUE,
1158 * wait until the timer is finished firing all pending
1159 * callbacks before returning. Otherwise, return
1160 * immediately and set the timer is done.
1163 * Success: STATUS_SUCCESS if the timer is done, STATUS_PENDING if not,
1164 or if the completion event is NULL.
1165 * Failure: Any NTSTATUS code.
1167 NTSTATUS WINAPI
RtlDeleteTimer(HANDLE TimerQueue
, HANDLE Timer
,
1168 HANDLE CompletionEvent
)
1170 struct queue_timer
*t
= Timer
;
1171 struct timer_queue
*q
;
1172 NTSTATUS status
= STATUS_PENDING
;
1173 HANDLE event
= NULL
;
1176 return STATUS_INVALID_PARAMETER_1
;
1178 if (CompletionEvent
== INVALID_HANDLE_VALUE
)
1180 status
= NtCreateEvent(&event
, EVENT_ALL_ACCESS
, NULL
, SynchronizationEvent
, FALSE
);
1181 if (status
== STATUS_SUCCESS
)
1182 status
= STATUS_PENDING
;
1184 else if (CompletionEvent
)
1185 event
= CompletionEvent
;
1187 RtlEnterCriticalSection(&q
->cs
);
1189 if (t
->runcount
== 0 && event
)
1190 status
= STATUS_SUCCESS
;
1191 queue_destroy_timer(t
);
1192 RtlLeaveCriticalSection(&q
->cs
);
1194 if (CompletionEvent
== INVALID_HANDLE_VALUE
&& event
)
1196 if (status
== STATUS_PENDING
)
1198 NtWaitForSingleObject(event
, FALSE
, NULL
);
1199 status
= STATUS_SUCCESS
;
1207 /***********************************************************************
1208 * timerqueue_thread_proc (internal)
1210 static void CALLBACK
timerqueue_thread_proc( void *param
)
1212 ULONGLONG timeout_lower
, timeout_upper
, new_timeout
;
1213 struct threadpool_object
*other_timer
;
1214 LARGE_INTEGER now
, timeout
;
1217 TRACE( "starting timer queue thread\n" );
1219 RtlEnterCriticalSection( &timerqueue
.cs
);
1222 NtQuerySystemTime( &now
);
1224 /* Check for expired timers. */
1225 while ((ptr
= list_head( &timerqueue
.pending_timers
)))
1227 struct threadpool_object
*timer
= LIST_ENTRY( ptr
, struct threadpool_object
, u
.timer
.timer_entry
);
1228 assert( timer
->type
== TP_OBJECT_TYPE_TIMER
);
1229 assert( timer
->u
.timer
.timer_pending
);
1230 if (timer
->u
.timer
.timeout
> now
.QuadPart
)
1233 /* Queue a new callback in one of the worker threads. */
1234 list_remove( &timer
->u
.timer
.timer_entry
);
1235 timer
->u
.timer
.timer_pending
= FALSE
;
1236 tp_object_submit( timer
, FALSE
);
1238 /* Insert the timer back into the queue, except it's marked for shutdown. */
1239 if (timer
->u
.timer
.period
&& !timer
->shutdown
)
1241 timer
->u
.timer
.timeout
+= (ULONGLONG
)timer
->u
.timer
.period
* 10000;
1242 if (timer
->u
.timer
.timeout
<= now
.QuadPart
)
1243 timer
->u
.timer
.timeout
= now
.QuadPart
+ 1;
1245 LIST_FOR_EACH_ENTRY( other_timer
, &timerqueue
.pending_timers
,
1246 struct threadpool_object
, u
.timer
.timer_entry
)
1248 assert( other_timer
->type
== TP_OBJECT_TYPE_TIMER
);
1249 if (timer
->u
.timer
.timeout
< other_timer
->u
.timer
.timeout
)
1252 list_add_before( &other_timer
->u
.timer
.timer_entry
, &timer
->u
.timer
.timer_entry
);
1253 timer
->u
.timer
.timer_pending
= TRUE
;
1257 timeout_lower
= TIMEOUT_INFINITE
;
1258 timeout_upper
= TIMEOUT_INFINITE
;
1260 /* Determine next timeout and use the window length to optimize wakeup times. */
1261 LIST_FOR_EACH_ENTRY( other_timer
, &timerqueue
.pending_timers
,
1262 struct threadpool_object
, u
.timer
.timer_entry
)
1264 assert( other_timer
->type
== TP_OBJECT_TYPE_TIMER
);
1265 if (other_timer
->u
.timer
.timeout
>= timeout_upper
)
1268 timeout_lower
= other_timer
->u
.timer
.timeout
;
1269 new_timeout
= timeout_lower
+ (ULONGLONG
)other_timer
->u
.timer
.window_length
* 10000;
1270 if (new_timeout
< timeout_upper
)
1271 timeout_upper
= new_timeout
;
1274 /* Wait for timer update events or until the next timer expires. */
1275 if (timerqueue
.objcount
)
1277 timeout
.QuadPart
= timeout_lower
;
1278 RtlSleepConditionVariableCS( &timerqueue
.update_event
, &timerqueue
.cs
, &timeout
);
1282 /* All timers have been destroyed, if no new timers are created
1283 * within some amount of time, then we can shutdown this thread. */
1284 timeout
.QuadPart
= (ULONGLONG
)THREADPOOL_WORKER_TIMEOUT
* -10000;
1285 if (RtlSleepConditionVariableCS( &timerqueue
.update_event
, &timerqueue
.cs
,
1286 &timeout
) == STATUS_TIMEOUT
&& !timerqueue
.objcount
)
1292 timerqueue
.thread_running
= FALSE
;
1293 RtlLeaveCriticalSection( &timerqueue
.cs
);
1295 TRACE( "terminating timer queue thread\n" );
1296 RtlExitUserThread( 0 );
1299 /***********************************************************************
1300 * tp_new_worker_thread (internal)
1302 * Create and account a new worker thread for the desired pool.
1304 static NTSTATUS
tp_new_worker_thread( struct threadpool
*pool
)
1309 status
= RtlCreateUserThread( GetCurrentProcess(), NULL
, FALSE
, NULL
, 0, 0,
1310 threadpool_worker_proc
, pool
, &thread
, NULL
);
1311 if (status
== STATUS_SUCCESS
)
1313 interlocked_inc( &pool
->refcount
);
1314 pool
->num_workers
++;
1315 pool
->num_busy_workers
++;
1321 /***********************************************************************
1322 * tp_timerqueue_lock (internal)
1324 * Acquires a lock on the global timerqueue. When the lock is acquired
1325 * successfully, it is guaranteed that the timer thread is running.
1327 static NTSTATUS
tp_timerqueue_lock( struct threadpool_object
*timer
)
1329 NTSTATUS status
= STATUS_SUCCESS
;
1330 assert( timer
->type
== TP_OBJECT_TYPE_TIMER
);
1332 timer
->u
.timer
.timer_initialized
= FALSE
;
1333 timer
->u
.timer
.timer_pending
= FALSE
;
1334 timer
->u
.timer
.timer_set
= FALSE
;
1335 timer
->u
.timer
.timeout
= 0;
1336 timer
->u
.timer
.period
= 0;
1337 timer
->u
.timer
.window_length
= 0;
1339 RtlEnterCriticalSection( &timerqueue
.cs
);
1341 /* Make sure that the timerqueue thread is running. */
1342 if (!timerqueue
.thread_running
)
1345 status
= RtlCreateUserThread( GetCurrentProcess(), NULL
, FALSE
, NULL
, 0, 0,
1346 timerqueue_thread_proc
, NULL
, &thread
, NULL
);
1347 if (status
== STATUS_SUCCESS
)
1349 timerqueue
.thread_running
= TRUE
;
1354 if (status
== STATUS_SUCCESS
)
1356 timer
->u
.timer
.timer_initialized
= TRUE
;
1357 timerqueue
.objcount
++;
1360 RtlLeaveCriticalSection( &timerqueue
.cs
);
1364 /***********************************************************************
1365 * tp_timerqueue_unlock (internal)
1367 * Releases a lock on the global timerqueue.
1369 static void tp_timerqueue_unlock( struct threadpool_object
*timer
)
1371 assert( timer
->type
== TP_OBJECT_TYPE_TIMER
);
1373 RtlEnterCriticalSection( &timerqueue
.cs
);
1374 if (timer
->u
.timer
.timer_initialized
)
1376 /* If timer was pending, remove it. */
1377 if (timer
->u
.timer
.timer_pending
)
1379 list_remove( &timer
->u
.timer
.timer_entry
);
1380 timer
->u
.timer
.timer_pending
= FALSE
;
1383 /* If the last timer object was destroyed, then wake up the thread. */
1384 if (!--timerqueue
.objcount
)
1386 assert( list_empty( &timerqueue
.pending_timers
) );
1387 RtlWakeAllConditionVariable( &timerqueue
.update_event
);
1390 timer
->u
.timer
.timer_initialized
= FALSE
;
1392 RtlLeaveCriticalSection( &timerqueue
.cs
);
1395 /***********************************************************************
1396 * waitqueue_thread_proc (internal)
1398 static void CALLBACK
waitqueue_thread_proc( void *param
)
1400 struct threadpool_object
*objects
[MAXIMUM_WAITQUEUE_OBJECTS
];
1401 HANDLE handles
[MAXIMUM_WAITQUEUE_OBJECTS
+ 1];
1402 struct waitqueue_bucket
*bucket
= param
;
1403 struct threadpool_object
*wait
, *next
;
1404 LARGE_INTEGER now
, timeout
;
1408 TRACE( "starting wait queue thread\n" );
1410 RtlEnterCriticalSection( &waitqueue
.cs
);
1414 NtQuerySystemTime( &now
);
1415 timeout
.QuadPart
= TIMEOUT_INFINITE
;
1418 LIST_FOR_EACH_ENTRY_SAFE( wait
, next
, &bucket
->waiting
, struct threadpool_object
,
1421 assert( wait
->type
== TP_OBJECT_TYPE_WAIT
);
1422 if (wait
->u
.wait
.timeout
<= now
.QuadPart
)
1424 /* Wait object timed out. */
1425 list_remove( &wait
->u
.wait
.wait_entry
);
1426 list_add_tail( &bucket
->reserved
, &wait
->u
.wait
.wait_entry
);
1427 tp_object_submit( wait
, FALSE
);
1431 if (wait
->u
.wait
.timeout
< timeout
.QuadPart
)
1432 timeout
.QuadPart
= wait
->u
.wait
.timeout
;
1434 assert( num_handles
< MAXIMUM_WAITQUEUE_OBJECTS
);
1435 interlocked_inc( &wait
->refcount
);
1436 objects
[num_handles
] = wait
;
1437 handles
[num_handles
] = wait
->u
.wait
.handle
;
1442 if (!bucket
->objcount
)
1444 /* All wait objects have been destroyed, if no new wait objects are created
1445 * within some amount of time, then we can shutdown this thread. */
1446 assert( num_handles
== 0 );
1447 RtlLeaveCriticalSection( &waitqueue
.cs
);
1448 timeout
.QuadPart
= (ULONGLONG
)THREADPOOL_WORKER_TIMEOUT
* -10000;
1449 status
= NtWaitForMultipleObjects( 1, &bucket
->update_event
, TRUE
, FALSE
, &timeout
);
1450 RtlEnterCriticalSection( &waitqueue
.cs
);
1452 if (status
== STATUS_TIMEOUT
&& !bucket
->objcount
)
1457 handles
[num_handles
] = bucket
->update_event
;
1458 RtlLeaveCriticalSection( &waitqueue
.cs
);
1459 status
= NtWaitForMultipleObjects( num_handles
+ 1, handles
, TRUE
, FALSE
, &timeout
);
1460 RtlEnterCriticalSection( &waitqueue
.cs
);
1462 if (status
>= STATUS_WAIT_0
&& status
< STATUS_WAIT_0
+ num_handles
)
1464 wait
= objects
[status
- STATUS_WAIT_0
];
1465 assert( wait
->type
== TP_OBJECT_TYPE_WAIT
);
1466 if (wait
->u
.wait
.bucket
)
1468 /* Wait object signaled. */
1469 assert( wait
->u
.wait
.bucket
== bucket
);
1470 list_remove( &wait
->u
.wait
.wait_entry
);
1471 list_add_tail( &bucket
->reserved
, &wait
->u
.wait
.wait_entry
);
1472 tp_object_submit( wait
, TRUE
);
1475 WARN("wait object %p triggered while object was destroyed\n", wait
);
1478 /* Release temporary references to wait objects. */
1481 wait
= objects
[--num_handles
];
1482 assert( wait
->type
== TP_OBJECT_TYPE_WAIT
);
1483 tp_object_release( wait
);
1487 /* Try to merge bucket with other threads. */
1488 if (waitqueue
.num_buckets
> 1 && bucket
->objcount
&&
1489 bucket
->objcount
<= MAXIMUM_WAITQUEUE_OBJECTS
* 1 / 3)
1491 struct waitqueue_bucket
*other_bucket
;
1492 LIST_FOR_EACH_ENTRY( other_bucket
, &waitqueue
.buckets
, struct waitqueue_bucket
, bucket_entry
)
1494 if (other_bucket
!= bucket
&& other_bucket
->objcount
&&
1495 other_bucket
->objcount
+ bucket
->objcount
<= MAXIMUM_WAITQUEUE_OBJECTS
* 2 / 3)
1497 other_bucket
->objcount
+= bucket
->objcount
;
1498 bucket
->objcount
= 0;
1500 /* Update reserved list. */
1501 LIST_FOR_EACH_ENTRY( wait
, &bucket
->reserved
, struct threadpool_object
, u
.wait
.wait_entry
)
1503 assert( wait
->type
== TP_OBJECT_TYPE_WAIT
);
1504 wait
->u
.wait
.bucket
= other_bucket
;
1506 list_move_tail( &other_bucket
->reserved
, &bucket
->reserved
);
1508 /* Update waiting list. */
1509 LIST_FOR_EACH_ENTRY( wait
, &bucket
->waiting
, struct threadpool_object
, u
.wait
.wait_entry
)
1511 assert( wait
->type
== TP_OBJECT_TYPE_WAIT
);
1512 wait
->u
.wait
.bucket
= other_bucket
;
1514 list_move_tail( &other_bucket
->waiting
, &bucket
->waiting
);
1516 /* Move bucket to the end, to keep the probability of
1517 * newly added wait objects as small as possible. */
1518 list_remove( &bucket
->bucket_entry
);
1519 list_add_tail( &waitqueue
.buckets
, &bucket
->bucket_entry
);
1521 NtSetEvent( other_bucket
->update_event
, NULL
);
1528 /* Remove this bucket from the list. */
1529 list_remove( &bucket
->bucket_entry
);
1530 if (!--waitqueue
.num_buckets
)
1531 assert( list_empty( &waitqueue
.buckets
) );
1533 RtlLeaveCriticalSection( &waitqueue
.cs
);
1535 TRACE( "terminating wait queue thread\n" );
1537 assert( bucket
->objcount
== 0 );
1538 assert( list_empty( &bucket
->reserved
) );
1539 assert( list_empty( &bucket
->waiting
) );
1540 NtClose( bucket
->update_event
);
1542 RtlFreeHeap( GetProcessHeap(), 0, bucket
);
1543 RtlExitUserThread( 0 );
1546 /***********************************************************************
1547 * tp_waitqueue_lock (internal)
1549 static NTSTATUS
tp_waitqueue_lock( struct threadpool_object
*wait
)
1551 struct waitqueue_bucket
*bucket
;
1554 assert( wait
->type
== TP_OBJECT_TYPE_WAIT
);
1556 wait
->u
.wait
.signaled
= 0;
1557 wait
->u
.wait
.bucket
= NULL
;
1558 wait
->u
.wait
.wait_pending
= FALSE
;
1559 wait
->u
.wait
.timeout
= 0;
1560 wait
->u
.wait
.handle
= INVALID_HANDLE_VALUE
;
1562 RtlEnterCriticalSection( &waitqueue
.cs
);
1564 /* Try to assign to existing bucket if possible. */
1565 LIST_FOR_EACH_ENTRY( bucket
, &waitqueue
.buckets
, struct waitqueue_bucket
, bucket_entry
)
1567 if (bucket
->objcount
< MAXIMUM_WAITQUEUE_OBJECTS
)
1569 list_add_tail( &bucket
->reserved
, &wait
->u
.wait
.wait_entry
);
1570 wait
->u
.wait
.bucket
= bucket
;
1573 status
= STATUS_SUCCESS
;
1578 /* Create a new bucket and corresponding worker thread. */
1579 bucket
= RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*bucket
) );
1582 status
= STATUS_NO_MEMORY
;
1586 bucket
->objcount
= 0;
1587 list_init( &bucket
->reserved
);
1588 list_init( &bucket
->waiting
);
1590 status
= NtCreateEvent( &bucket
->update_event
, EVENT_ALL_ACCESS
,
1591 NULL
, SynchronizationEvent
, FALSE
);
1594 RtlFreeHeap( GetProcessHeap(), 0, bucket
);
1598 status
= RtlCreateUserThread( GetCurrentProcess(), NULL
, FALSE
, NULL
, 0, 0,
1599 waitqueue_thread_proc
, bucket
, &thread
, NULL
);
1600 if (status
== STATUS_SUCCESS
)
1602 list_add_tail( &waitqueue
.buckets
, &bucket
->bucket_entry
);
1603 waitqueue
.num_buckets
++;
1605 list_add_tail( &bucket
->reserved
, &wait
->u
.wait
.wait_entry
);
1606 wait
->u
.wait
.bucket
= bucket
;
1613 NtClose( bucket
->update_event
);
1614 RtlFreeHeap( GetProcessHeap(), 0, bucket
);
1618 RtlLeaveCriticalSection( &waitqueue
.cs
);
1622 /***********************************************************************
1623 * tp_waitqueue_unlock (internal)
1625 static void tp_waitqueue_unlock( struct threadpool_object
*wait
)
1627 assert( wait
->type
== TP_OBJECT_TYPE_WAIT
);
1629 RtlEnterCriticalSection( &waitqueue
.cs
);
1630 if (wait
->u
.wait
.bucket
)
1632 struct waitqueue_bucket
*bucket
= wait
->u
.wait
.bucket
;
1633 assert( bucket
->objcount
> 0 );
1635 list_remove( &wait
->u
.wait
.wait_entry
);
1636 wait
->u
.wait
.bucket
= NULL
;
1639 NtSetEvent( bucket
->update_event
, NULL
);
1641 RtlLeaveCriticalSection( &waitqueue
.cs
);
1644 /***********************************************************************
1645 * tp_threadpool_alloc (internal)
1647 * Allocates a new threadpool object.
1649 static NTSTATUS
tp_threadpool_alloc( struct threadpool
**out
)
1651 struct threadpool
*pool
;
1654 pool
= RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*pool
) );
1656 return STATUS_NO_MEMORY
;
1660 pool
->shutdown
= FALSE
;
1662 RtlInitializeCriticalSection( &pool
->cs
);
1663 pool
->cs
.DebugInfo
->Spare
[0] = (DWORD_PTR
)(__FILE__
": threadpool.cs");
1665 for (i
= 0; i
< ARRAY_SIZE(pool
->pools
); ++i
)
1666 list_init( &pool
->pools
[i
] );
1667 RtlInitializeConditionVariable( &pool
->update_event
);
1669 pool
->max_workers
= 500;
1670 pool
->min_workers
= 0;
1671 pool
->num_workers
= 0;
1672 pool
->num_busy_workers
= 0;
1674 TRACE( "allocated threadpool %p\n", pool
);
1677 return STATUS_SUCCESS
;
1680 /***********************************************************************
1681 * tp_threadpool_shutdown (internal)
1683 * Prepares the shutdown of a threadpool object and notifies all worker
1684 * threads to terminate (after all remaining work items have been
1687 static void tp_threadpool_shutdown( struct threadpool
*pool
)
1689 assert( pool
!= default_threadpool
);
1691 pool
->shutdown
= TRUE
;
1692 RtlWakeAllConditionVariable( &pool
->update_event
);
1695 /***********************************************************************
1696 * tp_threadpool_release (internal)
1698 * Releases a reference to a threadpool object.
1700 static BOOL
tp_threadpool_release( struct threadpool
*pool
)
1704 if (interlocked_dec( &pool
->refcount
))
1707 TRACE( "destroying threadpool %p\n", pool
);
1709 assert( pool
->shutdown
);
1710 assert( !pool
->objcount
);
1711 for (i
= 0; i
< ARRAY_SIZE(pool
->pools
); ++i
)
1712 assert( list_empty( &pool
->pools
[i
] ) );
1714 pool
->cs
.DebugInfo
->Spare
[0] = 0;
1715 RtlDeleteCriticalSection( &pool
->cs
);
1717 RtlFreeHeap( GetProcessHeap(), 0, pool
);
1721 /***********************************************************************
1722 * tp_threadpool_lock (internal)
1724 * Acquires a lock on a threadpool, specified with an TP_CALLBACK_ENVIRON
1725 * block. When the lock is acquired successfully, it is guaranteed that
1726 * there is at least one worker thread to process tasks.
1728 static NTSTATUS
tp_threadpool_lock( struct threadpool
**out
, TP_CALLBACK_ENVIRON
*environment
)
1730 struct threadpool
*pool
= NULL
;
1731 NTSTATUS status
= STATUS_SUCCESS
;
1735 /* Validate environment parameters. */
1736 if (environment
->Version
== 3)
1738 TP_CALLBACK_ENVIRON_V3
*environment3
= (TP_CALLBACK_ENVIRON_V3
*)environment
;
1740 switch (environment3
->CallbackPriority
)
1742 case TP_CALLBACK_PRIORITY_HIGH
:
1743 case TP_CALLBACK_PRIORITY_NORMAL
:
1744 case TP_CALLBACK_PRIORITY_LOW
:
1747 return STATUS_INVALID_PARAMETER
;
1751 pool
= (struct threadpool
*)environment
->Pool
;
1756 if (!default_threadpool
)
1758 status
= tp_threadpool_alloc( &pool
);
1759 if (status
!= STATUS_SUCCESS
)
1762 if (interlocked_cmpxchg_ptr( (void *)&default_threadpool
, pool
, NULL
) != NULL
)
1764 tp_threadpool_shutdown( pool
);
1765 tp_threadpool_release( pool
);
1769 pool
= default_threadpool
;
1772 RtlEnterCriticalSection( &pool
->cs
);
1774 /* Make sure that the threadpool has at least one thread. */
1775 if (!pool
->num_workers
)
1776 status
= tp_new_worker_thread( pool
);
1778 /* Keep a reference, and increment objcount to ensure that the
1779 * last thread doesn't terminate. */
1780 if (status
== STATUS_SUCCESS
)
1782 interlocked_inc( &pool
->refcount
);
1786 RtlLeaveCriticalSection( &pool
->cs
);
1788 if (status
!= STATUS_SUCCESS
)
1792 return STATUS_SUCCESS
;
1795 /***********************************************************************
1796 * tp_threadpool_unlock (internal)
1798 * Releases a lock on a threadpool.
1800 static void tp_threadpool_unlock( struct threadpool
*pool
)
1802 RtlEnterCriticalSection( &pool
->cs
);
1804 RtlLeaveCriticalSection( &pool
->cs
);
1805 tp_threadpool_release( pool
);
1808 /***********************************************************************
1809 * tp_group_alloc (internal)
1811 * Allocates a new threadpool group object.
1813 static NTSTATUS
tp_group_alloc( struct threadpool_group
**out
)
1815 struct threadpool_group
*group
;
1817 group
= RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*group
) );
1819 return STATUS_NO_MEMORY
;
1821 group
->refcount
= 1;
1822 group
->shutdown
= FALSE
;
1824 RtlInitializeCriticalSection( &group
->cs
);
1825 group
->cs
.DebugInfo
->Spare
[0] = (DWORD_PTR
)(__FILE__
": threadpool_group.cs");
1827 list_init( &group
->members
);
1829 TRACE( "allocated group %p\n", group
);
1832 return STATUS_SUCCESS
;
1835 /***********************************************************************
1836 * tp_group_shutdown (internal)
1838 * Marks the group object for shutdown.
1840 static void tp_group_shutdown( struct threadpool_group
*group
)
1842 group
->shutdown
= TRUE
;
1845 /***********************************************************************
1846 * tp_group_release (internal)
1848 * Releases a reference to a group object.
1850 static BOOL
tp_group_release( struct threadpool_group
*group
)
1852 if (interlocked_dec( &group
->refcount
))
1855 TRACE( "destroying group %p\n", group
);
1857 assert( group
->shutdown
);
1858 assert( list_empty( &group
->members
) );
1860 group
->cs
.DebugInfo
->Spare
[0] = 0;
1861 RtlDeleteCriticalSection( &group
->cs
);
1863 RtlFreeHeap( GetProcessHeap(), 0, group
);
1867 /***********************************************************************
1868 * tp_object_initialize (internal)
1870 * Initializes members of a threadpool object.
1872 static void tp_object_initialize( struct threadpool_object
*object
, struct threadpool
*pool
,
1873 PVOID userdata
, TP_CALLBACK_ENVIRON
*environment
)
1875 BOOL is_simple_callback
= (object
->type
== TP_OBJECT_TYPE_SIMPLE
);
1877 object
->refcount
= 1;
1878 object
->shutdown
= FALSE
;
1880 object
->pool
= pool
;
1881 object
->group
= NULL
;
1882 object
->userdata
= userdata
;
1883 object
->group_cancel_callback
= NULL
;
1884 object
->finalization_callback
= NULL
;
1885 object
->may_run_long
= 0;
1886 object
->race_dll
= NULL
;
1887 object
->priority
= TP_CALLBACK_PRIORITY_NORMAL
;
1889 memset( &object
->group_entry
, 0, sizeof(object
->group_entry
) );
1890 object
->is_group_member
= FALSE
;
1892 memset( &object
->pool_entry
, 0, sizeof(object
->pool_entry
) );
1893 RtlInitializeConditionVariable( &object
->finished_event
);
1894 RtlInitializeConditionVariable( &object
->group_finished_event
);
1895 object
->num_pending_callbacks
= 0;
1896 object
->num_running_callbacks
= 0;
1897 object
->num_associated_callbacks
= 0;
1901 if (environment
->Version
!= 1 && environment
->Version
!= 3)
1902 FIXME( "unsupported environment version %u\n", environment
->Version
);
1904 object
->group
= impl_from_TP_CLEANUP_GROUP( environment
->CleanupGroup
);
1905 object
->group_cancel_callback
= environment
->CleanupGroupCancelCallback
;
1906 object
->finalization_callback
= environment
->FinalizationCallback
;
1907 object
->may_run_long
= environment
->u
.s
.LongFunction
!= 0;
1908 object
->race_dll
= environment
->RaceDll
;
1909 if (environment
->Version
== 3)
1911 TP_CALLBACK_ENVIRON_V3
*environment_v3
= (TP_CALLBACK_ENVIRON_V3
*)environment
;
1913 object
->priority
= environment_v3
->CallbackPriority
;
1914 assert( object
->priority
< ARRAY_SIZE(pool
->pools
) );
1917 if (environment
->ActivationContext
)
1918 FIXME( "activation context not supported yet\n" );
1920 if (environment
->u
.s
.Persistent
)
1921 FIXME( "persistent threads not supported yet\n" );
1924 if (object
->race_dll
)
1925 LdrAddRefDll( 0, object
->race_dll
);
1927 TRACE( "allocated object %p of type %u\n", object
, object
->type
);
1929 /* For simple callbacks we have to run tp_object_submit before adding this object
1930 * to the cleanup group. As soon as the cleanup group members are released ->shutdown
1931 * will be set, and tp_object_submit would fail with an assertion. */
1933 if (is_simple_callback
)
1934 tp_object_submit( object
, FALSE
);
1938 struct threadpool_group
*group
= object
->group
;
1939 interlocked_inc( &group
->refcount
);
1941 RtlEnterCriticalSection( &group
->cs
);
1942 list_add_tail( &group
->members
, &object
->group_entry
);
1943 object
->is_group_member
= TRUE
;
1944 RtlLeaveCriticalSection( &group
->cs
);
1947 if (is_simple_callback
)
1948 tp_object_release( object
);
1951 static void tp_object_prio_queue( struct threadpool_object
*object
)
1953 list_add_tail( &object
->pool
->pools
[object
->priority
], &object
->pool_entry
);
1956 /***********************************************************************
1957 * tp_object_submit (internal)
1959 * Submits a threadpool object to the associated threadpool. This
1960 * function has to be VOID because TpPostWork can never fail on Windows.
1962 static void tp_object_submit( struct threadpool_object
*object
, BOOL signaled
)
1964 struct threadpool
*pool
= object
->pool
;
1965 NTSTATUS status
= STATUS_UNSUCCESSFUL
;
1967 assert( !object
->shutdown
);
1968 assert( !pool
->shutdown
);
1970 RtlEnterCriticalSection( &pool
->cs
);
1972 /* Start new worker threads if required. */
1973 if (pool
->num_busy_workers
>= pool
->num_workers
&&
1974 pool
->num_workers
< pool
->max_workers
)
1975 status
= tp_new_worker_thread( pool
);
1977 /* Queue work item and increment refcount. */
1978 interlocked_inc( &object
->refcount
);
1979 if (!object
->num_pending_callbacks
++)
1980 tp_object_prio_queue( object
);
1982 /* Count how often the object was signaled. */
1983 if (object
->type
== TP_OBJECT_TYPE_WAIT
&& signaled
)
1984 object
->u
.wait
.signaled
++;
1986 /* No new thread started - wake up one existing thread. */
1987 if (status
!= STATUS_SUCCESS
)
1989 assert( pool
->num_workers
> 0 );
1990 RtlWakeConditionVariable( &pool
->update_event
);
1993 RtlLeaveCriticalSection( &pool
->cs
);
1996 /***********************************************************************
1997 * tp_object_cancel (internal)
1999 * Cancels all currently pending callbacks for a specific object.
2001 static void tp_object_cancel( struct threadpool_object
*object
)
2003 struct threadpool
*pool
= object
->pool
;
2004 LONG pending_callbacks
= 0;
2006 RtlEnterCriticalSection( &pool
->cs
);
2007 if (object
->num_pending_callbacks
)
2009 pending_callbacks
= object
->num_pending_callbacks
;
2010 object
->num_pending_callbacks
= 0;
2011 list_remove( &object
->pool_entry
);
2013 if (object
->type
== TP_OBJECT_TYPE_WAIT
)
2014 object
->u
.wait
.signaled
= 0;
2016 RtlLeaveCriticalSection( &pool
->cs
);
2018 while (pending_callbacks
--)
2019 tp_object_release( object
);
2022 /***********************************************************************
2023 * tp_object_wait (internal)
2025 * Waits until all pending and running callbacks of a specific object
2026 * have been processed.
2028 static void tp_object_wait( struct threadpool_object
*object
, BOOL group_wait
)
2030 struct threadpool
*pool
= object
->pool
;
2032 RtlEnterCriticalSection( &pool
->cs
);
2035 while (object
->num_pending_callbacks
|| object
->num_running_callbacks
)
2036 RtlSleepConditionVariableCS( &object
->group_finished_event
, &pool
->cs
, NULL
);
2040 while (object
->num_pending_callbacks
|| object
->num_associated_callbacks
)
2041 RtlSleepConditionVariableCS( &object
->finished_event
, &pool
->cs
, NULL
);
2043 RtlLeaveCriticalSection( &pool
->cs
);
2046 /***********************************************************************
2047 * tp_object_prepare_shutdown (internal)
2049 * Prepares a threadpool object for shutdown.
2051 static void tp_object_prepare_shutdown( struct threadpool_object
*object
)
2053 if (object
->type
== TP_OBJECT_TYPE_TIMER
)
2054 tp_timerqueue_unlock( object
);
2055 else if (object
->type
== TP_OBJECT_TYPE_WAIT
)
2056 tp_waitqueue_unlock( object
);
2059 /***********************************************************************
2060 * tp_object_release (internal)
2062 * Releases a reference to a threadpool object.
2064 static BOOL
tp_object_release( struct threadpool_object
*object
)
2066 if (interlocked_dec( &object
->refcount
))
2069 TRACE( "destroying object %p of type %u\n", object
, object
->type
);
2071 assert( object
->shutdown
);
2072 assert( !object
->num_pending_callbacks
);
2073 assert( !object
->num_running_callbacks
);
2074 assert( !object
->num_associated_callbacks
);
2076 /* release reference to the group */
2079 struct threadpool_group
*group
= object
->group
;
2081 RtlEnterCriticalSection( &group
->cs
);
2082 if (object
->is_group_member
)
2084 list_remove( &object
->group_entry
);
2085 object
->is_group_member
= FALSE
;
2087 RtlLeaveCriticalSection( &group
->cs
);
2089 tp_group_release( group
);
2092 tp_threadpool_unlock( object
->pool
);
2094 if (object
->race_dll
)
2095 LdrUnloadDll( object
->race_dll
);
2097 RtlFreeHeap( GetProcessHeap(), 0, object
);
2101 static struct list
*threadpool_get_next_item( const struct threadpool
*pool
)
2106 for (i
= 0; i
< ARRAY_SIZE(pool
->pools
); ++i
)
2108 if ((ptr
= list_head( &pool
->pools
[i
] )))
2115 /***********************************************************************
2116 * threadpool_worker_proc (internal)
2118 static void CALLBACK
threadpool_worker_proc( void *param
)
2120 TP_CALLBACK_INSTANCE
*callback_instance
;
2121 struct threadpool_instance instance
;
2122 struct threadpool
*pool
= param
;
2123 TP_WAIT_RESULT wait_result
= 0;
2124 LARGE_INTEGER timeout
;
2128 TRACE( "starting worker thread for pool %p\n", pool
);
2130 RtlEnterCriticalSection( &pool
->cs
);
2131 pool
->num_busy_workers
--;
2134 while ((ptr
= threadpool_get_next_item( pool
)))
2136 struct threadpool_object
*object
= LIST_ENTRY( ptr
, struct threadpool_object
, pool_entry
);
2137 assert( object
->num_pending_callbacks
> 0 );
2139 /* If further pending callbacks are queued, move the work item to
2140 * the end of the pool list. Otherwise remove it from the pool. */
2141 list_remove( &object
->pool_entry
);
2142 if (--object
->num_pending_callbacks
)
2143 tp_object_prio_queue( object
);
2145 /* For wait objects check if they were signaled or have timed out. */
2146 if (object
->type
== TP_OBJECT_TYPE_WAIT
)
2148 wait_result
= object
->u
.wait
.signaled
? WAIT_OBJECT_0
: WAIT_TIMEOUT
;
2149 if (wait_result
== WAIT_OBJECT_0
) object
->u
.wait
.signaled
--;
2152 /* Leave critical section and do the actual callback. */
2153 object
->num_associated_callbacks
++;
2154 object
->num_running_callbacks
++;
2155 pool
->num_busy_workers
++;
2156 RtlLeaveCriticalSection( &pool
->cs
);
2158 /* Initialize threadpool instance struct. */
2159 callback_instance
= (TP_CALLBACK_INSTANCE
*)&instance
;
2160 instance
.object
= object
;
2161 instance
.threadid
= GetCurrentThreadId();
2162 instance
.associated
= TRUE
;
2163 instance
.may_run_long
= object
->may_run_long
;
2164 instance
.cleanup
.critical_section
= NULL
;
2165 instance
.cleanup
.mutex
= NULL
;
2166 instance
.cleanup
.semaphore
= NULL
;
2167 instance
.cleanup
.semaphore_count
= 0;
2168 instance
.cleanup
.event
= NULL
;
2169 instance
.cleanup
.library
= NULL
;
2171 switch (object
->type
)
2173 case TP_OBJECT_TYPE_SIMPLE
:
2175 TRACE( "executing simple callback %p(%p, %p)\n",
2176 object
->u
.simple
.callback
, callback_instance
, object
->userdata
);
2177 object
->u
.simple
.callback( callback_instance
, object
->userdata
);
2178 TRACE( "callback %p returned\n", object
->u
.simple
.callback
);
2182 case TP_OBJECT_TYPE_WORK
:
2184 TRACE( "executing work callback %p(%p, %p, %p)\n",
2185 object
->u
.work
.callback
, callback_instance
, object
->userdata
, object
);
2186 object
->u
.work
.callback( callback_instance
, object
->userdata
, (TP_WORK
*)object
);
2187 TRACE( "callback %p returned\n", object
->u
.work
.callback
);
2191 case TP_OBJECT_TYPE_TIMER
:
2193 TRACE( "executing timer callback %p(%p, %p, %p)\n",
2194 object
->u
.timer
.callback
, callback_instance
, object
->userdata
, object
);
2195 object
->u
.timer
.callback( callback_instance
, object
->userdata
, (TP_TIMER
*)object
);
2196 TRACE( "callback %p returned\n", object
->u
.timer
.callback
);
2200 case TP_OBJECT_TYPE_WAIT
:
2202 TRACE( "executing wait callback %p(%p, %p, %p, %u)\n",
2203 object
->u
.wait
.callback
, callback_instance
, object
->userdata
, object
, wait_result
);
2204 object
->u
.wait
.callback( callback_instance
, object
->userdata
, (TP_WAIT
*)object
, wait_result
);
2205 TRACE( "callback %p returned\n", object
->u
.wait
.callback
);
2214 /* Execute finalization callback. */
2215 if (object
->finalization_callback
)
2217 TRACE( "executing finalization callback %p(%p, %p)\n",
2218 object
->finalization_callback
, callback_instance
, object
->userdata
);
2219 object
->finalization_callback( callback_instance
, object
->userdata
);
2220 TRACE( "callback %p returned\n", object
->finalization_callback
);
2223 /* Execute cleanup tasks. */
2224 if (instance
.cleanup
.critical_section
)
2226 RtlLeaveCriticalSection( instance
.cleanup
.critical_section
);
2228 if (instance
.cleanup
.mutex
)
2230 status
= NtReleaseMutant( instance
.cleanup
.mutex
, NULL
);
2231 if (status
!= STATUS_SUCCESS
) goto skip_cleanup
;
2233 if (instance
.cleanup
.semaphore
)
2235 status
= NtReleaseSemaphore( instance
.cleanup
.semaphore
, instance
.cleanup
.semaphore_count
, NULL
);
2236 if (status
!= STATUS_SUCCESS
) goto skip_cleanup
;
2238 if (instance
.cleanup
.event
)
2240 status
= NtSetEvent( instance
.cleanup
.event
, NULL
);
2241 if (status
!= STATUS_SUCCESS
) goto skip_cleanup
;
2243 if (instance
.cleanup
.library
)
2245 LdrUnloadDll( instance
.cleanup
.library
);
2249 RtlEnterCriticalSection( &pool
->cs
);
2250 pool
->num_busy_workers
--;
2252 /* Simple callbacks are automatically shutdown after execution. */
2253 if (object
->type
== TP_OBJECT_TYPE_SIMPLE
)
2255 tp_object_prepare_shutdown( object
);
2256 object
->shutdown
= TRUE
;
2259 object
->num_running_callbacks
--;
2260 if (!object
->num_pending_callbacks
&& !object
->num_running_callbacks
)
2261 RtlWakeAllConditionVariable( &object
->group_finished_event
);
2263 if (instance
.associated
)
2265 object
->num_associated_callbacks
--;
2266 if (!object
->num_pending_callbacks
&& !object
->num_associated_callbacks
)
2267 RtlWakeAllConditionVariable( &object
->finished_event
);
2270 tp_object_release( object
);
2273 /* Shutdown worker thread if requested. */
2277 /* Wait for new tasks or until the timeout expires. A thread only terminates
2278 * when no new tasks are available, and the number of threads can be
2279 * decreased without violating the min_workers limit. An exception is when
2280 * min_workers == 0, then objcount is used to detect if the last thread
2281 * can be terminated. */
2282 timeout
.QuadPart
= (ULONGLONG
)THREADPOOL_WORKER_TIMEOUT
* -10000;
2283 if (RtlSleepConditionVariableCS( &pool
->update_event
, &pool
->cs
, &timeout
) == STATUS_TIMEOUT
&&
2284 !threadpool_get_next_item( pool
) && (pool
->num_workers
> max( pool
->min_workers
, 1 ) ||
2285 (!pool
->min_workers
&& !pool
->objcount
)))
2290 pool
->num_workers
--;
2291 RtlLeaveCriticalSection( &pool
->cs
);
2293 TRACE( "terminating worker thread for pool %p\n", pool
);
2294 tp_threadpool_release( pool
);
2295 RtlExitUserThread( 0 );
2298 /***********************************************************************
2299 * TpAllocCleanupGroup (NTDLL.@)
2301 NTSTATUS WINAPI
TpAllocCleanupGroup( TP_CLEANUP_GROUP
**out
)
2303 TRACE( "%p\n", out
);
2305 return tp_group_alloc( (struct threadpool_group
**)out
);
2308 /***********************************************************************
2309 * TpAllocPool (NTDLL.@)
2311 NTSTATUS WINAPI
TpAllocPool( TP_POOL
**out
, PVOID reserved
)
2313 TRACE( "%p %p\n", out
, reserved
);
2316 FIXME( "reserved argument is nonzero (%p)\n", reserved
);
2318 return tp_threadpool_alloc( (struct threadpool
**)out
);
2321 /***********************************************************************
2322 * TpAllocTimer (NTDLL.@)
2324 NTSTATUS WINAPI
TpAllocTimer( TP_TIMER
**out
, PTP_TIMER_CALLBACK callback
, PVOID userdata
,
2325 TP_CALLBACK_ENVIRON
*environment
)
2327 struct threadpool_object
*object
;
2328 struct threadpool
*pool
;
2331 TRACE( "%p %p %p %p\n", out
, callback
, userdata
, environment
);
2333 object
= RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*object
) );
2335 return STATUS_NO_MEMORY
;
2337 status
= tp_threadpool_lock( &pool
, environment
);
2340 RtlFreeHeap( GetProcessHeap(), 0, object
);
2344 object
->type
= TP_OBJECT_TYPE_TIMER
;
2345 object
->u
.timer
.callback
= callback
;
2347 status
= tp_timerqueue_lock( object
);
2350 tp_threadpool_unlock( pool
);
2351 RtlFreeHeap( GetProcessHeap(), 0, object
);
2355 tp_object_initialize( object
, pool
, userdata
, environment
);
2357 *out
= (TP_TIMER
*)object
;
2358 return STATUS_SUCCESS
;
2361 /***********************************************************************
2362 * TpAllocWait (NTDLL.@)
2364 NTSTATUS WINAPI
TpAllocWait( TP_WAIT
**out
, PTP_WAIT_CALLBACK callback
, PVOID userdata
,
2365 TP_CALLBACK_ENVIRON
*environment
)
2367 struct threadpool_object
*object
;
2368 struct threadpool
*pool
;
2371 TRACE( "%p %p %p %p\n", out
, callback
, userdata
, environment
);
2373 object
= RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*object
) );
2375 return STATUS_NO_MEMORY
;
2377 status
= tp_threadpool_lock( &pool
, environment
);
2380 RtlFreeHeap( GetProcessHeap(), 0, object
);
2384 object
->type
= TP_OBJECT_TYPE_WAIT
;
2385 object
->u
.wait
.callback
= callback
;
2387 status
= tp_waitqueue_lock( object
);
2390 tp_threadpool_unlock( pool
);
2391 RtlFreeHeap( GetProcessHeap(), 0, object
);
2395 tp_object_initialize( object
, pool
, userdata
, environment
);
2397 *out
= (TP_WAIT
*)object
;
2398 return STATUS_SUCCESS
;
2401 /***********************************************************************
2402 * TpAllocWork (NTDLL.@)
2404 NTSTATUS WINAPI
TpAllocWork( TP_WORK
**out
, PTP_WORK_CALLBACK callback
, PVOID userdata
,
2405 TP_CALLBACK_ENVIRON
*environment
)
2407 struct threadpool_object
*object
;
2408 struct threadpool
*pool
;
2411 TRACE( "%p %p %p %p\n", out
, callback
, userdata
, environment
);
2413 object
= RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*object
) );
2415 return STATUS_NO_MEMORY
;
2417 status
= tp_threadpool_lock( &pool
, environment
);
2420 RtlFreeHeap( GetProcessHeap(), 0, object
);
2424 object
->type
= TP_OBJECT_TYPE_WORK
;
2425 object
->u
.work
.callback
= callback
;
2426 tp_object_initialize( object
, pool
, userdata
, environment
);
2428 *out
= (TP_WORK
*)object
;
2429 return STATUS_SUCCESS
;
2432 /***********************************************************************
2433 * TpCallbackLeaveCriticalSectionOnCompletion (NTDLL.@)
2435 VOID WINAPI
TpCallbackLeaveCriticalSectionOnCompletion( TP_CALLBACK_INSTANCE
*instance
, CRITICAL_SECTION
*crit
)
2437 struct threadpool_instance
*this = impl_from_TP_CALLBACK_INSTANCE( instance
);
2439 TRACE( "%p %p\n", instance
, crit
);
2441 if (!this->cleanup
.critical_section
)
2442 this->cleanup
.critical_section
= crit
;
2445 /***********************************************************************
2446 * TpCallbackMayRunLong (NTDLL.@)
2448 NTSTATUS WINAPI
TpCallbackMayRunLong( TP_CALLBACK_INSTANCE
*instance
)
2450 struct threadpool_instance
*this = impl_from_TP_CALLBACK_INSTANCE( instance
);
2451 struct threadpool_object
*object
= this->object
;
2452 struct threadpool
*pool
;
2453 NTSTATUS status
= STATUS_SUCCESS
;
2455 TRACE( "%p\n", instance
);
2457 if (this->threadid
!= GetCurrentThreadId())
2459 ERR("called from wrong thread, ignoring\n");
2460 return STATUS_UNSUCCESSFUL
; /* FIXME */
2463 if (this->may_run_long
)
2464 return STATUS_SUCCESS
;
2466 pool
= object
->pool
;
2467 RtlEnterCriticalSection( &pool
->cs
);
2469 /* Start new worker threads if required. */
2470 if (pool
->num_busy_workers
>= pool
->num_workers
)
2472 if (pool
->num_workers
< pool
->max_workers
)
2474 status
= tp_new_worker_thread( pool
);
2478 status
= STATUS_TOO_MANY_THREADS
;
2482 RtlLeaveCriticalSection( &pool
->cs
);
2483 this->may_run_long
= TRUE
;
2487 /***********************************************************************
2488 * TpCallbackReleaseMutexOnCompletion (NTDLL.@)
2490 VOID WINAPI
TpCallbackReleaseMutexOnCompletion( TP_CALLBACK_INSTANCE
*instance
, HANDLE mutex
)
2492 struct threadpool_instance
*this = impl_from_TP_CALLBACK_INSTANCE( instance
);
2494 TRACE( "%p %p\n", instance
, mutex
);
2496 if (!this->cleanup
.mutex
)
2497 this->cleanup
.mutex
= mutex
;
2500 /***********************************************************************
2501 * TpCallbackReleaseSemaphoreOnCompletion (NTDLL.@)
2503 VOID WINAPI
TpCallbackReleaseSemaphoreOnCompletion( TP_CALLBACK_INSTANCE
*instance
, HANDLE semaphore
, DWORD count
)
2505 struct threadpool_instance
*this = impl_from_TP_CALLBACK_INSTANCE( instance
);
2507 TRACE( "%p %p %u\n", instance
, semaphore
, count
);
2509 if (!this->cleanup
.semaphore
)
2511 this->cleanup
.semaphore
= semaphore
;
2512 this->cleanup
.semaphore_count
= count
;
2516 /***********************************************************************
2517 * TpCallbackSetEventOnCompletion (NTDLL.@)
2519 VOID WINAPI
TpCallbackSetEventOnCompletion( TP_CALLBACK_INSTANCE
*instance
, HANDLE event
)
2521 struct threadpool_instance
*this = impl_from_TP_CALLBACK_INSTANCE( instance
);
2523 TRACE( "%p %p\n", instance
, event
);
2525 if (!this->cleanup
.event
)
2526 this->cleanup
.event
= event
;
2529 /***********************************************************************
2530 * TpCallbackUnloadDllOnCompletion (NTDLL.@)
2532 VOID WINAPI
TpCallbackUnloadDllOnCompletion( TP_CALLBACK_INSTANCE
*instance
, HMODULE module
)
2534 struct threadpool_instance
*this = impl_from_TP_CALLBACK_INSTANCE( instance
);
2536 TRACE( "%p %p\n", instance
, module
);
2538 if (!this->cleanup
.library
)
2539 this->cleanup
.library
= module
;
2542 /***********************************************************************
2543 * TpDisassociateCallback (NTDLL.@)
2545 VOID WINAPI
TpDisassociateCallback( TP_CALLBACK_INSTANCE
*instance
)
2547 struct threadpool_instance
*this = impl_from_TP_CALLBACK_INSTANCE( instance
);
2548 struct threadpool_object
*object
= this->object
;
2549 struct threadpool
*pool
;
2551 TRACE( "%p\n", instance
);
2553 if (this->threadid
!= GetCurrentThreadId())
2555 ERR("called from wrong thread, ignoring\n");
2559 if (!this->associated
)
2562 pool
= object
->pool
;
2563 RtlEnterCriticalSection( &pool
->cs
);
2565 object
->num_associated_callbacks
--;
2566 if (!object
->num_pending_callbacks
&& !object
->num_associated_callbacks
)
2567 RtlWakeAllConditionVariable( &object
->finished_event
);
2569 RtlLeaveCriticalSection( &pool
->cs
);
2570 this->associated
= FALSE
;
2573 /***********************************************************************
2574 * TpIsTimerSet (NTDLL.@)
2576 BOOL WINAPI
TpIsTimerSet( TP_TIMER
*timer
)
2578 struct threadpool_object
*this = impl_from_TP_TIMER( timer
);
2580 TRACE( "%p\n", timer
);
2582 return this->u
.timer
.timer_set
;
2585 /***********************************************************************
2586 * TpPostWork (NTDLL.@)
2588 VOID WINAPI
TpPostWork( TP_WORK
*work
)
2590 struct threadpool_object
*this = impl_from_TP_WORK( work
);
2592 TRACE( "%p\n", work
);
2594 tp_object_submit( this, FALSE
);
2597 /***********************************************************************
2598 * TpReleaseCleanupGroup (NTDLL.@)
2600 VOID WINAPI
TpReleaseCleanupGroup( TP_CLEANUP_GROUP
*group
)
2602 struct threadpool_group
*this = impl_from_TP_CLEANUP_GROUP( group
);
2604 TRACE( "%p\n", group
);
2606 tp_group_shutdown( this );
2607 tp_group_release( this );
2610 /***********************************************************************
2611 * TpReleaseCleanupGroupMembers (NTDLL.@)
2613 VOID WINAPI
TpReleaseCleanupGroupMembers( TP_CLEANUP_GROUP
*group
, BOOL cancel_pending
, PVOID userdata
)
2615 struct threadpool_group
*this = impl_from_TP_CLEANUP_GROUP( group
);
2616 struct threadpool_object
*object
, *next
;
2617 struct list members
;
2619 TRACE( "%p %u %p\n", group
, cancel_pending
, userdata
);
2621 RtlEnterCriticalSection( &this->cs
);
2623 /* Unset group, increase references, and mark objects for shutdown */
2624 LIST_FOR_EACH_ENTRY_SAFE( object
, next
, &this->members
, struct threadpool_object
, group_entry
)
2626 assert( object
->group
== this );
2627 assert( object
->is_group_member
);
2629 if (interlocked_inc( &object
->refcount
) == 1)
2631 /* Object is basically already destroyed, but group reference
2632 * was not deleted yet. We can safely ignore this object. */
2633 interlocked_dec( &object
->refcount
);
2634 list_remove( &object
->group_entry
);
2635 object
->is_group_member
= FALSE
;
2639 object
->is_group_member
= FALSE
;
2640 tp_object_prepare_shutdown( object
);
2643 /* Move members to a new temporary list */
2644 list_init( &members
);
2645 list_move_tail( &members
, &this->members
);
2647 RtlLeaveCriticalSection( &this->cs
);
2649 /* Cancel pending callbacks if requested */
2652 LIST_FOR_EACH_ENTRY( object
, &members
, struct threadpool_object
, group_entry
)
2654 tp_object_cancel( object
);
2658 /* Wait for remaining callbacks to finish */
2659 LIST_FOR_EACH_ENTRY_SAFE( object
, next
, &members
, struct threadpool_object
, group_entry
)
2661 tp_object_wait( object
, TRUE
);
2663 if (!object
->shutdown
)
2665 /* Execute group cancellation callback if defined, and if this was actually a group cancel. */
2666 if (cancel_pending
&& object
->group_cancel_callback
)
2668 TRACE( "executing group cancel callback %p(%p, %p)\n",
2669 object
->group_cancel_callback
, object
->userdata
, userdata
);
2670 object
->group_cancel_callback( object
->userdata
, userdata
);
2671 TRACE( "callback %p returned\n", object
->group_cancel_callback
);
2674 if (object
->type
!= TP_OBJECT_TYPE_SIMPLE
)
2675 tp_object_release( object
);
2678 object
->shutdown
= TRUE
;
2679 tp_object_release( object
);
2683 /***********************************************************************
2684 * TpReleasePool (NTDLL.@)
2686 VOID WINAPI
TpReleasePool( TP_POOL
*pool
)
2688 struct threadpool
*this = impl_from_TP_POOL( pool
);
2690 TRACE( "%p\n", pool
);
2692 tp_threadpool_shutdown( this );
2693 tp_threadpool_release( this );
2696 /***********************************************************************
2697 * TpReleaseTimer (NTDLL.@)
2699 VOID WINAPI
TpReleaseTimer( TP_TIMER
*timer
)
2701 struct threadpool_object
*this = impl_from_TP_TIMER( timer
);
2703 TRACE( "%p\n", timer
);
2705 tp_object_prepare_shutdown( this );
2706 this->shutdown
= TRUE
;
2707 tp_object_release( this );
2710 /***********************************************************************
2711 * TpReleaseWait (NTDLL.@)
2713 VOID WINAPI
TpReleaseWait( TP_WAIT
*wait
)
2715 struct threadpool_object
*this = impl_from_TP_WAIT( wait
);
2717 TRACE( "%p\n", wait
);
2719 tp_object_prepare_shutdown( this );
2720 this->shutdown
= TRUE
;
2721 tp_object_release( this );
2724 /***********************************************************************
2725 * TpReleaseWork (NTDLL.@)
2727 VOID WINAPI
TpReleaseWork( TP_WORK
*work
)
2729 struct threadpool_object
*this = impl_from_TP_WORK( work
);
2731 TRACE( "%p\n", work
);
2733 tp_object_prepare_shutdown( this );
2734 this->shutdown
= TRUE
;
2735 tp_object_release( this );
2738 /***********************************************************************
2739 * TpSetPoolMaxThreads (NTDLL.@)
2741 VOID WINAPI
TpSetPoolMaxThreads( TP_POOL
*pool
, DWORD maximum
)
2743 struct threadpool
*this = impl_from_TP_POOL( pool
);
2745 TRACE( "%p %u\n", pool
, maximum
);
2747 RtlEnterCriticalSection( &this->cs
);
2748 this->max_workers
= max( maximum
, 1 );
2749 this->min_workers
= min( this->min_workers
, this->max_workers
);
2750 RtlLeaveCriticalSection( &this->cs
);
2753 /***********************************************************************
2754 * TpSetPoolMinThreads (NTDLL.@)
2756 BOOL WINAPI
TpSetPoolMinThreads( TP_POOL
*pool
, DWORD minimum
)
2758 struct threadpool
*this = impl_from_TP_POOL( pool
);
2759 NTSTATUS status
= STATUS_SUCCESS
;
2761 TRACE( "%p %u\n", pool
, minimum
);
2763 RtlEnterCriticalSection( &this->cs
);
2765 while (this->num_workers
< minimum
)
2767 status
= tp_new_worker_thread( this );
2768 if (status
!= STATUS_SUCCESS
)
2772 if (status
== STATUS_SUCCESS
)
2774 this->min_workers
= minimum
;
2775 this->max_workers
= max( this->min_workers
, this->max_workers
);
2778 RtlLeaveCriticalSection( &this->cs
);
2782 /***********************************************************************
2783 * TpSetTimer (NTDLL.@)
2785 VOID WINAPI
TpSetTimer( TP_TIMER
*timer
, LARGE_INTEGER
*timeout
, LONG period
, LONG window_length
)
2787 struct threadpool_object
*this = impl_from_TP_TIMER( timer
);
2788 struct threadpool_object
*other_timer
;
2789 BOOL submit_timer
= FALSE
;
2790 ULONGLONG timestamp
;
2792 TRACE( "%p %p %u %u\n", timer
, timeout
, period
, window_length
);
2794 RtlEnterCriticalSection( &timerqueue
.cs
);
2796 assert( this->u
.timer
.timer_initialized
);
2797 this->u
.timer
.timer_set
= timeout
!= NULL
;
2799 /* Convert relative timeout to absolute timestamp and handle a timeout
2800 * of zero, which means that the timer is submitted immediately. */
2803 timestamp
= timeout
->QuadPart
;
2804 if ((LONGLONG
)timestamp
< 0)
2807 NtQuerySystemTime( &now
);
2808 timestamp
= now
.QuadPart
- timestamp
;
2810 else if (!timestamp
)
2817 NtQuerySystemTime( &now
);
2818 timestamp
= now
.QuadPart
+ (ULONGLONG
)period
* 10000;
2820 submit_timer
= TRUE
;
2824 /* First remove existing timeout. */
2825 if (this->u
.timer
.timer_pending
)
2827 list_remove( &this->u
.timer
.timer_entry
);
2828 this->u
.timer
.timer_pending
= FALSE
;
2831 /* If the timer was enabled, then add it back to the queue. */
2834 this->u
.timer
.timeout
= timestamp
;
2835 this->u
.timer
.period
= period
;
2836 this->u
.timer
.window_length
= window_length
;
2838 LIST_FOR_EACH_ENTRY( other_timer
, &timerqueue
.pending_timers
,
2839 struct threadpool_object
, u
.timer
.timer_entry
)
2841 assert( other_timer
->type
== TP_OBJECT_TYPE_TIMER
);
2842 if (this->u
.timer
.timeout
< other_timer
->u
.timer
.timeout
)
2845 list_add_before( &other_timer
->u
.timer
.timer_entry
, &this->u
.timer
.timer_entry
);
2847 /* Wake up the timer thread when the timeout has to be updated. */
2848 if (list_head( &timerqueue
.pending_timers
) == &this->u
.timer
.timer_entry
)
2849 RtlWakeAllConditionVariable( &timerqueue
.update_event
);
2851 this->u
.timer
.timer_pending
= TRUE
;
2854 RtlLeaveCriticalSection( &timerqueue
.cs
);
2857 tp_object_submit( this, FALSE
);
2860 /***********************************************************************
2861 * TpSetWait (NTDLL.@)
2863 VOID WINAPI
TpSetWait( TP_WAIT
*wait
, HANDLE handle
, LARGE_INTEGER
*timeout
)
2865 struct threadpool_object
*this = impl_from_TP_WAIT( wait
);
2866 ULONGLONG timestamp
= TIMEOUT_INFINITE
;
2867 BOOL submit_wait
= FALSE
;
2869 TRACE( "%p %p %p\n", wait
, handle
, timeout
);
2871 RtlEnterCriticalSection( &waitqueue
.cs
);
2873 assert( this->u
.wait
.bucket
);
2874 this->u
.wait
.handle
= handle
;
2876 if (handle
|| this->u
.wait
.wait_pending
)
2878 struct waitqueue_bucket
*bucket
= this->u
.wait
.bucket
;
2879 list_remove( &this->u
.wait
.wait_entry
);
2881 /* Convert relative timeout to absolute timestamp. */
2882 if (handle
&& timeout
)
2884 timestamp
= timeout
->QuadPart
;
2885 if ((LONGLONG
)timestamp
< 0)
2888 NtQuerySystemTime( &now
);
2889 timestamp
= now
.QuadPart
- timestamp
;
2891 else if (!timestamp
)
2898 /* Add wait object back into one of the queues. */
2901 list_add_tail( &bucket
->waiting
, &this->u
.wait
.wait_entry
);
2902 this->u
.wait
.wait_pending
= TRUE
;
2903 this->u
.wait
.timeout
= timestamp
;
2907 list_add_tail( &bucket
->reserved
, &this->u
.wait
.wait_entry
);
2908 this->u
.wait
.wait_pending
= FALSE
;
2911 /* Wake up the wait queue thread. */
2912 NtSetEvent( bucket
->update_event
, NULL
);
2915 RtlLeaveCriticalSection( &waitqueue
.cs
);
2918 tp_object_submit( this, FALSE
);
2921 /***********************************************************************
2922 * TpSimpleTryPost (NTDLL.@)
2924 NTSTATUS WINAPI
TpSimpleTryPost( PTP_SIMPLE_CALLBACK callback
, PVOID userdata
,
2925 TP_CALLBACK_ENVIRON
*environment
)
2927 struct threadpool_object
*object
;
2928 struct threadpool
*pool
;
2931 TRACE( "%p %p %p\n", callback
, userdata
, environment
);
2933 object
= RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*object
) );
2935 return STATUS_NO_MEMORY
;
2937 status
= tp_threadpool_lock( &pool
, environment
);
2940 RtlFreeHeap( GetProcessHeap(), 0, object
);
2944 object
->type
= TP_OBJECT_TYPE_SIMPLE
;
2945 object
->u
.simple
.callback
= callback
;
2946 tp_object_initialize( object
, pool
, userdata
, environment
);
2948 return STATUS_SUCCESS
;
2951 /***********************************************************************
2952 * TpWaitForTimer (NTDLL.@)
2954 VOID WINAPI
TpWaitForTimer( TP_TIMER
*timer
, BOOL cancel_pending
)
2956 struct threadpool_object
*this = impl_from_TP_TIMER( timer
);
2958 TRACE( "%p %d\n", timer
, cancel_pending
);
2961 tp_object_cancel( this );
2962 tp_object_wait( this, FALSE
);
2965 /***********************************************************************
2966 * TpWaitForWait (NTDLL.@)
2968 VOID WINAPI
TpWaitForWait( TP_WAIT
*wait
, BOOL cancel_pending
)
2970 struct threadpool_object
*this = impl_from_TP_WAIT( wait
);
2972 TRACE( "%p %d\n", wait
, cancel_pending
);
2975 tp_object_cancel( this );
2976 tp_object_wait( this, FALSE
);
2979 /***********************************************************************
2980 * TpWaitForWork (NTDLL.@)
2982 VOID WINAPI
TpWaitForWork( TP_WORK
*work
, BOOL cancel_pending
)
2984 struct threadpool_object
*this = impl_from_TP_WORK( work
);
2986 TRACE( "%p %u\n", work
, cancel_pending
);
2989 tp_object_cancel( this );
2990 tp_object_wait( this, FALSE
);