4 * Copyright (c) 2006 Robert Shearman
5 * Copyright (c) 2014-2016 Sebastian Lackner
7 * This library is free software; you can redistribute it and/or
8 * modify it under the terms of the GNU Lesser General Public
9 * License as published by the Free Software Foundation; either
10 * version 2.1 of the License, or (at your option) any later version.
12 * This library is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Lesser General Public License for more details.
17 * You should have received a copy of the GNU Lesser General Public
18 * License along with this library; if not, write to the Free Software
19 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
26 #define NONAMELESSUNION
28 #define WIN32_NO_STATUS
31 #include "wine/debug.h"
32 #include "wine/list.h"
34 #include "ntdll_misc.h"
36 WINE_DEFAULT_DEBUG_CHANNEL(threadpool
);
39 * Old thread pooling API
44 PRTL_WORK_ITEM_ROUTINE function
;
48 #define EXPIRE_NEVER (~(ULONGLONG)0)
49 #define TIMER_QUEUE_MAGIC 0x516d6954 /* TimQ */
51 static RTL_CRITICAL_SECTION_DEBUG critsect_compl_debug
;
56 RTL_CRITICAL_SECTION threadpool_compl_cs
;
60 NULL
, /* compl_port */
61 { &critsect_compl_debug
, -1, 0, 0, 0, 0 }, /* threadpool_compl_cs */
64 static RTL_CRITICAL_SECTION_DEBUG critsect_compl_debug
=
66 0, 0, &old_threadpool
.threadpool_compl_cs
,
67 { &critsect_compl_debug
.ProcessLocksList
, &critsect_compl_debug
.ProcessLocksList
},
68 0, 0, { (DWORD_PTR
)(__FILE__
": threadpool_compl_cs") }
74 struct timer_queue
*q
;
76 ULONG runcount
; /* number of callbacks pending execution */
77 RTL_WAITORTIMERCALLBACKFUNC callback
;
82 BOOL destroy
; /* timer should be deleted; once set, never unset */
83 HANDLE event
; /* removal event */
89 RTL_CRITICAL_SECTION cs
;
90 struct list timers
; /* sorted by expiration time */
91 BOOL quit
; /* queue should be deleted; once set, never unset */
97 * Object-oriented thread pooling API
100 #define THREADPOOL_WORKER_TIMEOUT 5000
101 #define MAXIMUM_WAITQUEUE_OBJECTS (MAXIMUM_WAIT_OBJECTS - 1)
103 /* internal threadpool representation */
110 /* Pools of work items, locked via .cs, order matches TP_CALLBACK_PRIORITY - high, normal, low. */
111 struct list pools
[3];
112 RTL_CONDITION_VARIABLE update_event
;
113 /* information about worker threads, locked via .cs */
117 int num_busy_workers
;
119 TP_POOL_STACK_INFORMATION stack_info
;
122 enum threadpool_objtype
124 TP_OBJECT_TYPE_SIMPLE
,
126 TP_OBJECT_TYPE_TIMER
,
133 IO_STATUS_BLOCK iosb
;
137 /* internal threadpool object representation */
138 struct threadpool_object
140 void *win32_callback
; /* leave space for kernelbase to store win32 callback */
143 /* read-only information */
144 enum threadpool_objtype type
;
145 struct threadpool
*pool
;
146 struct threadpool_group
*group
;
148 PTP_CLEANUP_GROUP_CANCEL_CALLBACK group_cancel_callback
;
149 PTP_SIMPLE_CALLBACK finalization_callback
;
152 TP_CALLBACK_PRIORITY priority
;
153 /* information about the group, locked via .group->cs */
154 struct list group_entry
;
155 BOOL is_group_member
;
156 /* information about the pool, locked via .pool->cs */
157 struct list pool_entry
;
158 RTL_CONDITION_VARIABLE finished_event
;
159 RTL_CONDITION_VARIABLE group_finished_event
;
160 HANDLE completed_event
;
161 LONG num_pending_callbacks
;
162 LONG num_running_callbacks
;
163 LONG num_associated_callbacks
;
164 /* arguments for callback */
169 PTP_SIMPLE_CALLBACK callback
;
173 PTP_WORK_CALLBACK callback
;
177 PTP_TIMER_CALLBACK callback
;
178 /* information about the timer, locked via timerqueue.cs */
179 BOOL timer_initialized
;
181 struct list timer_entry
;
189 PTP_WAIT_CALLBACK callback
;
191 /* information about the wait object, locked via waitqueue.cs */
192 struct waitqueue_bucket
*bucket
;
194 struct list wait_entry
;
198 RTL_WAITORTIMERCALLBACKFUNC rtl_callback
;
202 PTP_IO_CALLBACK callback
;
203 /* locked via .pool->cs */
204 unsigned int pending_count
, completion_count
, completion_max
;
205 struct io_completion
*completions
;
210 /* internal threadpool instance representation */
211 struct threadpool_instance
213 struct threadpool_object
*object
;
219 CRITICAL_SECTION
*critical_section
;
222 LONG semaphore_count
;
228 /* internal threadpool group representation */
229 struct threadpool_group
234 /* list of group members, locked via .cs */
238 /* global timerqueue object */
239 static RTL_CRITICAL_SECTION_DEBUG timerqueue_debug
;
246 struct list pending_timers
;
247 RTL_CONDITION_VARIABLE update_event
;
251 { &timerqueue_debug
, -1, 0, 0, 0, 0 }, /* cs */
253 FALSE
, /* thread_running */
254 LIST_INIT( timerqueue
.pending_timers
), /* pending_timers */
255 RTL_CONDITION_VARIABLE_INIT
/* update_event */
258 static RTL_CRITICAL_SECTION_DEBUG timerqueue_debug
=
260 0, 0, &timerqueue
.cs
,
261 { &timerqueue_debug
.ProcessLocksList
, &timerqueue_debug
.ProcessLocksList
},
262 0, 0, { (DWORD_PTR
)(__FILE__
": timerqueue.cs") }
265 /* global waitqueue object */
266 static RTL_CRITICAL_SECTION_DEBUG waitqueue_debug
;
276 { &waitqueue_debug
, -1, 0, 0, 0, 0 }, /* cs */
278 LIST_INIT( waitqueue
.buckets
) /* buckets */
281 static RTL_CRITICAL_SECTION_DEBUG waitqueue_debug
=
284 { &waitqueue_debug
.ProcessLocksList
, &waitqueue_debug
.ProcessLocksList
},
285 0, 0, { (DWORD_PTR
)(__FILE__
": waitqueue.cs") }
288 struct waitqueue_bucket
290 struct list bucket_entry
;
292 struct list reserved
;
298 /* global I/O completion queue object */
299 static RTL_CRITICAL_SECTION_DEBUG ioqueue_debug
;
307 RTL_CONDITION_VARIABLE update_event
;
311 .cs
= { &ioqueue_debug
, -1, 0, 0, 0, 0 },
314 static RTL_CRITICAL_SECTION_DEBUG ioqueue_debug
=
317 { &ioqueue_debug
.ProcessLocksList
, &ioqueue_debug
.ProcessLocksList
},
318 0, 0, { (DWORD_PTR
)(__FILE__
": ioqueue.cs") }
/* Converts a public TP_POOL pointer into the internal struct threadpool
 * pointer. This is a plain pointer cast; no validation is visible here. */
321 static inline struct threadpool
*impl_from_TP_POOL( TP_POOL
*pool
)
323 return (struct threadpool
*)pool
;
/* Reinterprets a public TP_WORK pointer as the internal threadpool_object
 * and asserts that the object was created as TP_OBJECT_TYPE_WORK.
 * NOTE(review): the return statement is not visible in this excerpt;
 * presumably the casted object is returned - confirm. */
326 static inline struct threadpool_object
*impl_from_TP_WORK( TP_WORK
*work
)
328 struct threadpool_object
*object
= (struct threadpool_object
*)work
;
329 assert( object
->type
== TP_OBJECT_TYPE_WORK
);
/* Reinterprets a public TP_TIMER pointer as the internal threadpool_object
 * and asserts that the object was created as TP_OBJECT_TYPE_TIMER.
 * NOTE(review): the return statement is not visible in this excerpt;
 * presumably the casted object is returned - confirm. */
333 static inline struct threadpool_object
*impl_from_TP_TIMER( TP_TIMER
*timer
)
335 struct threadpool_object
*object
= (struct threadpool_object
*)timer
;
336 assert( object
->type
== TP_OBJECT_TYPE_TIMER
);
/* Reinterprets a public TP_WAIT pointer as the internal threadpool_object
 * and asserts that the object was created as TP_OBJECT_TYPE_WAIT.
 * NOTE(review): the return statement is not visible in this excerpt;
 * presumably the casted object is returned - confirm. */
340 static inline struct threadpool_object
*impl_from_TP_WAIT( TP_WAIT
*wait
)
342 struct threadpool_object
*object
= (struct threadpool_object
*)wait
;
343 assert( object
->type
== TP_OBJECT_TYPE_WAIT
);
/* Reinterprets a public TP_IO pointer as the internal threadpool_object
 * and asserts that the object was created as TP_OBJECT_TYPE_IO.
 * NOTE(review): the return statement is not visible in this excerpt;
 * presumably the casted object is returned - confirm. */
347 static inline struct threadpool_object
*impl_from_TP_IO( TP_IO
*io
)
349 struct threadpool_object
*object
= (struct threadpool_object
*)io
;
350 assert( object
->type
== TP_OBJECT_TYPE_IO
);
/* Converts a public TP_CLEANUP_GROUP pointer into the internal
 * struct threadpool_group pointer by a plain cast (no type check). */
354 static inline struct threadpool_group
*impl_from_TP_CLEANUP_GROUP( TP_CLEANUP_GROUP
*group
)
356 return (struct threadpool_group
*)group
;
/* Converts a public TP_CALLBACK_INSTANCE pointer into the internal
 * struct threadpool_instance pointer by a plain cast (no type check). */
359 static inline struct threadpool_instance
*impl_from_TP_CALLBACK_INSTANCE( TP_CALLBACK_INSTANCE
*instance
)
361 return (struct threadpool_instance
*)instance
;
364 static void CALLBACK
threadpool_worker_proc( void *param
);
365 static void tp_object_submit( struct threadpool_object
*object
, BOOL signaled
);
366 static void tp_object_execute( struct threadpool_object
*object
, BOOL wait_thread
);
367 static void tp_object_prepare_shutdown( struct threadpool_object
*object
);
368 static BOOL
tp_object_release( struct threadpool_object
*object
);
369 static struct threadpool
*default_threadpool
= NULL
;
/* Grows a heap-allocated array so that it can hold at least `count`
 * elements of `size` bytes each.
 *
 *   elements  in/out  pointer to the array pointer; replaced on growth
 *   capacity  in/out  current capacity in elements; replaced on growth
 *   count     in      number of elements that must fit
 *   size      in      size of one element in bytes (used as a divisor, so
 *                     it must be non-zero)
 *
 * NOTE(review): the result statements of the early exits and the final
 * return are not visible in this excerpt; presumably TRUE means the array
 * is large enough and FALSE means failure - confirm. */
371 static BOOL
array_reserve(void **elements
, unsigned int *capacity
, unsigned int count
, unsigned int size
)
373 unsigned int new_capacity
, max_capacity
;
/* Fast path: the current capacity already covers the request. */
376 if (count
<= *capacity
)
/* Largest element count whose byte size still fits in a SIZE_T.
 * NOTE(review): max_capacity is an unsigned int; if SIZE_T is wider than
 * unsigned int the quotient is narrowed on assignment - confirm that this
 * is acceptable. */
379 max_capacity
= ~(SIZE_T
)0 / size
;
380 if (count
> max_capacity
)
/* Start from at least 4 elements and keep growing while that cannot
 * overflow max_capacity (the loop body is not visible here; the "/ 2"
 * guard suggests doubling - confirm). */
383 new_capacity
= max(4, *capacity
);
384 while (new_capacity
< count
&& new_capacity
<= max_capacity
/ 2)
/* Growth stopped short of the request: clamp to the maximum. */
386 if (new_capacity
< count
)
387 new_capacity
= max_capacity
;
/* Reallocate into a temporary so that *elements stays valid on failure. */
389 if (!(new_elements
= RtlReAllocateHeap( GetProcessHeap(), 0, *elements
, new_capacity
* size
)))
/* Publish the new buffer and capacity only after a successful realloc. */
392 *elements
= new_elements
;
393 *capacity
= new_capacity
;
/* Thread pool callback that runs one queued rtl_work_item.
 *
 *   instance  callback instance supplied by the pool; not used in the
 *             visible code
 *   userdata  the heap-allocated struct rtl_work_item to execute
 *
 * The item is freed here after its function has returned, so ownership of
 * the allocation passes to this callback. */
