4 * Copyright (c) 2006 Robert Shearman
5 * Copyright (c) 2014-2016 Sebastian Lackner
7 * This library is free software; you can redistribute it and/or
8 * modify it under the terms of the GNU Lesser General Public
9 * License as published by the Free Software Foundation; either
10 * version 2.1 of the License, or (at your option) any later version.
12 * This library is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Lesser General Public License for more details.
17 * You should have received a copy of the GNU Lesser General Public
18 * License along with this library; if not, write to the Free Software
19 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
23 #include "wine/port.h"
29 #define NONAMELESSUNION
31 #define WIN32_NO_STATUS
34 #include "wine/debug.h"
35 #include "wine/list.h"
37 #include "ntdll_misc.h"
39 WINE_DEFAULT_DEBUG_CHANNEL(threadpool
);
42 * Old thread pooling API
47 PRTL_WORK_ITEM_ROUTINE function
;
51 #define EXPIRE_NEVER (~(ULONGLONG)0)
52 #define TIMER_QUEUE_MAGIC 0x516d6954 /* TimQ */
54 static RTL_CRITICAL_SECTION_DEBUG critsect_compl_debug
;
59 RTL_CRITICAL_SECTION threadpool_compl_cs
;
63 NULL
, /* compl_port */
64 { &critsect_compl_debug
, -1, 0, 0, 0, 0 }, /* threadpool_compl_cs */
67 static RTL_CRITICAL_SECTION_DEBUG critsect_compl_debug
=
69 0, 0, &old_threadpool
.threadpool_compl_cs
,
70 { &critsect_compl_debug
.ProcessLocksList
, &critsect_compl_debug
.ProcessLocksList
},
71 0, 0, { (DWORD_PTR
)(__FILE__
": threadpool_compl_cs") }
78 WAITORTIMERCALLBACK Callback
;
82 HANDLE CompletionEvent
;
84 BOOLEAN CallbackInProgress
;
90 struct timer_queue
*q
;
92 ULONG runcount
; /* number of callbacks pending execution */
93 RTL_WAITORTIMERCALLBACKFUNC callback
;
98 BOOL destroy
; /* timer should be deleted; once set, never unset */
99 HANDLE event
; /* removal event */
105 RTL_CRITICAL_SECTION cs
;
106 struct list timers
; /* sorted by expiration time */
107 BOOL quit
; /* queue should be deleted; once set, never unset */
113 * Object-oriented thread pooling API
116 #define THREADPOOL_WORKER_TIMEOUT 5000
117 #define MAXIMUM_WAITQUEUE_OBJECTS (MAXIMUM_WAIT_OBJECTS - 1)
119 /* internal threadpool representation */
126 /* pool of work items, locked via .cs */
128 RTL_CONDITION_VARIABLE update_event
;
129 /* information about worker threads, locked via .cs */
133 int num_busy_workers
;
136 enum threadpool_objtype
138 TP_OBJECT_TYPE_SIMPLE
,
140 TP_OBJECT_TYPE_TIMER
,
144 /* internal threadpool object representation */
145 struct threadpool_object
149 /* read-only information */
150 enum threadpool_objtype type
;
151 struct threadpool
*pool
;
152 struct threadpool_group
*group
;
154 PTP_CLEANUP_GROUP_CANCEL_CALLBACK group_cancel_callback
;
155 PTP_SIMPLE_CALLBACK finalization_callback
;
158 /* information about the group, locked via .group->cs */
159 struct list group_entry
;
160 BOOL is_group_member
;
161 /* information about the pool, locked via .pool->cs */
162 struct list pool_entry
;
163 RTL_CONDITION_VARIABLE finished_event
;
164 RTL_CONDITION_VARIABLE group_finished_event
;
165 LONG num_pending_callbacks
;
166 LONG num_running_callbacks
;
167 LONG num_associated_callbacks
;
168 /* arguments for callback */
173 PTP_SIMPLE_CALLBACK callback
;
177 PTP_WORK_CALLBACK callback
;
181 PTP_TIMER_CALLBACK callback
;
182 /* information about the timer, locked via timerqueue.cs */
183 BOOL timer_initialized
;
185 struct list timer_entry
;
193 PTP_WAIT_CALLBACK callback
;
195 /* information about the wait object, locked via waitqueue.cs */
196 struct waitqueue_bucket
*bucket
;
198 struct list wait_entry
;
205 /* internal threadpool instance representation */
206 struct threadpool_instance
208 struct threadpool_object
*object
;
214 CRITICAL_SECTION
*critical_section
;
217 LONG semaphore_count
;
223 /* internal threadpool group representation */
224 struct threadpool_group
229 /* list of group members, locked via .cs */
233 /* global timerqueue object */
234 static RTL_CRITICAL_SECTION_DEBUG timerqueue_debug
;
241 struct list pending_timers
;
242 RTL_CONDITION_VARIABLE update_event
;
246 { &timerqueue_debug
, -1, 0, 0, 0, 0 }, /* cs */
248 FALSE
, /* thread_running */
249 LIST_INIT( timerqueue
.pending_timers
), /* pending_timers */
250 RTL_CONDITION_VARIABLE_INIT
/* update_event */
253 static RTL_CRITICAL_SECTION_DEBUG timerqueue_debug
=
255 0, 0, &timerqueue
.cs
,
256 { &timerqueue_debug
.ProcessLocksList
, &timerqueue_debug
.ProcessLocksList
},
257 0, 0, { (DWORD_PTR
)(__FILE__
": timerqueue.cs") }
260 /* global waitqueue object */
261 static RTL_CRITICAL_SECTION_DEBUG waitqueue_debug
;
271 { &waitqueue_debug
, -1, 0, 0, 0, 0 }, /* cs */
273 LIST_INIT( waitqueue
.buckets
) /* buckets */
276 static RTL_CRITICAL_SECTION_DEBUG waitqueue_debug
=
279 { &waitqueue_debug
.ProcessLocksList
, &waitqueue_debug
.ProcessLocksList
},
280 0, 0, { (DWORD_PTR
)(__FILE__
": waitqueue.cs") }
283 struct waitqueue_bucket
285 struct list bucket_entry
;
287 struct list reserved
;
292 static inline struct threadpool
*impl_from_TP_POOL( TP_POOL
*pool
)
294 return (struct threadpool
*)pool
;
297 static inline struct threadpool_object
*impl_from_TP_WORK( TP_WORK
*work
)
299 struct threadpool_object
*object
= (struct threadpool_object
*)work
;
300 assert( object
->type
== TP_OBJECT_TYPE_WORK
);
304 static inline struct threadpool_object
*impl_from_TP_TIMER( TP_TIMER
*timer
)
306 struct threadpool_object
*object
= (struct threadpool_object
*)timer
;
307 assert( object
->type
== TP_OBJECT_TYPE_TIMER
);
311 static inline struct threadpool_object
*impl_from_TP_WAIT( TP_WAIT
*wait
)
313 struct threadpool_object
*object
= (struct threadpool_object
*)wait
;
314 assert( object
->type
== TP_OBJECT_TYPE_WAIT
);
318 static inline struct threadpool_group
*impl_from_TP_CLEANUP_GROUP( TP_CLEANUP_GROUP
*group
)
320 return (struct threadpool_group
*)group
;
323 static inline struct threadpool_instance
*impl_from_TP_CALLBACK_INSTANCE( TP_CALLBACK_INSTANCE
*instance
)
325 return (struct threadpool_instance
*)instance
;
328 static void CALLBACK
threadpool_worker_proc( void *param
);
329 static void tp_object_submit( struct threadpool_object
*object
, BOOL signaled
);
330 static void tp_object_prepare_shutdown( struct threadpool_object
*object
);
331 static BOOL
tp_object_release( struct threadpool_object
*object
);
332 static struct threadpool
*default_threadpool
= NULL
;
334 static inline LONG
interlocked_inc( PLONG dest
)
336 return interlocked_xchg_add( dest
, 1 ) + 1;
339 static inline LONG
interlocked_dec( PLONG dest
)
341 return interlocked_xchg_add( dest
, -1 ) - 1;
344 static void CALLBACK
process_rtl_work_item( TP_CALLBACK_INSTANCE
*instance
, void *userdata
)
346 struct rtl_work_item
*item
= userdata
;
348 TRACE("executing %p(%p)\n", item
->function
, item
->context
);
349 item
->function( item
->context
);
351 RtlFreeHeap( GetProcessHeap(), 0, item
);
354 /***********************************************************************
355 * RtlQueueWorkItem (NTDLL.@)
357 * Queues a work item into a thread in the thread pool.
360 * function [I] Work function to execute.
361 * context [I] Context to pass to the work function when it is executed.
362 * flags [I] Flags. See notes.
365 * Success: STATUS_SUCCESS.
366 * Failure: Any NTSTATUS code.
369 * Flags can be one or more of the following:
370 *|WT_EXECUTEDEFAULT - Executes the work item in a non-I/O worker thread.
371 *|WT_EXECUTEINIOTHREAD - Executes the work item in an I/O worker thread.
372 *|WT_EXECUTEINPERSISTENTTHREAD - Executes the work item in a thread that is persistent.
373 *|WT_EXECUTELONGFUNCTION - Hints that the execution can take a long time.
374 *|WT_TRANSFER_IMPERSONATION - Executes the function with the current access token.
376 NTSTATUS WINAPI
RtlQueueWorkItem( PRTL_WORK_ITEM_ROUTINE function
, PVOID context
, ULONG flags
)
378 TP_CALLBACK_ENVIRON environment
;
379 struct rtl_work_item
*item
;
382 TRACE( "%p %p %u\n", function
, context
, flags
);
384 item
= RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*item
) );
386 return STATUS_NO_MEMORY
;
388 memset( &environment
, 0, sizeof(environment
) );
389 environment
.Version
= 1;
390 environment
.u
.s
.LongFunction
= (flags
& WT_EXECUTELONGFUNCTION
) != 0;
391 environment
.u
.s
.Persistent
= (flags
& WT_EXECUTEINPERSISTENTTHREAD
) != 0;
393 item
->function
= function
;
394 item
->context
= context
;
396 status
= TpSimpleTryPost( process_rtl_work_item
, item
, &environment
);
397 if (status
) RtlFreeHeap( GetProcessHeap(), 0, item
);
401 /***********************************************************************
402 * iocp_poller - get completion events and run callbacks
404 static DWORD CALLBACK
iocp_poller(LPVOID Arg
)
410 PRTL_OVERLAPPED_COMPLETION_ROUTINE callback
;
412 IO_STATUS_BLOCK iosb
;
413 NTSTATUS res
= NtRemoveIoCompletion( cport
, (PULONG_PTR
)&callback
, (PULONG_PTR
)&overlapped
, &iosb
, NULL
);
416 ERR("NtRemoveIoCompletion failed: 0x%x\n", res
);
420 DWORD transferred
= 0;
423 if (iosb
.u
.Status
== STATUS_SUCCESS
)
424 transferred
= iosb
.Information
;
426 err
= RtlNtStatusToDosError(iosb
.u
.Status
);
428 callback( err
, transferred
, overlapped
);
434 /***********************************************************************
435 * RtlSetIoCompletionCallback (NTDLL.@)
437 * Binds a handle to a thread pool's completion port, and possibly
438 * starts a non-I/O thread to monitor this port and call functions back.
441 * FileHandle [I] Handle to bind to a completion port.
442 * Function [I] Callback function to call on I/O completions.
443 * Flags [I] Not used.
446 * Success: STATUS_SUCCESS.
447 * Failure: Any NTSTATUS code.
450 NTSTATUS WINAPI
RtlSetIoCompletionCallback(HANDLE FileHandle
, PRTL_OVERLAPPED_COMPLETION_ROUTINE Function
, ULONG Flags
)
452 IO_STATUS_BLOCK iosb
;
453 FILE_COMPLETION_INFORMATION info
;
455 if (Flags
) FIXME("Unknown value Flags=0x%x\n", Flags
);
457 if (!old_threadpool
.compl_port
)
459 NTSTATUS res
= STATUS_SUCCESS
;
461 RtlEnterCriticalSection(&old_threadpool
.threadpool_compl_cs
);
462 if (!old_threadpool
.compl_port
)
466 res
= NtCreateIoCompletion( &cport
, IO_COMPLETION_ALL_ACCESS
, NULL
, 0 );
469 /* FIXME native can start additional threads in case of e.g. hung callback function. */
470 res
= RtlQueueWorkItem( iocp_poller
, cport
, WT_EXECUTEDEFAULT
);
472 old_threadpool
.compl_port
= cport
;
477 RtlLeaveCriticalSection(&old_threadpool
.threadpool_compl_cs
);
481 info
.CompletionPort
= old_threadpool
.compl_port
;
482 info
.CompletionKey
= (ULONG_PTR
)Function
;
484 return NtSetInformationFile( FileHandle
, &iosb
, &info
, sizeof(info
), FileCompletionInformation
);
487 static inline PLARGE_INTEGER
get_nt_timeout( PLARGE_INTEGER pTime
, ULONG timeout
)
489 if (timeout
== INFINITE
) return NULL
;
490 pTime
->QuadPart
= (ULONGLONG
)timeout
* -10000;
494 static void delete_wait_work_item(struct wait_work_item
*wait_work_item
)
496 NtClose( wait_work_item
->CancelEvent
);
497 RtlFreeHeap( GetProcessHeap(), 0, wait_work_item
);
500 static DWORD CALLBACK
wait_thread_proc(LPVOID Arg
)
502 struct wait_work_item
*wait_work_item
= Arg
;
504 BOOLEAN alertable
= (wait_work_item
->Flags
& WT_EXECUTEINIOTHREAD
) != 0;
505 HANDLE handles
[2] = { wait_work_item
->Object
, wait_work_item
->CancelEvent
};
506 LARGE_INTEGER timeout
;
507 HANDLE completion_event
;
513 status
= NtWaitForMultipleObjects( 2, handles
, TRUE
, alertable
,
514 get_nt_timeout( &timeout
, wait_work_item
->Milliseconds
) );
515 if (status
== STATUS_WAIT_0
|| status
== STATUS_TIMEOUT
)
517 BOOLEAN TimerOrWaitFired
;
519 if (status
== STATUS_WAIT_0
)
521 TRACE( "object %p signaled, calling callback %p with context %p\n",
522 wait_work_item
->Object
, wait_work_item
->Callback
,
523 wait_work_item
->Context
);
524 TimerOrWaitFired
= FALSE
;
528 TRACE( "wait for object %p timed out, calling callback %p with context %p\n",
529 wait_work_item
->Object
, wait_work_item
->Callback
,
530 wait_work_item
->Context
);
531 TimerOrWaitFired
= TRUE
;
533 wait_work_item
->CallbackInProgress
= TRUE
;
534 wait_work_item
->Callback( wait_work_item
->Context
, TimerOrWaitFired
);
535 wait_work_item
->CallbackInProgress
= FALSE
;
537 if (wait_work_item
->Flags
& WT_EXECUTEONLYONCE
)
540 else if (status
!= STATUS_USER_APC
)
544 completion_event
= wait_work_item
->CompletionEvent
;
545 if (completion_event
) NtSetEvent( completion_event
, NULL
);
547 if (interlocked_inc( &wait_work_item
->DeleteCount
) == 2 )
548 delete_wait_work_item( wait_work_item
);
553 /***********************************************************************
554 * RtlRegisterWait (NTDLL.@)
556 * Registers a wait for a handle to become signaled.
559 * NewWaitObject [I] Handle to the new wait object. Use RtlDeregisterWait() to free it.
560 * Object [I] Object to wait to become signaled.
561 * Callback [I] Callback function to execute when the wait times out or the handle is signaled.
562 * Context [I] Context to pass to the callback function when it is executed.
563 * Milliseconds [I] Number of milliseconds to wait before timing out.
564 * Flags [I] Flags. See notes.
567 * Success: STATUS_SUCCESS.
568 * Failure: Any NTSTATUS code.
571 * Flags can be one or more of the following:
572 *|WT_EXECUTEDEFAULT - Executes the work item in a non-I/O worker thread.
573 *|WT_EXECUTEINIOTHREAD - Executes the work item in an I/O worker thread.
574 *|WT_EXECUTEINPERSISTENTTHREAD - Executes the work item in a thread that is persistent.
575 *|WT_EXECUTELONGFUNCTION - Hints that the execution can take a long time.
576 *|WT_TRANSFER_IMPERSONATION - Executes the function with the current access token.
578 NTSTATUS WINAPI
RtlRegisterWait(PHANDLE NewWaitObject
, HANDLE Object
,
579 RTL_WAITORTIMERCALLBACKFUNC Callback
,
580 PVOID Context
, ULONG Milliseconds
, ULONG Flags
)
582 struct wait_work_item
*wait_work_item
;
585 TRACE( "(%p, %p, %p, %p, %d, 0x%x)\n", NewWaitObject
, Object
, Callback
, Context
, Milliseconds
, Flags
);
587 wait_work_item
= RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*wait_work_item
) );
589 return STATUS_NO_MEMORY
;
591 wait_work_item
->Object
= Object
;
592 wait_work_item
->Callback
= Callback
;
593 wait_work_item
->Context
= Context
;
594 wait_work_item
->Milliseconds
= Milliseconds
;
595 wait_work_item
->Flags
= Flags
;
596 wait_work_item
->CallbackInProgress
= FALSE
;
597 wait_work_item
->DeleteCount
= 0;
598 wait_work_item
->CompletionEvent
= NULL
;
600 status
= NtCreateEvent( &wait_work_item
->CancelEvent
, EVENT_ALL_ACCESS
, NULL
, NotificationEvent
, FALSE
);
601 if (status
!= STATUS_SUCCESS
)
603 RtlFreeHeap( GetProcessHeap(), 0, wait_work_item
);
607 Flags
= Flags
& (WT_EXECUTEINIOTHREAD
| WT_EXECUTEINPERSISTENTTHREAD
|
608 WT_EXECUTELONGFUNCTION
| WT_TRANSFER_IMPERSONATION
);
609 status
= RtlQueueWorkItem( wait_thread_proc
, wait_work_item
, Flags
);
610 if (status
!= STATUS_SUCCESS
)
612 delete_wait_work_item( wait_work_item
);
616 *NewWaitObject
= wait_work_item
;
620 /***********************************************************************
621 * RtlDeregisterWaitEx (NTDLL.@)
623 * Cancels a wait operation and frees the resources associated with calling
627 * WaitObject [I] Handle to the wait object to free.
630 * Success: STATUS_SUCCESS.
631 * Failure: Any NTSTATUS code.
633 NTSTATUS WINAPI
RtlDeregisterWaitEx(HANDLE WaitHandle
, HANDLE CompletionEvent
)
635 struct wait_work_item
*wait_work_item
= WaitHandle
;
636 NTSTATUS status
= STATUS_SUCCESS
;
638 TRACE( "(%p)\n", WaitHandle
);
640 if (WaitHandle
== NULL
)
641 return STATUS_INVALID_HANDLE
;
643 NtSetEvent( wait_work_item
->CancelEvent
, NULL
);
644 if (wait_work_item
->CallbackInProgress
)
646 if (CompletionEvent
!= NULL
)
648 if (CompletionEvent
== INVALID_HANDLE_VALUE
)
650 status
= NtCreateEvent( &CompletionEvent
, EVENT_ALL_ACCESS
, NULL
, NotificationEvent
, FALSE
);
651 if (status
!= STATUS_SUCCESS
)
653 interlocked_xchg_ptr( &wait_work_item
->CompletionEvent
, CompletionEvent
);
654 if (wait_work_item
->CallbackInProgress
)
655 NtWaitForSingleObject( CompletionEvent
, FALSE
, NULL
);
656 NtClose( CompletionEvent
);
660 interlocked_xchg_ptr( &wait_work_item
->CompletionEvent
, CompletionEvent
);
661 if (wait_work_item
->CallbackInProgress
)
662 status
= STATUS_PENDING
;
666 status
= STATUS_PENDING
;
669 if (interlocked_inc( &wait_work_item
->DeleteCount
) == 2 )
671 status
= STATUS_SUCCESS
;
672 delete_wait_work_item( wait_work_item
);
678 /***********************************************************************
679 * RtlDeregisterWait (NTDLL.@)
681 * Cancels a wait operation and frees the resources associated with calling
685 * WaitObject [I] Handle to the wait object to free.
688 * Success: STATUS_SUCCESS.
689 * Failure: Any NTSTATUS code.
691 NTSTATUS WINAPI
RtlDeregisterWait(HANDLE WaitHandle
)
693 return RtlDeregisterWaitEx(WaitHandle
, NULL
);
697 /************************** Timer Queue Impl **************************/
699 static void queue_remove_timer(struct queue_timer
*t
)
701 /* We MUST hold the queue cs while calling this function. This ensures
702 that we cannot queue another callback for this timer. The runcount
703 being zero makes sure we don't have any already queued. */
704 struct timer_queue
*q
= t
->q
;
706 assert(t
->runcount
== 0);
709 list_remove(&t
->entry
);
711 NtSetEvent(t
->event
, NULL
);
712 RtlFreeHeap(GetProcessHeap(), 0, t
);
714 if (q
->quit
&& list_empty(&q
->timers
))
715 NtSetEvent(q
->event
, NULL
);
718 static void timer_cleanup_callback(struct queue_timer
*t
)
720 struct timer_queue
*q
= t
->q
;
721 RtlEnterCriticalSection(&q
->cs
);
723 assert(0 < t
->runcount
);
726 if (t
->destroy
&& t
->runcount
== 0)
727 queue_remove_timer(t
);
729 RtlLeaveCriticalSection(&q
->cs
);
732 static DWORD WINAPI
timer_callback_wrapper(LPVOID p
)
734 struct queue_timer
*t
= p
;
735 t
->callback(t
->param
, TRUE
);
736 timer_cleanup_callback(t
);
740 static inline ULONGLONG
queue_current_time(void)
742 LARGE_INTEGER now
, freq
;
743 NtQueryPerformanceCounter(&now
, &freq
);
744 return now
.QuadPart
* 1000 / freq
.QuadPart
;
747 static void queue_add_timer(struct queue_timer
*t
, ULONGLONG time
,
750 /* We MUST hold the queue cs while calling this function. */
751 struct timer_queue
*q
= t
->q
;
752 struct list
*ptr
= &q
->timers
;
754 assert(!q
->quit
|| (t
->destroy
&& time
== EXPIRE_NEVER
));
756 if (time
!= EXPIRE_NEVER
)
757 LIST_FOR_EACH(ptr
, &q
->timers
)
759 struct queue_timer
*cur
= LIST_ENTRY(ptr
, struct queue_timer
, entry
);
760 if (time
< cur
->expire
)
763 list_add_before(ptr
, &t
->entry
);
767 /* If we insert at the head of the list, we need to expire sooner
769 if (set_event
&& &t
->entry
== list_head(&q
->timers
))
770 NtSetEvent(q
->event
, NULL
);
773 static inline void queue_move_timer(struct queue_timer
*t
, ULONGLONG time
,
776 /* We MUST hold the queue cs while calling this function. */
777 list_remove(&t
->entry
);
778 queue_add_timer(t
, time
, set_event
);
781 static void queue_timer_expire(struct timer_queue
*q
)
783 struct queue_timer
*t
= NULL
;
785 RtlEnterCriticalSection(&q
->cs
);
786 if (list_head(&q
->timers
))
789 t
= LIST_ENTRY(list_head(&q
->timers
), struct queue_timer
, entry
);
790 if (!t
->destroy
&& t
->expire
<= ((now
= queue_current_time())))
795 next
= t
->expire
+ t
->period
;
796 /* avoid trigger cascade if overloaded / hibernated */
798 next
= now
+ t
->period
;
802 queue_move_timer(t
, next
, FALSE
);
807 RtlLeaveCriticalSection(&q
->cs
);
811 if (t
->flags
& WT_EXECUTEINTIMERTHREAD
)
812 timer_callback_wrapper(t
);
817 & (WT_EXECUTEINIOTHREAD
| WT_EXECUTEINPERSISTENTTHREAD
818 | WT_EXECUTELONGFUNCTION
| WT_TRANSFER_IMPERSONATION
));
819 NTSTATUS status
= RtlQueueWorkItem(timer_callback_wrapper
, t
, flags
);
820 if (status
!= STATUS_SUCCESS
)
821 timer_cleanup_callback(t
);
826 static ULONG
queue_get_timeout(struct timer_queue
*q
)
828 struct queue_timer
*t
;
829 ULONG timeout
= INFINITE
;
831 RtlEnterCriticalSection(&q
->cs
);
832 if (list_head(&q
->timers
))
834 t
= LIST_ENTRY(list_head(&q
->timers
), struct queue_timer
, entry
);
835 assert(!t
->destroy
|| t
->expire
== EXPIRE_NEVER
);
837 if (t
->expire
!= EXPIRE_NEVER
)
839 ULONGLONG time
= queue_current_time();
840 timeout
= t
->expire
< time
? 0 : t
->expire
- time
;
843 RtlLeaveCriticalSection(&q
->cs
);
848 static void WINAPI
timer_queue_thread_proc(LPVOID p
)
850 struct timer_queue
*q
= p
;
853 timeout_ms
= INFINITE
;
856 LARGE_INTEGER timeout
;
860 status
= NtWaitForSingleObject(
861 q
->event
, FALSE
, get_nt_timeout(&timeout
, timeout_ms
));
863 if (status
== STATUS_WAIT_0
)
865 /* There are two possible ways to trigger the event. Either
866 we are quitting and the last timer got removed, or a new
867 timer got put at the head of the list so we need to adjust
869 RtlEnterCriticalSection(&q
->cs
);
870 if (q
->quit
&& list_empty(&q
->timers
))
872 RtlLeaveCriticalSection(&q
->cs
);
874 else if (status
== STATUS_TIMEOUT
)
875 queue_timer_expire(q
);
880 timeout_ms
= queue_get_timeout(q
);
884 RtlDeleteCriticalSection(&q
->cs
);
886 RtlFreeHeap(GetProcessHeap(), 0, q
);
887 RtlExitUserThread( 0 );
890 static void queue_destroy_timer(struct queue_timer
*t
)
892 /* We MUST hold the queue cs while calling this function. */
894 if (t
->runcount
== 0)
895 /* Ensure a timer is promptly removed. If callbacks are pending,
896 it will be removed after the last one finishes by the callback
898 queue_remove_timer(t
);
900 /* Make sure no destroyed timer masks an active timer at the head
901 of the sorted list. */
902 queue_move_timer(t
, EXPIRE_NEVER
, FALSE
);
905 /***********************************************************************
906 * RtlCreateTimerQueue (NTDLL.@)
908 * Creates a timer queue object and returns a handle to it.
911 * NewTimerQueue [O] The newly created queue.
914 * Success: STATUS_SUCCESS.
915 * Failure: Any NTSTATUS code.
917 NTSTATUS WINAPI
RtlCreateTimerQueue(PHANDLE NewTimerQueue
)
920 struct timer_queue
*q
= RtlAllocateHeap(GetProcessHeap(), 0, sizeof *q
);
922 return STATUS_NO_MEMORY
;
924 RtlInitializeCriticalSection(&q
->cs
);
925 list_init(&q
->timers
);
927 q
->magic
= TIMER_QUEUE_MAGIC
;
928 status
= NtCreateEvent(&q
->event
, EVENT_ALL_ACCESS
, NULL
, SynchronizationEvent
, FALSE
);
929 if (status
!= STATUS_SUCCESS
)
931 RtlFreeHeap(GetProcessHeap(), 0, q
);
934 status
= RtlCreateUserThread(GetCurrentProcess(), NULL
, FALSE
, NULL
, 0, 0,
935 timer_queue_thread_proc
, q
, &q
->thread
, NULL
);
936 if (status
!= STATUS_SUCCESS
)
939 RtlFreeHeap(GetProcessHeap(), 0, q
);
944 return STATUS_SUCCESS
;
947 /***********************************************************************
948 * RtlDeleteTimerQueueEx (NTDLL.@)
950 * Deletes a timer queue object.
953 * TimerQueue [I] The timer queue to destroy.
954 * CompletionEvent [I] If NULL, return immediately. If INVALID_HANDLE_VALUE,
955 * wait until all timers are finished firing before
956 * returning. Otherwise, return immediately and set the
957 * event when all timers are done.
960 * Success: STATUS_SUCCESS if synchronous, STATUS_PENDING if not.
961 * Failure: Any NTSTATUS code.
963 NTSTATUS WINAPI
RtlDeleteTimerQueueEx(HANDLE TimerQueue
, HANDLE CompletionEvent
)
965 struct timer_queue
*q
= TimerQueue
;
966 struct queue_timer
*t
, *temp
;
970 if (!q
|| q
->magic
!= TIMER_QUEUE_MAGIC
)
971 return STATUS_INVALID_HANDLE
;
975 RtlEnterCriticalSection(&q
->cs
);
977 if (list_head(&q
->timers
))
978 /* When the last timer is removed, it will signal the timer thread to
980 LIST_FOR_EACH_ENTRY_SAFE(t
, temp
, &q
->timers
, struct queue_timer
, entry
)
981 queue_destroy_timer(t
);
983 /* However if we have none, we must do it ourselves. */
984 NtSetEvent(q
->event
, NULL
);
985 RtlLeaveCriticalSection(&q
->cs
);
987 if (CompletionEvent
== INVALID_HANDLE_VALUE
)
989 NtWaitForSingleObject(thread
, FALSE
, NULL
);
990 status
= STATUS_SUCCESS
;
996 FIXME("asynchronous return on completion event unimplemented\n");
997 NtWaitForSingleObject(thread
, FALSE
, NULL
);
998 NtSetEvent(CompletionEvent
, NULL
);
1000 status
= STATUS_PENDING
;
1007 static struct timer_queue
*get_timer_queue(HANDLE TimerQueue
)
1009 static struct timer_queue
*default_timer_queue
;
1015 if (!default_timer_queue
)
1018 NTSTATUS status
= RtlCreateTimerQueue(&q
);
1019 if (status
== STATUS_SUCCESS
)
1021 PVOID p
= interlocked_cmpxchg_ptr(
1022 (void **) &default_timer_queue
, q
, NULL
);
1024 /* Got beat to the punch. */
1025 RtlDeleteTimerQueueEx(q
, NULL
);
1028 return default_timer_queue
;
1032 /***********************************************************************
1033 * RtlCreateTimer (NTDLL.@)
1035 * Creates a new timer associated with the given queue.
1038 * NewTimer [O] The newly created timer.
1039 * TimerQueue [I] The queue to hold the timer.
1040 * Callback [I] The callback to fire.
1041 * Parameter [I] The argument for the callback.
1042 * DueTime [I] The delay, in milliseconds, before first firing the
1044 * Period [I] The period, in milliseconds, at which to fire the timer
1045 * after the first callback. If zero, the timer will only
1046 * fire once. It still needs to be deleted with
1048 * Flags [I] Flags controlling the execution of the callback. In
1049 * addition to the WT_* thread pool flags (see
1050 * RtlQueueWorkItem), WT_EXECUTEINTIMERTHREAD and
1051 * WT_EXECUTEONLYONCE are supported.
1054 * Success: STATUS_SUCCESS.
1055 * Failure: Any NTSTATUS code.
1057 NTSTATUS WINAPI
RtlCreateTimer(PHANDLE NewTimer
, HANDLE TimerQueue
,
1058 RTL_WAITORTIMERCALLBACKFUNC Callback
,
1059 PVOID Parameter
, DWORD DueTime
, DWORD Period
,
1063 struct queue_timer
*t
;
1064 struct timer_queue
*q
= get_timer_queue(TimerQueue
);
1066 if (!q
) return STATUS_NO_MEMORY
;
1067 if (q
->magic
!= TIMER_QUEUE_MAGIC
) return STATUS_INVALID_HANDLE
;
1069 t
= RtlAllocateHeap(GetProcessHeap(), 0, sizeof *t
);
1071 return STATUS_NO_MEMORY
;
1075 t
->callback
= Callback
;
1076 t
->param
= Parameter
;
1082 status
= STATUS_SUCCESS
;
1083 RtlEnterCriticalSection(&q
->cs
);
1085 status
= STATUS_INVALID_HANDLE
;
1087 queue_add_timer(t
, queue_current_time() + DueTime
, TRUE
);
1088 RtlLeaveCriticalSection(&q
->cs
);
1090 if (status
== STATUS_SUCCESS
)
1093 RtlFreeHeap(GetProcessHeap(), 0, t
);
1098 /***********************************************************************
1099 * RtlUpdateTimer (NTDLL.@)
1101 * Changes the time at which a timer expires.
1104 * TimerQueue [I] The queue that holds the timer.
1105 * Timer [I] The timer to update.
1106 * DueTime [I] The delay, in milliseconds, before next firing the timer.
1107 * Period [I] The period, in milliseconds, at which to fire the timer
1108 * after the first callback. If zero, the timer will not
1109 * refire once. It still needs to be deleted with
1113 * Success: STATUS_SUCCESS.
1114 * Failure: Any NTSTATUS code.
1116 NTSTATUS WINAPI
RtlUpdateTimer(HANDLE TimerQueue
, HANDLE Timer
,
1117 DWORD DueTime
, DWORD Period
)
1119 struct queue_timer
*t
= Timer
;
1120 struct timer_queue
*q
= t
->q
;
1122 RtlEnterCriticalSection(&q
->cs
);
1123 /* Can't change a timer if it was once-only or destroyed. */
1124 if (t
->expire
!= EXPIRE_NEVER
)
1127 queue_move_timer(t
, queue_current_time() + DueTime
, TRUE
);
1129 RtlLeaveCriticalSection(&q
->cs
);
1131 return STATUS_SUCCESS
;
1134 /***********************************************************************
1135 * RtlDeleteTimer (NTDLL.@)
1137 * Cancels a timer-queue timer.
1140 * TimerQueue [I] The queue that holds the timer.
1141 * Timer [I] The timer to update.
1142 * CompletionEvent [I] If NULL, return immediately. If INVALID_HANDLE_VALUE,
1143 * wait until the timer is finished firing all pending
1144 * callbacks before returning. Otherwise, return
1145 * immediately and set the timer is done.
1148 * Success: STATUS_SUCCESS if the timer is done, STATUS_PENDING if not,
1149 or if the completion event is NULL.
1150 * Failure: Any NTSTATUS code.
1152 NTSTATUS WINAPI
RtlDeleteTimer(HANDLE TimerQueue
, HANDLE Timer
,
1153 HANDLE CompletionEvent
)
1155 struct queue_timer
*t
= Timer
;
1156 struct timer_queue
*q
;
1157 NTSTATUS status
= STATUS_PENDING
;
1158 HANDLE event
= NULL
;
1161 return STATUS_INVALID_PARAMETER_1
;
1163 if (CompletionEvent
== INVALID_HANDLE_VALUE
)
1165 status
= NtCreateEvent(&event
, EVENT_ALL_ACCESS
, NULL
, SynchronizationEvent
, FALSE
);
1166 if (status
== STATUS_SUCCESS
)
1167 status
= STATUS_PENDING
;
1169 else if (CompletionEvent
)
1170 event
= CompletionEvent
;
1172 RtlEnterCriticalSection(&q
->cs
);
1174 if (t
->runcount
== 0 && event
)
1175 status
= STATUS_SUCCESS
;
1176 queue_destroy_timer(t
);
1177 RtlLeaveCriticalSection(&q
->cs
);
1179 if (CompletionEvent
== INVALID_HANDLE_VALUE
&& event
)
1181 if (status
== STATUS_PENDING
)
1183 NtWaitForSingleObject(event
, FALSE
, NULL
);
1184 status
= STATUS_SUCCESS
;
1192 /***********************************************************************
1193 * timerqueue_thread_proc (internal)
1195 static void CALLBACK
timerqueue_thread_proc( void *param
)
1197 ULONGLONG timeout_lower
, timeout_upper
, new_timeout
;
1198 struct threadpool_object
*other_timer
;
1199 LARGE_INTEGER now
, timeout
;
1202 TRACE( "starting timer queue thread\n" );
1204 RtlEnterCriticalSection( &timerqueue
.cs
);
1207 NtQuerySystemTime( &now
);
1209 /* Check for expired timers. */
1210 while ((ptr
= list_head( &timerqueue
.pending_timers
)))
1212 struct threadpool_object
*timer
= LIST_ENTRY( ptr
, struct threadpool_object
, u
.timer
.timer_entry
);
1213 assert( timer
->type
== TP_OBJECT_TYPE_TIMER
);
1214 assert( timer
->u
.timer
.timer_pending
);
1215 if (timer
->u
.timer
.timeout
> now
.QuadPart
)
1218 /* Queue a new callback in one of the worker threads. */
1219 list_remove( &timer
->u
.timer
.timer_entry
);
1220 timer
->u
.timer
.timer_pending
= FALSE
;
1221 tp_object_submit( timer
, FALSE
);
1223 /* Insert the timer back into the queue, except it's marked for shutdown. */
1224 if (timer
->u
.timer
.period
&& !timer
->shutdown
)
1226 timer
->u
.timer
.timeout
+= (ULONGLONG
)timer
->u
.timer
.period
* 10000;
1227 if (timer
->u
.timer
.timeout
<= now
.QuadPart
)
1228 timer
->u
.timer
.timeout
= now
.QuadPart
+ 1;
1230 LIST_FOR_EACH_ENTRY( other_timer
, &timerqueue
.pending_timers
,
1231 struct threadpool_object
, u
.timer
.timer_entry
)
1233 assert( other_timer
->type
== TP_OBJECT_TYPE_TIMER
);
1234 if (timer
->u
.timer
.timeout
< other_timer
->u
.timer
.timeout
)
1237 list_add_before( &other_timer
->u
.timer
.timer_entry
, &timer
->u
.timer
.timer_entry
);
1238 timer
->u
.timer
.timer_pending
= TRUE
;
1242 timeout_lower
= TIMEOUT_INFINITE
;
1243 timeout_upper
= TIMEOUT_INFINITE
;
1245 /* Determine next timeout and use the window length to optimize wakeup times. */
1246 LIST_FOR_EACH_ENTRY( other_timer
, &timerqueue
.pending_timers
,
1247 struct threadpool_object
, u
.timer
.timer_entry
)
1249 assert( other_timer
->type
== TP_OBJECT_TYPE_TIMER
);
1250 if (other_timer
->u
.timer
.timeout
>= timeout_upper
)
1253 timeout_lower
= other_timer
->u
.timer
.timeout
;
1254 new_timeout
= timeout_lower
+ (ULONGLONG
)other_timer
->u
.timer
.window_length
* 10000;
1255 if (new_timeout
< timeout_upper
)
1256 timeout_upper
= new_timeout
;
1259 /* Wait for timer update events or until the next timer expires. */
1260 if (timerqueue
.objcount
)
1262 timeout
.QuadPart
= timeout_lower
;
1263 RtlSleepConditionVariableCS( &timerqueue
.update_event
, &timerqueue
.cs
, &timeout
);
1267 /* All timers have been destroyed, if no new timers are created
1268 * within some amount of time, then we can shutdown this thread. */
1269 timeout
.QuadPart
= (ULONGLONG
)THREADPOOL_WORKER_TIMEOUT
* -10000;
1270 if (RtlSleepConditionVariableCS( &timerqueue
.update_event
, &timerqueue
.cs
,
1271 &timeout
) == STATUS_TIMEOUT
&& !timerqueue
.objcount
)
1277 timerqueue
.thread_running
= FALSE
;
1278 RtlLeaveCriticalSection( &timerqueue
.cs
);
1280 TRACE( "terminating timer queue thread\n" );
1281 RtlExitUserThread( 0 );
1284 /***********************************************************************
1285 * tp_new_worker_thread (internal)
1287 * Create and account a new worker thread for the desired pool.
1289 static NTSTATUS
tp_new_worker_thread( struct threadpool
*pool
)
1294 status
= RtlCreateUserThread( GetCurrentProcess(), NULL
, FALSE
, NULL
, 0, 0,
1295 threadpool_worker_proc
, pool
, &thread
, NULL
);
1296 if (status
== STATUS_SUCCESS
)
1298 interlocked_inc( &pool
->refcount
);
1299 pool
->num_workers
++;
1300 pool
->num_busy_workers
++;
1306 /***********************************************************************
1307 * tp_timerqueue_lock (internal)
1309 * Acquires a lock on the global timerqueue. When the lock is acquired
1310 * successfully, it is guaranteed that the timer thread is running.
1312 static NTSTATUS
tp_timerqueue_lock( struct threadpool_object
*timer
)
1314 NTSTATUS status
= STATUS_SUCCESS
;
1315 assert( timer
->type
== TP_OBJECT_TYPE_TIMER
);
1317 timer
->u
.timer
.timer_initialized
= FALSE
;
1318 timer
->u
.timer
.timer_pending
= FALSE
;
1319 timer
->u
.timer
.timer_set
= FALSE
;
1320 timer
->u
.timer
.timeout
= 0;
1321 timer
->u
.timer
.period
= 0;
1322 timer
->u
.timer
.window_length
= 0;
1324 RtlEnterCriticalSection( &timerqueue
.cs
);
1326 /* Make sure that the timerqueue thread is running. */
1327 if (!timerqueue
.thread_running
)
1330 status
= RtlCreateUserThread( GetCurrentProcess(), NULL
, FALSE
, NULL
, 0, 0,
1331 timerqueue_thread_proc
, NULL
, &thread
, NULL
);
1332 if (status
== STATUS_SUCCESS
)
1334 timerqueue
.thread_running
= TRUE
;
1339 if (status
== STATUS_SUCCESS
)
1341 timer
->u
.timer
.timer_initialized
= TRUE
;
1342 timerqueue
.objcount
++;
1345 RtlLeaveCriticalSection( &timerqueue
.cs
);
1349 /***********************************************************************
1350 * tp_timerqueue_unlock (internal)
1352 * Releases a lock on the global timerqueue.
1354 static void tp_timerqueue_unlock( struct threadpool_object
*timer
)
1356 assert( timer
->type
== TP_OBJECT_TYPE_TIMER
);
1358 RtlEnterCriticalSection( &timerqueue
.cs
);
1359 if (timer
->u
.timer
.timer_initialized
)
1361 /* If timer was pending, remove it. */
1362 if (timer
->u
.timer
.timer_pending
)
1364 list_remove( &timer
->u
.timer
.timer_entry
);
1365 timer
->u
.timer
.timer_pending
= FALSE
;
1368 /* If the last timer object was destroyed, then wake up the thread. */
1369 if (!--timerqueue
.objcount
)
1371 assert( list_empty( &timerqueue
.pending_timers
) );
1372 RtlWakeAllConditionVariable( &timerqueue
.update_event
);
1375 timer
->u
.timer
.timer_initialized
= FALSE
;
1377 RtlLeaveCriticalSection( &timerqueue
.cs
);
1380 /***********************************************************************
1381 * waitqueue_thread_proc (internal)
1383 static void CALLBACK
waitqueue_thread_proc( void *param
)
1385 struct threadpool_object
*objects
[MAXIMUM_WAITQUEUE_OBJECTS
];
1386 HANDLE handles
[MAXIMUM_WAITQUEUE_OBJECTS
+ 1];
1387 struct waitqueue_bucket
*bucket
= param
;
1388 struct threadpool_object
*wait
, *next
;
1389 LARGE_INTEGER now
, timeout
;
1393 TRACE( "starting wait queue thread\n" );
1395 RtlEnterCriticalSection( &waitqueue
.cs
);
1399 NtQuerySystemTime( &now
);
1400 timeout
.QuadPart
= TIMEOUT_INFINITE
;
1403 LIST_FOR_EACH_ENTRY_SAFE( wait
, next
, &bucket
->waiting
, struct threadpool_object
,
1406 assert( wait
->type
== TP_OBJECT_TYPE_WAIT
);
1407 if (wait
->u
.wait
.timeout
<= now
.QuadPart
)
1409 /* Wait object timed out. */
1410 list_remove( &wait
->u
.wait
.wait_entry
);
1411 list_add_tail( &bucket
->reserved
, &wait
->u
.wait
.wait_entry
);
1412 tp_object_submit( wait
, FALSE
);
1416 if (wait
->u
.wait
.timeout
< timeout
.QuadPart
)
1417 timeout
.QuadPart
= wait
->u
.wait
.timeout
;
1419 assert( num_handles
< MAXIMUM_WAITQUEUE_OBJECTS
);
1420 interlocked_inc( &wait
->refcount
);
1421 objects
[num_handles
] = wait
;
1422 handles
[num_handles
] = wait
->u
.wait
.handle
;
1427 if (!bucket
->objcount
)
1429 /* All wait objects have been destroyed, if no new wait objects are created
1430 * within some amount of time, then we can shutdown this thread. */
1431 assert( num_handles
== 0 );
1432 RtlLeaveCriticalSection( &waitqueue
.cs
);
1433 timeout
.QuadPart
= (ULONGLONG
)THREADPOOL_WORKER_TIMEOUT
* -10000;
1434 status
= NtWaitForMultipleObjects( 1, &bucket
->update_event
, TRUE
, FALSE
, &timeout
);
1435 RtlEnterCriticalSection( &waitqueue
.cs
);
1437 if (status
== STATUS_TIMEOUT
&& !bucket
->objcount
)
1442 handles
[num_handles
] = bucket
->update_event
;
1443 RtlLeaveCriticalSection( &waitqueue
.cs
);
1444 status
= NtWaitForMultipleObjects( num_handles
+ 1, handles
, TRUE
, FALSE
, &timeout
);
1445 RtlEnterCriticalSection( &waitqueue
.cs
);
1447 if (status
>= STATUS_WAIT_0
&& status
< STATUS_WAIT_0
+ num_handles
)
1449 wait
= objects
[status
- STATUS_WAIT_0
];
1450 assert( wait
->type
== TP_OBJECT_TYPE_WAIT
);
1451 if (wait
->u
.wait
.bucket
)
1453 /* Wait object signaled. */
1454 assert( wait
->u
.wait
.bucket
== bucket
);
1455 list_remove( &wait
->u
.wait
.wait_entry
);
1456 list_add_tail( &bucket
->reserved
, &wait
->u
.wait
.wait_entry
);
1457 tp_object_submit( wait
, TRUE
);
1460 WARN("wait object %p triggered while object was destroyed\n", wait
);
1463 /* Release temporary references to wait objects. */
1466 wait
= objects
[--num_handles
];
1467 assert( wait
->type
== TP_OBJECT_TYPE_WAIT
);
1468 tp_object_release( wait
);
1472 /* Try to merge bucket with other threads. */
1473 if (waitqueue
.num_buckets
> 1 && bucket
->objcount
&&
1474 bucket
->objcount
<= MAXIMUM_WAITQUEUE_OBJECTS
* 1 / 3)
1476 struct waitqueue_bucket
*other_bucket
;
1477 LIST_FOR_EACH_ENTRY( other_bucket
, &waitqueue
.buckets
, struct waitqueue_bucket
, bucket_entry
)
1479 if (other_bucket
!= bucket
&& other_bucket
->objcount
&&
1480 other_bucket
->objcount
+ bucket
->objcount
<= MAXIMUM_WAITQUEUE_OBJECTS
* 2 / 3)
1482 other_bucket
->objcount
+= bucket
->objcount
;
1483 bucket
->objcount
= 0;
1485 /* Update reserved list. */
1486 LIST_FOR_EACH_ENTRY( wait
, &bucket
->reserved
, struct threadpool_object
, u
.wait
.wait_entry
)
1488 assert( wait
->type
== TP_OBJECT_TYPE_WAIT
);
1489 wait
->u
.wait
.bucket
= other_bucket
;
1491 list_move_tail( &other_bucket
->reserved
, &bucket
->reserved
);
1493 /* Update waiting list. */
1494 LIST_FOR_EACH_ENTRY( wait
, &bucket
->waiting
, struct threadpool_object
, u
.wait
.wait_entry
)
1496 assert( wait
->type
== TP_OBJECT_TYPE_WAIT
);
1497 wait
->u
.wait
.bucket
= other_bucket
;
1499 list_move_tail( &other_bucket
->waiting
, &bucket
->waiting
);
1501 /* Move bucket to the end, to keep the probability of
1502 * newly added wait objects as small as possible. */
1503 list_remove( &bucket
->bucket_entry
);
1504 list_add_tail( &waitqueue
.buckets
, &bucket
->bucket_entry
);
1506 NtSetEvent( other_bucket
->update_event
, NULL
);
1513 /* Remove this bucket from the list. */
1514 list_remove( &bucket
->bucket_entry
);
1515 if (!--waitqueue
.num_buckets
)
1516 assert( list_empty( &waitqueue
.buckets
) );
1518 RtlLeaveCriticalSection( &waitqueue
.cs
);
1520 TRACE( "terminating wait queue thread\n" );
1522 assert( bucket
->objcount
== 0 );
1523 assert( list_empty( &bucket
->reserved
) );
1524 assert( list_empty( &bucket
->waiting
) );
1525 NtClose( bucket
->update_event
);
1527 RtlFreeHeap( GetProcessHeap(), 0, bucket
);
1528 RtlExitUserThread( 0 );
1531 /***********************************************************************
1532 * tp_waitqueue_lock (internal)
1534 static NTSTATUS
tp_waitqueue_lock( struct threadpool_object
*wait
)
1536 struct waitqueue_bucket
*bucket
;
1539 assert( wait
->type
== TP_OBJECT_TYPE_WAIT
);
1541 wait
->u
.wait
.signaled
= 0;
1542 wait
->u
.wait
.bucket
= NULL
;
1543 wait
->u
.wait
.wait_pending
= FALSE
;
1544 wait
->u
.wait
.timeout
= 0;
1545 wait
->u
.wait
.handle
= INVALID_HANDLE_VALUE
;
1547 RtlEnterCriticalSection( &waitqueue
.cs
);
1549 /* Try to assign to existing bucket if possible. */
1550 LIST_FOR_EACH_ENTRY( bucket
, &waitqueue
.buckets
, struct waitqueue_bucket
, bucket_entry
)
1552 if (bucket
->objcount
< MAXIMUM_WAITQUEUE_OBJECTS
)
1554 list_add_tail( &bucket
->reserved
, &wait
->u
.wait
.wait_entry
);
1555 wait
->u
.wait
.bucket
= bucket
;
1558 status
= STATUS_SUCCESS
;
1563 /* Create a new bucket and corresponding worker thread. */
1564 bucket
= RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*bucket
) );
1567 status
= STATUS_NO_MEMORY
;
1571 bucket
->objcount
= 0;
1572 list_init( &bucket
->reserved
);
1573 list_init( &bucket
->waiting
);
1575 status
= NtCreateEvent( &bucket
->update_event
, EVENT_ALL_ACCESS
,
1576 NULL
, SynchronizationEvent
, FALSE
);
1579 RtlFreeHeap( GetProcessHeap(), 0, bucket
);
1583 status
= RtlCreateUserThread( GetCurrentProcess(), NULL
, FALSE
, NULL
, 0, 0,
1584 waitqueue_thread_proc
, bucket
, &thread
, NULL
);
1585 if (status
== STATUS_SUCCESS
)
1587 list_add_tail( &waitqueue
.buckets
, &bucket
->bucket_entry
);
1588 waitqueue
.num_buckets
++;
1590 list_add_tail( &bucket
->reserved
, &wait
->u
.wait
.wait_entry
);
1591 wait
->u
.wait
.bucket
= bucket
;
1598 NtClose( bucket
->update_event
);
1599 RtlFreeHeap( GetProcessHeap(), 0, bucket
);
1603 RtlLeaveCriticalSection( &waitqueue
.cs
);
1607 /***********************************************************************
1608 * tp_waitqueue_unlock (internal)
1610 static void tp_waitqueue_unlock( struct threadpool_object
*wait
)
1612 assert( wait
->type
== TP_OBJECT_TYPE_WAIT
);
1614 RtlEnterCriticalSection( &waitqueue
.cs
);
1615 if (wait
->u
.wait
.bucket
)
1617 struct waitqueue_bucket
*bucket
= wait
->u
.wait
.bucket
;
1618 assert( bucket
->objcount
> 0 );
1620 list_remove( &wait
->u
.wait
.wait_entry
);
1621 wait
->u
.wait
.bucket
= NULL
;
1624 NtSetEvent( bucket
->update_event
, NULL
);
1626 RtlLeaveCriticalSection( &waitqueue
.cs
);
1629 /***********************************************************************
1630 * tp_threadpool_alloc (internal)
1632 * Allocates a new threadpool object.
1634 static NTSTATUS
tp_threadpool_alloc( struct threadpool
**out
)
1636 struct threadpool
*pool
;
1638 pool
= RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*pool
) );
1640 return STATUS_NO_MEMORY
;
1644 pool
->shutdown
= FALSE
;
1646 RtlInitializeCriticalSection( &pool
->cs
);
1647 pool
->cs
.DebugInfo
->Spare
[0] = (DWORD_PTR
)(__FILE__
": threadpool.cs");
1649 list_init( &pool
->pool
);
1650 RtlInitializeConditionVariable( &pool
->update_event
);
1652 pool
->max_workers
= 500;
1653 pool
->min_workers
= 0;
1654 pool
->num_workers
= 0;
1655 pool
->num_busy_workers
= 0;
1657 TRACE( "allocated threadpool %p\n", pool
);
1660 return STATUS_SUCCESS
;
1663 /***********************************************************************
1664 * tp_threadpool_shutdown (internal)
1666 * Prepares the shutdown of a threadpool object and notifies all worker
1667 * threads to terminate (after all remaining work items have been
1670 static void tp_threadpool_shutdown( struct threadpool
*pool
)
1672 assert( pool
!= default_threadpool
);
1674 pool
->shutdown
= TRUE
;
1675 RtlWakeAllConditionVariable( &pool
->update_event
);
1678 /***********************************************************************
1679 * tp_threadpool_release (internal)
1681 * Releases a reference to a threadpool object.
1683 static BOOL
tp_threadpool_release( struct threadpool
*pool
)
1685 if (interlocked_dec( &pool
->refcount
))
1688 TRACE( "destroying threadpool %p\n", pool
);
1690 assert( pool
->shutdown
);
1691 assert( !pool
->objcount
);
1692 assert( list_empty( &pool
->pool
) );
1694 pool
->cs
.DebugInfo
->Spare
[0] = 0;
1695 RtlDeleteCriticalSection( &pool
->cs
);
1697 RtlFreeHeap( GetProcessHeap(), 0, pool
);
1701 /***********************************************************************
1702 * tp_threadpool_lock (internal)
1704 * Acquires a lock on a threadpool, specified with an TP_CALLBACK_ENVIRON
1705 * block. When the lock is acquired successfully, it is guaranteed that
1706 * there is at least one worker thread to process tasks.
1708 static NTSTATUS
tp_threadpool_lock( struct threadpool
**out
, TP_CALLBACK_ENVIRON
*environment
)
1710 struct threadpool
*pool
= NULL
;
1711 NTSTATUS status
= STATUS_SUCCESS
;
1714 pool
= (struct threadpool
*)environment
->Pool
;
1718 if (!default_threadpool
)
1720 status
= tp_threadpool_alloc( &pool
);
1721 if (status
!= STATUS_SUCCESS
)
1724 if (interlocked_cmpxchg_ptr( (void *)&default_threadpool
, pool
, NULL
) != NULL
)
1726 tp_threadpool_shutdown( pool
);
1727 tp_threadpool_release( pool
);
1731 pool
= default_threadpool
;
1734 RtlEnterCriticalSection( &pool
->cs
);
1736 /* Make sure that the threadpool has at least one thread. */
1737 if (!pool
->num_workers
)
1738 status
= tp_new_worker_thread( pool
);
1740 /* Keep a reference, and increment objcount to ensure that the
1741 * last thread doesn't terminate. */
1742 if (status
== STATUS_SUCCESS
)
1744 interlocked_inc( &pool
->refcount
);
1748 RtlLeaveCriticalSection( &pool
->cs
);
1750 if (status
!= STATUS_SUCCESS
)
1754 return STATUS_SUCCESS
;
1757 /***********************************************************************
1758 * tp_threadpool_unlock (internal)
1760 * Releases a lock on a threadpool.
1762 static void tp_threadpool_unlock( struct threadpool
*pool
)
1764 RtlEnterCriticalSection( &pool
->cs
);
1766 RtlLeaveCriticalSection( &pool
->cs
);
1767 tp_threadpool_release( pool
);
1770 /***********************************************************************
1771 * tp_group_alloc (internal)
1773 * Allocates a new threadpool group object.
1775 static NTSTATUS
tp_group_alloc( struct threadpool_group
**out
)
1777 struct threadpool_group
*group
;
1779 group
= RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*group
) );
1781 return STATUS_NO_MEMORY
;
1783 group
->refcount
= 1;
1784 group
->shutdown
= FALSE
;
1786 RtlInitializeCriticalSection( &group
->cs
);
1787 group
->cs
.DebugInfo
->Spare
[0] = (DWORD_PTR
)(__FILE__
": threadpool_group.cs");
1789 list_init( &group
->members
);
1791 TRACE( "allocated group %p\n", group
);
1794 return STATUS_SUCCESS
;
1797 /***********************************************************************
1798 * tp_group_shutdown (internal)
1800 * Marks the group object for shutdown.
1802 static void tp_group_shutdown( struct threadpool_group
*group
)
1804 group
->shutdown
= TRUE
;
1807 /***********************************************************************
1808 * tp_group_release (internal)
1810 * Releases a reference to a group object.
1812 static BOOL
tp_group_release( struct threadpool_group
*group
)
1814 if (interlocked_dec( &group
->refcount
))
1817 TRACE( "destroying group %p\n", group
);
1819 assert( group
->shutdown
);
1820 assert( list_empty( &group
->members
) );
1822 group
->cs
.DebugInfo
->Spare
[0] = 0;
1823 RtlDeleteCriticalSection( &group
->cs
);
1825 RtlFreeHeap( GetProcessHeap(), 0, group
);
1829 /***********************************************************************
1830 * tp_object_initialize (internal)
1832 * Initializes members of a threadpool object.
1834 static void tp_object_initialize( struct threadpool_object
*object
, struct threadpool
*pool
,
1835 PVOID userdata
, TP_CALLBACK_ENVIRON
*environment
)
1837 BOOL is_simple_callback
= (object
->type
== TP_OBJECT_TYPE_SIMPLE
);
1839 object
->refcount
= 1;
1840 object
->shutdown
= FALSE
;
1842 object
->pool
= pool
;
1843 object
->group
= NULL
;
1844 object
->userdata
= userdata
;
1845 object
->group_cancel_callback
= NULL
;
1846 object
->finalization_callback
= NULL
;
1847 object
->may_run_long
= 0;
1848 object
->race_dll
= NULL
;
1850 memset( &object
->group_entry
, 0, sizeof(object
->group_entry
) );
1851 object
->is_group_member
= FALSE
;
1853 memset( &object
->pool_entry
, 0, sizeof(object
->pool_entry
) );
1854 RtlInitializeConditionVariable( &object
->finished_event
);
1855 RtlInitializeConditionVariable( &object
->group_finished_event
);
1856 object
->num_pending_callbacks
= 0;
1857 object
->num_running_callbacks
= 0;
1858 object
->num_associated_callbacks
= 0;
1862 if (environment
->Version
!= 1 && environment
->Version
!= 3)
1863 FIXME( "unsupported environment version %u\n", environment
->Version
);
1865 object
->group
= impl_from_TP_CLEANUP_GROUP( environment
->CleanupGroup
);
1866 object
->group_cancel_callback
= environment
->CleanupGroupCancelCallback
;
1867 object
->finalization_callback
= environment
->FinalizationCallback
;
1868 object
->may_run_long
= environment
->u
.s
.LongFunction
!= 0;
1869 object
->race_dll
= environment
->RaceDll
;
1871 if (environment
->ActivationContext
)
1872 FIXME( "activation context not supported yet\n" );
1874 if (environment
->u
.s
.Persistent
)
1875 FIXME( "persistent threads not supported yet\n" );
1878 if (object
->race_dll
)
1879 LdrAddRefDll( 0, object
->race_dll
);
1881 TRACE( "allocated object %p of type %u\n", object
, object
->type
);
1883 /* For simple callbacks we have to run tp_object_submit before adding this object
1884 * to the cleanup group. As soon as the cleanup group members are released ->shutdown
1885 * will be set, and tp_object_submit would fail with an assertion. */
1887 if (is_simple_callback
)
1888 tp_object_submit( object
, FALSE
);
1892 struct threadpool_group
*group
= object
->group
;
1893 interlocked_inc( &group
->refcount
);
1895 RtlEnterCriticalSection( &group
->cs
);
1896 list_add_tail( &group
->members
, &object
->group_entry
);
1897 object
->is_group_member
= TRUE
;
1898 RtlLeaveCriticalSection( &group
->cs
);
1901 if (is_simple_callback
)
1902 tp_object_release( object
);
1905 /***********************************************************************
1906 * tp_object_submit (internal)
1908 * Submits a threadpool object to the associated threadpool. This
1909 * function has to be VOID because TpPostWork can never fail on Windows.
1911 static void tp_object_submit( struct threadpool_object
*object
, BOOL signaled
)
1913 struct threadpool
*pool
= object
->pool
;
1914 NTSTATUS status
= STATUS_UNSUCCESSFUL
;
1916 assert( !object
->shutdown
);
1917 assert( !pool
->shutdown
);
1919 RtlEnterCriticalSection( &pool
->cs
);
1921 /* Start new worker threads if required. */
1922 if (pool
->num_busy_workers
>= pool
->num_workers
&&
1923 pool
->num_workers
< pool
->max_workers
)
1924 status
= tp_new_worker_thread( pool
);
1926 /* Queue work item and increment refcount. */
1927 interlocked_inc( &object
->refcount
);
1928 if (!object
->num_pending_callbacks
++)
1929 list_add_tail( &pool
->pool
, &object
->pool_entry
);
1931 /* Count how often the object was signaled. */
1932 if (object
->type
== TP_OBJECT_TYPE_WAIT
&& signaled
)
1933 object
->u
.wait
.signaled
++;
1935 /* No new thread started - wake up one existing thread. */
1936 if (status
!= STATUS_SUCCESS
)
1938 assert( pool
->num_workers
> 0 );
1939 RtlWakeConditionVariable( &pool
->update_event
);
1942 RtlLeaveCriticalSection( &pool
->cs
);
1945 /***********************************************************************
1946 * tp_object_cancel (internal)
1948 * Cancels all currently pending callbacks for a specific object.
1950 static void tp_object_cancel( struct threadpool_object
*object
)
1952 struct threadpool
*pool
= object
->pool
;
1953 LONG pending_callbacks
= 0;
1955 RtlEnterCriticalSection( &pool
->cs
);
1956 if (object
->num_pending_callbacks
)
1958 pending_callbacks
= object
->num_pending_callbacks
;
1959 object
->num_pending_callbacks
= 0;
1960 list_remove( &object
->pool_entry
);
1962 if (object
->type
== TP_OBJECT_TYPE_WAIT
)
1963 object
->u
.wait
.signaled
= 0;
1965 RtlLeaveCriticalSection( &pool
->cs
);
1967 while (pending_callbacks
--)
1968 tp_object_release( object
);
1971 /***********************************************************************
1972 * tp_object_wait (internal)
1974 * Waits until all pending and running callbacks of a specific object
1975 * have been processed.
1977 static void tp_object_wait( struct threadpool_object
*object
, BOOL group_wait
)
1979 struct threadpool
*pool
= object
->pool
;
1981 RtlEnterCriticalSection( &pool
->cs
);
1984 while (object
->num_pending_callbacks
|| object
->num_running_callbacks
)
1985 RtlSleepConditionVariableCS( &object
->group_finished_event
, &pool
->cs
, NULL
);
1989 while (object
->num_pending_callbacks
|| object
->num_associated_callbacks
)
1990 RtlSleepConditionVariableCS( &object
->finished_event
, &pool
->cs
, NULL
);
1992 RtlLeaveCriticalSection( &pool
->cs
);
1995 /***********************************************************************
1996 * tp_object_prepare_shutdown (internal)
1998 * Prepares a threadpool object for shutdown.
2000 static void tp_object_prepare_shutdown( struct threadpool_object
*object
)
2002 if (object
->type
== TP_OBJECT_TYPE_TIMER
)
2003 tp_timerqueue_unlock( object
);
2004 else if (object
->type
== TP_OBJECT_TYPE_WAIT
)
2005 tp_waitqueue_unlock( object
);
2008 /***********************************************************************
2009 * tp_object_release (internal)
2011 * Releases a reference to a threadpool object.
2013 static BOOL
tp_object_release( struct threadpool_object
*object
)
2015 if (interlocked_dec( &object
->refcount
))
2018 TRACE( "destroying object %p of type %u\n", object
, object
->type
);
2020 assert( object
->shutdown
);
2021 assert( !object
->num_pending_callbacks
);
2022 assert( !object
->num_running_callbacks
);
2023 assert( !object
->num_associated_callbacks
);
2025 /* release reference to the group */
2028 struct threadpool_group
*group
= object
->group
;
2030 RtlEnterCriticalSection( &group
->cs
);
2031 if (object
->is_group_member
)
2033 list_remove( &object
->group_entry
);
2034 object
->is_group_member
= FALSE
;
2036 RtlLeaveCriticalSection( &group
->cs
);
2038 tp_group_release( group
);
2041 tp_threadpool_unlock( object
->pool
);
2043 if (object
->race_dll
)
2044 LdrUnloadDll( object
->race_dll
);
2046 RtlFreeHeap( GetProcessHeap(), 0, object
);
2050 /***********************************************************************
2051 * threadpool_worker_proc (internal)
2053 static void CALLBACK
threadpool_worker_proc( void *param
)
2055 TP_CALLBACK_INSTANCE
*callback_instance
;
2056 struct threadpool_instance instance
;
2057 struct threadpool
*pool
= param
;
2058 TP_WAIT_RESULT wait_result
= 0;
2059 LARGE_INTEGER timeout
;
2063 TRACE( "starting worker thread for pool %p\n", pool
);
2065 RtlEnterCriticalSection( &pool
->cs
);
2066 pool
->num_busy_workers
--;
2069 while ((ptr
= list_head( &pool
->pool
)))
2071 struct threadpool_object
*object
= LIST_ENTRY( ptr
, struct threadpool_object
, pool_entry
);
2072 assert( object
->num_pending_callbacks
> 0 );
2074 /* If further pending callbacks are queued, move the work item to
2075 * the end of the pool list. Otherwise remove it from the pool. */
2076 list_remove( &object
->pool_entry
);
2077 if (--object
->num_pending_callbacks
)
2078 list_add_tail( &pool
->pool
, &object
->pool_entry
);
2080 /* For wait objects check if they were signaled or have timed out. */
2081 if (object
->type
== TP_OBJECT_TYPE_WAIT
)
2083 wait_result
= object
->u
.wait
.signaled
? WAIT_OBJECT_0
: WAIT_TIMEOUT
;
2084 if (wait_result
== WAIT_OBJECT_0
) object
->u
.wait
.signaled
--;
2087 /* Leave critical section and do the actual callback. */
2088 object
->num_associated_callbacks
++;
2089 object
->num_running_callbacks
++;
2090 pool
->num_busy_workers
++;
2091 RtlLeaveCriticalSection( &pool
->cs
);
2093 /* Initialize threadpool instance struct. */
2094 callback_instance
= (TP_CALLBACK_INSTANCE
*)&instance
;
2095 instance
.object
= object
;
2096 instance
.threadid
= GetCurrentThreadId();
2097 instance
.associated
= TRUE
;
2098 instance
.may_run_long
= object
->may_run_long
;
2099 instance
.cleanup
.critical_section
= NULL
;
2100 instance
.cleanup
.mutex
= NULL
;
2101 instance
.cleanup
.semaphore
= NULL
;
2102 instance
.cleanup
.semaphore_count
= 0;
2103 instance
.cleanup
.event
= NULL
;
2104 instance
.cleanup
.library
= NULL
;
2106 switch (object
->type
)
2108 case TP_OBJECT_TYPE_SIMPLE
:
2110 TRACE( "executing simple callback %p(%p, %p)\n",
2111 object
->u
.simple
.callback
, callback_instance
, object
->userdata
);
2112 object
->u
.simple
.callback( callback_instance
, object
->userdata
);
2113 TRACE( "callback %p returned\n", object
->u
.simple
.callback
);
2117 case TP_OBJECT_TYPE_WORK
:
2119 TRACE( "executing work callback %p(%p, %p, %p)\n",
2120 object
->u
.work
.callback
, callback_instance
, object
->userdata
, object
);
2121 object
->u
.work
.callback( callback_instance
, object
->userdata
, (TP_WORK
*)object
);
2122 TRACE( "callback %p returned\n", object
->u
.work
.callback
);
2126 case TP_OBJECT_TYPE_TIMER
:
2128 TRACE( "executing timer callback %p(%p, %p, %p)\n",
2129 object
->u
.timer
.callback
, callback_instance
, object
->userdata
, object
);
2130 object
->u
.timer
.callback( callback_instance
, object
->userdata
, (TP_TIMER
*)object
);
2131 TRACE( "callback %p returned\n", object
->u
.timer
.callback
);
2135 case TP_OBJECT_TYPE_WAIT
:
2137 TRACE( "executing wait callback %p(%p, %p, %p, %u)\n",
2138 object
->u
.wait
.callback
, callback_instance
, object
->userdata
, object
, wait_result
);
2139 object
->u
.wait
.callback( callback_instance
, object
->userdata
, (TP_WAIT
*)object
, wait_result
);
2140 TRACE( "callback %p returned\n", object
->u
.wait
.callback
);
2149 /* Execute finalization callback. */
2150 if (object
->finalization_callback
)
2152 TRACE( "executing finalization callback %p(%p, %p)\n",
2153 object
->finalization_callback
, callback_instance
, object
->userdata
);
2154 object
->finalization_callback( callback_instance
, object
->userdata
);
2155 TRACE( "callback %p returned\n", object
->finalization_callback
);
2158 /* Execute cleanup tasks. */
2159 if (instance
.cleanup
.critical_section
)
2161 RtlLeaveCriticalSection( instance
.cleanup
.critical_section
);
2163 if (instance
.cleanup
.mutex
)
2165 status
= NtReleaseMutant( instance
.cleanup
.mutex
, NULL
);
2166 if (status
!= STATUS_SUCCESS
) goto skip_cleanup
;
2168 if (instance
.cleanup
.semaphore
)
2170 status
= NtReleaseSemaphore( instance
.cleanup
.semaphore
, instance
.cleanup
.semaphore_count
, NULL
);
2171 if (status
!= STATUS_SUCCESS
) goto skip_cleanup
;
2173 if (instance
.cleanup
.event
)
2175 status
= NtSetEvent( instance
.cleanup
.event
, NULL
);
2176 if (status
!= STATUS_SUCCESS
) goto skip_cleanup
;
2178 if (instance
.cleanup
.library
)
2180 LdrUnloadDll( instance
.cleanup
.library
);
2184 RtlEnterCriticalSection( &pool
->cs
);
2185 pool
->num_busy_workers
--;
2187 /* Simple callbacks are automatically shutdown after execution. */
2188 if (object
->type
== TP_OBJECT_TYPE_SIMPLE
)
2190 tp_object_prepare_shutdown( object
);
2191 object
->shutdown
= TRUE
;
2194 object
->num_running_callbacks
--;
2195 if (!object
->num_pending_callbacks
&& !object
->num_running_callbacks
)
2196 RtlWakeAllConditionVariable( &object
->group_finished_event
);
2198 if (instance
.associated
)
2200 object
->num_associated_callbacks
--;
2201 if (!object
->num_pending_callbacks
&& !object
->num_associated_callbacks
)
2202 RtlWakeAllConditionVariable( &object
->finished_event
);
2205 tp_object_release( object
);
2208 /* Shutdown worker thread if requested. */
2212 /* Wait for new tasks or until the timeout expires. A thread only terminates
2213 * when no new tasks are available, and the number of threads can be
2214 * decreased without violating the min_workers limit. An exception is when
2215 * min_workers == 0, then objcount is used to detect if the last thread
2216 * can be terminated. */
2217 timeout
.QuadPart
= (ULONGLONG
)THREADPOOL_WORKER_TIMEOUT
* -10000;
2218 if (RtlSleepConditionVariableCS( &pool
->update_event
, &pool
->cs
, &timeout
) == STATUS_TIMEOUT
&&
2219 !list_head( &pool
->pool
) && (pool
->num_workers
> max( pool
->min_workers
, 1 ) ||
2220 (!pool
->min_workers
&& !pool
->objcount
)))
2225 pool
->num_workers
--;
2226 RtlLeaveCriticalSection( &pool
->cs
);
2228 TRACE( "terminating worker thread for pool %p\n", pool
);
2229 tp_threadpool_release( pool
);
2230 RtlExitUserThread( 0 );
2233 /***********************************************************************
2234 * TpAllocCleanupGroup (NTDLL.@)
2236 NTSTATUS WINAPI
TpAllocCleanupGroup( TP_CLEANUP_GROUP
**out
)
2238 TRACE( "%p\n", out
);
2240 return tp_group_alloc( (struct threadpool_group
**)out
);
2243 /***********************************************************************
2244 * TpAllocPool (NTDLL.@)
2246 NTSTATUS WINAPI
TpAllocPool( TP_POOL
**out
, PVOID reserved
)
2248 TRACE( "%p %p\n", out
, reserved
);
2251 FIXME( "reserved argument is nonzero (%p)\n", reserved
);
2253 return tp_threadpool_alloc( (struct threadpool
**)out
);
2256 /***********************************************************************
2257 * TpAllocTimer (NTDLL.@)
2259 NTSTATUS WINAPI
TpAllocTimer( TP_TIMER
**out
, PTP_TIMER_CALLBACK callback
, PVOID userdata
,
2260 TP_CALLBACK_ENVIRON
*environment
)
2262 struct threadpool_object
*object
;
2263 struct threadpool
*pool
;
2266 TRACE( "%p %p %p %p\n", out
, callback
, userdata
, environment
);
2268 object
= RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*object
) );
2270 return STATUS_NO_MEMORY
;
2272 status
= tp_threadpool_lock( &pool
, environment
);
2275 RtlFreeHeap( GetProcessHeap(), 0, object
);
2279 object
->type
= TP_OBJECT_TYPE_TIMER
;
2280 object
->u
.timer
.callback
= callback
;
2282 status
= tp_timerqueue_lock( object
);
2285 tp_threadpool_unlock( pool
);
2286 RtlFreeHeap( GetProcessHeap(), 0, object
);
2290 tp_object_initialize( object
, pool
, userdata
, environment
);
2292 *out
= (TP_TIMER
*)object
;
2293 return STATUS_SUCCESS
;
2296 /***********************************************************************
2297 * TpAllocWait (NTDLL.@)
2299 NTSTATUS WINAPI
TpAllocWait( TP_WAIT
**out
, PTP_WAIT_CALLBACK callback
, PVOID userdata
,
2300 TP_CALLBACK_ENVIRON
*environment
)
2302 struct threadpool_object
*object
;
2303 struct threadpool
*pool
;
2306 TRACE( "%p %p %p %p\n", out
, callback
, userdata
, environment
);
2308 object
= RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*object
) );
2310 return STATUS_NO_MEMORY
;
2312 status
= tp_threadpool_lock( &pool
, environment
);
2315 RtlFreeHeap( GetProcessHeap(), 0, object
);
2319 object
->type
= TP_OBJECT_TYPE_WAIT
;
2320 object
->u
.wait
.callback
= callback
;
2322 status
= tp_waitqueue_lock( object
);
2325 tp_threadpool_unlock( pool
);
2326 RtlFreeHeap( GetProcessHeap(), 0, object
);
2330 tp_object_initialize( object
, pool
, userdata
, environment
);
2332 *out
= (TP_WAIT
*)object
;
2333 return STATUS_SUCCESS
;
2336 /***********************************************************************
2337 * TpAllocWork (NTDLL.@)
2339 NTSTATUS WINAPI
TpAllocWork( TP_WORK
**out
, PTP_WORK_CALLBACK callback
, PVOID userdata
,
2340 TP_CALLBACK_ENVIRON
*environment
)
2342 struct threadpool_object
*object
;
2343 struct threadpool
*pool
;
2346 TRACE( "%p %p %p %p\n", out
, callback
, userdata
, environment
);
2348 object
= RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*object
) );
2350 return STATUS_NO_MEMORY
;
2352 status
= tp_threadpool_lock( &pool
, environment
);
2355 RtlFreeHeap( GetProcessHeap(), 0, object
);
2359 object
->type
= TP_OBJECT_TYPE_WORK
;
2360 object
->u
.work
.callback
= callback
;
2361 tp_object_initialize( object
, pool
, userdata
, environment
);
2363 *out
= (TP_WORK
*)object
;
2364 return STATUS_SUCCESS
;
2367 /***********************************************************************
2368 * TpCallbackLeaveCriticalSectionOnCompletion (NTDLL.@)
2370 VOID WINAPI
TpCallbackLeaveCriticalSectionOnCompletion( TP_CALLBACK_INSTANCE
*instance
, CRITICAL_SECTION
*crit
)
2372 struct threadpool_instance
*this = impl_from_TP_CALLBACK_INSTANCE( instance
);
2374 TRACE( "%p %p\n", instance
, crit
);
2376 if (!this->cleanup
.critical_section
)
2377 this->cleanup
.critical_section
= crit
;
2380 /***********************************************************************
2381 * TpCallbackMayRunLong (NTDLL.@)
2383 NTSTATUS WINAPI
TpCallbackMayRunLong( TP_CALLBACK_INSTANCE
*instance
)
2385 struct threadpool_instance
*this = impl_from_TP_CALLBACK_INSTANCE( instance
);
2386 struct threadpool_object
*object
= this->object
;
2387 struct threadpool
*pool
;
2388 NTSTATUS status
= STATUS_SUCCESS
;
2390 TRACE( "%p\n", instance
);
2392 if (this->threadid
!= GetCurrentThreadId())
2394 ERR("called from wrong thread, ignoring\n");
2395 return STATUS_UNSUCCESSFUL
; /* FIXME */
2398 if (this->may_run_long
)
2399 return STATUS_SUCCESS
;
2401 pool
= object
->pool
;
2402 RtlEnterCriticalSection( &pool
->cs
);
2404 /* Start new worker threads if required. */
2405 if (pool
->num_busy_workers
>= pool
->num_workers
)
2407 if (pool
->num_workers
< pool
->max_workers
)
2409 status
= tp_new_worker_thread( pool
);
2413 status
= STATUS_TOO_MANY_THREADS
;
2417 RtlLeaveCriticalSection( &pool
->cs
);
2418 this->may_run_long
= TRUE
;
2422 /***********************************************************************
2423 * TpCallbackReleaseMutexOnCompletion (NTDLL.@)
2425 VOID WINAPI
TpCallbackReleaseMutexOnCompletion( TP_CALLBACK_INSTANCE
*instance
, HANDLE mutex
)
2427 struct threadpool_instance
*this = impl_from_TP_CALLBACK_INSTANCE( instance
);
2429 TRACE( "%p %p\n", instance
, mutex
);
2431 if (!this->cleanup
.mutex
)
2432 this->cleanup
.mutex
= mutex
;
2435 /***********************************************************************
2436 * TpCallbackReleaseSemaphoreOnCompletion (NTDLL.@)
2438 VOID WINAPI
TpCallbackReleaseSemaphoreOnCompletion( TP_CALLBACK_INSTANCE
*instance
, HANDLE semaphore
, DWORD count
)
2440 struct threadpool_instance
*this = impl_from_TP_CALLBACK_INSTANCE( instance
);
2442 TRACE( "%p %p %u\n", instance
, semaphore
, count
);
2444 if (!this->cleanup
.semaphore
)
2446 this->cleanup
.semaphore
= semaphore
;
2447 this->cleanup
.semaphore_count
= count
;
2451 /***********************************************************************
2452 * TpCallbackSetEventOnCompletion (NTDLL.@)
2454 VOID WINAPI
TpCallbackSetEventOnCompletion( TP_CALLBACK_INSTANCE
*instance
, HANDLE event
)
2456 struct threadpool_instance
*this = impl_from_TP_CALLBACK_INSTANCE( instance
);
2458 TRACE( "%p %p\n", instance
, event
);
2460 if (!this->cleanup
.event
)
2461 this->cleanup
.event
= event
;
2464 /***********************************************************************
2465 * TpCallbackUnloadDllOnCompletion (NTDLL.@)
2467 VOID WINAPI
TpCallbackUnloadDllOnCompletion( TP_CALLBACK_INSTANCE
*instance
, HMODULE module
)
2469 struct threadpool_instance
*this = impl_from_TP_CALLBACK_INSTANCE( instance
);
2471 TRACE( "%p %p\n", instance
, module
);
2473 if (!this->cleanup
.library
)
2474 this->cleanup
.library
= module
;
2477 /***********************************************************************
2478 * TpDisassociateCallback (NTDLL.@)
2480 VOID WINAPI
TpDisassociateCallback( TP_CALLBACK_INSTANCE
*instance
)
2482 struct threadpool_instance
*this = impl_from_TP_CALLBACK_INSTANCE( instance
);
2483 struct threadpool_object
*object
= this->object
;
2484 struct threadpool
*pool
;
2486 TRACE( "%p\n", instance
);
2488 if (this->threadid
!= GetCurrentThreadId())
2490 ERR("called from wrong thread, ignoring\n");
2494 if (!this->associated
)
2497 pool
= object
->pool
;
2498 RtlEnterCriticalSection( &pool
->cs
);
2500 object
->num_associated_callbacks
--;
2501 if (!object
->num_pending_callbacks
&& !object
->num_associated_callbacks
)
2502 RtlWakeAllConditionVariable( &object
->finished_event
);
2504 RtlLeaveCriticalSection( &pool
->cs
);
2505 this->associated
= FALSE
;
2508 /***********************************************************************
2509 * TpIsTimerSet (NTDLL.@)
2511 BOOL WINAPI
TpIsTimerSet( TP_TIMER
*timer
)
2513 struct threadpool_object
*this = impl_from_TP_TIMER( timer
);
2515 TRACE( "%p\n", timer
);
2517 return this->u
.timer
.timer_set
;
2520 /***********************************************************************
2521 * TpPostWork (NTDLL.@)
2523 VOID WINAPI
TpPostWork( TP_WORK
*work
)
2525 struct threadpool_object
*this = impl_from_TP_WORK( work
);
2527 TRACE( "%p\n", work
);
2529 tp_object_submit( this, FALSE
);
2532 /***********************************************************************
2533 * TpReleaseCleanupGroup (NTDLL.@)
2535 VOID WINAPI
TpReleaseCleanupGroup( TP_CLEANUP_GROUP
*group
)
2537 struct threadpool_group
*this = impl_from_TP_CLEANUP_GROUP( group
);
2539 TRACE( "%p\n", group
);
2541 tp_group_shutdown( this );
2542 tp_group_release( this );
2545 /***********************************************************************
2546 * TpReleaseCleanupGroupMembers (NTDLL.@)
2548 VOID WINAPI
TpReleaseCleanupGroupMembers( TP_CLEANUP_GROUP
*group
, BOOL cancel_pending
, PVOID userdata
)
2550 struct threadpool_group
*this = impl_from_TP_CLEANUP_GROUP( group
);
2551 struct threadpool_object
*object
, *next
;
2552 struct list members
;
2554 TRACE( "%p %u %p\n", group
, cancel_pending
, userdata
);
2556 RtlEnterCriticalSection( &this->cs
);
2558 /* Unset group, increase references, and mark objects for shutdown */
2559 LIST_FOR_EACH_ENTRY_SAFE( object
, next
, &this->members
, struct threadpool_object
, group_entry
)
2561 assert( object
->group
== this );
2562 assert( object
->is_group_member
);
2564 if (interlocked_inc( &object
->refcount
) == 1)
2566 /* Object is basically already destroyed, but group reference
2567 * was not deleted yet. We can safely ignore this object. */
2568 interlocked_dec( &object
->refcount
);
2569 list_remove( &object
->group_entry
);
2570 object
->is_group_member
= FALSE
;
2574 object
->is_group_member
= FALSE
;
2575 tp_object_prepare_shutdown( object
);
2578 /* Move members to a new temporary list */
2579 list_init( &members
);
2580 list_move_tail( &members
, &this->members
);
2582 RtlLeaveCriticalSection( &this->cs
);
2584 /* Cancel pending callbacks if requested */
2587 LIST_FOR_EACH_ENTRY( object
, &members
, struct threadpool_object
, group_entry
)
2589 tp_object_cancel( object
);
2593 /* Wait for remaining callbacks to finish */
2594 LIST_FOR_EACH_ENTRY_SAFE( object
, next
, &members
, struct threadpool_object
, group_entry
)
2596 tp_object_wait( object
, TRUE
);
2598 if (!object
->shutdown
)
2600 /* Execute group cancellation callback if defined, and if this was actually a group cancel. */
2601 if (cancel_pending
&& object
->group_cancel_callback
)
2603 TRACE( "executing group cancel callback %p(%p, %p)\n",
2604 object
->group_cancel_callback
, object
->userdata
, userdata
);
2605 object
->group_cancel_callback( object
->userdata
, userdata
);
2606 TRACE( "callback %p returned\n", object
->group_cancel_callback
);
2609 if (object
->type
!= TP_OBJECT_TYPE_SIMPLE
)
2610 tp_object_release( object
);
2613 object
->shutdown
= TRUE
;
2614 tp_object_release( object
);
2618 /***********************************************************************
2619 * TpReleasePool (NTDLL.@)
2621 VOID WINAPI
TpReleasePool( TP_POOL
*pool
)
2623 struct threadpool
*this = impl_from_TP_POOL( pool
);
2625 TRACE( "%p\n", pool
);
2627 tp_threadpool_shutdown( this );
2628 tp_threadpool_release( this );
2631 /***********************************************************************
2632 * TpReleaseTimer (NTDLL.@)
2634 VOID WINAPI
TpReleaseTimer( TP_TIMER
*timer
)
2636 struct threadpool_object
*this = impl_from_TP_TIMER( timer
);
2638 TRACE( "%p\n", timer
);
2640 tp_object_prepare_shutdown( this );
2641 this->shutdown
= TRUE
;
2642 tp_object_release( this );
2645 /***********************************************************************
2646 * TpReleaseWait (NTDLL.@)
2648 VOID WINAPI
TpReleaseWait( TP_WAIT
*wait
)
2650 struct threadpool_object
*this = impl_from_TP_WAIT( wait
);
2652 TRACE( "%p\n", wait
);
2654 tp_object_prepare_shutdown( this );
2655 this->shutdown
= TRUE
;
2656 tp_object_release( this );
2659 /***********************************************************************
2660 * TpReleaseWork (NTDLL.@)
2662 VOID WINAPI
TpReleaseWork( TP_WORK
*work
)
2664 struct threadpool_object
*this = impl_from_TP_WORK( work
);
2666 TRACE( "%p\n", work
);
2668 tp_object_prepare_shutdown( this );
2669 this->shutdown
= TRUE
;
2670 tp_object_release( this );
2673 /***********************************************************************
2674 * TpSetPoolMaxThreads (NTDLL.@)
2676 VOID WINAPI
TpSetPoolMaxThreads( TP_POOL
*pool
, DWORD maximum
)
2678 struct threadpool
*this = impl_from_TP_POOL( pool
);
2680 TRACE( "%p %u\n", pool
, maximum
);
2682 RtlEnterCriticalSection( &this->cs
);
2683 this->max_workers
= max( maximum
, 1 );
2684 this->min_workers
= min( this->min_workers
, this->max_workers
);
2685 RtlLeaveCriticalSection( &this->cs
);
2688 /***********************************************************************
2689 * TpSetPoolMinThreads (NTDLL.@)
2691 BOOL WINAPI
TpSetPoolMinThreads( TP_POOL
*pool
, DWORD minimum
)
2693 struct threadpool
*this = impl_from_TP_POOL( pool
);
2694 NTSTATUS status
= STATUS_SUCCESS
;
2696 TRACE( "%p %u\n", pool
, minimum
);
2698 RtlEnterCriticalSection( &this->cs
);
2700 while (this->num_workers
< minimum
)
2702 status
= tp_new_worker_thread( this );
2703 if (status
!= STATUS_SUCCESS
)
2707 if (status
== STATUS_SUCCESS
)
2709 this->min_workers
= minimum
;
2710 this->max_workers
= max( this->min_workers
, this->max_workers
);
2713 RtlLeaveCriticalSection( &this->cs
);
2717 /***********************************************************************
2718 * TpSetTimer (NTDLL.@)
2720 VOID WINAPI
TpSetTimer( TP_TIMER
*timer
, LARGE_INTEGER
*timeout
, LONG period
, LONG window_length
)
2722 struct threadpool_object
*this = impl_from_TP_TIMER( timer
);
2723 struct threadpool_object
*other_timer
;
2724 BOOL submit_timer
= FALSE
;
2725 ULONGLONG timestamp
;
2727 TRACE( "%p %p %u %u\n", timer
, timeout
, period
, window_length
);
2729 RtlEnterCriticalSection( &timerqueue
.cs
);
2731 assert( this->u
.timer
.timer_initialized
);
2732 this->u
.timer
.timer_set
= timeout
!= NULL
;
2734 /* Convert relative timeout to absolute timestamp and handle a timeout
2735 * of zero, which means that the timer is submitted immediately. */
2738 timestamp
= timeout
->QuadPart
;
2739 if ((LONGLONG
)timestamp
< 0)
2742 NtQuerySystemTime( &now
);
2743 timestamp
= now
.QuadPart
- timestamp
;
2745 else if (!timestamp
)
2752 NtQuerySystemTime( &now
);
2753 timestamp
= now
.QuadPart
+ (ULONGLONG
)period
* 10000;
2755 submit_timer
= TRUE
;
2759 /* First remove existing timeout. */
2760 if (this->u
.timer
.timer_pending
)
2762 list_remove( &this->u
.timer
.timer_entry
);
2763 this->u
.timer
.timer_pending
= FALSE
;
2766 /* If the timer was enabled, then add it back to the queue. */
2769 this->u
.timer
.timeout
= timestamp
;
2770 this->u
.timer
.period
= period
;
2771 this->u
.timer
.window_length
= window_length
;
2773 LIST_FOR_EACH_ENTRY( other_timer
, &timerqueue
.pending_timers
,
2774 struct threadpool_object
, u
.timer
.timer_entry
)
2776 assert( other_timer
->type
== TP_OBJECT_TYPE_TIMER
);
2777 if (this->u
.timer
.timeout
< other_timer
->u
.timer
.timeout
)
2780 list_add_before( &other_timer
->u
.timer
.timer_entry
, &this->u
.timer
.timer_entry
);
2782 /* Wake up the timer thread when the timeout has to be updated. */
2783 if (list_head( &timerqueue
.pending_timers
) == &this->u
.timer
.timer_entry
)
2784 RtlWakeAllConditionVariable( &timerqueue
.update_event
);
2786 this->u
.timer
.timer_pending
= TRUE
;
2789 RtlLeaveCriticalSection( &timerqueue
.cs
);
2792 tp_object_submit( this, FALSE
);
2795 /***********************************************************************
2796 * TpSetWait (NTDLL.@)
2798 VOID WINAPI
TpSetWait( TP_WAIT
*wait
, HANDLE handle
, LARGE_INTEGER
*timeout
)
2800 struct threadpool_object
*this = impl_from_TP_WAIT( wait
);
2801 ULONGLONG timestamp
= TIMEOUT_INFINITE
;
2802 BOOL submit_wait
= FALSE
;
2804 TRACE( "%p %p %p\n", wait
, handle
, timeout
);
2806 RtlEnterCriticalSection( &waitqueue
.cs
);
2808 assert( this->u
.wait
.bucket
);
2809 this->u
.wait
.handle
= handle
;
2811 if (handle
|| this->u
.wait
.wait_pending
)
2813 struct waitqueue_bucket
*bucket
= this->u
.wait
.bucket
;
2814 list_remove( &this->u
.wait
.wait_entry
);
2816 /* Convert relative timeout to absolute timestamp. */
2817 if (handle
&& timeout
)
2819 timestamp
= timeout
->QuadPart
;
2820 if ((LONGLONG
)timestamp
< 0)
2823 NtQuerySystemTime( &now
);
2824 timestamp
= now
.QuadPart
- timestamp
;
2826 else if (!timestamp
)
2833 /* Add wait object back into one of the queues. */
2836 list_add_tail( &bucket
->waiting
, &this->u
.wait
.wait_entry
);
2837 this->u
.wait
.wait_pending
= TRUE
;
2838 this->u
.wait
.timeout
= timestamp
;
2842 list_add_tail( &bucket
->reserved
, &this->u
.wait
.wait_entry
);
2843 this->u
.wait
.wait_pending
= FALSE
;
2846 /* Wake up the wait queue thread. */
2847 NtSetEvent( bucket
->update_event
, NULL
);
2850 RtlLeaveCriticalSection( &waitqueue
.cs
);
2853 tp_object_submit( this, FALSE
);
2856 /***********************************************************************
2857 * TpSimpleTryPost (NTDLL.@)
2859 NTSTATUS WINAPI
TpSimpleTryPost( PTP_SIMPLE_CALLBACK callback
, PVOID userdata
,
2860 TP_CALLBACK_ENVIRON
*environment
)
2862 struct threadpool_object
*object
;
2863 struct threadpool
*pool
;
2866 TRACE( "%p %p %p\n", callback
, userdata
, environment
);
2868 object
= RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*object
) );
2870 return STATUS_NO_MEMORY
;
2872 status
= tp_threadpool_lock( &pool
, environment
);
2875 RtlFreeHeap( GetProcessHeap(), 0, object
);
2879 object
->type
= TP_OBJECT_TYPE_SIMPLE
;
2880 object
->u
.simple
.callback
= callback
;
2881 tp_object_initialize( object
, pool
, userdata
, environment
);
2883 return STATUS_SUCCESS
;
2886 /***********************************************************************
2887 * TpWaitForTimer (NTDLL.@)
2889 VOID WINAPI
TpWaitForTimer( TP_TIMER
*timer
, BOOL cancel_pending
)
2891 struct threadpool_object
*this = impl_from_TP_TIMER( timer
);
2893 TRACE( "%p %d\n", timer
, cancel_pending
);
2896 tp_object_cancel( this );
2897 tp_object_wait( this, FALSE
);
2900 /***********************************************************************
2901 * TpWaitForWait (NTDLL.@)
2903 VOID WINAPI
TpWaitForWait( TP_WAIT
*wait
, BOOL cancel_pending
)
2905 struct threadpool_object
*this = impl_from_TP_WAIT( wait
);
2907 TRACE( "%p %d\n", wait
, cancel_pending
);
2910 tp_object_cancel( this );
2911 tp_object_wait( this, FALSE
);
2914 /***********************************************************************
2915 * TpWaitForWork (NTDLL.@)
2917 VOID WINAPI
TpWaitForWork( TP_WORK
*work
, BOOL cancel_pending
)
2919 struct threadpool_object
*this = impl_from_TP_WORK( work
);
2921 TRACE( "%p %u\n", work
, cancel_pending
);
2924 tp_object_cancel( this );
2925 tp_object_wait( this, FALSE
);