4 * Copyright (c) 2006 Robert Shearman
5 * Copyright (c) 2014-2016 Sebastian Lackner
7 * This library is free software; you can redistribute it and/or
8 * modify it under the terms of the GNU Lesser General Public
9 * License as published by the Free Software Foundation; either
10 * version 2.1 of the License, or (at your option) any later version.
12 * This library is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Lesser General Public License for more details.
17 * You should have received a copy of the GNU Lesser General Public
18 * License along with this library; if not, write to the Free Software
19 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
26 #define NONAMELESSUNION
28 #define WIN32_NO_STATUS
31 #include "wine/debug.h"
32 #include "wine/list.h"
34 #include "ntdll_misc.h"
36 WINE_DEFAULT_DEBUG_CHANNEL(threadpool
);
39 * Old thread pooling API
44 PRTL_WORK_ITEM_ROUTINE function
;
48 #define EXPIRE_NEVER (~(ULONGLONG)0)
49 #define TIMER_QUEUE_MAGIC 0x516d6954 /* TimQ */
51 static RTL_CRITICAL_SECTION_DEBUG critsect_compl_debug
;
56 RTL_CRITICAL_SECTION threadpool_compl_cs
;
60 NULL
, /* compl_port */
61 { &critsect_compl_debug
, -1, 0, 0, 0, 0 }, /* threadpool_compl_cs */
64 static RTL_CRITICAL_SECTION_DEBUG critsect_compl_debug
=
66 0, 0, &old_threadpool
.threadpool_compl_cs
,
67 { &critsect_compl_debug
.ProcessLocksList
, &critsect_compl_debug
.ProcessLocksList
},
68 0, 0, { (DWORD_PTR
)(__FILE__
": threadpool_compl_cs") }
74 struct timer_queue
*q
;
76 ULONG runcount
; /* number of callbacks pending execution */
77 RTL_WAITORTIMERCALLBACKFUNC callback
;
82 BOOL destroy
; /* timer should be deleted; once set, never unset */
83 HANDLE event
; /* removal event */
89 RTL_CRITICAL_SECTION cs
;
90 struct list timers
; /* sorted by expiration time */
91 BOOL quit
; /* queue should be deleted; once set, never unset */
97 * Object-oriented thread pooling API
100 #define THREADPOOL_WORKER_TIMEOUT 5000
101 #define MAXIMUM_WAITQUEUE_OBJECTS (MAXIMUM_WAIT_OBJECTS - 1)
103 /* internal threadpool representation */
110 /* Pools of work items, locked via .cs, order matches TP_CALLBACK_PRIORITY - high, normal, low. */
111 struct list pools
[3];
112 RTL_CONDITION_VARIABLE update_event
;
113 /* information about worker threads, locked via .cs */
117 int num_busy_workers
;
119 TP_POOL_STACK_INFORMATION stack_info
;
122 enum threadpool_objtype
124 TP_OBJECT_TYPE_SIMPLE
,
126 TP_OBJECT_TYPE_TIMER
,
133 IO_STATUS_BLOCK iosb
;
137 /* internal threadpool object representation */
138 struct threadpool_object
140 void *win32_callback
; /* leave space for kernelbase to store win32 callback */
143 /* read-only information */
144 enum threadpool_objtype type
;
145 struct threadpool
*pool
;
146 struct threadpool_group
*group
;
148 PTP_CLEANUP_GROUP_CANCEL_CALLBACK group_cancel_callback
;
149 PTP_SIMPLE_CALLBACK finalization_callback
;
152 TP_CALLBACK_PRIORITY priority
;
153 /* information about the group, locked via .group->cs */
154 struct list group_entry
;
155 BOOL is_group_member
;
156 /* information about the pool, locked via .pool->cs */
157 struct list pool_entry
;
158 RTL_CONDITION_VARIABLE finished_event
;
159 RTL_CONDITION_VARIABLE group_finished_event
;
160 HANDLE completed_event
;
161 LONG num_pending_callbacks
;
162 LONG num_running_callbacks
;
163 LONG num_associated_callbacks
;
164 /* arguments for callback */
169 PTP_SIMPLE_CALLBACK callback
;
173 PTP_WORK_CALLBACK callback
;
177 PTP_TIMER_CALLBACK callback
;
178 /* information about the timer, locked via timerqueue.cs */
179 BOOL timer_initialized
;
181 struct list timer_entry
;
189 PTP_WAIT_CALLBACK callback
;
191 /* information about the wait object, locked via waitqueue.cs */
192 struct waitqueue_bucket
*bucket
;
194 struct list wait_entry
;
198 RTL_WAITORTIMERCALLBACKFUNC rtl_callback
;
202 PTP_IO_CALLBACK callback
;
203 /* locked via .pool->cs */
204 unsigned int pending_count
, skipped_count
, completion_count
, completion_max
;
206 struct io_completion
*completions
;
211 /* internal threadpool instance representation */
212 struct threadpool_instance
214 struct threadpool_object
*object
;
220 CRITICAL_SECTION
*critical_section
;
223 LONG semaphore_count
;
229 /* internal threadpool group representation */
230 struct threadpool_group
235 /* list of group members, locked via .cs */
239 /* global timerqueue object */
240 static RTL_CRITICAL_SECTION_DEBUG timerqueue_debug
;
247 struct list pending_timers
;
248 RTL_CONDITION_VARIABLE update_event
;
252 { &timerqueue_debug
, -1, 0, 0, 0, 0 }, /* cs */
254 FALSE
, /* thread_running */
255 LIST_INIT( timerqueue
.pending_timers
), /* pending_timers */
256 RTL_CONDITION_VARIABLE_INIT
/* update_event */
259 static RTL_CRITICAL_SECTION_DEBUG timerqueue_debug
=
261 0, 0, &timerqueue
.cs
,
262 { &timerqueue_debug
.ProcessLocksList
, &timerqueue_debug
.ProcessLocksList
},
263 0, 0, { (DWORD_PTR
)(__FILE__
": timerqueue.cs") }
266 /* global waitqueue object */
267 static RTL_CRITICAL_SECTION_DEBUG waitqueue_debug
;
277 { &waitqueue_debug
, -1, 0, 0, 0, 0 }, /* cs */
279 LIST_INIT( waitqueue
.buckets
) /* buckets */
282 static RTL_CRITICAL_SECTION_DEBUG waitqueue_debug
=
285 { &waitqueue_debug
.ProcessLocksList
, &waitqueue_debug
.ProcessLocksList
},
286 0, 0, { (DWORD_PTR
)(__FILE__
": waitqueue.cs") }
289 struct waitqueue_bucket
291 struct list bucket_entry
;
293 struct list reserved
;
299 /* global I/O completion queue object */
300 static RTL_CRITICAL_SECTION_DEBUG ioqueue_debug
;
308 RTL_CONDITION_VARIABLE update_event
;
312 .cs
= { &ioqueue_debug
, -1, 0, 0, 0, 0 },
315 static RTL_CRITICAL_SECTION_DEBUG ioqueue_debug
=
318 { &ioqueue_debug
.ProcessLocksList
, &ioqueue_debug
.ProcessLocksList
},
319 0, 0, { (DWORD_PTR
)(__FILE__
": ioqueue.cs") }
322 static inline struct threadpool
*impl_from_TP_POOL( TP_POOL
*pool
)
324 return (struct threadpool
*)pool
;
327 static inline struct threadpool_object
*impl_from_TP_WORK( TP_WORK
*work
)
329 struct threadpool_object
*object
= (struct threadpool_object
*)work
;
330 assert( object
->type
== TP_OBJECT_TYPE_WORK
);
334 static inline struct threadpool_object
*impl_from_TP_TIMER( TP_TIMER
*timer
)
336 struct threadpool_object
*object
= (struct threadpool_object
*)timer
;
337 assert( object
->type
== TP_OBJECT_TYPE_TIMER
);
341 static inline struct threadpool_object
*impl_from_TP_WAIT( TP_WAIT
*wait
)
343 struct threadpool_object
*object
= (struct threadpool_object
*)wait
;
344 assert( object
->type
== TP_OBJECT_TYPE_WAIT
);
348 static inline struct threadpool_object
*impl_from_TP_IO( TP_IO
*io
)
350 struct threadpool_object
*object
= (struct threadpool_object
*)io
;
351 assert( object
->type
== TP_OBJECT_TYPE_IO
);
355 static inline struct threadpool_group
*impl_from_TP_CLEANUP_GROUP( TP_CLEANUP_GROUP
*group
)
357 return (struct threadpool_group
*)group
;
360 static inline struct threadpool_instance
*impl_from_TP_CALLBACK_INSTANCE( TP_CALLBACK_INSTANCE
*instance
)
362 return (struct threadpool_instance
*)instance
;
365 static void CALLBACK
threadpool_worker_proc( void *param
);
366 static void tp_object_submit( struct threadpool_object
*object
, BOOL signaled
);
367 static void tp_object_execute( struct threadpool_object
*object
, BOOL wait_thread
);
368 static void tp_object_prepare_shutdown( struct threadpool_object
*object
);
369 static BOOL
tp_object_release( struct threadpool_object
*object
);
370 static struct threadpool
*default_threadpool
= NULL
;
372 static BOOL
array_reserve(void **elements
, unsigned int *capacity
, unsigned int count
, unsigned int size
)
374 unsigned int new_capacity
, max_capacity
;
377 if (count
<= *capacity
)
380 max_capacity
= ~(SIZE_T
)0 / size
;
381 if (count
> max_capacity
)
384 new_capacity
= max(4, *capacity
);
385 while (new_capacity
< count
&& new_capacity
<= max_capacity
/ 2)
387 if (new_capacity
< count
)
388 new_capacity
= max_capacity
;
390 if (!(new_elements
= RtlReAllocateHeap( GetProcessHeap(), 0, *elements
, new_capacity
* size
)))
393 *elements
= new_elements
;
394 *capacity
= new_capacity
;
399 static void CALLBACK
process_rtl_work_item( TP_CALLBACK_INSTANCE
*instance
, void *userdata
)
401 struct rtl_work_item
*item
= userdata
;
403 TRACE("executing %p(%p)\n", item
->function
, item
->context
);
404 item
->function( item
->context
);
406 RtlFreeHeap( GetProcessHeap(), 0, item
);
409 /***********************************************************************
410 * RtlQueueWorkItem (NTDLL.@)
412 * Queues a work item into a thread in the thread pool.
415 * function [I] Work function to execute.
416 * context [I] Context to pass to the work function when it is executed.
417 * flags [I] Flags. See notes.
420 * Success: STATUS_SUCCESS.
421 * Failure: Any NTSTATUS code.
424 * Flags can be one or more of the following:
425 *|WT_EXECUTEDEFAULT - Executes the work item in a non-I/O worker thread.
426 *|WT_EXECUTEINIOTHREAD - Executes the work item in an I/O worker thread.
427 *|WT_EXECUTEINPERSISTENTTHREAD - Executes the work item in a thread that is persistent.
428 *|WT_EXECUTELONGFUNCTION - Hints that the execution can take a long time.
429 *|WT_TRANSFER_IMPERSONATION - Executes the function with the current access token.
431 NTSTATUS WINAPI
RtlQueueWorkItem( PRTL_WORK_ITEM_ROUTINE function
, PVOID context
, ULONG flags
)
433 TP_CALLBACK_ENVIRON environment
;
434 struct rtl_work_item
*item
;
437 TRACE( "%p %p %u\n", function
, context
, flags
);
439 item
= RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*item
) );
441 return STATUS_NO_MEMORY
;
443 memset( &environment
, 0, sizeof(environment
) );
444 environment
.Version
= 1;
445 environment
.u
.s
.LongFunction
= (flags
& WT_EXECUTELONGFUNCTION
) != 0;
446 environment
.u
.s
.Persistent
= (flags
& WT_EXECUTEINPERSISTENTTHREAD
) != 0;
448 item
->function
= function
;
449 item
->context
= context
;
451 status
= TpSimpleTryPost( process_rtl_work_item
, item
, &environment
);
452 if (status
) RtlFreeHeap( GetProcessHeap(), 0, item
);
456 /***********************************************************************
457 * iocp_poller - get completion events and run callbacks
459 static DWORD CALLBACK
iocp_poller(LPVOID Arg
)
465 PRTL_OVERLAPPED_COMPLETION_ROUTINE callback
;
467 IO_STATUS_BLOCK iosb
;
468 NTSTATUS res
= NtRemoveIoCompletion( cport
, (PULONG_PTR
)&callback
, (PULONG_PTR
)&overlapped
, &iosb
, NULL
);
471 ERR("NtRemoveIoCompletion failed: 0x%x\n", res
);
475 DWORD transferred
= 0;
478 if (iosb
.u
.Status
== STATUS_SUCCESS
)
479 transferred
= iosb
.Information
;
481 err
= RtlNtStatusToDosError(iosb
.u
.Status
);
483 callback( err
, transferred
, overlapped
);
489 /***********************************************************************
490 * RtlSetIoCompletionCallback (NTDLL.@)
492 * Binds a handle to a thread pool's completion port, and possibly
493 * starts a non-I/O thread to monitor this port and call functions back.
496 * FileHandle [I] Handle to bind to a completion port.
497 * Function [I] Callback function to call on I/O completions.
498 * Flags [I] Not used.
501 * Success: STATUS_SUCCESS.
502 * Failure: Any NTSTATUS code.
505 NTSTATUS WINAPI
RtlSetIoCompletionCallback(HANDLE FileHandle
, PRTL_OVERLAPPED_COMPLETION_ROUTINE Function
, ULONG Flags
)
507 IO_STATUS_BLOCK iosb
;
508 FILE_COMPLETION_INFORMATION info
;
510 if (Flags
) FIXME("Unknown value Flags=0x%x\n", Flags
);
512 if (!old_threadpool
.compl_port
)
514 NTSTATUS res
= STATUS_SUCCESS
;
516 RtlEnterCriticalSection(&old_threadpool
.threadpool_compl_cs
);
517 if (!old_threadpool
.compl_port
)
521 res
= NtCreateIoCompletion( &cport
, IO_COMPLETION_ALL_ACCESS
, NULL
, 0 );
524 /* FIXME native can start additional threads in case of e.g. hung callback function. */
525 res
= RtlQueueWorkItem( iocp_poller
, cport
, WT_EXECUTEDEFAULT
);
527 old_threadpool
.compl_port
= cport
;
532 RtlLeaveCriticalSection(&old_threadpool
.threadpool_compl_cs
);
536 info
.CompletionPort
= old_threadpool
.compl_port
;
537 info
.CompletionKey
= (ULONG_PTR
)Function
;
539 return NtSetInformationFile( FileHandle
, &iosb
, &info
, sizeof(info
), FileCompletionInformation
);
542 static inline PLARGE_INTEGER
get_nt_timeout( PLARGE_INTEGER pTime
, ULONG timeout
)
544 if (timeout
== INFINITE
) return NULL
;
545 pTime
->QuadPart
= (ULONGLONG
)timeout
* -10000;
550 /************************** Timer Queue Impl **************************/
552 static void queue_remove_timer(struct queue_timer
*t
)
554 /* We MUST hold the queue cs while calling this function. This ensures
555 that we cannot queue another callback for this timer. The runcount
556 being zero makes sure we don't have any already queued. */
557 struct timer_queue
*q
= t
->q
;
559 assert(t
->runcount
== 0);
562 list_remove(&t
->entry
);
564 NtSetEvent(t
->event
, NULL
);
565 RtlFreeHeap(GetProcessHeap(), 0, t
);
567 if (q
->quit
&& list_empty(&q
->timers
))
568 NtSetEvent(q
->event
, NULL
);
571 static void timer_cleanup_callback(struct queue_timer
*t
)
573 struct timer_queue
*q
= t
->q
;
574 RtlEnterCriticalSection(&q
->cs
);
576 assert(0 < t
->runcount
);
579 if (t
->destroy
&& t
->runcount
== 0)
580 queue_remove_timer(t
);
582 RtlLeaveCriticalSection(&q
->cs
);
585 static DWORD WINAPI
timer_callback_wrapper(LPVOID p
)
587 struct queue_timer
*t
= p
;
588 t
->callback(t
->param
, TRUE
);
589 timer_cleanup_callback(t
);
593 static inline ULONGLONG
queue_current_time(void)
595 LARGE_INTEGER now
, freq
;
596 NtQueryPerformanceCounter(&now
, &freq
);
597 return now
.QuadPart
* 1000 / freq
.QuadPart
;
600 static void queue_add_timer(struct queue_timer
*t
, ULONGLONG time
,
603 /* We MUST hold the queue cs while calling this function. */
604 struct timer_queue
*q
= t
->q
;
605 struct list
*ptr
= &q
->timers
;
607 assert(!q
->quit
|| (t
->destroy
&& time
== EXPIRE_NEVER
));
609 if (time
!= EXPIRE_NEVER
)
610 LIST_FOR_EACH(ptr
, &q
->timers
)
612 struct queue_timer
*cur
= LIST_ENTRY(ptr
, struct queue_timer
, entry
);
613 if (time
< cur
->expire
)
616 list_add_before(ptr
, &t
->entry
);
620 /* If we insert at the head of the list, we need to expire sooner
622 if (set_event
&& &t
->entry
== list_head(&q
->timers
))
623 NtSetEvent(q
->event
, NULL
);
626 static inline void queue_move_timer(struct queue_timer
*t
, ULONGLONG time
,
629 /* We MUST hold the queue cs while calling this function. */
630 list_remove(&t
->entry
);
631 queue_add_timer(t
, time
, set_event
);
634 static void queue_timer_expire(struct timer_queue
*q
)
636 struct queue_timer
*t
= NULL
;
638 RtlEnterCriticalSection(&q
->cs
);
639 if (list_head(&q
->timers
))
642 t
= LIST_ENTRY(list_head(&q
->timers
), struct queue_timer
, entry
);
643 if (!t
->destroy
&& t
->expire
<= ((now
= queue_current_time())))
648 next
= t
->expire
+ t
->period
;
649 /* avoid trigger cascade if overloaded / hibernated */
651 next
= now
+ t
->period
;
655 queue_move_timer(t
, next
, FALSE
);
660 RtlLeaveCriticalSection(&q
->cs
);
664 if (t
->flags
& WT_EXECUTEINTIMERTHREAD
)
665 timer_callback_wrapper(t
);
670 & (WT_EXECUTEINIOTHREAD
| WT_EXECUTEINPERSISTENTTHREAD
671 | WT_EXECUTELONGFUNCTION
| WT_TRANSFER_IMPERSONATION
));
672 NTSTATUS status
= RtlQueueWorkItem(timer_callback_wrapper
, t
, flags
);
673 if (status
!= STATUS_SUCCESS
)
674 timer_cleanup_callback(t
);
679 static ULONG
queue_get_timeout(struct timer_queue
*q
)
681 struct queue_timer
*t
;
682 ULONG timeout
= INFINITE
;
684 RtlEnterCriticalSection(&q
->cs
);
685 if (list_head(&q
->timers
))
687 t
= LIST_ENTRY(list_head(&q
->timers
), struct queue_timer
, entry
);
688 assert(!t
->destroy
|| t
->expire
== EXPIRE_NEVER
);
690 if (t
->expire
!= EXPIRE_NEVER
)
692 ULONGLONG time
= queue_current_time();
693 timeout
= t
->expire
< time
? 0 : t
->expire
- time
;
696 RtlLeaveCriticalSection(&q
->cs
);
701 static void WINAPI
timer_queue_thread_proc(LPVOID p
)
703 struct timer_queue
*q
= p
;
706 timeout_ms
= INFINITE
;
709 LARGE_INTEGER timeout
;
713 status
= NtWaitForSingleObject(
714 q
->event
, FALSE
, get_nt_timeout(&timeout
, timeout_ms
));
716 if (status
== STATUS_WAIT_0
)
718 /* There are two possible ways to trigger the event. Either
719 we are quitting and the last timer got removed, or a new
720 timer got put at the head of the list so we need to adjust
722 RtlEnterCriticalSection(&q
->cs
);
723 if (q
->quit
&& list_empty(&q
->timers
))
725 RtlLeaveCriticalSection(&q
->cs
);
727 else if (status
== STATUS_TIMEOUT
)
728 queue_timer_expire(q
);
733 timeout_ms
= queue_get_timeout(q
);
737 RtlDeleteCriticalSection(&q
->cs
);
739 RtlFreeHeap(GetProcessHeap(), 0, q
);
740 RtlExitUserThread( 0 );
743 static void queue_destroy_timer(struct queue_timer
*t
)
745 /* We MUST hold the queue cs while calling this function. */
747 if (t
->runcount
== 0)
748 /* Ensure a timer is promptly removed. If callbacks are pending,
749 it will be removed after the last one finishes by the callback
751 queue_remove_timer(t
);
753 /* Make sure no destroyed timer masks an active timer at the head
754 of the sorted list. */
755 queue_move_timer(t
, EXPIRE_NEVER
, FALSE
);
758 /***********************************************************************
759 * RtlCreateTimerQueue (NTDLL.@)
761 * Creates a timer queue object and returns a handle to it.
764 * NewTimerQueue [O] The newly created queue.
767 * Success: STATUS_SUCCESS.
768 * Failure: Any NTSTATUS code.
770 NTSTATUS WINAPI
RtlCreateTimerQueue(PHANDLE NewTimerQueue
)
773 struct timer_queue
*q
= RtlAllocateHeap(GetProcessHeap(), 0, sizeof *q
);
775 return STATUS_NO_MEMORY
;
777 RtlInitializeCriticalSection(&q
->cs
);
778 list_init(&q
->timers
);
780 q
->magic
= TIMER_QUEUE_MAGIC
;
781 status
= NtCreateEvent(&q
->event
, EVENT_ALL_ACCESS
, NULL
, SynchronizationEvent
, FALSE
);
782 if (status
!= STATUS_SUCCESS
)
784 RtlFreeHeap(GetProcessHeap(), 0, q
);
787 status
= RtlCreateUserThread(GetCurrentProcess(), NULL
, FALSE
, 0, 0, 0,
788 timer_queue_thread_proc
, q
, &q
->thread
, NULL
);
789 if (status
!= STATUS_SUCCESS
)
792 RtlFreeHeap(GetProcessHeap(), 0, q
);
797 return STATUS_SUCCESS
;
800 /***********************************************************************
801 * RtlDeleteTimerQueueEx (NTDLL.@)
803 * Deletes a timer queue object.
806 * TimerQueue [I] The timer queue to destroy.
807 * CompletionEvent [I] If NULL, return immediately. If INVALID_HANDLE_VALUE,
808 * wait until all timers are finished firing before
809 * returning. Otherwise, return immediately and set the
810 * event when all timers are done.
813 * Success: STATUS_SUCCESS if synchronous, STATUS_PENDING if not.
814 * Failure: Any NTSTATUS code.
816 NTSTATUS WINAPI
RtlDeleteTimerQueueEx(HANDLE TimerQueue
, HANDLE CompletionEvent
)
818 struct timer_queue
*q
= TimerQueue
;
819 struct queue_timer
*t
, *temp
;
823 if (!q
|| q
->magic
!= TIMER_QUEUE_MAGIC
)
824 return STATUS_INVALID_HANDLE
;
828 RtlEnterCriticalSection(&q
->cs
);
830 if (list_head(&q
->timers
))
831 /* When the last timer is removed, it will signal the timer thread to
833 LIST_FOR_EACH_ENTRY_SAFE(t
, temp
, &q
->timers
, struct queue_timer
, entry
)
834 queue_destroy_timer(t
);
836 /* However if we have none, we must do it ourselves. */
837 NtSetEvent(q
->event
, NULL
);
838 RtlLeaveCriticalSection(&q
->cs
);
840 if (CompletionEvent
== INVALID_HANDLE_VALUE
)
842 NtWaitForSingleObject(thread
, FALSE
, NULL
);
843 status
= STATUS_SUCCESS
;
849 FIXME("asynchronous return on completion event unimplemented\n");
850 NtWaitForSingleObject(thread
, FALSE
, NULL
);
851 NtSetEvent(CompletionEvent
, NULL
);
853 status
= STATUS_PENDING
;
860 static struct timer_queue
*get_timer_queue(HANDLE TimerQueue
)
862 static struct timer_queue
*default_timer_queue
;
868 if (!default_timer_queue
)
871 NTSTATUS status
= RtlCreateTimerQueue(&q
);
872 if (status
== STATUS_SUCCESS
)
874 PVOID p
= InterlockedCompareExchangePointer( (void **) &default_timer_queue
, q
, NULL
);
876 /* Got beat to the punch. */
877 RtlDeleteTimerQueueEx(q
, NULL
);
880 return default_timer_queue
;
884 /***********************************************************************
885 * RtlCreateTimer (NTDLL.@)
887 * Creates a new timer associated with the given queue.
890 * NewTimer [O] The newly created timer.
891 * TimerQueue [I] The queue to hold the timer.
892 * Callback [I] The callback to fire.
893 * Parameter [I] The argument for the callback.
894 * DueTime [I] The delay, in milliseconds, before first firing the
896 * Period [I] The period, in milliseconds, at which to fire the timer
897 * after the first callback. If zero, the timer will only
898 * fire once. It still needs to be deleted with
900 * Flags [I] Flags controlling the execution of the callback. In
901 * addition to the WT_* thread pool flags (see
902 * RtlQueueWorkItem), WT_EXECUTEINTIMERTHREAD and
903 * WT_EXECUTEONLYONCE are supported.
906 * Success: STATUS_SUCCESS.
907 * Failure: Any NTSTATUS code.
909 NTSTATUS WINAPI
RtlCreateTimer(PHANDLE NewTimer
, HANDLE TimerQueue
,
910 RTL_WAITORTIMERCALLBACKFUNC Callback
,
911 PVOID Parameter
, DWORD DueTime
, DWORD Period
,
915 struct queue_timer
*t
;
916 struct timer_queue
*q
= get_timer_queue(TimerQueue
);
918 if (!q
) return STATUS_NO_MEMORY
;
919 if (q
->magic
!= TIMER_QUEUE_MAGIC
) return STATUS_INVALID_HANDLE
;
921 t
= RtlAllocateHeap(GetProcessHeap(), 0, sizeof *t
);
923 return STATUS_NO_MEMORY
;
927 t
->callback
= Callback
;
928 t
->param
= Parameter
;
934 status
= STATUS_SUCCESS
;
935 RtlEnterCriticalSection(&q
->cs
);
937 status
= STATUS_INVALID_HANDLE
;
939 queue_add_timer(t
, queue_current_time() + DueTime
, TRUE
);
940 RtlLeaveCriticalSection(&q
->cs
);
942 if (status
== STATUS_SUCCESS
)
945 RtlFreeHeap(GetProcessHeap(), 0, t
);
950 /***********************************************************************
951 * RtlUpdateTimer (NTDLL.@)
953 * Changes the time at which a timer expires.
956 * TimerQueue [I] The queue that holds the timer.
957 * Timer [I] The timer to update.
958 * DueTime [I] The delay, in milliseconds, before next firing the timer.
959 * Period [I] The period, in milliseconds, at which to fire the timer
960 * after the first callback. If zero, the timer will not
961 * refire. It still needs to be deleted with
965 * Success: STATUS_SUCCESS.
966 * Failure: Any NTSTATUS code.
968 NTSTATUS WINAPI
RtlUpdateTimer(HANDLE TimerQueue
, HANDLE Timer
,
969 DWORD DueTime
, DWORD Period
)
971 struct queue_timer
*t
= Timer
;
972 struct timer_queue
*q
= t
->q
;
974 RtlEnterCriticalSection(&q
->cs
);
975 /* Can't change a timer if it was once-only or destroyed. */
976 if (t
->expire
!= EXPIRE_NEVER
)
979 queue_move_timer(t
, queue_current_time() + DueTime
, TRUE
);
981 RtlLeaveCriticalSection(&q
->cs
);
983 return STATUS_SUCCESS
;
986 /***********************************************************************
987 * RtlDeleteTimer (NTDLL.@)
989 * Cancels a timer-queue timer.
992 * TimerQueue [I] The queue that holds the timer.
993 * Timer [I] The timer to delete.
994 * CompletionEvent [I] If NULL, return immediately. If INVALID_HANDLE_VALUE,
995 * wait until the timer is finished firing all pending
996 * callbacks before returning. Otherwise, return
997 * immediately and set the event when the timer is done.
1000 * Success: STATUS_SUCCESS if the timer is done, STATUS_PENDING if not,
1001 or if the completion event is NULL.
1002 * Failure: Any NTSTATUS code.
1004 NTSTATUS WINAPI
RtlDeleteTimer(HANDLE TimerQueue
, HANDLE Timer
,
1005 HANDLE CompletionEvent
)
1007 struct queue_timer
*t
= Timer
;
1008 struct timer_queue
*q
;
1009 NTSTATUS status
= STATUS_PENDING
;
1010 HANDLE event
= NULL
;
1013 return STATUS_INVALID_PARAMETER_1
;
1015 if (CompletionEvent
== INVALID_HANDLE_VALUE
)
1017 status
= NtCreateEvent(&event
, EVENT_ALL_ACCESS
, NULL
, SynchronizationEvent
, FALSE
);
1018 if (status
== STATUS_SUCCESS
)
1019 status
= STATUS_PENDING
;
1021 else if (CompletionEvent
)
1022 event
= CompletionEvent
;
1024 RtlEnterCriticalSection(&q
->cs
);
1026 if (t
->runcount
== 0 && event
)
1027 status
= STATUS_SUCCESS
;
1028 queue_destroy_timer(t
);
1029 RtlLeaveCriticalSection(&q
->cs
);
1031 if (CompletionEvent
== INVALID_HANDLE_VALUE
&& event
)
1033 if (status
== STATUS_PENDING
)
1035 NtWaitForSingleObject(event
, FALSE
, NULL
);
1036 status
= STATUS_SUCCESS
;
1044 /***********************************************************************
1045 * timerqueue_thread_proc (internal)
1047 static void CALLBACK
timerqueue_thread_proc( void *param
)
1049 ULONGLONG timeout_lower
, timeout_upper
, new_timeout
;
1050 struct threadpool_object
*other_timer
;
1051 LARGE_INTEGER now
, timeout
;
1054 TRACE( "starting timer queue thread\n" );
1056 RtlEnterCriticalSection( &timerqueue
.cs
);
1059 NtQuerySystemTime( &now
);
1061 /* Check for expired timers. */
1062 while ((ptr
= list_head( &timerqueue
.pending_timers
)))
1064 struct threadpool_object
*timer
= LIST_ENTRY( ptr
, struct threadpool_object
, u
.timer
.timer_entry
);
1065 assert( timer
->type
== TP_OBJECT_TYPE_TIMER
);
1066 assert( timer
->u
.timer
.timer_pending
);
1067 if (timer
->u
.timer
.timeout
> now
.QuadPart
)
1070 /* Queue a new callback in one of the worker threads. */
1071 list_remove( &timer
->u
.timer
.timer_entry
);
1072 timer
->u
.timer
.timer_pending
= FALSE
;
1073 tp_object_submit( timer
, FALSE
);
1075 /* Insert the timer back into the queue, unless it's marked for shutdown. */
1076 if (timer
->u
.timer
.period
&& !timer
->shutdown
)
1078 timer
->u
.timer
.timeout
+= (ULONGLONG
)timer
->u
.timer
.period
* 10000;
1079 if (timer
->u
.timer
.timeout
<= now
.QuadPart
)
1080 timer
->u
.timer
.timeout
= now
.QuadPart
+ 1;
1082 LIST_FOR_EACH_ENTRY( other_timer
, &timerqueue
.pending_timers
,
1083 struct threadpool_object
, u
.timer
.timer_entry
)
1085 assert( other_timer
->type
== TP_OBJECT_TYPE_TIMER
);
1086 if (timer
->u
.timer
.timeout
< other_timer
->u
.timer
.timeout
)
1089 list_add_before( &other_timer
->u
.timer
.timer_entry
, &timer
->u
.timer
.timer_entry
);
1090 timer
->u
.timer
.timer_pending
= TRUE
;
1094 timeout_lower
= timeout_upper
= MAXLONGLONG
;
1096 /* Determine next timeout and use the window length to optimize wakeup times. */
1097 LIST_FOR_EACH_ENTRY( other_timer
, &timerqueue
.pending_timers
,
1098 struct threadpool_object
, u
.timer
.timer_entry
)
1100 assert( other_timer
->type
== TP_OBJECT_TYPE_TIMER
);
1101 if (other_timer
->u
.timer
.timeout
>= timeout_upper
)
1104 timeout_lower
= other_timer
->u
.timer
.timeout
;
1105 new_timeout
= timeout_lower
+ (ULONGLONG
)other_timer
->u
.timer
.window_length
* 10000;
1106 if (new_timeout
< timeout_upper
)
1107 timeout_upper
= new_timeout
;
1110 /* Wait for timer update events or until the next timer expires. */
1111 if (timerqueue
.objcount
)
1113 timeout
.QuadPart
= timeout_lower
;
1114 RtlSleepConditionVariableCS( &timerqueue
.update_event
, &timerqueue
.cs
, &timeout
);
1118 /* All timers have been destroyed, if no new timers are created
1119 * within some amount of time, then we can shutdown this thread. */
1120 timeout
.QuadPart
= (ULONGLONG
)THREADPOOL_WORKER_TIMEOUT
* -10000;
1121 if (RtlSleepConditionVariableCS( &timerqueue
.update_event
, &timerqueue
.cs
,
1122 &timeout
) == STATUS_TIMEOUT
&& !timerqueue
.objcount
)
1128 timerqueue
.thread_running
= FALSE
;
1129 RtlLeaveCriticalSection( &timerqueue
.cs
);
1131 TRACE( "terminating timer queue thread\n" );
1132 RtlExitUserThread( 0 );
1135 /***********************************************************************
1136 * tp_new_worker_thread (internal)
1138 * Create and account a new worker thread for the desired pool.
1140 static NTSTATUS
tp_new_worker_thread( struct threadpool
*pool
)
1145 status
= RtlCreateUserThread( GetCurrentProcess(), NULL
, FALSE
, 0, 0, 0,
1146 threadpool_worker_proc
, pool
, &thread
, NULL
);
1147 if (status
== STATUS_SUCCESS
)
1149 InterlockedIncrement( &pool
->refcount
);
1150 pool
->num_workers
++;
1156 /***********************************************************************
1157 * tp_timerqueue_lock (internal)
1159 * Acquires a lock on the global timerqueue. When the lock is acquired
1160 * successfully, it is guaranteed that the timer thread is running.
1162 static NTSTATUS
tp_timerqueue_lock( struct threadpool_object
*timer
)
1164 NTSTATUS status
= STATUS_SUCCESS
;
1165 assert( timer
->type
== TP_OBJECT_TYPE_TIMER
);
1167 timer
->u
.timer
.timer_initialized
= FALSE
;
1168 timer
->u
.timer
.timer_pending
= FALSE
;
1169 timer
->u
.timer
.timer_set
= FALSE
;
1170 timer
->u
.timer
.timeout
= 0;
1171 timer
->u
.timer
.period
= 0;
1172 timer
->u
.timer
.window_length
= 0;
1174 RtlEnterCriticalSection( &timerqueue
.cs
);
1176 /* Make sure that the timerqueue thread is running. */
1177 if (!timerqueue
.thread_running
)
1180 status
= RtlCreateUserThread( GetCurrentProcess(), NULL
, FALSE
, 0, 0, 0,
1181 timerqueue_thread_proc
, NULL
, &thread
, NULL
);
1182 if (status
== STATUS_SUCCESS
)
1184 timerqueue
.thread_running
= TRUE
;
1189 if (status
== STATUS_SUCCESS
)
1191 timer
->u
.timer
.timer_initialized
= TRUE
;
1192 timerqueue
.objcount
++;
1195 RtlLeaveCriticalSection( &timerqueue
.cs
);
1199 /***********************************************************************
1200 * tp_timerqueue_unlock (internal)
1202 * Releases a lock on the global timerqueue.
1204 static void tp_timerqueue_unlock( struct threadpool_object
*timer
)
1206 assert( timer
->type
== TP_OBJECT_TYPE_TIMER
);
1208 RtlEnterCriticalSection( &timerqueue
.cs
);
1209 if (timer
->u
.timer
.timer_initialized
)
1211 /* If timer was pending, remove it. */
1212 if (timer
->u
.timer
.timer_pending
)
1214 list_remove( &timer
->u
.timer
.timer_entry
);
1215 timer
->u
.timer
.timer_pending
= FALSE
;
1218 /* If the last timer object was destroyed, then wake up the thread. */
1219 if (!--timerqueue
.objcount
)
1221 assert( list_empty( &timerqueue
.pending_timers
) );
1222 RtlWakeAllConditionVariable( &timerqueue
.update_event
);
1225 timer
->u
.timer
.timer_initialized
= FALSE
;
1227 RtlLeaveCriticalSection( &timerqueue
.cs
);
1230 /***********************************************************************
1231 * waitqueue_thread_proc (internal)
1233 static void CALLBACK
waitqueue_thread_proc( void *param
)
1235 struct threadpool_object
*objects
[MAXIMUM_WAITQUEUE_OBJECTS
];
1236 HANDLE handles
[MAXIMUM_WAITQUEUE_OBJECTS
+ 1];
1237 struct waitqueue_bucket
*bucket
= param
;
1238 struct threadpool_object
*wait
, *next
;
1239 LARGE_INTEGER now
, timeout
;
1243 TRACE( "starting wait queue thread\n" );
1245 RtlEnterCriticalSection( &waitqueue
.cs
);
1249 NtQuerySystemTime( &now
);
1250 timeout
.QuadPart
= MAXLONGLONG
;
1253 LIST_FOR_EACH_ENTRY_SAFE( wait
, next
, &bucket
->waiting
, struct threadpool_object
,
1256 assert( wait
->type
== TP_OBJECT_TYPE_WAIT
);
1257 if (wait
->u
.wait
.timeout
<= now
.QuadPart
)
1259 /* Wait object timed out. */
1260 if ((wait
->u
.wait
.flags
& WT_EXECUTEONLYONCE
))
1262 list_remove( &wait
->u
.wait
.wait_entry
);
1263 list_add_tail( &bucket
->reserved
, &wait
->u
.wait
.wait_entry
);
1265 if ((wait
->u
.wait
.flags
& (WT_EXECUTEINWAITTHREAD
| WT_EXECUTEINIOTHREAD
)))
1267 InterlockedIncrement( &wait
->refcount
);
1268 wait
->num_pending_callbacks
++;
1269 RtlEnterCriticalSection( &wait
->pool
->cs
);
1270 tp_object_execute( wait
, TRUE
);
1271 RtlLeaveCriticalSection( &wait
->pool
->cs
);
1272 tp_object_release( wait
);
1274 else tp_object_submit( wait
, FALSE
);
1278 if (wait
->u
.wait
.timeout
< timeout
.QuadPart
)
1279 timeout
.QuadPart
= wait
->u
.wait
.timeout
;
1281 assert( num_handles
< MAXIMUM_WAITQUEUE_OBJECTS
);
1282 InterlockedIncrement( &wait
->refcount
);
1283 objects
[num_handles
] = wait
;
1284 handles
[num_handles
] = wait
->u
.wait
.handle
;
1289 if (!bucket
->objcount
)
1291 /* All wait objects have been destroyed, if no new wait objects are created
1292 * within some amount of time, then we can shutdown this thread. */
1293 assert( num_handles
== 0 );
1294 RtlLeaveCriticalSection( &waitqueue
.cs
);
1295 timeout
.QuadPart
= (ULONGLONG
)THREADPOOL_WORKER_TIMEOUT
* -10000;
1296 status
= NtWaitForMultipleObjects( 1, &bucket
->update_event
, TRUE
, bucket
->alertable
, &timeout
);
1297 RtlEnterCriticalSection( &waitqueue
.cs
);
1299 if (status
== STATUS_TIMEOUT
&& !bucket
->objcount
)
1304 handles
[num_handles
] = bucket
->update_event
;
1305 RtlLeaveCriticalSection( &waitqueue
.cs
);
1306 status
= NtWaitForMultipleObjects( num_handles
+ 1, handles
, TRUE
, bucket
->alertable
, &timeout
);
1307 RtlEnterCriticalSection( &waitqueue
.cs
);
1309 if (status
>= STATUS_WAIT_0
&& status
< STATUS_WAIT_0
+ num_handles
)
1311 wait
= objects
[status
- STATUS_WAIT_0
];
1312 assert( wait
->type
== TP_OBJECT_TYPE_WAIT
);
1313 if (wait
->u
.wait
.bucket
)
1315 /* Wait object signaled. */
1316 assert( wait
->u
.wait
.bucket
== bucket
);
1317 if ((wait
->u
.wait
.flags
& WT_EXECUTEONLYONCE
))
1319 list_remove( &wait
->u
.wait
.wait_entry
);
1320 list_add_tail( &bucket
->reserved
, &wait
->u
.wait
.wait_entry
);
1322 if ((wait
->u
.wait
.flags
& (WT_EXECUTEINWAITTHREAD
| WT_EXECUTEINIOTHREAD
)))
1324 wait
->u
.wait
.signaled
++;
1325 wait
->num_pending_callbacks
++;
1326 RtlEnterCriticalSection( &wait
->pool
->cs
);
1327 tp_object_execute( wait
, TRUE
);
1328 RtlLeaveCriticalSection( &wait
->pool
->cs
);
1330 else tp_object_submit( wait
, TRUE
);
1333 WARN("wait object %p triggered while object was destroyed\n", wait
);
1336 /* Release temporary references to wait objects. */
1339 wait
= objects
[--num_handles
];
1340 assert( wait
->type
== TP_OBJECT_TYPE_WAIT
);
1341 tp_object_release( wait
);
1345 /* Try to merge bucket with other threads. */
1346 if (waitqueue
.num_buckets
> 1 && bucket
->objcount
&&
1347 bucket
->objcount
<= MAXIMUM_WAITQUEUE_OBJECTS
* 1 / 3)
1349 struct waitqueue_bucket
*other_bucket
;
1350 LIST_FOR_EACH_ENTRY( other_bucket
, &waitqueue
.buckets
, struct waitqueue_bucket
, bucket_entry
)
1352 if (other_bucket
!= bucket
&& other_bucket
->objcount
&& other_bucket
->alertable
== bucket
->alertable
&&
1353 other_bucket
->objcount
+ bucket
->objcount
<= MAXIMUM_WAITQUEUE_OBJECTS
* 2 / 3)
1355 other_bucket
->objcount
+= bucket
->objcount
;
1356 bucket
->objcount
= 0;
1358 /* Update reserved list. */
1359 LIST_FOR_EACH_ENTRY( wait
, &bucket
->reserved
, struct threadpool_object
, u
.wait
.wait_entry
)
1361 assert( wait
->type
== TP_OBJECT_TYPE_WAIT
);
1362 wait
->u
.wait
.bucket
= other_bucket
;
1364 list_move_tail( &other_bucket
->reserved
, &bucket
->reserved
);
1366 /* Update waiting list. */
1367 LIST_FOR_EACH_ENTRY( wait
, &bucket
->waiting
, struct threadpool_object
, u
.wait
.wait_entry
)
1369 assert( wait
->type
== TP_OBJECT_TYPE_WAIT
);
1370 wait
->u
.wait
.bucket
= other_bucket
;
1372 list_move_tail( &other_bucket
->waiting
, &bucket
->waiting
);
1374 /* Move bucket to the end, to keep the probability of
1375 * newly added wait objects as small as possible. */
1376 list_remove( &bucket
->bucket_entry
);
1377 list_add_tail( &waitqueue
.buckets
, &bucket
->bucket_entry
);
1379 NtSetEvent( other_bucket
->update_event
, NULL
);
1386 /* Remove this bucket from the list. */
1387 list_remove( &bucket
->bucket_entry
);
1388 if (!--waitqueue
.num_buckets
)
1389 assert( list_empty( &waitqueue
.buckets
) );
1391 RtlLeaveCriticalSection( &waitqueue
.cs
);
1393 TRACE( "terminating wait queue thread\n" );
1395 assert( bucket
->objcount
== 0 );
1396 assert( list_empty( &bucket
->reserved
) );
1397 assert( list_empty( &bucket
->waiting
) );
1398 NtClose( bucket
->update_event
);
1400 RtlFreeHeap( GetProcessHeap(), 0, bucket
);
1401 RtlExitUserThread( 0 );
1404 /***********************************************************************
1405 * tp_waitqueue_lock (internal)
1407 static NTSTATUS
tp_waitqueue_lock( struct threadpool_object
*wait
)
1409 struct waitqueue_bucket
*bucket
;
1412 BOOL alertable
= (wait
->u
.wait
.flags
& WT_EXECUTEINIOTHREAD
) != 0;
1413 assert( wait
->type
== TP_OBJECT_TYPE_WAIT
);
1415 wait
->u
.wait
.signaled
= 0;
1416 wait
->u
.wait
.bucket
= NULL
;
1417 wait
->u
.wait
.wait_pending
= FALSE
;
1418 wait
->u
.wait
.timeout
= 0;
1419 wait
->u
.wait
.handle
= INVALID_HANDLE_VALUE
;
1421 RtlEnterCriticalSection( &waitqueue
.cs
);
1423 /* Try to assign to existing bucket if possible. */
1424 LIST_FOR_EACH_ENTRY( bucket
, &waitqueue
.buckets
, struct waitqueue_bucket
, bucket_entry
)
1426 if (bucket
->objcount
< MAXIMUM_WAITQUEUE_OBJECTS
&& bucket
->alertable
== alertable
)
1428 list_add_tail( &bucket
->reserved
, &wait
->u
.wait
.wait_entry
);
1429 wait
->u
.wait
.bucket
= bucket
;
1432 status
= STATUS_SUCCESS
;
1437 /* Create a new bucket and corresponding worker thread. */
1438 bucket
= RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*bucket
) );
1441 status
= STATUS_NO_MEMORY
;
1445 bucket
->objcount
= 0;
1446 bucket
->alertable
= alertable
;
1447 list_init( &bucket
->reserved
);
1448 list_init( &bucket
->waiting
);
1450 status
= NtCreateEvent( &bucket
->update_event
, EVENT_ALL_ACCESS
,
1451 NULL
, SynchronizationEvent
, FALSE
);
1454 RtlFreeHeap( GetProcessHeap(), 0, bucket
);
1458 status
= RtlCreateUserThread( GetCurrentProcess(), NULL
, FALSE
, 0, 0, 0,
1459 waitqueue_thread_proc
, bucket
, &thread
, NULL
);
1460 if (status
== STATUS_SUCCESS
)
1462 list_add_tail( &waitqueue
.buckets
, &bucket
->bucket_entry
);
1463 waitqueue
.num_buckets
++;
1465 list_add_tail( &bucket
->reserved
, &wait
->u
.wait
.wait_entry
);
1466 wait
->u
.wait
.bucket
= bucket
;
1473 NtClose( bucket
->update_event
);
1474 RtlFreeHeap( GetProcessHeap(), 0, bucket
);
1478 RtlLeaveCriticalSection( &waitqueue
.cs
);
1482 /***********************************************************************
1483 * tp_waitqueue_unlock (internal)
1485 static void tp_waitqueue_unlock( struct threadpool_object
*wait
)
1487 assert( wait
->type
== TP_OBJECT_TYPE_WAIT
);
1489 RtlEnterCriticalSection( &waitqueue
.cs
);
1490 if (wait
->u
.wait
.bucket
)
1492 struct waitqueue_bucket
*bucket
= wait
->u
.wait
.bucket
;
1493 assert( bucket
->objcount
> 0 );
1495 list_remove( &wait
->u
.wait
.wait_entry
);
1496 wait
->u
.wait
.bucket
= NULL
;
1499 NtSetEvent( bucket
->update_event
, NULL
);
1501 RtlLeaveCriticalSection( &waitqueue
.cs
);
1504 static void CALLBACK
ioqueue_thread_proc( void *param
)
1506 struct io_completion
*completion
;
1507 struct threadpool_object
*io
;
1508 IO_STATUS_BLOCK iosb
;
1509 ULONG_PTR key
, value
;
1513 TRACE( "starting I/O completion thread\n" );
1515 RtlEnterCriticalSection( &ioqueue
.cs
);
1519 RtlLeaveCriticalSection( &ioqueue
.cs
);
1520 if ((status
= NtRemoveIoCompletion( ioqueue
.port
, &key
, &value
, &iosb
, NULL
)))
1521 ERR("NtRemoveIoCompletion failed, status %#x.\n", status
);
1522 RtlEnterCriticalSection( &ioqueue
.cs
);
1524 destroy
= skip
= FALSE
;
1525 io
= (struct threadpool_object
*)key
;
1527 TRACE( "io %p, iosb.Status %#x.\n", io
, iosb
.u
.Status
);
1529 if (io
&& (io
->shutdown
|| io
->u
.io
.shutting_down
))
1531 RtlEnterCriticalSection( &io
->pool
->cs
);
1532 if (!io
->u
.io
.pending_count
)
1534 if (io
->u
.io
.skipped_count
)
1535 --io
->u
.io
.skipped_count
;
1537 if (io
->u
.io
.skipped_count
)
1542 RtlLeaveCriticalSection( &io
->pool
->cs
);
1549 TRACE( "Releasing io %p.\n", io
);
1550 io
->shutdown
= TRUE
;
1551 tp_object_release( io
);
1555 RtlEnterCriticalSection( &io
->pool
->cs
);
1557 TRACE( "pending_count %u.\n", io
->u
.io
.pending_count
);
1559 if (io
->u
.io
.pending_count
)
1561 --io
->u
.io
.pending_count
;
1562 if (!array_reserve((void **)&io
->u
.io
.completions
, &io
->u
.io
.completion_max
,
1563 io
->u
.io
.completion_count
+ 1, sizeof(*io
->u
.io
.completions
)))
1565 ERR( "Failed to allocate memory.\n" );
1566 RtlLeaveCriticalSection( &io
->pool
->cs
);
1570 completion
= &io
->u
.io
.completions
[io
->u
.io
.completion_count
++];
1571 completion
->iosb
= iosb
;
1572 completion
->cvalue
= value
;
1574 tp_object_submit( io
, FALSE
);
1576 RtlLeaveCriticalSection( &io
->pool
->cs
);
1579 if (!ioqueue
.objcount
)
1581 /* All I/O objects have been destroyed; if no new objects are
1582 * created within some amount of time, then we can shutdown this
1584 LARGE_INTEGER timeout
= {.QuadPart
= (ULONGLONG
)THREADPOOL_WORKER_TIMEOUT
* -10000};
1585 if (RtlSleepConditionVariableCS( &ioqueue
.update_event
, &ioqueue
.cs
,
1586 &timeout
) == STATUS_TIMEOUT
&& !ioqueue
.objcount
)
1591 ioqueue
.thread_running
= FALSE
;
1592 RtlLeaveCriticalSection( &ioqueue
.cs
);
1594 TRACE( "terminating I/O completion thread\n" );
1596 RtlExitUserThread( 0 );
1599 static NTSTATUS
tp_ioqueue_lock( struct threadpool_object
*io
, HANDLE file
)
1601 NTSTATUS status
= STATUS_SUCCESS
;
1603 assert( io
->type
== TP_OBJECT_TYPE_IO
);
1605 RtlEnterCriticalSection( &ioqueue
.cs
);
1607 if (!ioqueue
.port
&& (status
= NtCreateIoCompletion( &ioqueue
.port
,
1608 IO_COMPLETION_ALL_ACCESS
, NULL
, 0 )))
1610 RtlLeaveCriticalSection( &ioqueue
.cs
);
1614 if (!ioqueue
.thread_running
)
1618 if (!(status
= RtlCreateUserThread( GetCurrentProcess(), NULL
, FALSE
,
1619 0, 0, 0, ioqueue_thread_proc
, NULL
, &thread
, NULL
)))
1621 ioqueue
.thread_running
= TRUE
;
1626 if (status
== STATUS_SUCCESS
)
1628 FILE_COMPLETION_INFORMATION info
;
1629 IO_STATUS_BLOCK iosb
;
1631 info
.CompletionPort
= ioqueue
.port
;
1632 info
.CompletionKey
= (ULONG_PTR
)io
;
1634 status
= NtSetInformationFile( file
, &iosb
, &info
, sizeof(info
), FileCompletionInformation
);
1637 if (status
== STATUS_SUCCESS
)
1639 if (!ioqueue
.objcount
++)
1640 RtlWakeConditionVariable( &ioqueue
.update_event
);
1643 RtlLeaveCriticalSection( &ioqueue
.cs
);
1647 /***********************************************************************
1648 * tp_threadpool_alloc (internal)
1650 * Allocates a new threadpool object.
1652 static NTSTATUS
tp_threadpool_alloc( struct threadpool
**out
)
1654 IMAGE_NT_HEADERS
*nt
= RtlImageNtHeader( NtCurrentTeb()->Peb
->ImageBaseAddress
);
1655 struct threadpool
*pool
;
1658 pool
= RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*pool
) );
1660 return STATUS_NO_MEMORY
;
1664 pool
->shutdown
= FALSE
;
1666 RtlInitializeCriticalSection( &pool
->cs
);
1667 pool
->cs
.DebugInfo
->Spare
[0] = (DWORD_PTR
)(__FILE__
": threadpool.cs");
1669 for (i
= 0; i
< ARRAY_SIZE(pool
->pools
); ++i
)
1670 list_init( &pool
->pools
[i
] );
1671 RtlInitializeConditionVariable( &pool
->update_event
);
1673 pool
->max_workers
= 500;
1674 pool
->min_workers
= 0;
1675 pool
->num_workers
= 0;
1676 pool
->num_busy_workers
= 0;
1677 pool
->stack_info
.StackReserve
= nt
->OptionalHeader
.SizeOfStackReserve
;
1678 pool
->stack_info
.StackCommit
= nt
->OptionalHeader
.SizeOfStackCommit
;
1680 TRACE( "allocated threadpool %p\n", pool
);
1683 return STATUS_SUCCESS
;
1686 /***********************************************************************
1687 * tp_threadpool_shutdown (internal)
1689 * Prepares the shutdown of a threadpool object and notifies all worker
1690 * threads to terminate (after all remaining work items have been
1693 static void tp_threadpool_shutdown( struct threadpool
*pool
)
1695 assert( pool
!= default_threadpool
);
1697 pool
->shutdown
= TRUE
;
1698 RtlWakeAllConditionVariable( &pool
->update_event
);
1701 /***********************************************************************
1702 * tp_threadpool_release (internal)
1704 * Releases a reference to a threadpool object.
1706 static BOOL
tp_threadpool_release( struct threadpool
*pool
)
1710 if (InterlockedDecrement( &pool
->refcount
))
1713 TRACE( "destroying threadpool %p\n", pool
);
1715 assert( pool
->shutdown
);
1716 assert( !pool
->objcount
);
1717 for (i
= 0; i
< ARRAY_SIZE(pool
->pools
); ++i
)
1718 assert( list_empty( &pool
->pools
[i
] ) );
1720 pool
->cs
.DebugInfo
->Spare
[0] = 0;
1721 RtlDeleteCriticalSection( &pool
->cs
);
1723 RtlFreeHeap( GetProcessHeap(), 0, pool
);
1727 /***********************************************************************
1728 * tp_threadpool_lock (internal)
1730 * Acquires a lock on a threadpool, specified with an TP_CALLBACK_ENVIRON
1731 * block. When the lock is acquired successfully, it is guaranteed that
1732 * there is at least one worker thread to process tasks.
1734 static NTSTATUS
tp_threadpool_lock( struct threadpool
**out
, TP_CALLBACK_ENVIRON
*environment
)
1736 struct threadpool
*pool
= NULL
;
1737 NTSTATUS status
= STATUS_SUCCESS
;
1741 /* Validate environment parameters. */
1742 if (environment
->Version
== 3)
1744 TP_CALLBACK_ENVIRON_V3
*environment3
= (TP_CALLBACK_ENVIRON_V3
*)environment
;
1746 switch (environment3
->CallbackPriority
)
1748 case TP_CALLBACK_PRIORITY_HIGH
:
1749 case TP_CALLBACK_PRIORITY_NORMAL
:
1750 case TP_CALLBACK_PRIORITY_LOW
:
1753 return STATUS_INVALID_PARAMETER
;
1757 pool
= (struct threadpool
*)environment
->Pool
;
1762 if (!default_threadpool
)
1764 status
= tp_threadpool_alloc( &pool
);
1765 if (status
!= STATUS_SUCCESS
)
1768 if (InterlockedCompareExchangePointer( (void *)&default_threadpool
, pool
, NULL
) != NULL
)
1770 tp_threadpool_shutdown( pool
);
1771 tp_threadpool_release( pool
);
1775 pool
= default_threadpool
;
1778 RtlEnterCriticalSection( &pool
->cs
);
1780 /* Make sure that the threadpool has at least one thread. */
1781 if (!pool
->num_workers
)
1782 status
= tp_new_worker_thread( pool
);
1784 /* Keep a reference, and increment objcount to ensure that the
1785 * last thread doesn't terminate. */
1786 if (status
== STATUS_SUCCESS
)
1788 InterlockedIncrement( &pool
->refcount
);
1792 RtlLeaveCriticalSection( &pool
->cs
);
1794 if (status
!= STATUS_SUCCESS
)
1798 return STATUS_SUCCESS
;
1801 /***********************************************************************
1802 * tp_threadpool_unlock (internal)
1804 * Releases a lock on a threadpool.
1806 static void tp_threadpool_unlock( struct threadpool
*pool
)
1808 RtlEnterCriticalSection( &pool
->cs
);
1810 RtlLeaveCriticalSection( &pool
->cs
);
1811 tp_threadpool_release( pool
);
1814 /***********************************************************************
1815 * tp_group_alloc (internal)
1817 * Allocates a new threadpool group object.
1819 static NTSTATUS
tp_group_alloc( struct threadpool_group
**out
)
1821 struct threadpool_group
*group
;
1823 group
= RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*group
) );
1825 return STATUS_NO_MEMORY
;
1827 group
->refcount
= 1;
1828 group
->shutdown
= FALSE
;
1830 RtlInitializeCriticalSection( &group
->cs
);
1831 group
->cs
.DebugInfo
->Spare
[0] = (DWORD_PTR
)(__FILE__
": threadpool_group.cs");
1833 list_init( &group
->members
);
1835 TRACE( "allocated group %p\n", group
);
1838 return STATUS_SUCCESS
;
1841 /***********************************************************************
1842 * tp_group_shutdown (internal)
1844 * Marks the group object for shutdown.
1846 static void tp_group_shutdown( struct threadpool_group
*group
)
1848 group
->shutdown
= TRUE
;
1851 /***********************************************************************
1852 * tp_group_release (internal)
1854 * Releases a reference to a group object.
1856 static BOOL
tp_group_release( struct threadpool_group
*group
)
1858 if (InterlockedDecrement( &group
->refcount
))
1861 TRACE( "destroying group %p\n", group
);
1863 assert( group
->shutdown
);
1864 assert( list_empty( &group
->members
) );
1866 group
->cs
.DebugInfo
->Spare
[0] = 0;
1867 RtlDeleteCriticalSection( &group
->cs
);
1869 RtlFreeHeap( GetProcessHeap(), 0, group
);
1873 /***********************************************************************
1874 * tp_object_initialize (internal)
1876 * Initializes members of a threadpool object.
1878 static void tp_object_initialize( struct threadpool_object
*object
, struct threadpool
*pool
,
1879 PVOID userdata
, TP_CALLBACK_ENVIRON
*environment
)
1881 BOOL is_simple_callback
= (object
->type
== TP_OBJECT_TYPE_SIMPLE
);
1883 object
->refcount
= 1;
1884 object
->shutdown
= FALSE
;
1886 object
->pool
= pool
;
1887 object
->group
= NULL
;
1888 object
->userdata
= userdata
;
1889 object
->group_cancel_callback
= NULL
;
1890 object
->finalization_callback
= NULL
;
1891 object
->may_run_long
= 0;
1892 object
->race_dll
= NULL
;
1893 object
->priority
= TP_CALLBACK_PRIORITY_NORMAL
;
1895 memset( &object
->group_entry
, 0, sizeof(object
->group_entry
) );
1896 object
->is_group_member
= FALSE
;
1898 memset( &object
->pool_entry
, 0, sizeof(object
->pool_entry
) );
1899 RtlInitializeConditionVariable( &object
->finished_event
);
1900 RtlInitializeConditionVariable( &object
->group_finished_event
);
1901 object
->completed_event
= NULL
;
1902 object
->num_pending_callbacks
= 0;
1903 object
->num_running_callbacks
= 0;
1904 object
->num_associated_callbacks
= 0;
1908 if (environment
->Version
!= 1 && environment
->Version
!= 3)
1909 FIXME( "unsupported environment version %u\n", environment
->Version
);
1911 object
->group
= impl_from_TP_CLEANUP_GROUP( environment
->CleanupGroup
);
1912 object
->group_cancel_callback
= environment
->CleanupGroupCancelCallback
;
1913 object
->finalization_callback
= environment
->FinalizationCallback
;
1914 object
->may_run_long
= environment
->u
.s
.LongFunction
!= 0;
1915 object
->race_dll
= environment
->RaceDll
;
1916 if (environment
->Version
== 3)
1918 TP_CALLBACK_ENVIRON_V3
*environment_v3
= (TP_CALLBACK_ENVIRON_V3
*)environment
;
1920 object
->priority
= environment_v3
->CallbackPriority
;
1921 assert( object
->priority
< ARRAY_SIZE(pool
->pools
) );
1924 if (environment
->ActivationContext
)
1925 FIXME( "activation context not supported yet\n" );
1927 if (environment
->u
.s
.Persistent
)
1928 FIXME( "persistent threads not supported yet\n" );
1931 if (object
->race_dll
)
1932 LdrAddRefDll( 0, object
->race_dll
);
1934 TRACE( "allocated object %p of type %u\n", object
, object
->type
);
1936 /* For simple callbacks we have to run tp_object_submit before adding this object
1937 * to the cleanup group. As soon as the cleanup group members are released ->shutdown
1938 * will be set, and tp_object_submit would fail with an assertion. */
1940 if (is_simple_callback
)
1941 tp_object_submit( object
, FALSE
);
1945 struct threadpool_group
*group
= object
->group
;
1946 InterlockedIncrement( &group
->refcount
);
1948 RtlEnterCriticalSection( &group
->cs
);
1949 list_add_tail( &group
->members
, &object
->group_entry
);
1950 object
->is_group_member
= TRUE
;
1951 RtlLeaveCriticalSection( &group
->cs
);
1954 if (is_simple_callback
)
1955 tp_object_release( object
);
1958 static void tp_object_prio_queue( struct threadpool_object
*object
)
1960 ++object
->pool
->num_busy_workers
;
1961 list_add_tail( &object
->pool
->pools
[object
->priority
], &object
->pool_entry
);
1964 /***********************************************************************
1965 * tp_object_submit (internal)
1967 * Submits a threadpool object to the associated threadpool. This
1968 * function has to be VOID because TpPostWork can never fail on Windows.
1970 static void tp_object_submit( struct threadpool_object
*object
, BOOL signaled
)
1972 struct threadpool
*pool
= object
->pool
;
1973 NTSTATUS status
= STATUS_UNSUCCESSFUL
;
1975 assert( !object
->shutdown
);
1976 assert( !pool
->shutdown
);
1978 RtlEnterCriticalSection( &pool
->cs
);
1980 /* Start new worker threads if required. */
1981 if (pool
->num_busy_workers
>= pool
->num_workers
&&
1982 pool
->num_workers
< pool
->max_workers
)
1983 status
= tp_new_worker_thread( pool
);
1985 /* Queue work item and increment refcount. */
1986 InterlockedIncrement( &object
->refcount
);
1987 if (!object
->num_pending_callbacks
++)
1988 tp_object_prio_queue( object
);
1990 /* Count how often the object was signaled. */
1991 if (object
->type
== TP_OBJECT_TYPE_WAIT
&& signaled
)
1992 object
->u
.wait
.signaled
++;
1994 /* No new thread started - wake up one existing thread. */
1995 if (status
!= STATUS_SUCCESS
)
1997 assert( pool
->num_workers
> 0 );
1998 RtlWakeConditionVariable( &pool
->update_event
);
2001 RtlLeaveCriticalSection( &pool
->cs
);
2004 /***********************************************************************
2005 * tp_object_cancel (internal)
2007 * Cancels all currently pending callbacks for a specific object.
2009 static void tp_object_cancel( struct threadpool_object
*object
)
2011 struct threadpool
*pool
= object
->pool
;
2012 LONG pending_callbacks
= 0;
2014 RtlEnterCriticalSection( &pool
->cs
);
2015 if (object
->num_pending_callbacks
)
2017 pending_callbacks
= object
->num_pending_callbacks
;
2018 object
->num_pending_callbacks
= 0;
2019 list_remove( &object
->pool_entry
);
2021 if (object
->type
== TP_OBJECT_TYPE_WAIT
)
2022 object
->u
.wait
.signaled
= 0;
2024 if (object
->type
== TP_OBJECT_TYPE_IO
)
2026 object
->u
.io
.skipped_count
+= object
->u
.io
.pending_count
;
2027 object
->u
.io
.pending_count
= 0;
2029 RtlLeaveCriticalSection( &pool
->cs
);
2031 while (pending_callbacks
--)
2032 tp_object_release( object
);
2035 static BOOL
object_is_finished( struct threadpool_object
*object
, BOOL group
)
2037 if (object
->num_pending_callbacks
)
2039 if (object
->type
== TP_OBJECT_TYPE_IO
&& object
->u
.io
.pending_count
)
2043 return !object
->num_running_callbacks
;
2045 return !object
->num_associated_callbacks
;
2048 /***********************************************************************
2049 * tp_object_wait (internal)
2051 * Waits until all pending and running callbacks of a specific object
2052 * have been processed.
2054 static void tp_object_wait( struct threadpool_object
*object
, BOOL group_wait
)
2056 struct threadpool
*pool
= object
->pool
;
2058 RtlEnterCriticalSection( &pool
->cs
);
2059 while (!object_is_finished( object
, group_wait
))
2062 RtlSleepConditionVariableCS( &object
->group_finished_event
, &pool
->cs
, NULL
);
2064 RtlSleepConditionVariableCS( &object
->finished_event
, &pool
->cs
, NULL
);
2066 RtlLeaveCriticalSection( &pool
->cs
);
2069 static void tp_ioqueue_unlock( struct threadpool_object
*io
)
2071 assert( io
->type
== TP_OBJECT_TYPE_IO
);
2073 RtlEnterCriticalSection( &ioqueue
.cs
);
2075 assert(ioqueue
.objcount
);
2077 if (!io
->shutdown
&& !--ioqueue
.objcount
)
2078 NtSetIoCompletion( ioqueue
.port
, 0, 0, STATUS_SUCCESS
, 0 );
2080 RtlLeaveCriticalSection( &ioqueue
.cs
);
2083 /***********************************************************************
2084 * tp_object_prepare_shutdown (internal)
2086 * Prepares a threadpool object for shutdown.
2088 static void tp_object_prepare_shutdown( struct threadpool_object
*object
)
2090 if (object
->type
== TP_OBJECT_TYPE_TIMER
)
2091 tp_timerqueue_unlock( object
);
2092 else if (object
->type
== TP_OBJECT_TYPE_WAIT
)
2093 tp_waitqueue_unlock( object
);
2094 else if (object
->type
== TP_OBJECT_TYPE_IO
)
2095 tp_ioqueue_unlock( object
);
2098 /***********************************************************************
2099 * tp_object_release (internal)
2101 * Releases a reference to a threadpool object.
2103 static BOOL
tp_object_release( struct threadpool_object
*object
)
2105 if (InterlockedDecrement( &object
->refcount
))
2108 TRACE( "destroying object %p of type %u\n", object
, object
->type
);
2110 assert( object
->shutdown
);
2111 assert( !object
->num_pending_callbacks
);
2112 assert( !object
->num_running_callbacks
);
2113 assert( !object
->num_associated_callbacks
);
2115 /* release reference to the group */
2118 struct threadpool_group
*group
= object
->group
;
2120 RtlEnterCriticalSection( &group
->cs
);
2121 if (object
->is_group_member
)
2123 list_remove( &object
->group_entry
);
2124 object
->is_group_member
= FALSE
;
2126 RtlLeaveCriticalSection( &group
->cs
);
2128 tp_group_release( group
);
2131 tp_threadpool_unlock( object
->pool
);
2133 if (object
->race_dll
)
2134 LdrUnloadDll( object
->race_dll
);
2136 if (object
->completed_event
&& object
->completed_event
!= INVALID_HANDLE_VALUE
)
2137 NtSetEvent( object
->completed_event
, NULL
);
2139 RtlFreeHeap( GetProcessHeap(), 0, object
);
2143 static struct list
*threadpool_get_next_item( const struct threadpool
*pool
)
2148 for (i
= 0; i
< ARRAY_SIZE(pool
->pools
); ++i
)
2150 if ((ptr
= list_head( &pool
->pools
[i
] )))
2157 /***********************************************************************
2158 * tp_object_execute (internal)
2160 * Executes a threadpool object callback, object->pool->cs has to be
2163 static void tp_object_execute( struct threadpool_object
*object
, BOOL wait_thread
)
2165 TP_CALLBACK_INSTANCE
*callback_instance
;
2166 struct threadpool_instance instance
;
2167 struct io_completion completion
;
2168 struct threadpool
*pool
= object
->pool
;
2169 TP_WAIT_RESULT wait_result
= 0;
2172 object
->num_pending_callbacks
--;
2174 /* For wait objects check if they were signaled or have timed out. */
2175 if (object
->type
== TP_OBJECT_TYPE_WAIT
)
2177 wait_result
= object
->u
.wait
.signaled
? WAIT_OBJECT_0
: WAIT_TIMEOUT
;
2178 if (wait_result
== WAIT_OBJECT_0
) object
->u
.wait
.signaled
--;
2180 else if (object
->type
== TP_OBJECT_TYPE_IO
)
2182 assert( object
->u
.io
.completion_count
);
2183 completion
= object
->u
.io
.completions
[--object
->u
.io
.completion_count
];
2186 /* Leave critical section and do the actual callback. */
2187 object
->num_associated_callbacks
++;
2188 object
->num_running_callbacks
++;
2189 RtlLeaveCriticalSection( &pool
->cs
);
2190 if (wait_thread
) RtlLeaveCriticalSection( &waitqueue
.cs
);
2192 /* Initialize threadpool instance struct. */
2193 callback_instance
= (TP_CALLBACK_INSTANCE
*)&instance
;
2194 instance
.object
= object
;
2195 instance
.threadid
= GetCurrentThreadId();
2196 instance
.associated
= TRUE
;
2197 instance
.may_run_long
= object
->may_run_long
;
2198 instance
.cleanup
.critical_section
= NULL
;
2199 instance
.cleanup
.mutex
= NULL
;
2200 instance
.cleanup
.semaphore
= NULL
;
2201 instance
.cleanup
.semaphore_count
= 0;
2202 instance
.cleanup
.event
= NULL
;
2203 instance
.cleanup
.library
= NULL
;
2205 switch (object
->type
)
2207 case TP_OBJECT_TYPE_SIMPLE
:
2209 TRACE( "executing simple callback %p(%p, %p)\n",
2210 object
->u
.simple
.callback
, callback_instance
, object
->userdata
);
2211 object
->u
.simple
.callback( callback_instance
, object
->userdata
);
2212 TRACE( "callback %p returned\n", object
->u
.simple
.callback
);
2216 case TP_OBJECT_TYPE_WORK
:
2218 TRACE( "executing work callback %p(%p, %p, %p)\n",
2219 object
->u
.work
.callback
, callback_instance
, object
->userdata
, object
);
2220 object
->u
.work
.callback( callback_instance
, object
->userdata
, (TP_WORK
*)object
);
2221 TRACE( "callback %p returned\n", object
->u
.work
.callback
);
2225 case TP_OBJECT_TYPE_TIMER
:
2227 TRACE( "executing timer callback %p(%p, %p, %p)\n",
2228 object
->u
.timer
.callback
, callback_instance
, object
->userdata
, object
);
2229 object
->u
.timer
.callback( callback_instance
, object
->userdata
, (TP_TIMER
*)object
);
2230 TRACE( "callback %p returned\n", object
->u
.timer
.callback
);
2234 case TP_OBJECT_TYPE_WAIT
:
2236 TRACE( "executing wait callback %p(%p, %p, %p, %u)\n",
2237 object
->u
.wait
.callback
, callback_instance
, object
->userdata
, object
, wait_result
);
2238 object
->u
.wait
.callback( callback_instance
, object
->userdata
, (TP_WAIT
*)object
, wait_result
);
2239 TRACE( "callback %p returned\n", object
->u
.wait
.callback
);
2243 case TP_OBJECT_TYPE_IO
:
2245 TRACE( "executing I/O callback %p(%p, %p, %#lx, %p, %p)\n",
2246 object
->u
.io
.callback
, callback_instance
, object
->userdata
,
2247 completion
.cvalue
, &completion
.iosb
, (TP_IO
*)object
);
2248 object
->u
.io
.callback( callback_instance
, object
->userdata
,
2249 (void *)completion
.cvalue
, &completion
.iosb
, (TP_IO
*)object
);
2250 TRACE( "callback %p returned\n", object
->u
.io
.callback
);
2259 /* Execute finalization callback. */
2260 if (object
->finalization_callback
)
2262 TRACE( "executing finalization callback %p(%p, %p)\n",
2263 object
->finalization_callback
, callback_instance
, object
->userdata
);
2264 object
->finalization_callback( callback_instance
, object
->userdata
);
2265 TRACE( "callback %p returned\n", object
->finalization_callback
);
2268 /* Execute cleanup tasks. */
2269 if (instance
.cleanup
.critical_section
)
2271 RtlLeaveCriticalSection( instance
.cleanup
.critical_section
);
2273 if (instance
.cleanup
.mutex
)
2275 status
= NtReleaseMutant( instance
.cleanup
.mutex
, NULL
);
2276 if (status
!= STATUS_SUCCESS
) goto skip_cleanup
;
2278 if (instance
.cleanup
.semaphore
)
2280 status
= NtReleaseSemaphore( instance
.cleanup
.semaphore
, instance
.cleanup
.semaphore_count
, NULL
);
2281 if (status
!= STATUS_SUCCESS
) goto skip_cleanup
;
2283 if (instance
.cleanup
.event
)
2285 status
= NtSetEvent( instance
.cleanup
.event
, NULL
);
2286 if (status
!= STATUS_SUCCESS
) goto skip_cleanup
;
2288 if (instance
.cleanup
.library
)
2290 LdrUnloadDll( instance
.cleanup
.library
);
2294 if (wait_thread
) RtlEnterCriticalSection( &waitqueue
.cs
);
2295 RtlEnterCriticalSection( &pool
->cs
);
2297 /* Simple callbacks are automatically shutdown after execution. */
2298 if (object
->type
== TP_OBJECT_TYPE_SIMPLE
)
2300 tp_object_prepare_shutdown( object
);
2301 object
->shutdown
= TRUE
;
2304 object
->num_running_callbacks
--;
2305 if (object_is_finished( object
, TRUE
))
2306 RtlWakeAllConditionVariable( &object
->group_finished_event
);
2308 if (instance
.associated
)
2310 object
->num_associated_callbacks
--;
2311 if (object_is_finished( object
, FALSE
))
2312 RtlWakeAllConditionVariable( &object
->finished_event
);
2316 /***********************************************************************
2317 * threadpool_worker_proc (internal)
2319 static void CALLBACK
threadpool_worker_proc( void *param
)
2321 struct threadpool
*pool
= param
;
2322 LARGE_INTEGER timeout
;
2325 TRACE( "starting worker thread for pool %p\n", pool
);
2327 RtlEnterCriticalSection( &pool
->cs
);
2330 while ((ptr
= threadpool_get_next_item( pool
)))
2332 struct threadpool_object
*object
= LIST_ENTRY( ptr
, struct threadpool_object
, pool_entry
);
2333 assert( object
->num_pending_callbacks
> 0 );
2335 /* If further pending callbacks are queued, move the work item to
2336 * the end of the pool list. Otherwise remove it from the pool. */
2337 list_remove( &object
->pool_entry
);
2338 if (object
->num_pending_callbacks
> 1)
2339 tp_object_prio_queue( object
);
2341 tp_object_execute( object
, FALSE
);
2343 assert(pool
->num_busy_workers
);
2344 pool
->num_busy_workers
--;
2346 tp_object_release( object
);
2349 /* Shutdown worker thread if requested. */
2353 /* Wait for new tasks or until the timeout expires. A thread only terminates
2354 * when no new tasks are available, and the number of threads can be
2355 * decreased without violating the min_workers limit. An exception is when
2356 * min_workers == 0, then objcount is used to detect if the last thread
2357 * can be terminated. */
2358 timeout
.QuadPart
= (ULONGLONG
)THREADPOOL_WORKER_TIMEOUT
* -10000;
2359 if (RtlSleepConditionVariableCS( &pool
->update_event
, &pool
->cs
, &timeout
) == STATUS_TIMEOUT
&&
2360 !threadpool_get_next_item( pool
) && (pool
->num_workers
> max( pool
->min_workers
, 1 ) ||
2361 (!pool
->min_workers
&& !pool
->objcount
)))
2366 pool
->num_workers
--;
2367 RtlLeaveCriticalSection( &pool
->cs
);
2369 TRACE( "terminating worker thread for pool %p\n", pool
);
2370 tp_threadpool_release( pool
);
2371 RtlExitUserThread( 0 );
2374 /***********************************************************************
2375 * TpAllocCleanupGroup (NTDLL.@)
2377 NTSTATUS WINAPI
TpAllocCleanupGroup( TP_CLEANUP_GROUP
**out
)
2379 TRACE( "%p\n", out
);
2381 return tp_group_alloc( (struct threadpool_group
**)out
);
2384 /***********************************************************************
2385 * TpAllocIoCompletion (NTDLL.@)
2387 NTSTATUS WINAPI
TpAllocIoCompletion( TP_IO
**out
, HANDLE file
, PTP_IO_CALLBACK callback
,
2388 void *userdata
, TP_CALLBACK_ENVIRON
*environment
)
2390 struct threadpool_object
*object
;
2391 struct threadpool
*pool
;
2394 TRACE( "%p %p %p %p %p\n", out
, file
, callback
, userdata
, environment
);
2396 if (!(object
= RtlAllocateHeap( GetProcessHeap(), HEAP_ZERO_MEMORY
, sizeof(*object
) )))
2397 return STATUS_NO_MEMORY
;
2399 if ((status
= tp_threadpool_lock( &pool
, environment
)))
2401 RtlFreeHeap( GetProcessHeap(), 0, object
);
2405 object
->type
= TP_OBJECT_TYPE_IO
;
2406 object
->u
.io
.callback
= callback
;
2407 if (!(object
->u
.io
.completions
= RtlAllocateHeap( GetProcessHeap(), 0, 8 * sizeof(*object
->u
.io
.completions
) )))
2409 tp_threadpool_unlock( pool
);
2410 RtlFreeHeap( GetProcessHeap(), 0, object
);
2414 if ((status
= tp_ioqueue_lock( object
, file
)))
2416 tp_threadpool_unlock( pool
);
2417 RtlFreeHeap( GetProcessHeap(), 0, object
->u
.io
.completions
);
2418 RtlFreeHeap( GetProcessHeap(), 0, object
);
2422 tp_object_initialize( object
, pool
, userdata
, environment
);
2424 *out
= (TP_IO
*)object
;
2425 return STATUS_SUCCESS
;
2428 /***********************************************************************
2429 * TpAllocPool (NTDLL.@)
2431 NTSTATUS WINAPI
TpAllocPool( TP_POOL
**out
, PVOID reserved
)
2433 TRACE( "%p %p\n", out
, reserved
);
2436 FIXME( "reserved argument is nonzero (%p)\n", reserved
);
2438 return tp_threadpool_alloc( (struct threadpool
**)out
);
2441 /***********************************************************************
2442 * TpAllocTimer (NTDLL.@)
2444 NTSTATUS WINAPI
TpAllocTimer( TP_TIMER
**out
, PTP_TIMER_CALLBACK callback
, PVOID userdata
,
2445 TP_CALLBACK_ENVIRON
*environment
)
2447 struct threadpool_object
*object
;
2448 struct threadpool
*pool
;
2451 TRACE( "%p %p %p %p\n", out
, callback
, userdata
, environment
);
2453 object
= RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*object
) );
2455 return STATUS_NO_MEMORY
;
2457 status
= tp_threadpool_lock( &pool
, environment
);
2460 RtlFreeHeap( GetProcessHeap(), 0, object
);
2464 object
->type
= TP_OBJECT_TYPE_TIMER
;
2465 object
->u
.timer
.callback
= callback
;
2467 status
= tp_timerqueue_lock( object
);
2470 tp_threadpool_unlock( pool
);
2471 RtlFreeHeap( GetProcessHeap(), 0, object
);
2475 tp_object_initialize( object
, pool
, userdata
, environment
);
2477 *out
= (TP_TIMER
*)object
;
2478 return STATUS_SUCCESS
;
2481 static NTSTATUS
tp_alloc_wait( TP_WAIT
**out
, PTP_WAIT_CALLBACK callback
, PVOID userdata
,
2482 TP_CALLBACK_ENVIRON
*environment
, DWORD flags
)
2484 struct threadpool_object
*object
;
2485 struct threadpool
*pool
;
2488 object
= RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*object
) );
2490 return STATUS_NO_MEMORY
;
2492 status
= tp_threadpool_lock( &pool
, environment
);
2495 RtlFreeHeap( GetProcessHeap(), 0, object
);
2499 object
->type
= TP_OBJECT_TYPE_WAIT
;
2500 object
->u
.wait
.callback
= callback
;
2501 object
->u
.wait
.flags
= flags
;
2503 status
= tp_waitqueue_lock( object
);
2506 tp_threadpool_unlock( pool
);
2507 RtlFreeHeap( GetProcessHeap(), 0, object
);
2511 tp_object_initialize( object
, pool
, userdata
, environment
);
2513 *out
= (TP_WAIT
*)object
;
2514 return STATUS_SUCCESS
;
2517 /***********************************************************************
2518 * TpAllocWait (NTDLL.@)
2520 NTSTATUS WINAPI
TpAllocWait( TP_WAIT
**out
, PTP_WAIT_CALLBACK callback
, PVOID userdata
,
2521 TP_CALLBACK_ENVIRON
*environment
)
2523 TRACE( "%p %p %p %p\n", out
, callback
, userdata
, environment
);
2524 return tp_alloc_wait( out
, callback
, userdata
, environment
, WT_EXECUTEONLYONCE
);
2527 /***********************************************************************
2528 * TpAllocWork (NTDLL.@)
2530 NTSTATUS WINAPI
TpAllocWork( TP_WORK
**out
, PTP_WORK_CALLBACK callback
, PVOID userdata
,
2531 TP_CALLBACK_ENVIRON
*environment
)
2533 struct threadpool_object
*object
;
2534 struct threadpool
*pool
;
2537 TRACE( "%p %p %p %p\n", out
, callback
, userdata
, environment
);
2539 object
= RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*object
) );
2541 return STATUS_NO_MEMORY
;
2543 status
= tp_threadpool_lock( &pool
, environment
);
2546 RtlFreeHeap( GetProcessHeap(), 0, object
);
2550 object
->type
= TP_OBJECT_TYPE_WORK
;
2551 object
->u
.work
.callback
= callback
;
2552 tp_object_initialize( object
, pool
, userdata
, environment
);
2554 *out
= (TP_WORK
*)object
;
2555 return STATUS_SUCCESS
;
2558 /***********************************************************************
2559 * TpCancelAsyncIoOperation (NTDLL.@)
2561 void WINAPI
TpCancelAsyncIoOperation( TP_IO
*io
)
2563 struct threadpool_object
*this = impl_from_TP_IO( io
);
2565 TRACE( "%p\n", io
);
2567 RtlEnterCriticalSection( &this->pool
->cs
);
2569 TRACE("pending_count %u.\n", this->u
.io
.pending_count
);
2571 this->u
.io
.pending_count
--;
2572 if (object_is_finished( this, TRUE
))
2573 RtlWakeAllConditionVariable( &this->group_finished_event
);
2574 if (object_is_finished( this, FALSE
))
2575 RtlWakeAllConditionVariable( &this->finished_event
);
2577 RtlLeaveCriticalSection( &this->pool
->cs
);
2580 /***********************************************************************
2581 * TpCallbackLeaveCriticalSectionOnCompletion (NTDLL.@)
2583 VOID WINAPI
TpCallbackLeaveCriticalSectionOnCompletion( TP_CALLBACK_INSTANCE
*instance
, CRITICAL_SECTION
*crit
)
2585 struct threadpool_instance
*this = impl_from_TP_CALLBACK_INSTANCE( instance
);
2587 TRACE( "%p %p\n", instance
, crit
);
2589 if (!this->cleanup
.critical_section
)
2590 this->cleanup
.critical_section
= crit
;
2593 /***********************************************************************
2594 * TpCallbackMayRunLong (NTDLL.@)
2596 NTSTATUS WINAPI
TpCallbackMayRunLong( TP_CALLBACK_INSTANCE
*instance
)
2598 struct threadpool_instance
*this = impl_from_TP_CALLBACK_INSTANCE( instance
);
2599 struct threadpool_object
*object
= this->object
;
2600 struct threadpool
*pool
;
2601 NTSTATUS status
= STATUS_SUCCESS
;
2603 TRACE( "%p\n", instance
);
2605 if (this->threadid
!= GetCurrentThreadId())
2607 ERR("called from wrong thread, ignoring\n");
2608 return STATUS_UNSUCCESSFUL
; /* FIXME */
2611 if (this->may_run_long
)
2612 return STATUS_SUCCESS
;
2614 pool
= object
->pool
;
2615 RtlEnterCriticalSection( &pool
->cs
);
2617 /* Start new worker threads if required. */
2618 if (pool
->num_busy_workers
>= pool
->num_workers
)
2620 if (pool
->num_workers
< pool
->max_workers
)
2622 status
= tp_new_worker_thread( pool
);
2626 status
= STATUS_TOO_MANY_THREADS
;
2630 RtlLeaveCriticalSection( &pool
->cs
);
2631 this->may_run_long
= TRUE
;
2635 /***********************************************************************
2636 * TpCallbackReleaseMutexOnCompletion (NTDLL.@)
2638 VOID WINAPI
TpCallbackReleaseMutexOnCompletion( TP_CALLBACK_INSTANCE
*instance
, HANDLE mutex
)
2640 struct threadpool_instance
*this = impl_from_TP_CALLBACK_INSTANCE( instance
);
2642 TRACE( "%p %p\n", instance
, mutex
);
2644 if (!this->cleanup
.mutex
)
2645 this->cleanup
.mutex
= mutex
;
2648 /***********************************************************************
2649 * TpCallbackReleaseSemaphoreOnCompletion (NTDLL.@)
2651 VOID WINAPI
TpCallbackReleaseSemaphoreOnCompletion( TP_CALLBACK_INSTANCE
*instance
, HANDLE semaphore
, DWORD count
)
2653 struct threadpool_instance
*this = impl_from_TP_CALLBACK_INSTANCE( instance
);
2655 TRACE( "%p %p %u\n", instance
, semaphore
, count
);
2657 if (!this->cleanup
.semaphore
)
2659 this->cleanup
.semaphore
= semaphore
;
2660 this->cleanup
.semaphore_count
= count
;
2664 /***********************************************************************
2665 * TpCallbackSetEventOnCompletion (NTDLL.@)
2667 VOID WINAPI
TpCallbackSetEventOnCompletion( TP_CALLBACK_INSTANCE
*instance
, HANDLE event
)
2669 struct threadpool_instance
*this = impl_from_TP_CALLBACK_INSTANCE( instance
);
2671 TRACE( "%p %p\n", instance
, event
);
2673 if (!this->cleanup
.event
)
2674 this->cleanup
.event
= event
;
2677 /***********************************************************************
2678 * TpCallbackUnloadDllOnCompletion (NTDLL.@)
2680 VOID WINAPI
TpCallbackUnloadDllOnCompletion( TP_CALLBACK_INSTANCE
*instance
, HMODULE module
)
2682 struct threadpool_instance
*this = impl_from_TP_CALLBACK_INSTANCE( instance
);
2684 TRACE( "%p %p\n", instance
, module
);
2686 if (!this->cleanup
.library
)
2687 this->cleanup
.library
= module
;
2690 /***********************************************************************
2691 * TpDisassociateCallback (NTDLL.@)
2693 VOID WINAPI
TpDisassociateCallback( TP_CALLBACK_INSTANCE
*instance
)
2695 struct threadpool_instance
*this = impl_from_TP_CALLBACK_INSTANCE( instance
);
2696 struct threadpool_object
*object
= this->object
;
2697 struct threadpool
*pool
;
2699 TRACE( "%p\n", instance
);
2701 if (this->threadid
!= GetCurrentThreadId())
2703 ERR("called from wrong thread, ignoring\n");
2707 if (!this->associated
)
2710 pool
= object
->pool
;
2711 RtlEnterCriticalSection( &pool
->cs
);
2713 object
->num_associated_callbacks
--;
2714 if (object_is_finished( object
, FALSE
))
2715 RtlWakeAllConditionVariable( &object
->finished_event
);
2717 RtlLeaveCriticalSection( &pool
->cs
);
2718 this->associated
= FALSE
;
2721 /***********************************************************************
2722 * TpIsTimerSet (NTDLL.@)
2724 BOOL WINAPI
TpIsTimerSet( TP_TIMER
*timer
)
2726 struct threadpool_object
*this = impl_from_TP_TIMER( timer
);
2728 TRACE( "%p\n", timer
);
2730 return this->u
.timer
.timer_set
;
2733 /***********************************************************************
2734 * TpPostWork (NTDLL.@)
2736 VOID WINAPI
TpPostWork( TP_WORK
*work
)
2738 struct threadpool_object
*this = impl_from_TP_WORK( work
);
2740 TRACE( "%p\n", work
);
2742 tp_object_submit( this, FALSE
);
2745 /***********************************************************************
2746 * TpReleaseCleanupGroup (NTDLL.@)
2748 VOID WINAPI
TpReleaseCleanupGroup( TP_CLEANUP_GROUP
*group
)
2750 struct threadpool_group
*this = impl_from_TP_CLEANUP_GROUP( group
);
2752 TRACE( "%p\n", group
);
2754 tp_group_shutdown( this );
2755 tp_group_release( this );
2758 /***********************************************************************
2759 * TpReleaseCleanupGroupMembers (NTDLL.@)
2761 VOID WINAPI
TpReleaseCleanupGroupMembers( TP_CLEANUP_GROUP
*group
, BOOL cancel_pending
, PVOID userdata
)
2763 struct threadpool_group
*this = impl_from_TP_CLEANUP_GROUP( group
);
2764 struct threadpool_object
*object
, *next
;
2765 struct list members
;
2767 TRACE( "%p %u %p\n", group
, cancel_pending
, userdata
);
2769 RtlEnterCriticalSection( &this->cs
);
2771 /* Unset group, increase references, and mark objects for shutdown */
2772 LIST_FOR_EACH_ENTRY_SAFE( object
, next
, &this->members
, struct threadpool_object
, group_entry
)
2774 assert( object
->group
== this );
2775 assert( object
->is_group_member
);
2777 if (InterlockedIncrement( &object
->refcount
) == 1)
2779 /* Object is basically already destroyed, but group reference
2780 * was not deleted yet. We can safely ignore this object. */
2781 InterlockedDecrement( &object
->refcount
);
2782 list_remove( &object
->group_entry
);
2783 object
->is_group_member
= FALSE
;
2787 object
->is_group_member
= FALSE
;
2788 tp_object_prepare_shutdown( object
);
2791 /* Move members to a new temporary list */
2792 list_init( &members
);
2793 list_move_tail( &members
, &this->members
);
2795 RtlLeaveCriticalSection( &this->cs
);
2797 /* Cancel pending callbacks if requested */
2800 LIST_FOR_EACH_ENTRY( object
, &members
, struct threadpool_object
, group_entry
)
2802 tp_object_cancel( object
);
2806 /* Wait for remaining callbacks to finish */
2807 LIST_FOR_EACH_ENTRY_SAFE( object
, next
, &members
, struct threadpool_object
, group_entry
)
2809 tp_object_wait( object
, TRUE
);
2811 if (!object
->shutdown
)
2813 /* Execute group cancellation callback if defined, and if this was actually a group cancel. */
2814 if (cancel_pending
&& object
->group_cancel_callback
)
2816 TRACE( "executing group cancel callback %p(%p, %p)\n",
2817 object
->group_cancel_callback
, object
->userdata
, userdata
);
2818 object
->group_cancel_callback( object
->userdata
, userdata
);
2819 TRACE( "callback %p returned\n", object
->group_cancel_callback
);
2822 if (object
->type
!= TP_OBJECT_TYPE_SIMPLE
)
2823 tp_object_release( object
);
2826 object
->shutdown
= TRUE
;
2827 tp_object_release( object
);
2831 /***********************************************************************
2832 * TpReleaseIoCompletion (NTDLL.@)
2834 void WINAPI
TpReleaseIoCompletion( TP_IO
*io
)
2836 struct threadpool_object
*this = impl_from_TP_IO( io
);
2839 TRACE( "%p\n", io
);
2841 RtlEnterCriticalSection( &this->pool
->cs
);
2842 this->u
.io
.shutting_down
= TRUE
;
2843 can_destroy
= !this->u
.io
.pending_count
&& !this->u
.io
.skipped_count
;
2844 RtlLeaveCriticalSection( &this->pool
->cs
);
2848 tp_object_prepare_shutdown( this );
2849 this->shutdown
= TRUE
;
2850 tp_object_release( this );
2854 /***********************************************************************
2855 * TpReleasePool (NTDLL.@)
2857 VOID WINAPI
TpReleasePool( TP_POOL
*pool
)
2859 struct threadpool
*this = impl_from_TP_POOL( pool
);
2861 TRACE( "%p\n", pool
);
2863 tp_threadpool_shutdown( this );
2864 tp_threadpool_release( this );
2867 /***********************************************************************
2868 * TpReleaseTimer (NTDLL.@)
2870 VOID WINAPI
TpReleaseTimer( TP_TIMER
*timer
)
2872 struct threadpool_object
*this = impl_from_TP_TIMER( timer
);
2874 TRACE( "%p\n", timer
);
2876 tp_object_prepare_shutdown( this );
2877 this->shutdown
= TRUE
;
2878 tp_object_release( this );
2881 /***********************************************************************
2882 * TpReleaseWait (NTDLL.@)
2884 VOID WINAPI
TpReleaseWait( TP_WAIT
*wait
)
2886 struct threadpool_object
*this = impl_from_TP_WAIT( wait
);
2888 TRACE( "%p\n", wait
);
2890 tp_object_prepare_shutdown( this );
2891 this->shutdown
= TRUE
;
2892 tp_object_release( this );
2895 /***********************************************************************
2896 * TpReleaseWork (NTDLL.@)
2898 VOID WINAPI
TpReleaseWork( TP_WORK
*work
)
2900 struct threadpool_object
*this = impl_from_TP_WORK( work
);
2902 TRACE( "%p\n", work
);
2904 tp_object_prepare_shutdown( this );
2905 this->shutdown
= TRUE
;
2906 tp_object_release( this );
2909 /***********************************************************************
2910 * TpSetPoolMaxThreads (NTDLL.@)
2912 VOID WINAPI
TpSetPoolMaxThreads( TP_POOL
*pool
, DWORD maximum
)
2914 struct threadpool
*this = impl_from_TP_POOL( pool
);
2916 TRACE( "%p %u\n", pool
, maximum
);
2918 RtlEnterCriticalSection( &this->cs
);
2919 this->max_workers
= max( maximum
, 1 );
2920 this->min_workers
= min( this->min_workers
, this->max_workers
);
2921 RtlLeaveCriticalSection( &this->cs
);
2924 /***********************************************************************
2925 * TpSetPoolMinThreads (NTDLL.@)
2927 BOOL WINAPI
TpSetPoolMinThreads( TP_POOL
*pool
, DWORD minimum
)
2929 struct threadpool
*this = impl_from_TP_POOL( pool
);
2930 NTSTATUS status
= STATUS_SUCCESS
;
2932 TRACE( "%p %u\n", pool
, minimum
);
2934 RtlEnterCriticalSection( &this->cs
);
2936 while (this->num_workers
< minimum
)
2938 status
= tp_new_worker_thread( this );
2939 if (status
!= STATUS_SUCCESS
)
2943 if (status
== STATUS_SUCCESS
)
2945 this->min_workers
= minimum
;
2946 this->max_workers
= max( this->min_workers
, this->max_workers
);
2949 RtlLeaveCriticalSection( &this->cs
);
2953 /***********************************************************************
2954 * TpSetTimer (NTDLL.@)
2956 VOID WINAPI
TpSetTimer( TP_TIMER
*timer
, LARGE_INTEGER
*timeout
, LONG period
, LONG window_length
)
2958 struct threadpool_object
*this = impl_from_TP_TIMER( timer
);
2959 struct threadpool_object
*other_timer
;
2960 BOOL submit_timer
= FALSE
;
2961 ULONGLONG timestamp
;
2963 TRACE( "%p %p %u %u\n", timer
, timeout
, period
, window_length
);
2965 RtlEnterCriticalSection( &timerqueue
.cs
);
2967 assert( this->u
.timer
.timer_initialized
);
2968 this->u
.timer
.timer_set
= timeout
!= NULL
;
2970 /* Convert relative timeout to absolute timestamp and handle a timeout
2971 * of zero, which means that the timer is submitted immediately. */
2974 timestamp
= timeout
->QuadPart
;
2975 if ((LONGLONG
)timestamp
< 0)
2978 NtQuerySystemTime( &now
);
2979 timestamp
= now
.QuadPart
- timestamp
;
2981 else if (!timestamp
)
2988 NtQuerySystemTime( &now
);
2989 timestamp
= now
.QuadPart
+ (ULONGLONG
)period
* 10000;
2991 submit_timer
= TRUE
;
2995 /* First remove existing timeout. */
2996 if (this->u
.timer
.timer_pending
)
2998 list_remove( &this->u
.timer
.timer_entry
);
2999 this->u
.timer
.timer_pending
= FALSE
;
3002 /* If the timer was enabled, then add it back to the queue. */
3005 this->u
.timer
.timeout
= timestamp
;
3006 this->u
.timer
.period
= period
;
3007 this->u
.timer
.window_length
= window_length
;
3009 LIST_FOR_EACH_ENTRY( other_timer
, &timerqueue
.pending_timers
,
3010 struct threadpool_object
, u
.timer
.timer_entry
)
3012 assert( other_timer
->type
== TP_OBJECT_TYPE_TIMER
);
3013 if (this->u
.timer
.timeout
< other_timer
->u
.timer
.timeout
)
3016 list_add_before( &other_timer
->u
.timer
.timer_entry
, &this->u
.timer
.timer_entry
);
3018 /* Wake up the timer thread when the timeout has to be updated. */
3019 if (list_head( &timerqueue
.pending_timers
) == &this->u
.timer
.timer_entry
)
3020 RtlWakeAllConditionVariable( &timerqueue
.update_event
);
3022 this->u
.timer
.timer_pending
= TRUE
;
3025 RtlLeaveCriticalSection( &timerqueue
.cs
);
3028 tp_object_submit( this, FALSE
);
3031 /***********************************************************************
3032 * TpSetWait (NTDLL.@)
3034 VOID WINAPI
TpSetWait( TP_WAIT
*wait
, HANDLE handle
, LARGE_INTEGER
*timeout
)
3036 struct threadpool_object
*this = impl_from_TP_WAIT( wait
);
3037 ULONGLONG timestamp
= MAXLONGLONG
;
3039 TRACE( "%p %p %p\n", wait
, handle
, timeout
);
3041 RtlEnterCriticalSection( &waitqueue
.cs
);
3043 assert( this->u
.wait
.bucket
);
3044 this->u
.wait
.handle
= handle
;
3046 if (handle
|| this->u
.wait
.wait_pending
)
3048 struct waitqueue_bucket
*bucket
= this->u
.wait
.bucket
;
3049 list_remove( &this->u
.wait
.wait_entry
);
3051 /* Convert relative timeout to absolute timestamp. */
3052 if (handle
&& timeout
)
3054 timestamp
= timeout
->QuadPart
;
3055 if ((LONGLONG
)timestamp
< 0)
3058 NtQuerySystemTime( &now
);
3059 timestamp
= now
.QuadPart
- timestamp
;
3063 /* Add wait object back into one of the queues. */
3066 list_add_tail( &bucket
->waiting
, &this->u
.wait
.wait_entry
);
3067 this->u
.wait
.wait_pending
= TRUE
;
3068 this->u
.wait
.timeout
= timestamp
;
3072 list_add_tail( &bucket
->reserved
, &this->u
.wait
.wait_entry
);
3073 this->u
.wait
.wait_pending
= FALSE
;
3076 /* Wake up the wait queue thread. */
3077 NtSetEvent( bucket
->update_event
, NULL
);
3080 RtlLeaveCriticalSection( &waitqueue
.cs
);
3083 /***********************************************************************
3084 * TpSimpleTryPost (NTDLL.@)
3086 NTSTATUS WINAPI
TpSimpleTryPost( PTP_SIMPLE_CALLBACK callback
, PVOID userdata
,
3087 TP_CALLBACK_ENVIRON
*environment
)
3089 struct threadpool_object
*object
;
3090 struct threadpool
*pool
;
3093 TRACE( "%p %p %p\n", callback
, userdata
, environment
);
3095 object
= RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*object
) );
3097 return STATUS_NO_MEMORY
;
3099 status
= tp_threadpool_lock( &pool
, environment
);
3102 RtlFreeHeap( GetProcessHeap(), 0, object
);
3106 object
->type
= TP_OBJECT_TYPE_SIMPLE
;
3107 object
->u
.simple
.callback
= callback
;
3108 tp_object_initialize( object
, pool
, userdata
, environment
);
3110 return STATUS_SUCCESS
;
3113 /***********************************************************************
3114 * TpStartAsyncIoOperation (NTDLL.@)
3116 void WINAPI
TpStartAsyncIoOperation( TP_IO
*io
)
3118 struct threadpool_object
*this = impl_from_TP_IO( io
);
3120 TRACE( "%p\n", io
);
3122 RtlEnterCriticalSection( &this->pool
->cs
);
3124 this->u
.io
.pending_count
++;
3126 RtlLeaveCriticalSection( &this->pool
->cs
);
3129 /***********************************************************************
3130 * TpWaitForIoCompletion (NTDLL.@)
3132 void WINAPI
TpWaitForIoCompletion( TP_IO
*io
, BOOL cancel_pending
)
3134 struct threadpool_object
*this = impl_from_TP_IO( io
);
3136 TRACE( "%p %d\n", io
, cancel_pending
);
3139 tp_object_cancel( this );
3140 tp_object_wait( this, FALSE
);
3143 /***********************************************************************
3144 * TpWaitForTimer (NTDLL.@)
3146 VOID WINAPI
TpWaitForTimer( TP_TIMER
*timer
, BOOL cancel_pending
)
3148 struct threadpool_object
*this = impl_from_TP_TIMER( timer
);
3150 TRACE( "%p %d\n", timer
, cancel_pending
);
3153 tp_object_cancel( this );
3154 tp_object_wait( this, FALSE
);
3157 /***********************************************************************
3158 * TpWaitForWait (NTDLL.@)
3160 VOID WINAPI
TpWaitForWait( TP_WAIT
*wait
, BOOL cancel_pending
)
3162 struct threadpool_object
*this = impl_from_TP_WAIT( wait
);
3164 TRACE( "%p %d\n", wait
, cancel_pending
);
3167 tp_object_cancel( this );
3168 tp_object_wait( this, FALSE
);
3171 /***********************************************************************
3172 * TpWaitForWork (NTDLL.@)
3174 VOID WINAPI
TpWaitForWork( TP_WORK
*work
, BOOL cancel_pending
)
3176 struct threadpool_object
*this = impl_from_TP_WORK( work
);
3178 TRACE( "%p %u\n", work
, cancel_pending
);
3181 tp_object_cancel( this );
3182 tp_object_wait( this, FALSE
);
3185 /***********************************************************************
3186 * TpSetPoolStackInformation (NTDLL.@)
3188 NTSTATUS WINAPI
TpSetPoolStackInformation( TP_POOL
*pool
, TP_POOL_STACK_INFORMATION
*stack_info
)
3190 struct threadpool
*this = impl_from_TP_POOL( pool
);
3192 TRACE( "%p %p\n", pool
, stack_info
);
3195 return STATUS_INVALID_PARAMETER
;
3197 RtlEnterCriticalSection( &this->cs
);
3198 this->stack_info
= *stack_info
;
3199 RtlLeaveCriticalSection( &this->cs
);
3201 return STATUS_SUCCESS
;
3204 /***********************************************************************
3205 * TpQueryPoolStackInformation (NTDLL.@)
3207 NTSTATUS WINAPI
TpQueryPoolStackInformation( TP_POOL
*pool
, TP_POOL_STACK_INFORMATION
*stack_info
)
3209 struct threadpool
*this = impl_from_TP_POOL( pool
);
3211 TRACE( "%p %p\n", pool
, stack_info
);
3214 return STATUS_INVALID_PARAMETER
;
3216 RtlEnterCriticalSection( &this->cs
);
3217 *stack_info
= this->stack_info
;
3218 RtlLeaveCriticalSection( &this->cs
);
3220 return STATUS_SUCCESS
;
3223 static void CALLBACK
rtl_wait_callback( TP_CALLBACK_INSTANCE
*instance
, void *userdata
, TP_WAIT
*wait
, TP_WAIT_RESULT result
)
3225 struct threadpool_object
*object
= impl_from_TP_WAIT(wait
);
3226 object
->u
.wait
.rtl_callback( userdata
, result
!= STATUS_WAIT_0
);
3229 /***********************************************************************
3230 * RtlRegisterWait (NTDLL.@)
3232 * Registers a wait for a handle to become signaled.
3235 * NewWaitObject [I] Handle to the new wait object. Use RtlDeregisterWait() to free it.
3236 * Object [I] Object to wait to become signaled.
3237 * Callback [I] Callback function to execute when the wait times out or the handle is signaled.
3238 * Context [I] Context to pass to the callback function when it is executed.
3239 * Milliseconds [I] Number of milliseconds to wait before timing out.
3240 * Flags [I] Flags. See notes.
3243 * Success: STATUS_SUCCESS.
3244 * Failure: Any NTSTATUS code.
3247 * Flags can be one or more of the following:
3248 *|WT_EXECUTEDEFAULT - Executes the work item in a non-I/O worker thread.
3249 *|WT_EXECUTEINIOTHREAD - Executes the work item in an I/O worker thread.
3250 *|WT_EXECUTEINPERSISTENTTHREAD - Executes the work item in a thread that is persistent.
3251 *|WT_EXECUTELONGFUNCTION - Hints that the execution can take a long time.
3252 *|WT_TRANSFER_IMPERSONATION - Executes the function with the current access token.
3254 NTSTATUS WINAPI
RtlRegisterWait( HANDLE
*out
, HANDLE handle
, RTL_WAITORTIMERCALLBACKFUNC callback
,
3255 void *context
, ULONG milliseconds
, ULONG flags
)
3257 struct threadpool_object
*object
;
3258 TP_CALLBACK_ENVIRON environment
;
3259 LARGE_INTEGER timeout
;
3263 TRACE( "out %p, handle %p, callback %p, context %p, milliseconds %u, flags %x\n",
3264 out
, handle
, callback
, context
, milliseconds
, flags
);
3266 memset( &environment
, 0, sizeof(environment
) );
3267 environment
.Version
= 1;
3268 environment
.u
.s
.LongFunction
= (flags
& WT_EXECUTELONGFUNCTION
) != 0;
3269 environment
.u
.s
.Persistent
= (flags
& WT_EXECUTEINPERSISTENTTHREAD
) != 0;
3271 flags
&= (WT_EXECUTEONLYONCE
| WT_EXECUTEINWAITTHREAD
| WT_EXECUTEINIOTHREAD
);
3272 if ((status
= tp_alloc_wait( &wait
, rtl_wait_callback
, context
, &environment
, flags
)))
3275 object
= impl_from_TP_WAIT(wait
);
3276 object
->u
.wait
.rtl_callback
= callback
;
3278 RtlEnterCriticalSection( &waitqueue
.cs
);
3279 TpSetWait( (TP_WAIT
*)object
, handle
, get_nt_timeout( &timeout
, milliseconds
) );
3282 RtlLeaveCriticalSection( &waitqueue
.cs
);
3284 return STATUS_SUCCESS
;
3287 /***********************************************************************
3288 * RtlDeregisterWaitEx (NTDLL.@)
3290 * Cancels a wait operation and frees the resources associated with calling
3291 * RtlRegisterWait().
3294 * WaitObject [I] Handle to the wait object to free.
3297 * Success: STATUS_SUCCESS.
3298 * Failure: Any NTSTATUS code.
3300 NTSTATUS WINAPI
RtlDeregisterWaitEx( HANDLE handle
, HANDLE event
)
3302 struct threadpool_object
*object
= handle
;
3305 TRACE( "handle %p, event %p\n", handle
, event
);
3307 if (!object
) return STATUS_INVALID_HANDLE
;
3309 TpSetWait( (TP_WAIT
*)object
, NULL
, NULL
);
3311 if (event
== INVALID_HANDLE_VALUE
) TpWaitForWait( (TP_WAIT
*)object
, TRUE
);
3314 assert( object
->completed_event
== NULL
);
3315 object
->completed_event
= event
;
3318 RtlEnterCriticalSection( &object
->pool
->cs
);
3319 if (object
->num_pending_callbacks
+ object
->num_running_callbacks
3320 + object
->num_associated_callbacks
) status
= STATUS_PENDING
;
3321 else status
= STATUS_SUCCESS
;
3322 RtlLeaveCriticalSection( &object
->pool
->cs
);
3324 TpReleaseWait( (TP_WAIT
*)object
);
3328 /***********************************************************************
3329 * RtlDeregisterWait (NTDLL.@)
3331 * Cancels a wait operation and frees the resources associated with calling
3332 * RtlRegisterWait().
3335 * WaitObject [I] Handle to the wait object to free.
3338 * Success: STATUS_SUCCESS.
3339 * Failure: Any NTSTATUS code.
3341 NTSTATUS WINAPI
RtlDeregisterWait(HANDLE WaitHandle
)
3343 return RtlDeregisterWaitEx(WaitHandle
, NULL
);