398 static void CALLBACK
process_rtl_work_item( TP_CALLBACK_INSTANCE
*instance
, void *userdata
)
400 struct rtl_work_item
*item
= userdata
;
402 TRACE("executing %p(%p)\n", item
->function
, item
->context
);
/* Invoke the user routine with the context stored in the item. */
403 item
->function( item
->context
);
405 RtlFreeHeap( GetProcessHeap(), 0, item
);
408 /***********************************************************************
409 * RtlQueueWorkItem (NTDLL.@)
411 * Queues a work item into a thread in the thread pool.
414 * function [I] Work function to execute.
415 * context [I] Context to pass to the work function when it is executed.
416 * flags [I] Flags. See notes.
419 * Success: STATUS_SUCCESS.
420 * Failure: Any NTSTATUS code.
423 * Flags can be one or more of the following:
424 *|WT_EXECUTEDEFAULT - Executes the work item in a non-I/O worker thread.
425 *|WT_EXECUTEINIOTHREAD - Executes the work item in an I/O worker thread.
426 *|WT_EXECUTEINPERSISTENTTHREAD - Executes the work item in a thread that is persistent.
427 *|WT_EXECUTELONGFUNCTION - Hints that the execution can take a long time.
428 *|WT_TRANSFER_IMPERSONATION - Executes the function with the current access token.
430 NTSTATUS WINAPI
RtlQueueWorkItem( PRTL_WORK_ITEM_ROUTINE function
, PVOID context
, ULONG flags
)
432 TP_CALLBACK_ENVIRON environment
;
433 struct rtl_work_item
*item
;
436 TRACE( "%p %p %u\n", function
, context
, flags
);
438 item
= RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*item
) );
440 return STATUS_NO_MEMORY
;
442 memset( &environment
, 0, sizeof(environment
) );
443 environment
.Version
= 1;
444 environment
.u
.s
.LongFunction
= (flags
& WT_EXECUTELONGFUNCTION
) != 0;
445 environment
.u
.s
.Persistent
= (flags
& WT_EXECUTEINPERSISTENTTHREAD
) != 0;
447 item
->function
= function
;
448 item
->context
= context
;
450 status
= TpSimpleTryPost( process_rtl_work_item
, item
, &environment
);
451 if (status
) RtlFreeHeap( GetProcessHeap(), 0, item
);
455 /***********************************************************************
456 * iocp_poller - get completion events and run callbacks
458 static DWORD CALLBACK
iocp_poller(LPVOID Arg
)
464 PRTL_OVERLAPPED_COMPLETION_ROUTINE callback
;
466 IO_STATUS_BLOCK iosb
;
467 NTSTATUS res
= NtRemoveIoCompletion( cport
, (PULONG_PTR
)&callback
, (PULONG_PTR
)&overlapped
, &iosb
, NULL
);
470 ERR("NtRemoveIoCompletion failed: 0x%x\n", res
);
474 DWORD transferred
= 0;
477 if (iosb
.u
.Status
== STATUS_SUCCESS
)
478 transferred
= iosb
.Information
;
480 err
= RtlNtStatusToDosError(iosb
.u
.Status
);
482 callback( err
, transferred
, overlapped
);
488 /***********************************************************************
489 * RtlSetIoCompletionCallback (NTDLL.@)
491 * Binds a handle to a thread pool's completion port, and possibly
492 * starts a non-I/O thread to monitor this port and call functions back.
495 * FileHandle [I] Handle to bind to a completion port.
496 * Function [I] Callback function to call on I/O completions.
497 * Flags [I] Not used.
500 * Success: STATUS_SUCCESS.
501 * Failure: Any NTSTATUS code.
504 NTSTATUS WINAPI
RtlSetIoCompletionCallback(HANDLE FileHandle
, PRTL_OVERLAPPED_COMPLETION_ROUTINE Function
, ULONG Flags
)
506 IO_STATUS_BLOCK iosb
;
507 FILE_COMPLETION_INFORMATION info
;
509 if (Flags
) FIXME("Unknown value Flags=0x%x\n", Flags
);
511 if (!old_threadpool
.compl_port
)
513 NTSTATUS res
= STATUS_SUCCESS
;
515 RtlEnterCriticalSection(&old_threadpool
.threadpool_compl_cs
);
516 if (!old_threadpool
.compl_port
)
520 res
= NtCreateIoCompletion( &cport
, IO_COMPLETION_ALL_ACCESS
, NULL
, 0 );
523 /* FIXME native can start additional threads in case of e.g. hung callback function. */
524 res
= RtlQueueWorkItem( iocp_poller
, cport
, WT_EXECUTEDEFAULT
);
526 old_threadpool
.compl_port
= cport
;
531 RtlLeaveCriticalSection(&old_threadpool
.threadpool_compl_cs
);
535 info
.CompletionPort
= old_threadpool
.compl_port
;
536 info
.CompletionKey
= (ULONG_PTR
)Function
;
538 return NtSetInformationFile( FileHandle
, &iosb
, &info
, sizeof(info
), FileCompletionInformation
);
/* Translates a ULONG timeout into the LARGE_INTEGER form passed to the
 * Nt wait functions.
 *
 *   pTime    caller-provided storage that receives the converted value
 *   timeout  timeout value; INFINITE yields a NULL pointer
 *
 * The value is multiplied by -10000, i.e. stored as a negative number
 * (presumably a relative interval in 100ns units with `timeout` given in
 * milliseconds - confirm against callers).
 * NOTE(review): the final return statement is not visible in this excerpt;
 * presumably pTime is returned. */
541 static inline PLARGE_INTEGER
get_nt_timeout( PLARGE_INTEGER pTime
, ULONG timeout
)
543 if (timeout
== INFINITE
) return NULL
;
544 pTime
->QuadPart
= (ULONGLONG
)timeout
* -10000;
549 /************************** Timer Queue Impl **************************/
/* Unlinks a timer from its queue, signals its removal event and frees it.
 * The owning queue pointer is copied into a local first because it is
 * still needed after the timer memory has been released. If the queue is
 * quitting and this was its last timer, the queue event is set as well.
 * NOTE(review): some lines of this function are not visible in this
 * excerpt, so a guard around the NtSetEvent on t->event may exist. */
551 static void queue_remove_timer(struct queue_timer
*t
)
553 /* We MUST hold the queue cs while calling this function. This ensures
554 that we cannot queue another callback for this timer. The runcount
555 being zero makes sure we don't have any already queued. */
556 struct timer_queue
*q
= t
->q
;
558 assert(t
->runcount
== 0);
561 list_remove(&t
->entry
);
563 NtSetEvent(t
->event
, NULL
);
564 RtlFreeHeap(GetProcessHeap(), 0, t
);
566 if (q
->quit
&& list_empty(&q
->timers
))
567 NtSetEvent(q
->event
, NULL
);
/* Bookkeeping that runs after a timer callback (or after a failed attempt
 * to queue one). Takes the queue critical section itself, checks that at
 * least one callback was accounted for, and removes the timer once it is
 * marked for destruction and its runcount has reached zero.
 * NOTE(review): the statement that lowers t->runcount is not visible in
 * this excerpt; presumably it sits between the assert and the check. */
570 static void timer_cleanup_callback(struct queue_timer
*t
)
572 struct timer_queue
*q
= t
->q
;
573 RtlEnterCriticalSection(&q
->cs
);
575 assert(0 < t
->runcount
);
578 if (t
->destroy
&& t
->runcount
== 0)
579 queue_remove_timer(t
);
581 RtlLeaveCriticalSection(&q
->cs
);
/* Work item entry point for a timer: invokes the user callback with the
 * stored parameter and TRUE as second argument, then performs the
 * post-callback cleanup for the timer.
 *
 *   p  the struct queue_timer whose callback should run
 *
 * NOTE(review): the returned DWORD is not visible in this excerpt. */
584 static DWORD WINAPI
timer_callback_wrapper(LPVOID p
)
586 struct queue_timer
*t
= p
;
587 t
->callback(t
->param
, TRUE
);
588 timer_cleanup_callback(t
);
/* Returns the current performance counter value scaled by 1000 and
 * divided by the counter frequency, i.e. a millisecond count assuming the
 * frequency is reported in ticks per second. */
592 static inline ULONGLONG
queue_current_time(void)
594 LARGE_INTEGER now
, freq
;
595 NtQueryPerformanceCounter(&now
, &freq
);
596 return now
.QuadPart
* 1000 / freq
.QuadPart
;
599 static void queue_add_timer(struct queue_timer
*t
, ULONGLONG time
,
602 /* We MUST hold the queue cs while calling this function. */
603 struct timer_queue
*q
= t
->q
;
604 struct list
*ptr
= &q
->timers
;
606 assert(!q
->quit
|| (t
->destroy
&& time
== EXPIRE_NEVER
));
608 if (time
!= EXPIRE_NEVER
)
609 LIST_FOR_EACH(ptr
, &q
->timers
)
611 struct queue_timer
*cur
= LIST_ENTRY(ptr
, struct queue_timer
, entry
);
612 if (time
< cur
->expire
)
615 list_add_before(ptr
, &t
->entry
);
619 /* If we insert at the head of the list, we need to expire sooner
621 if (set_event
&& &t
->entry
== list_head(&q
->timers
))
622 NtSetEvent(q
->event
, NULL
);
625 static inline void queue_move_timer(struct queue_timer
*t
, ULONGLONG time
,
628 /* We MUST hold the queue cs while calling this function. */
629 list_remove(&t
->entry
);
630 queue_add_timer(t
, time
, set_event
);
/* Fires the timer at the head of the queue if it is due.
 *
 * Under q->cs the head of q->timers is examined; when it is not marked
 * for destruction and its expiry time is not later than the current time,
 * a next expiry is computed and the timer is re-sorted with
 * queue_move_timer (without signalling the queue event). After the lock
 * is dropped, the callback runs either directly on this thread
 * (WT_EXECUTEINTIMERTHREAD) or through RtlQueueWorkItem with a masked
 * subset of the timer flags; if queueing fails, the cleanup that the
 * callback wrapper would have done is performed here instead.
 *
 * NOTE(review): several lines (declarations of now/next/flags, runcount
 * accounting and parts of the branch structure) are not visible in this
 * excerpt, so the exact conditions are to be confirmed. */
633 static void queue_timer_expire(struct timer_queue
*q
)
635 struct queue_timer
*t
= NULL
;
637 RtlEnterCriticalSection(&q
->cs
);
638 if (list_head(&q
->timers
))
641 t
= LIST_ENTRY(list_head(&q
->timers
), struct queue_timer
, entry
);
642 if (!t
->destroy
&& t
->expire
<= ((now
= queue_current_time())))
/* Regular case: schedule relative to the previous expiry time. */
647 next
= t
->expire
+ t
->period
;
648 /* avoid trigger cascade if overloaded / hibernated */
650 next
= now
+ t
->period
;
654 queue_move_timer(t
, next
, FALSE
);
659 RtlLeaveCriticalSection(&q
->cs
);
663 if (t
->flags
& WT_EXECUTEINTIMERTHREAD
)
664 timer_callback_wrapper(t
);
669 & (WT_EXECUTEINIOTHREAD
| WT_EXECUTEINPERSISTENTTHREAD
670 | WT_EXECUTELONGFUNCTION
| WT_TRANSFER_IMPERSONATION
));
671 NTSTATUS status
= RtlQueueWorkItem(timer_callback_wrapper
, t
, flags
);
/* The wrapper will never run, so undo its pending accounting here. */
672 if (status
!= STATUS_SUCCESS
)
673 timer_cleanup_callback(t
);
/* Computes how long the queue thread may wait before the head timer of
 * q->timers is due. Starts from INFINITE; when the head timer has a real
 * expiry time, the result is the remaining time, or 0 if it is already
 * overdue. The list is inspected under q->cs.
 *
 * NOTE(review): the difference is a ULONGLONG stored into a ULONG, so a
 * very distant expiry would be narrowed - confirm this is acceptable.
 * The return statement is not visible in this excerpt; presumably
 * `timeout` is returned. */
678 static ULONG
queue_get_timeout(struct timer_queue
*q
)
680 struct queue_timer
*t
;
681 ULONG timeout
= INFINITE
;
683 RtlEnterCriticalSection(&q
->cs
);
684 if (list_head(&q
->timers
))
686 t
= LIST_ENTRY(list_head(&q
->timers
), struct queue_timer
, entry
);
/* A destroyed timer must already have been moved to EXPIRE_NEVER. */
687 assert(!t
->destroy
|| t
->expire
== EXPIRE_NEVER
);
689 if (t
->expire
!= EXPIRE_NEVER
)
691 ULONGLONG time
= queue_current_time();
692 timeout
= t
->expire
< time
? 0 : t
->expire
- time
;
695 RtlLeaveCriticalSection(&q
->cs
);
700 static void WINAPI
timer_queue_thread_proc(LPVOID p
)
702 struct timer_queue
*q
= p
;
705 timeout_ms
= INFINITE
;
708 LARGE_INTEGER timeout
;
712 status
= NtWaitForSingleObject(
713 q
->event
, FALSE
, get_nt_timeout(&timeout
, timeout_ms
));
715 if (status
== STATUS_WAIT_0
)
717 /* There are two possible ways to trigger the event. Either
718 we are quitting and the last timer got removed, or a new
719 timer got put at the head of the list so we need to adjust
721 RtlEnterCriticalSection(&q
->cs
);
722 if (q
->quit
&& list_empty(&q
->timers
))
724 RtlLeaveCriticalSection(&q
->cs
);
726 else if (status
== STATUS_TIMEOUT
)
727 queue_timer_expire(q
);
732 timeout_ms
= queue_get_timeout(q
);
736 RtlDeleteCriticalSection(&q
->cs
);
738 RtlFreeHeap(GetProcessHeap(), 0, q
);
739 RtlExitUserThread( 0 );
742 static void queue_destroy_timer(struct queue_timer
*t
)
744 /* We MUST hold the queue cs while calling this function. */
746 if (t
->runcount
== 0)
747 /* Ensure a timer is promptly removed. If callbacks are pending,
748 it will be removed after the last one finishes by the callback
750 queue_remove_timer(t
);
752 /* Make sure no destroyed timer masks an active timer at the head
753 of the sorted list. */
754 queue_move_timer(t
, EXPIRE_NEVER
, FALSE
);
757 /***********************************************************************
758 * RtlCreateTimerQueue (NTDLL.@)
760 * Creates a timer queue object and returns a handle to it.
763 * NewTimerQueue [O] The newly created queue.
766 * Success: STATUS_SUCCESS.
767 * Failure: Any NTSTATUS code.
769 NTSTATUS WINAPI
RtlCreateTimerQueue(PHANDLE NewTimerQueue
)
772 struct timer_queue
*q
= RtlAllocateHeap(GetProcessHeap(), 0, sizeof *q
);
774 return STATUS_NO_MEMORY
;
776 RtlInitializeCriticalSection(&q
->cs
);
777 list_init(&q
->timers
);
779 q
->magic
= TIMER_QUEUE_MAGIC
;
780 status
= NtCreateEvent(&q
->event
, EVENT_ALL_ACCESS
, NULL
, SynchronizationEvent
, FALSE
);
781 if (status
!= STATUS_SUCCESS
)
783 RtlFreeHeap(GetProcessHeap(), 0, q
);
786 status
= RtlCreateUserThread(GetCurrentProcess(), NULL
, FALSE
, 0, 0, 0,
787 timer_queue_thread_proc
, q
, &q
->thread
, NULL
);
788 if (status
!= STATUS_SUCCESS
)
791 RtlFreeHeap(GetProcessHeap(), 0, q
);
796 return STATUS_SUCCESS
;
799 /***********************************************************************
800 * RtlDeleteTimerQueueEx (NTDLL.@)
802 * Deletes a timer queue object.
805 * TimerQueue [I] The timer queue to destroy.
806 * CompletionEvent [I] If NULL, return immediately. If INVALID_HANDLE_VALUE,
807 * wait until all timers are finished firing before
808 * returning. Otherwise, return immediately and set the
809 * event when all timers are done.
812 * Success: STATUS_SUCCESS if synchronous, STATUS_PENDING if not.
813 * Failure: Any NTSTATUS code.
815 NTSTATUS WINAPI
RtlDeleteTimerQueueEx(HANDLE TimerQueue
, HANDLE CompletionEvent
)
817 struct timer_queue
*q
= TimerQueue
;
818 struct queue_timer
*t
, *temp
;
822 if (!q
|| q
->magic
!= TIMER_QUEUE_MAGIC
)
823 return STATUS_INVALID_HANDLE
;
827 RtlEnterCriticalSection(&q
->cs
);
829 if (list_head(&q
->timers
))
830 /* When the last timer is removed, it will signal the timer thread to
832 LIST_FOR_EACH_ENTRY_SAFE(t
, temp
, &q
->timers
, struct queue_timer
, entry
)
833 queue_destroy_timer(t
);
835 /* However if we have none, we must do it ourselves. */
836 NtSetEvent(q
->event
, NULL
);
837 RtlLeaveCriticalSection(&q
->cs
);
839 if (CompletionEvent
== INVALID_HANDLE_VALUE
)
841 NtWaitForSingleObject(thread
, FALSE
, NULL
);
842 status
= STATUS_SUCCESS
;
848 FIXME("asynchronous return on completion event unimplemented\n");
849 NtWaitForSingleObject(thread
, FALSE
, NULL
);
850 NtSetEvent(CompletionEvent
, NULL
);
852 status
= STATUS_PENDING
;
/* Resolves a timer queue handle to a struct timer_queue pointer.
 *
 * The visible code lazily creates a process-wide default queue: a new
 * queue is created and published with an interlocked compare-exchange
 * against NULL, and the freshly created queue is deleted again when
 * another thread has published one first.
 *
 * NOTE(review): the handling of a non-NULL TimerQueue argument, the
 * declaration of `q` and the test on `p` are not visible in this excerpt;
 * presumably a caller-supplied queue is returned as-is - confirm. */
