4 * Copyright (c) 2006 Robert Shearman
5 * Copyright (c) 2014-2016 Sebastian Lackner
7 * This library is free software; you can redistribute it and/or
8 * modify it under the terms of the GNU Lesser General Public
9 * License as published by the Free Software Foundation; either
10 * version 2.1 of the License, or (at your option) any later version.
12 * This library is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Lesser General Public License for more details.
17 * You should have received a copy of the GNU Lesser General Public
18 * License along with this library; if not, write to the Free Software
19 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
27 #define WIN32_NO_STATUS
30 #include "wine/debug.h"
31 #include "wine/list.h"
33 #include "ntdll_misc.h"
35 WINE_DEFAULT_DEBUG_CHANNEL(threadpool
);
38 * Old thread pooling API
43 PRTL_WORK_ITEM_ROUTINE function
;
47 #define EXPIRE_NEVER (~(ULONGLONG)0)
48 #define TIMER_QUEUE_MAGIC 0x516d6954 /* TimQ */
50 static RTL_CRITICAL_SECTION_DEBUG critsect_compl_debug
;
55 RTL_CRITICAL_SECTION threadpool_compl_cs
;
59 NULL
, /* compl_port */
60 { &critsect_compl_debug
, -1, 0, 0, 0, 0 }, /* threadpool_compl_cs */
63 static RTL_CRITICAL_SECTION_DEBUG critsect_compl_debug
=
65 0, 0, &old_threadpool
.threadpool_compl_cs
,
66 { &critsect_compl_debug
.ProcessLocksList
, &critsect_compl_debug
.ProcessLocksList
},
67 0, 0, { (DWORD_PTR
)(__FILE__
": threadpool_compl_cs") }
73 struct timer_queue
*q
;
75 ULONG runcount
; /* number of callbacks pending execution */
76 RTL_WAITORTIMERCALLBACKFUNC callback
;
81 BOOL destroy
; /* timer should be deleted; once set, never unset */
82 HANDLE event
; /* removal event */
88 RTL_CRITICAL_SECTION cs
;
89 struct list timers
; /* sorted by expiration time */
90 BOOL quit
; /* queue should be deleted; once set, never unset */
96 * Object-oriented thread pooling API
99 #define THREADPOOL_WORKER_TIMEOUT 5000
100 #define MAXIMUM_WAITQUEUE_OBJECTS (MAXIMUM_WAIT_OBJECTS - 1)
102 /* internal threadpool representation */
109 /* Pools of work items, locked via .cs, order matches TP_CALLBACK_PRIORITY - high, normal, low. */
110 struct list pools
[3];
111 RTL_CONDITION_VARIABLE update_event
;
112 /* information about worker threads, locked via .cs */
116 int num_busy_workers
;
118 TP_POOL_STACK_INFORMATION stack_info
;
121 enum threadpool_objtype
123 TP_OBJECT_TYPE_SIMPLE
,
125 TP_OBJECT_TYPE_TIMER
,
132 IO_STATUS_BLOCK iosb
;
136 /* internal threadpool object representation */
137 struct threadpool_object
139 void *win32_callback
; /* leave space for kernelbase to store win32 callback */
142 /* read-only information */
143 enum threadpool_objtype type
;
144 struct threadpool
*pool
;
145 struct threadpool_group
*group
;
147 PTP_CLEANUP_GROUP_CANCEL_CALLBACK group_cancel_callback
;
148 PTP_SIMPLE_CALLBACK finalization_callback
;
151 TP_CALLBACK_PRIORITY priority
;
152 /* information about the group, locked via .group->cs */
153 struct list group_entry
;
154 BOOL is_group_member
;
155 /* information about the pool, locked via .pool->cs */
156 struct list pool_entry
;
157 RTL_CONDITION_VARIABLE finished_event
;
158 RTL_CONDITION_VARIABLE group_finished_event
;
159 HANDLE completed_event
;
160 LONG num_pending_callbacks
;
161 LONG num_running_callbacks
;
162 LONG num_associated_callbacks
;
163 /* arguments for callback */
168 PTP_SIMPLE_CALLBACK callback
;
172 PTP_WORK_CALLBACK callback
;
176 PTP_TIMER_CALLBACK callback
;
177 /* information about the timer, locked via timerqueue.cs */
178 BOOL timer_initialized
;
180 struct list timer_entry
;
188 PTP_WAIT_CALLBACK callback
;
190 /* information about the wait object, locked via waitqueue.cs */
191 struct waitqueue_bucket
*bucket
;
193 struct list wait_entry
;
197 RTL_WAITORTIMERCALLBACKFUNC rtl_callback
;
201 PTP_IO_CALLBACK callback
;
202 /* locked via .pool->cs */
203 unsigned int pending_count
, skipped_count
, completion_count
, completion_max
;
205 struct io_completion
*completions
;
210 /* internal threadpool instance representation */
211 struct threadpool_instance
213 struct threadpool_object
*object
;
219 CRITICAL_SECTION
*critical_section
;
222 LONG semaphore_count
;
228 /* internal threadpool group representation */
229 struct threadpool_group
234 /* list of group members, locked via .cs */
238 /* global timerqueue object */
239 static RTL_CRITICAL_SECTION_DEBUG timerqueue_debug
;
246 struct list pending_timers
;
247 RTL_CONDITION_VARIABLE update_event
;
251 { &timerqueue_debug
, -1, 0, 0, 0, 0 }, /* cs */
253 FALSE
, /* thread_running */
254 LIST_INIT( timerqueue
.pending_timers
), /* pending_timers */
255 RTL_CONDITION_VARIABLE_INIT
/* update_event */
258 static RTL_CRITICAL_SECTION_DEBUG timerqueue_debug
=
260 0, 0, &timerqueue
.cs
,
261 { &timerqueue_debug
.ProcessLocksList
, &timerqueue_debug
.ProcessLocksList
},
262 0, 0, { (DWORD_PTR
)(__FILE__
": timerqueue.cs") }
265 /* global waitqueue object */
266 static RTL_CRITICAL_SECTION_DEBUG waitqueue_debug
;
276 { &waitqueue_debug
, -1, 0, 0, 0, 0 }, /* cs */
278 LIST_INIT( waitqueue
.buckets
) /* buckets */
281 static RTL_CRITICAL_SECTION_DEBUG waitqueue_debug
=
284 { &waitqueue_debug
.ProcessLocksList
, &waitqueue_debug
.ProcessLocksList
},
285 0, 0, { (DWORD_PTR
)(__FILE__
": waitqueue.cs") }
288 struct waitqueue_bucket
290 struct list bucket_entry
;
292 struct list reserved
;
298 /* global I/O completion queue object */
299 static RTL_CRITICAL_SECTION_DEBUG ioqueue_debug
;
307 RTL_CONDITION_VARIABLE update_event
;
311 .cs
= { &ioqueue_debug
, -1, 0, 0, 0, 0 },
314 static RTL_CRITICAL_SECTION_DEBUG ioqueue_debug
=
317 { &ioqueue_debug
.ProcessLocksList
, &ioqueue_debug
.ProcessLocksList
},
318 0, 0, { (DWORD_PTR
)(__FILE__
": ioqueue.cs") }
321 static inline struct threadpool
*impl_from_TP_POOL( TP_POOL
*pool
)
323 return (struct threadpool
*)pool
;
326 static inline struct threadpool_object
*impl_from_TP_WORK( TP_WORK
*work
)
328 struct threadpool_object
*object
= (struct threadpool_object
*)work
;
329 assert( object
->type
== TP_OBJECT_TYPE_WORK
);
333 static inline struct threadpool_object
*impl_from_TP_TIMER( TP_TIMER
*timer
)
335 struct threadpool_object
*object
= (struct threadpool_object
*)timer
;
336 assert( object
->type
== TP_OBJECT_TYPE_TIMER
);
340 static inline struct threadpool_object
*impl_from_TP_WAIT( TP_WAIT
*wait
)
342 struct threadpool_object
*object
= (struct threadpool_object
*)wait
;
343 assert( object
->type
== TP_OBJECT_TYPE_WAIT
);
347 static inline struct threadpool_object
*impl_from_TP_IO( TP_IO
*io
)
349 struct threadpool_object
*object
= (struct threadpool_object
*)io
;
350 assert( object
->type
== TP_OBJECT_TYPE_IO
);
354 static inline struct threadpool_group
*impl_from_TP_CLEANUP_GROUP( TP_CLEANUP_GROUP
*group
)
356 return (struct threadpool_group
*)group
;
359 static inline struct threadpool_instance
*impl_from_TP_CALLBACK_INSTANCE( TP_CALLBACK_INSTANCE
*instance
)
361 return (struct threadpool_instance
*)instance
;
364 static void CALLBACK
threadpool_worker_proc( void *param
);
365 static void tp_object_submit( struct threadpool_object
*object
, BOOL signaled
);
366 static void tp_object_execute( struct threadpool_object
*object
, BOOL wait_thread
);
367 static void tp_object_prepare_shutdown( struct threadpool_object
*object
);
368 static BOOL
tp_object_release( struct threadpool_object
*object
);
369 static struct threadpool
*default_threadpool
= NULL
;
371 static BOOL
array_reserve(void **elements
, unsigned int *capacity
, unsigned int count
, unsigned int size
)
373 unsigned int new_capacity
, max_capacity
;
376 if (count
<= *capacity
)
379 max_capacity
= ~(SIZE_T
)0 / size
;
380 if (count
> max_capacity
)
383 new_capacity
= max(4, *capacity
);
384 while (new_capacity
< count
&& new_capacity
<= max_capacity
/ 2)
386 if (new_capacity
< count
)
387 new_capacity
= max_capacity
;
389 if (!(new_elements
= RtlReAllocateHeap( GetProcessHeap(), 0, *elements
, new_capacity
* size
)))
392 *elements
= new_elements
;
393 *capacity
= new_capacity
;
398 static void set_thread_name(const WCHAR
*name
)
400 THREAD_NAME_INFORMATION info
;
402 RtlInitUnicodeString(&info
.ThreadName
, name
);
403 NtSetInformationThread(GetCurrentThread(), ThreadNameInformation
, &info
, sizeof(info
));
406 static void CALLBACK
process_rtl_work_item( TP_CALLBACK_INSTANCE
*instance
, void *userdata
)
408 struct rtl_work_item
*item
= userdata
;
410 TRACE("executing %p(%p)\n", item
->function
, item
->context
);
411 item
->function( item
->context
);
413 RtlFreeHeap( GetProcessHeap(), 0, item
);
416 /***********************************************************************
417 * RtlQueueWorkItem (NTDLL.@)
419 * Queues a work item into a thread in the thread pool.
422 * function [I] Work function to execute.
423 * context [I] Context to pass to the work function when it is executed.
424 * flags [I] Flags. See notes.
427 * Success: STATUS_SUCCESS.
428 * Failure: Any NTSTATUS code.
431 * Flags can be one or more of the following:
432 *|WT_EXECUTEDEFAULT - Executes the work item in a non-I/O worker thread.
433 *|WT_EXECUTEINIOTHREAD - Executes the work item in an I/O worker thread.
434 *|WT_EXECUTEINPERSISTENTTHREAD - Executes the work item in a thread that is persistent.
435 *|WT_EXECUTELONGFUNCTION - Hints that the execution can take a long time.
436 *|WT_TRANSFER_IMPERSONATION - Executes the function with the current access token.
438 NTSTATUS WINAPI
RtlQueueWorkItem( PRTL_WORK_ITEM_ROUTINE function
, PVOID context
, ULONG flags
)
440 TP_CALLBACK_ENVIRON environment
;
441 struct rtl_work_item
*item
;
444 TRACE( "%p %p %lu\n", function
, context
, flags
);
446 item
= RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*item
) );
448 return STATUS_NO_MEMORY
;
450 memset( &environment
, 0, sizeof(environment
) );
451 environment
.Version
= 1;
452 environment
.u
.s
.LongFunction
= (flags
& WT_EXECUTELONGFUNCTION
) != 0;
453 environment
.u
.s
.Persistent
= (flags
& WT_EXECUTEINPERSISTENTTHREAD
) != 0;
455 item
->function
= function
;
456 item
->context
= context
;
458 status
= TpSimpleTryPost( process_rtl_work_item
, item
, &environment
);
459 if (status
) RtlFreeHeap( GetProcessHeap(), 0, item
);
463 /***********************************************************************
464 * iocp_poller - get completion events and run callbacks
466 static DWORD CALLBACK
iocp_poller(LPVOID Arg
)
472 PRTL_OVERLAPPED_COMPLETION_ROUTINE callback
;
474 IO_STATUS_BLOCK iosb
;
475 NTSTATUS res
= NtRemoveIoCompletion( cport
, (PULONG_PTR
)&callback
, (PULONG_PTR
)&overlapped
, &iosb
, NULL
);
478 ERR("NtRemoveIoCompletion failed: 0x%lx\n", res
);
482 DWORD transferred
= 0;
485 if (iosb
.Status
== STATUS_SUCCESS
)
486 transferred
= iosb
.Information
;
488 err
= RtlNtStatusToDosError(iosb
.Status
);
490 callback( err
, transferred
, overlapped
);
496 /***********************************************************************
497 * RtlSetIoCompletionCallback (NTDLL.@)
499 * Binds a handle to a thread pool's completion port, and possibly
500 * starts a non-I/O thread to monitor this port and call functions back.
503 * FileHandle [I] Handle to bind to a completion port.
504 * Function [I] Callback function to call on I/O completions.
505 * Flags [I] Not used.
508 * Success: STATUS_SUCCESS.
509 * Failure: Any NTSTATUS code.
512 NTSTATUS WINAPI
RtlSetIoCompletionCallback(HANDLE FileHandle
, PRTL_OVERLAPPED_COMPLETION_ROUTINE Function
, ULONG Flags
)
514 IO_STATUS_BLOCK iosb
;
515 FILE_COMPLETION_INFORMATION info
;
517 if (Flags
) FIXME("Unknown value Flags=0x%lx\n", Flags
);
519 if (!old_threadpool
.compl_port
)
521 NTSTATUS res
= STATUS_SUCCESS
;
523 RtlEnterCriticalSection(&old_threadpool
.threadpool_compl_cs
);
524 if (!old_threadpool
.compl_port
)
528 res
= NtCreateIoCompletion( &cport
, IO_COMPLETION_ALL_ACCESS
, NULL
, 0 );
531 /* FIXME native can start additional threads in case of e.g. hung callback function. */
532 res
= RtlQueueWorkItem( iocp_poller
, cport
, WT_EXECUTEDEFAULT
);
534 old_threadpool
.compl_port
= cport
;
539 RtlLeaveCriticalSection(&old_threadpool
.threadpool_compl_cs
);
543 info
.CompletionPort
= old_threadpool
.compl_port
;
544 info
.CompletionKey
= (ULONG_PTR
)Function
;
546 return NtSetInformationFile( FileHandle
, &iosb
, &info
, sizeof(info
), FileCompletionInformation
);
549 static inline PLARGE_INTEGER
get_nt_timeout( PLARGE_INTEGER pTime
, ULONG timeout
)
551 if (timeout
== INFINITE
) return NULL
;
552 pTime
->QuadPart
= (ULONGLONG
)timeout
* -10000;
557 /************************** Timer Queue Impl **************************/
559 static void queue_remove_timer(struct queue_timer
*t
)
561 /* We MUST hold the queue cs while calling this function. This ensures
562 that we cannot queue another callback for this timer. The runcount
563 being zero makes sure we don't have any already queued. */
564 struct timer_queue
*q
= t
->q
;
566 assert(t
->runcount
== 0);
569 list_remove(&t
->entry
);
571 NtSetEvent(t
->event
, NULL
);
572 RtlFreeHeap(GetProcessHeap(), 0, t
);
574 if (q
->quit
&& list_empty(&q
->timers
))
575 NtSetEvent(q
->event
, NULL
);
578 static void timer_cleanup_callback(struct queue_timer
*t
)
580 struct timer_queue
*q
= t
->q
;
581 RtlEnterCriticalSection(&q
->cs
);
583 assert(0 < t
->runcount
);
586 if (t
->destroy
&& t
->runcount
== 0)
587 queue_remove_timer(t
);
589 RtlLeaveCriticalSection(&q
->cs
);
592 static DWORD WINAPI
timer_callback_wrapper(LPVOID p
)
594 struct queue_timer
*t
= p
;
595 t
->callback(t
->param
, TRUE
);
596 timer_cleanup_callback(t
);
600 static inline ULONGLONG
queue_current_time(void)
602 LARGE_INTEGER now
, freq
;
603 NtQueryPerformanceCounter(&now
, &freq
);
604 return now
.QuadPart
* 1000 / freq
.QuadPart
;
607 static void queue_add_timer(struct queue_timer
*t
, ULONGLONG time
,
610 /* We MUST hold the queue cs while calling this function. */
611 struct timer_queue
*q
= t
->q
;
612 struct list
*ptr
= &q
->timers
;
614 assert(!q
->quit
|| (t
->destroy
&& time
== EXPIRE_NEVER
));
616 if (time
!= EXPIRE_NEVER
)
617 LIST_FOR_EACH(ptr
, &q
->timers
)
619 struct queue_timer
*cur
= LIST_ENTRY(ptr
, struct queue_timer
, entry
);
620 if (time
< cur
->expire
)
623 list_add_before(ptr
, &t
->entry
);
627 /* If we insert at the head of the list, we need to expire sooner
629 if (set_event
&& &t
->entry
== list_head(&q
->timers
))
630 NtSetEvent(q
->event
, NULL
);
633 static inline void queue_move_timer(struct queue_timer
*t
, ULONGLONG time
,
636 /* We MUST hold the queue cs while calling this function. */
637 list_remove(&t
->entry
);
638 queue_add_timer(t
, time
, set_event
);
641 static void queue_timer_expire(struct timer_queue
*q
)
643 struct queue_timer
*t
= NULL
;
645 RtlEnterCriticalSection(&q
->cs
);
646 if (list_head(&q
->timers
))
649 t
= LIST_ENTRY(list_head(&q
->timers
), struct queue_timer
, entry
);
650 if (!t
->destroy
&& t
->expire
<= ((now
= queue_current_time())))
655 next
= t
->expire
+ t
->period
;
656 /* avoid trigger cascade if overloaded / hibernated */
658 next
= now
+ t
->period
;
662 queue_move_timer(t
, next
, FALSE
);
667 RtlLeaveCriticalSection(&q
->cs
);
671 if (t
->flags
& WT_EXECUTEINTIMERTHREAD
)
672 timer_callback_wrapper(t
);
677 & (WT_EXECUTEINIOTHREAD
| WT_EXECUTEINPERSISTENTTHREAD
678 | WT_EXECUTELONGFUNCTION
| WT_TRANSFER_IMPERSONATION
));
679 NTSTATUS status
= RtlQueueWorkItem(timer_callback_wrapper
, t
, flags
);
680 if (status
!= STATUS_SUCCESS
)
681 timer_cleanup_callback(t
);
686 static ULONG
queue_get_timeout(struct timer_queue
*q
)
688 struct queue_timer
*t
;
689 ULONG timeout
= INFINITE
;
691 RtlEnterCriticalSection(&q
->cs
);
692 if (list_head(&q
->timers
))
694 t
= LIST_ENTRY(list_head(&q
->timers
), struct queue_timer
, entry
);
695 assert(!t
->destroy
|| t
->expire
== EXPIRE_NEVER
);
697 if (t
->expire
!= EXPIRE_NEVER
)
699 ULONGLONG time
= queue_current_time();
700 timeout
= t
->expire
< time
? 0 : t
->expire
- time
;
703 RtlLeaveCriticalSection(&q
->cs
);
708 static void WINAPI
timer_queue_thread_proc(LPVOID p
)
710 struct timer_queue
*q
= p
;
713 set_thread_name(L
"wine_threadpool_timer_queue");
714 timeout_ms
= INFINITE
;
717 LARGE_INTEGER timeout
;
721 status
= NtWaitForSingleObject(
722 q
->event
, FALSE
, get_nt_timeout(&timeout
, timeout_ms
));
724 if (status
== STATUS_WAIT_0
)
726 /* There are two possible ways to trigger the event. Either
727 we are quitting and the last timer got removed, or a new
728 timer got put at the head of the list so we need to adjust
730 RtlEnterCriticalSection(&q
->cs
);
731 if (q
->quit
&& list_empty(&q
->timers
))
733 RtlLeaveCriticalSection(&q
->cs
);
735 else if (status
== STATUS_TIMEOUT
)
736 queue_timer_expire(q
);
741 timeout_ms
= queue_get_timeout(q
);
745 RtlDeleteCriticalSection(&q
->cs
);
747 RtlFreeHeap(GetProcessHeap(), 0, q
);
748 RtlExitUserThread( 0 );
751 static void queue_destroy_timer(struct queue_timer
*t
)
753 /* We MUST hold the queue cs while calling this function. */
755 if (t
->runcount
== 0)
756 /* Ensure a timer is promptly removed. If callbacks are pending,
757 it will be removed after the last one finishes by the callback
759 queue_remove_timer(t
);
761 /* Make sure no destroyed timer masks an active timer at the head
762 of the sorted list. */
763 queue_move_timer(t
, EXPIRE_NEVER
, FALSE
);
766 /***********************************************************************
767 * RtlCreateTimerQueue (NTDLL.@)
769 * Creates a timer queue object and returns a handle to it.
772 * NewTimerQueue [O] The newly created queue.
775 * Success: STATUS_SUCCESS.
776 * Failure: Any NTSTATUS code.
778 NTSTATUS WINAPI
RtlCreateTimerQueue(PHANDLE NewTimerQueue
)
781 struct timer_queue
*q
= RtlAllocateHeap(GetProcessHeap(), 0, sizeof *q
);
783 return STATUS_NO_MEMORY
;
785 RtlInitializeCriticalSection(&q
->cs
);
786 list_init(&q
->timers
);
788 q
->magic
= TIMER_QUEUE_MAGIC
;
789 status
= NtCreateEvent(&q
->event
, EVENT_ALL_ACCESS
, NULL
, SynchronizationEvent
, FALSE
);
790 if (status
!= STATUS_SUCCESS
)
792 RtlFreeHeap(GetProcessHeap(), 0, q
);
795 status
= RtlCreateUserThread(GetCurrentProcess(), NULL
, FALSE
, 0, 0, 0,
796 timer_queue_thread_proc
, q
, &q
->thread
, NULL
);
797 if (status
!= STATUS_SUCCESS
)
800 RtlFreeHeap(GetProcessHeap(), 0, q
);
805 return STATUS_SUCCESS
;
808 /***********************************************************************
809 * RtlDeleteTimerQueueEx (NTDLL.@)
811 * Deletes a timer queue object.
814 * TimerQueue [I] The timer queue to destroy.
815 * CompletionEvent [I] If NULL, return immediately. If INVALID_HANDLE_VALUE,
816 * wait until all timers are finished firing before
817 * returning. Otherwise, return immediately and set the
818 * event when all timers are done.
821 * Success: STATUS_SUCCESS if synchronous, STATUS_PENDING if not.
822 * Failure: Any NTSTATUS code.
824 NTSTATUS WINAPI
RtlDeleteTimerQueueEx(HANDLE TimerQueue
, HANDLE CompletionEvent
)
826 struct timer_queue
*q
= TimerQueue
;
827 struct queue_timer
*t
, *temp
;
831 if (!q
|| q
->magic
!= TIMER_QUEUE_MAGIC
)
832 return STATUS_INVALID_HANDLE
;
836 RtlEnterCriticalSection(&q
->cs
);
838 if (list_head(&q
->timers
))
839 /* When the last timer is removed, it will signal the timer thread to
841 LIST_FOR_EACH_ENTRY_SAFE(t
, temp
, &q
->timers
, struct queue_timer
, entry
)
842 queue_destroy_timer(t
);
844 /* However if we have none, we must do it ourselves. */
845 NtSetEvent(q
->event
, NULL
);
846 RtlLeaveCriticalSection(&q
->cs
);
848 if (CompletionEvent
== INVALID_HANDLE_VALUE
)
850 NtWaitForSingleObject(thread
, FALSE
, NULL
);
851 status
= STATUS_SUCCESS
;
857 FIXME("asynchronous return on completion event unimplemented\n");
858 NtWaitForSingleObject(thread
, FALSE
, NULL
);
859 NtSetEvent(CompletionEvent
, NULL
);
861 status
= STATUS_PENDING
;
868 static struct timer_queue
*get_timer_queue(HANDLE TimerQueue
)
870 static struct timer_queue
*default_timer_queue
;
876 if (!default_timer_queue
)
879 NTSTATUS status
= RtlCreateTimerQueue(&q
);
880 if (status
== STATUS_SUCCESS
)
882 PVOID p
= InterlockedCompareExchangePointer( (void **) &default_timer_queue
, q
, NULL
);
884 /* Got beat to the punch. */
885 RtlDeleteTimerQueueEx(q
, NULL
);
888 return default_timer_queue
;
892 /***********************************************************************
893 * RtlCreateTimer (NTDLL.@)
895 * Creates a new timer associated with the given queue.
898 * TimerQueue [I] The queue to hold the timer.
899 * NewTimer [O] The newly created timer.
900 * Callback [I] The callback to fire.
901 * Parameter [I] The argument for the callback.
902 * DueTime [I] The delay, in milliseconds, before first firing the
904 * Period [I] The period, in milliseconds, at which to fire the timer
905 * after the first callback. If zero, the timer will only
906 * fire once. It still needs to be deleted with
908 * Flags [I] Flags controlling the execution of the callback. In
909 * addition to the WT_* thread pool flags (see
910 * RtlQueueWorkItem), WT_EXECUTEINTIMERTHREAD and
911 * WT_EXECUTEONLYONCE are supported.
914 * Success: STATUS_SUCCESS.
915 * Failure: Any NTSTATUS code.
917 NTSTATUS WINAPI
RtlCreateTimer(HANDLE TimerQueue
, HANDLE
*NewTimer
,
918 RTL_WAITORTIMERCALLBACKFUNC Callback
,
919 PVOID Parameter
, DWORD DueTime
, DWORD Period
,
923 struct queue_timer
*t
;
924 struct timer_queue
*q
= get_timer_queue(TimerQueue
);
926 if (!q
) return STATUS_NO_MEMORY
;
927 if (q
->magic
!= TIMER_QUEUE_MAGIC
) return STATUS_INVALID_HANDLE
;
929 t
= RtlAllocateHeap(GetProcessHeap(), 0, sizeof *t
);
931 return STATUS_NO_MEMORY
;
935 t
->callback
= Callback
;
936 t
->param
= Parameter
;
942 status
= STATUS_SUCCESS
;
943 RtlEnterCriticalSection(&q
->cs
);
945 status
= STATUS_INVALID_HANDLE
;
947 queue_add_timer(t
, queue_current_time() + DueTime
, TRUE
);
948 RtlLeaveCriticalSection(&q
->cs
);
950 if (status
== STATUS_SUCCESS
)
953 RtlFreeHeap(GetProcessHeap(), 0, t
);
958 /***********************************************************************
959 * RtlUpdateTimer (NTDLL.@)
961 * Changes the time at which a timer expires.
964 * TimerQueue [I] The queue that holds the timer.
965 * Timer [I] The timer to update.
966 * DueTime [I] The delay, in milliseconds, before next firing the timer.
967 * Period [I] The period, in milliseconds, at which to fire the timer
968 * after the first callback. If zero, the timer will not
969 * refire once. It still needs to be deleted with
973 * Success: STATUS_SUCCESS.
974 * Failure: Any NTSTATUS code.
976 NTSTATUS WINAPI
RtlUpdateTimer(HANDLE TimerQueue
, HANDLE Timer
,
977 DWORD DueTime
, DWORD Period
)
979 struct queue_timer
*t
= Timer
;
980 struct timer_queue
*q
= t
->q
;
982 RtlEnterCriticalSection(&q
->cs
);
983 /* Can't change a timer if it was once-only or destroyed. */
984 if (t
->expire
!= EXPIRE_NEVER
)
987 queue_move_timer(t
, queue_current_time() + DueTime
, TRUE
);
989 RtlLeaveCriticalSection(&q
->cs
);
991 return STATUS_SUCCESS
;
994 /***********************************************************************
995 * RtlDeleteTimer (NTDLL.@)
997 * Cancels a timer-queue timer.
1000 * TimerQueue [I] The queue that holds the timer.
1001 * Timer [I] The timer to update.
1002 * CompletionEvent [I] If NULL, return immediately. If INVALID_HANDLE_VALUE,
1003 * wait until the timer is finished firing all pending
1004 * callbacks before returning. Otherwise, return
1005 * immediately and set the timer is done.
1008 * Success: STATUS_SUCCESS if the timer is done, STATUS_PENDING if not,
1009 or if the completion event is NULL.
1010 * Failure: Any NTSTATUS code.
1012 NTSTATUS WINAPI
RtlDeleteTimer(HANDLE TimerQueue
, HANDLE Timer
,
1013 HANDLE CompletionEvent
)
1015 struct queue_timer
*t
= Timer
;
1016 struct timer_queue
*q
;
1017 NTSTATUS status
= STATUS_PENDING
;
1018 HANDLE event
= NULL
;
1021 return STATUS_INVALID_PARAMETER_1
;
1023 if (CompletionEvent
== INVALID_HANDLE_VALUE
)
1025 status
= NtCreateEvent(&event
, EVENT_ALL_ACCESS
, NULL
, SynchronizationEvent
, FALSE
);
1026 if (status
== STATUS_SUCCESS
)
1027 status
= STATUS_PENDING
;
1029 else if (CompletionEvent
)
1030 event
= CompletionEvent
;
1032 RtlEnterCriticalSection(&q
->cs
);
1034 if (t
->runcount
== 0 && event
)
1035 status
= STATUS_SUCCESS
;
1036 queue_destroy_timer(t
);
1037 RtlLeaveCriticalSection(&q
->cs
);
1039 if (CompletionEvent
== INVALID_HANDLE_VALUE
&& event
)
1041 if (status
== STATUS_PENDING
)
1043 NtWaitForSingleObject(event
, FALSE
, NULL
);
1044 status
= STATUS_SUCCESS
;
1052 /***********************************************************************
1053 * timerqueue_thread_proc (internal)
1055 static void CALLBACK
timerqueue_thread_proc( void *param
)
1057 ULONGLONG timeout_lower
, timeout_upper
, new_timeout
;
1058 struct threadpool_object
*other_timer
;
1059 LARGE_INTEGER now
, timeout
;
1062 TRACE( "starting timer queue thread\n" );
1063 set_thread_name(L
"wine_threadpool_timerqueue");
1065 RtlEnterCriticalSection( &timerqueue
.cs
);
1068 NtQuerySystemTime( &now
);
1070 /* Check for expired timers. */
1071 while ((ptr
= list_head( &timerqueue
.pending_timers
)))
1073 struct threadpool_object
*timer
= LIST_ENTRY( ptr
, struct threadpool_object
, u
.timer
.timer_entry
);
1074 assert( timer
->type
== TP_OBJECT_TYPE_TIMER
);
1075 assert( timer
->u
.timer
.timer_pending
);
1076 if (timer
->u
.timer
.timeout
> now
.QuadPart
)
1079 /* Queue a new callback in one of the worker threads. */
1080 list_remove( &timer
->u
.timer
.timer_entry
);
1081 timer
->u
.timer
.timer_pending
= FALSE
;
1082 tp_object_submit( timer
, FALSE
);
1084 /* Insert the timer back into the queue, except it's marked for shutdown. */
1085 if (timer
->u
.timer
.period
&& !timer
->shutdown
)
1087 timer
->u
.timer
.timeout
+= (ULONGLONG
)timer
->u
.timer
.period
* 10000;
1088 if (timer
->u
.timer
.timeout
<= now
.QuadPart
)
1089 timer
->u
.timer
.timeout
= now
.QuadPart
+ 1;
1091 LIST_FOR_EACH_ENTRY( other_timer
, &timerqueue
.pending_timers
,
1092 struct threadpool_object
, u
.timer
.timer_entry
)
1094 assert( other_timer
->type
== TP_OBJECT_TYPE_TIMER
);
1095 if (timer
->u
.timer
.timeout
< other_timer
->u
.timer
.timeout
)
1098 list_add_before( &other_timer
->u
.timer
.timer_entry
, &timer
->u
.timer
.timer_entry
);
1099 timer
->u
.timer
.timer_pending
= TRUE
;
1103 timeout_lower
= timeout_upper
= MAXLONGLONG
;
1105 /* Determine next timeout and use the window length to optimize wakeup times. */
1106 LIST_FOR_EACH_ENTRY( other_timer
, &timerqueue
.pending_timers
,
1107 struct threadpool_object
, u
.timer
.timer_entry
)
1109 assert( other_timer
->type
== TP_OBJECT_TYPE_TIMER
);
1110 if (other_timer
->u
.timer
.timeout
>= timeout_upper
)
1113 timeout_lower
= other_timer
->u
.timer
.timeout
;
1114 new_timeout
= timeout_lower
+ (ULONGLONG
)other_timer
->u
.timer
.window_length
* 10000;
1115 if (new_timeout
< timeout_upper
)
1116 timeout_upper
= new_timeout
;
1119 /* Wait for timer update events or until the next timer expires. */
1120 if (timerqueue
.objcount
)
1122 timeout
.QuadPart
= timeout_lower
;
1123 RtlSleepConditionVariableCS( &timerqueue
.update_event
, &timerqueue
.cs
, &timeout
);
1127 /* All timers have been destroyed, if no new timers are created
1128 * within some amount of time, then we can shutdown this thread. */
1129 timeout
.QuadPart
= (ULONGLONG
)THREADPOOL_WORKER_TIMEOUT
* -10000;
1130 if (RtlSleepConditionVariableCS( &timerqueue
.update_event
, &timerqueue
.cs
,
1131 &timeout
) == STATUS_TIMEOUT
&& !timerqueue
.objcount
)
1137 timerqueue
.thread_running
= FALSE
;
1138 RtlLeaveCriticalSection( &timerqueue
.cs
);
1140 TRACE( "terminating timer queue thread\n" );
1141 RtlExitUserThread( 0 );
1144 /***********************************************************************
1145 * tp_new_worker_thread (internal)
1147 * Create and account a new worker thread for the desired pool.
1149 static NTSTATUS
tp_new_worker_thread( struct threadpool
*pool
)
1154 status
= RtlCreateUserThread( GetCurrentProcess(), NULL
, FALSE
, 0,
1155 pool
->stack_info
.StackReserve
, pool
->stack_info
.StackCommit
,
1156 threadpool_worker_proc
, pool
, &thread
, NULL
);
1157 if (status
== STATUS_SUCCESS
)
1159 InterlockedIncrement( &pool
->refcount
);
1160 pool
->num_workers
++;
1166 /***********************************************************************
1167 * tp_timerqueue_lock (internal)
1169 * Acquires a lock on the global timerqueue. When the lock is acquired
1170 * successfully, it is guaranteed that the timer thread is running.
1172 static NTSTATUS
tp_timerqueue_lock( struct threadpool_object
*timer
)
1174 NTSTATUS status
= STATUS_SUCCESS
;
1175 assert( timer
->type
== TP_OBJECT_TYPE_TIMER
);
1177 timer
->u
.timer
.timer_initialized
= FALSE
;
1178 timer
->u
.timer
.timer_pending
= FALSE
;
1179 timer
->u
.timer
.timer_set
= FALSE
;
1180 timer
->u
.timer
.timeout
= 0;
1181 timer
->u
.timer
.period
= 0;
1182 timer
->u
.timer
.window_length
= 0;
1184 RtlEnterCriticalSection( &timerqueue
.cs
);
1186 /* Make sure that the timerqueue thread is running. */
1187 if (!timerqueue
.thread_running
)
1190 status
= RtlCreateUserThread( GetCurrentProcess(), NULL
, FALSE
, 0, 0, 0,
1191 timerqueue_thread_proc
, NULL
, &thread
, NULL
);
1192 if (status
== STATUS_SUCCESS
)
1194 timerqueue
.thread_running
= TRUE
;
1199 if (status
== STATUS_SUCCESS
)
1201 timer
->u
.timer
.timer_initialized
= TRUE
;
1202 timerqueue
.objcount
++;
1205 RtlLeaveCriticalSection( &timerqueue
.cs
);
1209 /***********************************************************************
1210 * tp_timerqueue_unlock (internal)
1212 * Releases a lock on the global timerqueue.
1214 static void tp_timerqueue_unlock( struct threadpool_object
*timer
)
1216 assert( timer
->type
== TP_OBJECT_TYPE_TIMER
);
1218 RtlEnterCriticalSection( &timerqueue
.cs
);
1219 if (timer
->u
.timer
.timer_initialized
)
1221 /* If timer was pending, remove it. */
1222 if (timer
->u
.timer
.timer_pending
)
1224 list_remove( &timer
->u
.timer
.timer_entry
);
1225 timer
->u
.timer
.timer_pending
= FALSE
;
1228 /* If the last timer object was destroyed, then wake up the thread. */
1229 if (!--timerqueue
.objcount
)
1231 assert( list_empty( &timerqueue
.pending_timers
) );
1232 RtlWakeAllConditionVariable( &timerqueue
.update_event
);
1235 timer
->u
.timer
.timer_initialized
= FALSE
;
1237 RtlLeaveCriticalSection( &timerqueue
.cs
);
1240 /***********************************************************************
1241 * waitqueue_thread_proc (internal)
1243 static void CALLBACK
waitqueue_thread_proc( void *param
)
1245 struct threadpool_object
*objects
[MAXIMUM_WAITQUEUE_OBJECTS
];
1246 HANDLE handles
[MAXIMUM_WAITQUEUE_OBJECTS
+ 1];
1247 struct waitqueue_bucket
*bucket
= param
;
1248 struct threadpool_object
*wait
, *next
;
1249 LARGE_INTEGER now
, timeout
;
1253 TRACE( "starting wait queue thread\n" );
1254 set_thread_name(L
"wine_threadpool_waitqueue");
1256 RtlEnterCriticalSection( &waitqueue
.cs
);
1260 NtQuerySystemTime( &now
);
1261 timeout
.QuadPart
= MAXLONGLONG
;
1264 LIST_FOR_EACH_ENTRY_SAFE( wait
, next
, &bucket
->waiting
, struct threadpool_object
,
1267 assert( wait
->type
== TP_OBJECT_TYPE_WAIT
);
1268 if (wait
->u
.wait
.timeout
<= now
.QuadPart
)
1270 /* Wait object timed out. */
1271 if ((wait
->u
.wait
.flags
& WT_EXECUTEONLYONCE
))
1273 list_remove( &wait
->u
.wait
.wait_entry
);
1274 list_add_tail( &bucket
->reserved
, &wait
->u
.wait
.wait_entry
);
1276 if ((wait
->u
.wait
.flags
& (WT_EXECUTEINWAITTHREAD
| WT_EXECUTEINIOTHREAD
)))
1278 InterlockedIncrement( &wait
->refcount
);
1279 wait
->num_pending_callbacks
++;
1280 RtlEnterCriticalSection( &wait
->pool
->cs
);
1281 tp_object_execute( wait
, TRUE
);
1282 RtlLeaveCriticalSection( &wait
->pool
->cs
);
1283 tp_object_release( wait
);
1285 else tp_object_submit( wait
, FALSE
);
1289 if (wait
->u
.wait
.timeout
< timeout
.QuadPart
)
1290 timeout
.QuadPart
= wait
->u
.wait
.timeout
;
1292 assert( num_handles
< MAXIMUM_WAITQUEUE_OBJECTS
);
1293 InterlockedIncrement( &wait
->refcount
);
1294 objects
[num_handles
] = wait
;
1295 handles
[num_handles
] = wait
->u
.wait
.handle
;
1300 if (!bucket
->objcount
)
1302 /* All wait objects have been destroyed, if no new wait objects are created
1303 * within some amount of time, then we can shutdown this thread. */
1304 assert( num_handles
== 0 );
1305 RtlLeaveCriticalSection( &waitqueue
.cs
);
1306 timeout
.QuadPart
= (ULONGLONG
)THREADPOOL_WORKER_TIMEOUT
* -10000;
1307 status
= NtWaitForMultipleObjects( 1, &bucket
->update_event
, TRUE
, bucket
->alertable
, &timeout
);
1308 RtlEnterCriticalSection( &waitqueue
.cs
);
1310 if (status
== STATUS_TIMEOUT
&& !bucket
->objcount
)
1315 handles
[num_handles
] = bucket
->update_event
;
1316 RtlLeaveCriticalSection( &waitqueue
.cs
);
1317 status
= NtWaitForMultipleObjects( num_handles
+ 1, handles
, TRUE
, bucket
->alertable
, &timeout
);
1318 RtlEnterCriticalSection( &waitqueue
.cs
);
1320 if (status
>= STATUS_WAIT_0
&& status
< STATUS_WAIT_0
+ num_handles
)
1322 wait
= objects
[status
- STATUS_WAIT_0
];
1323 assert( wait
->type
== TP_OBJECT_TYPE_WAIT
);
1324 if (wait
->u
.wait
.bucket
)
1326 /* Wait object signaled. */
1327 assert( wait
->u
.wait
.bucket
== bucket
);
1328 if ((wait
->u
.wait
.flags
& WT_EXECUTEONLYONCE
))
1330 list_remove( &wait
->u
.wait
.wait_entry
);
1331 list_add_tail( &bucket
->reserved
, &wait
->u
.wait
.wait_entry
);
1333 if ((wait
->u
.wait
.flags
& (WT_EXECUTEINWAITTHREAD
| WT_EXECUTEINIOTHREAD
)))
1335 wait
->u
.wait
.signaled
++;
1336 wait
->num_pending_callbacks
++;
1337 RtlEnterCriticalSection( &wait
->pool
->cs
);
1338 tp_object_execute( wait
, TRUE
);
1339 RtlLeaveCriticalSection( &wait
->pool
->cs
);
1341 else tp_object_submit( wait
, TRUE
);
1344 WARN("wait object %p triggered while object was destroyed\n", wait
);
1347 /* Release temporary references to wait objects. */
1350 wait
= objects
[--num_handles
];
1351 assert( wait
->type
== TP_OBJECT_TYPE_WAIT
);
1352 tp_object_release( wait
);
1356 /* Try to merge bucket with other threads. */
1357 if (waitqueue
.num_buckets
> 1 && bucket
->objcount
&&
1358 bucket
->objcount
<= MAXIMUM_WAITQUEUE_OBJECTS
* 1 / 3)
1360 struct waitqueue_bucket
*other_bucket
;
1361 LIST_FOR_EACH_ENTRY( other_bucket
, &waitqueue
.buckets
, struct waitqueue_bucket
, bucket_entry
)
1363 if (other_bucket
!= bucket
&& other_bucket
->objcount
&& other_bucket
->alertable
== bucket
->alertable
&&
1364 other_bucket
->objcount
+ bucket
->objcount
<= MAXIMUM_WAITQUEUE_OBJECTS
* 2 / 3)
1366 other_bucket
->objcount
+= bucket
->objcount
;
1367 bucket
->objcount
= 0;
1369 /* Update reserved list. */
1370 LIST_FOR_EACH_ENTRY( wait
, &bucket
->reserved
, struct threadpool_object
, u
.wait
.wait_entry
)
1372 assert( wait
->type
== TP_OBJECT_TYPE_WAIT
);
1373 wait
->u
.wait
.bucket
= other_bucket
;
1375 list_move_tail( &other_bucket
->reserved
, &bucket
->reserved
);
1377 /* Update waiting list. */
1378 LIST_FOR_EACH_ENTRY( wait
, &bucket
->waiting
, struct threadpool_object
, u
.wait
.wait_entry
)
1380 assert( wait
->type
== TP_OBJECT_TYPE_WAIT
);
1381 wait
->u
.wait
.bucket
= other_bucket
;
1383 list_move_tail( &other_bucket
->waiting
, &bucket
->waiting
);
1385 /* Move bucket to the end, to keep the probability of
1386 * newly added wait objects as small as possible. */
1387 list_remove( &bucket
->bucket_entry
);
1388 list_add_tail( &waitqueue
.buckets
, &bucket
->bucket_entry
);
1390 NtSetEvent( other_bucket
->update_event
, NULL
);
1397 /* Remove this bucket from the list. */
1398 list_remove( &bucket
->bucket_entry
);
1399 if (!--waitqueue
.num_buckets
)
1400 assert( list_empty( &waitqueue
.buckets
) );
1402 RtlLeaveCriticalSection( &waitqueue
.cs
);
1404 TRACE( "terminating wait queue thread\n" );
1406 assert( bucket
->objcount
== 0 );
1407 assert( list_empty( &bucket
->reserved
) );
1408 assert( list_empty( &bucket
->waiting
) );
1409 NtClose( bucket
->update_event
);
1411 RtlFreeHeap( GetProcessHeap(), 0, bucket
);
1412 RtlExitUserThread( 0 );
1415 /***********************************************************************
1416 * tp_waitqueue_lock (internal)
1418 static NTSTATUS
tp_waitqueue_lock( struct threadpool_object
*wait
)
1420 struct waitqueue_bucket
*bucket
;
1423 BOOL alertable
= (wait
->u
.wait
.flags
& WT_EXECUTEINIOTHREAD
) != 0;
1424 assert( wait
->type
== TP_OBJECT_TYPE_WAIT
);
1426 wait
->u
.wait
.signaled
= 0;
1427 wait
->u
.wait
.bucket
= NULL
;
1428 wait
->u
.wait
.wait_pending
= FALSE
;
1429 wait
->u
.wait
.timeout
= 0;
1430 wait
->u
.wait
.handle
= INVALID_HANDLE_VALUE
;
1432 RtlEnterCriticalSection( &waitqueue
.cs
);
1434 /* Try to assign to existing bucket if possible. */
1435 LIST_FOR_EACH_ENTRY( bucket
, &waitqueue
.buckets
, struct waitqueue_bucket
, bucket_entry
)
1437 if (bucket
->objcount
< MAXIMUM_WAITQUEUE_OBJECTS
&& bucket
->alertable
== alertable
)
1439 list_add_tail( &bucket
->reserved
, &wait
->u
.wait
.wait_entry
);
1440 wait
->u
.wait
.bucket
= bucket
;
1443 status
= STATUS_SUCCESS
;
1448 /* Create a new bucket and corresponding worker thread. */
1449 bucket
= RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*bucket
) );
1452 status
= STATUS_NO_MEMORY
;
1456 bucket
->objcount
= 0;
1457 bucket
->alertable
= alertable
;
1458 list_init( &bucket
->reserved
);
1459 list_init( &bucket
->waiting
);
1461 status
= NtCreateEvent( &bucket
->update_event
, EVENT_ALL_ACCESS
,
1462 NULL
, SynchronizationEvent
, FALSE
);
1465 RtlFreeHeap( GetProcessHeap(), 0, bucket
);
1469 status
= RtlCreateUserThread( GetCurrentProcess(), NULL
, FALSE
, 0, 0, 0,
1470 waitqueue_thread_proc
, bucket
, &thread
, NULL
);
1471 if (status
== STATUS_SUCCESS
)
1473 list_add_tail( &waitqueue
.buckets
, &bucket
->bucket_entry
);
1474 waitqueue
.num_buckets
++;
1476 list_add_tail( &bucket
->reserved
, &wait
->u
.wait
.wait_entry
);
1477 wait
->u
.wait
.bucket
= bucket
;
1484 NtClose( bucket
->update_event
);
1485 RtlFreeHeap( GetProcessHeap(), 0, bucket
);
1489 RtlLeaveCriticalSection( &waitqueue
.cs
);
1493 /***********************************************************************
1494 * tp_waitqueue_unlock (internal)
1496 static void tp_waitqueue_unlock( struct threadpool_object
*wait
)
1498 assert( wait
->type
== TP_OBJECT_TYPE_WAIT
);
1500 RtlEnterCriticalSection( &waitqueue
.cs
);
1501 if (wait
->u
.wait
.bucket
)
1503 struct waitqueue_bucket
*bucket
= wait
->u
.wait
.bucket
;
1504 assert( bucket
->objcount
> 0 );
1506 list_remove( &wait
->u
.wait
.wait_entry
);
1507 wait
->u
.wait
.bucket
= NULL
;
1510 NtSetEvent( bucket
->update_event
, NULL
);
1512 RtlLeaveCriticalSection( &waitqueue
.cs
);
1515 static void CALLBACK
ioqueue_thread_proc( void *param
)
1517 struct io_completion
*completion
;
1518 struct threadpool_object
*io
;
1519 IO_STATUS_BLOCK iosb
;
1520 ULONG_PTR key
, value
;
1524 TRACE( "starting I/O completion thread\n" );
1525 set_thread_name(L
"wine_threadpool_ioqueue");
1527 RtlEnterCriticalSection( &ioqueue
.cs
);
1531 RtlLeaveCriticalSection( &ioqueue
.cs
);
1532 if ((status
= NtRemoveIoCompletion( ioqueue
.port
, &key
, &value
, &iosb
, NULL
)))
1533 ERR("NtRemoveIoCompletion failed, status %#lx.\n", status
);
1534 RtlEnterCriticalSection( &ioqueue
.cs
);
1536 destroy
= skip
= FALSE
;
1537 io
= (struct threadpool_object
*)key
;
1539 TRACE( "io %p, iosb.Status %#lx.\n", io
, iosb
.Status
);
1541 if (io
&& (io
->shutdown
|| io
->u
.io
.shutting_down
))
1543 RtlEnterCriticalSection( &io
->pool
->cs
);
1544 if (!io
->u
.io
.pending_count
)
1546 if (io
->u
.io
.skipped_count
)
1547 --io
->u
.io
.skipped_count
;
1549 if (io
->u
.io
.skipped_count
)
1554 RtlLeaveCriticalSection( &io
->pool
->cs
);
1561 TRACE( "Releasing io %p.\n", io
);
1562 io
->shutdown
= TRUE
;
1563 tp_object_release( io
);
1567 RtlEnterCriticalSection( &io
->pool
->cs
);
1569 TRACE( "pending_count %u.\n", io
->u
.io
.pending_count
);
1571 if (io
->u
.io
.pending_count
)
1573 --io
->u
.io
.pending_count
;
1574 if (!array_reserve((void **)&io
->u
.io
.completions
, &io
->u
.io
.completion_max
,
1575 io
->u
.io
.completion_count
+ 1, sizeof(*io
->u
.io
.completions
)))
1577 ERR( "Failed to allocate memory.\n" );
1578 RtlLeaveCriticalSection( &io
->pool
->cs
);
1582 completion
= &io
->u
.io
.completions
[io
->u
.io
.completion_count
++];
1583 completion
->iosb
= iosb
;
1584 completion
->cvalue
= value
;
1586 tp_object_submit( io
, FALSE
);
1588 RtlLeaveCriticalSection( &io
->pool
->cs
);
1591 if (!ioqueue
.objcount
)
1593 /* All I/O objects have been destroyed; if no new objects are
1594 * created within some amount of time, then we can shutdown this
1596 LARGE_INTEGER timeout
= {.QuadPart
= (ULONGLONG
)THREADPOOL_WORKER_TIMEOUT
* -10000};
1597 if (RtlSleepConditionVariableCS( &ioqueue
.update_event
, &ioqueue
.cs
,
1598 &timeout
) == STATUS_TIMEOUT
&& !ioqueue
.objcount
)
1603 ioqueue
.thread_running
= FALSE
;
1604 RtlLeaveCriticalSection( &ioqueue
.cs
);
1606 TRACE( "terminating I/O completion thread\n" );
1608 RtlExitUserThread( 0 );
1611 static NTSTATUS
tp_ioqueue_lock( struct threadpool_object
*io
, HANDLE file
)
1613 NTSTATUS status
= STATUS_SUCCESS
;
1615 assert( io
->type
== TP_OBJECT_TYPE_IO
);
1617 RtlEnterCriticalSection( &ioqueue
.cs
);
1619 if (!ioqueue
.port
&& (status
= NtCreateIoCompletion( &ioqueue
.port
,
1620 IO_COMPLETION_ALL_ACCESS
, NULL
, 0 )))
1622 RtlLeaveCriticalSection( &ioqueue
.cs
);
1626 if (!ioqueue
.thread_running
)
1630 if (!(status
= RtlCreateUserThread( GetCurrentProcess(), NULL
, FALSE
,
1631 0, 0, 0, ioqueue_thread_proc
, NULL
, &thread
, NULL
)))
1633 ioqueue
.thread_running
= TRUE
;
1638 if (status
== STATUS_SUCCESS
)
1640 FILE_COMPLETION_INFORMATION info
;
1641 IO_STATUS_BLOCK iosb
;
1643 info
.CompletionPort
= ioqueue
.port
;
1644 info
.CompletionKey
= (ULONG_PTR
)io
;
1646 status
= NtSetInformationFile( file
, &iosb
, &info
, sizeof(info
), FileCompletionInformation
);
1649 if (status
== STATUS_SUCCESS
)
1651 if (!ioqueue
.objcount
++)
1652 RtlWakeConditionVariable( &ioqueue
.update_event
);
1655 RtlLeaveCriticalSection( &ioqueue
.cs
);
1659 /***********************************************************************
1660 * tp_threadpool_alloc (internal)
1662 * Allocates a new threadpool object.
1664 static NTSTATUS
tp_threadpool_alloc( struct threadpool
**out
)
1666 IMAGE_NT_HEADERS
*nt
= RtlImageNtHeader( NtCurrentTeb()->Peb
->ImageBaseAddress
);
1667 struct threadpool
*pool
;
1670 pool
= RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*pool
) );
1672 return STATUS_NO_MEMORY
;
1676 pool
->shutdown
= FALSE
;
1678 RtlInitializeCriticalSection( &pool
->cs
);
1679 pool
->cs
.DebugInfo
->Spare
[0] = (DWORD_PTR
)(__FILE__
": threadpool.cs");
1681 for (i
= 0; i
< ARRAY_SIZE(pool
->pools
); ++i
)
1682 list_init( &pool
->pools
[i
] );
1683 RtlInitializeConditionVariable( &pool
->update_event
);
1685 pool
->max_workers
= 500;
1686 pool
->min_workers
= 0;
1687 pool
->num_workers
= 0;
1688 pool
->num_busy_workers
= 0;
1689 pool
->stack_info
.StackReserve
= nt
->OptionalHeader
.SizeOfStackReserve
;
1690 pool
->stack_info
.StackCommit
= nt
->OptionalHeader
.SizeOfStackCommit
;
1692 TRACE( "allocated threadpool %p\n", pool
);
1695 return STATUS_SUCCESS
;
1698 /***********************************************************************
1699 * tp_threadpool_shutdown (internal)
1701 * Prepares the shutdown of a threadpool object and notifies all worker
1702 * threads to terminate (after all remaining work items have been
1705 static void tp_threadpool_shutdown( struct threadpool
*pool
)
1707 assert( pool
!= default_threadpool
);
1709 pool
->shutdown
= TRUE
;
1710 RtlWakeAllConditionVariable( &pool
->update_event
);
1713 /***********************************************************************
1714 * tp_threadpool_release (internal)
1716 * Releases a reference to a threadpool object.
1718 static BOOL
tp_threadpool_release( struct threadpool
*pool
)
1722 if (InterlockedDecrement( &pool
->refcount
))
1725 TRACE( "destroying threadpool %p\n", pool
);
1727 assert( pool
->shutdown
);
1728 assert( !pool
->objcount
);
1729 for (i
= 0; i
< ARRAY_SIZE(pool
->pools
); ++i
)
1730 assert( list_empty( &pool
->pools
[i
] ) );
1732 pool
->cs
.DebugInfo
->Spare
[0] = 0;
1733 RtlDeleteCriticalSection( &pool
->cs
);
1735 RtlFreeHeap( GetProcessHeap(), 0, pool
);
1739 /***********************************************************************
1740 * tp_threadpool_lock (internal)
1742 * Acquires a lock on a threadpool, specified with an TP_CALLBACK_ENVIRON
1743 * block. When the lock is acquired successfully, it is guaranteed that
1744 * there is at least one worker thread to process tasks.
1746 static NTSTATUS
tp_threadpool_lock( struct threadpool
**out
, TP_CALLBACK_ENVIRON
*environment
)
1748 struct threadpool
*pool
= NULL
;
1749 NTSTATUS status
= STATUS_SUCCESS
;
1753 /* Validate environment parameters. */
1754 if (environment
->Version
== 3)
1756 TP_CALLBACK_ENVIRON_V3
*environment3
= (TP_CALLBACK_ENVIRON_V3
*)environment
;
1758 switch (environment3
->CallbackPriority
)
1760 case TP_CALLBACK_PRIORITY_HIGH
:
1761 case TP_CALLBACK_PRIORITY_NORMAL
:
1762 case TP_CALLBACK_PRIORITY_LOW
:
1765 return STATUS_INVALID_PARAMETER
;
1769 pool
= (struct threadpool
*)environment
->Pool
;
1774 if (!default_threadpool
)
1776 status
= tp_threadpool_alloc( &pool
);
1777 if (status
!= STATUS_SUCCESS
)
1780 if (InterlockedCompareExchangePointer( (void *)&default_threadpool
, pool
, NULL
) != NULL
)
1782 tp_threadpool_shutdown( pool
);
1783 tp_threadpool_release( pool
);
1787 pool
= default_threadpool
;
1790 RtlEnterCriticalSection( &pool
->cs
);
1792 /* Make sure that the threadpool has at least one thread. */
1793 if (!pool
->num_workers
)
1794 status
= tp_new_worker_thread( pool
);
1796 /* Keep a reference, and increment objcount to ensure that the
1797 * last thread doesn't terminate. */
1798 if (status
== STATUS_SUCCESS
)
1800 InterlockedIncrement( &pool
->refcount
);
1804 RtlLeaveCriticalSection( &pool
->cs
);
1806 if (status
!= STATUS_SUCCESS
)
1810 return STATUS_SUCCESS
;
1813 /***********************************************************************
1814 * tp_threadpool_unlock (internal)
1816 * Releases a lock on a threadpool.
1818 static void tp_threadpool_unlock( struct threadpool
*pool
)
1820 RtlEnterCriticalSection( &pool
->cs
);
1822 RtlLeaveCriticalSection( &pool
->cs
);
1823 tp_threadpool_release( pool
);
1826 /***********************************************************************
1827 * tp_group_alloc (internal)
1829 * Allocates a new threadpool group object.
1831 static NTSTATUS
tp_group_alloc( struct threadpool_group
**out
)
1833 struct threadpool_group
*group
;
1835 group
= RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*group
) );
1837 return STATUS_NO_MEMORY
;
1839 group
->refcount
= 1;
1840 group
->shutdown
= FALSE
;
1842 RtlInitializeCriticalSection( &group
->cs
);
1843 group
->cs
.DebugInfo
->Spare
[0] = (DWORD_PTR
)(__FILE__
": threadpool_group.cs");
1845 list_init( &group
->members
);
1847 TRACE( "allocated group %p\n", group
);
1850 return STATUS_SUCCESS
;
1853 /***********************************************************************
1854 * tp_group_shutdown (internal)
1856 * Marks the group object for shutdown.
1858 static void tp_group_shutdown( struct threadpool_group
*group
)
1860 group
->shutdown
= TRUE
;
1863 /***********************************************************************
1864 * tp_group_release (internal)
1866 * Releases a reference to a group object.
1868 static BOOL
tp_group_release( struct threadpool_group
*group
)
1870 if (InterlockedDecrement( &group
->refcount
))
1873 TRACE( "destroying group %p\n", group
);
1875 assert( group
->shutdown
);
1876 assert( list_empty( &group
->members
) );
1878 group
->cs
.DebugInfo
->Spare
[0] = 0;
1879 RtlDeleteCriticalSection( &group
->cs
);
1881 RtlFreeHeap( GetProcessHeap(), 0, group
);
1885 /***********************************************************************
1886 * tp_object_initialize (internal)
1888 * Initializes members of a threadpool object.
1890 static void tp_object_initialize( struct threadpool_object
*object
, struct threadpool
*pool
,
1891 PVOID userdata
, TP_CALLBACK_ENVIRON
*environment
)
1893 BOOL is_simple_callback
= (object
->type
== TP_OBJECT_TYPE_SIMPLE
);
1895 object
->refcount
= 1;
1896 object
->shutdown
= FALSE
;
1898 object
->pool
= pool
;
1899 object
->group
= NULL
;
1900 object
->userdata
= userdata
;
1901 object
->group_cancel_callback
= NULL
;
1902 object
->finalization_callback
= NULL
;
1903 object
->may_run_long
= 0;
1904 object
->race_dll
= NULL
;
1905 object
->priority
= TP_CALLBACK_PRIORITY_NORMAL
;
1907 memset( &object
->group_entry
, 0, sizeof(object
->group_entry
) );
1908 object
->is_group_member
= FALSE
;
1910 memset( &object
->pool_entry
, 0, sizeof(object
->pool_entry
) );
1911 RtlInitializeConditionVariable( &object
->finished_event
);
1912 RtlInitializeConditionVariable( &object
->group_finished_event
);
1913 object
->completed_event
= NULL
;
1914 object
->num_pending_callbacks
= 0;
1915 object
->num_running_callbacks
= 0;
1916 object
->num_associated_callbacks
= 0;
1920 if (environment
->Version
!= 1 && environment
->Version
!= 3)
1921 FIXME( "unsupported environment version %lu\n", environment
->Version
);
1923 object
->group
= impl_from_TP_CLEANUP_GROUP( environment
->CleanupGroup
);
1924 object
->group_cancel_callback
= environment
->CleanupGroupCancelCallback
;
1925 object
->finalization_callback
= environment
->FinalizationCallback
;
1926 object
->may_run_long
= environment
->u
.s
.LongFunction
!= 0;
1927 object
->race_dll
= environment
->RaceDll
;
1928 if (environment
->Version
== 3)
1930 TP_CALLBACK_ENVIRON_V3
*environment_v3
= (TP_CALLBACK_ENVIRON_V3
*)environment
;
1932 object
->priority
= environment_v3
->CallbackPriority
;
1933 assert( object
->priority
< ARRAY_SIZE(pool
->pools
) );
1936 if (environment
->ActivationContext
)
1937 FIXME( "activation context not supported yet\n" );
1939 if (environment
->u
.s
.Persistent
)
1940 FIXME( "persistent threads not supported yet\n" );
1943 if (object
->race_dll
)
1944 LdrAddRefDll( 0, object
->race_dll
);
1946 TRACE( "allocated object %p of type %u\n", object
, object
->type
);
1948 /* For simple callbacks we have to run tp_object_submit before adding this object
1949 * to the cleanup group. As soon as the cleanup group members are released ->shutdown
1950 * will be set, and tp_object_submit would fail with an assertion. */
1952 if (is_simple_callback
)
1953 tp_object_submit( object
, FALSE
);
1957 struct threadpool_group
*group
= object
->group
;
1958 InterlockedIncrement( &group
->refcount
);
1960 RtlEnterCriticalSection( &group
->cs
);
1961 list_add_tail( &group
->members
, &object
->group_entry
);
1962 object
->is_group_member
= TRUE
;
1963 RtlLeaveCriticalSection( &group
->cs
);
1966 if (is_simple_callback
)
1967 tp_object_release( object
);
1970 static void tp_object_prio_queue( struct threadpool_object
*object
)
1972 ++object
->pool
->num_busy_workers
;
1973 list_add_tail( &object
->pool
->pools
[object
->priority
], &object
->pool_entry
);
1976 /***********************************************************************
1977 * tp_object_submit (internal)
1979 * Submits a threadpool object to the associated threadpool. This
1980 * function has to be VOID because TpPostWork can never fail on Windows.
1982 static void tp_object_submit( struct threadpool_object
*object
, BOOL signaled
)
1984 struct threadpool
*pool
= object
->pool
;
1985 NTSTATUS status
= STATUS_UNSUCCESSFUL
;
1987 assert( !object
->shutdown
);
1988 assert( !pool
->shutdown
);
1990 RtlEnterCriticalSection( &pool
->cs
);
1992 /* Start new worker threads if required. */
1993 if (pool
->num_busy_workers
>= pool
->num_workers
&&
1994 pool
->num_workers
< pool
->max_workers
)
1995 status
= tp_new_worker_thread( pool
);
1997 /* Queue work item and increment refcount. */
1998 InterlockedIncrement( &object
->refcount
);
1999 if (!object
->num_pending_callbacks
++)
2000 tp_object_prio_queue( object
);
2002 /* Count how often the object was signaled. */
2003 if (object
->type
== TP_OBJECT_TYPE_WAIT
&& signaled
)
2004 object
->u
.wait
.signaled
++;
2006 /* No new thread started - wake up one existing thread. */
2007 if (status
!= STATUS_SUCCESS
)
2009 assert( pool
->num_workers
> 0 );
2010 RtlWakeConditionVariable( &pool
->update_event
);
2013 RtlLeaveCriticalSection( &pool
->cs
);
2016 /***********************************************************************
2017 * tp_object_cancel (internal)
2019 * Cancels all currently pending callbacks for a specific object.
2021 static void tp_object_cancel( struct threadpool_object
*object
)
2023 struct threadpool
*pool
= object
->pool
;
2024 LONG pending_callbacks
= 0;
2026 RtlEnterCriticalSection( &pool
->cs
);
2027 if (object
->num_pending_callbacks
)
2029 pending_callbacks
= object
->num_pending_callbacks
;
2030 object
->num_pending_callbacks
= 0;
2031 list_remove( &object
->pool_entry
);
2033 if (object
->type
== TP_OBJECT_TYPE_WAIT
)
2034 object
->u
.wait
.signaled
= 0;
2036 if (object
->type
== TP_OBJECT_TYPE_IO
)
2038 object
->u
.io
.skipped_count
+= object
->u
.io
.pending_count
;
2039 object
->u
.io
.pending_count
= 0;
2041 RtlLeaveCriticalSection( &pool
->cs
);
2043 while (pending_callbacks
--)
2044 tp_object_release( object
);
2047 static BOOL
object_is_finished( struct threadpool_object
*object
, BOOL group
)
2049 if (object
->num_pending_callbacks
)
2051 if (object
->type
== TP_OBJECT_TYPE_IO
&& object
->u
.io
.pending_count
)
2055 return !object
->num_running_callbacks
;
2057 return !object
->num_associated_callbacks
;
2060 /***********************************************************************
2061 * tp_object_wait (internal)
2063 * Waits until all pending and running callbacks of a specific object
2064 * have been processed.
2066 static void tp_object_wait( struct threadpool_object
*object
, BOOL group_wait
)
2068 struct threadpool
*pool
= object
->pool
;
2070 RtlEnterCriticalSection( &pool
->cs
);
2071 while (!object_is_finished( object
, group_wait
))
2074 RtlSleepConditionVariableCS( &object
->group_finished_event
, &pool
->cs
, NULL
);
2076 RtlSleepConditionVariableCS( &object
->finished_event
, &pool
->cs
, NULL
);
2078 RtlLeaveCriticalSection( &pool
->cs
);
2081 static void tp_ioqueue_unlock( struct threadpool_object
*io
)
2083 assert( io
->type
== TP_OBJECT_TYPE_IO
);
2085 RtlEnterCriticalSection( &ioqueue
.cs
);
2087 assert(ioqueue
.objcount
);
2089 if (!io
->shutdown
&& !--ioqueue
.objcount
)
2090 NtSetIoCompletion( ioqueue
.port
, 0, 0, STATUS_SUCCESS
, 0 );
2092 RtlLeaveCriticalSection( &ioqueue
.cs
);
2095 /***********************************************************************
2096 * tp_object_prepare_shutdown (internal)
2098 * Prepares a threadpool object for shutdown.
2100 static void tp_object_prepare_shutdown( struct threadpool_object
*object
)
2102 if (object
->type
== TP_OBJECT_TYPE_TIMER
)
2103 tp_timerqueue_unlock( object
);
2104 else if (object
->type
== TP_OBJECT_TYPE_WAIT
)
2105 tp_waitqueue_unlock( object
);
2106 else if (object
->type
== TP_OBJECT_TYPE_IO
)
2107 tp_ioqueue_unlock( object
);
2110 /***********************************************************************
2111 * tp_object_release (internal)
2113 * Releases a reference to a threadpool object.
2115 static BOOL
tp_object_release( struct threadpool_object
*object
)
2117 if (InterlockedDecrement( &object
->refcount
))
2120 TRACE( "destroying object %p of type %u\n", object
, object
->type
);
2122 assert( object
->shutdown
);
2123 assert( !object
->num_pending_callbacks
);
2124 assert( !object
->num_running_callbacks
);
2125 assert( !object
->num_associated_callbacks
);
2127 /* release reference to the group */
2130 struct threadpool_group
*group
= object
->group
;
2132 RtlEnterCriticalSection( &group
->cs
);
2133 if (object
->is_group_member
)
2135 list_remove( &object
->group_entry
);
2136 object
->is_group_member
= FALSE
;
2138 RtlLeaveCriticalSection( &group
->cs
);
2140 tp_group_release( group
);
2143 tp_threadpool_unlock( object
->pool
);
2145 if (object
->race_dll
)
2146 LdrUnloadDll( object
->race_dll
);
2148 if (object
->completed_event
&& object
->completed_event
!= INVALID_HANDLE_VALUE
)
2149 NtSetEvent( object
->completed_event
, NULL
);
2151 RtlFreeHeap( GetProcessHeap(), 0, object
);
2155 static struct list
*threadpool_get_next_item( const struct threadpool
*pool
)
2160 for (i
= 0; i
< ARRAY_SIZE(pool
->pools
); ++i
)
2162 if ((ptr
= list_head( &pool
->pools
[i
] )))
2169 /***********************************************************************
2170 * tp_object_execute (internal)
2172 * Executes a threadpool object callback, object->pool->cs has to be
2175 static void tp_object_execute( struct threadpool_object
*object
, BOOL wait_thread
)
2177 TP_CALLBACK_INSTANCE
*callback_instance
;
2178 struct threadpool_instance instance
;
2179 struct io_completion completion
;
2180 struct threadpool
*pool
= object
->pool
;
2181 TP_WAIT_RESULT wait_result
= 0;
2184 object
->num_pending_callbacks
--;
2186 /* For wait objects check if they were signaled or have timed out. */
2187 if (object
->type
== TP_OBJECT_TYPE_WAIT
)
2189 wait_result
= object
->u
.wait
.signaled
? WAIT_OBJECT_0
: WAIT_TIMEOUT
;
2190 if (wait_result
== WAIT_OBJECT_0
) object
->u
.wait
.signaled
--;
2192 else if (object
->type
== TP_OBJECT_TYPE_IO
)
2194 assert( object
->u
.io
.completion_count
);
2195 completion
= object
->u
.io
.completions
[--object
->u
.io
.completion_count
];
2198 /* Leave critical section and do the actual callback. */
2199 object
->num_associated_callbacks
++;
2200 object
->num_running_callbacks
++;
2201 RtlLeaveCriticalSection( &pool
->cs
);
2202 if (wait_thread
) RtlLeaveCriticalSection( &waitqueue
.cs
);
2204 /* Initialize threadpool instance struct. */
2205 callback_instance
= (TP_CALLBACK_INSTANCE
*)&instance
;
2206 instance
.object
= object
;
2207 instance
.threadid
= GetCurrentThreadId();
2208 instance
.associated
= TRUE
;
2209 instance
.may_run_long
= object
->may_run_long
;
2210 instance
.cleanup
.critical_section
= NULL
;
2211 instance
.cleanup
.mutex
= NULL
;
2212 instance
.cleanup
.semaphore
= NULL
;
2213 instance
.cleanup
.semaphore_count
= 0;
2214 instance
.cleanup
.event
= NULL
;
2215 instance
.cleanup
.library
= NULL
;
2217 switch (object
->type
)
2219 case TP_OBJECT_TYPE_SIMPLE
:
2221 TRACE( "executing simple callback %p(%p, %p)\n",
2222 object
->u
.simple
.callback
, callback_instance
, object
->userdata
);
2223 object
->u
.simple
.callback( callback_instance
, object
->userdata
);
2224 TRACE( "callback %p returned\n", object
->u
.simple
.callback
);
2228 case TP_OBJECT_TYPE_WORK
:
2230 TRACE( "executing work callback %p(%p, %p, %p)\n",
2231 object
->u
.work
.callback
, callback_instance
, object
->userdata
, object
);
2232 object
->u
.work
.callback( callback_instance
, object
->userdata
, (TP_WORK
*)object
);
2233 TRACE( "callback %p returned\n", object
->u
.work
.callback
);
2237 case TP_OBJECT_TYPE_TIMER
:
2239 TRACE( "executing timer callback %p(%p, %p, %p)\n",
2240 object
->u
.timer
.callback
, callback_instance
, object
->userdata
, object
);
2241 object
->u
.timer
.callback( callback_instance
, object
->userdata
, (TP_TIMER
*)object
);
2242 TRACE( "callback %p returned\n", object
->u
.timer
.callback
);
2246 case TP_OBJECT_TYPE_WAIT
:
2248 TRACE( "executing wait callback %p(%p, %p, %p, %lu)\n",
2249 object
->u
.wait
.callback
, callback_instance
, object
->userdata
, object
, wait_result
);
2250 object
->u
.wait
.callback( callback_instance
, object
->userdata
, (TP_WAIT
*)object
, wait_result
);
2251 TRACE( "callback %p returned\n", object
->u
.wait
.callback
);
2255 case TP_OBJECT_TYPE_IO
:
2257 TRACE( "executing I/O callback %p(%p, %p, %#Ix, %p, %p)\n",
2258 object
->u
.io
.callback
, callback_instance
, object
->userdata
,
2259 completion
.cvalue
, &completion
.iosb
, (TP_IO
*)object
);
2260 object
->u
.io
.callback( callback_instance
, object
->userdata
,
2261 (void *)completion
.cvalue
, &completion
.iosb
, (TP_IO
*)object
);
2262 TRACE( "callback %p returned\n", object
->u
.io
.callback
);
2271 /* Execute finalization callback. */
2272 if (object
->finalization_callback
)
2274 TRACE( "executing finalization callback %p(%p, %p)\n",
2275 object
->finalization_callback
, callback_instance
, object
->userdata
);
2276 object
->finalization_callback( callback_instance
, object
->userdata
);
2277 TRACE( "callback %p returned\n", object
->finalization_callback
);
2280 /* Execute cleanup tasks. */
2281 if (instance
.cleanup
.critical_section
)
2283 RtlLeaveCriticalSection( instance
.cleanup
.critical_section
);
2285 if (instance
.cleanup
.mutex
)
2287 status
= NtReleaseMutant( instance
.cleanup
.mutex
, NULL
);
2288 if (status
!= STATUS_SUCCESS
) goto skip_cleanup
;
2290 if (instance
.cleanup
.semaphore
)
2292 status
= NtReleaseSemaphore( instance
.cleanup
.semaphore
, instance
.cleanup
.semaphore_count
, NULL
);
2293 if (status
!= STATUS_SUCCESS
) goto skip_cleanup
;
2295 if (instance
.cleanup
.event
)
2297 status
= NtSetEvent( instance
.cleanup
.event
, NULL
);
2298 if (status
!= STATUS_SUCCESS
) goto skip_cleanup
;
2300 if (instance
.cleanup
.library
)
2302 LdrUnloadDll( instance
.cleanup
.library
);
2306 if (wait_thread
) RtlEnterCriticalSection( &waitqueue
.cs
);
2307 RtlEnterCriticalSection( &pool
->cs
);
2309 /* Simple callbacks are automatically shutdown after execution. */
2310 if (object
->type
== TP_OBJECT_TYPE_SIMPLE
)
2312 tp_object_prepare_shutdown( object
);
2313 object
->shutdown
= TRUE
;
2316 object
->num_running_callbacks
--;
2317 if (object_is_finished( object
, TRUE
))
2318 RtlWakeAllConditionVariable( &object
->group_finished_event
);
2320 if (instance
.associated
)
2322 object
->num_associated_callbacks
--;
2323 if (object_is_finished( object
, FALSE
))
2324 RtlWakeAllConditionVariable( &object
->finished_event
);
2328 /***********************************************************************
2329 * threadpool_worker_proc (internal)
2331 static void CALLBACK
threadpool_worker_proc( void *param
)
2333 struct threadpool
*pool
= param
;
2334 LARGE_INTEGER timeout
;
2337 TRACE( "starting worker thread for pool %p\n", pool
);
2338 set_thread_name(L
"wine_threadpool_worker");
2340 RtlEnterCriticalSection( &pool
->cs
);
2343 while ((ptr
= threadpool_get_next_item( pool
)))
2345 struct threadpool_object
*object
= LIST_ENTRY( ptr
, struct threadpool_object
, pool_entry
);
2346 assert( object
->num_pending_callbacks
> 0 );
2348 /* If further pending callbacks are queued, move the work item to
2349 * the end of the pool list. Otherwise remove it from the pool. */
2350 list_remove( &object
->pool_entry
);
2351 if (object
->num_pending_callbacks
> 1)
2352 tp_object_prio_queue( object
);
2354 tp_object_execute( object
, FALSE
);
2356 assert(pool
->num_busy_workers
);
2357 pool
->num_busy_workers
--;
2359 tp_object_release( object
);
2362 /* Shutdown worker thread if requested. */
2366 /* Wait for new tasks or until the timeout expires. A thread only terminates
2367 * when no new tasks are available, and the number of threads can be
2368 * decreased without violating the min_workers limit. An exception is when
2369 * min_workers == 0, then objcount is used to detect if the last thread
2370 * can be terminated. */
2371 timeout
.QuadPart
= (ULONGLONG
)THREADPOOL_WORKER_TIMEOUT
* -10000;
2372 if (RtlSleepConditionVariableCS( &pool
->update_event
, &pool
->cs
, &timeout
) == STATUS_TIMEOUT
&&
2373 !threadpool_get_next_item( pool
) && (pool
->num_workers
> max( pool
->min_workers
, 1 ) ||
2374 (!pool
->min_workers
&& !pool
->objcount
)))
2379 pool
->num_workers
--;
2380 RtlLeaveCriticalSection( &pool
->cs
);
2382 TRACE( "terminating worker thread for pool %p\n", pool
);
2383 tp_threadpool_release( pool
);
2384 RtlExitUserThread( 0 );
2387 /***********************************************************************
2388 * TpAllocCleanupGroup (NTDLL.@)
2390 NTSTATUS WINAPI
TpAllocCleanupGroup( TP_CLEANUP_GROUP
**out
)
2392 TRACE( "%p\n", out
);
2394 return tp_group_alloc( (struct threadpool_group
**)out
);
2397 /***********************************************************************
2398 * TpAllocIoCompletion (NTDLL.@)
2400 NTSTATUS WINAPI
TpAllocIoCompletion( TP_IO
**out
, HANDLE file
, PTP_IO_CALLBACK callback
,
2401 void *userdata
, TP_CALLBACK_ENVIRON
*environment
)
2403 struct threadpool_object
*object
;
2404 struct threadpool
*pool
;
2407 TRACE( "%p %p %p %p %p\n", out
, file
, callback
, userdata
, environment
);
2409 if (!(object
= RtlAllocateHeap( GetProcessHeap(), HEAP_ZERO_MEMORY
, sizeof(*object
) )))
2410 return STATUS_NO_MEMORY
;
2412 if ((status
= tp_threadpool_lock( &pool
, environment
)))
2414 RtlFreeHeap( GetProcessHeap(), 0, object
);
2418 object
->type
= TP_OBJECT_TYPE_IO
;
2419 object
->u
.io
.callback
= callback
;
2420 if (!(object
->u
.io
.completions
= RtlAllocateHeap( GetProcessHeap(), 0, 8 * sizeof(*object
->u
.io
.completions
) )))
2422 tp_threadpool_unlock( pool
);
2423 RtlFreeHeap( GetProcessHeap(), 0, object
);
2427 if ((status
= tp_ioqueue_lock( object
, file
)))
2429 tp_threadpool_unlock( pool
);
2430 RtlFreeHeap( GetProcessHeap(), 0, object
->u
.io
.completions
);
2431 RtlFreeHeap( GetProcessHeap(), 0, object
);
2435 tp_object_initialize( object
, pool
, userdata
, environment
);
2437 *out
= (TP_IO
*)object
;
2438 return STATUS_SUCCESS
;
2441 /***********************************************************************
2442 * TpAllocPool (NTDLL.@)
2444 NTSTATUS WINAPI
TpAllocPool( TP_POOL
**out
, PVOID reserved
)
2446 TRACE( "%p %p\n", out
, reserved
);
2449 FIXME( "reserved argument is nonzero (%p)\n", reserved
);
2451 return tp_threadpool_alloc( (struct threadpool
**)out
);
2454 /***********************************************************************
2455 * TpAllocTimer (NTDLL.@)
2457 NTSTATUS WINAPI
TpAllocTimer( TP_TIMER
**out
, PTP_TIMER_CALLBACK callback
, PVOID userdata
,
2458 TP_CALLBACK_ENVIRON
*environment
)
2460 struct threadpool_object
*object
;
2461 struct threadpool
*pool
;
2464 TRACE( "%p %p %p %p\n", out
, callback
, userdata
, environment
);
2466 object
= RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*object
) );
2468 return STATUS_NO_MEMORY
;
2470 status
= tp_threadpool_lock( &pool
, environment
);
2473 RtlFreeHeap( GetProcessHeap(), 0, object
);
2477 object
->type
= TP_OBJECT_TYPE_TIMER
;
2478 object
->u
.timer
.callback
= callback
;
2480 status
= tp_timerqueue_lock( object
);
2483 tp_threadpool_unlock( pool
);
2484 RtlFreeHeap( GetProcessHeap(), 0, object
);
2488 tp_object_initialize( object
, pool
, userdata
, environment
);
2490 *out
= (TP_TIMER
*)object
;
2491 return STATUS_SUCCESS
;
2494 static NTSTATUS
tp_alloc_wait( TP_WAIT
**out
, PTP_WAIT_CALLBACK callback
, PVOID userdata
,
2495 TP_CALLBACK_ENVIRON
*environment
, DWORD flags
)
2497 struct threadpool_object
*object
;
2498 struct threadpool
*pool
;
2501 object
= RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*object
) );
2503 return STATUS_NO_MEMORY
;
2505 status
= tp_threadpool_lock( &pool
, environment
);
2508 RtlFreeHeap( GetProcessHeap(), 0, object
);
2512 object
->type
= TP_OBJECT_TYPE_WAIT
;
2513 object
->u
.wait
.callback
= callback
;
2514 object
->u
.wait
.flags
= flags
;
2516 status
= tp_waitqueue_lock( object
);
2519 tp_threadpool_unlock( pool
);
2520 RtlFreeHeap( GetProcessHeap(), 0, object
);
2524 tp_object_initialize( object
, pool
, userdata
, environment
);
2526 *out
= (TP_WAIT
*)object
;
2527 return STATUS_SUCCESS
;
2530 /***********************************************************************
2531 * TpAllocWait (NTDLL.@)
2533 NTSTATUS WINAPI
TpAllocWait( TP_WAIT
**out
, PTP_WAIT_CALLBACK callback
, PVOID userdata
,
2534 TP_CALLBACK_ENVIRON
*environment
)
2536 TRACE( "%p %p %p %p\n", out
, callback
, userdata
, environment
);
2537 return tp_alloc_wait( out
, callback
, userdata
, environment
, WT_EXECUTEONLYONCE
);
2540 /***********************************************************************
2541 * TpAllocWork (NTDLL.@)
2543 NTSTATUS WINAPI
TpAllocWork( TP_WORK
**out
, PTP_WORK_CALLBACK callback
, PVOID userdata
,
2544 TP_CALLBACK_ENVIRON
*environment
)
2546 struct threadpool_object
*object
;
2547 struct threadpool
*pool
;
2550 TRACE( "%p %p %p %p\n", out
, callback
, userdata
, environment
);
2552 object
= RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*object
) );
2554 return STATUS_NO_MEMORY
;
2556 status
= tp_threadpool_lock( &pool
, environment
);
2559 RtlFreeHeap( GetProcessHeap(), 0, object
);
2563 object
->type
= TP_OBJECT_TYPE_WORK
;
2564 object
->u
.work
.callback
= callback
;
2565 tp_object_initialize( object
, pool
, userdata
, environment
);
2567 *out
= (TP_WORK
*)object
;
2568 return STATUS_SUCCESS
;
2571 /***********************************************************************
2572 * TpCancelAsyncIoOperation (NTDLL.@)
2574 void WINAPI
TpCancelAsyncIoOperation( TP_IO
*io
)
2576 struct threadpool_object
*this = impl_from_TP_IO( io
);
2578 TRACE( "%p\n", io
);
2580 RtlEnterCriticalSection( &this->pool
->cs
);
2582 TRACE("pending_count %u.\n", this->u
.io
.pending_count
);
2584 this->u
.io
.pending_count
--;
2585 if (object_is_finished( this, TRUE
))
2586 RtlWakeAllConditionVariable( &this->group_finished_event
);
2587 if (object_is_finished( this, FALSE
))
2588 RtlWakeAllConditionVariable( &this->finished_event
);
2590 RtlLeaveCriticalSection( &this->pool
->cs
);
2593 /***********************************************************************
2594 * TpCallbackLeaveCriticalSectionOnCompletion (NTDLL.@)
2596 VOID WINAPI
TpCallbackLeaveCriticalSectionOnCompletion( TP_CALLBACK_INSTANCE
*instance
, CRITICAL_SECTION
*crit
)
2598 struct threadpool_instance
*this = impl_from_TP_CALLBACK_INSTANCE( instance
);
2600 TRACE( "%p %p\n", instance
, crit
);
2602 if (!this->cleanup
.critical_section
)
2603 this->cleanup
.critical_section
= crit
;
2606 /***********************************************************************
2607 * TpCallbackMayRunLong (NTDLL.@)
2609 NTSTATUS WINAPI
TpCallbackMayRunLong( TP_CALLBACK_INSTANCE
*instance
)
2611 struct threadpool_instance
*this = impl_from_TP_CALLBACK_INSTANCE( instance
);
2612 struct threadpool_object
*object
= this->object
;
2613 struct threadpool
*pool
;
2614 NTSTATUS status
= STATUS_SUCCESS
;
2616 TRACE( "%p\n", instance
);
2618 if (this->threadid
!= GetCurrentThreadId())
2620 ERR("called from wrong thread, ignoring\n");
2621 return STATUS_UNSUCCESSFUL
; /* FIXME */
2624 if (this->may_run_long
)
2625 return STATUS_SUCCESS
;
2627 pool
= object
->pool
;
2628 RtlEnterCriticalSection( &pool
->cs
);
2630 /* Start new worker threads if required. */
2631 if (pool
->num_busy_workers
>= pool
->num_workers
)
2633 if (pool
->num_workers
< pool
->max_workers
)
2635 status
= tp_new_worker_thread( pool
);
2639 status
= STATUS_TOO_MANY_THREADS
;
2643 RtlLeaveCriticalSection( &pool
->cs
);
2644 this->may_run_long
= TRUE
;
2648 /***********************************************************************
2649 * TpCallbackReleaseMutexOnCompletion (NTDLL.@)
2651 VOID WINAPI
TpCallbackReleaseMutexOnCompletion( TP_CALLBACK_INSTANCE
*instance
, HANDLE mutex
)
2653 struct threadpool_instance
*this = impl_from_TP_CALLBACK_INSTANCE( instance
);
2655 TRACE( "%p %p\n", instance
, mutex
);
2657 if (!this->cleanup
.mutex
)
2658 this->cleanup
.mutex
= mutex
;
2661 /***********************************************************************
2662 * TpCallbackReleaseSemaphoreOnCompletion (NTDLL.@)
2664 VOID WINAPI
TpCallbackReleaseSemaphoreOnCompletion( TP_CALLBACK_INSTANCE
*instance
, HANDLE semaphore
, DWORD count
)
2666 struct threadpool_instance
*this = impl_from_TP_CALLBACK_INSTANCE( instance
);
2668 TRACE( "%p %p %lu\n", instance
, semaphore
, count
);
2670 if (!this->cleanup
.semaphore
)
2672 this->cleanup
.semaphore
= semaphore
;
2673 this->cleanup
.semaphore_count
= count
;
2677 /***********************************************************************
2678 * TpCallbackSetEventOnCompletion (NTDLL.@)
2680 VOID WINAPI
TpCallbackSetEventOnCompletion( TP_CALLBACK_INSTANCE
*instance
, HANDLE event
)
2682 struct threadpool_instance
*this = impl_from_TP_CALLBACK_INSTANCE( instance
);
2684 TRACE( "%p %p\n", instance
, event
);
2686 if (!this->cleanup
.event
)
2687 this->cleanup
.event
= event
;
2690 /***********************************************************************
2691 * TpCallbackUnloadDllOnCompletion (NTDLL.@)
2693 VOID WINAPI
TpCallbackUnloadDllOnCompletion( TP_CALLBACK_INSTANCE
*instance
, HMODULE module
)
2695 struct threadpool_instance
*this = impl_from_TP_CALLBACK_INSTANCE( instance
);
2697 TRACE( "%p %p\n", instance
, module
);
2699 if (!this->cleanup
.library
)
2700 this->cleanup
.library
= module
;
2703 /***********************************************************************
2704 * TpDisassociateCallback (NTDLL.@)
2706 VOID WINAPI
TpDisassociateCallback( TP_CALLBACK_INSTANCE
*instance
)
2708 struct threadpool_instance
*this = impl_from_TP_CALLBACK_INSTANCE( instance
);
2709 struct threadpool_object
*object
= this->object
;
2710 struct threadpool
*pool
;
2712 TRACE( "%p\n", instance
);
2714 if (this->threadid
!= GetCurrentThreadId())
2716 ERR("called from wrong thread, ignoring\n");
2720 if (!this->associated
)
2723 pool
= object
->pool
;
2724 RtlEnterCriticalSection( &pool
->cs
);
2726 object
->num_associated_callbacks
--;
2727 if (object_is_finished( object
, FALSE
))
2728 RtlWakeAllConditionVariable( &object
->finished_event
);
2730 RtlLeaveCriticalSection( &pool
->cs
);
2731 this->associated
= FALSE
;
2734 /***********************************************************************
2735 * TpIsTimerSet (NTDLL.@)
2737 BOOL WINAPI
TpIsTimerSet( TP_TIMER
*timer
)
2739 struct threadpool_object
*this = impl_from_TP_TIMER( timer
);
2741 TRACE( "%p\n", timer
);
2743 return this->u
.timer
.timer_set
;
2746 /***********************************************************************
2747 * TpPostWork (NTDLL.@)
2749 VOID WINAPI
TpPostWork( TP_WORK
*work
)
2751 struct threadpool_object
*this = impl_from_TP_WORK( work
);
2753 TRACE( "%p\n", work
);
2755 tp_object_submit( this, FALSE
);
2758 /***********************************************************************
2759 * TpReleaseCleanupGroup (NTDLL.@)
2761 VOID WINAPI
TpReleaseCleanupGroup( TP_CLEANUP_GROUP
*group
)
2763 struct threadpool_group
*this = impl_from_TP_CLEANUP_GROUP( group
);
2765 TRACE( "%p\n", group
);
2767 tp_group_shutdown( this );
2768 tp_group_release( this );
2771 /***********************************************************************
2772 * TpReleaseCleanupGroupMembers (NTDLL.@)
2774 VOID WINAPI
TpReleaseCleanupGroupMembers( TP_CLEANUP_GROUP
*group
, BOOL cancel_pending
, PVOID userdata
)
2776 struct threadpool_group
*this = impl_from_TP_CLEANUP_GROUP( group
);
2777 struct threadpool_object
*object
, *next
;
2778 struct list members
;
2780 TRACE( "%p %u %p\n", group
, cancel_pending
, userdata
);
2782 RtlEnterCriticalSection( &this->cs
);
2784 /* Unset group, increase references, and mark objects for shutdown */
2785 LIST_FOR_EACH_ENTRY_SAFE( object
, next
, &this->members
, struct threadpool_object
, group_entry
)
2787 assert( object
->group
== this );
2788 assert( object
->is_group_member
);
2790 if (InterlockedIncrement( &object
->refcount
) == 1)
2792 /* Object is basically already destroyed, but group reference
2793 * was not deleted yet. We can safely ignore this object. */
2794 InterlockedDecrement( &object
->refcount
);
2795 list_remove( &object
->group_entry
);
2796 object
->is_group_member
= FALSE
;
2800 object
->is_group_member
= FALSE
;
2801 tp_object_prepare_shutdown( object
);
2804 /* Move members to a new temporary list */
2805 list_init( &members
);
2806 list_move_tail( &members
, &this->members
);
2808 RtlLeaveCriticalSection( &this->cs
);
2810 /* Cancel pending callbacks if requested */
2813 LIST_FOR_EACH_ENTRY( object
, &members
, struct threadpool_object
, group_entry
)
2815 tp_object_cancel( object
);
2819 /* Wait for remaining callbacks to finish */
2820 LIST_FOR_EACH_ENTRY_SAFE( object
, next
, &members
, struct threadpool_object
, group_entry
)
2822 tp_object_wait( object
, TRUE
);
2824 if (!object
->shutdown
)
2826 /* Execute group cancellation callback if defined, and if this was actually a group cancel. */
2827 if (cancel_pending
&& object
->group_cancel_callback
)
2829 TRACE( "executing group cancel callback %p(%p, %p)\n",
2830 object
->group_cancel_callback
, object
->userdata
, userdata
);
2831 object
->group_cancel_callback( object
->userdata
, userdata
);
2832 TRACE( "callback %p returned\n", object
->group_cancel_callback
);
2835 if (object
->type
!= TP_OBJECT_TYPE_SIMPLE
)
2836 tp_object_release( object
);
2839 object
->shutdown
= TRUE
;
2840 tp_object_release( object
);
2844 /***********************************************************************
2845 * TpReleaseIoCompletion (NTDLL.@)
2847 void WINAPI
TpReleaseIoCompletion( TP_IO
*io
)
2849 struct threadpool_object
*this = impl_from_TP_IO( io
);
2852 TRACE( "%p\n", io
);
2854 RtlEnterCriticalSection( &this->pool
->cs
);
2855 this->u
.io
.shutting_down
= TRUE
;
2856 can_destroy
= !this->u
.io
.pending_count
&& !this->u
.io
.skipped_count
;
2857 RtlLeaveCriticalSection( &this->pool
->cs
);
2861 tp_object_prepare_shutdown( this );
2862 this->shutdown
= TRUE
;
2863 tp_object_release( this );
2867 /***********************************************************************
2868 * TpReleasePool (NTDLL.@)
2870 VOID WINAPI
TpReleasePool( TP_POOL
*pool
)
2872 struct threadpool
*this = impl_from_TP_POOL( pool
);
2874 TRACE( "%p\n", pool
);
2876 tp_threadpool_shutdown( this );
2877 tp_threadpool_release( this );
2880 /***********************************************************************
2881 * TpReleaseTimer (NTDLL.@)
2883 VOID WINAPI
TpReleaseTimer( TP_TIMER
*timer
)
2885 struct threadpool_object
*this = impl_from_TP_TIMER( timer
);
2887 TRACE( "%p\n", timer
);
2889 tp_object_prepare_shutdown( this );
2890 this->shutdown
= TRUE
;
2891 tp_object_release( this );
2894 /***********************************************************************
2895 * TpReleaseWait (NTDLL.@)
2897 VOID WINAPI
TpReleaseWait( TP_WAIT
*wait
)
2899 struct threadpool_object
*this = impl_from_TP_WAIT( wait
);
2901 TRACE( "%p\n", wait
);
2903 tp_object_prepare_shutdown( this );
2904 this->shutdown
= TRUE
;
2905 tp_object_release( this );
2908 /***********************************************************************
2909 * TpReleaseWork (NTDLL.@)
2911 VOID WINAPI
TpReleaseWork( TP_WORK
*work
)
2913 struct threadpool_object
*this = impl_from_TP_WORK( work
);
2915 TRACE( "%p\n", work
);
2917 tp_object_prepare_shutdown( this );
2918 this->shutdown
= TRUE
;
2919 tp_object_release( this );
2922 /***********************************************************************
2923 * TpSetPoolMaxThreads (NTDLL.@)
2925 VOID WINAPI
TpSetPoolMaxThreads( TP_POOL
*pool
, DWORD maximum
)
2927 struct threadpool
*this = impl_from_TP_POOL( pool
);
2929 TRACE( "%p %lu\n", pool
, maximum
);
2931 RtlEnterCriticalSection( &this->cs
);
2932 this->max_workers
= max( maximum
, 1 );
2933 this->min_workers
= min( this->min_workers
, this->max_workers
);
2934 RtlLeaveCriticalSection( &this->cs
);
2937 /***********************************************************************
2938 * TpSetPoolMinThreads (NTDLL.@)
2940 BOOL WINAPI
TpSetPoolMinThreads( TP_POOL
*pool
, DWORD minimum
)
2942 struct threadpool
*this = impl_from_TP_POOL( pool
);
2943 NTSTATUS status
= STATUS_SUCCESS
;
2945 TRACE( "%p %lu\n", pool
, minimum
);
2947 RtlEnterCriticalSection( &this->cs
);
2949 while (this->num_workers
< minimum
)
2951 status
= tp_new_worker_thread( this );
2952 if (status
!= STATUS_SUCCESS
)
2956 if (status
== STATUS_SUCCESS
)
2958 this->min_workers
= minimum
;
2959 this->max_workers
= max( this->min_workers
, this->max_workers
);
2962 RtlLeaveCriticalSection( &this->cs
);
2966 /***********************************************************************
2967 * TpSetTimer (NTDLL.@)
2969 VOID WINAPI
TpSetTimer( TP_TIMER
*timer
, LARGE_INTEGER
*timeout
, LONG period
, LONG window_length
)
2971 struct threadpool_object
*this = impl_from_TP_TIMER( timer
);
2972 struct threadpool_object
*other_timer
;
2973 BOOL submit_timer
= FALSE
;
2974 ULONGLONG timestamp
;
2976 TRACE( "%p %p %lu %lu\n", timer
, timeout
, period
, window_length
);
2978 RtlEnterCriticalSection( &timerqueue
.cs
);
2980 assert( this->u
.timer
.timer_initialized
);
2981 this->u
.timer
.timer_set
= timeout
!= NULL
;
2983 /* Convert relative timeout to absolute timestamp and handle a timeout
2984 * of zero, which means that the timer is submitted immediately. */
2987 timestamp
= timeout
->QuadPart
;
2988 if ((LONGLONG
)timestamp
< 0)
2991 NtQuerySystemTime( &now
);
2992 timestamp
= now
.QuadPart
- timestamp
;
2994 else if (!timestamp
)
3001 NtQuerySystemTime( &now
);
3002 timestamp
= now
.QuadPart
+ (ULONGLONG
)period
* 10000;
3004 submit_timer
= TRUE
;
3008 /* First remove existing timeout. */
3009 if (this->u
.timer
.timer_pending
)
3011 list_remove( &this->u
.timer
.timer_entry
);
3012 this->u
.timer
.timer_pending
= FALSE
;
3015 /* If the timer was enabled, then add it back to the queue. */
3018 this->u
.timer
.timeout
= timestamp
;
3019 this->u
.timer
.period
= period
;
3020 this->u
.timer
.window_length
= window_length
;
3022 LIST_FOR_EACH_ENTRY( other_timer
, &timerqueue
.pending_timers
,
3023 struct threadpool_object
, u
.timer
.timer_entry
)
3025 assert( other_timer
->type
== TP_OBJECT_TYPE_TIMER
);
3026 if (this->u
.timer
.timeout
< other_timer
->u
.timer
.timeout
)
3029 list_add_before( &other_timer
->u
.timer
.timer_entry
, &this->u
.timer
.timer_entry
);
3031 /* Wake up the timer thread when the timeout has to be updated. */
3032 if (list_head( &timerqueue
.pending_timers
) == &this->u
.timer
.timer_entry
)
3033 RtlWakeAllConditionVariable( &timerqueue
.update_event
);
3035 this->u
.timer
.timer_pending
= TRUE
;
3038 RtlLeaveCriticalSection( &timerqueue
.cs
);
3041 tp_object_submit( this, FALSE
);
3044 /***********************************************************************
3045 * TpSetWait (NTDLL.@)
3047 VOID WINAPI
TpSetWait( TP_WAIT
*wait
, HANDLE handle
, LARGE_INTEGER
*timeout
)
3049 struct threadpool_object
*this = impl_from_TP_WAIT( wait
);
3050 ULONGLONG timestamp
= MAXLONGLONG
;
3052 TRACE( "%p %p %p\n", wait
, handle
, timeout
);
3054 RtlEnterCriticalSection( &waitqueue
.cs
);
3056 assert( this->u
.wait
.bucket
);
3057 this->u
.wait
.handle
= handle
;
3059 if (handle
|| this->u
.wait
.wait_pending
)
3061 struct waitqueue_bucket
*bucket
= this->u
.wait
.bucket
;
3062 list_remove( &this->u
.wait
.wait_entry
);
3064 /* Convert relative timeout to absolute timestamp. */
3065 if (handle
&& timeout
)
3067 timestamp
= timeout
->QuadPart
;
3068 if ((LONGLONG
)timestamp
< 0)
3071 NtQuerySystemTime( &now
);
3072 timestamp
= now
.QuadPart
- timestamp
;
3076 /* Add wait object back into one of the queues. */
3079 list_add_tail( &bucket
->waiting
, &this->u
.wait
.wait_entry
);
3080 this->u
.wait
.wait_pending
= TRUE
;
3081 this->u
.wait
.timeout
= timestamp
;
3085 list_add_tail( &bucket
->reserved
, &this->u
.wait
.wait_entry
);
3086 this->u
.wait
.wait_pending
= FALSE
;
3089 /* Wake up the wait queue thread. */
3090 NtSetEvent( bucket
->update_event
, NULL
);
3093 RtlLeaveCriticalSection( &waitqueue
.cs
);
3096 /***********************************************************************
3097 * TpSimpleTryPost (NTDLL.@)
3099 NTSTATUS WINAPI
TpSimpleTryPost( PTP_SIMPLE_CALLBACK callback
, PVOID userdata
,
3100 TP_CALLBACK_ENVIRON
*environment
)
3102 struct threadpool_object
*object
;
3103 struct threadpool
*pool
;
3106 TRACE( "%p %p %p\n", callback
, userdata
, environment
);
3108 object
= RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*object
) );
3110 return STATUS_NO_MEMORY
;
3112 status
= tp_threadpool_lock( &pool
, environment
);
3115 RtlFreeHeap( GetProcessHeap(), 0, object
);
3119 object
->type
= TP_OBJECT_TYPE_SIMPLE
;
3120 object
->u
.simple
.callback
= callback
;
3121 tp_object_initialize( object
, pool
, userdata
, environment
);
3123 return STATUS_SUCCESS
;
3126 /***********************************************************************
3127 * TpStartAsyncIoOperation (NTDLL.@)
3129 void WINAPI
TpStartAsyncIoOperation( TP_IO
*io
)
3131 struct threadpool_object
*this = impl_from_TP_IO( io
);
3133 TRACE( "%p\n", io
);
3135 RtlEnterCriticalSection( &this->pool
->cs
);
3137 this->u
.io
.pending_count
++;
3139 RtlLeaveCriticalSection( &this->pool
->cs
);
3142 /***********************************************************************
3143 * TpWaitForIoCompletion (NTDLL.@)
3145 void WINAPI
TpWaitForIoCompletion( TP_IO
*io
, BOOL cancel_pending
)
3147 struct threadpool_object
*this = impl_from_TP_IO( io
);
3149 TRACE( "%p %d\n", io
, cancel_pending
);
3152 tp_object_cancel( this );
3153 tp_object_wait( this, FALSE
);
3156 /***********************************************************************
3157 * TpWaitForTimer (NTDLL.@)
3159 VOID WINAPI
TpWaitForTimer( TP_TIMER
*timer
, BOOL cancel_pending
)
3161 struct threadpool_object
*this = impl_from_TP_TIMER( timer
);
3163 TRACE( "%p %d\n", timer
, cancel_pending
);
3166 tp_object_cancel( this );
3167 tp_object_wait( this, FALSE
);
3170 /***********************************************************************
3171 * TpWaitForWait (NTDLL.@)
3173 VOID WINAPI
TpWaitForWait( TP_WAIT
*wait
, BOOL cancel_pending
)
3175 struct threadpool_object
*this = impl_from_TP_WAIT( wait
);
3177 TRACE( "%p %d\n", wait
, cancel_pending
);
3180 tp_object_cancel( this );
3181 tp_object_wait( this, FALSE
);
3184 /***********************************************************************
3185 * TpWaitForWork (NTDLL.@)
3187 VOID WINAPI
TpWaitForWork( TP_WORK
*work
, BOOL cancel_pending
)
3189 struct threadpool_object
*this = impl_from_TP_WORK( work
);
3191 TRACE( "%p %u\n", work
, cancel_pending
);
3194 tp_object_cancel( this );
3195 tp_object_wait( this, FALSE
);
3198 /***********************************************************************
3199 * TpSetPoolStackInformation (NTDLL.@)
3201 NTSTATUS WINAPI
TpSetPoolStackInformation( TP_POOL
*pool
, TP_POOL_STACK_INFORMATION
*stack_info
)
3203 struct threadpool
*this = impl_from_TP_POOL( pool
);
3205 TRACE( "%p %p\n", pool
, stack_info
);
3208 return STATUS_INVALID_PARAMETER
;
3210 RtlEnterCriticalSection( &this->cs
);
3211 this->stack_info
= *stack_info
;
3212 RtlLeaveCriticalSection( &this->cs
);
3214 return STATUS_SUCCESS
;
3217 /***********************************************************************
3218 * TpQueryPoolStackInformation (NTDLL.@)
3220 NTSTATUS WINAPI
TpQueryPoolStackInformation( TP_POOL
*pool
, TP_POOL_STACK_INFORMATION
*stack_info
)
3222 struct threadpool
*this = impl_from_TP_POOL( pool
);
3224 TRACE( "%p %p\n", pool
, stack_info
);
3227 return STATUS_INVALID_PARAMETER
;
3229 RtlEnterCriticalSection( &this->cs
);
3230 *stack_info
= this->stack_info
;
3231 RtlLeaveCriticalSection( &this->cs
);
3233 return STATUS_SUCCESS
;
3236 static void CALLBACK
rtl_wait_callback( TP_CALLBACK_INSTANCE
*instance
, void *userdata
, TP_WAIT
*wait
, TP_WAIT_RESULT result
)
3238 struct threadpool_object
*object
= impl_from_TP_WAIT(wait
);
3239 object
->u
.wait
.rtl_callback( userdata
, result
!= STATUS_WAIT_0
);
3242 /***********************************************************************
3243 * RtlRegisterWait (NTDLL.@)
3245 * Registers a wait for a handle to become signaled.
3248 * NewWaitObject [I] Handle to the new wait object. Use RtlDeregisterWait() to free it.
3249 * Object [I] Object to wait to become signaled.
3250 * Callback [I] Callback function to execute when the wait times out or the handle is signaled.
3251 * Context [I] Context to pass to the callback function when it is executed.
3252 * Milliseconds [I] Number of milliseconds to wait before timing out.
3253 * Flags [I] Flags. See notes.
3256 * Success: STATUS_SUCCESS.
3257 * Failure: Any NTSTATUS code.
3260 * Flags can be one or more of the following:
3261 *|WT_EXECUTEDEFAULT - Executes the work item in a non-I/O worker thread.
3262 *|WT_EXECUTEINIOTHREAD - Executes the work item in an I/O worker thread.
3263 *|WT_EXECUTEINPERSISTENTTHREAD - Executes the work item in a thread that is persistent.
3264 *|WT_EXECUTELONGFUNCTION - Hints that the execution can take a long time.
3265 *|WT_TRANSFER_IMPERSONATION - Executes the function with the current access token.
3267 NTSTATUS WINAPI
RtlRegisterWait( HANDLE
*out
, HANDLE handle
, RTL_WAITORTIMERCALLBACKFUNC callback
,
3268 void *context
, ULONG milliseconds
, ULONG flags
)
3270 struct threadpool_object
*object
;
3271 TP_CALLBACK_ENVIRON environment
;
3272 LARGE_INTEGER timeout
;
3276 TRACE( "out %p, handle %p, callback %p, context %p, milliseconds %lu, flags %lx\n",
3277 out
, handle
, callback
, context
, milliseconds
, flags
);
3279 memset( &environment
, 0, sizeof(environment
) );
3280 environment
.Version
= 1;
3281 environment
.u
.s
.LongFunction
= (flags
& WT_EXECUTELONGFUNCTION
) != 0;
3282 environment
.u
.s
.Persistent
= (flags
& WT_EXECUTEINPERSISTENTTHREAD
) != 0;
3284 flags
&= (WT_EXECUTEONLYONCE
| WT_EXECUTEINWAITTHREAD
| WT_EXECUTEINIOTHREAD
);
3285 if ((status
= tp_alloc_wait( &wait
, rtl_wait_callback
, context
, &environment
, flags
)))
3288 object
= impl_from_TP_WAIT(wait
);
3289 object
->u
.wait
.rtl_callback
= callback
;
3291 RtlEnterCriticalSection( &waitqueue
.cs
);
3292 TpSetWait( (TP_WAIT
*)object
, handle
, get_nt_timeout( &timeout
, milliseconds
) );
3295 RtlLeaveCriticalSection( &waitqueue
.cs
);
3297 return STATUS_SUCCESS
;
3300 /***********************************************************************
3301 * RtlDeregisterWaitEx (NTDLL.@)
3303 * Cancels a wait operation and frees the resources associated with calling
3304 * RtlRegisterWait().
3307 * WaitObject [I] Handle to the wait object to free.
3310 * Success: STATUS_SUCCESS.
3311 * Failure: Any NTSTATUS code.
3313 NTSTATUS WINAPI
RtlDeregisterWaitEx( HANDLE handle
, HANDLE event
)
3315 struct threadpool_object
*object
= handle
;
3318 TRACE( "handle %p, event %p\n", handle
, event
);
3320 if (!object
) return STATUS_INVALID_HANDLE
;
3322 TpSetWait( (TP_WAIT
*)object
, NULL
, NULL
);
3324 if (event
== INVALID_HANDLE_VALUE
) TpWaitForWait( (TP_WAIT
*)object
, TRUE
);
3327 assert( object
->completed_event
== NULL
);
3328 object
->completed_event
= event
;
3331 RtlEnterCriticalSection( &object
->pool
->cs
);
3332 if (object
->num_pending_callbacks
+ object
->num_running_callbacks
3333 + object
->num_associated_callbacks
) status
= STATUS_PENDING
;
3334 else status
= STATUS_SUCCESS
;
3335 RtlLeaveCriticalSection( &object
->pool
->cs
);
3337 TpReleaseWait( (TP_WAIT
*)object
);
3341 /***********************************************************************
3342 * RtlDeregisterWait (NTDLL.@)
3344 * Cancels a wait operation and frees the resources associated with calling
3345 * RtlRegisterWait().
3348 * WaitObject [I] Handle to the wait object to free.
3351 * Success: STATUS_SUCCESS.
3352 * Failure: Any NTSTATUS code.
3354 NTSTATUS WINAPI
RtlDeregisterWait(HANDLE WaitHandle
)
3356 return RtlDeregisterWaitEx(WaitHandle
, NULL
);