4 * Copyright (c) 2006 Robert Shearman
5 * Copyright (c) 2014-2016 Sebastian Lackner
7 * This library is free software; you can redistribute it and/or
8 * modify it under the terms of the GNU Lesser General Public
9 * License as published by the Free Software Foundation; either
10 * version 2.1 of the License, or (at your option) any later version.
12 * This library is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Lesser General Public License for more details.
17 * You should have received a copy of the GNU Lesser General Public
18 * License along with this library; if not, write to the Free Software
19 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
26 #define NONAMELESSUNION
28 #define WIN32_NO_STATUS
31 #include "wine/debug.h"
32 #include "wine/list.h"
34 #include "ntdll_misc.h"
36 WINE_DEFAULT_DEBUG_CHANNEL(threadpool
);
39 * Old thread pooling API
44 PRTL_WORK_ITEM_ROUTINE function
;
48 #define EXPIRE_NEVER (~(ULONGLONG)0)
49 #define TIMER_QUEUE_MAGIC 0x516d6954 /* TimQ */
51 static RTL_CRITICAL_SECTION_DEBUG critsect_compl_debug
;
56 RTL_CRITICAL_SECTION threadpool_compl_cs
;
60 NULL
, /* compl_port */
61 { &critsect_compl_debug
, -1, 0, 0, 0, 0 }, /* threadpool_compl_cs */
64 static RTL_CRITICAL_SECTION_DEBUG critsect_compl_debug
=
66 0, 0, &old_threadpool
.threadpool_compl_cs
,
67 { &critsect_compl_debug
.ProcessLocksList
, &critsect_compl_debug
.ProcessLocksList
},
68 0, 0, { (DWORD_PTR
)(__FILE__
": threadpool_compl_cs") }
74 struct timer_queue
*q
;
76 ULONG runcount
; /* number of callbacks pending execution */
77 RTL_WAITORTIMERCALLBACKFUNC callback
;
82 BOOL destroy
; /* timer should be deleted; once set, never unset */
83 HANDLE event
; /* removal event */
89 RTL_CRITICAL_SECTION cs
;
90 struct list timers
; /* sorted by expiration time */
91 BOOL quit
; /* queue should be deleted; once set, never unset */
97 * Object-oriented thread pooling API
100 #define THREADPOOL_WORKER_TIMEOUT 5000
101 #define MAXIMUM_WAITQUEUE_OBJECTS (MAXIMUM_WAIT_OBJECTS - 1)
103 /* internal threadpool representation */
110 /* Pools of work items, locked via .cs, order matches TP_CALLBACK_PRIORITY - high, normal, low. */
111 struct list pools
[3];
112 RTL_CONDITION_VARIABLE update_event
;
113 /* information about worker threads, locked via .cs */
117 int num_busy_workers
;
119 TP_POOL_STACK_INFORMATION stack_info
;
122 enum threadpool_objtype
124 TP_OBJECT_TYPE_SIMPLE
,
126 TP_OBJECT_TYPE_TIMER
,
133 IO_STATUS_BLOCK iosb
;
137 /* internal threadpool object representation */
138 struct threadpool_object
140 void *win32_callback
; /* leave space for kernelbase to store win32 callback */
143 /* read-only information */
144 enum threadpool_objtype type
;
145 struct threadpool
*pool
;
146 struct threadpool_group
*group
;
148 PTP_CLEANUP_GROUP_CANCEL_CALLBACK group_cancel_callback
;
149 PTP_SIMPLE_CALLBACK finalization_callback
;
152 TP_CALLBACK_PRIORITY priority
;
153 /* information about the group, locked via .group->cs */
154 struct list group_entry
;
155 BOOL is_group_member
;
156 /* information about the pool, locked via .pool->cs */
157 struct list pool_entry
;
158 RTL_CONDITION_VARIABLE finished_event
;
159 RTL_CONDITION_VARIABLE group_finished_event
;
160 HANDLE completed_event
;
161 LONG num_pending_callbacks
;
162 LONG num_running_callbacks
;
163 LONG num_associated_callbacks
;
164 /* arguments for callback */
169 PTP_SIMPLE_CALLBACK callback
;
173 PTP_WORK_CALLBACK callback
;
177 PTP_TIMER_CALLBACK callback
;
178 /* information about the timer, locked via timerqueue.cs */
179 BOOL timer_initialized
;
181 struct list timer_entry
;
189 PTP_WAIT_CALLBACK callback
;
191 /* information about the wait object, locked via waitqueue.cs */
192 struct waitqueue_bucket
*bucket
;
194 struct list wait_entry
;
198 RTL_WAITORTIMERCALLBACKFUNC rtl_callback
;
202 PTP_IO_CALLBACK callback
;
203 /* locked via .pool->cs */
204 unsigned int pending_count
, skipped_count
, completion_count
, completion_max
;
206 struct io_completion
*completions
;
211 /* internal threadpool instance representation */
212 struct threadpool_instance
214 struct threadpool_object
*object
;
220 CRITICAL_SECTION
*critical_section
;
223 LONG semaphore_count
;
229 /* internal threadpool group representation */
230 struct threadpool_group
235 /* list of group members, locked via .cs */
239 /* global timerqueue object */
240 static RTL_CRITICAL_SECTION_DEBUG timerqueue_debug
;
247 struct list pending_timers
;
248 RTL_CONDITION_VARIABLE update_event
;
252 { &timerqueue_debug
, -1, 0, 0, 0, 0 }, /* cs */
254 FALSE
, /* thread_running */
255 LIST_INIT( timerqueue
.pending_timers
), /* pending_timers */
256 RTL_CONDITION_VARIABLE_INIT
/* update_event */
259 static RTL_CRITICAL_SECTION_DEBUG timerqueue_debug
=
261 0, 0, &timerqueue
.cs
,
262 { &timerqueue_debug
.ProcessLocksList
, &timerqueue_debug
.ProcessLocksList
},
263 0, 0, { (DWORD_PTR
)(__FILE__
": timerqueue.cs") }
266 /* global waitqueue object */
267 static RTL_CRITICAL_SECTION_DEBUG waitqueue_debug
;
277 { &waitqueue_debug
, -1, 0, 0, 0, 0 }, /* cs */
279 LIST_INIT( waitqueue
.buckets
) /* buckets */
282 static RTL_CRITICAL_SECTION_DEBUG waitqueue_debug
=
285 { &waitqueue_debug
.ProcessLocksList
, &waitqueue_debug
.ProcessLocksList
},
286 0, 0, { (DWORD_PTR
)(__FILE__
": waitqueue.cs") }
289 struct waitqueue_bucket
291 struct list bucket_entry
;
293 struct list reserved
;
299 /* global I/O completion queue object */
300 static RTL_CRITICAL_SECTION_DEBUG ioqueue_debug
;
308 RTL_CONDITION_VARIABLE update_event
;
312 .cs
= { &ioqueue_debug
, -1, 0, 0, 0, 0 },
315 static RTL_CRITICAL_SECTION_DEBUG ioqueue_debug
=
318 { &ioqueue_debug
.ProcessLocksList
, &ioqueue_debug
.ProcessLocksList
},
319 0, 0, { (DWORD_PTR
)(__FILE__
": ioqueue.cs") }
322 static inline struct threadpool
*impl_from_TP_POOL( TP_POOL
*pool
)
324 return (struct threadpool
*)pool
;
327 static inline struct threadpool_object
*impl_from_TP_WORK( TP_WORK
*work
)
329 struct threadpool_object
*object
= (struct threadpool_object
*)work
;
330 assert( object
->type
== TP_OBJECT_TYPE_WORK
);
334 static inline struct threadpool_object
*impl_from_TP_TIMER( TP_TIMER
*timer
)
336 struct threadpool_object
*object
= (struct threadpool_object
*)timer
;
337 assert( object
->type
== TP_OBJECT_TYPE_TIMER
);
341 static inline struct threadpool_object
*impl_from_TP_WAIT( TP_WAIT
*wait
)
343 struct threadpool_object
*object
= (struct threadpool_object
*)wait
;
344 assert( object
->type
== TP_OBJECT_TYPE_WAIT
);
348 static inline struct threadpool_object
*impl_from_TP_IO( TP_IO
*io
)
350 struct threadpool_object
*object
= (struct threadpool_object
*)io
;
351 assert( object
->type
== TP_OBJECT_TYPE_IO
);
355 static inline struct threadpool_group
*impl_from_TP_CLEANUP_GROUP( TP_CLEANUP_GROUP
*group
)
357 return (struct threadpool_group
*)group
;
360 static inline struct threadpool_instance
*impl_from_TP_CALLBACK_INSTANCE( TP_CALLBACK_INSTANCE
*instance
)
362 return (struct threadpool_instance
*)instance
;
/* Forward declarations of internal helpers; their definitions are not part of
 * this section of the file. */