859 static struct timer_queue
*get_timer_queue(HANDLE TimerQueue
)
861 static struct timer_queue
*default_timer_queue
;
867 if (!default_timer_queue
)
870 NTSTATUS status
= RtlCreateTimerQueue(&q
);
871 if (status
== STATUS_SUCCESS
)
873 PVOID p
= InterlockedCompareExchangePointer( (void **) &default_timer_queue
, q
, NULL
);
875 /* Got beat to the punch. */
876 RtlDeleteTimerQueueEx(q
, NULL
);
879 return default_timer_queue
;
883 /***********************************************************************
884 * RtlCreateTimer (NTDLL.@)
886 * Creates a new timer associated with the given queue.
889 * NewTimer [O] The newly created timer.
890 * TimerQueue [I] The queue to hold the timer.
891 * Callback [I] The callback to fire.
892 * Parameter [I] The argument for the callback.
893 * DueTime [I] The delay, in milliseconds, before first firing the
895 * Period [I] The period, in milliseconds, at which to fire the timer
896 * after the first callback. If zero, the timer will only
897 * fire once. It still needs to be deleted with
899 * Flags [I] Flags controlling the execution of the callback. In
900 * addition to the WT_* thread pool flags (see
901 * RtlQueueWorkItem), WT_EXECUTEINTIMERTHREAD and
902 * WT_EXECUTEONLYONCE are supported.
905 * Success: STATUS_SUCCESS.
906 * Failure: Any NTSTATUS code.
908 NTSTATUS WINAPI
RtlCreateTimer(PHANDLE NewTimer
, HANDLE TimerQueue
,
909 RTL_WAITORTIMERCALLBACKFUNC Callback
,
910 PVOID Parameter
, DWORD DueTime
, DWORD Period
,
914 struct queue_timer
*t
;
915 struct timer_queue
*q
= get_timer_queue(TimerQueue
);
917 if (!q
) return STATUS_NO_MEMORY
;
918 if (q
->magic
!= TIMER_QUEUE_MAGIC
) return STATUS_INVALID_HANDLE
;
920 t
= RtlAllocateHeap(GetProcessHeap(), 0, sizeof *t
);
922 return STATUS_NO_MEMORY
;
926 t
->callback
= Callback
;
927 t
->param
= Parameter
;
933 status
= STATUS_SUCCESS
;
934 RtlEnterCriticalSection(&q
->cs
);
936 status
= STATUS_INVALID_HANDLE
;
938 queue_add_timer(t
, queue_current_time() + DueTime
, TRUE
);
939 RtlLeaveCriticalSection(&q
->cs
);
941 if (status
== STATUS_SUCCESS
)
944 RtlFreeHeap(GetProcessHeap(), 0, t
);
949 /***********************************************************************
950 * RtlUpdateTimer (NTDLL.@)
952 * Changes the time at which a timer expires.
955 * TimerQueue [I] The queue that holds the timer.
956 * Timer [I] The timer to update.
957 * DueTime [I] The delay, in milliseconds, before next firing the timer.
958 * Period [I] The period, in milliseconds, at which to fire the timer
959 * after the first callback. If zero, the timer will not
960 * refire. It still needs to be deleted with
964 * Success: STATUS_SUCCESS.
965 * Failure: Any NTSTATUS code.
967 NTSTATUS WINAPI
RtlUpdateTimer(HANDLE TimerQueue
, HANDLE Timer
,
968 DWORD DueTime
, DWORD Period
)
970 struct queue_timer
*t
= Timer
;
971 struct timer_queue
*q
= t
->q
;
973 RtlEnterCriticalSection(&q
->cs
);
974 /* Can't change a timer if it was once-only or destroyed. */
975 if (t
->expire
!= EXPIRE_NEVER
)
978 queue_move_timer(t
, queue_current_time() + DueTime
, TRUE
);
980 RtlLeaveCriticalSection(&q
->cs
);
982 return STATUS_SUCCESS
;
985 /***********************************************************************
986 * RtlDeleteTimer (NTDLL.@)
988 * Cancels a timer-queue timer.
991 * TimerQueue [I] The queue that holds the timer.
992 * Timer [I] The timer to delete.
993 * CompletionEvent [I] If NULL, return immediately. If INVALID_HANDLE_VALUE,
994 * wait until the timer is finished firing all pending
995 * callbacks before returning. Otherwise, return
996 * immediately and set the event when the timer is done.
999 * Success: STATUS_SUCCESS if the timer is done, STATUS_PENDING if not,
1000 * or if the completion event is NULL.
1001 * Failure: Any NTSTATUS code.
1003 NTSTATUS WINAPI
RtlDeleteTimer(HANDLE TimerQueue
, HANDLE Timer
,
1004 HANDLE CompletionEvent
)
1006 struct queue_timer
*t
= Timer
;
1007 struct timer_queue
*q
;
1008 NTSTATUS status
= STATUS_PENDING
;
1009 HANDLE event
= NULL
;
1012 return STATUS_INVALID_PARAMETER_1
;
1014 if (CompletionEvent
== INVALID_HANDLE_VALUE
)
1016 status
= NtCreateEvent(&event
, EVENT_ALL_ACCESS
, NULL
, SynchronizationEvent
, FALSE
);
1017 if (status
== STATUS_SUCCESS
)
1018 status
= STATUS_PENDING
;
1020 else if (CompletionEvent
)
1021 event
= CompletionEvent
;
1023 RtlEnterCriticalSection(&q
->cs
);
1025 if (t
->runcount
== 0 && event
)
1026 status
= STATUS_SUCCESS
;
1027 queue_destroy_timer(t
);
1028 RtlLeaveCriticalSection(&q
->cs
);
1030 if (CompletionEvent
== INVALID_HANDLE_VALUE
&& event
)
1032 if (status
== STATUS_PENDING
)
1034 NtWaitForSingleObject(event
, FALSE
, NULL
);
1035 status
= STATUS_SUCCESS
;
1043 /***********************************************************************
1044 * timerqueue_thread_proc (internal)
1046 static void CALLBACK
timerqueue_thread_proc( void *param
)
1048 ULONGLONG timeout_lower
, timeout_upper
, new_timeout
;
1049 struct threadpool_object
*other_timer
;
1050 LARGE_INTEGER now
, timeout
;
1053 TRACE( "starting timer queue thread\n" );
1055 RtlEnterCriticalSection( &timerqueue
.cs
);
1058 NtQuerySystemTime( &now
);
1060 /* Check for expired timers. */
1061 while ((ptr
= list_head( &timerqueue
.pending_timers
)))
1063 struct threadpool_object
*timer
= LIST_ENTRY( ptr
, struct threadpool_object
, u
.timer
.timer_entry
);
1064 assert( timer
->type
== TP_OBJECT_TYPE_TIMER
);
1065 assert( timer
->u
.timer
.timer_pending
);
1066 if (timer
->u
.timer
.timeout
> now
.QuadPart
)
1069 /* Queue a new callback in one of the worker threads. */
1070 list_remove( &timer
->u
.timer
.timer_entry
);
1071 timer
->u
.timer
.timer_pending
= FALSE
;
1072 tp_object_submit( timer
, FALSE
);
1074 /* Insert the timer back into the queue, unless it is marked for shutdown. */
1075 if (timer
->u
.timer
.period
&& !timer
->shutdown
)
1077 timer
->u
.timer
.timeout
+= (ULONGLONG
)timer
->u
.timer
.period
* 10000;
1078 if (timer
->u
.timer
.timeout
<= now
.QuadPart
)
1079 timer
->u
.timer
.timeout
= now
.QuadPart
+ 1;
1081 LIST_FOR_EACH_ENTRY( other_timer
, &timerqueue
.pending_timers
,
1082 struct threadpool_object
, u
.timer
.timer_entry
)
1084 assert( other_timer
->type
== TP_OBJECT_TYPE_TIMER
);
1085 if (timer
->u
.timer
.timeout
< other_timer
->u
.timer
.timeout
)
1088 list_add_before( &other_timer
->u
.timer
.timer_entry
, &timer
->u
.timer
.timer_entry
);
1089 timer
->u
.timer
.timer_pending
= TRUE
;
1093 timeout_lower
= timeout_upper
= MAXLONGLONG
;
1095 /* Determine next timeout and use the window length to optimize wakeup times. */
1096 LIST_FOR_EACH_ENTRY( other_timer
, &timerqueue
.pending_timers
,
1097 struct threadpool_object
, u
.timer
.timer_entry
)
1099 assert( other_timer
->type
== TP_OBJECT_TYPE_TIMER
);
1100 if (other_timer
->u
.timer
.timeout
>= timeout_upper
)
1103 timeout_lower
= other_timer
->u
.timer
.timeout
;
1104 new_timeout
= timeout_lower
+ (ULONGLONG
)other_timer
->u
.timer
.window_length
* 10000;
1105 if (new_timeout
< timeout_upper
)
1106 timeout_upper
= new_timeout
;
1109 /* Wait for timer update events or until the next timer expires. */
1110 if (timerqueue
.objcount
)
1112 timeout
.QuadPart
= timeout_lower
;
1113 RtlSleepConditionVariableCS( &timerqueue
.update_event
, &timerqueue
.cs
, &timeout
);
1117 /* All timers have been destroyed, if no new timers are created
1118 * within some amount of time, then we can shutdown this thread. */
1119 timeout
.QuadPart
= (ULONGLONG
)THREADPOOL_WORKER_TIMEOUT
* -10000;
1120 if (RtlSleepConditionVariableCS( &timerqueue
.update_event
, &timerqueue
.cs
,
1121 &timeout
) == STATUS_TIMEOUT
&& !timerqueue
.objcount
)
1127 timerqueue
.thread_running
= FALSE
;
1128 RtlLeaveCriticalSection( &timerqueue
.cs
);
1130 TRACE( "terminating timer queue thread\n" );
1131 RtlExitUserThread( 0 );
1134 /***********************************************************************
1135 * tp_new_worker_thread (internal)
1137 * Create and account a new worker thread for the desired pool.
1139 static NTSTATUS
tp_new_worker_thread( struct threadpool
*pool
)
1144 status
= RtlCreateUserThread( GetCurrentProcess(), NULL
, FALSE
, 0, 0, 0,
1145 threadpool_worker_proc
, pool
, &thread
, NULL
);
1146 if (status
== STATUS_SUCCESS
)
1148 InterlockedIncrement( &pool
->refcount
);
1149 pool
->num_workers
++;
1155 /***********************************************************************
1156 * tp_timerqueue_lock (internal)
1158 * Acquires a lock on the global timerqueue. When the lock is acquired
1159 * successfully, it is guaranteed that the timer thread is running.
1161 static NTSTATUS
tp_timerqueue_lock( struct threadpool_object
*timer
)
1163 NTSTATUS status
= STATUS_SUCCESS
;
1164 assert( timer
->type
== TP_OBJECT_TYPE_TIMER
);
1166 timer
->u
.timer
.timer_initialized
= FALSE
;
1167 timer
->u
.timer
.timer_pending
= FALSE
;
1168 timer
->u
.timer
.timer_set
= FALSE
;
1169 timer
->u
.timer
.timeout
= 0;
1170 timer
->u
.timer
.period
= 0;
1171 timer
->u
.timer
.window_length
= 0;
1173 RtlEnterCriticalSection( &timerqueue
.cs
);
1175 /* Make sure that the timerqueue thread is running. */
1176 if (!timerqueue
.thread_running
)
1179 status
= RtlCreateUserThread( GetCurrentProcess(), NULL
, FALSE
, 0, 0, 0,
1180 timerqueue_thread_proc
, NULL
, &thread
, NULL
);
1181 if (status
== STATUS_SUCCESS
)
1183 timerqueue
.thread_running
= TRUE
;
1188 if (status
== STATUS_SUCCESS
)
1190 timer
->u
.timer
.timer_initialized
= TRUE
;
1191 timerqueue
.objcount
++;
1194 RtlLeaveCriticalSection( &timerqueue
.cs
);
1198 /***********************************************************************
1199 * tp_timerqueue_unlock (internal)
1201 * Releases a lock on the global timerqueue.
1203 static void tp_timerqueue_unlock( struct threadpool_object
*timer
)
1205 assert( timer
->type
== TP_OBJECT_TYPE_TIMER
);
1207 RtlEnterCriticalSection( &timerqueue
.cs
);
1208 if (timer
->u
.timer
.timer_initialized
)
1210 /* If timer was pending, remove it. */
1211 if (timer
->u
.timer
.timer_pending
)
1213 list_remove( &timer
->u
.timer
.timer_entry
);
1214 timer
->u
.timer
.timer_pending
= FALSE
;
1217 /* If the last timer object was destroyed, then wake up the thread. */
1218 if (!--timerqueue
.objcount
)
1220 assert( list_empty( &timerqueue
.pending_timers
) );
1221 RtlWakeAllConditionVariable( &timerqueue
.update_event
);
1224 timer
->u
.timer
.timer_initialized
= FALSE
;
1226 RtlLeaveCriticalSection( &timerqueue
.cs
);
/***********************************************************************
 *           waitqueue_thread_proc    (internal)
 *
 * Thread routine serving one wait queue bucket.  'param' is the
 * struct waitqueue_bucket this thread owns.  The thread loops until the
 * bucket stays empty for THREADPOOL_WORKER_TIMEOUT, then unlinks the
 * bucket, frees it and exits.
 *
 * waitqueue.cs is held at all times except while blocking in
 * NtWaitForMultipleObjects (and while tp_object_execute runs a callback
 * with wait_thread == TRUE).
 */
