4 * Copyright (c) 2006 Robert Shearman
5 * Copyright (c) 2014-2016 Sebastian Lackner
7 * This library is free software; you can redistribute it and/or
8 * modify it under the terms of the GNU Lesser General Public
9 * License as published by the Free Software Foundation; either
10 * version 2.1 of the License, or (at your option) any later version.
12 * This library is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Lesser General Public License for more details.
17 * You should have received a copy of the GNU Lesser General Public
18 * License along with this library; if not, write to the Free Software
19 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
23 #include "wine/port.h"
29 #define NONAMELESSUNION
31 #define WIN32_NO_STATUS
34 #include "wine/debug.h"
35 #include "wine/list.h"
37 #include "ntdll_misc.h"
39 WINE_DEFAULT_DEBUG_CHANNEL(threadpool
);
42 * Old thread pooling API
47 PRTL_WORK_ITEM_ROUTINE function
;
51 #define EXPIRE_NEVER (~(ULONGLONG)0)
52 #define TIMER_QUEUE_MAGIC 0x516d6954 /* TimQ */
54 static RTL_CRITICAL_SECTION_DEBUG critsect_compl_debug
;
59 RTL_CRITICAL_SECTION threadpool_compl_cs
;
63 NULL
, /* compl_port */
64 { &critsect_compl_debug
, -1, 0, 0, 0, 0 }, /* threadpool_compl_cs */
67 static RTL_CRITICAL_SECTION_DEBUG critsect_compl_debug
=
69 0, 0, &old_threadpool
.threadpool_compl_cs
,
70 { &critsect_compl_debug
.ProcessLocksList
, &critsect_compl_debug
.ProcessLocksList
},
71 0, 0, { (DWORD_PTR
)(__FILE__
": threadpool_compl_cs") }
78 WAITORTIMERCALLBACK Callback
;
82 HANDLE CompletionEvent
;
84 BOOLEAN CallbackInProgress
;
90 struct timer_queue
*q
;
92 ULONG runcount
; /* number of callbacks pending execution */
93 RTL_WAITORTIMERCALLBACKFUNC callback
;
98 BOOL destroy
; /* timer should be deleted; once set, never unset */
99 HANDLE event
; /* removal event */
105 RTL_CRITICAL_SECTION cs
;
106 struct list timers
; /* sorted by expiration time */
107 BOOL quit
; /* queue should be deleted; once set, never unset */
113 * Object-oriented thread pooling API
116 #define THREADPOOL_WORKER_TIMEOUT 5000
117 #define MAXIMUM_WAITQUEUE_OBJECTS (MAXIMUM_WAIT_OBJECTS - 1)
119 /* internal threadpool representation */
126 /* pool of work items, locked via .cs */
128 RTL_CONDITION_VARIABLE update_event
;
129 /* information about worker threads, locked via .cs */
133 int num_busy_workers
;
136 enum threadpool_objtype
138 TP_OBJECT_TYPE_SIMPLE
,
140 TP_OBJECT_TYPE_TIMER
,
144 /* internal threadpool object representation */
145 struct threadpool_object
149 /* read-only information */
150 enum threadpool_objtype type
;
151 struct threadpool
*pool
;
152 struct threadpool_group
*group
;
154 PTP_CLEANUP_GROUP_CANCEL_CALLBACK group_cancel_callback
;
155 PTP_SIMPLE_CALLBACK finalization_callback
;
158 /* information about the group, locked via .group->cs */
159 struct list group_entry
;
160 BOOL is_group_member
;
161 /* information about the pool, locked via .pool->cs */
162 struct list pool_entry
;
163 RTL_CONDITION_VARIABLE finished_event
;
164 RTL_CONDITION_VARIABLE group_finished_event
;
165 LONG num_pending_callbacks
;
166 LONG num_running_callbacks
;
167 LONG num_associated_callbacks
;
168 /* arguments for callback */
173 PTP_SIMPLE_CALLBACK callback
;
177 PTP_WORK_CALLBACK callback
;
181 PTP_TIMER_CALLBACK callback
;
182 /* information about the timer, locked via timerqueue.cs */
183 BOOL timer_initialized
;
185 struct list timer_entry
;
193 PTP_WAIT_CALLBACK callback
;
195 /* information about the wait object, locked via waitqueue.cs */
196 struct waitqueue_bucket
*bucket
;
198 struct list wait_entry
;
205 /* internal threadpool instance representation */
206 struct threadpool_instance
208 struct threadpool_object
*object
;
214 CRITICAL_SECTION
*critical_section
;
217 LONG semaphore_count
;
223 /* internal threadpool group representation */
224 struct threadpool_group
229 /* list of group members, locked via .cs */
233 /* global timerqueue object */
234 static RTL_CRITICAL_SECTION_DEBUG timerqueue_debug
;
241 struct list pending_timers
;
242 RTL_CONDITION_VARIABLE update_event
;
246 { &timerqueue_debug
, -1, 0, 0, 0, 0 }, /* cs */
248 FALSE
, /* thread_running */
249 LIST_INIT( timerqueue
.pending_timers
), /* pending_timers */
250 RTL_CONDITION_VARIABLE_INIT
/* update_event */
253 static RTL_CRITICAL_SECTION_DEBUG timerqueue_debug
=
255 0, 0, &timerqueue
.cs
,
256 { &timerqueue_debug
.ProcessLocksList
, &timerqueue_debug
.ProcessLocksList
},
257 0, 0, { (DWORD_PTR
)(__FILE__
": timerqueue.cs") }
260 /* global waitqueue object */
261 static RTL_CRITICAL_SECTION_DEBUG waitqueue_debug
;
271 { &waitqueue_debug
, -1, 0, 0, 0, 0 }, /* cs */
273 LIST_INIT( waitqueue
.buckets
) /* buckets */
276 static RTL_CRITICAL_SECTION_DEBUG waitqueue_debug
=
279 { &waitqueue_debug
.ProcessLocksList
, &waitqueue_debug
.ProcessLocksList
},
280 0, 0, { (DWORD_PTR
)(__FILE__
": waitqueue.cs") }
283 struct waitqueue_bucket
285 struct list bucket_entry
;
287 struct list reserved
;
292 static inline struct threadpool
*impl_from_TP_POOL( TP_POOL
*pool
)
294 return (struct threadpool
*)pool
;
297 static inline struct threadpool_object
*impl_from_TP_WORK( TP_WORK
*work
)
299 struct threadpool_object
*object
= (struct threadpool_object
*)work
;
300 assert( object
->type
== TP_OBJECT_TYPE_WORK
);
304 static inline struct threadpool_object
*impl_from_TP_TIMER( TP_TIMER
*timer
)
306 struct threadpool_object
*object
= (struct threadpool_object
*)timer
;
307 assert( object
->type
== TP_OBJECT_TYPE_TIMER
);
311 static inline struct threadpool_object
*impl_from_TP_WAIT( TP_WAIT
*wait
)
313 struct threadpool_object
*object
= (struct threadpool_object
*)wait
;
314 assert( object
->type
== TP_OBJECT_TYPE_WAIT
);
318 static inline struct threadpool_group
*impl_from_TP_CLEANUP_GROUP( TP_CLEANUP_GROUP
*group
)
320 return (struct threadpool_group
*)group
;
323 static inline struct threadpool_instance
*impl_from_TP_CALLBACK_INSTANCE( TP_CALLBACK_INSTANCE
*instance
)
325 return (struct threadpool_instance
*)instance
;
328 static void CALLBACK
threadpool_worker_proc( void *param
);
329 static void tp_object_submit( struct threadpool_object
*object
, BOOL signaled
);
330 static void tp_object_prepare_shutdown( struct threadpool_object
*object
);
331 static BOOL
tp_object_release( struct threadpool_object
*object
);
332 static struct threadpool
*default_threadpool
= NULL
;
334 static inline LONG
interlocked_inc( PLONG dest
)
336 return interlocked_xchg_add( dest
, 1 ) + 1;
339 static inline LONG
interlocked_dec( PLONG dest
)
341 return interlocked_xchg_add( dest
, -1 ) - 1;
344 static void CALLBACK
process_rtl_work_item( TP_CALLBACK_INSTANCE
*instance
, void *userdata
)
346 struct rtl_work_item
*item
= userdata
;
348 TRACE("executing %p(%p)\n", item
->function
, item
->context
);
349 item
->function( item
->context
);
351 RtlFreeHeap( GetProcessHeap(), 0, item
);
354 /***********************************************************************
355 * RtlQueueWorkItem (NTDLL.@)
357 * Queues a work item into a thread in the thread pool.
360 * function [I] Work function to execute.
361 * context [I] Context to pass to the work function when it is executed.
362 * flags [I] Flags. See notes.
365 * Success: STATUS_SUCCESS.
366 * Failure: Any NTSTATUS code.
369 * Flags can be one or more of the following:
370 *|WT_EXECUTEDEFAULT - Executes the work item in a non-I/O worker thread.
371 *|WT_EXECUTEINIOTHREAD - Executes the work item in an I/O worker thread.
372 *|WT_EXECUTEINPERSISTENTTHREAD - Executes the work item in a thread that is persistent.
373 *|WT_EXECUTELONGFUNCTION - Hints that the execution can take a long time.
374 *|WT_TRANSFER_IMPERSONATION - Executes the function with the current access token.
376 NTSTATUS WINAPI
RtlQueueWorkItem( PRTL_WORK_ITEM_ROUTINE function
, PVOID context
, ULONG flags
)
378 TP_CALLBACK_ENVIRON environment
;
379 struct rtl_work_item
*item
;
382 TRACE( "%p %p %u\n", function
, context
, flags
);
384 item
= RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*item
) );
386 return STATUS_NO_MEMORY
;
388 memset( &environment
, 0, sizeof(environment
) );
389 environment
.Version
= 1;
390 environment
.u
.s
.LongFunction
= (flags
& WT_EXECUTELONGFUNCTION
) != 0;
391 environment
.u
.s
.Persistent
= (flags
& WT_EXECUTEINPERSISTENTTHREAD
) != 0;
393 item
->function
= function
;
394 item
->context
= context
;
396 status
= TpSimpleTryPost( process_rtl_work_item
, item
, &environment
);
397 if (status
) RtlFreeHeap( GetProcessHeap(), 0, item
);
401 /***********************************************************************
402 * iocp_poller - get completion events and run callbacks
404 static DWORD CALLBACK
iocp_poller(LPVOID Arg
)
410 PRTL_OVERLAPPED_COMPLETION_ROUTINE callback
;
412 IO_STATUS_BLOCK iosb
;
413 NTSTATUS res
= NtRemoveIoCompletion( cport
, (PULONG_PTR
)&callback
, (PULONG_PTR
)&overlapped
, &iosb
, NULL
);
416 ERR("NtRemoveIoCompletion failed: 0x%x\n", res
);
420 DWORD transferred
= 0;
423 if (iosb
.u
.Status
== STATUS_SUCCESS
)
424 transferred
= iosb
.Information
;
426 err
= RtlNtStatusToDosError(iosb
.u
.Status
);
428 callback( err
, transferred
, overlapped
);
434 /***********************************************************************
435 * RtlSetIoCompletionCallback (NTDLL.@)
437 * Binds a handle to a thread pool's completion port, and possibly
438 * starts a non-I/O thread to monitor this port and call functions back.
441 * FileHandle [I] Handle to bind to a completion port.
442 * Function [I] Callback function to call on I/O completions.
443 * Flags [I] Not used.
446 * Success: STATUS_SUCCESS.
447 * Failure: Any NTSTATUS code.
450 NTSTATUS WINAPI
RtlSetIoCompletionCallback(HANDLE FileHandle
, PRTL_OVERLAPPED_COMPLETION_ROUTINE Function
, ULONG Flags
)
452 IO_STATUS_BLOCK iosb
;
453 FILE_COMPLETION_INFORMATION info
;
455 if (Flags
) FIXME("Unknown value Flags=0x%x\n", Flags
);
457 if (!old_threadpool
.compl_port
)
459 NTSTATUS res
= STATUS_SUCCESS
;
461 RtlEnterCriticalSection(&old_threadpool
.threadpool_compl_cs
);
462 if (!old_threadpool
.compl_port
)
466 res
= NtCreateIoCompletion( &cport
, IO_COMPLETION_ALL_ACCESS
, NULL
, 0 );
469 /* FIXME native can start additional threads in case of e.g. hung callback function. */
470 res
= RtlQueueWorkItem( iocp_poller
, cport
, WT_EXECUTEDEFAULT
);
472 old_threadpool
.compl_port
= cport
;
477 RtlLeaveCriticalSection(&old_threadpool
.threadpool_compl_cs
);
481 info
.CompletionPort
= old_threadpool
.compl_port
;
482 info
.CompletionKey
= (ULONG_PTR
)Function
;
484 return NtSetInformationFile( FileHandle
, &iosb
, &info
, sizeof(info
), FileCompletionInformation
);
487 static inline PLARGE_INTEGER
get_nt_timeout( PLARGE_INTEGER pTime
, ULONG timeout
)
489 if (timeout
== INFINITE
) return NULL
;
490 pTime
->QuadPart
= (ULONGLONG
)timeout
* -10000;
494 static void delete_wait_work_item(struct wait_work_item
*wait_work_item
)
496 NtClose( wait_work_item
->CancelEvent
);
497 RtlFreeHeap( GetProcessHeap(), 0, wait_work_item
);
500 static DWORD CALLBACK
wait_thread_proc(LPVOID Arg
)
502 struct wait_work_item
*wait_work_item
= Arg
;
504 BOOLEAN alertable
= (wait_work_item
->Flags
& WT_EXECUTEINIOTHREAD
) != 0;
505 HANDLE handles
[2] = { wait_work_item
->Object
, wait_work_item
->CancelEvent
};
506 LARGE_INTEGER timeout
;
507 HANDLE completion_event
;
513 status
= NtWaitForMultipleObjects( 2, handles
, TRUE
, alertable
,
514 get_nt_timeout( &timeout
, wait_work_item
->Milliseconds
) );
515 if (status
== STATUS_WAIT_0
|| status
== STATUS_TIMEOUT
)
517 BOOLEAN TimerOrWaitFired
;
519 if (status
== STATUS_WAIT_0
)
521 TRACE( "object %p signaled, calling callback %p with context %p\n",
522 wait_work_item
->Object
, wait_work_item
->Callback
,
523 wait_work_item
->Context
);
524 TimerOrWaitFired
= FALSE
;
528 TRACE( "wait for object %p timed out, calling callback %p with context %p\n",
529 wait_work_item
->Object
, wait_work_item
->Callback
,
530 wait_work_item
->Context
);
531 TimerOrWaitFired
= TRUE
;
533 wait_work_item
->CallbackInProgress
= TRUE
;
534 wait_work_item
->Callback( wait_work_item
->Context
, TimerOrWaitFired
);
535 wait_work_item
->CallbackInProgress
= FALSE
;
537 if (wait_work_item
->Flags
& WT_EXECUTEONLYONCE
)
540 else if (status
!= STATUS_USER_APC
)
545 if (interlocked_inc( &wait_work_item
->DeleteCount
) == 2 )
547 completion_event
= wait_work_item
->CompletionEvent
;
548 delete_wait_work_item( wait_work_item
);
549 if (completion_event
) NtSetEvent( completion_event
, NULL
);
555 /***********************************************************************
556 * RtlRegisterWait (NTDLL.@)
558 * Registers a wait for a handle to become signaled.
561 * NewWaitObject [I] Handle to the new wait object. Use RtlDeregisterWait() to free it.
562 * Object [I] Object to wait to become signaled.
563 * Callback [I] Callback function to execute when the wait times out or the handle is signaled.
564 * Context [I] Context to pass to the callback function when it is executed.
565 * Milliseconds [I] Number of milliseconds to wait before timing out.
566 * Flags [I] Flags. See notes.
569 * Success: STATUS_SUCCESS.
570 * Failure: Any NTSTATUS code.
573 * Flags can be one or more of the following:
574 *|WT_EXECUTEDEFAULT - Executes the work item in a non-I/O worker thread.
575 *|WT_EXECUTEINIOTHREAD - Executes the work item in an I/O worker thread.
576 *|WT_EXECUTEINPERSISTENTTHREAD - Executes the work item in a thread that is persistent.
577 *|WT_EXECUTELONGFUNCTION - Hints that the execution can take a long time.
578 *|WT_TRANSFER_IMPERSONATION - Executes the function with the current access token.
580 NTSTATUS WINAPI
RtlRegisterWait(PHANDLE NewWaitObject
, HANDLE Object
,
581 RTL_WAITORTIMERCALLBACKFUNC Callback
,
582 PVOID Context
, ULONG Milliseconds
, ULONG Flags
)
584 struct wait_work_item
*wait_work_item
;
587 TRACE( "(%p, %p, %p, %p, %d, 0x%x)\n", NewWaitObject
, Object
, Callback
, Context
, Milliseconds
, Flags
);
589 wait_work_item
= RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*wait_work_item
) );
591 return STATUS_NO_MEMORY
;
593 wait_work_item
->Object
= Object
;
594 wait_work_item
->Callback
= Callback
;
595 wait_work_item
->Context
= Context
;
596 wait_work_item
->Milliseconds
= Milliseconds
;
597 wait_work_item
->Flags
= Flags
;
598 wait_work_item
->CallbackInProgress
= FALSE
;
599 wait_work_item
->DeleteCount
= 0;
600 wait_work_item
->CompletionEvent
= NULL
;
602 status
= NtCreateEvent( &wait_work_item
->CancelEvent
, EVENT_ALL_ACCESS
, NULL
, NotificationEvent
, FALSE
);
603 if (status
!= STATUS_SUCCESS
)
605 RtlFreeHeap( GetProcessHeap(), 0, wait_work_item
);
609 Flags
= Flags
& (WT_EXECUTEINIOTHREAD
| WT_EXECUTEINPERSISTENTTHREAD
|
610 WT_EXECUTELONGFUNCTION
| WT_TRANSFER_IMPERSONATION
);
611 status
= RtlQueueWorkItem( wait_thread_proc
, wait_work_item
, Flags
);
612 if (status
!= STATUS_SUCCESS
)
614 delete_wait_work_item( wait_work_item
);
618 *NewWaitObject
= wait_work_item
;
622 /***********************************************************************
623 * RtlDeregisterWaitEx (NTDLL.@)
625 * Cancels a wait operation and frees the resources associated with calling
629 * WaitObject [I] Handle to the wait object to free.
632 * Success: STATUS_SUCCESS.
633 * Failure: Any NTSTATUS code.
635 NTSTATUS WINAPI
RtlDeregisterWaitEx(HANDLE WaitHandle
, HANDLE CompletionEvent
)
637 struct wait_work_item
*wait_work_item
= WaitHandle
;
639 HANDLE LocalEvent
= NULL
;
640 BOOLEAN CallbackInProgress
;
642 TRACE( "(%p %p)\n", WaitHandle
, CompletionEvent
);
644 if (WaitHandle
== NULL
)
645 return STATUS_INVALID_HANDLE
;
647 CallbackInProgress
= wait_work_item
->CallbackInProgress
;
648 if (CompletionEvent
== INVALID_HANDLE_VALUE
|| !CallbackInProgress
)
650 status
= NtCreateEvent( &LocalEvent
, EVENT_ALL_ACCESS
, NULL
, NotificationEvent
, FALSE
);
651 if (status
!= STATUS_SUCCESS
)
653 interlocked_xchg_ptr( &wait_work_item
->CompletionEvent
, LocalEvent
);
655 else if (CompletionEvent
!= NULL
)
657 interlocked_xchg_ptr( &wait_work_item
->CompletionEvent
, CompletionEvent
);
660 NtSetEvent( wait_work_item
->CancelEvent
, NULL
);
662 if (interlocked_inc( &wait_work_item
->DeleteCount
) == 2 )
664 status
= STATUS_SUCCESS
;
665 delete_wait_work_item( wait_work_item
);
669 NtWaitForSingleObject( LocalEvent
, FALSE
, NULL
);
670 status
= STATUS_SUCCESS
;
674 status
= STATUS_PENDING
;
678 NtClose( LocalEvent
);
683 /***********************************************************************
684 * RtlDeregisterWait (NTDLL.@)
686 * Cancels a wait operation and frees the resources associated with calling
690 * WaitObject [I] Handle to the wait object to free.
693 * Success: STATUS_SUCCESS.
694 * Failure: Any NTSTATUS code.
696 NTSTATUS WINAPI
RtlDeregisterWait(HANDLE WaitHandle
)
698 return RtlDeregisterWaitEx(WaitHandle
, NULL
);
702 /************************** Timer Queue Impl **************************/
704 static void queue_remove_timer(struct queue_timer
*t
)
706 /* We MUST hold the queue cs while calling this function. This ensures
707 that we cannot queue another callback for this timer. The runcount
708 being zero makes sure we don't have any already queued. */
709 struct timer_queue
*q
= t
->q
;
711 assert(t
->runcount
== 0);
714 list_remove(&t
->entry
);
716 NtSetEvent(t
->event
, NULL
);
717 RtlFreeHeap(GetProcessHeap(), 0, t
);
719 if (q
->quit
&& list_empty(&q
->timers
))
720 NtSetEvent(q
->event
, NULL
);
723 static void timer_cleanup_callback(struct queue_timer
*t
)
725 struct timer_queue
*q
= t
->q
;
726 RtlEnterCriticalSection(&q
->cs
);
728 assert(0 < t
->runcount
);
731 if (t
->destroy
&& t
->runcount
== 0)
732 queue_remove_timer(t
);
734 RtlLeaveCriticalSection(&q
->cs
);
737 static DWORD WINAPI
timer_callback_wrapper(LPVOID p
)
739 struct queue_timer
*t
= p
;
740 t
->callback(t
->param
, TRUE
);
741 timer_cleanup_callback(t
);
745 static inline ULONGLONG
queue_current_time(void)
747 LARGE_INTEGER now
, freq
;
748 NtQueryPerformanceCounter(&now
, &freq
);
749 return now
.QuadPart
* 1000 / freq
.QuadPart
;
752 static void queue_add_timer(struct queue_timer
*t
, ULONGLONG time
,
755 /* We MUST hold the queue cs while calling this function. */
756 struct timer_queue
*q
= t
->q
;
757 struct list
*ptr
= &q
->timers
;
759 assert(!q
->quit
|| (t
->destroy
&& time
== EXPIRE_NEVER
));
761 if (time
!= EXPIRE_NEVER
)
762 LIST_FOR_EACH(ptr
, &q
->timers
)
764 struct queue_timer
*cur
= LIST_ENTRY(ptr
, struct queue_timer
, entry
);
765 if (time
< cur
->expire
)
768 list_add_before(ptr
, &t
->entry
);
772 /* If we insert at the head of the list, we need to expire sooner
774 if (set_event
&& &t
->entry
== list_head(&q
->timers
))
775 NtSetEvent(q
->event
, NULL
);
778 static inline void queue_move_timer(struct queue_timer
*t
, ULONGLONG time
,
781 /* We MUST hold the queue cs while calling this function. */
782 list_remove(&t
->entry
);
783 queue_add_timer(t
, time
, set_event
);
786 static void queue_timer_expire(struct timer_queue
*q
)
788 struct queue_timer
*t
= NULL
;
790 RtlEnterCriticalSection(&q
->cs
);
791 if (list_head(&q
->timers
))
794 t
= LIST_ENTRY(list_head(&q
->timers
), struct queue_timer
, entry
);
795 if (!t
->destroy
&& t
->expire
<= ((now
= queue_current_time())))
800 next
= t
->expire
+ t
->period
;
801 /* avoid trigger cascade if overloaded / hibernated */
803 next
= now
+ t
->period
;
807 queue_move_timer(t
, next
, FALSE
);
812 RtlLeaveCriticalSection(&q
->cs
);
816 if (t
->flags
& WT_EXECUTEINTIMERTHREAD
)
817 timer_callback_wrapper(t
);
822 & (WT_EXECUTEINIOTHREAD
| WT_EXECUTEINPERSISTENTTHREAD
823 | WT_EXECUTELONGFUNCTION
| WT_TRANSFER_IMPERSONATION
));
824 NTSTATUS status
= RtlQueueWorkItem(timer_callback_wrapper
, t
, flags
);
825 if (status
!= STATUS_SUCCESS
)
826 timer_cleanup_callback(t
);
831 static ULONG
queue_get_timeout(struct timer_queue
*q
)
833 struct queue_timer
*t
;
834 ULONG timeout
= INFINITE
;
836 RtlEnterCriticalSection(&q
->cs
);
837 if (list_head(&q
->timers
))
839 t
= LIST_ENTRY(list_head(&q
->timers
), struct queue_timer
, entry
);
840 assert(!t
->destroy
|| t
->expire
== EXPIRE_NEVER
);
842 if (t
->expire
!= EXPIRE_NEVER
)
844 ULONGLONG time
= queue_current_time();
845 timeout
= t
->expire
< time
? 0 : t
->expire
- time
;
848 RtlLeaveCriticalSection(&q
->cs
);
853 static void WINAPI
timer_queue_thread_proc(LPVOID p
)
855 struct timer_queue
*q
= p
;
858 timeout_ms
= INFINITE
;
861 LARGE_INTEGER timeout
;
865 status
= NtWaitForSingleObject(
866 q
->event
, FALSE
, get_nt_timeout(&timeout
, timeout_ms
));
868 if (status
== STATUS_WAIT_0
)
870 /* There are two possible ways to trigger the event. Either
871 we are quitting and the last timer got removed, or a new
872 timer got put at the head of the list so we need to adjust
874 RtlEnterCriticalSection(&q
->cs
);
875 if (q
->quit
&& list_empty(&q
->timers
))
877 RtlLeaveCriticalSection(&q
->cs
);
879 else if (status
== STATUS_TIMEOUT
)
880 queue_timer_expire(q
);
885 timeout_ms
= queue_get_timeout(q
);
889 RtlDeleteCriticalSection(&q
->cs
);
891 RtlFreeHeap(GetProcessHeap(), 0, q
);
892 RtlExitUserThread( 0 );
895 static void queue_destroy_timer(struct queue_timer
*t
)
897 /* We MUST hold the queue cs while calling this function. */
899 if (t
->runcount
== 0)
900 /* Ensure a timer is promptly removed. If callbacks are pending,
901 it will be removed after the last one finishes by the callback
903 queue_remove_timer(t
);
905 /* Make sure no destroyed timer masks an active timer at the head
906 of the sorted list. */
907 queue_move_timer(t
, EXPIRE_NEVER
, FALSE
);
910 /***********************************************************************
911 * RtlCreateTimerQueue (NTDLL.@)
913 * Creates a timer queue object and returns a handle to it.
916 * NewTimerQueue [O] The newly created queue.
919 * Success: STATUS_SUCCESS.
920 * Failure: Any NTSTATUS code.
922 NTSTATUS WINAPI
RtlCreateTimerQueue(PHANDLE NewTimerQueue
)
925 struct timer_queue
*q
= RtlAllocateHeap(GetProcessHeap(), 0, sizeof *q
);
927 return STATUS_NO_MEMORY
;
929 RtlInitializeCriticalSection(&q
->cs
);
930 list_init(&q
->timers
);
932 q
->magic
= TIMER_QUEUE_MAGIC
;
933 status
= NtCreateEvent(&q
->event
, EVENT_ALL_ACCESS
, NULL
, SynchronizationEvent
, FALSE
);
934 if (status
!= STATUS_SUCCESS
)
936 RtlFreeHeap(GetProcessHeap(), 0, q
);
939 status
= RtlCreateUserThread(GetCurrentProcess(), NULL
, FALSE
, NULL
, 0, 0,
940 timer_queue_thread_proc
, q
, &q
->thread
, NULL
);
941 if (status
!= STATUS_SUCCESS
)
944 RtlFreeHeap(GetProcessHeap(), 0, q
);
949 return STATUS_SUCCESS
;
952 /***********************************************************************
953 * RtlDeleteTimerQueueEx (NTDLL.@)
955 * Deletes a timer queue object.
958 * TimerQueue [I] The timer queue to destroy.
959 * CompletionEvent [I] If NULL, return immediately. If INVALID_HANDLE_VALUE,
960 * wait until all timers are finished firing before
961 * returning. Otherwise, return immediately and set the
962 * event when all timers are done.
965 * Success: STATUS_SUCCESS if synchronous, STATUS_PENDING if not.
966 * Failure: Any NTSTATUS code.
968 NTSTATUS WINAPI
RtlDeleteTimerQueueEx(HANDLE TimerQueue
, HANDLE CompletionEvent
)
970 struct timer_queue
*q
= TimerQueue
;
971 struct queue_timer
*t
, *temp
;
975 if (!q
|| q
->magic
!= TIMER_QUEUE_MAGIC
)
976 return STATUS_INVALID_HANDLE
;
980 RtlEnterCriticalSection(&q
->cs
);
982 if (list_head(&q
->timers
))
983 /* When the last timer is removed, it will signal the timer thread to
985 LIST_FOR_EACH_ENTRY_SAFE(t
, temp
, &q
->timers
, struct queue_timer
, entry
)
986 queue_destroy_timer(t
);
988 /* However if we have none, we must do it ourselves. */
989 NtSetEvent(q
->event
, NULL
);
990 RtlLeaveCriticalSection(&q
->cs
);
992 if (CompletionEvent
== INVALID_HANDLE_VALUE
)
994 NtWaitForSingleObject(thread
, FALSE
, NULL
);
995 status
= STATUS_SUCCESS
;
1001 FIXME("asynchronous return on completion event unimplemented\n");
1002 NtWaitForSingleObject(thread
, FALSE
, NULL
);
1003 NtSetEvent(CompletionEvent
, NULL
);
1005 status
= STATUS_PENDING
;
1012 static struct timer_queue
*get_timer_queue(HANDLE TimerQueue
)
1014 static struct timer_queue
*default_timer_queue
;
1020 if (!default_timer_queue
)
1023 NTSTATUS status
= RtlCreateTimerQueue(&q
);
1024 if (status
== STATUS_SUCCESS
)
1026 PVOID p
= interlocked_cmpxchg_ptr(
1027 (void **) &default_timer_queue
, q
, NULL
);
1029 /* Got beat to the punch. */
1030 RtlDeleteTimerQueueEx(q
, NULL
);
1033 return default_timer_queue
;
1037 /***********************************************************************
1038 * RtlCreateTimer (NTDLL.@)
1040 * Creates a new timer associated with the given queue.
1043 * NewTimer [O] The newly created timer.
1044 * TimerQueue [I] The queue to hold the timer.
1045 * Callback [I] The callback to fire.
1046 * Parameter [I] The argument for the callback.
1047 * DueTime [I] The delay, in milliseconds, before first firing the
1049 * Period [I] The period, in milliseconds, at which to fire the timer
1050 * after the first callback. If zero, the timer will only
1051 * fire once. It still needs to be deleted with
1053 * Flags [I] Flags controlling the execution of the callback. In
1054 * addition to the WT_* thread pool flags (see
1055 * RtlQueueWorkItem), WT_EXECUTEINTIMERTHREAD and
1056 * WT_EXECUTEONLYONCE are supported.
1059 * Success: STATUS_SUCCESS.
1060 * Failure: Any NTSTATUS code.
1062 NTSTATUS WINAPI
RtlCreateTimer(PHANDLE NewTimer
, HANDLE TimerQueue
,
1063 RTL_WAITORTIMERCALLBACKFUNC Callback
,
1064 PVOID Parameter
, DWORD DueTime
, DWORD Period
,
1068 struct queue_timer
*t
;
1069 struct timer_queue
*q
= get_timer_queue(TimerQueue
);
1071 if (!q
) return STATUS_NO_MEMORY
;
1072 if (q
->magic
!= TIMER_QUEUE_MAGIC
) return STATUS_INVALID_HANDLE
;
1074 t
= RtlAllocateHeap(GetProcessHeap(), 0, sizeof *t
);
1076 return STATUS_NO_MEMORY
;
1080 t
->callback
= Callback
;
1081 t
->param
= Parameter
;
1087 status
= STATUS_SUCCESS
;
1088 RtlEnterCriticalSection(&q
->cs
);
1090 status
= STATUS_INVALID_HANDLE
;
1092 queue_add_timer(t
, queue_current_time() + DueTime
, TRUE
);
1093 RtlLeaveCriticalSection(&q
->cs
);
1095 if (status
== STATUS_SUCCESS
)
1098 RtlFreeHeap(GetProcessHeap(), 0, t
);
1103 /***********************************************************************
1104 * RtlUpdateTimer (NTDLL.@)
1106 * Changes the time at which a timer expires.
1109 * TimerQueue [I] The queue that holds the timer.
1110 * Timer [I] The timer to update.
1111 * DueTime [I] The delay, in milliseconds, before next firing the timer.
1112 * Period [I] The period, in milliseconds, at which to fire the timer
1113 * after the first callback. If zero, the timer will not
1114 * refire once. It still needs to be deleted with
1118 * Success: STATUS_SUCCESS.
1119 * Failure: Any NTSTATUS code.
1121 NTSTATUS WINAPI
RtlUpdateTimer(HANDLE TimerQueue
, HANDLE Timer
,
1122 DWORD DueTime
, DWORD Period
)
1124 struct queue_timer
*t
= Timer
;
1125 struct timer_queue
*q
= t
->q
;
1127 RtlEnterCriticalSection(&q
->cs
);
1128 /* Can't change a timer if it was once-only or destroyed. */
1129 if (t
->expire
!= EXPIRE_NEVER
)
1132 queue_move_timer(t
, queue_current_time() + DueTime
, TRUE
);
1134 RtlLeaveCriticalSection(&q
->cs
);
1136 return STATUS_SUCCESS
;
1139 /***********************************************************************
1140 * RtlDeleteTimer (NTDLL.@)
1142 * Cancels a timer-queue timer.
1145 * TimerQueue [I] The queue that holds the timer.
1146 * Timer [I] The timer to update.
1147 * CompletionEvent [I] If NULL, return immediately. If INVALID_HANDLE_VALUE,
1148 * wait until the timer is finished firing all pending
1149 * callbacks before returning. Otherwise, return
1150 * immediately and set the timer is done.
1153 * Success: STATUS_SUCCESS if the timer is done, STATUS_PENDING if not,
1154 or if the completion event is NULL.
1155 * Failure: Any NTSTATUS code.
1157 NTSTATUS WINAPI
RtlDeleteTimer(HANDLE TimerQueue
, HANDLE Timer
,
1158 HANDLE CompletionEvent
)
1160 struct queue_timer
*t
= Timer
;
1161 struct timer_queue
*q
;
1162 NTSTATUS status
= STATUS_PENDING
;
1163 HANDLE event
= NULL
;
1166 return STATUS_INVALID_PARAMETER_1
;
1168 if (CompletionEvent
== INVALID_HANDLE_VALUE
)
1170 status
= NtCreateEvent(&event
, EVENT_ALL_ACCESS
, NULL
, SynchronizationEvent
, FALSE
);
1171 if (status
== STATUS_SUCCESS
)
1172 status
= STATUS_PENDING
;
1174 else if (CompletionEvent
)
1175 event
= CompletionEvent
;
1177 RtlEnterCriticalSection(&q
->cs
);
1179 if (t
->runcount
== 0 && event
)
1180 status
= STATUS_SUCCESS
;
1181 queue_destroy_timer(t
);
1182 RtlLeaveCriticalSection(&q
->cs
);
1184 if (CompletionEvent
== INVALID_HANDLE_VALUE
&& event
)
1186 if (status
== STATUS_PENDING
)
1188 NtWaitForSingleObject(event
, FALSE
, NULL
);
1189 status
= STATUS_SUCCESS
;
1197 /***********************************************************************
1198 * timerqueue_thread_proc (internal)
1200 static void CALLBACK
timerqueue_thread_proc( void *param
)
1202 ULONGLONG timeout_lower
, timeout_upper
, new_timeout
;
1203 struct threadpool_object
*other_timer
;
1204 LARGE_INTEGER now
, timeout
;
1207 TRACE( "starting timer queue thread\n" );
1209 RtlEnterCriticalSection( &timerqueue
.cs
);
1212 NtQuerySystemTime( &now
);
1214 /* Check for expired timers. */
1215 while ((ptr
= list_head( &timerqueue
.pending_timers
)))
1217 struct threadpool_object
*timer
= LIST_ENTRY( ptr
, struct threadpool_object
, u
.timer
.timer_entry
);
1218 assert( timer
->type
== TP_OBJECT_TYPE_TIMER
);
1219 assert( timer
->u
.timer
.timer_pending
);
1220 if (timer
->u
.timer
.timeout
> now
.QuadPart
)
1223 /* Queue a new callback in one of the worker threads. */
1224 list_remove( &timer
->u
.timer
.timer_entry
);
1225 timer
->u
.timer
.timer_pending
= FALSE
;
1226 tp_object_submit( timer
, FALSE
);
1228 /* Insert the timer back into the queue, except it's marked for shutdown. */
1229 if (timer
->u
.timer
.period
&& !timer
->shutdown
)
1231 timer
->u
.timer
.timeout
+= (ULONGLONG
)timer
->u
.timer
.period
* 10000;
1232 if (timer
->u
.timer
.timeout
<= now
.QuadPart
)
1233 timer
->u
.timer
.timeout
= now
.QuadPart
+ 1;
1235 LIST_FOR_EACH_ENTRY( other_timer
, &timerqueue
.pending_timers
,
1236 struct threadpool_object
, u
.timer
.timer_entry
)
1238 assert( other_timer
->type
== TP_OBJECT_TYPE_TIMER
);
1239 if (timer
->u
.timer
.timeout
< other_timer
->u
.timer
.timeout
)
1242 list_add_before( &other_timer
->u
.timer
.timer_entry
, &timer
->u
.timer
.timer_entry
);
1243 timer
->u
.timer
.timer_pending
= TRUE
;
1247 timeout_lower
= TIMEOUT_INFINITE
;
1248 timeout_upper
= TIMEOUT_INFINITE
;
1250 /* Determine next timeout and use the window length to optimize wakeup times. */
1251 LIST_FOR_EACH_ENTRY( other_timer
, &timerqueue
.pending_timers
,
1252 struct threadpool_object
, u
.timer
.timer_entry
)
1254 assert( other_timer
->type
== TP_OBJECT_TYPE_TIMER
);
1255 if (other_timer
->u
.timer
.timeout
>= timeout_upper
)
1258 timeout_lower
= other_timer
->u
.timer
.timeout
;
1259 new_timeout
= timeout_lower
+ (ULONGLONG
)other_timer
->u
.timer
.window_length
* 10000;
1260 if (new_timeout
< timeout_upper
)
1261 timeout_upper
= new_timeout
;
1264 /* Wait for timer update events or until the next timer expires. */
1265 if (timerqueue
.objcount
)
1267 timeout
.QuadPart
= timeout_lower
;
1268 RtlSleepConditionVariableCS( &timerqueue
.update_event
, &timerqueue
.cs
, &timeout
);
1272 /* All timers have been destroyed, if no new timers are created
1273 * within some amount of time, then we can shutdown this thread. */
1274 timeout
.QuadPart
= (ULONGLONG
)THREADPOOL_WORKER_TIMEOUT
* -10000;
1275 if (RtlSleepConditionVariableCS( &timerqueue
.update_event
, &timerqueue
.cs
,
1276 &timeout
) == STATUS_TIMEOUT
&& !timerqueue
.objcount
)
1282 timerqueue
.thread_running
= FALSE
;
1283 RtlLeaveCriticalSection( &timerqueue
.cs
);
1285 TRACE( "terminating timer queue thread\n" );
1286 RtlExitUserThread( 0 );
1289 /***********************************************************************
1290 * tp_new_worker_thread (internal)
1292 * Create and account a new worker thread for the desired pool.
1294 static NTSTATUS
tp_new_worker_thread( struct threadpool
*pool
)
1299 status
= RtlCreateUserThread( GetCurrentProcess(), NULL
, FALSE
, NULL
, 0, 0,
1300 threadpool_worker_proc
, pool
, &thread
, NULL
);
1301 if (status
== STATUS_SUCCESS
)
1303 interlocked_inc( &pool
->refcount
);
1304 pool
->num_workers
++;
1305 pool
->num_busy_workers
++;
1311 /***********************************************************************
1312 * tp_timerqueue_lock (internal)
1314 * Acquires a lock on the global timerqueue. When the lock is acquired
1315 * successfully, it is guaranteed that the timer thread is running.
1317 static NTSTATUS
tp_timerqueue_lock( struct threadpool_object
*timer
)
1319 NTSTATUS status
= STATUS_SUCCESS
;
1320 assert( timer
->type
== TP_OBJECT_TYPE_TIMER
);
1322 timer
->u
.timer
.timer_initialized
= FALSE
;
1323 timer
->u
.timer
.timer_pending
= FALSE
;
1324 timer
->u
.timer
.timer_set
= FALSE
;
1325 timer
->u
.timer
.timeout
= 0;
1326 timer
->u
.timer
.period
= 0;
1327 timer
->u
.timer
.window_length
= 0;
1329 RtlEnterCriticalSection( &timerqueue
.cs
);
1331 /* Make sure that the timerqueue thread is running. */
1332 if (!timerqueue
.thread_running
)
1335 status
= RtlCreateUserThread( GetCurrentProcess(), NULL
, FALSE
, NULL
, 0, 0,
1336 timerqueue_thread_proc
, NULL
, &thread
, NULL
);
1337 if (status
== STATUS_SUCCESS
)
1339 timerqueue
.thread_running
= TRUE
;
1344 if (status
== STATUS_SUCCESS
)
1346 timer
->u
.timer
.timer_initialized
= TRUE
;
1347 timerqueue
.objcount
++;
1350 RtlLeaveCriticalSection( &timerqueue
.cs
);
1354 /***********************************************************************
1355 * tp_timerqueue_unlock (internal)
1357 * Releases a lock on the global timerqueue.
1359 static void tp_timerqueue_unlock( struct threadpool_object
*timer
)
1361 assert( timer
->type
== TP_OBJECT_TYPE_TIMER
);
1363 RtlEnterCriticalSection( &timerqueue
.cs
);
1364 if (timer
->u
.timer
.timer_initialized
)
1366 /* If timer was pending, remove it. */
1367 if (timer
->u
.timer
.timer_pending
)
1369 list_remove( &timer
->u
.timer
.timer_entry
);
1370 timer
->u
.timer
.timer_pending
= FALSE
;
1373 /* If the last timer object was destroyed, then wake up the thread. */
1374 if (!--timerqueue
.objcount
)
1376 assert( list_empty( &timerqueue
.pending_timers
) );
1377 RtlWakeAllConditionVariable( &timerqueue
.update_event
);
1380 timer
->u
.timer
.timer_initialized
= FALSE
;
1382 RtlLeaveCriticalSection( &timerqueue
.cs
);
1385 /***********************************************************************
1386 * waitqueue_thread_proc (internal)
1388 static void CALLBACK
waitqueue_thread_proc( void *param
)
1390 struct threadpool_object
*objects
[MAXIMUM_WAITQUEUE_OBJECTS
];
1391 HANDLE handles
[MAXIMUM_WAITQUEUE_OBJECTS
+ 1];
1392 struct waitqueue_bucket
*bucket
= param
;
1393 struct threadpool_object
*wait
, *next
;
1394 LARGE_INTEGER now
, timeout
;
1398 TRACE( "starting wait queue thread\n" );
1400 RtlEnterCriticalSection( &waitqueue
.cs
);
1404 NtQuerySystemTime( &now
);
1405 timeout
.QuadPart
= TIMEOUT_INFINITE
;
1408 LIST_FOR_EACH_ENTRY_SAFE( wait
, next
, &bucket
->waiting
, struct threadpool_object
,
1411 assert( wait
->type
== TP_OBJECT_TYPE_WAIT
);
1412 if (wait
->u
.wait
.timeout
<= now
.QuadPart
)
1414 /* Wait object timed out. */
1415 list_remove( &wait
->u
.wait
.wait_entry
);
1416 list_add_tail( &bucket
->reserved
, &wait
->u
.wait
.wait_entry
);
1417 tp_object_submit( wait
, FALSE
);
1421 if (wait
->u
.wait
.timeout
< timeout
.QuadPart
)
1422 timeout
.QuadPart
= wait
->u
.wait
.timeout
;
1424 assert( num_handles
< MAXIMUM_WAITQUEUE_OBJECTS
);
1425 interlocked_inc( &wait
->refcount
);
1426 objects
[num_handles
] = wait
;
1427 handles
[num_handles
] = wait
->u
.wait
.handle
;
1432 if (!bucket
->objcount
)
1434 /* All wait objects have been destroyed, if no new wait objects are created
1435 * within some amount of time, then we can shutdown this thread. */
1436 assert( num_handles
== 0 );
1437 RtlLeaveCriticalSection( &waitqueue
.cs
);
1438 timeout
.QuadPart
= (ULONGLONG
)THREADPOOL_WORKER_TIMEOUT
* -10000;
1439 status
= NtWaitForMultipleObjects( 1, &bucket
->update_event
, TRUE
, FALSE
, &timeout
);
1440 RtlEnterCriticalSection( &waitqueue
.cs
);
1442 if (status
== STATUS_TIMEOUT
&& !bucket
->objcount
)
1447 handles
[num_handles
] = bucket
->update_event
;
1448 RtlLeaveCriticalSection( &waitqueue
.cs
);
1449 status
= NtWaitForMultipleObjects( num_handles
+ 1, handles
, TRUE
, FALSE
, &timeout
);
1450 RtlEnterCriticalSection( &waitqueue
.cs
);
1452 if (status
>= STATUS_WAIT_0
&& status
< STATUS_WAIT_0
+ num_handles
)
1454 wait
= objects
[status
- STATUS_WAIT_0
];
1455 assert( wait
->type
== TP_OBJECT_TYPE_WAIT
);
1456 if (wait
->u
.wait
.bucket
)
1458 /* Wait object signaled. */
1459 assert( wait
->u
.wait
.bucket
== bucket
);
1460 list_remove( &wait
->u
.wait
.wait_entry
);
1461 list_add_tail( &bucket
->reserved
, &wait
->u
.wait
.wait_entry
);
1462 tp_object_submit( wait
, TRUE
);
1465 WARN("wait object %p triggered while object was destroyed\n", wait
);
1468 /* Release temporary references to wait objects. */
1471 wait
= objects
[--num_handles
];
1472 assert( wait
->type
== TP_OBJECT_TYPE_WAIT
);
1473 tp_object_release( wait
);
1477 /* Try to merge bucket with other threads. */
1478 if (waitqueue
.num_buckets
> 1 && bucket
->objcount
&&
1479 bucket
->objcount
<= MAXIMUM_WAITQUEUE_OBJECTS
* 1 / 3)
1481 struct waitqueue_bucket
*other_bucket
;
1482 LIST_FOR_EACH_ENTRY( other_bucket
, &waitqueue
.buckets
, struct waitqueue_bucket
, bucket_entry
)
1484 if (other_bucket
!= bucket
&& other_bucket
->objcount
&&
1485 other_bucket
->objcount
+ bucket
->objcount
<= MAXIMUM_WAITQUEUE_OBJECTS
* 2 / 3)
1487 other_bucket
->objcount
+= bucket
->objcount
;
1488 bucket
->objcount
= 0;
1490 /* Update reserved list. */
1491 LIST_FOR_EACH_ENTRY( wait
, &bucket
->reserved
, struct threadpool_object
, u
.wait
.wait_entry
)
1493 assert( wait
->type
== TP_OBJECT_TYPE_WAIT
);
1494 wait
->u
.wait
.bucket
= other_bucket
;
1496 list_move_tail( &other_bucket
->reserved
, &bucket
->reserved
);
1498 /* Update waiting list. */
1499 LIST_FOR_EACH_ENTRY( wait
, &bucket
->waiting
, struct threadpool_object
, u
.wait
.wait_entry
)
1501 assert( wait
->type
== TP_OBJECT_TYPE_WAIT
);
1502 wait
->u
.wait
.bucket
= other_bucket
;
1504 list_move_tail( &other_bucket
->waiting
, &bucket
->waiting
);
1506 /* Move bucket to the end, to keep the probability of
1507 * newly added wait objects as small as possible. */
1508 list_remove( &bucket
->bucket_entry
);
1509 list_add_tail( &waitqueue
.buckets
, &bucket
->bucket_entry
);
1511 NtSetEvent( other_bucket
->update_event
, NULL
);
1518 /* Remove this bucket from the list. */
1519 list_remove( &bucket
->bucket_entry
);
1520 if (!--waitqueue
.num_buckets
)
1521 assert( list_empty( &waitqueue
.buckets
) );
1523 RtlLeaveCriticalSection( &waitqueue
.cs
);
1525 TRACE( "terminating wait queue thread\n" );
1527 assert( bucket
->objcount
== 0 );
1528 assert( list_empty( &bucket
->reserved
) );
1529 assert( list_empty( &bucket
->waiting
) );
1530 NtClose( bucket
->update_event
);
1532 RtlFreeHeap( GetProcessHeap(), 0, bucket
);
1533 RtlExitUserThread( 0 );
1536 /***********************************************************************
1537 * tp_waitqueue_lock (internal)
1539 static NTSTATUS
tp_waitqueue_lock( struct threadpool_object
*wait
)
1541 struct waitqueue_bucket
*bucket
;
1544 assert( wait
->type
== TP_OBJECT_TYPE_WAIT
);
1546 wait
->u
.wait
.signaled
= 0;
1547 wait
->u
.wait
.bucket
= NULL
;
1548 wait
->u
.wait
.wait_pending
= FALSE
;
1549 wait
->u
.wait
.timeout
= 0;
1550 wait
->u
.wait
.handle
= INVALID_HANDLE_VALUE
;
1552 RtlEnterCriticalSection( &waitqueue
.cs
);
1554 /* Try to assign to existing bucket if possible. */
1555 LIST_FOR_EACH_ENTRY( bucket
, &waitqueue
.buckets
, struct waitqueue_bucket
, bucket_entry
)
1557 if (bucket
->objcount
< MAXIMUM_WAITQUEUE_OBJECTS
)
1559 list_add_tail( &bucket
->reserved
, &wait
->u
.wait
.wait_entry
);
1560 wait
->u
.wait
.bucket
= bucket
;
1563 status
= STATUS_SUCCESS
;
1568 /* Create a new bucket and corresponding worker thread. */
1569 bucket
= RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*bucket
) );
1572 status
= STATUS_NO_MEMORY
;
1576 bucket
->objcount
= 0;
1577 list_init( &bucket
->reserved
);
1578 list_init( &bucket
->waiting
);
1580 status
= NtCreateEvent( &bucket
->update_event
, EVENT_ALL_ACCESS
,
1581 NULL
, SynchronizationEvent
, FALSE
);
1584 RtlFreeHeap( GetProcessHeap(), 0, bucket
);
1588 status
= RtlCreateUserThread( GetCurrentProcess(), NULL
, FALSE
, NULL
, 0, 0,
1589 waitqueue_thread_proc
, bucket
, &thread
, NULL
);
1590 if (status
== STATUS_SUCCESS
)
1592 list_add_tail( &waitqueue
.buckets
, &bucket
->bucket_entry
);
1593 waitqueue
.num_buckets
++;
1595 list_add_tail( &bucket
->reserved
, &wait
->u
.wait
.wait_entry
);
1596 wait
->u
.wait
.bucket
= bucket
;
1603 NtClose( bucket
->update_event
);
1604 RtlFreeHeap( GetProcessHeap(), 0, bucket
);
1608 RtlLeaveCriticalSection( &waitqueue
.cs
);
1612 /***********************************************************************
1613 * tp_waitqueue_unlock (internal)
1615 static void tp_waitqueue_unlock( struct threadpool_object
*wait
)
1617 assert( wait
->type
== TP_OBJECT_TYPE_WAIT
);
1619 RtlEnterCriticalSection( &waitqueue
.cs
);
1620 if (wait
->u
.wait
.bucket
)
1622 struct waitqueue_bucket
*bucket
= wait
->u
.wait
.bucket
;
1623 assert( bucket
->objcount
> 0 );
1625 list_remove( &wait
->u
.wait
.wait_entry
);
1626 wait
->u
.wait
.bucket
= NULL
;
1629 NtSetEvent( bucket
->update_event
, NULL
);
1631 RtlLeaveCriticalSection( &waitqueue
.cs
);
1634 /***********************************************************************
1635 * tp_threadpool_alloc (internal)
1637 * Allocates a new threadpool object.
1639 static NTSTATUS
tp_threadpool_alloc( struct threadpool
**out
)
1641 struct threadpool
*pool
;
1643 pool
= RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*pool
) );
1645 return STATUS_NO_MEMORY
;
1649 pool
->shutdown
= FALSE
;
1651 RtlInitializeCriticalSection( &pool
->cs
);
1652 pool
->cs
.DebugInfo
->Spare
[0] = (DWORD_PTR
)(__FILE__
": threadpool.cs");
1654 list_init( &pool
->pool
);
1655 RtlInitializeConditionVariable( &pool
->update_event
);
1657 pool
->max_workers
= 500;
1658 pool
->min_workers
= 0;
1659 pool
->num_workers
= 0;
1660 pool
->num_busy_workers
= 0;
1662 TRACE( "allocated threadpool %p\n", pool
);
1665 return STATUS_SUCCESS
;
1668 /***********************************************************************
1669 * tp_threadpool_shutdown (internal)
1671 * Prepares the shutdown of a threadpool object and notifies all worker
1672 * threads to terminate (after all remaining work items have been
1675 static void tp_threadpool_shutdown( struct threadpool
*pool
)
1677 assert( pool
!= default_threadpool
);
1679 pool
->shutdown
= TRUE
;
1680 RtlWakeAllConditionVariable( &pool
->update_event
);
1683 /***********************************************************************
1684 * tp_threadpool_release (internal)
1686 * Releases a reference to a threadpool object.
1688 static BOOL
tp_threadpool_release( struct threadpool
*pool
)
1690 if (interlocked_dec( &pool
->refcount
))
1693 TRACE( "destroying threadpool %p\n", pool
);
1695 assert( pool
->shutdown
);
1696 assert( !pool
->objcount
);
1697 assert( list_empty( &pool
->pool
) );
1699 pool
->cs
.DebugInfo
->Spare
[0] = 0;
1700 RtlDeleteCriticalSection( &pool
->cs
);
1702 RtlFreeHeap( GetProcessHeap(), 0, pool
);
1706 /***********************************************************************
1707 * tp_threadpool_lock (internal)
1709 * Acquires a lock on a threadpool, specified with an TP_CALLBACK_ENVIRON
1710 * block. When the lock is acquired successfully, it is guaranteed that
1711 * there is at least one worker thread to process tasks.
1713 static NTSTATUS
tp_threadpool_lock( struct threadpool
**out
, TP_CALLBACK_ENVIRON
*environment
)
1715 struct threadpool
*pool
= NULL
;
1716 NTSTATUS status
= STATUS_SUCCESS
;
1719 pool
= (struct threadpool
*)environment
->Pool
;
1723 if (!default_threadpool
)
1725 status
= tp_threadpool_alloc( &pool
);
1726 if (status
!= STATUS_SUCCESS
)
1729 if (interlocked_cmpxchg_ptr( (void *)&default_threadpool
, pool
, NULL
) != NULL
)
1731 tp_threadpool_shutdown( pool
);
1732 tp_threadpool_release( pool
);
1736 pool
= default_threadpool
;
1739 RtlEnterCriticalSection( &pool
->cs
);
1741 /* Make sure that the threadpool has at least one thread. */
1742 if (!pool
->num_workers
)
1743 status
= tp_new_worker_thread( pool
);
1745 /* Keep a reference, and increment objcount to ensure that the
1746 * last thread doesn't terminate. */
1747 if (status
== STATUS_SUCCESS
)
1749 interlocked_inc( &pool
->refcount
);
1753 RtlLeaveCriticalSection( &pool
->cs
);
1755 if (status
!= STATUS_SUCCESS
)
1759 return STATUS_SUCCESS
;
1762 /***********************************************************************
1763 * tp_threadpool_unlock (internal)
1765 * Releases a lock on a threadpool.
1767 static void tp_threadpool_unlock( struct threadpool
*pool
)
1769 RtlEnterCriticalSection( &pool
->cs
);
1771 RtlLeaveCriticalSection( &pool
->cs
);
1772 tp_threadpool_release( pool
);
1775 /***********************************************************************
1776 * tp_group_alloc (internal)
1778 * Allocates a new threadpool group object.
1780 static NTSTATUS
tp_group_alloc( struct threadpool_group
**out
)
1782 struct threadpool_group
*group
;
1784 group
= RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*group
) );
1786 return STATUS_NO_MEMORY
;
1788 group
->refcount
= 1;
1789 group
->shutdown
= FALSE
;
1791 RtlInitializeCriticalSection( &group
->cs
);
1792 group
->cs
.DebugInfo
->Spare
[0] = (DWORD_PTR
)(__FILE__
": threadpool_group.cs");
1794 list_init( &group
->members
);
1796 TRACE( "allocated group %p\n", group
);
1799 return STATUS_SUCCESS
;
1802 /***********************************************************************
1803 * tp_group_shutdown (internal)
1805 * Marks the group object for shutdown.
1807 static void tp_group_shutdown( struct threadpool_group
*group
)
1809 group
->shutdown
= TRUE
;
1812 /***********************************************************************
1813 * tp_group_release (internal)
1815 * Releases a reference to a group object.
1817 static BOOL
tp_group_release( struct threadpool_group
*group
)
1819 if (interlocked_dec( &group
->refcount
))
1822 TRACE( "destroying group %p\n", group
);
1824 assert( group
->shutdown
);
1825 assert( list_empty( &group
->members
) );
1827 group
->cs
.DebugInfo
->Spare
[0] = 0;
1828 RtlDeleteCriticalSection( &group
->cs
);
1830 RtlFreeHeap( GetProcessHeap(), 0, group
);
1834 /***********************************************************************
1835 * tp_object_initialize (internal)
1837 * Initializes members of a threadpool object.
1839 static void tp_object_initialize( struct threadpool_object
*object
, struct threadpool
*pool
,
1840 PVOID userdata
, TP_CALLBACK_ENVIRON
*environment
)
1842 BOOL is_simple_callback
= (object
->type
== TP_OBJECT_TYPE_SIMPLE
);
1844 object
->refcount
= 1;
1845 object
->shutdown
= FALSE
;
1847 object
->pool
= pool
;
1848 object
->group
= NULL
;
1849 object
->userdata
= userdata
;
1850 object
->group_cancel_callback
= NULL
;
1851 object
->finalization_callback
= NULL
;
1852 object
->may_run_long
= 0;
1853 object
->race_dll
= NULL
;
1855 memset( &object
->group_entry
, 0, sizeof(object
->group_entry
) );
1856 object
->is_group_member
= FALSE
;
1858 memset( &object
->pool_entry
, 0, sizeof(object
->pool_entry
) );
1859 RtlInitializeConditionVariable( &object
->finished_event
);
1860 RtlInitializeConditionVariable( &object
->group_finished_event
);
1861 object
->num_pending_callbacks
= 0;
1862 object
->num_running_callbacks
= 0;
1863 object
->num_associated_callbacks
= 0;
1867 if (environment
->Version
!= 1 && environment
->Version
!= 3)
1868 FIXME( "unsupported environment version %u\n", environment
->Version
);
1870 object
->group
= impl_from_TP_CLEANUP_GROUP( environment
->CleanupGroup
);
1871 object
->group_cancel_callback
= environment
->CleanupGroupCancelCallback
;
1872 object
->finalization_callback
= environment
->FinalizationCallback
;
1873 object
->may_run_long
= environment
->u
.s
.LongFunction
!= 0;
1874 object
->race_dll
= environment
->RaceDll
;
1876 if (environment
->ActivationContext
)
1877 FIXME( "activation context not supported yet\n" );
1879 if (environment
->u
.s
.Persistent
)
1880 FIXME( "persistent threads not supported yet\n" );
1883 if (object
->race_dll
)
1884 LdrAddRefDll( 0, object
->race_dll
);
1886 TRACE( "allocated object %p of type %u\n", object
, object
->type
);
1888 /* For simple callbacks we have to run tp_object_submit before adding this object
1889 * to the cleanup group. As soon as the cleanup group members are released ->shutdown
1890 * will be set, and tp_object_submit would fail with an assertion. */
1892 if (is_simple_callback
)
1893 tp_object_submit( object
, FALSE
);
1897 struct threadpool_group
*group
= object
->group
;
1898 interlocked_inc( &group
->refcount
);
1900 RtlEnterCriticalSection( &group
->cs
);
1901 list_add_tail( &group
->members
, &object
->group_entry
);
1902 object
->is_group_member
= TRUE
;
1903 RtlLeaveCriticalSection( &group
->cs
);
1906 if (is_simple_callback
)
1907 tp_object_release( object
);
1910 /***********************************************************************
1911 * tp_object_submit (internal)
1913 * Submits a threadpool object to the associated threadpool. This
1914 * function has to be VOID because TpPostWork can never fail on Windows.
1916 static void tp_object_submit( struct threadpool_object
*object
, BOOL signaled
)
1918 struct threadpool
*pool
= object
->pool
;
1919 NTSTATUS status
= STATUS_UNSUCCESSFUL
;
1921 assert( !object
->shutdown
);
1922 assert( !pool
->shutdown
);
1924 RtlEnterCriticalSection( &pool
->cs
);
1926 /* Start new worker threads if required. */
1927 if (pool
->num_busy_workers
>= pool
->num_workers
&&
1928 pool
->num_workers
< pool
->max_workers
)
1929 status
= tp_new_worker_thread( pool
);
1931 /* Queue work item and increment refcount. */
1932 interlocked_inc( &object
->refcount
);
1933 if (!object
->num_pending_callbacks
++)
1934 list_add_tail( &pool
->pool
, &object
->pool_entry
);
1936 /* Count how often the object was signaled. */
1937 if (object
->type
== TP_OBJECT_TYPE_WAIT
&& signaled
)
1938 object
->u
.wait
.signaled
++;
1940 /* No new thread started - wake up one existing thread. */
1941 if (status
!= STATUS_SUCCESS
)
1943 assert( pool
->num_workers
> 0 );
1944 RtlWakeConditionVariable( &pool
->update_event
);
1947 RtlLeaveCriticalSection( &pool
->cs
);
1950 /***********************************************************************
1951 * tp_object_cancel (internal)
1953 * Cancels all currently pending callbacks for a specific object.
1955 static void tp_object_cancel( struct threadpool_object
*object
)
1957 struct threadpool
*pool
= object
->pool
;
1958 LONG pending_callbacks
= 0;
1960 RtlEnterCriticalSection( &pool
->cs
);
1961 if (object
->num_pending_callbacks
)
1963 pending_callbacks
= object
->num_pending_callbacks
;
1964 object
->num_pending_callbacks
= 0;
1965 list_remove( &object
->pool_entry
);
1967 if (object
->type
== TP_OBJECT_TYPE_WAIT
)
1968 object
->u
.wait
.signaled
= 0;
1970 RtlLeaveCriticalSection( &pool
->cs
);
1972 while (pending_callbacks
--)
1973 tp_object_release( object
);
1976 /***********************************************************************
1977 * tp_object_wait (internal)
1979 * Waits until all pending and running callbacks of a specific object
1980 * have been processed.
1982 static void tp_object_wait( struct threadpool_object
*object
, BOOL group_wait
)
1984 struct threadpool
*pool
= object
->pool
;
1986 RtlEnterCriticalSection( &pool
->cs
);
1989 while (object
->num_pending_callbacks
|| object
->num_running_callbacks
)
1990 RtlSleepConditionVariableCS( &object
->group_finished_event
, &pool
->cs
, NULL
);
1994 while (object
->num_pending_callbacks
|| object
->num_associated_callbacks
)
1995 RtlSleepConditionVariableCS( &object
->finished_event
, &pool
->cs
, NULL
);
1997 RtlLeaveCriticalSection( &pool
->cs
);
2000 /***********************************************************************
2001 * tp_object_prepare_shutdown (internal)
2003 * Prepares a threadpool object for shutdown.
2005 static void tp_object_prepare_shutdown( struct threadpool_object
*object
)
2007 if (object
->type
== TP_OBJECT_TYPE_TIMER
)
2008 tp_timerqueue_unlock( object
);
2009 else if (object
->type
== TP_OBJECT_TYPE_WAIT
)
2010 tp_waitqueue_unlock( object
);
2013 /***********************************************************************
2014 * tp_object_release (internal)
2016 * Releases a reference to a threadpool object.
2018 static BOOL
tp_object_release( struct threadpool_object
*object
)
2020 if (interlocked_dec( &object
->refcount
))
2023 TRACE( "destroying object %p of type %u\n", object
, object
->type
);
2025 assert( object
->shutdown
);
2026 assert( !object
->num_pending_callbacks
);
2027 assert( !object
->num_running_callbacks
);
2028 assert( !object
->num_associated_callbacks
);
2030 /* release reference to the group */
2033 struct threadpool_group
*group
= object
->group
;
2035 RtlEnterCriticalSection( &group
->cs
);
2036 if (object
->is_group_member
)
2038 list_remove( &object
->group_entry
);
2039 object
->is_group_member
= FALSE
;
2041 RtlLeaveCriticalSection( &group
->cs
);
2043 tp_group_release( group
);
2046 tp_threadpool_unlock( object
->pool
);
2048 if (object
->race_dll
)
2049 LdrUnloadDll( object
->race_dll
);
2051 RtlFreeHeap( GetProcessHeap(), 0, object
);
2055 /***********************************************************************
2056 * threadpool_worker_proc (internal)
2058 static void CALLBACK
threadpool_worker_proc( void *param
)
2060 TP_CALLBACK_INSTANCE
*callback_instance
;
2061 struct threadpool_instance instance
;
2062 struct threadpool
*pool
= param
;
2063 TP_WAIT_RESULT wait_result
= 0;
2064 LARGE_INTEGER timeout
;
2068 TRACE( "starting worker thread for pool %p\n", pool
);
2070 RtlEnterCriticalSection( &pool
->cs
);
2071 pool
->num_busy_workers
--;
2074 while ((ptr
= list_head( &pool
->pool
)))
2076 struct threadpool_object
*object
= LIST_ENTRY( ptr
, struct threadpool_object
, pool_entry
);
2077 assert( object
->num_pending_callbacks
> 0 );
2079 /* If further pending callbacks are queued, move the work item to
2080 * the end of the pool list. Otherwise remove it from the pool. */
2081 list_remove( &object
->pool_entry
);
2082 if (--object
->num_pending_callbacks
)
2083 list_add_tail( &pool
->pool
, &object
->pool_entry
);
2085 /* For wait objects check if they were signaled or have timed out. */
2086 if (object
->type
== TP_OBJECT_TYPE_WAIT
)
2088 wait_result
= object
->u
.wait
.signaled
? WAIT_OBJECT_0
: WAIT_TIMEOUT
;
2089 if (wait_result
== WAIT_OBJECT_0
) object
->u
.wait
.signaled
--;
2092 /* Leave critical section and do the actual callback. */
2093 object
->num_associated_callbacks
++;
2094 object
->num_running_callbacks
++;
2095 pool
->num_busy_workers
++;
2096 RtlLeaveCriticalSection( &pool
->cs
);
2098 /* Initialize threadpool instance struct. */
2099 callback_instance
= (TP_CALLBACK_INSTANCE
*)&instance
;
2100 instance
.object
= object
;
2101 instance
.threadid
= GetCurrentThreadId();
2102 instance
.associated
= TRUE
;
2103 instance
.may_run_long
= object
->may_run_long
;
2104 instance
.cleanup
.critical_section
= NULL
;
2105 instance
.cleanup
.mutex
= NULL
;
2106 instance
.cleanup
.semaphore
= NULL
;
2107 instance
.cleanup
.semaphore_count
= 0;
2108 instance
.cleanup
.event
= NULL
;
2109 instance
.cleanup
.library
= NULL
;
2111 switch (object
->type
)
2113 case TP_OBJECT_TYPE_SIMPLE
:
2115 TRACE( "executing simple callback %p(%p, %p)\n",
2116 object
->u
.simple
.callback
, callback_instance
, object
->userdata
);
2117 object
->u
.simple
.callback( callback_instance
, object
->userdata
);
2118 TRACE( "callback %p returned\n", object
->u
.simple
.callback
);
2122 case TP_OBJECT_TYPE_WORK
:
2124 TRACE( "executing work callback %p(%p, %p, %p)\n",
2125 object
->u
.work
.callback
, callback_instance
, object
->userdata
, object
);
2126 object
->u
.work
.callback( callback_instance
, object
->userdata
, (TP_WORK
*)object
);
2127 TRACE( "callback %p returned\n", object
->u
.work
.callback
);
2131 case TP_OBJECT_TYPE_TIMER
:
2133 TRACE( "executing timer callback %p(%p, %p, %p)\n",
2134 object
->u
.timer
.callback
, callback_instance
, object
->userdata
, object
);
2135 object
->u
.timer
.callback( callback_instance
, object
->userdata
, (TP_TIMER
*)object
);
2136 TRACE( "callback %p returned\n", object
->u
.timer
.callback
);
2140 case TP_OBJECT_TYPE_WAIT
:
2142 TRACE( "executing wait callback %p(%p, %p, %p, %u)\n",
2143 object
->u
.wait
.callback
, callback_instance
, object
->userdata
, object
, wait_result
);
2144 object
->u
.wait
.callback( callback_instance
, object
->userdata
, (TP_WAIT
*)object
, wait_result
);
2145 TRACE( "callback %p returned\n", object
->u
.wait
.callback
);
2154 /* Execute finalization callback. */
2155 if (object
->finalization_callback
)
2157 TRACE( "executing finalization callback %p(%p, %p)\n",
2158 object
->finalization_callback
, callback_instance
, object
->userdata
);
2159 object
->finalization_callback( callback_instance
, object
->userdata
);
2160 TRACE( "callback %p returned\n", object
->finalization_callback
);
2163 /* Execute cleanup tasks. */
2164 if (instance
.cleanup
.critical_section
)
2166 RtlLeaveCriticalSection( instance
.cleanup
.critical_section
);
2168 if (instance
.cleanup
.mutex
)
2170 status
= NtReleaseMutant( instance
.cleanup
.mutex
, NULL
);
2171 if (status
!= STATUS_SUCCESS
) goto skip_cleanup
;
2173 if (instance
.cleanup
.semaphore
)
2175 status
= NtReleaseSemaphore( instance
.cleanup
.semaphore
, instance
.cleanup
.semaphore_count
, NULL
);
2176 if (status
!= STATUS_SUCCESS
) goto skip_cleanup
;
2178 if (instance
.cleanup
.event
)
2180 status
= NtSetEvent( instance
.cleanup
.event
, NULL
);
2181 if (status
!= STATUS_SUCCESS
) goto skip_cleanup
;
2183 if (instance
.cleanup
.library
)
2185 LdrUnloadDll( instance
.cleanup
.library
);
2189 RtlEnterCriticalSection( &pool
->cs
);
2190 pool
->num_busy_workers
--;
2192 /* Simple callbacks are automatically shutdown after execution. */
2193 if (object
->type
== TP_OBJECT_TYPE_SIMPLE
)
2195 tp_object_prepare_shutdown( object
);
2196 object
->shutdown
= TRUE
;
2199 object
->num_running_callbacks
--;
2200 if (!object
->num_pending_callbacks
&& !object
->num_running_callbacks
)
2201 RtlWakeAllConditionVariable( &object
->group_finished_event
);
2203 if (instance
.associated
)
2205 object
->num_associated_callbacks
--;
2206 if (!object
->num_pending_callbacks
&& !object
->num_associated_callbacks
)
2207 RtlWakeAllConditionVariable( &object
->finished_event
);
2210 tp_object_release( object
);
2213 /* Shutdown worker thread if requested. */
2217 /* Wait for new tasks or until the timeout expires. A thread only terminates
2218 * when no new tasks are available, and the number of threads can be
2219 * decreased without violating the min_workers limit. An exception is when
2220 * min_workers == 0, then objcount is used to detect if the last thread
2221 * can be terminated. */
2222 timeout
.QuadPart
= (ULONGLONG
)THREADPOOL_WORKER_TIMEOUT
* -10000;
2223 if (RtlSleepConditionVariableCS( &pool
->update_event
, &pool
->cs
, &timeout
) == STATUS_TIMEOUT
&&
2224 !list_head( &pool
->pool
) && (pool
->num_workers
> max( pool
->min_workers
, 1 ) ||
2225 (!pool
->min_workers
&& !pool
->objcount
)))
2230 pool
->num_workers
--;
2231 RtlLeaveCriticalSection( &pool
->cs
);
2233 TRACE( "terminating worker thread for pool %p\n", pool
);
2234 tp_threadpool_release( pool
);
2235 RtlExitUserThread( 0 );
2238 /***********************************************************************
2239 * TpAllocCleanupGroup (NTDLL.@)
2241 NTSTATUS WINAPI
TpAllocCleanupGroup( TP_CLEANUP_GROUP
**out
)
2243 TRACE( "%p\n", out
);
2245 return tp_group_alloc( (struct threadpool_group
**)out
);
2248 /***********************************************************************
2249 * TpAllocPool (NTDLL.@)
2251 NTSTATUS WINAPI
TpAllocPool( TP_POOL
**out
, PVOID reserved
)
2253 TRACE( "%p %p\n", out
, reserved
);
2256 FIXME( "reserved argument is nonzero (%p)\n", reserved
);
2258 return tp_threadpool_alloc( (struct threadpool
**)out
);
2261 /***********************************************************************
2262 * TpAllocTimer (NTDLL.@)
2264 NTSTATUS WINAPI
TpAllocTimer( TP_TIMER
**out
, PTP_TIMER_CALLBACK callback
, PVOID userdata
,
2265 TP_CALLBACK_ENVIRON
*environment
)
2267 struct threadpool_object
*object
;
2268 struct threadpool
*pool
;
2271 TRACE( "%p %p %p %p\n", out
, callback
, userdata
, environment
);
2273 object
= RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*object
) );
2275 return STATUS_NO_MEMORY
;
2277 status
= tp_threadpool_lock( &pool
, environment
);
2280 RtlFreeHeap( GetProcessHeap(), 0, object
);
2284 object
->type
= TP_OBJECT_TYPE_TIMER
;
2285 object
->u
.timer
.callback
= callback
;
2287 status
= tp_timerqueue_lock( object
);
2290 tp_threadpool_unlock( pool
);
2291 RtlFreeHeap( GetProcessHeap(), 0, object
);
2295 tp_object_initialize( object
, pool
, userdata
, environment
);
2297 *out
= (TP_TIMER
*)object
;
2298 return STATUS_SUCCESS
;
2301 /***********************************************************************
2302 * TpAllocWait (NTDLL.@)
2304 NTSTATUS WINAPI
TpAllocWait( TP_WAIT
**out
, PTP_WAIT_CALLBACK callback
, PVOID userdata
,
2305 TP_CALLBACK_ENVIRON
*environment
)
2307 struct threadpool_object
*object
;
2308 struct threadpool
*pool
;
2311 TRACE( "%p %p %p %p\n", out
, callback
, userdata
, environment
);
2313 object
= RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*object
) );
2315 return STATUS_NO_MEMORY
;
2317 status
= tp_threadpool_lock( &pool
, environment
);
2320 RtlFreeHeap( GetProcessHeap(), 0, object
);
2324 object
->type
= TP_OBJECT_TYPE_WAIT
;
2325 object
->u
.wait
.callback
= callback
;
2327 status
= tp_waitqueue_lock( object
);
2330 tp_threadpool_unlock( pool
);
2331 RtlFreeHeap( GetProcessHeap(), 0, object
);
2335 tp_object_initialize( object
, pool
, userdata
, environment
);
2337 *out
= (TP_WAIT
*)object
;
2338 return STATUS_SUCCESS
;
2341 /***********************************************************************
2342 * TpAllocWork (NTDLL.@)
2344 NTSTATUS WINAPI
TpAllocWork( TP_WORK
**out
, PTP_WORK_CALLBACK callback
, PVOID userdata
,
2345 TP_CALLBACK_ENVIRON
*environment
)
2347 struct threadpool_object
*object
;
2348 struct threadpool
*pool
;
2351 TRACE( "%p %p %p %p\n", out
, callback
, userdata
, environment
);
2353 object
= RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*object
) );
2355 return STATUS_NO_MEMORY
;
2357 status
= tp_threadpool_lock( &pool
, environment
);
2360 RtlFreeHeap( GetProcessHeap(), 0, object
);
2364 object
->type
= TP_OBJECT_TYPE_WORK
;
2365 object
->u
.work
.callback
= callback
;
2366 tp_object_initialize( object
, pool
, userdata
, environment
);
2368 *out
= (TP_WORK
*)object
;
2369 return STATUS_SUCCESS
;
2372 /***********************************************************************
2373 * TpCallbackLeaveCriticalSectionOnCompletion (NTDLL.@)
2375 VOID WINAPI
TpCallbackLeaveCriticalSectionOnCompletion( TP_CALLBACK_INSTANCE
*instance
, CRITICAL_SECTION
*crit
)
2377 struct threadpool_instance
*this = impl_from_TP_CALLBACK_INSTANCE( instance
);
2379 TRACE( "%p %p\n", instance
, crit
);
2381 if (!this->cleanup
.critical_section
)
2382 this->cleanup
.critical_section
= crit
;
2385 /***********************************************************************
2386 * TpCallbackMayRunLong (NTDLL.@)
2388 NTSTATUS WINAPI
TpCallbackMayRunLong( TP_CALLBACK_INSTANCE
*instance
)
2390 struct threadpool_instance
*this = impl_from_TP_CALLBACK_INSTANCE( instance
);
2391 struct threadpool_object
*object
= this->object
;
2392 struct threadpool
*pool
;
2393 NTSTATUS status
= STATUS_SUCCESS
;
2395 TRACE( "%p\n", instance
);
2397 if (this->threadid
!= GetCurrentThreadId())
2399 ERR("called from wrong thread, ignoring\n");
2400 return STATUS_UNSUCCESSFUL
; /* FIXME */
2403 if (this->may_run_long
)
2404 return STATUS_SUCCESS
;
2406 pool
= object
->pool
;
2407 RtlEnterCriticalSection( &pool
->cs
);
2409 /* Start new worker threads if required. */
2410 if (pool
->num_busy_workers
>= pool
->num_workers
)
2412 if (pool
->num_workers
< pool
->max_workers
)
2414 status
= tp_new_worker_thread( pool
);
2418 status
= STATUS_TOO_MANY_THREADS
;
2422 RtlLeaveCriticalSection( &pool
->cs
);
2423 this->may_run_long
= TRUE
;
2427 /***********************************************************************
2428 * TpCallbackReleaseMutexOnCompletion (NTDLL.@)
2430 VOID WINAPI
TpCallbackReleaseMutexOnCompletion( TP_CALLBACK_INSTANCE
*instance
, HANDLE mutex
)
2432 struct threadpool_instance
*this = impl_from_TP_CALLBACK_INSTANCE( instance
);
2434 TRACE( "%p %p\n", instance
, mutex
);
2436 if (!this->cleanup
.mutex
)
2437 this->cleanup
.mutex
= mutex
;
2440 /***********************************************************************
2441 * TpCallbackReleaseSemaphoreOnCompletion (NTDLL.@)
2443 VOID WINAPI
TpCallbackReleaseSemaphoreOnCompletion( TP_CALLBACK_INSTANCE
*instance
, HANDLE semaphore
, DWORD count
)
2445 struct threadpool_instance
*this = impl_from_TP_CALLBACK_INSTANCE( instance
);
2447 TRACE( "%p %p %u\n", instance
, semaphore
, count
);
2449 if (!this->cleanup
.semaphore
)
2451 this->cleanup
.semaphore
= semaphore
;
2452 this->cleanup
.semaphore_count
= count
;
2456 /***********************************************************************
2457 * TpCallbackSetEventOnCompletion (NTDLL.@)
2459 VOID WINAPI
TpCallbackSetEventOnCompletion( TP_CALLBACK_INSTANCE
*instance
, HANDLE event
)
2461 struct threadpool_instance
*this = impl_from_TP_CALLBACK_INSTANCE( instance
);
2463 TRACE( "%p %p\n", instance
, event
);
2465 if (!this->cleanup
.event
)
2466 this->cleanup
.event
= event
;
2469 /***********************************************************************
2470 * TpCallbackUnloadDllOnCompletion (NTDLL.@)
2472 VOID WINAPI
TpCallbackUnloadDllOnCompletion( TP_CALLBACK_INSTANCE
*instance
, HMODULE module
)
2474 struct threadpool_instance
*this = impl_from_TP_CALLBACK_INSTANCE( instance
);
2476 TRACE( "%p %p\n", instance
, module
);
2478 if (!this->cleanup
.library
)
2479 this->cleanup
.library
= module
;
2482 /***********************************************************************
2483 * TpDisassociateCallback (NTDLL.@)
2485 VOID WINAPI
TpDisassociateCallback( TP_CALLBACK_INSTANCE
*instance
)
2487 struct threadpool_instance
*this = impl_from_TP_CALLBACK_INSTANCE( instance
);
2488 struct threadpool_object
*object
= this->object
;
2489 struct threadpool
*pool
;
2491 TRACE( "%p\n", instance
);
2493 if (this->threadid
!= GetCurrentThreadId())
2495 ERR("called from wrong thread, ignoring\n");
2499 if (!this->associated
)
2502 pool
= object
->pool
;
2503 RtlEnterCriticalSection( &pool
->cs
);
2505 object
->num_associated_callbacks
--;
2506 if (!object
->num_pending_callbacks
&& !object
->num_associated_callbacks
)
2507 RtlWakeAllConditionVariable( &object
->finished_event
);
2509 RtlLeaveCriticalSection( &pool
->cs
);
2510 this->associated
= FALSE
;
2513 /***********************************************************************
2514 * TpIsTimerSet (NTDLL.@)
2516 BOOL WINAPI
TpIsTimerSet( TP_TIMER
*timer
)
2518 struct threadpool_object
*this = impl_from_TP_TIMER( timer
);
2520 TRACE( "%p\n", timer
);
2522 return this->u
.timer
.timer_set
;
2525 /***********************************************************************
2526 * TpPostWork (NTDLL.@)
2528 VOID WINAPI
TpPostWork( TP_WORK
*work
)
2530 struct threadpool_object
*this = impl_from_TP_WORK( work
);
2532 TRACE( "%p\n", work
);
2534 tp_object_submit( this, FALSE
);
2537 /***********************************************************************
2538 * TpReleaseCleanupGroup (NTDLL.@)
2540 VOID WINAPI
TpReleaseCleanupGroup( TP_CLEANUP_GROUP
*group
)
2542 struct threadpool_group
*this = impl_from_TP_CLEANUP_GROUP( group
);
2544 TRACE( "%p\n", group
);
2546 tp_group_shutdown( this );
2547 tp_group_release( this );
2550 /***********************************************************************
2551 * TpReleaseCleanupGroupMembers (NTDLL.@)
2553 VOID WINAPI
TpReleaseCleanupGroupMembers( TP_CLEANUP_GROUP
*group
, BOOL cancel_pending
, PVOID userdata
)
2555 struct threadpool_group
*this = impl_from_TP_CLEANUP_GROUP( group
);
2556 struct threadpool_object
*object
, *next
;
2557 struct list members
;
2559 TRACE( "%p %u %p\n", group
, cancel_pending
, userdata
);
2561 RtlEnterCriticalSection( &this->cs
);
2563 /* Unset group, increase references, and mark objects for shutdown */
2564 LIST_FOR_EACH_ENTRY_SAFE( object
, next
, &this->members
, struct threadpool_object
, group_entry
)
2566 assert( object
->group
== this );
2567 assert( object
->is_group_member
);
2569 if (interlocked_inc( &object
->refcount
) == 1)
2571 /* Object is basically already destroyed, but group reference
2572 * was not deleted yet. We can safely ignore this object. */
2573 interlocked_dec( &object
->refcount
);
2574 list_remove( &object
->group_entry
);
2575 object
->is_group_member
= FALSE
;
2579 object
->is_group_member
= FALSE
;
2580 tp_object_prepare_shutdown( object
);
2583 /* Move members to a new temporary list */
2584 list_init( &members
);
2585 list_move_tail( &members
, &this->members
);
2587 RtlLeaveCriticalSection( &this->cs
);
2589 /* Cancel pending callbacks if requested */
2592 LIST_FOR_EACH_ENTRY( object
, &members
, struct threadpool_object
, group_entry
)
2594 tp_object_cancel( object
);
2598 /* Wait for remaining callbacks to finish */
2599 LIST_FOR_EACH_ENTRY_SAFE( object
, next
, &members
, struct threadpool_object
, group_entry
)
2601 tp_object_wait( object
, TRUE
);
2603 if (!object
->shutdown
)
2605 /* Execute group cancellation callback if defined, and if this was actually a group cancel. */
2606 if (cancel_pending
&& object
->group_cancel_callback
)
2608 TRACE( "executing group cancel callback %p(%p, %p)\n",
2609 object
->group_cancel_callback
, object
->userdata
, userdata
);
2610 object
->group_cancel_callback( object
->userdata
, userdata
);
2611 TRACE( "callback %p returned\n", object
->group_cancel_callback
);
2614 if (object
->type
!= TP_OBJECT_TYPE_SIMPLE
)
2615 tp_object_release( object
);
2618 object
->shutdown
= TRUE
;
2619 tp_object_release( object
);
2623 /***********************************************************************
2624 * TpReleasePool (NTDLL.@)
2626 VOID WINAPI
TpReleasePool( TP_POOL
*pool
)
2628 struct threadpool
*this = impl_from_TP_POOL( pool
);
2630 TRACE( "%p\n", pool
);
2632 tp_threadpool_shutdown( this );
2633 tp_threadpool_release( this );
2636 /***********************************************************************
2637 * TpReleaseTimer (NTDLL.@)
2639 VOID WINAPI
TpReleaseTimer( TP_TIMER
*timer
)
2641 struct threadpool_object
*this = impl_from_TP_TIMER( timer
);
2643 TRACE( "%p\n", timer
);
2645 tp_object_prepare_shutdown( this );
2646 this->shutdown
= TRUE
;
2647 tp_object_release( this );
2650 /***********************************************************************
2651 * TpReleaseWait (NTDLL.@)
2653 VOID WINAPI
TpReleaseWait( TP_WAIT
*wait
)
2655 struct threadpool_object
*this = impl_from_TP_WAIT( wait
);
2657 TRACE( "%p\n", wait
);
2659 tp_object_prepare_shutdown( this );
2660 this->shutdown
= TRUE
;
2661 tp_object_release( this );
2664 /***********************************************************************
2665 * TpReleaseWork (NTDLL.@)
2667 VOID WINAPI
TpReleaseWork( TP_WORK
*work
)
2669 struct threadpool_object
*this = impl_from_TP_WORK( work
);
2671 TRACE( "%p\n", work
);
2673 tp_object_prepare_shutdown( this );
2674 this->shutdown
= TRUE
;
2675 tp_object_release( this );
2678 /***********************************************************************
2679 * TpSetPoolMaxThreads (NTDLL.@)
2681 VOID WINAPI
TpSetPoolMaxThreads( TP_POOL
*pool
, DWORD maximum
)
2683 struct threadpool
*this = impl_from_TP_POOL( pool
);
2685 TRACE( "%p %u\n", pool
, maximum
);
2687 RtlEnterCriticalSection( &this->cs
);
2688 this->max_workers
= max( maximum
, 1 );
2689 this->min_workers
= min( this->min_workers
, this->max_workers
);
2690 RtlLeaveCriticalSection( &this->cs
);
2693 /***********************************************************************
2694 * TpSetPoolMinThreads (NTDLL.@)
2696 BOOL WINAPI
TpSetPoolMinThreads( TP_POOL
*pool
, DWORD minimum
)
2698 struct threadpool
*this = impl_from_TP_POOL( pool
);
2699 NTSTATUS status
= STATUS_SUCCESS
;
2701 TRACE( "%p %u\n", pool
, minimum
);
2703 RtlEnterCriticalSection( &this->cs
);
2705 while (this->num_workers
< minimum
)
2707 status
= tp_new_worker_thread( this );
2708 if (status
!= STATUS_SUCCESS
)
2712 if (status
== STATUS_SUCCESS
)
2714 this->min_workers
= minimum
;
2715 this->max_workers
= max( this->min_workers
, this->max_workers
);
2718 RtlLeaveCriticalSection( &this->cs
);
2722 /***********************************************************************
2723 * TpSetTimer (NTDLL.@)
2725 VOID WINAPI
TpSetTimer( TP_TIMER
*timer
, LARGE_INTEGER
*timeout
, LONG period
, LONG window_length
)
2727 struct threadpool_object
*this = impl_from_TP_TIMER( timer
);
2728 struct threadpool_object
*other_timer
;
2729 BOOL submit_timer
= FALSE
;
2730 ULONGLONG timestamp
;
2732 TRACE( "%p %p %u %u\n", timer
, timeout
, period
, window_length
);
2734 RtlEnterCriticalSection( &timerqueue
.cs
);
2736 assert( this->u
.timer
.timer_initialized
);
2737 this->u
.timer
.timer_set
= timeout
!= NULL
;
2739 /* Convert relative timeout to absolute timestamp and handle a timeout
2740 * of zero, which means that the timer is submitted immediately. */
2743 timestamp
= timeout
->QuadPart
;
2744 if ((LONGLONG
)timestamp
< 0)
2747 NtQuerySystemTime( &now
);
2748 timestamp
= now
.QuadPart
- timestamp
;
2750 else if (!timestamp
)
2757 NtQuerySystemTime( &now
);
2758 timestamp
= now
.QuadPart
+ (ULONGLONG
)period
* 10000;
2760 submit_timer
= TRUE
;
2764 /* First remove existing timeout. */
2765 if (this->u
.timer
.timer_pending
)
2767 list_remove( &this->u
.timer
.timer_entry
);
2768 this->u
.timer
.timer_pending
= FALSE
;
2771 /* If the timer was enabled, then add it back to the queue. */
2774 this->u
.timer
.timeout
= timestamp
;
2775 this->u
.timer
.period
= period
;
2776 this->u
.timer
.window_length
= window_length
;
2778 LIST_FOR_EACH_ENTRY( other_timer
, &timerqueue
.pending_timers
,
2779 struct threadpool_object
, u
.timer
.timer_entry
)
2781 assert( other_timer
->type
== TP_OBJECT_TYPE_TIMER
);
2782 if (this->u
.timer
.timeout
< other_timer
->u
.timer
.timeout
)
2785 list_add_before( &other_timer
->u
.timer
.timer_entry
, &this->u
.timer
.timer_entry
);
2787 /* Wake up the timer thread when the timeout has to be updated. */
2788 if (list_head( &timerqueue
.pending_timers
) == &this->u
.timer
.timer_entry
)
2789 RtlWakeAllConditionVariable( &timerqueue
.update_event
);
2791 this->u
.timer
.timer_pending
= TRUE
;
2794 RtlLeaveCriticalSection( &timerqueue
.cs
);
2797 tp_object_submit( this, FALSE
);
2800 /***********************************************************************
2801 * TpSetWait (NTDLL.@)
2803 VOID WINAPI
TpSetWait( TP_WAIT
*wait
, HANDLE handle
, LARGE_INTEGER
*timeout
)
2805 struct threadpool_object
*this = impl_from_TP_WAIT( wait
);
2806 ULONGLONG timestamp
= TIMEOUT_INFINITE
;
2807 BOOL submit_wait
= FALSE
;
2809 TRACE( "%p %p %p\n", wait
, handle
, timeout
);
2811 RtlEnterCriticalSection( &waitqueue
.cs
);
2813 assert( this->u
.wait
.bucket
);
2814 this->u
.wait
.handle
= handle
;
2816 if (handle
|| this->u
.wait
.wait_pending
)
2818 struct waitqueue_bucket
*bucket
= this->u
.wait
.bucket
;
2819 list_remove( &this->u
.wait
.wait_entry
);
2821 /* Convert relative timeout to absolute timestamp. */
2822 if (handle
&& timeout
)
2824 timestamp
= timeout
->QuadPart
;
2825 if ((LONGLONG
)timestamp
< 0)
2828 NtQuerySystemTime( &now
);
2829 timestamp
= now
.QuadPart
- timestamp
;
2831 else if (!timestamp
)
2838 /* Add wait object back into one of the queues. */
2841 list_add_tail( &bucket
->waiting
, &this->u
.wait
.wait_entry
);
2842 this->u
.wait
.wait_pending
= TRUE
;
2843 this->u
.wait
.timeout
= timestamp
;
2847 list_add_tail( &bucket
->reserved
, &this->u
.wait
.wait_entry
);
2848 this->u
.wait
.wait_pending
= FALSE
;
2851 /* Wake up the wait queue thread. */
2852 NtSetEvent( bucket
->update_event
, NULL
);
2855 RtlLeaveCriticalSection( &waitqueue
.cs
);
2858 tp_object_submit( this, FALSE
);
2861 /***********************************************************************
2862 * TpSimpleTryPost (NTDLL.@)
2864 NTSTATUS WINAPI
TpSimpleTryPost( PTP_SIMPLE_CALLBACK callback
, PVOID userdata
,
2865 TP_CALLBACK_ENVIRON
*environment
)
2867 struct threadpool_object
*object
;
2868 struct threadpool
*pool
;
2871 TRACE( "%p %p %p\n", callback
, userdata
, environment
);
2873 object
= RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*object
) );
2875 return STATUS_NO_MEMORY
;
2877 status
= tp_threadpool_lock( &pool
, environment
);
2880 RtlFreeHeap( GetProcessHeap(), 0, object
);
2884 object
->type
= TP_OBJECT_TYPE_SIMPLE
;
2885 object
->u
.simple
.callback
= callback
;
2886 tp_object_initialize( object
, pool
, userdata
, environment
);
2888 return STATUS_SUCCESS
;
2891 /***********************************************************************
2892 * TpWaitForTimer (NTDLL.@)
2894 VOID WINAPI
TpWaitForTimer( TP_TIMER
*timer
, BOOL cancel_pending
)
2896 struct threadpool_object
*this = impl_from_TP_TIMER( timer
);
2898 TRACE( "%p %d\n", timer
, cancel_pending
);
2901 tp_object_cancel( this );
2902 tp_object_wait( this, FALSE
);
2905 /***********************************************************************
2906 * TpWaitForWait (NTDLL.@)
2908 VOID WINAPI
TpWaitForWait( TP_WAIT
*wait
, BOOL cancel_pending
)
2910 struct threadpool_object
*this = impl_from_TP_WAIT( wait
);
2912 TRACE( "%p %d\n", wait
, cancel_pending
);
2915 tp_object_cancel( this );
2916 tp_object_wait( this, FALSE
);
2919 /***********************************************************************
2920 * TpWaitForWork (NTDLL.@)
2922 VOID WINAPI
TpWaitForWork( TP_WORK
*work
, BOOL cancel_pending
)
2924 struct threadpool_object
*this = impl_from_TP_WORK( work
);
2926 TRACE( "%p %u\n", work
, cancel_pending
);
2929 tp_object_cancel( this );
2930 tp_object_wait( this, FALSE
);