static void CALLBACK threadpool_worker_proc( void *param );
static void tp_object_submit( struct threadpool_object *object, BOOL signaled );
static void tp_object_execute( struct threadpool_object *object, BOOL wait_thread );
static void tp_object_prepare_shutdown( struct threadpool_object *object );
static BOOL tp_object_release( struct threadpool_object *object );
/* Pointer to the default pool; NULL until something assigns it. */
static struct threadpool *default_threadpool = NULL;
372 static BOOL
array_reserve(void **elements
, unsigned int *capacity
, unsigned int count
, unsigned int size
)
374 unsigned int new_capacity
, max_capacity
;
377 if (count
<= *capacity
)
380 max_capacity
= ~(SIZE_T
)0 / size
;
381 if (count
> max_capacity
)
384 new_capacity
= max(4, *capacity
);
385 while (new_capacity
< count
&& new_capacity
<= max_capacity
/ 2)
387 if (new_capacity
< count
)
388 new_capacity
= max_capacity
;
390 if (!(new_elements
= RtlReAllocateHeap( GetProcessHeap(), 0, *elements
, new_capacity
* size
)))
393 *elements
= new_elements
;
394 *capacity
= new_capacity
;
399 static void set_thread_name(const WCHAR
*name
)
401 THREAD_NAME_INFORMATION info
;
403 RtlInitUnicodeString(&info
.ThreadName
, name
);
404 NtSetInformationThread(GetCurrentThread(), ThreadNameInformation
, &info
, sizeof(info
));
407 static void CALLBACK
process_rtl_work_item( TP_CALLBACK_INSTANCE
*instance
, void *userdata
)
409 struct rtl_work_item
*item
= userdata
;
411 TRACE("executing %p(%p)\n", item
->function
, item
->context
);
412 item
->function( item
->context
);
414 RtlFreeHeap( GetProcessHeap(), 0, item
);
417 /***********************************************************************
418 * RtlQueueWorkItem (NTDLL.@)
420 * Queues a work item into a thread in the thread pool.
423 * function [I] Work function to execute.
424 * context [I] Context to pass to the work function when it is executed.
425 * flags [I] Flags. See notes.
428 * Success: STATUS_SUCCESS.
429 * Failure: Any NTSTATUS code.
432 * Flags can be one or more of the following:
433 *|WT_EXECUTEDEFAULT - Executes the work item in a non-I/O worker thread.
434 *|WT_EXECUTEINIOTHREAD - Executes the work item in an I/O worker thread.
435 *|WT_EXECUTEINPERSISTENTTHREAD - Executes the work item in a thread that is persistent.
436 *|WT_EXECUTELONGFUNCTION - Hints that the execution can take a long time.
437 *|WT_TRANSFER_IMPERSONATION - Executes the function with the current access token.
439 NTSTATUS WINAPI
RtlQueueWorkItem( PRTL_WORK_ITEM_ROUTINE function
, PVOID context
, ULONG flags
)
441 TP_CALLBACK_ENVIRON environment
;
442 struct rtl_work_item
*item
;
445 TRACE( "%p %p %u\n", function
, context
, flags
);
447 item
= RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*item
) );
449 return STATUS_NO_MEMORY
;
451 memset( &environment
, 0, sizeof(environment
) );
452 environment
.Version
= 1;
453 environment
.u
.s
.LongFunction
= (flags
& WT_EXECUTELONGFUNCTION
) != 0;
454 environment
.u
.s
.Persistent
= (flags
& WT_EXECUTEINPERSISTENTTHREAD
) != 0;
456 item
->function
= function
;
457 item
->context
= context
;
459 status
= TpSimpleTryPost( process_rtl_work_item
, item
, &environment
);
460 if (status
) RtlFreeHeap( GetProcessHeap(), 0, item
);
464 /***********************************************************************
465 * iocp_poller - get completion events and run callbacks
467 static DWORD CALLBACK
iocp_poller(LPVOID Arg
)
473 PRTL_OVERLAPPED_COMPLETION_ROUTINE callback
;
475 IO_STATUS_BLOCK iosb
;
476 NTSTATUS res
= NtRemoveIoCompletion( cport
, (PULONG_PTR
)&callback
, (PULONG_PTR
)&overlapped
, &iosb
, NULL
);
479 ERR("NtRemoveIoCompletion failed: 0x%x\n", res
);
483 DWORD transferred
= 0;
486 if (iosb
.u
.Status
== STATUS_SUCCESS
)
487 transferred
= iosb
.Information
;
489 err
= RtlNtStatusToDosError(iosb
.u
.Status
);
491 callback( err
, transferred
, overlapped
);
497 /***********************************************************************
498 * RtlSetIoCompletionCallback (NTDLL.@)
500 * Binds a handle to a thread pool's completion port, and possibly
501 * starts a non-I/O thread to monitor this port and call functions back.
504 * FileHandle [I] Handle to bind to a completion port.
505 * Function [I] Callback function to call on I/O completions.
506 * Flags [I] Not used.
509 * Success: STATUS_SUCCESS.
510 * Failure: Any NTSTATUS code.
513 NTSTATUS WINAPI
RtlSetIoCompletionCallback(HANDLE FileHandle
, PRTL_OVERLAPPED_COMPLETION_ROUTINE Function
, ULONG Flags
)
515 IO_STATUS_BLOCK iosb
;
516 FILE_COMPLETION_INFORMATION info
;
518 if (Flags
) FIXME("Unknown value Flags=0x%x\n", Flags
);
520 if (!old_threadpool
.compl_port
)
522 NTSTATUS res
= STATUS_SUCCESS
;
524 RtlEnterCriticalSection(&old_threadpool
.threadpool_compl_cs
);
525 if (!old_threadpool
.compl_port
)
529 res
= NtCreateIoCompletion( &cport
, IO_COMPLETION_ALL_ACCESS
, NULL
, 0 );
532 /* FIXME native can start additional threads in case of e.g. hung callback function. */
533 res
= RtlQueueWorkItem( iocp_poller
, cport
, WT_EXECUTEDEFAULT
);
535 old_threadpool
.compl_port
= cport
;
540 RtlLeaveCriticalSection(&old_threadpool
.threadpool_compl_cs
);
544 info
.CompletionPort
= old_threadpool
.compl_port
;
545 info
.CompletionKey
= (ULONG_PTR
)Function
;
547 return NtSetInformationFile( FileHandle
, &iosb
, &info
, sizeof(info
), FileCompletionInformation
);
550 static inline PLARGE_INTEGER
get_nt_timeout( PLARGE_INTEGER pTime
, ULONG timeout
)
552 if (timeout
== INFINITE
) return NULL
;
553 pTime
->QuadPart
= (ULONGLONG
)timeout
* -10000;
558 /************************** Timer Queue Impl **************************/
560 static void queue_remove_timer(struct queue_timer
*t
)
562 /* We MUST hold the queue cs while calling this function. This ensures
563 that we cannot queue another callback for this timer. The runcount
564 being zero makes sure we don't have any already queued. */
565 struct timer_queue
*q
= t
->q
;
567 assert(t
->runcount
== 0);
570 list_remove(&t
->entry
);
572 NtSetEvent(t
->event
, NULL
);
573 RtlFreeHeap(GetProcessHeap(), 0, t
);
575 if (q
->quit
&& list_empty(&q
->timers
))
576 NtSetEvent(q
->event
, NULL
);
579 static void timer_cleanup_callback(struct queue_timer
*t
)
581 struct timer_queue
*q
= t
->q
;
582 RtlEnterCriticalSection(&q
->cs
);
584 assert(0 < t
->runcount
);
587 if (t
->destroy
&& t
->runcount
== 0)
588 queue_remove_timer(t
);
590 RtlLeaveCriticalSection(&q
->cs
);
593 static DWORD WINAPI
timer_callback_wrapper(LPVOID p
)
595 struct queue_timer
*t
= p
;
596 t
->callback(t
->param
, TRUE
);
597 timer_cleanup_callback(t
);
601 static inline ULONGLONG
queue_current_time(void)
603 LARGE_INTEGER now
, freq
;
604 NtQueryPerformanceCounter(&now
, &freq
);
605 return now
.QuadPart
* 1000 / freq
.QuadPart
;
608 static void queue_add_timer(struct queue_timer
*t
, ULONGLONG time
,
611 /* We MUST hold the queue cs while calling this function. */
612 struct timer_queue
*q
= t
->q
;
613 struct list
*ptr
= &q
->timers
;
615 assert(!q
->quit
|| (t
->destroy
&& time
== EXPIRE_NEVER
));
617 if (time
!= EXPIRE_NEVER
)
618 LIST_FOR_EACH(ptr
, &q
->timers
)
620 struct queue_timer
*cur
= LIST_ENTRY(ptr
, struct queue_timer
, entry
);
621 if (time
< cur
->expire
)
624 list_add_before(ptr
, &t
->entry
);
628 /* If we insert at the head of the list, we need to expire sooner
630 if (set_event
&& &t
->entry
== list_head(&q
->timers
))
631 NtSetEvent(q
->event
, NULL
);
634 static inline void queue_move_timer(struct queue_timer
*t
, ULONGLONG time
,
637 /* We MUST hold the queue cs while calling this function. */
638 list_remove(&t
->entry
);
639 queue_add_timer(t
, time
, set_event
);
642 static void queue_timer_expire(struct timer_queue
*q
)
644 struct queue_timer
*t
= NULL
;
646 RtlEnterCriticalSection(&q
->cs
);
647 if (list_head(&q
->timers
))
650 t
= LIST_ENTRY(list_head(&q
->timers
), struct queue_timer
, entry
);
651 if (!t
->destroy
&& t
->expire
<= ((now
= queue_current_time())))
656 next
= t
->expire
+ t
->period
;
657 /* avoid trigger cascade if overloaded / hibernated */
659 next
= now
+ t
->period
;
663 queue_move_timer(t
, next
, FALSE
);
668 RtlLeaveCriticalSection(&q
->cs
);
672 if (t
->flags
& WT_EXECUTEINTIMERTHREAD
)
673 timer_callback_wrapper(t
);
678 & (WT_EXECUTEINIOTHREAD
| WT_EXECUTEINPERSISTENTTHREAD
679 | WT_EXECUTELONGFUNCTION
| WT_TRANSFER_IMPERSONATION
));
680 NTSTATUS status
= RtlQueueWorkItem(timer_callback_wrapper
, t
, flags
);
681 if (status
!= STATUS_SUCCESS
)
682 timer_cleanup_callback(t
);
687 static ULONG
queue_get_timeout(struct timer_queue
*q
)
689 struct queue_timer
*t
;
690 ULONG timeout
= INFINITE
;
692 RtlEnterCriticalSection(&q
->cs
);
693 if (list_head(&q
->timers
))
695 t
= LIST_ENTRY(list_head(&q
->timers
), struct queue_timer
, entry
);
696 assert(!t
->destroy
|| t
->expire
== EXPIRE_NEVER
);
698 if (t
->expire
!= EXPIRE_NEVER
)
700 ULONGLONG time
= queue_current_time();
701 timeout
= t
->expire
< time
? 0 : t
->expire
- time
;
704 RtlLeaveCriticalSection(&q
->cs
);
709 static void WINAPI
timer_queue_thread_proc(LPVOID p
)
711 struct timer_queue
*q
= p
;
714 set_thread_name(L
"wine_threadpool_timer_queue");
715 timeout_ms
= INFINITE
;
718 LARGE_INTEGER timeout
;
722 status
= NtWaitForSingleObject(
723 q
->event
, FALSE
, get_nt_timeout(&timeout
, timeout_ms
));
725 if (status
== STATUS_WAIT_0
)
727 /* There are two possible ways to trigger the event. Either
728 we are quitting and the last timer got removed, or a new
729 timer got put at the head of the list so we need to adjust
731 RtlEnterCriticalSection(&q
->cs
);
732 if (q
->quit
&& list_empty(&q
->timers
))
734 RtlLeaveCriticalSection(&q
->cs
);
736 else if (status
== STATUS_TIMEOUT
)
737 queue_timer_expire(q
);
742 timeout_ms
= queue_get_timeout(q
);
746 RtlDeleteCriticalSection(&q
->cs
);
748 RtlFreeHeap(GetProcessHeap(), 0, q
);
749 RtlExitUserThread( 0 );
752 static void queue_destroy_timer(struct queue_timer
*t
)
754 /* We MUST hold the queue cs while calling this function. */
756 if (t
->runcount
== 0)
757 /* Ensure a timer is promptly removed. If callbacks are pending,
758 it will be removed after the last one finishes by the callback
760 queue_remove_timer(t
);
762 /* Make sure no destroyed timer masks an active timer at the head
763 of the sorted list. */
764 queue_move_timer(t
, EXPIRE_NEVER
, FALSE
);
767 /***********************************************************************
768 * RtlCreateTimerQueue (NTDLL.@)
770 * Creates a timer queue object and returns a handle to it.
773 * NewTimerQueue [O] The newly created queue.
776 * Success: STATUS_SUCCESS.
777 * Failure: Any NTSTATUS code.
779 NTSTATUS WINAPI
RtlCreateTimerQueue(PHANDLE NewTimerQueue
)
782 struct timer_queue
*q
= RtlAllocateHeap(GetProcessHeap(), 0, sizeof *q
);
784 return STATUS_NO_MEMORY
;
786 RtlInitializeCriticalSection(&q
->cs
);
787 list_init(&q
->timers
);
789 q
->magic
= TIMER_QUEUE_MAGIC
;
790 status
= NtCreateEvent(&q
->event
, EVENT_ALL_ACCESS
, NULL
, SynchronizationEvent
, FALSE
);
791 if (status
!= STATUS_SUCCESS
)
793 RtlFreeHeap(GetProcessHeap(), 0, q
);
796 status
= RtlCreateUserThread(GetCurrentProcess(), NULL
, FALSE
, 0, 0, 0,
797 timer_queue_thread_proc
, q
, &q
->thread
, NULL
);
798 if (status
!= STATUS_SUCCESS
)
801 RtlFreeHeap(GetProcessHeap(), 0, q
);
806 return STATUS_SUCCESS
;
809 /***********************************************************************
810 * RtlDeleteTimerQueueEx (NTDLL.@)
812 * Deletes a timer queue object.
815 * TimerQueue [I] The timer queue to destroy.
816 * CompletionEvent [I] If NULL, return immediately. If INVALID_HANDLE_VALUE,
817 * wait until all timers are finished firing before
818 * returning. Otherwise, return immediately and set the
819 * event when all timers are done.
822 * Success: STATUS_SUCCESS if synchronous, STATUS_PENDING if not.
823 * Failure: Any NTSTATUS code.
825 NTSTATUS WINAPI
RtlDeleteTimerQueueEx(HANDLE TimerQueue
, HANDLE CompletionEvent
)
827 struct timer_queue
*q
= TimerQueue
;
828 struct queue_timer
*t
, *temp
;
832 if (!q
|| q
->magic
!= TIMER_QUEUE_MAGIC
)
833 return STATUS_INVALID_HANDLE
;
837 RtlEnterCriticalSection(&q
->cs
);
839 if (list_head(&q
->timers
))
840 /* When the last timer is removed, it will signal the timer thread to
842 LIST_FOR_EACH_ENTRY_SAFE(t
, temp
, &q
->timers
, struct queue_timer
, entry
)
843 queue_destroy_timer(t
);
845 /* However if we have none, we must do it ourselves. */
846 NtSetEvent(q
->event
, NULL
);
847 RtlLeaveCriticalSection(&q
->cs
);
849 if (CompletionEvent
== INVALID_HANDLE_VALUE
)
851 NtWaitForSingleObject(thread
, FALSE
, NULL
);
852 status
= STATUS_SUCCESS
;
858 FIXME("asynchronous return on completion event unimplemented\n");
859 NtWaitForSingleObject(thread
, FALSE
, NULL
);
860 NtSetEvent(CompletionEvent
, NULL
);
862 status
= STATUS_PENDING
;
869 static struct timer_queue
*get_timer_queue(HANDLE TimerQueue
)
871 static struct timer_queue
*default_timer_queue
;
877 if (!default_timer_queue
)
880 NTSTATUS status
= RtlCreateTimerQueue(&q
);
881 if (status
== STATUS_SUCCESS
)
883 PVOID p
= InterlockedCompareExchangePointer( (void **) &default_timer_queue
, q
, NULL
);
885 /* Got beat to the punch. */
886 RtlDeleteTimerQueueEx(q
, NULL
);
889 return default_timer_queue
;
893 /***********************************************************************
894 * RtlCreateTimer (NTDLL.@)
896 * Creates a new timer associated with the given queue.
899 * TimerQueue [I] The queue to hold the timer.
900 * NewTimer [O] The newly created timer.
901 * Callback [I] The callback to fire.
902 * Parameter [I] The argument for the callback.
903 * DueTime [I] The delay, in milliseconds, before first firing the
905 * Period [I] The period, in milliseconds, at which to fire the timer
906 * after the first callback. If zero, the timer will only
907 * fire once. It still needs to be deleted with
909 * Flags [I] Flags controlling the execution of the callback. In
910 * addition to the WT_* thread pool flags (see
911 * RtlQueueWorkItem), WT_EXECUTEINTIMERTHREAD and
912 * WT_EXECUTEONLYONCE are supported.
915 * Success: STATUS_SUCCESS.
916 * Failure: Any NTSTATUS code.
918 NTSTATUS WINAPI
RtlCreateTimer(HANDLE TimerQueue
, HANDLE
*NewTimer
,
919 RTL_WAITORTIMERCALLBACKFUNC Callback
,
920 PVOID Parameter
, DWORD DueTime
, DWORD Period
,
924 struct queue_timer
*t
;
925 struct timer_queue
*q
= get_timer_queue(TimerQueue
);
927 if (!q
) return STATUS_NO_MEMORY
;
928 if (q
->magic
!= TIMER_QUEUE_MAGIC
) return STATUS_INVALID_HANDLE
;
930 t
= RtlAllocateHeap(GetProcessHeap(), 0, sizeof *t
);
932 return STATUS_NO_MEMORY
;
936 t
->callback
= Callback
;
937 t
->param
= Parameter
;
943 status
= STATUS_SUCCESS
;
944 RtlEnterCriticalSection(&q
->cs
);
946 status
= STATUS_INVALID_HANDLE
;
948 queue_add_timer(t
, queue_current_time() + DueTime
, TRUE
);
949 RtlLeaveCriticalSection(&q
->cs
);
951 if (status
== STATUS_SUCCESS
)
954 RtlFreeHeap(GetProcessHeap(), 0, t
);
959 /***********************************************************************
960 * RtlUpdateTimer (NTDLL.@)
962 * Changes the time at which a timer expires.
965 * TimerQueue [I] The queue that holds the timer.
966 * Timer [I] The timer to update.
967 * DueTime [I] The delay, in milliseconds, before next firing the timer.
968 * Period [I] The period, in milliseconds, at which to fire the timer
969 * after the first callback. If zero, the timer will not
970 * refire once. It still needs to be deleted with
974 * Success: STATUS_SUCCESS.
975 * Failure: Any NTSTATUS code.
977 NTSTATUS WINAPI
RtlUpdateTimer(HANDLE TimerQueue
, HANDLE Timer
,
978 DWORD DueTime
, DWORD Period
)
980 struct queue_timer
*t
= Timer
;
981 struct timer_queue
*q
= t
->q
;
983 RtlEnterCriticalSection(&q
->cs
);
984 /* Can't change a timer if it was once-only or destroyed. */
985 if (t
->expire
!= EXPIRE_NEVER
)
988 queue_move_timer(t
, queue_current_time() + DueTime
, TRUE
);
990 RtlLeaveCriticalSection(&q
->cs
);
992 return STATUS_SUCCESS
;
995 /***********************************************************************
996 * RtlDeleteTimer (NTDLL.@)
998 * Cancels a timer-queue timer.
1001 * TimerQueue [I] The queue that holds the timer.
1002 * Timer [I] The timer to update.
1003 * CompletionEvent [I] If NULL, return immediately. If INVALID_HANDLE_VALUE,
1004 * wait until the timer is finished firing all pending
1005 * callbacks before returning. Otherwise, return
1006 * immediately and set the timer is done.
1009 * Success: STATUS_SUCCESS if the timer is done, STATUS_PENDING if not,
1010 or if the completion event is NULL.
1011 * Failure: Any NTSTATUS code.
1013 NTSTATUS WINAPI
RtlDeleteTimer(HANDLE TimerQueue
, HANDLE Timer
,
1014 HANDLE CompletionEvent
)
1016 struct queue_timer
*t
= Timer
;
1017 struct timer_queue
*q
;
1018 NTSTATUS status
= STATUS_PENDING
;
1019 HANDLE event
= NULL
;
1022 return STATUS_INVALID_PARAMETER_1
;
1024 if (CompletionEvent
== INVALID_HANDLE_VALUE
)
1026 status
= NtCreateEvent(&event
, EVENT_ALL_ACCESS
, NULL
, SynchronizationEvent
, FALSE
);
1027 if (status
== STATUS_SUCCESS
)
1028 status
= STATUS_PENDING
;
1030 else if (CompletionEvent
)
1031 event
= CompletionEvent
;
1033 RtlEnterCriticalSection(&q
->cs
);
1035 if (t
->runcount
== 0 && event
)
1036 status
= STATUS_SUCCESS
;
1037 queue_destroy_timer(t
);
1038 RtlLeaveCriticalSection(&q
->cs
);
1040 if (CompletionEvent
== INVALID_HANDLE_VALUE
&& event
)
1042 if (status
== STATUS_PENDING
)
1044 NtWaitForSingleObject(event
, FALSE
, NULL
);
1045 status
= STATUS_SUCCESS
;
1053 /***********************************************************************
1054 * timerqueue_thread_proc (internal)
1056 static void CALLBACK
timerqueue_thread_proc( void *param
)
1058 ULONGLONG timeout_lower
, timeout_upper
, new_timeout
;
1059 struct threadpool_object
*other_timer
;
1060 LARGE_INTEGER now
, timeout
;
1063 TRACE( "starting timer queue thread\n" );
1064 set_thread_name(L
"wine_threadpool_timerqueue");
1066 RtlEnterCriticalSection( &timerqueue
.cs
);
1069 NtQuerySystemTime( &now
);
1071 /* Check for expired timers. */
1072 while ((ptr
= list_head( &timerqueue
.pending_timers
)))
1074 struct threadpool_object
*timer
= LIST_ENTRY( ptr
, struct threadpool_object
, u
.timer
.timer_entry
);
1075 assert( timer
->type
== TP_OBJECT_TYPE_TIMER
);
1076 assert( timer
->u
.timer
.timer_pending
);
1077 if (timer
->u
.timer
.timeout
> now
.QuadPart
)
1080 /* Queue a new callback in one of the worker threads. */
1081 list_remove( &timer
->u
.timer
.timer_entry
);
1082 timer
->u
.timer
.timer_pending
= FALSE
;
1083 tp_object_submit( timer
, FALSE
);
1085 /* Insert the timer back into the queue, except it's marked for shutdown. */
1086 if (timer
->u
.timer
.period
&& !timer
->shutdown
)
1088 timer
->u
.timer
.timeout
+= (ULONGLONG
)timer
->u
.timer
.period
* 10000;
1089 if (timer
->u
.timer
.timeout
<= now
.QuadPart
)
1090 timer
->u
.timer
.timeout
= now
.QuadPart
+ 1;
1092 LIST_FOR_EACH_ENTRY( other_timer
, &timerqueue
.pending_timers
,
1093 struct threadpool_object
, u
.timer
.timer_entry
)
1095 assert( other_timer
->type
== TP_OBJECT_TYPE_TIMER
);
1096 if (timer
->u
.timer
.timeout
< other_timer
->u
.timer
.timeout
)
1099 list_add_before( &other_timer
->u
.timer
.timer_entry
, &timer
->u
.timer
.timer_entry
);
1100 timer
->u
.timer
.timer_pending
= TRUE
;
1104 timeout_lower
= timeout_upper
= MAXLONGLONG
;
1106 /* Determine next timeout and use the window length to optimize wakeup times. */
1107 LIST_FOR_EACH_ENTRY( other_timer
, &timerqueue
.pending_timers
,
1108 struct threadpool_object
, u
.timer
.timer_entry
)
1110 assert( other_timer
->type
== TP_OBJECT_TYPE_TIMER
);
1111 if (other_timer
->u
.timer
.timeout
>= timeout_upper
)
1114 timeout_lower
= other_timer
->u
.timer
.timeout
;
1115 new_timeout
= timeout_lower
+ (ULONGLONG
)other_timer
->u
.timer
.window_length
* 10000;
1116 if (new_timeout
< timeout_upper
)
1117 timeout_upper
= new_timeout
;
1120 /* Wait for timer update events or until the next timer expires. */
1121 if (timerqueue
.objcount
)
1123 timeout
.QuadPart
= timeout_lower
;
1124 RtlSleepConditionVariableCS( &timerqueue
.update_event
, &timerqueue
.cs
, &timeout
);
1128 /* All timers have been destroyed, if no new timers are created
1129 * within some amount of time, then we can shutdown this thread. */
1130 timeout
.QuadPart
= (ULONGLONG
)THREADPOOL_WORKER_TIMEOUT
* -10000;
1131 if (RtlSleepConditionVariableCS( &timerqueue
.update_event
, &timerqueue
.cs
,
1132 &timeout
) == STATUS_TIMEOUT
&& !timerqueue
.objcount
)
1138 timerqueue
.thread_running
= FALSE
;
1139 RtlLeaveCriticalSection( &timerqueue
.cs
);
1141 TRACE( "terminating timer queue thread\n" );
1142 RtlExitUserThread( 0 );
1145 /***********************************************************************
1146 * tp_new_worker_thread (internal)
1148 * Create and account a new worker thread for the desired pool.
1150 static NTSTATUS
tp_new_worker_thread( struct threadpool
*pool
)
1155 status
= RtlCreateUserThread( GetCurrentProcess(), NULL
, FALSE
, 0, 0, 0,
1156 threadpool_worker_proc
, pool
, &thread
, NULL
);
1157 if (status
== STATUS_SUCCESS
)
1159 InterlockedIncrement( &pool
->refcount
);
1160 pool
->num_workers
++;
1166 /***********************************************************************
1167 * tp_timerqueue_lock (internal)
1169 * Acquires a lock on the global timerqueue. When the lock is acquired
1170 * successfully, it is guaranteed that the timer thread is running.
1172 static NTSTATUS
tp_timerqueue_lock( struct threadpool_object
*timer
)
1174 NTSTATUS status
= STATUS_SUCCESS
;
1175 assert( timer
->type
== TP_OBJECT_TYPE_TIMER
);
1177 timer
->u
.timer
.timer_initialized
= FALSE
;
1178 timer
->u
.timer
.timer_pending
= FALSE
;
1179 timer
->u
.timer
.timer_set
= FALSE
;
1180 timer
->u
.timer
.timeout
= 0;
1181 timer
->u
.timer
.period
= 0;
1182 timer
->u
.timer
.window_length
= 0;
1184 RtlEnterCriticalSection( &timerqueue
.cs
);
1186 /* Make sure that the timerqueue thread is running. */
1187 if (!timerqueue
.thread_running
)
1190 status
= RtlCreateUserThread( GetCurrentProcess(), NULL
, FALSE
, 0, 0, 0,
1191 timerqueue_thread_proc
, NULL
, &thread
, NULL
);
1192 if (status
== STATUS_SUCCESS
)
1194 timerqueue
.thread_running
= TRUE
;
1199 if (status
== STATUS_SUCCESS
)
1201 timer
->u
.timer
.timer_initialized
= TRUE
;
1202 timerqueue
.objcount
++;
1205 RtlLeaveCriticalSection( &timerqueue
.cs
);
1209 /***********************************************************************
1210 * tp_timerqueue_unlock (internal)
1212 * Releases a lock on the global timerqueue.
1214 static void tp_timerqueue_unlock( struct threadpool_object
*timer
)
1216 assert( timer
->type
== TP_OBJECT_TYPE_TIMER
);
1218 RtlEnterCriticalSection( &timerqueue
.cs
);
1219 if (timer
->u
.timer
.timer_initialized
)
1221 /* If timer was pending, remove it. */
1222 if (timer
->u
.timer
.timer_pending
)
1224 list_remove( &timer
->u
.timer
.timer_entry
);
1225 timer
->u
.timer
.timer_pending
= FALSE
;
1228 /* If the last timer object was destroyed, then wake up the thread. */
1229 if (!--timerqueue
.objcount
)
1231 assert( list_empty( &timerqueue
.pending_timers
) );
1232 RtlWakeAllConditionVariable( &timerqueue
.update_event
);
1235 timer
->u
.timer
.timer_initialized
= FALSE
;
1237 RtlLeaveCriticalSection( &timerqueue
.cs
);
1240 /***********************************************************************
1241 * waitqueue_thread_proc (internal)
1243 static void CALLBACK
waitqueue_thread_proc( void *param
)
1245 struct threadpool_object
*objects
[MAXIMUM_WAITQUEUE_OBJECTS
];
1246 HANDLE handles
[MAXIMUM_WAITQUEUE_OBJECTS
+ 1];
1247 struct waitqueue_bucket
*bucket
= param
;
1248 struct threadpool_object
*wait
, *next
;
1249 LARGE_INTEGER now
, timeout
;
1253 TRACE( "starting wait queue thread\n" );
1254 set_thread_name(L
"wine_threadpool_waitqueue");
1256 RtlEnterCriticalSection( &waitqueue
.cs
);
1260 NtQuerySystemTime( &now
);
1261 timeout
.QuadPart
= MAXLONGLONG
;
1264 LIST_FOR_EACH_ENTRY_SAFE( wait
, next
, &bucket
->waiting
, struct threadpool_object
,
1267 assert( wait
->type
== TP_OBJECT_TYPE_WAIT
);
1268 if (wait
->u
.wait
.timeout
<= now
.QuadPart
)
1270 /* Wait object timed out. */
1271 if ((wait
->u
.wait
.flags
& WT_EXECUTEONLYONCE
))
1273 list_remove( &wait
->u
.wait
.wait_entry
);
1274 list_add_tail( &bucket
->reserved
, &wait
->u
.wait
.wait_entry
);
1276 if ((wait
->u
.wait
.flags
& (WT_EXECUTEINWAITTHREAD
| WT_EXECUTEINIOTHREAD
)))
1278 InterlockedIncrement( &wait
->refcount
);
1279 wait
->num_pending_callbacks
++;
1280 RtlEnterCriticalSection( &wait
->pool
->cs
);
1281 tp_object_execute( wait
, TRUE
);
1282 RtlLeaveCriticalSection( &wait
->pool
->cs
);
1283 tp_object_release( wait
);
1285 else tp_object_submit( wait
, FALSE
);
1289 if (wait
->u
.wait
.timeout
< timeout
.QuadPart
)
1290 timeout
.QuadPart
= wait
->u
.wait
.timeout
;
1292 assert( num_handles
< MAXIMUM_WAITQUEUE_OBJECTS
);
1293 InterlockedIncrement( &wait
->refcount
);
1294 objects
[num_handles
] = wait
;
1295 handles
[num_handles
] = wait
->u
.wait
.handle
;
1300 if (!bucket
->objcount
)
1302 /* All wait objects have been destroyed, if no new wait objects are created
1303 * within some amount of time, then we can shutdown this thread. */
1304 assert( num_handles
== 0 );
1305 RtlLeaveCriticalSection( &waitqueue
.cs
);
1306 timeout
.QuadPart
= (ULONGLONG
)THREADPOOL_WORKER_TIMEOUT
* -10000;
1307 status
= NtWaitForMultipleObjects( 1, &bucket
->update_event
, TRUE
, bucket
->alertable
, &timeout
);
1308 RtlEnterCriticalSection( &waitqueue
.cs
);
1310 if (status
== STATUS_TIMEOUT
&& !bucket
->objcount
)
1315 handles
[num_handles
] = bucket
->update_event
;
1316 RtlLeaveCriticalSection( &waitqueue
.cs
);
1317 status
= NtWaitForMultipleObjects( num_handles
+ 1, handles
, TRUE
, bucket
->alertable
, &timeout
);
1318 RtlEnterCriticalSection( &waitqueue
.cs
);
1320 if (status
>= STATUS_WAIT_0
&& status
< STATUS_WAIT_0
+ num_handles
)
1322 wait
= objects
[status
- STATUS_WAIT_0
];
1323 assert( wait
->type
== TP_OBJECT_TYPE_WAIT
);
1324 if (wait
->u
.wait
.bucket
)
1326 /* Wait object signaled. */
1327 assert( wait
->u
.wait
.bucket
== bucket
);
1328 if ((wait
->u
.wait
.flags
& WT_EXECUTEONLYONCE
))
1330 list_remove( &wait
->u
.wait
.wait_entry
);
1331 list_add_tail( &bucket
->reserved
, &wait
->u
.wait
.wait_entry
);
1333 if ((wait
->u
.wait
.flags
& (WT_EXECUTEINWAITTHREAD
| WT_EXECUTEINIOTHREAD
)))
1335 wait
->u
.wait
.signaled
++;
1336 wait
->num_pending_callbacks
++;
1337 RtlEnterCriticalSection( &wait
->pool
->cs
);
1338 tp_object_execute( wait
, TRUE
);
1339 RtlLeaveCriticalSection( &wait
->pool
->cs
);
1341 else tp_object_submit( wait
, TRUE
);
1344 WARN("wait object %p triggered while object was destroyed\n", wait
);
1347 /* Release temporary references to wait objects. */
1350 wait
= objects
[--num_handles
];
1351 assert( wait
->type
== TP_OBJECT_TYPE_WAIT
);
1352 tp_object_release( wait
);
1356 /* Try to merge bucket with other threads. */
1357 if (waitqueue
.num_buckets
> 1 && bucket
->objcount
&&
1358 bucket
->objcount
<= MAXIMUM_WAITQUEUE_OBJECTS
* 1 / 3)
1360 struct waitqueue_bucket
*other_bucket
;
1361 LIST_FOR_EACH_ENTRY( other_bucket
, &waitqueue
.buckets
, struct waitqueue_bucket
, bucket_entry
)
1363 if (other_bucket
!= bucket
&& other_bucket
->objcount
&& other_bucket
->alertable
== bucket
->alertable
&&
1364 other_bucket
->objcount
+ bucket
->objcount
<= MAXIMUM_WAITQUEUE_OBJECTS
* 2 / 3)
1366 other_bucket
->objcount
+= bucket
->objcount
;
1367 bucket
->objcount
= 0;
1369 /* Update reserved list. */
1370 LIST_FOR_EACH_ENTRY( wait
, &bucket
->reserved
, struct threadpool_object
, u
.wait
.wait_entry
)
1372 assert( wait
->type
== TP_OBJECT_TYPE_WAIT
);
1373 wait
->u
.wait
.bucket
= other_bucket
;
1375 list_move_tail( &other_bucket
->reserved
, &bucket
->reserved
);
1377 /* Update waiting list. */
1378 LIST_FOR_EACH_ENTRY( wait
, &bucket
->waiting
, struct threadpool_object
, u
.wait
.wait_entry
)
1380 assert( wait
->type
== TP_OBJECT_TYPE_WAIT
);
1381 wait
->u
.wait
.bucket
= other_bucket
;
1383 list_move_tail( &other_bucket
->waiting
, &bucket
->waiting
);
1385 /* Move bucket to the end, to keep the probability of
1386 * newly added wait objects as small as possible. */
1387 list_remove( &bucket
->bucket_entry
);
1388 list_add_tail( &waitqueue
.buckets
, &bucket
->bucket_entry
);
1390 NtSetEvent( other_bucket
->update_event
, NULL
);
1397 /* Remove this bucket from the list. */
1398 list_remove( &bucket
->bucket_entry
);
1399 if (!--waitqueue
.num_buckets
)
1400 assert( list_empty( &waitqueue
.buckets
) );
1402 RtlLeaveCriticalSection( &waitqueue
.cs
);
1404 TRACE( "terminating wait queue thread\n" );
1406 assert( bucket
->objcount
== 0 );
1407 assert( list_empty( &bucket
->reserved
) );
1408 assert( list_empty( &bucket
->waiting
) );
1409 NtClose( bucket
->update_event
);
1411 RtlFreeHeap( GetProcessHeap(), 0, bucket
);
1412 RtlExitUserThread( 0 );
1415 /***********************************************************************
1416 * tp_waitqueue_lock (internal)
1418 static NTSTATUS
tp_waitqueue_lock( struct threadpool_object
*wait
)
1420 struct waitqueue_bucket
*bucket
;
1423 BOOL alertable
= (wait
->u
.wait
.flags
& WT_EXECUTEINIOTHREAD
) != 0;
1424 assert( wait
->type
== TP_OBJECT_TYPE_WAIT
);
1426 wait
->u
.wait
.signaled
= 0;
1427 wait
->u
.wait
.bucket
= NULL
;
1428 wait
->u
.wait
.wait_pending
= FALSE
;
1429 wait
->u
.wait
.timeout
= 0;
1430 wait
->u
.wait
.handle
= INVALID_HANDLE_VALUE
;
1432 RtlEnterCriticalSection( &waitqueue
.cs
);
1434 /* Try to assign to existing bucket if possible. */
1435 LIST_FOR_EACH_ENTRY( bucket
, &waitqueue
.buckets
, struct waitqueue_bucket
, bucket_entry
)
1437 if (bucket
->objcount
< MAXIMUM_WAITQUEUE_OBJECTS
&& bucket
->alertable
== alertable
)
1439 list_add_tail( &bucket
->reserved
, &wait
->u
.wait
.wait_entry
);
1440 wait
->u
.wait
.bucket
= bucket
;
1443 status
= STATUS_SUCCESS
;
1448 /* Create a new bucket and corresponding worker thread. */
1449 bucket
= RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*bucket
) );
1452 status
= STATUS_NO_MEMORY
;
1456 bucket
->objcount
= 0;
1457 bucket
->alertable
= alertable
;
1458 list_init( &bucket
->reserved
);
1459 list_init( &bucket
->waiting
);
1461 status
= NtCreateEvent( &bucket
->update_event
, EVENT_ALL_ACCESS
,
1462 NULL
, SynchronizationEvent
, FALSE
);
1465 RtlFreeHeap( GetProcessHeap(), 0, bucket
);
1469 status
= RtlCreateUserThread( GetCurrentProcess(), NULL
, FALSE
, 0, 0, 0,
1470 waitqueue_thread_proc
, bucket
, &thread
, NULL
);
1471 if (status
== STATUS_SUCCESS
)
1473 list_add_tail( &waitqueue
.buckets
, &bucket
->bucket_entry
);
1474 waitqueue
.num_buckets
++;
1476 list_add_tail( &bucket
->reserved
, &wait
->u
.wait
.wait_entry
);
1477 wait
->u
.wait
.bucket
= bucket
;
1484 NtClose( bucket
->update_event
);
1485 RtlFreeHeap( GetProcessHeap(), 0, bucket
);
1489 RtlLeaveCriticalSection( &waitqueue
.cs
);
1493 /***********************************************************************
1494 * tp_waitqueue_unlock (internal)
1496 static void tp_waitqueue_unlock( struct threadpool_object
*wait
)
1498 assert( wait
->type
== TP_OBJECT_TYPE_WAIT
);
1500 RtlEnterCriticalSection( &waitqueue
.cs
);
1501 if (wait
->u
.wait
.bucket
)
1503 struct waitqueue_bucket
*bucket
= wait
->u
.wait
.bucket
;
1504 assert( bucket
->objcount
> 0 );
1506 list_remove( &wait
->u
.wait
.wait_entry
);
1507 wait
->u
.wait
.bucket
= NULL
;
1510 NtSetEvent( bucket
->update_event
, NULL
);
1512 RtlLeaveCriticalSection( &waitqueue
.cs
);
1515 static void CALLBACK
ioqueue_thread_proc( void *param
)
1517 struct io_completion
*completion
;
1518 struct threadpool_object
*io
;
1519 IO_STATUS_BLOCK iosb
;
1520 ULONG_PTR key
, value
;
1524 TRACE( "starting I/O completion thread\n" );
1525 set_thread_name(L
"wine_threadpool_ioqueue");
1527 RtlEnterCriticalSection( &ioqueue
.cs
);
1531 RtlLeaveCriticalSection( &ioqueue
.cs
);
1532 if ((status
= NtRemoveIoCompletion( ioqueue
.port
, &key
, &value
, &iosb
, NULL
)))
1533 ERR("NtRemoveIoCompletion failed, status %#x.\n", status
);
1534 RtlEnterCriticalSection( &ioqueue
.cs
);
1536 destroy
= skip
= FALSE
;
1537 io
= (struct threadpool_object
*)key
;
1539 TRACE( "io %p, iosb.Status %#x.\n", io
, iosb
.u
.Status
);
1541 if (io
&& (io
->shutdown
|| io
->u
.io
.shutting_down
))
1543 RtlEnterCriticalSection( &io
->pool
->cs
);
1544 if (!io
->u
.io
.pending_count
)
1546 if (io
->u
.io
.skipped_count
)
1547 --io
->u
.io
.skipped_count
;
1549 if (io
->u
.io
.skipped_count
)
1554 RtlLeaveCriticalSection( &io
->pool
->cs
);
1561 TRACE( "Releasing io %p.\n", io
);
1562 io
->shutdown
= TRUE
;
1563 tp_object_release( io
);
1567 RtlEnterCriticalSection( &io
->pool
->cs
);
1569 TRACE( "pending_count %u.\n", io
->u
.io
.pending_count
);
1571 if (io
->u
.io
.pending_count
)
1573 --io
->u
.io
.pending_count
;
1574 if (!array_reserve((void **)&io
->u
.io
.completions
, &io
->u
.io
.completion_max
,
1575 io
->u
.io
.completion_count
+ 1, sizeof(*io
->u
.io
.completions
)))
1577 ERR( "Failed to allocate memory.\n" );
1578 RtlLeaveCriticalSection( &io
->pool
->cs
);
1582 completion
= &io
->u
.io
.completions
[io
->u
.io
.completion_count
++];
1583 completion
->iosb
= iosb
;
1584 completion
->cvalue
= value
;
1586 tp_object_submit( io
, FALSE
);
1588 RtlLeaveCriticalSection( &io
->pool
->cs
);
1591 if (!ioqueue
.objcount
)
1593 /* All I/O objects have been destroyed; if no new objects are
1594 * created within some amount of time, then we can shutdown this
1596 LARGE_INTEGER timeout
= {.QuadPart
= (ULONGLONG
)THREADPOOL_WORKER_TIMEOUT
* -10000};
1597 if (RtlSleepConditionVariableCS( &ioqueue
.update_event
, &ioqueue
.cs
,
1598 &timeout
) == STATUS_TIMEOUT
&& !ioqueue
.objcount
)
1603 ioqueue
.thread_running
= FALSE
;
1604 RtlLeaveCriticalSection( &ioqueue
.cs
);
1606 TRACE( "terminating I/O completion thread\n" );
1608 RtlExitUserThread( 0 );
1611 static NTSTATUS
tp_ioqueue_lock( struct threadpool_object
*io
, HANDLE file
)
1613 NTSTATUS status
= STATUS_SUCCESS
;
1615 assert( io
->type
== TP_OBJECT_TYPE_IO
);
1617 RtlEnterCriticalSection( &ioqueue
.cs
);
1619 if (!ioqueue
.port
&& (status
= NtCreateIoCompletion( &ioqueue
.port
,
1620 IO_COMPLETION_ALL_ACCESS
, NULL
, 0 )))
1622 RtlLeaveCriticalSection( &ioqueue
.cs
);
1626 if (!ioqueue
.thread_running
)
1630 if (!(status
= RtlCreateUserThread( GetCurrentProcess(), NULL
, FALSE
,
1631 0, 0, 0, ioqueue_thread_proc
, NULL
, &thread
, NULL
)))
1633 ioqueue
.thread_running
= TRUE
;
1638 if (status
== STATUS_SUCCESS
)
1640 FILE_COMPLETION_INFORMATION info
;
1641 IO_STATUS_BLOCK iosb
;
1643 info
.CompletionPort
= ioqueue
.port
;
1644 info
.CompletionKey
= (ULONG_PTR
)io
;
1646 status
= NtSetInformationFile( file
, &iosb
, &info
, sizeof(info
), FileCompletionInformation
);
1649 if (status
== STATUS_SUCCESS
)
1651 if (!ioqueue
.objcount
++)
1652 RtlWakeConditionVariable( &ioqueue
.update_event
);
1655 RtlLeaveCriticalSection( &ioqueue
.cs
);
1659 /***********************************************************************
1660 * tp_threadpool_alloc (internal)
1662 * Allocates a new threadpool object.
1664 static NTSTATUS
tp_threadpool_alloc( struct threadpool
**out
)
1666 IMAGE_NT_HEADERS
*nt
= RtlImageNtHeader( NtCurrentTeb()->Peb
->ImageBaseAddress
);
1667 struct threadpool
*pool
;
1670 pool
= RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*pool
) );
1672 return STATUS_NO_MEMORY
;
1676 pool
->shutdown
= FALSE
;
1678 RtlInitializeCriticalSection( &pool
->cs
);
1679 pool
->cs
.DebugInfo
->Spare
[0] = (DWORD_PTR
)(__FILE__
": threadpool.cs");
1681 for (i
= 0; i
< ARRAY_SIZE(pool
->pools
); ++i
)
1682 list_init( &pool
->pools
[i
] );
1683 RtlInitializeConditionVariable( &pool
->update_event
);
1685 pool
->max_workers
= 500;
1686 pool
->min_workers
= 0;
1687 pool
->num_workers
= 0;
1688 pool
->num_busy_workers
= 0;
1689 pool
->stack_info
.StackReserve
= nt
->OptionalHeader
.SizeOfStackReserve
;
1690 pool
->stack_info
.StackCommit
= nt
->OptionalHeader
.SizeOfStackCommit
;
1692 TRACE( "allocated threadpool %p\n", pool
);
1695 return STATUS_SUCCESS
;
1698 /***********************************************************************
1699 * tp_threadpool_shutdown (internal)
1701 * Prepares the shutdown of a threadpool object and notifies all worker
1702 * threads to terminate (after all remaining work items have been
1705 static void tp_threadpool_shutdown( struct threadpool
*pool
)
1707 assert( pool
!= default_threadpool
);
1709 pool
->shutdown
= TRUE
;
1710 RtlWakeAllConditionVariable( &pool
->update_event
);
1713 /***********************************************************************
1714 * tp_threadpool_release (internal)
1716 * Releases a reference to a threadpool object.
1718 static BOOL
tp_threadpool_release( struct threadpool
*pool
)
1722 if (InterlockedDecrement( &pool
->refcount
))
1725 TRACE( "destroying threadpool %p\n", pool
);
1727 assert( pool
->shutdown
);
1728 assert( !pool
->objcount
);
1729 for (i
= 0; i
< ARRAY_SIZE(pool
->pools
); ++i
)
1730 assert( list_empty( &pool
->pools
[i
] ) );
1732 pool
->cs
.DebugInfo
->Spare
[0] = 0;
1733 RtlDeleteCriticalSection( &pool
->cs
);
1735 RtlFreeHeap( GetProcessHeap(), 0, pool
);
1739 /***********************************************************************
1740 * tp_threadpool_lock (internal)
1742 * Acquires a lock on a threadpool, specified with an TP_CALLBACK_ENVIRON
1743 * block. When the lock is acquired successfully, it is guaranteed that
1744 * there is at least one worker thread to process tasks.
1746 static NTSTATUS
tp_threadpool_lock( struct threadpool
**out
, TP_CALLBACK_ENVIRON
*environment
)
1748 struct threadpool
*pool
= NULL
;
1749 NTSTATUS status
= STATUS_SUCCESS
;
1753 /* Validate environment parameters. */
1754 if (environment
->Version
== 3)
1756 TP_CALLBACK_ENVIRON_V3
*environment3
= (TP_CALLBACK_ENVIRON_V3
*)environment
;
1758 switch (environment3
->CallbackPriority
)
1760 case TP_CALLBACK_PRIORITY_HIGH
:
1761 case TP_CALLBACK_PRIORITY_NORMAL
:
1762 case TP_CALLBACK_PRIORITY_LOW
:
1765 return STATUS_INVALID_PARAMETER
;
1769 pool
= (struct threadpool
*)environment
->Pool
;
1774 if (!default_threadpool
)
1776 status
= tp_threadpool_alloc( &pool
);
1777 if (status
!= STATUS_SUCCESS
)
1780 if (InterlockedCompareExchangePointer( (void *)&default_threadpool
, pool
, NULL
) != NULL
)
1782 tp_threadpool_shutdown( pool
);
1783 tp_threadpool_release( pool
);
1787 pool
= default_threadpool
;
1790 RtlEnterCriticalSection( &pool
->cs
);
1792 /* Make sure that the threadpool has at least one thread. */
1793 if (!pool
->num_workers
)
1794 status
= tp_new_worker_thread( pool
);
1796 /* Keep a reference, and increment objcount to ensure that the
1797 * last thread doesn't terminate. */
1798 if (status
== STATUS_SUCCESS
)
1800 InterlockedIncrement( &pool
->refcount
);
1804 RtlLeaveCriticalSection( &pool
->cs
);
1806 if (status
!= STATUS_SUCCESS
)
1810 return STATUS_SUCCESS
;
1813 /***********************************************************************
1814 * tp_threadpool_unlock (internal)
1816 * Releases a lock on a threadpool.
1818 static void tp_threadpool_unlock( struct threadpool
*pool
)
1820 RtlEnterCriticalSection( &pool
->cs
);
1822 RtlLeaveCriticalSection( &pool
->cs
);
1823 tp_threadpool_release( pool
);
1826 /***********************************************************************
1827 * tp_group_alloc (internal)
1829 * Allocates a new threadpool group object.
1831 static NTSTATUS
tp_group_alloc( struct threadpool_group
**out
)
1833 struct threadpool_group
*group
;
1835 group
= RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*group
) );
1837 return STATUS_NO_MEMORY
;
1839 group
->refcount
= 1;
1840 group
->shutdown
= FALSE
;
1842 RtlInitializeCriticalSection( &group
->cs
);
1843 group
->cs
.DebugInfo
->Spare
[0] = (DWORD_PTR
)(__FILE__
": threadpool_group.cs");
1845 list_init( &group
->members
);
1847 TRACE( "allocated group %p\n", group
);
1850 return STATUS_SUCCESS
;
1853 /***********************************************************************
1854 * tp_group_shutdown (internal)
1856 * Marks the group object for shutdown.
1858 static void tp_group_shutdown( struct threadpool_group
*group
)
1860 group
->shutdown
= TRUE
;
1863 /***********************************************************************
1864 * tp_group_release (internal)
1866 * Releases a reference to a group object.
1868 static BOOL
tp_group_release( struct threadpool_group
*group
)
1870 if (InterlockedDecrement( &group
->refcount
))
1873 TRACE( "destroying group %p\n", group
);
1875 assert( group
->shutdown
);
1876 assert( list_empty( &group
->members
) );
1878 group
->cs
.DebugInfo
->Spare
[0] = 0;
1879 RtlDeleteCriticalSection( &group
->cs
);
1881 RtlFreeHeap( GetProcessHeap(), 0, group
);
1885 /***********************************************************************
1886 * tp_object_initialize (internal)
1888 * Initializes members of a threadpool object.
1890 static void tp_object_initialize( struct threadpool_object
*object
, struct threadpool
*pool
,
1891 PVOID userdata
, TP_CALLBACK_ENVIRON
*environment
)
1893 BOOL is_simple_callback
= (object
->type
== TP_OBJECT_TYPE_SIMPLE
);
1895 object
->refcount
= 1;
1896 object
->shutdown
= FALSE
;
1898 object
->pool
= pool
;
1899 object
->group
= NULL
;
1900 object
->userdata
= userdata
;
1901 object
->group_cancel_callback
= NULL
;
1902 object
->finalization_callback
= NULL
;
1903 object
->may_run_long
= 0;
1904 object
->race_dll
= NULL
;
1905 object
->priority
= TP_CALLBACK_PRIORITY_NORMAL
;
1907 memset( &object
->group_entry
, 0, sizeof(object
->group_entry
) );
1908 object
->is_group_member
= FALSE
;
1910 memset( &object
->pool_entry
, 0, sizeof(object
->pool_entry
) );
1911 RtlInitializeConditionVariable( &object
->finished_event
);
1912 RtlInitializeConditionVariable( &object
->group_finished_event
);
1913 object
->completed_event
= NULL
;
1914 object
->num_pending_callbacks
= 0;
1915 object
->num_running_callbacks
= 0;
1916 object
->num_associated_callbacks
= 0;
1920 if (environment
->Version
!= 1 && environment
->Version
!= 3)
1921 FIXME( "unsupported environment version %u\n", environment
->Version
);
1923 object
->group
= impl_from_TP_CLEANUP_GROUP( environment
->CleanupGroup
);
1924 object
->group_cancel_callback
= environment
->CleanupGroupCancelCallback
;
1925 object
->finalization_callback
= environment
->FinalizationCallback
;
1926 object
->may_run_long
= environment
->u
.s
.LongFunction
!= 0;
1927 object
->race_dll
= environment
->RaceDll
;
1928 if (environment
->Version
== 3)
1930 TP_CALLBACK_ENVIRON_V3
*environment_v3
= (TP_CALLBACK_ENVIRON_V3
*)environment
;
1932 object
->priority
= environment_v3
->CallbackPriority
;
1933 assert( object
->priority
< ARRAY_SIZE(pool
->pools
) );
1936 if (environment
->ActivationContext
)
1937 FIXME( "activation context not supported yet\n" );
1939 if (environment
->u
.s
.Persistent
)
1940 FIXME( "persistent threads not supported yet\n" );
1943 if (object
->race_dll
)
1944 LdrAddRefDll( 0, object
->race_dll
);
1946 TRACE( "allocated object %p of type %u\n", object
, object
->type
);
1948 /* For simple callbacks we have to run tp_object_submit before adding this object
1949 * to the cleanup group. As soon as the cleanup group members are released ->shutdown
1950 * will be set, and tp_object_submit would fail with an assertion. */
1952 if (is_simple_callback
)
1953 tp_object_submit( object
, FALSE
);
1957 struct threadpool_group
*group
= object
->group
;
1958 InterlockedIncrement( &group
->refcount
);
1960 RtlEnterCriticalSection( &group
->cs
);
1961 list_add_tail( &group
->members
, &object
->group_entry
);
1962 object
->is_group_member
= TRUE
;
1963 RtlLeaveCriticalSection( &group
->cs
);
1966 if (is_simple_callback
)
1967 tp_object_release( object
);
1970 static void tp_object_prio_queue( struct threadpool_object
*object
)
1972 ++object
->pool
->num_busy_workers
;
1973 list_add_tail( &object
->pool
->pools
[object
->priority
], &object
->pool_entry
);
1976 /***********************************************************************
1977 * tp_object_submit (internal)
1979 * Submits a threadpool object to the associated threadpool. This
1980 * function has to be VOID because TpPostWork can never fail on Windows.
1982 static void tp_object_submit( struct threadpool_object
*object
, BOOL signaled
)
1984 struct threadpool
*pool
= object
->pool
;
1985 NTSTATUS status
= STATUS_UNSUCCESSFUL
;
1987 assert( !object
->shutdown
);
1988 assert( !pool
->shutdown
);
1990 RtlEnterCriticalSection( &pool
->cs
);
1992 /* Start new worker threads if required. */
1993 if (pool
->num_busy_workers
>= pool
->num_workers
&&
1994 pool
->num_workers
< pool
->max_workers
)
1995 status
= tp_new_worker_thread( pool
);
1997 /* Queue work item and increment refcount. */
1998 InterlockedIncrement( &object
->refcount
);
1999 if (!object
->num_pending_callbacks
++)
2000 tp_object_prio_queue( object
);
2002 /* Count how often the object was signaled. */
2003 if (object
->type
== TP_OBJECT_TYPE_WAIT
&& signaled
)
2004 object
->u
.wait
.signaled
++;
2006 /* No new thread started - wake up one existing thread. */
2007 if (status
!= STATUS_SUCCESS
)
2009 assert( pool
->num_workers
> 0 );
2010 RtlWakeConditionVariable( &pool
->update_event
);
2013 RtlLeaveCriticalSection( &pool
->cs
);
2016 /***********************************************************************
2017 * tp_object_cancel (internal)
2019 * Cancels all currently pending callbacks for a specific object.
2021 static void tp_object_cancel( struct threadpool_object
*object
)
2023 struct threadpool
*pool
= object
->pool
;
2024 LONG pending_callbacks
= 0;
2026 RtlEnterCriticalSection( &pool
->cs
);
2027 if (object
->num_pending_callbacks
)
2029 pending_callbacks
= object
->num_pending_callbacks
;
2030 object
->num_pending_callbacks
= 0;
2031 list_remove( &object
->pool_entry
);
2033 if (object
->type
== TP_OBJECT_TYPE_WAIT
)
2034 object
->u
.wait
.signaled
= 0;
2036 if (object
->type
== TP_OBJECT_TYPE_IO
)
2038 object
->u
.io
.skipped_count
+= object
->u
.io
.pending_count
;
2039 object
->u
.io
.pending_count
= 0;
2041 RtlLeaveCriticalSection( &pool
->cs
);
2043 while (pending_callbacks
--)
2044 tp_object_release( object
);
2047 static BOOL
object_is_finished( struct threadpool_object
*object
, BOOL group
)
2049 if (object
->num_pending_callbacks
)
2051 if (object
->type
== TP_OBJECT_TYPE_IO
&& object
->u
.io
.pending_count
)
2055 return !object
->num_running_callbacks
;
2057 return !object
->num_associated_callbacks
;
2060 /***********************************************************************
2061 * tp_object_wait (internal)
2063 * Waits until all pending and running callbacks of a specific object
2064 * have been processed.
2066 static void tp_object_wait( struct threadpool_object
*object
, BOOL group_wait
)
2068 struct threadpool
*pool
= object
->pool
;
2070 RtlEnterCriticalSection( &pool
->cs
);
2071 while (!object_is_finished( object
, group_wait
))
2074 RtlSleepConditionVariableCS( &object
->group_finished_event
, &pool
->cs
, NULL
);
2076 RtlSleepConditionVariableCS( &object
->finished_event
, &pool
->cs
, NULL
);
2078 RtlLeaveCriticalSection( &pool
->cs
);
2081 static void tp_ioqueue_unlock( struct threadpool_object
*io
)
2083 assert( io
->type
== TP_OBJECT_TYPE_IO
);
2085 RtlEnterCriticalSection( &ioqueue
.cs
);
2087 assert(ioqueue
.objcount
);
2089 if (!io
->shutdown
&& !--ioqueue
.objcount
)
2090 NtSetIoCompletion( ioqueue
.port
, 0, 0, STATUS_SUCCESS
, 0 );
2092 RtlLeaveCriticalSection( &ioqueue
.cs
);
2095 /***********************************************************************
2096 * tp_object_prepare_shutdown (internal)
2098 * Prepares a threadpool object for shutdown.
2100 static void tp_object_prepare_shutdown( struct threadpool_object
*object
)
2102 if (object
->type
== TP_OBJECT_TYPE_TIMER
)
2103 tp_timerqueue_unlock( object
);
2104 else if (object
->type
== TP_OBJECT_TYPE_WAIT
)
2105 tp_waitqueue_unlock( object
);
2106 else if (object
->type
== TP_OBJECT_TYPE_IO
)
2107 tp_ioqueue_unlock( object
);
2110 /***********************************************************************
2111 * tp_object_release (internal)
2113 * Releases a reference to a threadpool object.
2115 static BOOL
tp_object_release( struct threadpool_object
*object
)
2117 if (InterlockedDecrement( &object
->refcount
))
2120 TRACE( "destroying object %p of type %u\n", object
, object
->type
);
2122 assert( object
->shutdown
);
2123 assert( !object
->num_pending_callbacks
);
2124 assert( !object
->num_running_callbacks
);
2125 assert( !object
->num_associated_callbacks
);
2127 /* release reference to the group */
2130 struct threadpool_group
*group
= object
->group
;
2132 RtlEnterCriticalSection( &group
->cs
);
2133 if (object
->is_group_member
)
2135 list_remove( &object
->group_entry
);
2136 object
->is_group_member
= FALSE
;
2138 RtlLeaveCriticalSection( &group
->cs
);
2140 tp_group_release( group
);
2143 tp_threadpool_unlock( object
->pool
);
2145 if (object
->race_dll
)
2146 LdrUnloadDll( object
->race_dll
);
2148 if (object
->completed_event
&& object
->completed_event
!= INVALID_HANDLE_VALUE
)
2149 NtSetEvent( object
->completed_event
, NULL
);
2151 RtlFreeHeap( GetProcessHeap(), 0, object
);
2155 static struct list
*threadpool_get_next_item( const struct threadpool
*pool
)
2160 for (i
= 0; i
< ARRAY_SIZE(pool
->pools
); ++i
)
2162 if ((ptr
= list_head( &pool
->pools
[i
] )))
2169 /***********************************************************************
2170 * tp_object_execute (internal)
2172 * Executes a threadpool object callback, object->pool->cs has to be
2175 static void tp_object_execute( struct threadpool_object
*object
, BOOL wait_thread
)
2177 TP_CALLBACK_INSTANCE
*callback_instance
;
2178 struct threadpool_instance instance
;
2179 struct io_completion completion
;
2180 struct threadpool
*pool
= object
->pool
;
2181 TP_WAIT_RESULT wait_result
= 0;
2184 object
->num_pending_callbacks
--;
2186 /* For wait objects check if they were signaled or have timed out. */
2187 if (object
->type
== TP_OBJECT_TYPE_WAIT
)
2189 wait_result
= object
->u
.wait
.signaled
? WAIT_OBJECT_0
: WAIT_TIMEOUT
;
2190 if (wait_result
== WAIT_OBJECT_0
) object
->u
.wait
.signaled
--;
2192 else if (object
->type
== TP_OBJECT_TYPE_IO
)
2194 assert( object
->u
.io
.completion_count
);
2195 completion
= object
->u
.io
.completions
[--object
->u
.io
.completion_count
];
2198 /* Leave critical section and do the actual callback. */
2199 object
->num_associated_callbacks
++;
2200 object
->num_running_callbacks
++;
2201 RtlLeaveCriticalSection( &pool
->cs
);
2202 if (wait_thread
) RtlLeaveCriticalSection( &waitqueue
.cs
);
2204 /* Initialize threadpool instance struct. */
2205 callback_instance
= (TP_CALLBACK_INSTANCE
*)&instance
;
2206 instance
.object
= object
;
2207 instance
.threadid
= GetCurrentThreadId();
2208 instance
.associated
= TRUE
;
2209 instance
.may_run_long
= object
->may_run_long
;
2210 instance
.cleanup
.critical_section
= NULL
;
2211 instance
.cleanup
.mutex
= NULL
;
2212 instance
.cleanup
.semaphore
= NULL
;
2213 instance
.cleanup
.semaphore_count
= 0;
2214 instance
.cleanup
.event
= NULL
;
2215 instance
.cleanup
.library
= NULL
;
2217 switch (object
->type
)
2219 case TP_OBJECT_TYPE_SIMPLE
:
2221 TRACE( "executing simple callback %p(%p, %p)\n",
2222 object
->u
.simple
.callback
, callback_instance
, object
->userdata
);
2223 object
->u
.simple
.callback( callback_instance
, object
->userdata
);
2224 TRACE( "callback %p returned\n", object
->u
.simple
.callback
);
2228 case TP_OBJECT_TYPE_WORK
:
2230 TRACE( "executing work callback %p(%p, %p, %p)\n",
2231 object
->u
.work
.callback
, callback_instance
, object
->userdata
, object
);
2232 object
->u
.work
.callback( callback_instance
, object
->userdata
, (TP_WORK
*)object
);
2233 TRACE( "callback %p returned\n", object
->u
.work
.callback
);
2237 case TP_OBJECT_TYPE_TIMER
:
2239 TRACE( "executing timer callback %p(%p, %p, %p)\n",
2240 object
->u
.timer
.callback
, callback_instance
, object
->userdata
, object
);
2241 object
->u
.timer
.callback( callback_instance
, object
->userdata
, (TP_TIMER
*)object
);
2242 TRACE( "callback %p returned\n", object
->u
.timer
.callback
);
2246 case TP_OBJECT_TYPE_WAIT
:
2248 TRACE( "executing wait callback %p(%p, %p, %p, %u)\n",
2249 object
->u
.wait
.callback
, callback_instance
, object
->userdata
, object
, wait_result
);
2250 object
->u
.wait
.callback( callback_instance
, object
->userdata
, (TP_WAIT
*)object
, wait_result
);
2251 TRACE( "callback %p returned\n", object
->u
.wait
.callback
);
2255 case TP_OBJECT_TYPE_IO
:
2257 TRACE( "executing I/O callback %p(%p, %p, %#lx, %p, %p)\n",
2258 object
->u
.io
.callback
, callback_instance
, object
->userdata
,
2259 completion
.cvalue
, &completion
.iosb
, (TP_IO
*)object
);
2260 object
->u
.io
.callback( callback_instance
, object
->userdata
,
2261 (void *)completion
.cvalue
, &completion
.iosb
, (TP_IO
*)object
);
2262 TRACE( "callback %p returned\n", object
->u
.io
.callback
);
2271 /* Execute finalization callback. */
2272 if (object
->finalization_callback
)
2274 TRACE( "executing finalization callback %p(%p, %p)\n",
2275 object
->finalization_callback
, callback_instance
, object
->userdata
);
2276 object
->finalization_callback( callback_instance
, object
->userdata
);
2277 TRACE( "callback %p returned\n", object
->finalization_callback
);
2280 /* Execute cleanup tasks. */
2281 if (instance
.cleanup
.critical_section
)
2283 RtlLeaveCriticalSection( instance
.cleanup
.critical_section
);
2285 if (instance
.cleanup
.mutex
)
2287 status
= NtReleaseMutant( instance
.cleanup
.mutex
, NULL
);
2288 if (status
!= STATUS_SUCCESS
) goto skip_cleanup
;
2290 if (instance
.cleanup
.semaphore
)
2292 status
= NtReleaseSemaphore( instance
.cleanup
.semaphore
, instance
.cleanup
.semaphore_count
, NULL
);
2293 if (status
!= STATUS_SUCCESS
) goto skip_cleanup
;
2295 if (instance
.cleanup
.event
)
2297 status
= NtSetEvent( instance
.cleanup
.event
, NULL
);
2298 if (status
!= STATUS_SUCCESS
) goto skip_cleanup
;
2300 if (instance
.cleanup
.library
)
2302 LdrUnloadDll( instance
.cleanup
.library
);
2306 if (wait_thread
) RtlEnterCriticalSection( &waitqueue
.cs
);
2307 RtlEnterCriticalSection( &pool
->cs
);
2309 /* Simple callbacks are automatically shutdown after execution. */
2310 if (object
->type
== TP_OBJECT_TYPE_SIMPLE
)
2312 tp_object_prepare_shutdown( object
);
2313 object
->shutdown
= TRUE
;
2316 object
->num_running_callbacks
--;
2317 if (object_is_finished( object
, TRUE
))
2318 RtlWakeAllConditionVariable( &object
->group_finished_event
);
2320 if (instance
.associated
)
2322 object
->num_associated_callbacks
--;
2323 if (object_is_finished( object
, FALSE
))
2324 RtlWakeAllConditionVariable( &object
->finished_event
);
2328 /***********************************************************************
2329 * threadpool_worker_proc (internal)
2331 static void CALLBACK
threadpool_worker_proc( void *param
)
2333 struct threadpool
*pool
= param
;
2334 LARGE_INTEGER timeout
;
2337 TRACE( "starting worker thread for pool %p\n", pool
);
2338 set_thread_name(L
"wine_threadpool_worker");
2340 RtlEnterCriticalSection( &pool
->cs
);
2343 while ((ptr
= threadpool_get_next_item( pool
)))
2345 struct threadpool_object
*object
= LIST_ENTRY( ptr
, struct threadpool_object
, pool_entry
);
2346 assert( object
->num_pending_callbacks
> 0 );
2348 /* If further pending callbacks are queued, move the work item to
2349 * the end of the pool list. Otherwise remove it from the pool. */
2350 list_remove( &object
->pool_entry
);
2351 if (object
->num_pending_callbacks
> 1)
2352 tp_object_prio_queue( object
);
2354 tp_object_execute( object
, FALSE
);
2356 assert(pool
->num_busy_workers
);
2357 pool
->num_busy_workers
--;
2359 tp_object_release( object
);
2362 /* Shutdown worker thread if requested. */
2366 /* Wait for new tasks or until the timeout expires. A thread only terminates
2367 * when no new tasks are available, and the number of threads can be
2368 * decreased without violating the min_workers limit. An exception is when
2369 * min_workers == 0, then objcount is used to detect if the last thread
2370 * can be terminated. */
2371 timeout
.QuadPart
= (ULONGLONG
)THREADPOOL_WORKER_TIMEOUT
* -10000;
2372 if (RtlSleepConditionVariableCS( &pool
->update_event
, &pool
->cs
, &timeout
) == STATUS_TIMEOUT
&&
2373 !threadpool_get_next_item( pool
) && (pool
->num_workers
> max( pool
->min_workers
, 1 ) ||
2374 (!pool
->min_workers
&& !pool
->objcount
)))
2379 pool
->num_workers
--;
2380 RtlLeaveCriticalSection( &pool
->cs
);
2382 TRACE( "terminating worker thread for pool %p\n", pool
);
2383 tp_threadpool_release( pool
);
2384 RtlExitUserThread( 0 );
2387 /***********************************************************************
2388 * TpAllocCleanupGroup (NTDLL.@)
2390 NTSTATUS WINAPI
TpAllocCleanupGroup( TP_CLEANUP_GROUP
**out
)
2392 TRACE( "%p\n", out
);
2394 return tp_group_alloc( (struct threadpool_group
**)out
);
2397 /***********************************************************************
2398 * TpAllocIoCompletion (NTDLL.@)
2400 NTSTATUS WINAPI
TpAllocIoCompletion( TP_IO
**out
, HANDLE file
, PTP_IO_CALLBACK callback
,
2401 void *userdata
, TP_CALLBACK_ENVIRON
*environment
)
2403 struct threadpool_object
*object
;
2404 struct threadpool
*pool
;
2407 TRACE( "%p %p %p %p %p\n", out
, file
, callback
, userdata
, environment
);
2409 if (!(object
= RtlAllocateHeap( GetProcessHeap(), HEAP_ZERO_MEMORY
, sizeof(*object
) )))
2410 return STATUS_NO_MEMORY
;
2412 if ((status
= tp_threadpool_lock( &pool
, environment
)))
2414 RtlFreeHeap( GetProcessHeap(), 0, object
);
2418 object
->type
= TP_OBJECT_TYPE_IO
;
2419 object
->u
.io
.callback
= callback
;
2420 if (!(object
->u
.io
.completions
= RtlAllocateHeap( GetProcessHeap(), 0, 8 * sizeof(*object
->u
.io
.completions
) )))
2422 tp_threadpool_unlock( pool
);
2423 RtlFreeHeap( GetProcessHeap(), 0, object
);
2427 if ((status
= tp_ioqueue_lock( object
, file
)))
2429 tp_threadpool_unlock( pool
);
2430 RtlFreeHeap( GetProcessHeap(), 0, object
->u
.io
.completions
);
2431 RtlFreeHeap( GetProcessHeap(), 0, object
);
2435 tp_object_initialize( object
, pool
, userdata
, environment
);
2437 *out
= (TP_IO
*)object
;
2438 return STATUS_SUCCESS
;
2441 /***********************************************************************
2442 * TpAllocPool (NTDLL.@)
2444 NTSTATUS WINAPI
TpAllocPool( TP_POOL
**out
, PVOID reserved
)
2446 TRACE( "%p %p\n", out
, reserved
);
2449 FIXME( "reserved argument is nonzero (%p)\n", reserved
);
2451 return tp_threadpool_alloc( (struct threadpool
**)out
);
2454 /***********************************************************************
2455 * TpAllocTimer (NTDLL.@)
2457 NTSTATUS WINAPI
TpAllocTimer( TP_TIMER
**out
, PTP_TIMER_CALLBACK callback
, PVOID userdata
,
2458 TP_CALLBACK_ENVIRON
*environment
)
2460 struct threadpool_object
*object
;
2461 struct threadpool
*pool
;
2464 TRACE( "%p %p %p %p\n", out
, callback
, userdata
, environment
);
2466 object
= RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*object
) );
2468 return STATUS_NO_MEMORY
;
2470 status
= tp_threadpool_lock( &pool
, environment
);
2473 RtlFreeHeap( GetProcessHeap(), 0, object
);
2477 object
->type
= TP_OBJECT_TYPE_TIMER
;
2478 object
->u
.timer
.callback
= callback
;
2480 status
= tp_timerqueue_lock( object
);
2483 tp_threadpool_unlock( pool
);
2484 RtlFreeHeap( GetProcessHeap(), 0, object
);
2488 tp_object_initialize( object
, pool
, userdata
, environment
);
2490 *out
= (TP_TIMER
*)object
;
2491 return STATUS_SUCCESS
;
2494 static NTSTATUS
tp_alloc_wait( TP_WAIT
**out
, PTP_WAIT_CALLBACK callback
, PVOID userdata
,
2495 TP_CALLBACK_ENVIRON
*environment
, DWORD flags
)
2497 struct threadpool_object
*object
;
2498 struct threadpool
*pool
;
2501 object
= RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*object
) );
2503 return STATUS_NO_MEMORY
;
2505 status
= tp_threadpool_lock( &pool
, environment
);
2508 RtlFreeHeap( GetProcessHeap(), 0, object
);
2512 object
->type
= TP_OBJECT_TYPE_WAIT
;
2513 object
->u
.wait
.callback
= callback
;
2514 object
->u
.wait
.flags
= flags
;
2516 status
= tp_waitqueue_lock( object
);
2519 tp_threadpool_unlock( pool
);
2520 RtlFreeHeap( GetProcessHeap(), 0, object
);
2524 tp_object_initialize( object
, pool
, userdata
, environment
);
2526 *out
= (TP_WAIT
*)object
;
2527 return STATUS_SUCCESS
;
2530 /***********************************************************************
2531 * TpAllocWait (NTDLL.@)
2533 NTSTATUS WINAPI
TpAllocWait( TP_WAIT
**out
, PTP_WAIT_CALLBACK callback
, PVOID userdata
,
2534 TP_CALLBACK_ENVIRON
*environment
)
2536 TRACE( "%p %p %p %p\n", out
, callback
, userdata
, environment
);
2537 return tp_alloc_wait( out
, callback
, userdata
, environment
, WT_EXECUTEONLYONCE
);
2540 /***********************************************************************
2541 * TpAllocWork (NTDLL.@)
2543 NTSTATUS WINAPI
TpAllocWork( TP_WORK
**out
, PTP_WORK_CALLBACK callback
, PVOID userdata
,
2544 TP_CALLBACK_ENVIRON
*environment
)
2546 struct threadpool_object
*object
;
2547 struct threadpool
*pool
;
2550 TRACE( "%p %p %p %p\n", out
, callback
, userdata
, environment
);
2552 object
= RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*object
) );
2554 return STATUS_NO_MEMORY
;
2556 status
= tp_threadpool_lock( &pool
, environment
);
2559 RtlFreeHeap( GetProcessHeap(), 0, object
);
2563 object
->type
= TP_OBJECT_TYPE_WORK
;
2564 object
->u
.work
.callback
= callback
;
2565 tp_object_initialize( object
, pool
, userdata
, environment
);
2567 *out
= (TP_WORK
*)object
;
2568 return STATUS_SUCCESS
;
2571 /***********************************************************************
2572 * TpCancelAsyncIoOperation (NTDLL.@)
2574 void WINAPI
TpCancelAsyncIoOperation( TP_IO
*io
)
2576 struct threadpool_object
*this = impl_from_TP_IO( io
);
2578 TRACE( "%p\n", io
);
2580 RtlEnterCriticalSection( &this->pool
->cs
);
2582 TRACE("pending_count %u.\n", this->u
.io
.pending_count
);
2584 this->u
.io
.pending_count
--;
2585 if (object_is_finished( this, TRUE
))
2586 RtlWakeAllConditionVariable( &this->group_finished_event
);
2587 if (object_is_finished( this, FALSE
))
2588 RtlWakeAllConditionVariable( &this->finished_event
);
2590 RtlLeaveCriticalSection( &this->pool
->cs
);
2593 /***********************************************************************
2594 * TpCallbackLeaveCriticalSectionOnCompletion (NTDLL.@)
2596 VOID WINAPI
TpCallbackLeaveCriticalSectionOnCompletion( TP_CALLBACK_INSTANCE
*instance
, CRITICAL_SECTION
*crit
)
2598 struct threadpool_instance
*this = impl_from_TP_CALLBACK_INSTANCE( instance
);
2600 TRACE( "%p %p\n", instance
, crit
);
2602 if (!this->cleanup
.critical_section
)
2603 this->cleanup
.critical_section
= crit
;
2606 /***********************************************************************
2607 * TpCallbackMayRunLong (NTDLL.@)
2609 NTSTATUS WINAPI
TpCallbackMayRunLong( TP_CALLBACK_INSTANCE
*instance
)
2611 struct threadpool_instance
*this = impl_from_TP_CALLBACK_INSTANCE( instance
);
2612 struct threadpool_object
*object
= this->object
;
2613 struct threadpool
*pool
;
2614 NTSTATUS status
= STATUS_SUCCESS
;
2616 TRACE( "%p\n", instance
);
2618 if (this->threadid
!= GetCurrentThreadId())
2620 ERR("called from wrong thread, ignoring\n");
2621 return STATUS_UNSUCCESSFUL
; /* FIXME */
2624 if (this->may_run_long
)
2625 return STATUS_SUCCESS
;
2627 pool
= object
->pool
;
2628 RtlEnterCriticalSection( &pool
->cs
);
2630 /* Start new worker threads if required. */
2631 if (pool
->num_busy_workers
>= pool
->num_workers
)
2633 if (pool
->num_workers
< pool
->max_workers
)
2635 status
= tp_new_worker_thread( pool
);
2639 status
= STATUS_TOO_MANY_THREADS
;
2643 RtlLeaveCriticalSection( &pool
->cs
);
2644 this->may_run_long
= TRUE
;
2648 /***********************************************************************
2649 * TpCallbackReleaseMutexOnCompletion (NTDLL.@)
2651 VOID WINAPI
TpCallbackReleaseMutexOnCompletion( TP_CALLBACK_INSTANCE
*instance
, HANDLE mutex
)
2653 struct threadpool_instance
*this = impl_from_TP_CALLBACK_INSTANCE( instance
);
2655 TRACE( "%p %p\n", instance
, mutex
);
2657 if (!this->cleanup
.mutex
)
2658 this->cleanup
.mutex
= mutex
;
2661 /***********************************************************************
2662 * TpCallbackReleaseSemaphoreOnCompletion (NTDLL.@)
2664 VOID WINAPI
TpCallbackReleaseSemaphoreOnCompletion( TP_CALLBACK_INSTANCE
*instance
, HANDLE semaphore
, DWORD count
)
2666 struct threadpool_instance
*this = impl_from_TP_CALLBACK_INSTANCE( instance
);
2668 TRACE( "%p %p %u\n", instance
, semaphore
, count
);
2670 if (!this->cleanup
.semaphore
)
2672 this->cleanup
.semaphore
= semaphore
;
2673 this->cleanup
.semaphore_count
= count
;
2677 /***********************************************************************
2678 * TpCallbackSetEventOnCompletion (NTDLL.@)
2680 VOID WINAPI
TpCallbackSetEventOnCompletion( TP_CALLBACK_INSTANCE
*instance
, HANDLE event
)
2682 struct threadpool_instance
*this = impl_from_TP_CALLBACK_INSTANCE( instance
);
2684 TRACE( "%p %p\n", instance
, event
);
2686 if (!this->cleanup
.event
)
2687 this->cleanup
.event
= event
;
2690 /***********************************************************************
2691 * TpCallbackUnloadDllOnCompletion (NTDLL.@)
2693 VOID WINAPI
TpCallbackUnloadDllOnCompletion( TP_CALLBACK_INSTANCE
*instance
, HMODULE module
)
2695 struct threadpool_instance
*this = impl_from_TP_CALLBACK_INSTANCE( instance
);
2697 TRACE( "%p %p\n", instance
, module
);
2699 if (!this->cleanup
.library
)
2700 this->cleanup
.library
= module
;
2703 /***********************************************************************
2704 * TpDisassociateCallback (NTDLL.@)
2706 VOID WINAPI
TpDisassociateCallback( TP_CALLBACK_INSTANCE
*instance
)
2708 struct threadpool_instance
*this = impl_from_TP_CALLBACK_INSTANCE( instance
);
2709 struct threadpool_object
*object
= this->object
;
2710 struct threadpool
*pool
;
2712 TRACE( "%p\n", instance
);
2714 if (this->threadid
!= GetCurrentThreadId())
2716 ERR("called from wrong thread, ignoring\n");
2720 if (!this->associated
)
2723 pool
= object
->pool
;
2724 RtlEnterCriticalSection( &pool
->cs
);
2726 object
->num_associated_callbacks
--;
2727 if (object_is_finished( object
, FALSE
))
2728 RtlWakeAllConditionVariable( &object
->finished_event
);
2730 RtlLeaveCriticalSection( &pool
->cs
);
2731 this->associated
= FALSE
;
2734 /***********************************************************************
2735 * TpIsTimerSet (NTDLL.@)
2737 BOOL WINAPI
TpIsTimerSet( TP_TIMER
*timer
)
2739 struct threadpool_object
*this = impl_from_TP_TIMER( timer
);
2741 TRACE( "%p\n", timer
);
2743 return this->u
.timer
.timer_set
;
2746 /***********************************************************************
2747 * TpPostWork (NTDLL.@)
2749 VOID WINAPI
TpPostWork( TP_WORK
*work
)
2751 struct threadpool_object
*this = impl_from_TP_WORK( work
);
2753 TRACE( "%p\n", work
);
2755 tp_object_submit( this, FALSE
);
2758 /***********************************************************************
2759 * TpReleaseCleanupGroup (NTDLL.@)
2761 VOID WINAPI
TpReleaseCleanupGroup( TP_CLEANUP_GROUP
*group
)
2763 struct threadpool_group
*this = impl_from_TP_CLEANUP_GROUP( group
);
2765 TRACE( "%p\n", group
);
2767 tp_group_shutdown( this );
2768 tp_group_release( this );
2771 /***********************************************************************
2772 * TpReleaseCleanupGroupMembers (NTDLL.@)
2774 VOID WINAPI
TpReleaseCleanupGroupMembers( TP_CLEANUP_GROUP
*group
, BOOL cancel_pending
, PVOID userdata
)
2776 struct threadpool_group
*this = impl_from_TP_CLEANUP_GROUP( group
);
2777 struct threadpool_object
*object
, *next
;
2778 struct list members
;
2780 TRACE( "%p %u %p\n", group
, cancel_pending
, userdata
);
2782 RtlEnterCriticalSection( &this->cs
);
2784 /* Unset group, increase references, and mark objects for shutdown */
2785 LIST_FOR_EACH_ENTRY_SAFE( object
, next
, &this->members
, struct threadpool_object
, group_entry
)
2787 assert( object
->group
== this );
2788 assert( object
->is_group_member
);
2790 if (InterlockedIncrement( &object
->refcount
) == 1)
2792 /* Object is basically already destroyed, but group reference
2793 * was not deleted yet. We can safely ignore this object. */
2794 InterlockedDecrement( &object
->refcount
);
2795 list_remove( &object
->group_entry
);
2796 object
->is_group_member
= FALSE
;
2800 object
->is_group_member
= FALSE
;
2801 tp_object_prepare_shutdown( object
);
2804 /* Move members to a new temporary list */
2805 list_init( &members
);
2806 list_move_tail( &members
, &this->members
);
2808 RtlLeaveCriticalSection( &this->cs
);
2810 /* Cancel pending callbacks if requested */
2813 LIST_FOR_EACH_ENTRY( object
, &members
, struct threadpool_object
, group_entry
)
2815 tp_object_cancel( object
);
2819 /* Wait for remaining callbacks to finish */
2820 LIST_FOR_EACH_ENTRY_SAFE( object
, next
, &members
, struct threadpool_object
, group_entry
)
2822 tp_object_wait( object
, TRUE
);
2824 if (!object
->shutdown
)
2826 /* Execute group cancellation callback if defined, and if this was actually a group cancel. */
2827 if (cancel_pending
&& object
->group_cancel_callback
)
2829 TRACE( "executing group cancel callback %p(%p, %p)\n",
2830 object
->group_cancel_callback
, object
->userdata
, userdata
);
2831 object
->group_cancel_callback( object
->userdata
, userdata
);
2832 TRACE( "callback %p returned\n", object
->group_cancel_callback
);
2835 if (object
->type
!= TP_OBJECT_TYPE_SIMPLE
)
2836 tp_object_release( object
);
2839 object
->shutdown
= TRUE
;
2840 tp_object_release( object
);
2844 /***********************************************************************
2845 * TpReleaseIoCompletion (NTDLL.@)
2847 void WINAPI
TpReleaseIoCompletion( TP_IO
*io
)
2849 struct threadpool_object
*this = impl_from_TP_IO( io
);
2852 TRACE( "%p\n", io
);
2854 RtlEnterCriticalSection( &this->pool
->cs
);
2855 this->u
.io
.shutting_down
= TRUE
;
2856 can_destroy
= !this->u
.io
.pending_count
&& !this->u
.io
.skipped_count
;
2857 RtlLeaveCriticalSection( &this->pool
->cs
);
2861 tp_object_prepare_shutdown( this );
2862 this->shutdown
= TRUE
;
2863 tp_object_release( this );
2867 /***********************************************************************
2868 * TpReleasePool (NTDLL.@)
2870 VOID WINAPI
TpReleasePool( TP_POOL
*pool
)
2872 struct threadpool
*this = impl_from_TP_POOL( pool
);
2874 TRACE( "%p\n", pool
);
2876 tp_threadpool_shutdown( this );
2877 tp_threadpool_release( this );
2880 /***********************************************************************
2881 * TpReleaseTimer (NTDLL.@)
2883 VOID WINAPI
TpReleaseTimer( TP_TIMER
*timer
)
2885 struct threadpool_object
*this = impl_from_TP_TIMER( timer
);
2887 TRACE( "%p\n", timer
);
2889 tp_object_prepare_shutdown( this );
2890 this->shutdown
= TRUE
;
2891 tp_object_release( this );
2894 /***********************************************************************
2895 * TpReleaseWait (NTDLL.@)
2897 VOID WINAPI
TpReleaseWait( TP_WAIT
*wait
)
2899 struct threadpool_object
*this = impl_from_TP_WAIT( wait
);
2901 TRACE( "%p\n", wait
);
2903 tp_object_prepare_shutdown( this );
2904 this->shutdown
= TRUE
;
2905 tp_object_release( this );
2908 /***********************************************************************
2909 * TpReleaseWork (NTDLL.@)
2911 VOID WINAPI
TpReleaseWork( TP_WORK
*work
)
2913 struct threadpool_object
*this = impl_from_TP_WORK( work
);
2915 TRACE( "%p\n", work
);
2917 tp_object_prepare_shutdown( this );
2918 this->shutdown
= TRUE
;
2919 tp_object_release( this );
2922 /***********************************************************************
2923 * TpSetPoolMaxThreads (NTDLL.@)
2925 VOID WINAPI
TpSetPoolMaxThreads( TP_POOL
*pool
, DWORD maximum
)
2927 struct threadpool
*this = impl_from_TP_POOL( pool
);
2929 TRACE( "%p %u\n", pool
, maximum
);
2931 RtlEnterCriticalSection( &this->cs
);
2932 this->max_workers
= max( maximum
, 1 );
2933 this->min_workers
= min( this->min_workers
, this->max_workers
);
2934 RtlLeaveCriticalSection( &this->cs
);
2937 /***********************************************************************
2938 * TpSetPoolMinThreads (NTDLL.@)
2940 BOOL WINAPI
TpSetPoolMinThreads( TP_POOL
*pool
, DWORD minimum
)
2942 struct threadpool
*this = impl_from_TP_POOL( pool
);
2943 NTSTATUS status
= STATUS_SUCCESS
;
2945 TRACE( "%p %u\n", pool
, minimum
);
2947 RtlEnterCriticalSection( &this->cs
);
2949 while (this->num_workers
< minimum
)
2951 status
= tp_new_worker_thread( this );
2952 if (status
!= STATUS_SUCCESS
)
2956 if (status
== STATUS_SUCCESS
)
2958 this->min_workers
= minimum
;
2959 this->max_workers
= max( this->min_workers
, this->max_workers
);
2962 RtlLeaveCriticalSection( &this->cs
);
2966 /***********************************************************************
2967 * TpSetTimer (NTDLL.@)
2969 VOID WINAPI
TpSetTimer( TP_TIMER
*timer
, LARGE_INTEGER
*timeout
, LONG period
, LONG window_length
)
2971 struct threadpool_object
*this = impl_from_TP_TIMER( timer
);
2972 struct threadpool_object
*other_timer
;
2973 BOOL submit_timer
= FALSE
;
2974 ULONGLONG timestamp
;
2976 TRACE( "%p %p %u %u\n", timer
, timeout
, period
, window_length
);
2978 RtlEnterCriticalSection( &timerqueue
.cs
);
2980 assert( this->u
.timer
.timer_initialized
);
2981 this->u
.timer
.timer_set
= timeout
!= NULL
;
2983 /* Convert relative timeout to absolute timestamp and handle a timeout
2984 * of zero, which means that the timer is submitted immediately. */
2987 timestamp
= timeout
->QuadPart
;
2988 if ((LONGLONG
)timestamp
< 0)
2991 NtQuerySystemTime( &now
);
2992 timestamp
= now
.QuadPart
- timestamp
;
2994 else if (!timestamp
)
3001 NtQuerySystemTime( &now
);
3002 timestamp
= now
.QuadPart
+ (ULONGLONG
)period
* 10000;
3004 submit_timer
= TRUE
;
3008 /* First remove existing timeout. */
3009 if (this->u
.timer
.timer_pending
)
3011 list_remove( &this->u
.timer
.timer_entry
);
3012 this->u
.timer
.timer_pending
= FALSE
;
3015 /* If the timer was enabled, then add it back to the queue. */
3018 this->u
.timer
.timeout
= timestamp
;
3019 this->u
.timer
.period
= period
;
3020 this->u
.timer
.window_length
= window_length
;
3022 LIST_FOR_EACH_ENTRY( other_timer
, &timerqueue
.pending_timers
,
3023 struct threadpool_object
, u
.timer
.timer_entry
)
3025 assert( other_timer
->type
== TP_OBJECT_TYPE_TIMER
);
3026 if (this->u
.timer
.timeout
< other_timer
->u
.timer
.timeout
)
3029 list_add_before( &other_timer
->u
.timer
.timer_entry
, &this->u
.timer
.timer_entry
);
3031 /* Wake up the timer thread when the timeout has to be updated. */
3032 if (list_head( &timerqueue
.pending_timers
) == &this->u
.timer
.timer_entry
)
3033 RtlWakeAllConditionVariable( &timerqueue
.update_event
);
3035 this->u
.timer
.timer_pending
= TRUE
;
3038 RtlLeaveCriticalSection( &timerqueue
.cs
);
3041 tp_object_submit( this, FALSE
);
3044 /***********************************************************************
3045 * TpSetWait (NTDLL.@)
3047 VOID WINAPI
TpSetWait( TP_WAIT
*wait
, HANDLE handle
, LARGE_INTEGER
*timeout
)
3049 struct threadpool_object
*this = impl_from_TP_WAIT( wait
);
3050 ULONGLONG timestamp
= MAXLONGLONG
;
3052 TRACE( "%p %p %p\n", wait
, handle
, timeout
);
3054 RtlEnterCriticalSection( &waitqueue
.cs
);
3056 assert( this->u
.wait
.bucket
);
3057 this->u
.wait
.handle
= handle
;
3059 if (handle
|| this->u
.wait
.wait_pending
)
3061 struct waitqueue_bucket
*bucket
= this->u
.wait
.bucket
;
3062 list_remove( &this->u
.wait
.wait_entry
);
3064 /* Convert relative timeout to absolute timestamp. */
3065 if (handle
&& timeout
)
3067 timestamp
= timeout
->QuadPart
;
3068 if ((LONGLONG
)timestamp
< 0)
3071 NtQuerySystemTime( &now
);
3072 timestamp
= now
.QuadPart
- timestamp
;
3076 /* Add wait object back into one of the queues. */
3079 list_add_tail( &bucket
->waiting
, &this->u
.wait
.wait_entry
);
3080 this->u
.wait
.wait_pending
= TRUE
;
3081 this->u
.wait
.timeout
= timestamp
;
3085 list_add_tail( &bucket
->reserved
, &this->u
.wait
.wait_entry
);
3086 this->u
.wait
.wait_pending
= FALSE
;
3089 /* Wake up the wait queue thread. */
3090 NtSetEvent( bucket
->update_event
, NULL
);
3093 RtlLeaveCriticalSection( &waitqueue
.cs
);
3096 /***********************************************************************
3097 * TpSimpleTryPost (NTDLL.@)
3099 NTSTATUS WINAPI
TpSimpleTryPost( PTP_SIMPLE_CALLBACK callback
, PVOID userdata
,
3100 TP_CALLBACK_ENVIRON
*environment
)
3102 struct threadpool_object
*object
;
3103 struct threadpool
*pool
;
3106 TRACE( "%p %p %p\n", callback
, userdata
, environment
);
3108 object
= RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*object
) );
3110 return STATUS_NO_MEMORY
;
3112 status
= tp_threadpool_lock( &pool
, environment
);
3115 RtlFreeHeap( GetProcessHeap(), 0, object
);
3119 object
->type
= TP_OBJECT_TYPE_SIMPLE
;
3120 object
->u
.simple
.callback
= callback
;
3121 tp_object_initialize( object
, pool
, userdata
, environment
);
3123 return STATUS_SUCCESS
;
3126 /***********************************************************************
3127 * TpStartAsyncIoOperation (NTDLL.@)
3129 void WINAPI
TpStartAsyncIoOperation( TP_IO
*io
)
3131 struct threadpool_object
*this = impl_from_TP_IO( io
);
3133 TRACE( "%p\n", io
);
3135 RtlEnterCriticalSection( &this->pool
->cs
);
3137 this->u
.io
.pending_count
++;
3139 RtlLeaveCriticalSection( &this->pool
->cs
);
3142 /***********************************************************************
3143 * TpWaitForIoCompletion (NTDLL.@)
3145 void WINAPI
TpWaitForIoCompletion( TP_IO
*io
, BOOL cancel_pending
)
3147 struct threadpool_object
*this = impl_from_TP_IO( io
);
3149 TRACE( "%p %d\n", io
, cancel_pending
);
3152 tp_object_cancel( this );
3153 tp_object_wait( this, FALSE
);
3156 /***********************************************************************
3157 * TpWaitForTimer (NTDLL.@)
3159 VOID WINAPI
TpWaitForTimer( TP_TIMER
*timer
, BOOL cancel_pending
)
3161 struct threadpool_object
*this = impl_from_TP_TIMER( timer
);
3163 TRACE( "%p %d\n", timer
, cancel_pending
);
3166 tp_object_cancel( this );
3167 tp_object_wait( this, FALSE
);
3170 /***********************************************************************
3171 * TpWaitForWait (NTDLL.@)
3173 VOID WINAPI
TpWaitForWait( TP_WAIT
*wait
, BOOL cancel_pending
)
3175 struct threadpool_object
*this = impl_from_TP_WAIT( wait
);
3177 TRACE( "%p %d\n", wait
, cancel_pending
);
3180 tp_object_cancel( this );
3181 tp_object_wait( this, FALSE
);
3184 /***********************************************************************
3185 * TpWaitForWork (NTDLL.@)
3187 VOID WINAPI
TpWaitForWork( TP_WORK
*work
, BOOL cancel_pending
)
3189 struct threadpool_object
*this = impl_from_TP_WORK( work
);
3191 TRACE( "%p %u\n", work
, cancel_pending
);
3194 tp_object_cancel( this );
3195 tp_object_wait( this, FALSE
);
3198 /***********************************************************************
3199 * TpSetPoolStackInformation (NTDLL.@)
3201 NTSTATUS WINAPI
TpSetPoolStackInformation( TP_POOL
*pool
, TP_POOL_STACK_INFORMATION
*stack_info
)
3203 struct threadpool
*this = impl_from_TP_POOL( pool
);
3205 TRACE( "%p %p\n", pool
, stack_info
);
3208 return STATUS_INVALID_PARAMETER
;
3210 RtlEnterCriticalSection( &this->cs
);
3211 this->stack_info
= *stack_info
;
3212 RtlLeaveCriticalSection( &this->cs
);
3214 return STATUS_SUCCESS
;
3217 /***********************************************************************
3218 * TpQueryPoolStackInformation (NTDLL.@)
3220 NTSTATUS WINAPI
TpQueryPoolStackInformation( TP_POOL
*pool
, TP_POOL_STACK_INFORMATION
*stack_info
)
3222 struct threadpool
*this = impl_from_TP_POOL( pool
);
3224 TRACE( "%p %p\n", pool
, stack_info
);
3227 return STATUS_INVALID_PARAMETER
;
3229 RtlEnterCriticalSection( &this->cs
);
3230 *stack_info
= this->stack_info
;
3231 RtlLeaveCriticalSection( &this->cs
);
3233 return STATUS_SUCCESS
;
3236 static void CALLBACK
rtl_wait_callback( TP_CALLBACK_INSTANCE
*instance
, void *userdata
, TP_WAIT
*wait
, TP_WAIT_RESULT result
)
3238 struct threadpool_object
*object
= impl_from_TP_WAIT(wait
);
3239 object
->u
.wait
.rtl_callback( userdata
, result
!= STATUS_WAIT_0
);
3242 /***********************************************************************
3243 * RtlRegisterWait (NTDLL.@)
3245 * Registers a wait for a handle to become signaled.
3248 * NewWaitObject [I] Handle to the new wait object. Use RtlDeregisterWait() to free it.
3249 * Object [I] Object to wait to become signaled.
3250 * Callback [I] Callback function to execute when the wait times out or the handle is signaled.
3251 * Context [I] Context to pass to the callback function when it is executed.
3252 * Milliseconds [I] Number of milliseconds to wait before timing out.
3253 * Flags [I] Flags. See notes.
3256 * Success: STATUS_SUCCESS.
3257 * Failure: Any NTSTATUS code.
3260 * Flags can be one or more of the following:
3261 *|WT_EXECUTEDEFAULT - Executes the work item in a non-I/O worker thread.
3262 *|WT_EXECUTEINIOTHREAD - Executes the work item in an I/O worker thread.
3263 *|WT_EXECUTEINPERSISTENTTHREAD - Executes the work item in a thread that is persistent.
3264 *|WT_EXECUTELONGFUNCTION - Hints that the execution can take a long time.
3265 *|WT_TRANSFER_IMPERSONATION - Executes the function with the current access token.
3267 NTSTATUS WINAPI
RtlRegisterWait( HANDLE
*out
, HANDLE handle
, RTL_WAITORTIMERCALLBACKFUNC callback
,
3268 void *context
, ULONG milliseconds
, ULONG flags
)
3270 struct threadpool_object
*object
;
3271 TP_CALLBACK_ENVIRON environment
;
3272 LARGE_INTEGER timeout
;
3276 TRACE( "out %p, handle %p, callback %p, context %p, milliseconds %u, flags %x\n",
3277 out
, handle
, callback
, context
, milliseconds
, flags
);
3279 memset( &environment
, 0, sizeof(environment
) );
3280 environment
.Version
= 1;
3281 environment
.u
.s
.LongFunction
= (flags
& WT_EXECUTELONGFUNCTION
) != 0;
3282 environment
.u
.s
.Persistent
= (flags
& WT_EXECUTEINPERSISTENTTHREAD
) != 0;
3284 flags
&= (WT_EXECUTEONLYONCE
| WT_EXECUTEINWAITTHREAD
| WT_EXECUTEINIOTHREAD
);
3285 if ((status
= tp_alloc_wait( &wait
, rtl_wait_callback
, context
, &environment
, flags
)))
3288 object
= impl_from_TP_WAIT(wait
);
3289 object
->u
.wait
.rtl_callback
= callback
;
3291 RtlEnterCriticalSection( &waitqueue
.cs
);
3292 TpSetWait( (TP_WAIT
*)object
, handle
, get_nt_timeout( &timeout
, milliseconds
) );
3295 RtlLeaveCriticalSection( &waitqueue
.cs
);
3297 return STATUS_SUCCESS
;
3300 /***********************************************************************
3301 * RtlDeregisterWaitEx (NTDLL.@)
3303 * Cancels a wait operation and frees the resources associated with calling
3304 * RtlRegisterWait().
3307 * WaitObject [I] Handle to the wait object to free.
3310 * Success: STATUS_SUCCESS.
3311 * Failure: Any NTSTATUS code.
3313 NTSTATUS WINAPI
RtlDeregisterWaitEx( HANDLE handle
, HANDLE event
)
3315 struct threadpool_object
*object
= handle
;
3318 TRACE( "handle %p, event %p\n", handle
, event
);
3320 if (!object
) return STATUS_INVALID_HANDLE
;
3322 TpSetWait( (TP_WAIT
*)object
, NULL
, NULL
);
3324 if (event
== INVALID_HANDLE_VALUE
) TpWaitForWait( (TP_WAIT
*)object
, TRUE
);
3327 assert( object
->completed_event
== NULL
);
3328 object
->completed_event
= event
;
3331 RtlEnterCriticalSection( &object
->pool
->cs
);
3332 if (object
->num_pending_callbacks
+ object
->num_running_callbacks
3333 + object
->num_associated_callbacks
) status
= STATUS_PENDING
;
3334 else status
= STATUS_SUCCESS
;
3335 RtlLeaveCriticalSection( &object
->pool
->cs
);
3337 TpReleaseWait( (TP_WAIT
*)object
);
3341 /***********************************************************************
3342 * RtlDeregisterWait (NTDLL.@)
3344 * Cancels a wait operation and frees the resources associated with calling
3345 * RtlRegisterWait().
3348 * WaitObject [I] Handle to the wait object to free.
3351 * Success: STATUS_SUCCESS.
3352 * Failure: Any NTSTATUS code.
3354 NTSTATUS WINAPI
RtlDeregisterWait(HANDLE WaitHandle
)
3356 return RtlDeregisterWaitEx(WaitHandle
, NULL
);