static void CALLBACK waitqueue_thread_proc( void *param )
{
    struct threadpool_object *objects[MAXIMUM_WAITQUEUE_OBJECTS];
    /* One extra slot: the bucket's update_event is appended after the wait handles. */
    HANDLE handles[MAXIMUM_WAITQUEUE_OBJECTS + 1];
    struct waitqueue_bucket *bucket = param;
    struct threadpool_object *wait, *next;
    LARGE_INTEGER now, timeout;
    DWORD num_handles;
    NTSTATUS status;

    TRACE( "starting wait queue thread\n" );

    RtlEnterCriticalSection( &waitqueue.cs );

    for (;;)
    {
        NtQuerySystemTime( &now );
        timeout.QuadPart = MAXLONGLONG;
        num_handles = 0;

        /* Pass 1: fire waits whose timeout has passed, collect the rest
         * into handles[]/objects[] and track the earliest timeout. */
        LIST_FOR_EACH_ENTRY_SAFE( wait, next, &bucket->waiting, struct threadpool_object,
                                  u.wait.wait_entry )
        {
            assert( wait->type == TP_OBJECT_TYPE_WAIT );
            if (wait->u.wait.timeout <= now.QuadPart)
            {
                /* Wait object timed out. */
                if ((wait->u.wait.flags & WT_EXECUTEONLYONCE))
                {
                    /* Execute-once waits leave the waiting list and go back to reserved. */
                    list_remove( &wait->u.wait.wait_entry );
                    list_add_tail( &bucket->reserved, &wait->u.wait.wait_entry );
                }
                if ((wait->u.wait.flags & (WT_EXECUTEINWAITTHREAD | WT_EXECUTEINIOTHREAD)))
                {
                    /* Run the callback directly on this thread; hold an extra
                     * reference across the call and release it afterwards. */
                    InterlockedIncrement( &wait->refcount );
                    wait->num_pending_callbacks++;
                    RtlEnterCriticalSection( &wait->pool->cs );
                    tp_object_execute( wait, TRUE );
                    RtlLeaveCriticalSection( &wait->pool->cs );
                    tp_object_release( wait );
                }
                else tp_object_submit( wait, FALSE );
            }
            else
            {
                if (wait->u.wait.timeout < timeout.QuadPart)
                    timeout.QuadPart = wait->u.wait.timeout;

                assert( num_handles < MAXIMUM_WAITQUEUE_OBJECTS );
                /* Temporary reference, dropped after the wait below returns. */
                InterlockedIncrement( &wait->refcount );
                objects[num_handles] = wait;
                handles[num_handles] = wait->u.wait.handle;
                num_handles++;
            }
        }

        if (!bucket->objcount)
        {
            /* All wait objects have been destroyed, if no new wait objects are created
             * within some amount of time, then we can shutdown this thread. */
            assert( num_handles == 0 );
            RtlLeaveCriticalSection( &waitqueue.cs );
            timeout.QuadPart = (ULONGLONG)THREADPOOL_WORKER_TIMEOUT * -10000;
            status = NtWaitForMultipleObjects( 1, &bucket->update_event, TRUE, bucket->alertable, &timeout );
            RtlEnterCriticalSection( &waitqueue.cs );

            /* Re-check objcount: an object may have been added while the lock was dropped. */
            if (status == STATUS_TIMEOUT && !bucket->objcount)
                break;
        }
        else
        {
            /* Wait on all collected handles plus the update event (last slot). */
            handles[num_handles] = bucket->update_event;
            RtlLeaveCriticalSection( &waitqueue.cs );
            status = NtWaitForMultipleObjects( num_handles + 1, handles, TRUE, bucket->alertable, &timeout );
            RtlEnterCriticalSection( &waitqueue.cs );

            /* A status in [STATUS_WAIT_0, STATUS_WAIT_0 + num_handles) identifies
             * one of the wait objects; the update event index is excluded. */
            if (status >= STATUS_WAIT_0 && status < STATUS_WAIT_0 + num_handles)
            {
                wait = objects[status - STATUS_WAIT_0];
                assert( wait->type == TP_OBJECT_TYPE_WAIT );
                if (wait->u.wait.bucket)
                {
                    /* Wait object signaled. */
                    assert( wait->u.wait.bucket == bucket );
                    if ((wait->u.wait.flags & WT_EXECUTEONLYONCE))
                    {
                        list_remove( &wait->u.wait.wait_entry );
                        list_add_tail( &bucket->reserved, &wait->u.wait.wait_entry );
                    }
                    if ((wait->u.wait.flags & (WT_EXECUTEINWAITTHREAD | WT_EXECUTEINIOTHREAD)))
                    {
                        /* No extra reference is taken here: objects[] still
                         * holds the temporary one from pass 1. */
                        wait->u.wait.signaled++;
                        wait->num_pending_callbacks++;
                        RtlEnterCriticalSection( &wait->pool->cs );
                        tp_object_execute( wait, TRUE );
                        RtlLeaveCriticalSection( &wait->pool->cs );
                    }
                    else tp_object_submit( wait, TRUE );
                }
                else
                    WARN("wait object %p triggered while object was destroyed\n", wait);
            }

            /* Release temporary references to wait objects. */
            while (num_handles)
            {
                wait = objects[--num_handles];
                assert( wait->type == TP_OBJECT_TYPE_WAIT );
                tp_object_release( wait );
            }
        }

        /* Try to merge bucket with other threads. */
        if (waitqueue.num_buckets > 1 && bucket->objcount &&
            bucket->objcount <= MAXIMUM_WAITQUEUE_OBJECTS * 1 / 3)
        {
            struct waitqueue_bucket *other_bucket;
            LIST_FOR_EACH_ENTRY( other_bucket, &waitqueue.buckets, struct waitqueue_bucket, bucket_entry )
            {
                /* Only merge into a non-empty bucket with the same alertable
                 * setting, and only if the result stays at or below 2/3 full. */
                if (other_bucket != bucket && other_bucket->objcount && other_bucket->alertable == bucket->alertable &&
                    other_bucket->objcount + bucket->objcount <= MAXIMUM_WAITQUEUE_OBJECTS * 2 / 3)
                {
                    other_bucket->objcount += bucket->objcount;
                    bucket->objcount = 0;

                    /* Update reserved list. */
                    LIST_FOR_EACH_ENTRY( wait, &bucket->reserved, struct threadpool_object, u.wait.wait_entry )
                    {
                        assert( wait->type == TP_OBJECT_TYPE_WAIT );
                        wait->u.wait.bucket = other_bucket;
                    }
                    list_move_tail( &other_bucket->reserved, &bucket->reserved );

                    /* Update waiting list. */
                    LIST_FOR_EACH_ENTRY( wait, &bucket->waiting, struct threadpool_object, u.wait.wait_entry )
                    {
                        assert( wait->type == TP_OBJECT_TYPE_WAIT );
                        wait->u.wait.bucket = other_bucket;
                    }
                    list_move_tail( &other_bucket->waiting, &bucket->waiting );

                    /* Move bucket to the end, to keep the probability of
                     * newly added wait objects as small as possible. */
                    list_remove( &bucket->bucket_entry );
                    list_add_tail( &waitqueue.buckets, &bucket->bucket_entry );

                    /* Wake the other bucket's thread so it picks up the moved objects. */
                    NtSetEvent( other_bucket->update_event, NULL );
                    break;
                }
            }
        }
    }

    /* Remove this bucket from the list. */
    list_remove( &bucket->bucket_entry );
    if (!--waitqueue.num_buckets)
        assert( list_empty( &waitqueue.buckets ) );

    RtlLeaveCriticalSection( &waitqueue.cs );

    TRACE( "terminating wait queue thread\n" );

    assert( bucket->objcount == 0 );
    assert( list_empty( &bucket->reserved ) );
    assert( list_empty( &bucket->waiting ) );
    NtClose( bucket->update_event );

    RtlFreeHeap( GetProcessHeap(), 0, bucket );
    RtlExitUserThread( 0 );
}
1403 /***********************************************************************
1404 * tp_waitqueue_lock (internal)
1406 static NTSTATUS
tp_waitqueue_lock( struct threadpool_object
*wait
)
1408 struct waitqueue_bucket
*bucket
;
1411 BOOL alertable
= (wait
->u
.wait
.flags
& WT_EXECUTEINIOTHREAD
) != 0;
1412 assert( wait
->type
== TP_OBJECT_TYPE_WAIT
);
1414 wait
->u
.wait
.signaled
= 0;
1415 wait
->u
.wait
.bucket
= NULL
;
1416 wait
->u
.wait
.wait_pending
= FALSE
;
1417 wait
->u
.wait
.timeout
= 0;
1418 wait
->u
.wait
.handle
= INVALID_HANDLE_VALUE
;
1420 RtlEnterCriticalSection( &waitqueue
.cs
);
1422 /* Try to assign to existing bucket if possible. */
1423 LIST_FOR_EACH_ENTRY( bucket
, &waitqueue
.buckets
, struct waitqueue_bucket
, bucket_entry
)
1425 if (bucket
->objcount
< MAXIMUM_WAITQUEUE_OBJECTS
&& bucket
->alertable
== alertable
)
1427 list_add_tail( &bucket
->reserved
, &wait
->u
.wait
.wait_entry
);
1428 wait
->u
.wait
.bucket
= bucket
;
1431 status
= STATUS_SUCCESS
;
1436 /* Create a new bucket and corresponding worker thread. */
1437 bucket
= RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*bucket
) );
1440 status
= STATUS_NO_MEMORY
;
1444 bucket
->objcount
= 0;
1445 bucket
->alertable
= alertable
;
1446 list_init( &bucket
->reserved
);
1447 list_init( &bucket
->waiting
);
1449 status
= NtCreateEvent( &bucket
->update_event
, EVENT_ALL_ACCESS
,
1450 NULL
, SynchronizationEvent
, FALSE
);
1453 RtlFreeHeap( GetProcessHeap(), 0, bucket
);
1457 status
= RtlCreateUserThread( GetCurrentProcess(), NULL
, FALSE
, 0, 0, 0,
1458 waitqueue_thread_proc
, bucket
, &thread
, NULL
);
1459 if (status
== STATUS_SUCCESS
)
1461 list_add_tail( &waitqueue
.buckets
, &bucket
->bucket_entry
);
1462 waitqueue
.num_buckets
++;
1464 list_add_tail( &bucket
->reserved
, &wait
->u
.wait
.wait_entry
);
1465 wait
->u
.wait
.bucket
= bucket
;
1472 NtClose( bucket
->update_event
);
1473 RtlFreeHeap( GetProcessHeap(), 0, bucket
);
1477 RtlLeaveCriticalSection( &waitqueue
.cs
);
1481 /***********************************************************************
1482 * tp_waitqueue_unlock (internal)
1484 static void tp_waitqueue_unlock( struct threadpool_object
*wait
)
1486 assert( wait
->type
== TP_OBJECT_TYPE_WAIT
);
1488 RtlEnterCriticalSection( &waitqueue
.cs
);
1489 if (wait
->u
.wait
.bucket
)
1491 struct waitqueue_bucket
*bucket
= wait
->u
.wait
.bucket
;
1492 assert( bucket
->objcount
> 0 );
1494 list_remove( &wait
->u
.wait
.wait_entry
);
1495 wait
->u
.wait
.bucket
= NULL
;
1498 NtSetEvent( bucket
->update_event
, NULL
);
1500 RtlLeaveCriticalSection( &waitqueue
.cs
);
/***********************************************************************
 *           ioqueue_thread_proc    (internal)
 *
 * Thread routine that drains the shared I/O completion port
 * (ioqueue.port).  Each dequeued packet's key is interpreted as the
 * struct threadpool_object it belongs to.  'param' is unused.
 *
 * ioqueue.cs is held except while blocked in NtRemoveIoCompletion or
 * sleeping on ioqueue.update_event.
 */
static void CALLBACK ioqueue_thread_proc( void *param )
{
    struct io_completion *completion;
    struct threadpool_object *io;
    IO_STATUS_BLOCK iosb;
    ULONG_PTR key, value;
    NTSTATUS status;

    TRACE( "starting I/O completion thread\n" );

    RtlEnterCriticalSection( &ioqueue.cs );

    for (;;)
    {
        /* Drop the lock while blocking for the next completion packet. */
        RtlLeaveCriticalSection( &ioqueue.cs );
        if ((status = NtRemoveIoCompletion( ioqueue.port, &key, &value, &iosb, NULL )))
            ERR("NtRemoveIoCompletion failed, status %#x.\n", status);
        RtlEnterCriticalSection( &ioqueue.cs );

        io = (struct threadpool_object *)key;

        if (io && io->shutdown)
        {
            if (iosb.u.Status != STATUS_THREADPOOL_RELEASED_DURING_OPERATION)
            {
                /* Skip remaining completions until the final one. */
                continue;
            }

            /* Final packet for a released object: drop it from the queue count. */
            --ioqueue.objcount;
            TRACE( "Releasing io %p.\n", io );
            tp_object_release( io );
        }
        else if (io)
        {
            RtlEnterCriticalSection( &io->pool->cs );

            /* Make room for one more stored completion; on failure the packet is dropped. */
            if (!array_reserve((void **)&io->u.io.completions, &io->u.io.completion_max,
                    io->u.io.completion_count + 1, sizeof(*io->u.io.completions)))
            {
                ERR("Failed to allocate memory.\n");
                RtlLeaveCriticalSection( &io->pool->cs );
                continue;
            }

            /* Record the result so the callback can consume it later. */
            completion = &io->u.io.completions[io->u.io.completion_count++];
            completion->iosb = iosb;
            completion->cvalue = value;

            tp_object_submit( io, FALSE );

            RtlLeaveCriticalSection( &io->pool->cs );
        }

        if (!ioqueue.objcount)
        {
            /* All I/O objects have been destroyed; if no new objects are
             * created within some amount of time, then we can shutdown this
             * thread. */
            LARGE_INTEGER timeout = {.QuadPart = (ULONGLONG)THREADPOOL_WORKER_TIMEOUT * -10000};
            if (RtlSleepConditionVariableCS( &ioqueue.update_event, &ioqueue.cs,
                    &timeout) == STATUS_TIMEOUT && !ioqueue.objcount)
                break;
        }
    }

    /* Cleared under ioqueue.cs so tp_ioqueue_lock can start a new thread. */
    ioqueue.thread_running = FALSE;
    RtlLeaveCriticalSection( &ioqueue.cs );

    TRACE( "terminating I/O completion thread\n" );

    RtlExitUserThread( 0 );
}
1575 static NTSTATUS
tp_ioqueue_lock( struct threadpool_object
*io
, HANDLE file
)
1577 NTSTATUS status
= STATUS_SUCCESS
;
1579 assert( io
->type
== TP_OBJECT_TYPE_IO
);
1581 RtlEnterCriticalSection( &ioqueue
.cs
);
1583 if (!ioqueue
.port
&& (status
= NtCreateIoCompletion( &ioqueue
.port
,
1584 IO_COMPLETION_ALL_ACCESS
, NULL
, 0 )))
1586 RtlLeaveCriticalSection( &ioqueue
.cs
);
1590 if (!ioqueue
.thread_running
)
1594 if (!(status
= RtlCreateUserThread( GetCurrentProcess(), NULL
, FALSE
,
1595 0, 0, 0, ioqueue_thread_proc
, NULL
, &thread
, NULL
)))
1597 ioqueue
.thread_running
= TRUE
;
1602 if (status
== STATUS_SUCCESS
)
1604 FILE_COMPLETION_INFORMATION info
;
1605 IO_STATUS_BLOCK iosb
;
1607 info
.CompletionPort
= ioqueue
.port
;
1608 info
.CompletionKey
= (ULONG_PTR
)io
;
1610 status
= NtSetInformationFile( file
, &iosb
, &info
, sizeof(info
), FileCompletionInformation
);
1613 if (status
== STATUS_SUCCESS
)
1615 if (!ioqueue
.objcount
++)
1616 RtlWakeConditionVariable( &ioqueue
.update_event
);
1619 RtlLeaveCriticalSection( &ioqueue
.cs
);
1623 /***********************************************************************
1624 * tp_threadpool_alloc (internal)
1626 * Allocates a new threadpool object.
1628 static NTSTATUS
tp_threadpool_alloc( struct threadpool
**out
)
1630 IMAGE_NT_HEADERS
*nt
= RtlImageNtHeader( NtCurrentTeb()->Peb
->ImageBaseAddress
);
1631 struct threadpool
*pool
;
1634 pool
= RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*pool
) );
1636 return STATUS_NO_MEMORY
;
1640 pool
->shutdown
= FALSE
;
1642 RtlInitializeCriticalSection( &pool
->cs
);
1643 pool
->cs
.DebugInfo
->Spare
[0] = (DWORD_PTR
)(__FILE__
": threadpool.cs");
1645 for (i
= 0; i
< ARRAY_SIZE(pool
->pools
); ++i
)
1646 list_init( &pool
->pools
[i
] );
1647 RtlInitializeConditionVariable( &pool
->update_event
);
1649 pool
->max_workers
= 500;
1650 pool
->min_workers
= 0;
1651 pool
->num_workers
= 0;
1652 pool
->num_busy_workers
= 0;
1653 pool
->stack_info
.StackReserve
= nt
->OptionalHeader
.SizeOfStackReserve
;
1654 pool
->stack_info
.StackCommit
= nt
->OptionalHeader
.SizeOfStackCommit
;
1656 TRACE( "allocated threadpool %p\n", pool
);
1659 return STATUS_SUCCESS
;
1662 /***********************************************************************
1663 * tp_threadpool_shutdown (internal)
1665 * Prepares the shutdown of a threadpool object and notifies all worker
1666 * threads to terminate (after all remaining work items have been
1669 static void tp_threadpool_shutdown( struct threadpool
*pool
)
1671 assert( pool
!= default_threadpool
);
1673 pool
->shutdown
= TRUE
;
1674 RtlWakeAllConditionVariable( &pool
->update_event
);
1677 /***********************************************************************
1678 * tp_threadpool_release (internal)
1680 * Releases a reference to a threadpool object.
1682 static BOOL
tp_threadpool_release( struct threadpool
*pool
)
1686 if (InterlockedDecrement( &pool
->refcount
))
1689 TRACE( "destroying threadpool %p\n", pool
);
1691 assert( pool
->shutdown
);
1692 assert( !pool
->objcount
);
1693 for (i
= 0; i
< ARRAY_SIZE(pool
->pools
); ++i
)
1694 assert( list_empty( &pool
->pools
[i
] ) );
1696 pool
->cs
.DebugInfo
->Spare
[0] = 0;
1697 RtlDeleteCriticalSection( &pool
->cs
);
1699 RtlFreeHeap( GetProcessHeap(), 0, pool
);
1703 /***********************************************************************
1704 * tp_threadpool_lock (internal)
1706 * Acquires a lock on a threadpool, specified with an TP_CALLBACK_ENVIRON
1707 * block. When the lock is acquired successfully, it is guaranteed that
1708 * there is at least one worker thread to process tasks.
1710 static NTSTATUS
tp_threadpool_lock( struct threadpool
**out
, TP_CALLBACK_ENVIRON
*environment
)
1712 struct threadpool
*pool
= NULL
;
1713 NTSTATUS status
= STATUS_SUCCESS
;
1717 /* Validate environment parameters. */
1718 if (environment
->Version
== 3)
1720 TP_CALLBACK_ENVIRON_V3
*environment3
= (TP_CALLBACK_ENVIRON_V3
*)environment
;
1722 switch (environment3
->CallbackPriority
)
1724 case TP_CALLBACK_PRIORITY_HIGH
:
1725 case TP_CALLBACK_PRIORITY_NORMAL
:
1726 case TP_CALLBACK_PRIORITY_LOW
:
1729 return STATUS_INVALID_PARAMETER
;
1733 pool
= (struct threadpool
*)environment
->Pool
;
1738 if (!default_threadpool
)
1740 status
= tp_threadpool_alloc( &pool
);
1741 if (status
!= STATUS_SUCCESS
)
1744 if (InterlockedCompareExchangePointer( (void *)&default_threadpool
, pool
, NULL
) != NULL
)
1746 tp_threadpool_shutdown( pool
);
1747 tp_threadpool_release( pool
);
1751 pool
= default_threadpool
;
1754 RtlEnterCriticalSection( &pool
->cs
);
1756 /* Make sure that the threadpool has at least one thread. */
1757 if (!pool
->num_workers
)
1758 status
= tp_new_worker_thread( pool
);
1760 /* Keep a reference, and increment objcount to ensure that the
1761 * last thread doesn't terminate. */
1762 if (status
== STATUS_SUCCESS
)
1764 InterlockedIncrement( &pool
->refcount
);
1768 RtlLeaveCriticalSection( &pool
->cs
);
1770 if (status
!= STATUS_SUCCESS
)
1774 return STATUS_SUCCESS
;
1777 /***********************************************************************
1778 * tp_threadpool_unlock (internal)
1780 * Releases a lock on a threadpool.
1782 static void tp_threadpool_unlock( struct threadpool
*pool
)
1784 RtlEnterCriticalSection( &pool
->cs
);
1786 RtlLeaveCriticalSection( &pool
->cs
);
1787 tp_threadpool_release( pool
);
1790 /***********************************************************************
1791 * tp_group_alloc (internal)
1793 * Allocates a new threadpool group object.
1795 static NTSTATUS
tp_group_alloc( struct threadpool_group
**out
)
1797 struct threadpool_group
*group
;
1799 group
= RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*group
) );
1801 return STATUS_NO_MEMORY
;
1803 group
->refcount
= 1;
1804 group
->shutdown
= FALSE
;
1806 RtlInitializeCriticalSection( &group
->cs
);
1807 group
->cs
.DebugInfo
->Spare
[0] = (DWORD_PTR
)(__FILE__
": threadpool_group.cs");
1809 list_init( &group
->members
);
1811 TRACE( "allocated group %p\n", group
);
1814 return STATUS_SUCCESS
;
1817 /***********************************************************************
1818 * tp_group_shutdown (internal)
1820 * Marks the group object for shutdown.
1822 static void tp_group_shutdown( struct threadpool_group
*group
)
1824 group
->shutdown
= TRUE
;
1827 /***********************************************************************
1828 * tp_group_release (internal)
1830 * Releases a reference to a group object.
1832 static BOOL
tp_group_release( struct threadpool_group
*group
)
1834 if (InterlockedDecrement( &group
->refcount
))
1837 TRACE( "destroying group %p\n", group
);
1839 assert( group
->shutdown
);
1840 assert( list_empty( &group
->members
) );
1842 group
->cs
.DebugInfo
->Spare
[0] = 0;
1843 RtlDeleteCriticalSection( &group
->cs
);
1845 RtlFreeHeap( GetProcessHeap(), 0, group
);
1849 /***********************************************************************
1850 * tp_object_initialize (internal)
1852 * Initializes members of a threadpool object.
1854 static void tp_object_initialize( struct threadpool_object
*object
, struct threadpool
*pool
,
1855 PVOID userdata
, TP_CALLBACK_ENVIRON
*environment
)
1857 BOOL is_simple_callback
= (object
->type
== TP_OBJECT_TYPE_SIMPLE
);
1859 object
->refcount
= 1;
1860 object
->shutdown
= FALSE
;
1862 object
->pool
= pool
;
1863 object
->group
= NULL
;
1864 object
->userdata
= userdata
;
1865 object
->group_cancel_callback
= NULL
;
1866 object
->finalization_callback
= NULL
;
1867 object
->may_run_long
= 0;
1868 object
->race_dll
= NULL
;
1869 object
->priority
= TP_CALLBACK_PRIORITY_NORMAL
;
1871 memset( &object
->group_entry
, 0, sizeof(object
->group_entry
) );
1872 object
->is_group_member
= FALSE
;
1874 memset( &object
->pool_entry
, 0, sizeof(object
->pool_entry
) );
1875 RtlInitializeConditionVariable( &object
->finished_event
);
1876 RtlInitializeConditionVariable( &object
->group_finished_event
);
1877 object
->completed_event
= NULL
;
1878 object
->num_pending_callbacks
= 0;
1879 object
->num_running_callbacks
= 0;
1880 object
->num_associated_callbacks
= 0;
1884 if (environment
->Version
!= 1 && environment
->Version
!= 3)
1885 FIXME( "unsupported environment version %u\n", environment
->Version
);
1887 object
->group
= impl_from_TP_CLEANUP_GROUP( environment
->CleanupGroup
);
1888 object
->group_cancel_callback
= environment
->CleanupGroupCancelCallback
;
1889 object
->finalization_callback
= environment
->FinalizationCallback
;
1890 object
->may_run_long
= environment
->u
.s
.LongFunction
!= 0;
1891 object
->race_dll
= environment
->RaceDll
;
1892 if (environment
->Version
== 3)
1894 TP_CALLBACK_ENVIRON_V3
*environment_v3
= (TP_CALLBACK_ENVIRON_V3
*)environment
;
1896 object
->priority
= environment_v3
->CallbackPriority
;
1897 assert( object
->priority
< ARRAY_SIZE(pool
->pools
) );
1900 if (environment
->ActivationContext
)
1901 FIXME( "activation context not supported yet\n" );
1903 if (environment
->u
.s
.Persistent
)
1904 FIXME( "persistent threads not supported yet\n" );
1907 if (object
->race_dll
)
1908 LdrAddRefDll( 0, object
->race_dll
);
1910 TRACE( "allocated object %p of type %u\n", object
, object
->type
);
1912 /* For simple callbacks we have to run tp_object_submit before adding this object
1913 * to the cleanup group. As soon as the cleanup group members are released ->shutdown
1914 * will be set, and tp_object_submit would fail with an assertion. */
1916 if (is_simple_callback
)
1917 tp_object_submit( object
, FALSE
);
1921 struct threadpool_group
*group
= object
->group
;
1922 InterlockedIncrement( &group
->refcount
);
1924 RtlEnterCriticalSection( &group
->cs
);
1925 list_add_tail( &group
->members
, &object
->group_entry
);
1926 object
->is_group_member
= TRUE
;
1927 RtlLeaveCriticalSection( &group
->cs
);
1930 if (is_simple_callback
)
1931 tp_object_release( object
);
1934 static void tp_object_prio_queue( struct threadpool_object
*object
)
1936 ++object
->pool
->num_busy_workers
;
1937 list_add_tail( &object
->pool
->pools
[object
->priority
], &object
->pool_entry
);
1940 /***********************************************************************
1941 * tp_object_submit (internal)
1943 * Submits a threadpool object to the associated threadpool. This
1944 * function has to be VOID because TpPostWork can never fail on Windows.
1946 static void tp_object_submit( struct threadpool_object
*object
, BOOL signaled
)
1948 struct threadpool
*pool
= object
->pool
;
1949 NTSTATUS status
= STATUS_UNSUCCESSFUL
;
1951 assert( !object
->shutdown
);
1952 assert( !pool
->shutdown
);
1954 RtlEnterCriticalSection( &pool
->cs
);
1956 /* Start new worker threads if required. */
1957 if (pool
->num_busy_workers
>= pool
->num_workers
&&
1958 pool
->num_workers
< pool
->max_workers
)
1959 status
= tp_new_worker_thread( pool
);
1961 /* Queue work item and increment refcount. */
1962 InterlockedIncrement( &object
->refcount
);
1963 if (!object
->num_pending_callbacks
++)
1964 tp_object_prio_queue( object
);
1966 /* Count how often the object was signaled. */
1967 if (object
->type
== TP_OBJECT_TYPE_WAIT
&& signaled
)
1968 object
->u
.wait
.signaled
++;
1970 /* No new thread started - wake up one existing thread. */
1971 if (status
!= STATUS_SUCCESS
)
1973 assert( pool
->num_workers
> 0 );
1974 RtlWakeConditionVariable( &pool
->update_event
);
1977 RtlLeaveCriticalSection( &pool
->cs
);
1980 /***********************************************************************
1981 * tp_object_cancel (internal)
1983 * Cancels all currently pending callbacks for a specific object.
1985 static void tp_object_cancel( struct threadpool_object
*object
)
1987 struct threadpool
*pool
= object
->pool
;
1988 LONG pending_callbacks
= 0;
1990 RtlEnterCriticalSection( &pool
->cs
);
1991 if (object
->num_pending_callbacks
)
1993 pending_callbacks
= object
->num_pending_callbacks
;
1994 object
->num_pending_callbacks
= 0;
1995 list_remove( &object
->pool_entry
);
1997 if (object
->type
== TP_OBJECT_TYPE_WAIT
)
1998 object
->u
.wait
.signaled
= 0;
2000 if (object
->type
== TP_OBJECT_TYPE_IO
)
2001 object
->u
.io
.pending_count
= 0;
2002 RtlLeaveCriticalSection( &pool
->cs
);
2004 while (pending_callbacks
--)
2005 tp_object_release( object
);
2008 static BOOL
object_is_finished( struct threadpool_object
*object
, BOOL group
)
2010 if (object
->num_pending_callbacks
)
2012 if (object
->type
== TP_OBJECT_TYPE_IO
&& object
->u
.io
.pending_count
)
2016 return !object
->num_running_callbacks
;
2018 return !object
->num_associated_callbacks
;
2021 /***********************************************************************
2022 * tp_object_wait (internal)
2024 * Waits until all pending and running callbacks of a specific object
2025 * have been processed.
2027 static void tp_object_wait( struct threadpool_object
*object
, BOOL group_wait
)
2029 struct threadpool
*pool
= object
->pool
;
2031 RtlEnterCriticalSection( &pool
->cs
);
2032 while (!object_is_finished( object
, group_wait
))
2035 RtlSleepConditionVariableCS( &object
->group_finished_event
, &pool
->cs
, NULL
);
2037 RtlSleepConditionVariableCS( &object
->finished_event
, &pool
->cs
, NULL
);
2039 RtlLeaveCriticalSection( &pool
->cs
);
2042 /***********************************************************************
2043 * tp_object_prepare_shutdown (internal)
2045 * Prepares a threadpool object for shutdown.
2047 static void tp_object_prepare_shutdown( struct threadpool_object
*object
)
2049 if (object
->type
== TP_OBJECT_TYPE_TIMER
)
2050 tp_timerqueue_unlock( object
);
2051 else if (object
->type
== TP_OBJECT_TYPE_WAIT
)
2052 tp_waitqueue_unlock( object
);
2055 /***********************************************************************
2056 * tp_object_release (internal)
2058 * Releases a reference to a threadpool object.
2060 static BOOL
tp_object_release( struct threadpool_object
*object
)
2062 if (InterlockedDecrement( &object
->refcount
))
2065 TRACE( "destroying object %p of type %u\n", object
, object
->type
);
2067 assert( object
->shutdown
);
2068 assert( !object
->num_pending_callbacks
);
2069 assert( !object
->num_running_callbacks
);
2070 assert( !object
->num_associated_callbacks
);
2072 /* release reference to the group */
2075 struct threadpool_group
*group
= object
->group
;
2077 RtlEnterCriticalSection( &group
->cs
);
2078 if (object
->is_group_member
)
2080 list_remove( &object
->group_entry
);
2081 object
->is_group_member
= FALSE
;
2083 RtlLeaveCriticalSection( &group
->cs
);
2085 tp_group_release( group
);
2088 tp_threadpool_unlock( object
->pool
);
2090 if (object
->race_dll
)
2091 LdrUnloadDll( object
->race_dll
);
2093 if (object
->completed_event
&& object
->completed_event
!= INVALID_HANDLE_VALUE
)
2094 NtSetEvent( object
->completed_event
, NULL
);
2096 RtlFreeHeap( GetProcessHeap(), 0, object
);
2100 static struct list
*threadpool_get_next_item( const struct threadpool
*pool
)
2105 for (i
= 0; i
< ARRAY_SIZE(pool
->pools
); ++i
)
2107 if ((ptr
= list_head( &pool
->pools
[i
] )))
/***********************************************************************
 *           tp_object_execute    (internal)
 *
 * Executes one pending callback of a threadpool object.
 *
 * The caller must hold object->pool->cs; when 'wait_thread' is TRUE it
 * must also hold waitqueue.cs.  Both locks are released while the
 * callback runs and re-acquired before returning.
 */
static void tp_object_execute( struct threadpool_object *object, BOOL wait_thread )
{
    TP_CALLBACK_INSTANCE *callback_instance;
    struct threadpool_instance instance;
    struct io_completion completion;
    struct threadpool *pool = object->pool;
    TP_WAIT_RESULT wait_result = 0;
    NTSTATUS status;

    object->num_pending_callbacks--;

    /* For wait objects check if they were signaled or have timed out. */
    if (object->type == TP_OBJECT_TYPE_WAIT)
    {
        wait_result = object->u.wait.signaled ? WAIT_OBJECT_0 : WAIT_TIMEOUT;
        if (wait_result == WAIT_OBJECT_0) object->u.wait.signaled--;
    }
    else if (object->type == TP_OBJECT_TYPE_IO)
    {
        /* Take a private copy of the most recently stored completion;
         * the array may change once the lock is released. */
        assert( object->u.io.completion_count );
        completion = object->u.io.completions[--object->u.io.completion_count];
        object->u.io.pending_count--;
    }

    /* Leave critical section and do the actual callback. */
    object->num_associated_callbacks++;
    object->num_running_callbacks++;
    RtlLeaveCriticalSection( &pool->cs );
    if (wait_thread) RtlLeaveCriticalSection( &waitqueue.cs );

    /* Initialize threadpool instance struct. */
    callback_instance = (TP_CALLBACK_INSTANCE *)&instance;
    instance.object                     = object;
    instance.threadid                   = GetCurrentThreadId();
    instance.associated                 = TRUE;
    instance.may_run_long               = object->may_run_long;
    instance.cleanup.critical_section   = NULL;
    instance.cleanup.mutex              = NULL;
    instance.cleanup.semaphore          = NULL;
    instance.cleanup.semaphore_count    = 0;
    instance.cleanup.event              = NULL;
    instance.cleanup.library            = NULL;

    /* Dispatch to the type-specific user callback. */
    switch (object->type)
    {
        case TP_OBJECT_TYPE_SIMPLE:
        {
            TRACE( "executing simple callback %p(%p, %p)\n",
                   object->u.simple.callback, callback_instance, object->userdata );
            object->u.simple.callback( callback_instance, object->userdata );
            TRACE( "callback %p returned\n", object->u.simple.callback );
            break;
        }

        case TP_OBJECT_TYPE_WORK:
        {
            TRACE( "executing work callback %p(%p, %p, %p)\n",
                   object->u.work.callback, callback_instance, object->userdata, object );
            object->u.work.callback( callback_instance, object->userdata, (TP_WORK *)object );
            TRACE( "callback %p returned\n", object->u.work.callback );
            break;
        }

        case TP_OBJECT_TYPE_TIMER:
        {
            TRACE( "executing timer callback %p(%p, %p, %p)\n",
                   object->u.timer.callback, callback_instance, object->userdata, object );
            object->u.timer.callback( callback_instance, object->userdata, (TP_TIMER *)object );
            TRACE( "callback %p returned\n", object->u.timer.callback );
            break;
        }

        case TP_OBJECT_TYPE_WAIT:
        {
            TRACE( "executing wait callback %p(%p, %p, %p, %u)\n",
                   object->u.wait.callback, callback_instance, object->userdata, object, wait_result );
            object->u.wait.callback( callback_instance, object->userdata, (TP_WAIT *)object, wait_result );
            TRACE( "callback %p returned\n", object->u.wait.callback );
            break;
        }

        case TP_OBJECT_TYPE_IO:
        {
            TRACE( "executing I/O callback %p(%p, %p, %#lx, %p, %p)\n",
                    object->u.io.callback, callback_instance, object->userdata,
                    completion.cvalue, &completion.iosb, (TP_IO *)object );
            object->u.io.callback( callback_instance, object->userdata,
                    (void *)completion.cvalue, &completion.iosb, (TP_IO *)object );
            TRACE( "callback %p returned\n", object->u.io.callback );
            break;
        }

        default:
            assert(0);
            break;
    }

    /* Execute finalization callback. */
    if (object->finalization_callback)
    {
        TRACE( "executing finalization callback %p(%p, %p)\n",
               object->finalization_callback, callback_instance, object->userdata );
        object->finalization_callback( callback_instance, object->userdata );
        TRACE( "callback %p returned\n", object->finalization_callback );
    }

    /* Execute cleanup tasks. */
    /* NOTE(review): the cleanup fields are all NULL/0 when the callback
     * starts; presumably the callback fills them in through the instance
     * pointer via APIs outside this block - confirm.  The first failing
     * step skips all remaining ones. */
    if (instance.cleanup.critical_section)
    {
        RtlLeaveCriticalSection( instance.cleanup.critical_section );
    }
    if (instance.cleanup.mutex)
    {
        status = NtReleaseMutant( instance.cleanup.mutex, NULL );
        if (status != STATUS_SUCCESS) goto skip_cleanup;
    }
    if (instance.cleanup.semaphore)
    {
        status = NtReleaseSemaphore( instance.cleanup.semaphore, instance.cleanup.semaphore_count, NULL );
        if (status != STATUS_SUCCESS) goto skip_cleanup;
    }
    if (instance.cleanup.event)
    {
        status = NtSetEvent( instance.cleanup.event, NULL );
        if (status != STATUS_SUCCESS) goto skip_cleanup;
    }
    if (instance.cleanup.library)
    {
        LdrUnloadDll( instance.cleanup.library );
    }

skip_cleanup:
    /* Re-acquire in the order waitqueue.cs, then pool->cs. */
    if (wait_thread) RtlEnterCriticalSection( &waitqueue.cs );
    RtlEnterCriticalSection( &pool->cs );

    /* Simple callbacks are automatically shutdown after execution. */
    if (object->type == TP_OBJECT_TYPE_SIMPLE)
    {
        tp_object_prepare_shutdown( object );
        object->shutdown = TRUE;
    }

    object->num_running_callbacks--;
    if (object_is_finished( object, TRUE ))
        RtlWakeAllConditionVariable( &object->group_finished_event );

    /* Only decrement if the instance is still marked associated.
     * NOTE(review): presumably code outside this block can clear the flag
     * and do the decrement itself - confirm. */
    if (instance.associated)
    {
        object->num_associated_callbacks--;
        if (object_is_finished( object, FALSE ))
            RtlWakeAllConditionVariable( &object->finished_event );
    }
}
/***********************************************************************
 *           threadpool_worker_proc    (internal)
 *
 * Thread routine of a threadpool worker.  'param' is the struct
 * threadpool the worker belongs to.  The worker repeatedly drains the
 * pool's queues and then sleeps on the pool's update event; it exits on
 * pool shutdown or after an idle timeout (see the condition below),
 * dropping a pool reference on the way out.
 */
static void CALLBACK threadpool_worker_proc( void *param )
{
    struct threadpool *pool = param;
    LARGE_INTEGER timeout;
    struct list *ptr;

    TRACE( "starting worker thread for pool %p\n", pool );

    RtlEnterCriticalSection( &pool->cs );
    for (;;)
    {
        while ((ptr = threadpool_get_next_item( pool )))
        {
            struct threadpool_object *object = LIST_ENTRY( ptr, struct threadpool_object, pool_entry );
            assert( object->num_pending_callbacks > 0 );

            /* If further pending callbacks are queued, move the work item to
             * the end of the pool list. Otherwise remove it from the pool. */
            list_remove( &object->pool_entry );
            if (object->num_pending_callbacks > 1)
                tp_object_prio_queue( object );

            /* Runs the callback; pool->cs is released during the call
             * and held again when it returns. */
            tp_object_execute( object, FALSE );

            /* Balances the increment done when the entry was queued. */
            assert(pool->num_busy_workers);
            pool->num_busy_workers--;

            /* Drop the reference taken for this pending callback. */
            tp_object_release( object );
        }

        /* Shutdown worker thread if requested. */
        if (pool->shutdown)
            break;

        /* Wait for new tasks or until the timeout expires. A thread only terminates
         * when no new tasks are available, and the number of threads can be
         * decreased without violating the min_workers limit. An exception is when
         * min_workers == 0, then objcount is used to detect if the last thread
         * can be terminated. */
        timeout.QuadPart = (ULONGLONG)THREADPOOL_WORKER_TIMEOUT * -10000;
        if (RtlSleepConditionVariableCS( &pool->update_event, &pool->cs, &timeout ) == STATUS_TIMEOUT &&
            !threadpool_get_next_item( pool ) && (pool->num_workers > max( pool->min_workers, 1 ) ||
            (!pool->min_workers && !pool->objcount)))
        {
            break;
        }
    }
    /* Still under pool->cs: take this thread out of the worker count. */
    pool->num_workers--;
    RtlLeaveCriticalSection( &pool->cs );

    TRACE( "terminating worker thread for pool %p\n", pool );
    tp_threadpool_release( pool );
    RtlExitUserThread( 0 );
}
2332 /***********************************************************************
2333 * TpAllocCleanupGroup (NTDLL.@)
2335 NTSTATUS WINAPI
TpAllocCleanupGroup( TP_CLEANUP_GROUP
**out
)
2337 TRACE( "%p\n", out
);
2339 return tp_group_alloc( (struct threadpool_group
**)out
);
2342 /***********************************************************************
2343 * TpAllocIoCompletion (NTDLL.@)
2345 NTSTATUS WINAPI
TpAllocIoCompletion( TP_IO
**out
, HANDLE file
, PTP_IO_CALLBACK callback
,
2346 void *userdata
, TP_CALLBACK_ENVIRON
*environment
)
2348 struct threadpool_object
*object
;
2349 struct threadpool
*pool
;
2352 TRACE( "%p %p %p %p %p\n", out
, file
, callback
, userdata
, environment
);
2354 if (!(object
= RtlAllocateHeap( GetProcessHeap(), HEAP_ZERO_MEMORY
, sizeof(*object
) )))
2355 return STATUS_NO_MEMORY
;
2357 if ((status
= tp_threadpool_lock( &pool
, environment
)))
2359 RtlFreeHeap( GetProcessHeap(), 0, object
);
2363 object
->type
= TP_OBJECT_TYPE_IO
;
2364 object
->u
.io
.callback
= callback
;
2365 if (!(object
->u
.io
.completions
= RtlAllocateHeap( GetProcessHeap(), 0, 8 * sizeof(*object
->u
.io
.completions
) )))
2367 tp_threadpool_unlock( pool
);
2368 RtlFreeHeap( GetProcessHeap(), 0, object
);
2372 if ((status
= tp_ioqueue_lock( object
, file
)))
2374 tp_threadpool_unlock( pool
);
2375 RtlFreeHeap( GetProcessHeap(), 0, object
->u
.io
.completions
);
2376 RtlFreeHeap( GetProcessHeap(), 0, object
);
2380 tp_object_initialize( object
, pool
, userdata
, environment
);
2382 *out
= (TP_IO
*)object
;
2383 return STATUS_SUCCESS
;
2386 /***********************************************************************
2387 * TpAllocPool (NTDLL.@)
2389 NTSTATUS WINAPI
TpAllocPool( TP_POOL
**out
, PVOID reserved
)
2391 TRACE( "%p %p\n", out
, reserved
);
2394 FIXME( "reserved argument is nonzero (%p)\n", reserved
);
2396 return tp_threadpool_alloc( (struct threadpool
**)out
);
2399 /***********************************************************************
2400 * TpAllocTimer (NTDLL.@)
2402 NTSTATUS WINAPI
TpAllocTimer( TP_TIMER
**out
, PTP_TIMER_CALLBACK callback
, PVOID userdata
,
2403 TP_CALLBACK_ENVIRON
*environment
)
2405 struct threadpool_object
*object
;
2406 struct threadpool
*pool
;
2409 TRACE( "%p %p %p %p\n", out
, callback
, userdata
, environment
);
2411 object
= RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*object
) );
2413 return STATUS_NO_MEMORY
;
2415 status
= tp_threadpool_lock( &pool
, environment
);
2418 RtlFreeHeap( GetProcessHeap(), 0, object
);
2422 object
->type
= TP_OBJECT_TYPE_TIMER
;
2423 object
->u
.timer
.callback
= callback
;
2425 status
= tp_timerqueue_lock( object
);
2428 tp_threadpool_unlock( pool
);
2429 RtlFreeHeap( GetProcessHeap(), 0, object
);
2433 tp_object_initialize( object
, pool
, userdata
, environment
);
2435 *out
= (TP_TIMER
*)object
;
2436 return STATUS_SUCCESS
;
2439 static NTSTATUS
tp_alloc_wait( TP_WAIT
**out
, PTP_WAIT_CALLBACK callback
, PVOID userdata
,
2440 TP_CALLBACK_ENVIRON
*environment
, DWORD flags
)
2442 struct threadpool_object
*object
;
2443 struct threadpool
*pool
;
2446 object
= RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*object
) );
2448 return STATUS_NO_MEMORY
;
2450 status
= tp_threadpool_lock( &pool
, environment
);
2453 RtlFreeHeap( GetProcessHeap(), 0, object
);
2457 object
->type
= TP_OBJECT_TYPE_WAIT
;
2458 object
->u
.wait
.callback
= callback
;
2459 object
->u
.wait
.flags
= flags
;
2461 status
= tp_waitqueue_lock( object
);
2464 tp_threadpool_unlock( pool
);
2465 RtlFreeHeap( GetProcessHeap(), 0, object
);
2469 tp_object_initialize( object
, pool
, userdata
, environment
);
2471 *out
= (TP_WAIT
*)object
;
2472 return STATUS_SUCCESS
;
2475 /***********************************************************************
2476 * TpAllocWait (NTDLL.@)
2478 NTSTATUS WINAPI
TpAllocWait( TP_WAIT
**out
, PTP_WAIT_CALLBACK callback
, PVOID userdata
,
2479 TP_CALLBACK_ENVIRON
*environment
)
2481 TRACE( "%p %p %p %p\n", out
, callback
, userdata
, environment
);
2482 return tp_alloc_wait( out
, callback
, userdata
, environment
, WT_EXECUTEONLYONCE
);
2485 /***********************************************************************
2486 * TpAllocWork (NTDLL.@)
2488 NTSTATUS WINAPI
TpAllocWork( TP_WORK
**out
, PTP_WORK_CALLBACK callback
, PVOID userdata
,
2489 TP_CALLBACK_ENVIRON
*environment
)
2491 struct threadpool_object
*object
;
2492 struct threadpool
*pool
;
2495 TRACE( "%p %p %p %p\n", out
, callback
, userdata
, environment
);
2497 object
= RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*object
) );
2499 return STATUS_NO_MEMORY
;
2501 status
= tp_threadpool_lock( &pool
, environment
);
2504 RtlFreeHeap( GetProcessHeap(), 0, object
);
2508 object
->type
= TP_OBJECT_TYPE_WORK
;
2509 object
->u
.work
.callback
= callback
;
2510 tp_object_initialize( object
, pool
, userdata
, environment
);
2512 *out
= (TP_WORK
*)object
;
2513 return STATUS_SUCCESS
;
2516 /***********************************************************************
2517 * TpCancelAsyncIoOperation (NTDLL.@)
2519 void WINAPI
TpCancelAsyncIoOperation( TP_IO
*io
)
2521 struct threadpool_object
*this = impl_from_TP_IO( io
);
2523 TRACE( "%p\n", io
);
2525 RtlEnterCriticalSection( &this->pool
->cs
);
2527 this->u
.io
.pending_count
--;
2528 if (object_is_finished( this, TRUE
))
2529 RtlWakeAllConditionVariable( &this->group_finished_event
);
2530 if (object_is_finished( this, FALSE
))
2531 RtlWakeAllConditionVariable( &this->finished_event
);
2533 RtlLeaveCriticalSection( &this->pool
->cs
);
2536 /***********************************************************************
2537 * TpCallbackLeaveCriticalSectionOnCompletion (NTDLL.@)
2539 VOID WINAPI
TpCallbackLeaveCriticalSectionOnCompletion( TP_CALLBACK_INSTANCE
*instance
, CRITICAL_SECTION
*crit
)
2541 struct threadpool_instance
*this = impl_from_TP_CALLBACK_INSTANCE( instance
);
2543 TRACE( "%p %p\n", instance
, crit
);
2545 if (!this->cleanup
.critical_section
)
2546 this->cleanup
.critical_section
= crit
;
2549 /***********************************************************************
2550 * TpCallbackMayRunLong (NTDLL.@)
2552 NTSTATUS WINAPI
TpCallbackMayRunLong( TP_CALLBACK_INSTANCE
*instance
)
2554 struct threadpool_instance
*this = impl_from_TP_CALLBACK_INSTANCE( instance
);
2555 struct threadpool_object
*object
= this->object
;
2556 struct threadpool
*pool
;
2557 NTSTATUS status
= STATUS_SUCCESS
;
2559 TRACE( "%p\n", instance
);
2561 if (this->threadid
!= GetCurrentThreadId())
2563 ERR("called from wrong thread, ignoring\n");
2564 return STATUS_UNSUCCESSFUL
; /* FIXME */
2567 if (this->may_run_long
)
2568 return STATUS_SUCCESS
;
2570 pool
= object
->pool
;
2571 RtlEnterCriticalSection( &pool
->cs
);
2573 /* Start new worker threads if required. */
2574 if (pool
->num_busy_workers
>= pool
->num_workers
)
2576 if (pool
->num_workers
< pool
->max_workers
)
2578 status
= tp_new_worker_thread( pool
);
2582 status
= STATUS_TOO_MANY_THREADS
;
2586 RtlLeaveCriticalSection( &pool
->cs
);
2587 this->may_run_long
= TRUE
;
2591 /***********************************************************************
2592 * TpCallbackReleaseMutexOnCompletion (NTDLL.@)
2594 VOID WINAPI
TpCallbackReleaseMutexOnCompletion( TP_CALLBACK_INSTANCE
*instance
, HANDLE mutex
)
2596 struct threadpool_instance
*this = impl_from_TP_CALLBACK_INSTANCE( instance
);
2598 TRACE( "%p %p\n", instance
, mutex
);
2600 if (!this->cleanup
.mutex
)
2601 this->cleanup
.mutex
= mutex
;
2604 /***********************************************************************
2605 * TpCallbackReleaseSemaphoreOnCompletion (NTDLL.@)
2607 VOID WINAPI
TpCallbackReleaseSemaphoreOnCompletion( TP_CALLBACK_INSTANCE
*instance
, HANDLE semaphore
, DWORD count
)
2609 struct threadpool_instance
*this = impl_from_TP_CALLBACK_INSTANCE( instance
);
2611 TRACE( "%p %p %u\n", instance
, semaphore
, count
);
2613 if (!this->cleanup
.semaphore
)
2615 this->cleanup
.semaphore
= semaphore
;
2616 this->cleanup
.semaphore_count
= count
;
2620 /***********************************************************************
2621 * TpCallbackSetEventOnCompletion (NTDLL.@)
2623 VOID WINAPI
TpCallbackSetEventOnCompletion( TP_CALLBACK_INSTANCE
*instance
, HANDLE event
)
2625 struct threadpool_instance
*this = impl_from_TP_CALLBACK_INSTANCE( instance
);
2627 TRACE( "%p %p\n", instance
, event
);
2629 if (!this->cleanup
.event
)
2630 this->cleanup
.event
= event
;
2633 /***********************************************************************
2634 * TpCallbackUnloadDllOnCompletion (NTDLL.@)
2636 VOID WINAPI
TpCallbackUnloadDllOnCompletion( TP_CALLBACK_INSTANCE
*instance
, HMODULE module
)
2638 struct threadpool_instance
*this = impl_from_TP_CALLBACK_INSTANCE( instance
);
2640 TRACE( "%p %p\n", instance
, module
);
2642 if (!this->cleanup
.library
)
2643 this->cleanup
.library
= module
;
2646 /***********************************************************************
2647 * TpDisassociateCallback (NTDLL.@)
2649 VOID WINAPI
TpDisassociateCallback( TP_CALLBACK_INSTANCE
*instance
)
2651 struct threadpool_instance
*this = impl_from_TP_CALLBACK_INSTANCE( instance
);
2652 struct threadpool_object
*object
= this->object
;
2653 struct threadpool
*pool
;
2655 TRACE( "%p\n", instance
);
2657 if (this->threadid
!= GetCurrentThreadId())
2659 ERR("called from wrong thread, ignoring\n");
2663 if (!this->associated
)
2666 pool
= object
->pool
;
2667 RtlEnterCriticalSection( &pool
->cs
);
2669 object
->num_associated_callbacks
--;
2670 if (object_is_finished( object
, FALSE
))
2671 RtlWakeAllConditionVariable( &object
->finished_event
);
2673 RtlLeaveCriticalSection( &pool
->cs
);
2674 this->associated
= FALSE
;
2677 /***********************************************************************
2678 * TpIsTimerSet (NTDLL.@)
2680 BOOL WINAPI
TpIsTimerSet( TP_TIMER
*timer
)
2682 struct threadpool_object
*this = impl_from_TP_TIMER( timer
);
2684 TRACE( "%p\n", timer
);
2686 return this->u
.timer
.timer_set
;
2689 /***********************************************************************
2690 * TpPostWork (NTDLL.@)
2692 VOID WINAPI
TpPostWork( TP_WORK
*work
)
2694 struct threadpool_object
*this = impl_from_TP_WORK( work
);
2696 TRACE( "%p\n", work
);
2698 tp_object_submit( this, FALSE
);
2701 /***********************************************************************
2702 * TpReleaseCleanupGroup (NTDLL.@)
2704 VOID WINAPI
TpReleaseCleanupGroup( TP_CLEANUP_GROUP
*group
)
2706 struct threadpool_group
*this = impl_from_TP_CLEANUP_GROUP( group
);
2708 TRACE( "%p\n", group
);
2710 tp_group_shutdown( this );
2711 tp_group_release( this );
2714 /***********************************************************************
2715 * TpReleaseCleanupGroupMembers (NTDLL.@)
2717 VOID WINAPI
TpReleaseCleanupGroupMembers( TP_CLEANUP_GROUP
*group
, BOOL cancel_pending
, PVOID userdata
)
2719 struct threadpool_group
*this = impl_from_TP_CLEANUP_GROUP( group
);
2720 struct threadpool_object
*object
, *next
;
2721 struct list members
;
2723 TRACE( "%p %u %p\n", group
, cancel_pending
, userdata
);
2725 RtlEnterCriticalSection( &this->cs
);
2727 /* Unset group, increase references, and mark objects for shutdown */
2728 LIST_FOR_EACH_ENTRY_SAFE( object
, next
, &this->members
, struct threadpool_object
, group_entry
)
2730 assert( object
->group
== this );
2731 assert( object
->is_group_member
);
2733 if (InterlockedIncrement( &object
->refcount
) == 1)
2735 /* Object is basically already destroyed, but group reference
2736 * was not deleted yet. We can safely ignore this object. */
2737 InterlockedDecrement( &object
->refcount
);
2738 list_remove( &object
->group_entry
);
2739 object
->is_group_member
= FALSE
;
2743 object
->is_group_member
= FALSE
;
2744 tp_object_prepare_shutdown( object
);
2747 /* Move members to a new temporary list */
2748 list_init( &members
);
2749 list_move_tail( &members
, &this->members
);
2751 RtlLeaveCriticalSection( &this->cs
);
2753 /* Cancel pending callbacks if requested */
2756 LIST_FOR_EACH_ENTRY( object
, &members
, struct threadpool_object
, group_entry
)
2758 tp_object_cancel( object
);
2762 /* Wait for remaining callbacks to finish */
2763 LIST_FOR_EACH_ENTRY_SAFE( object
, next
, &members
, struct threadpool_object
, group_entry
)
2765 tp_object_wait( object
, TRUE
);
2767 if (!object
->shutdown
)
2769 /* Execute group cancellation callback if defined, and if this was actually a group cancel. */
2770 if (cancel_pending
&& object
->group_cancel_callback
)
2772 TRACE( "executing group cancel callback %p(%p, %p)\n",
2773 object
->group_cancel_callback
, object
->userdata
, userdata
);
2774 object
->group_cancel_callback( object
->userdata
, userdata
);
2775 TRACE( "callback %p returned\n", object
->group_cancel_callback
);
2778 if (object
->type
!= TP_OBJECT_TYPE_SIMPLE
)
2779 tp_object_release( object
);
2782 object
->shutdown
= TRUE
;
2783 tp_object_release( object
);
2787 /***********************************************************************
2788 * TpReleaseIoCompletion (NTDLL.@)
2790 void WINAPI
TpReleaseIoCompletion( TP_IO
*io
)
2792 struct threadpool_object
*this = impl_from_TP_IO( io
);
2794 TRACE( "%p\n", io
);
2796 RtlEnterCriticalSection( &ioqueue
.cs
);
2798 assert( ioqueue
.objcount
);
2799 this->shutdown
= TRUE
;
2800 NtSetIoCompletion( ioqueue
.port
, (ULONG_PTR
)this, 0, STATUS_THREADPOOL_RELEASED_DURING_OPERATION
, 1 );
2801 RtlLeaveCriticalSection( &ioqueue
.cs
);
2804 /***********************************************************************
2805 * TpReleasePool (NTDLL.@)
2807 VOID WINAPI
TpReleasePool( TP_POOL
*pool
)
2809 struct threadpool
*this = impl_from_TP_POOL( pool
);
2811 TRACE( "%p\n", pool
);
2813 tp_threadpool_shutdown( this );
2814 tp_threadpool_release( this );
2817 /***********************************************************************
2818 * TpReleaseTimer (NTDLL.@)
2820 VOID WINAPI
TpReleaseTimer( TP_TIMER
*timer
)
2822 struct threadpool_object
*this = impl_from_TP_TIMER( timer
);
2824 TRACE( "%p\n", timer
);
2826 tp_object_prepare_shutdown( this );
2827 this->shutdown
= TRUE
;
2828 tp_object_release( this );
2831 /***********************************************************************
2832 * TpReleaseWait (NTDLL.@)
2834 VOID WINAPI
TpReleaseWait( TP_WAIT
*wait
)
2836 struct threadpool_object
*this = impl_from_TP_WAIT( wait
);
2838 TRACE( "%p\n", wait
);
2840 tp_object_prepare_shutdown( this );
2841 this->shutdown
= TRUE
;
2842 tp_object_release( this );
2845 /***********************************************************************
2846 * TpReleaseWork (NTDLL.@)
2848 VOID WINAPI
TpReleaseWork( TP_WORK
*work
)
2850 struct threadpool_object
*this = impl_from_TP_WORK( work
);
2852 TRACE( "%p\n", work
);
2854 tp_object_prepare_shutdown( this );
2855 this->shutdown
= TRUE
;
2856 tp_object_release( this );
2859 /***********************************************************************
2860 * TpSetPoolMaxThreads (NTDLL.@)
2862 VOID WINAPI
TpSetPoolMaxThreads( TP_POOL
*pool
, DWORD maximum
)
2864 struct threadpool
*this = impl_from_TP_POOL( pool
);
2866 TRACE( "%p %u\n", pool
, maximum
);
2868 RtlEnterCriticalSection( &this->cs
);
2869 this->max_workers
= max( maximum
, 1 );
2870 this->min_workers
= min( this->min_workers
, this->max_workers
);
2871 RtlLeaveCriticalSection( &this->cs
);
2874 /***********************************************************************
2875 * TpSetPoolMinThreads (NTDLL.@)
2877 BOOL WINAPI
TpSetPoolMinThreads( TP_POOL
*pool
, DWORD minimum
)
2879 struct threadpool
*this = impl_from_TP_POOL( pool
);
2880 NTSTATUS status
= STATUS_SUCCESS
;
2882 TRACE( "%p %u\n", pool
, minimum
);
2884 RtlEnterCriticalSection( &this->cs
);
2886 while (this->num_workers
< minimum
)
2888 status
= tp_new_worker_thread( this );
2889 if (status
!= STATUS_SUCCESS
)
2893 if (status
== STATUS_SUCCESS
)
2895 this->min_workers
= minimum
;
2896 this->max_workers
= max( this->min_workers
, this->max_workers
);
2899 RtlLeaveCriticalSection( &this->cs
);
2903 /***********************************************************************
2904 * TpSetTimer (NTDLL.@)
2906 VOID WINAPI
TpSetTimer( TP_TIMER
*timer
, LARGE_INTEGER
*timeout
, LONG period
, LONG window_length
)
2908 struct threadpool_object
*this = impl_from_TP_TIMER( timer
);
2909 struct threadpool_object
*other_timer
;
2910 BOOL submit_timer
= FALSE
;
2911 ULONGLONG timestamp
;
2913 TRACE( "%p %p %u %u\n", timer
, timeout
, period
, window_length
);
2915 RtlEnterCriticalSection( &timerqueue
.cs
);
2917 assert( this->u
.timer
.timer_initialized
);
2918 this->u
.timer
.timer_set
= timeout
!= NULL
;
2920 /* Convert relative timeout to absolute timestamp and handle a timeout
2921 * of zero, which means that the timer is submitted immediately. */
2924 timestamp
= timeout
->QuadPart
;
2925 if ((LONGLONG
)timestamp
< 0)
2928 NtQuerySystemTime( &now
);
2929 timestamp
= now
.QuadPart
- timestamp
;
2931 else if (!timestamp
)
2938 NtQuerySystemTime( &now
);
2939 timestamp
= now
.QuadPart
+ (ULONGLONG
)period
* 10000;
2941 submit_timer
= TRUE
;
2945 /* First remove existing timeout. */
2946 if (this->u
.timer
.timer_pending
)
2948 list_remove( &this->u
.timer
.timer_entry
);
2949 this->u
.timer
.timer_pending
= FALSE
;
2952 /* If the timer was enabled, then add it back to the queue. */
2955 this->u
.timer
.timeout
= timestamp
;
2956 this->u
.timer
.period
= period
;
2957 this->u
.timer
.window_length
= window_length
;
2959 LIST_FOR_EACH_ENTRY( other_timer
, &timerqueue
.pending_timers
,
2960 struct threadpool_object
, u
.timer
.timer_entry
)
2962 assert( other_timer
->type
== TP_OBJECT_TYPE_TIMER
);
2963 if (this->u
.timer
.timeout
< other_timer
->u
.timer
.timeout
)
2966 list_add_before( &other_timer
->u
.timer
.timer_entry
, &this->u
.timer
.timer_entry
);
2968 /* Wake up the timer thread when the timeout has to be updated. */
2969 if (list_head( &timerqueue
.pending_timers
) == &this->u
.timer
.timer_entry
)
2970 RtlWakeAllConditionVariable( &timerqueue
.update_event
);
2972 this->u
.timer
.timer_pending
= TRUE
;
2975 RtlLeaveCriticalSection( &timerqueue
.cs
);
2978 tp_object_submit( this, FALSE
);
2981 /***********************************************************************
2982 * TpSetWait (NTDLL.@)
2984 VOID WINAPI
TpSetWait( TP_WAIT
*wait
, HANDLE handle
, LARGE_INTEGER
*timeout
)
2986 struct threadpool_object
*this = impl_from_TP_WAIT( wait
);
2987 ULONGLONG timestamp
= MAXLONGLONG
;
2989 TRACE( "%p %p %p\n", wait
, handle
, timeout
);
2991 RtlEnterCriticalSection( &waitqueue
.cs
);
2993 assert( this->u
.wait
.bucket
);
2994 this->u
.wait
.handle
= handle
;
2996 if (handle
|| this->u
.wait
.wait_pending
)
2998 struct waitqueue_bucket
*bucket
= this->u
.wait
.bucket
;
2999 list_remove( &this->u
.wait
.wait_entry
);
3001 /* Convert relative timeout to absolute timestamp. */
3002 if (handle
&& timeout
)
3004 timestamp
= timeout
->QuadPart
;
3005 if ((LONGLONG
)timestamp
< 0)
3008 NtQuerySystemTime( &now
);
3009 timestamp
= now
.QuadPart
- timestamp
;
3013 /* Add wait object back into one of the queues. */
3016 list_add_tail( &bucket
->waiting
, &this->u
.wait
.wait_entry
);
3017 this->u
.wait
.wait_pending
= TRUE
;
3018 this->u
.wait
.timeout
= timestamp
;
3022 list_add_tail( &bucket
->reserved
, &this->u
.wait
.wait_entry
);
3023 this->u
.wait
.wait_pending
= FALSE
;
3026 /* Wake up the wait queue thread. */
3027 NtSetEvent( bucket
->update_event
, NULL
);
3030 RtlLeaveCriticalSection( &waitqueue
.cs
);
3033 /***********************************************************************
3034 * TpSimpleTryPost (NTDLL.@)
3036 NTSTATUS WINAPI
TpSimpleTryPost( PTP_SIMPLE_CALLBACK callback
, PVOID userdata
,
3037 TP_CALLBACK_ENVIRON
*environment
)
3039 struct threadpool_object
*object
;
3040 struct threadpool
*pool
;
3043 TRACE( "%p %p %p\n", callback
, userdata
, environment
);
3045 object
= RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*object
) );
3047 return STATUS_NO_MEMORY
;
3049 status
= tp_threadpool_lock( &pool
, environment
);
3052 RtlFreeHeap( GetProcessHeap(), 0, object
);
3056 object
->type
= TP_OBJECT_TYPE_SIMPLE
;
3057 object
->u
.simple
.callback
= callback
;
3058 tp_object_initialize( object
, pool
, userdata
, environment
);
3060 return STATUS_SUCCESS
;
3063 /***********************************************************************
3064 * TpStartAsyncIoOperation (NTDLL.@)
3066 void WINAPI
TpStartAsyncIoOperation( TP_IO
*io
)
3068 struct threadpool_object
*this = impl_from_TP_IO( io
);
3070 TRACE( "%p\n", io
);
3072 RtlEnterCriticalSection( &this->pool
->cs
);
3074 this->u
.io
.pending_count
++;
3076 RtlLeaveCriticalSection( &this->pool
->cs
);
3079 /***********************************************************************
3080 * TpWaitForIoCompletion (NTDLL.@)
3082 void WINAPI
TpWaitForIoCompletion( TP_IO
*io
, BOOL cancel_pending
)
3084 struct threadpool_object
*this = impl_from_TP_IO( io
);
3086 TRACE( "%p %d\n", io
, cancel_pending
);
3089 tp_object_cancel( this );
3090 tp_object_wait( this, FALSE
);
3093 /***********************************************************************
3094 * TpWaitForTimer (NTDLL.@)
3096 VOID WINAPI
TpWaitForTimer( TP_TIMER
*timer
, BOOL cancel_pending
)
3098 struct threadpool_object
*this = impl_from_TP_TIMER( timer
);
3100 TRACE( "%p %d\n", timer
, cancel_pending
);
3103 tp_object_cancel( this );
3104 tp_object_wait( this, FALSE
);
3107 /***********************************************************************
3108 * TpWaitForWait (NTDLL.@)
3110 VOID WINAPI
TpWaitForWait( TP_WAIT
*wait
, BOOL cancel_pending
)
3112 struct threadpool_object
*this = impl_from_TP_WAIT( wait
);
3114 TRACE( "%p %d\n", wait
, cancel_pending
);
3117 tp_object_cancel( this );
3118 tp_object_wait( this, FALSE
);
3121 /***********************************************************************
3122 * TpWaitForWork (NTDLL.@)
3124 VOID WINAPI
TpWaitForWork( TP_WORK
*work
, BOOL cancel_pending
)
3126 struct threadpool_object
*this = impl_from_TP_WORK( work
);
3128 TRACE( "%p %u\n", work
, cancel_pending
);
3131 tp_object_cancel( this );
3132 tp_object_wait( this, FALSE
);
3135 /***********************************************************************
3136 * TpSetPoolStackInformation (NTDLL.@)
3138 NTSTATUS WINAPI
TpSetPoolStackInformation( TP_POOL
*pool
, TP_POOL_STACK_INFORMATION
*stack_info
)
3140 struct threadpool
*this = impl_from_TP_POOL( pool
);
3142 TRACE( "%p %p\n", pool
, stack_info
);
3145 return STATUS_INVALID_PARAMETER
;
3147 RtlEnterCriticalSection( &this->cs
);
3148 this->stack_info
= *stack_info
;
3149 RtlLeaveCriticalSection( &this->cs
);
3151 return STATUS_SUCCESS
;
3154 /***********************************************************************
3155 * TpQueryPoolStackInformation (NTDLL.@)
3157 NTSTATUS WINAPI
TpQueryPoolStackInformation( TP_POOL
*pool
, TP_POOL_STACK_INFORMATION
*stack_info
)
3159 struct threadpool
*this = impl_from_TP_POOL( pool
);
3161 TRACE( "%p %p\n", pool
, stack_info
);
3164 return STATUS_INVALID_PARAMETER
;
3166 RtlEnterCriticalSection( &this->cs
);
3167 *stack_info
= this->stack_info
;
3168 RtlLeaveCriticalSection( &this->cs
);
3170 return STATUS_SUCCESS
;
3173 static void CALLBACK
rtl_wait_callback( TP_CALLBACK_INSTANCE
*instance
, void *userdata
, TP_WAIT
*wait
, TP_WAIT_RESULT result
)
3175 struct threadpool_object
*object
= impl_from_TP_WAIT(wait
);
3176 object
->u
.wait
.rtl_callback( userdata
, result
!= STATUS_WAIT_0
);
3179 /***********************************************************************
3180 * RtlRegisterWait (NTDLL.@)
3182 * Registers a wait for a handle to become signaled.
3185 * NewWaitObject [I] Handle to the new wait object. Use RtlDeregisterWait() to free it.
3186 * Object [I] Object to wait to become signaled.
3187 * Callback [I] Callback function to execute when the wait times out or the handle is signaled.
3188 * Context [I] Context to pass to the callback function when it is executed.
3189 * Milliseconds [I] Number of milliseconds to wait before timing out.
3190 * Flags [I] Flags. See notes.
3193 * Success: STATUS_SUCCESS.
3194 * Failure: Any NTSTATUS code.
3197 * Flags can be one or more of the following:
3198 *|WT_EXECUTEDEFAULT - Executes the work item in a non-I/O worker thread.
3199 *|WT_EXECUTEINIOTHREAD - Executes the work item in an I/O worker thread.
3200 *|WT_EXECUTEINPERSISTENTTHREAD - Executes the work item in a thread that is persistent.
3201 *|WT_EXECUTELONGFUNCTION - Hints that the execution can take a long time.
3202 *|WT_TRANSFER_IMPERSONATION - Executes the function with the current access token.
3204 NTSTATUS WINAPI
RtlRegisterWait( HANDLE
*out
, HANDLE handle
, RTL_WAITORTIMERCALLBACKFUNC callback
,
3205 void *context
, ULONG milliseconds
, ULONG flags
)
3207 struct threadpool_object
*object
;
3208 TP_CALLBACK_ENVIRON environment
;
3209 LARGE_INTEGER timeout
;
3213 TRACE( "out %p, handle %p, callback %p, context %p, milliseconds %u, flags %x\n",
3214 out
, handle
, callback
, context
, milliseconds
, flags
);
3216 memset( &environment
, 0, sizeof(environment
) );
3217 environment
.Version
= 1;
3218 environment
.u
.s
.LongFunction
= (flags
& WT_EXECUTELONGFUNCTION
) != 0;
3219 environment
.u
.s
.Persistent
= (flags
& WT_EXECUTEINPERSISTENTTHREAD
) != 0;
3221 flags
&= (WT_EXECUTEONLYONCE
| WT_EXECUTEINWAITTHREAD
| WT_EXECUTEINIOTHREAD
);
3222 if ((status
= tp_alloc_wait( &wait
, rtl_wait_callback
, context
, &environment
, flags
)))
3225 object
= impl_from_TP_WAIT(wait
);
3226 object
->u
.wait
.rtl_callback
= callback
;
3228 RtlEnterCriticalSection( &waitqueue
.cs
);
3229 TpSetWait( (TP_WAIT
*)object
, handle
, get_nt_timeout( &timeout
, milliseconds
) );
3232 RtlLeaveCriticalSection( &waitqueue
.cs
);
3234 return STATUS_SUCCESS
;
3237 /***********************************************************************
3238 * RtlDeregisterWaitEx (NTDLL.@)
3240 * Cancels a wait operation and frees the resources associated with calling
3241 * RtlRegisterWait().
3244 * WaitObject [I] Handle to the wait object to free.
3247 * Success: STATUS_SUCCESS.
3248 * Failure: Any NTSTATUS code.
3250 NTSTATUS WINAPI
RtlDeregisterWaitEx( HANDLE handle
, HANDLE event
)
3252 struct threadpool_object
*object
= handle
;
3255 TRACE( "handle %p, event %p\n", handle
, event
);
3257 if (!object
) return STATUS_INVALID_HANDLE
;
3259 TpSetWait( (TP_WAIT
*)object
, NULL
, NULL
);
3261 if (event
== INVALID_HANDLE_VALUE
) TpWaitForWait( (TP_WAIT
*)object
, TRUE
);
3264 assert( object
->completed_event
== NULL
);
3265 object
->completed_event
= event
;
3268 RtlEnterCriticalSection( &object
->pool
->cs
);
3269 if (object
->num_pending_callbacks
+ object
->num_running_callbacks
3270 + object
->num_associated_callbacks
) status
= STATUS_PENDING
;
3271 else status
= STATUS_SUCCESS
;
3272 RtlLeaveCriticalSection( &object
->pool
->cs
);
3274 TpReleaseWait( (TP_WAIT
*)object
);
3278 /***********************************************************************
3279 * RtlDeregisterWait (NTDLL.@)
3281 * Cancels a wait operation and frees the resources associated with calling
3282 * RtlRegisterWait().
3285 * WaitObject [I] Handle to the wait object to free.
3288 * Success: STATUS_SUCCESS.
3289 * Failure: Any NTSTATUS code.
3291 NTSTATUS WINAPI
RtlDeregisterWait(HANDLE WaitHandle
)
3293 return RtlDeregisterWaitEx(WaitHandle
, NULL
);