4 * Copyright (c) 2006 Robert Shearman
5 * Copyright (c) 2014-2016 Sebastian Lackner
7 * This library is free software; you can redistribute it and/or
8 * modify it under the terms of the GNU Lesser General Public
9 * License as published by the Free Software Foundation; either
10 * version 2.1 of the License, or (at your option) any later version.
12 * This library is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Lesser General Public License for more details.
17 * You should have received a copy of the GNU Lesser General Public
18 * License along with this library; if not, write to the Free Software
19 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
26 #define NONAMELESSUNION
28 #define WIN32_NO_STATUS
31 #include "wine/debug.h"
32 #include "wine/list.h"
34 #include "ntdll_misc.h"
36 WINE_DEFAULT_DEBUG_CHANNEL(threadpool
);
39 * Old thread pooling API
44 PRTL_WORK_ITEM_ROUTINE function
;
48 #define EXPIRE_NEVER (~(ULONGLONG)0)
49 #define TIMER_QUEUE_MAGIC 0x516d6954 /* TimQ */
51 static RTL_CRITICAL_SECTION_DEBUG critsect_compl_debug
;
56 RTL_CRITICAL_SECTION threadpool_compl_cs
;
60 NULL
, /* compl_port */
61 { &critsect_compl_debug
, -1, 0, 0, 0, 0 }, /* threadpool_compl_cs */
64 static RTL_CRITICAL_SECTION_DEBUG critsect_compl_debug
=
66 0, 0, &old_threadpool
.threadpool_compl_cs
,
67 { &critsect_compl_debug
.ProcessLocksList
, &critsect_compl_debug
.ProcessLocksList
},
68 0, 0, { (DWORD_PTR
)(__FILE__
": threadpool_compl_cs") }
75 WAITORTIMERCALLBACK Callback
;
79 HANDLE CompletionEvent
;
81 int CallbackInProgress
;
87 struct timer_queue
*q
;
89 ULONG runcount
; /* number of callbacks pending execution */
90 RTL_WAITORTIMERCALLBACKFUNC callback
;
95 BOOL destroy
; /* timer should be deleted; once set, never unset */
96 HANDLE event
; /* removal event */
102 RTL_CRITICAL_SECTION cs
;
103 struct list timers
; /* sorted by expiration time */
104 BOOL quit
; /* queue should be deleted; once set, never unset */
110 * Object-oriented thread pooling API
113 #define THREADPOOL_WORKER_TIMEOUT 5000
114 #define MAXIMUM_WAITQUEUE_OBJECTS (MAXIMUM_WAIT_OBJECTS - 1)
116 /* internal threadpool representation */
123 /* Pools of work items, locked via .cs, order matches TP_CALLBACK_PRIORITY - high, normal, low. */
124 struct list pools
[3];
125 RTL_CONDITION_VARIABLE update_event
;
126 /* information about worker threads, locked via .cs */
130 int num_busy_workers
;
132 TP_POOL_STACK_INFORMATION stack_info
;
135 enum threadpool_objtype
137 TP_OBJECT_TYPE_SIMPLE
,
139 TP_OBJECT_TYPE_TIMER
,
146 IO_STATUS_BLOCK iosb
;
150 /* internal threadpool object representation */
151 struct threadpool_object
153 void *win32_callback
; /* leave space for kernelbase to store win32 callback */
156 /* read-only information */
157 enum threadpool_objtype type
;
158 struct threadpool
*pool
;
159 struct threadpool_group
*group
;
161 PTP_CLEANUP_GROUP_CANCEL_CALLBACK group_cancel_callback
;
162 PTP_SIMPLE_CALLBACK finalization_callback
;
165 TP_CALLBACK_PRIORITY priority
;
166 /* information about the group, locked via .group->cs */
167 struct list group_entry
;
168 BOOL is_group_member
;
169 /* information about the pool, locked via .pool->cs */
170 struct list pool_entry
;
171 RTL_CONDITION_VARIABLE finished_event
;
172 RTL_CONDITION_VARIABLE group_finished_event
;
173 LONG num_pending_callbacks
;
174 LONG num_running_callbacks
;
175 LONG num_associated_callbacks
;
176 /* arguments for callback */
181 PTP_SIMPLE_CALLBACK callback
;
185 PTP_WORK_CALLBACK callback
;
189 PTP_TIMER_CALLBACK callback
;
190 /* information about the timer, locked via timerqueue.cs */
191 BOOL timer_initialized
;
193 struct list timer_entry
;
201 PTP_WAIT_CALLBACK callback
;
203 /* information about the wait object, locked via waitqueue.cs */
204 struct waitqueue_bucket
*bucket
;
206 struct list wait_entry
;
212 PTP_IO_CALLBACK callback
;
213 /* locked via .pool->cs */
214 unsigned int pending_count
, completion_count
, completion_max
;
215 struct io_completion
*completions
;
220 /* internal threadpool instance representation */
221 struct threadpool_instance
223 struct threadpool_object
*object
;
229 CRITICAL_SECTION
*critical_section
;
232 LONG semaphore_count
;
238 /* internal threadpool group representation */
239 struct threadpool_group
244 /* list of group members, locked via .cs */
248 /* global timerqueue object */
249 static RTL_CRITICAL_SECTION_DEBUG timerqueue_debug
;
256 struct list pending_timers
;
257 RTL_CONDITION_VARIABLE update_event
;
261 { &timerqueue_debug
, -1, 0, 0, 0, 0 }, /* cs */
263 FALSE
, /* thread_running */
264 LIST_INIT( timerqueue
.pending_timers
), /* pending_timers */
265 RTL_CONDITION_VARIABLE_INIT
/* update_event */
268 static RTL_CRITICAL_SECTION_DEBUG timerqueue_debug
=
270 0, 0, &timerqueue
.cs
,
271 { &timerqueue_debug
.ProcessLocksList
, &timerqueue_debug
.ProcessLocksList
},
272 0, 0, { (DWORD_PTR
)(__FILE__
": timerqueue.cs") }
275 /* global waitqueue object */
276 static RTL_CRITICAL_SECTION_DEBUG waitqueue_debug
;
286 { &waitqueue_debug
, -1, 0, 0, 0, 0 }, /* cs */
288 LIST_INIT( waitqueue
.buckets
) /* buckets */
291 static RTL_CRITICAL_SECTION_DEBUG waitqueue_debug
=
294 { &waitqueue_debug
.ProcessLocksList
, &waitqueue_debug
.ProcessLocksList
},
295 0, 0, { (DWORD_PTR
)(__FILE__
": waitqueue.cs") }
298 struct waitqueue_bucket
300 struct list bucket_entry
;
302 struct list reserved
;
307 /* global I/O completion queue object */
308 static RTL_CRITICAL_SECTION_DEBUG ioqueue_debug
;
316 RTL_CONDITION_VARIABLE update_event
;
320 .cs
= { &ioqueue_debug
, -1, 0, 0, 0, 0 },
323 static RTL_CRITICAL_SECTION_DEBUG ioqueue_debug
=
326 { &ioqueue_debug
.ProcessLocksList
, &ioqueue_debug
.ProcessLocksList
},
327 0, 0, { (DWORD_PTR
)(__FILE__
": ioqueue.cs") }
330 static inline struct threadpool
*impl_from_TP_POOL( TP_POOL
*pool
)
332 return (struct threadpool
*)pool
;
335 static inline struct threadpool_object
*impl_from_TP_WORK( TP_WORK
*work
)
337 struct threadpool_object
*object
= (struct threadpool_object
*)work
;
338 assert( object
->type
== TP_OBJECT_TYPE_WORK
);
342 static inline struct threadpool_object
*impl_from_TP_TIMER( TP_TIMER
*timer
)
344 struct threadpool_object
*object
= (struct threadpool_object
*)timer
;
345 assert( object
->type
== TP_OBJECT_TYPE_TIMER
);
349 static inline struct threadpool_object
*impl_from_TP_WAIT( TP_WAIT
*wait
)
351 struct threadpool_object
*object
= (struct threadpool_object
*)wait
;
352 assert( object
->type
== TP_OBJECT_TYPE_WAIT
);
356 static inline struct threadpool_object
*impl_from_TP_IO( TP_IO
*io
)
358 struct threadpool_object
*object
= (struct threadpool_object
*)io
;
359 assert( object
->type
== TP_OBJECT_TYPE_IO
);
363 static inline struct threadpool_group
*impl_from_TP_CLEANUP_GROUP( TP_CLEANUP_GROUP
*group
)
365 return (struct threadpool_group
*)group
;
368 static inline struct threadpool_instance
*impl_from_TP_CALLBACK_INSTANCE( TP_CALLBACK_INSTANCE
*instance
)
370 return (struct threadpool_instance
*)instance
;
373 static void CALLBACK
threadpool_worker_proc( void *param
);
374 static void tp_object_submit( struct threadpool_object
*object
, BOOL signaled
);
375 static void tp_object_prepare_shutdown( struct threadpool_object
*object
);
376 static BOOL
tp_object_release( struct threadpool_object
*object
);
377 static struct threadpool
*default_threadpool
= NULL
;
379 static BOOL
array_reserve(void **elements
, unsigned int *capacity
, unsigned int count
, unsigned int size
)
381 unsigned int new_capacity
, max_capacity
;
384 if (count
<= *capacity
)
387 max_capacity
= ~(SIZE_T
)0 / size
;
388 if (count
> max_capacity
)
391 new_capacity
= max(4, *capacity
);
392 while (new_capacity
< count
&& new_capacity
<= max_capacity
/ 2)
394 if (new_capacity
< count
)
395 new_capacity
= max_capacity
;
397 if (!(new_elements
= RtlReAllocateHeap( GetProcessHeap(), 0, *elements
, new_capacity
* size
)))
400 *elements
= new_elements
;
401 *capacity
= new_capacity
;
406 static void CALLBACK
process_rtl_work_item( TP_CALLBACK_INSTANCE
*instance
, void *userdata
)
408 struct rtl_work_item
*item
= userdata
;
410 TRACE("executing %p(%p)\n", item
->function
, item
->context
);
411 item
->function( item
->context
);
413 RtlFreeHeap( GetProcessHeap(), 0, item
);
416 /***********************************************************************
417 * RtlQueueWorkItem (NTDLL.@)
419 * Queues a work item into a thread in the thread pool.
422 * function [I] Work function to execute.
423 * context [I] Context to pass to the work function when it is executed.
424 * flags [I] Flags. See notes.
427 * Success: STATUS_SUCCESS.
428 * Failure: Any NTSTATUS code.
431 * Flags can be one or more of the following:
432 *|WT_EXECUTEDEFAULT - Executes the work item in a non-I/O worker thread.
433 *|WT_EXECUTEINIOTHREAD - Executes the work item in an I/O worker thread.
434 *|WT_EXECUTEINPERSISTENTTHREAD - Executes the work item in a thread that is persistent.
435 *|WT_EXECUTELONGFUNCTION - Hints that the execution can take a long time.
436 *|WT_TRANSFER_IMPERSONATION - Executes the function with the current access token.
438 NTSTATUS WINAPI
RtlQueueWorkItem( PRTL_WORK_ITEM_ROUTINE function
, PVOID context
, ULONG flags
)
440 TP_CALLBACK_ENVIRON environment
;
441 struct rtl_work_item
*item
;
444 TRACE( "%p %p %u\n", function
, context
, flags
);
446 item
= RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*item
) );
448 return STATUS_NO_MEMORY
;
450 memset( &environment
, 0, sizeof(environment
) );
451 environment
.Version
= 1;
452 environment
.u
.s
.LongFunction
= (flags
& WT_EXECUTELONGFUNCTION
) != 0;
453 environment
.u
.s
.Persistent
= (flags
& WT_EXECUTEINPERSISTENTTHREAD
) != 0;
455 item
->function
= function
;
456 item
->context
= context
;
458 status
= TpSimpleTryPost( process_rtl_work_item
, item
, &environment
);
459 if (status
) RtlFreeHeap( GetProcessHeap(), 0, item
);
463 /***********************************************************************
464 * iocp_poller - get completion events and run callbacks
466 static DWORD CALLBACK
iocp_poller(LPVOID Arg
)
472 PRTL_OVERLAPPED_COMPLETION_ROUTINE callback
;
474 IO_STATUS_BLOCK iosb
;
475 NTSTATUS res
= NtRemoveIoCompletion( cport
, (PULONG_PTR
)&callback
, (PULONG_PTR
)&overlapped
, &iosb
, NULL
);
478 ERR("NtRemoveIoCompletion failed: 0x%x\n", res
);
482 DWORD transferred
= 0;
485 if (iosb
.u
.Status
== STATUS_SUCCESS
)
486 transferred
= iosb
.Information
;
488 err
= RtlNtStatusToDosError(iosb
.u
.Status
);
490 callback( err
, transferred
, overlapped
);
496 /***********************************************************************
497 * RtlSetIoCompletionCallback (NTDLL.@)
499 * Binds a handle to a thread pool's completion port, and possibly
500 * starts a non-I/O thread to monitor this port and call functions back.
503 * FileHandle [I] Handle to bind to a completion port.
504 * Function [I] Callback function to call on I/O completions.
505 * Flags [I] Not used.
508 * Success: STATUS_SUCCESS.
509 * Failure: Any NTSTATUS code.
512 NTSTATUS WINAPI
RtlSetIoCompletionCallback(HANDLE FileHandle
, PRTL_OVERLAPPED_COMPLETION_ROUTINE Function
, ULONG Flags
)
514 IO_STATUS_BLOCK iosb
;
515 FILE_COMPLETION_INFORMATION info
;
517 if (Flags
) FIXME("Unknown value Flags=0x%x\n", Flags
);
519 if (!old_threadpool
.compl_port
)
521 NTSTATUS res
= STATUS_SUCCESS
;
523 RtlEnterCriticalSection(&old_threadpool
.threadpool_compl_cs
);
524 if (!old_threadpool
.compl_port
)
528 res
= NtCreateIoCompletion( &cport
, IO_COMPLETION_ALL_ACCESS
, NULL
, 0 );
531 /* FIXME native can start additional threads in case of e.g. hung callback function. */
532 res
= RtlQueueWorkItem( iocp_poller
, cport
, WT_EXECUTEDEFAULT
);
534 old_threadpool
.compl_port
= cport
;
539 RtlLeaveCriticalSection(&old_threadpool
.threadpool_compl_cs
);
543 info
.CompletionPort
= old_threadpool
.compl_port
;
544 info
.CompletionKey
= (ULONG_PTR
)Function
;
546 return NtSetInformationFile( FileHandle
, &iosb
, &info
, sizeof(info
), FileCompletionInformation
);
549 static inline PLARGE_INTEGER
get_nt_timeout( PLARGE_INTEGER pTime
, ULONG timeout
)
551 if (timeout
== INFINITE
) return NULL
;
552 pTime
->QuadPart
= (ULONGLONG
)timeout
* -10000;
556 static void delete_wait_work_item(struct wait_work_item
*wait_work_item
)
558 NtClose( wait_work_item
->CancelEvent
);
559 RtlFreeHeap( GetProcessHeap(), 0, wait_work_item
);
562 static DWORD CALLBACK
wait_thread_proc(LPVOID Arg
)
564 struct wait_work_item
*wait_work_item
= Arg
;
566 BOOLEAN alertable
= (wait_work_item
->Flags
& WT_EXECUTEINIOTHREAD
) != 0;
567 HANDLE handles
[2] = { wait_work_item
->Object
, wait_work_item
->CancelEvent
};
568 LARGE_INTEGER timeout
;
569 HANDLE completion_event
;
575 status
= NtWaitForMultipleObjects( 2, handles
, TRUE
, alertable
,
576 get_nt_timeout( &timeout
, wait_work_item
->Milliseconds
) );
577 if (status
== STATUS_WAIT_0
|| status
== STATUS_TIMEOUT
)
579 BOOLEAN TimerOrWaitFired
;
581 if (status
== STATUS_WAIT_0
)
583 TRACE( "object %p signaled, calling callback %p with context %p\n",
584 wait_work_item
->Object
, wait_work_item
->Callback
,
585 wait_work_item
->Context
);
586 TimerOrWaitFired
= FALSE
;
590 TRACE( "wait for object %p timed out, calling callback %p with context %p\n",
591 wait_work_item
->Object
, wait_work_item
->Callback
,
592 wait_work_item
->Context
);
593 TimerOrWaitFired
= TRUE
;
595 InterlockedExchange( &wait_work_item
->CallbackInProgress
, TRUE
);
596 if (wait_work_item
->CompletionEvent
)
598 TRACE( "Work has been canceled.\n" );
601 wait_work_item
->Callback( wait_work_item
->Context
, TimerOrWaitFired
);
602 InterlockedExchange( &wait_work_item
->CallbackInProgress
, FALSE
);
604 if (wait_work_item
->Flags
& WT_EXECUTEONLYONCE
)
607 else if (status
!= STATUS_USER_APC
)
612 if (InterlockedIncrement( &wait_work_item
->DeleteCount
) == 2 )
614 completion_event
= wait_work_item
->CompletionEvent
;
615 delete_wait_work_item( wait_work_item
);
616 if (completion_event
&& completion_event
!= INVALID_HANDLE_VALUE
)
617 NtSetEvent( completion_event
, NULL
);
623 /***********************************************************************
624 * RtlRegisterWait (NTDLL.@)
626 * Registers a wait for a handle to become signaled.
629 * NewWaitObject [I] Handle to the new wait object. Use RtlDeregisterWait() to free it.
630 * Object [I] Object to wait to become signaled.
631 * Callback [I] Callback function to execute when the wait times out or the handle is signaled.
632 * Context [I] Context to pass to the callback function when it is executed.
633 * Milliseconds [I] Number of milliseconds to wait before timing out.
634 * Flags [I] Flags. See notes.
637 * Success: STATUS_SUCCESS.
638 * Failure: Any NTSTATUS code.
641 * Flags can be one or more of the following:
642 *|WT_EXECUTEDEFAULT - Executes the work item in a non-I/O worker thread.
643 *|WT_EXECUTEINIOTHREAD - Executes the work item in an I/O worker thread.
644 *|WT_EXECUTEINPERSISTENTTHREAD - Executes the work item in a thread that is persistent.
645 *|WT_EXECUTELONGFUNCTION - Hints that the execution can take a long time.
646 *|WT_TRANSFER_IMPERSONATION - Executes the function with the current access token.
648 NTSTATUS WINAPI
RtlRegisterWait(PHANDLE NewWaitObject
, HANDLE Object
,
649 RTL_WAITORTIMERCALLBACKFUNC Callback
,
650 PVOID Context
, ULONG Milliseconds
, ULONG Flags
)
652 struct wait_work_item
*wait_work_item
;
655 TRACE( "(%p, %p, %p, %p, %d, 0x%x)\n", NewWaitObject
, Object
, Callback
, Context
, Milliseconds
, Flags
);
657 wait_work_item
= RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*wait_work_item
) );
659 return STATUS_NO_MEMORY
;
661 wait_work_item
->Object
= Object
;
662 wait_work_item
->Callback
= Callback
;
663 wait_work_item
->Context
= Context
;
664 wait_work_item
->Milliseconds
= Milliseconds
;
665 wait_work_item
->Flags
= Flags
;
666 wait_work_item
->CallbackInProgress
= FALSE
;
667 wait_work_item
->DeleteCount
= 0;
668 wait_work_item
->CompletionEvent
= NULL
;
670 status
= NtCreateEvent( &wait_work_item
->CancelEvent
, EVENT_ALL_ACCESS
, NULL
, NotificationEvent
, FALSE
);
671 if (status
!= STATUS_SUCCESS
)
673 RtlFreeHeap( GetProcessHeap(), 0, wait_work_item
);
677 Flags
= Flags
& (WT_EXECUTEINIOTHREAD
| WT_EXECUTEINPERSISTENTTHREAD
|
678 WT_EXECUTELONGFUNCTION
| WT_TRANSFER_IMPERSONATION
);
679 status
= RtlQueueWorkItem( wait_thread_proc
, wait_work_item
, Flags
);
680 if (status
!= STATUS_SUCCESS
)
682 delete_wait_work_item( wait_work_item
);
686 *NewWaitObject
= wait_work_item
;
690 /***********************************************************************
691 * RtlDeregisterWaitEx (NTDLL.@)
693 * Cancels a wait operation and frees the resources associated with calling
697 * WaitObject [I] Handle to the wait object to free.
700 * Success: STATUS_SUCCESS.
701 * Failure: Any NTSTATUS code.
703 NTSTATUS WINAPI
RtlDeregisterWaitEx(HANDLE WaitHandle
, HANDLE CompletionEvent
)
705 struct wait_work_item
*wait_work_item
= WaitHandle
;
707 HANDLE LocalEvent
= NULL
;
708 int CallbackInProgress
;
710 TRACE( "(%p %p)\n", WaitHandle
, CompletionEvent
);
712 if (WaitHandle
== NULL
)
713 return STATUS_INVALID_HANDLE
;
715 InterlockedExchangePointer( &wait_work_item
->CompletionEvent
, INVALID_HANDLE_VALUE
);
716 CallbackInProgress
= wait_work_item
->CallbackInProgress
;
717 TRACE( "callback in progress %u\n", CallbackInProgress
);
718 if (CompletionEvent
== INVALID_HANDLE_VALUE
|| !CallbackInProgress
)
720 status
= NtCreateEvent( &LocalEvent
, EVENT_ALL_ACCESS
, NULL
, NotificationEvent
, FALSE
);
721 if (status
!= STATUS_SUCCESS
)
723 InterlockedExchangePointer( &wait_work_item
->CompletionEvent
, LocalEvent
);
725 else if (CompletionEvent
!= NULL
)
727 InterlockedExchangePointer( &wait_work_item
->CompletionEvent
, CompletionEvent
);
730 NtSetEvent( wait_work_item
->CancelEvent
, NULL
);
732 if (InterlockedIncrement( &wait_work_item
->DeleteCount
) == 2 )
734 status
= STATUS_SUCCESS
;
735 delete_wait_work_item( wait_work_item
);
739 TRACE( "Waiting for completion event\n" );
740 NtWaitForSingleObject( LocalEvent
, FALSE
, NULL
);
741 status
= STATUS_SUCCESS
;
745 status
= STATUS_PENDING
;
749 NtClose( LocalEvent
);
754 /***********************************************************************
755 * RtlDeregisterWait (NTDLL.@)
757 * Cancels a wait operation and frees the resources associated with calling
761 * WaitObject [I] Handle to the wait object to free.
764 * Success: STATUS_SUCCESS.
765 * Failure: Any NTSTATUS code.
767 NTSTATUS WINAPI
RtlDeregisterWait(HANDLE WaitHandle
)
769 return RtlDeregisterWaitEx(WaitHandle
, NULL
);
773 /************************** Timer Queue Impl **************************/
775 static void queue_remove_timer(struct queue_timer
*t
)
777 /* We MUST hold the queue cs while calling this function. This ensures
778 that we cannot queue another callback for this timer. The runcount
779 being zero makes sure we don't have any already queued. */
780 struct timer_queue
*q
= t
->q
;
782 assert(t
->runcount
== 0);
785 list_remove(&t
->entry
);
787 NtSetEvent(t
->event
, NULL
);
788 RtlFreeHeap(GetProcessHeap(), 0, t
);
790 if (q
->quit
&& list_empty(&q
->timers
))
791 NtSetEvent(q
->event
, NULL
);
794 static void timer_cleanup_callback(struct queue_timer
*t
)
796 struct timer_queue
*q
= t
->q
;
797 RtlEnterCriticalSection(&q
->cs
);
799 assert(0 < t
->runcount
);
802 if (t
->destroy
&& t
->runcount
== 0)
803 queue_remove_timer(t
);
805 RtlLeaveCriticalSection(&q
->cs
);
808 static DWORD WINAPI
timer_callback_wrapper(LPVOID p
)
810 struct queue_timer
*t
= p
;
811 t
->callback(t
->param
, TRUE
);
812 timer_cleanup_callback(t
);
816 static inline ULONGLONG
queue_current_time(void)
818 LARGE_INTEGER now
, freq
;
819 NtQueryPerformanceCounter(&now
, &freq
);
820 return now
.QuadPart
* 1000 / freq
.QuadPart
;
823 static void queue_add_timer(struct queue_timer
*t
, ULONGLONG time
,
826 /* We MUST hold the queue cs while calling this function. */
827 struct timer_queue
*q
= t
->q
;
828 struct list
*ptr
= &q
->timers
;
830 assert(!q
->quit
|| (t
->destroy
&& time
== EXPIRE_NEVER
));
832 if (time
!= EXPIRE_NEVER
)
833 LIST_FOR_EACH(ptr
, &q
->timers
)
835 struct queue_timer
*cur
= LIST_ENTRY(ptr
, struct queue_timer
, entry
);
836 if (time
< cur
->expire
)
839 list_add_before(ptr
, &t
->entry
);
843 /* If we insert at the head of the list, we need to expire sooner
845 if (set_event
&& &t
->entry
== list_head(&q
->timers
))
846 NtSetEvent(q
->event
, NULL
);
849 static inline void queue_move_timer(struct queue_timer
*t
, ULONGLONG time
,
852 /* We MUST hold the queue cs while calling this function. */
853 list_remove(&t
->entry
);
854 queue_add_timer(t
, time
, set_event
);
857 static void queue_timer_expire(struct timer_queue
*q
)
859 struct queue_timer
*t
= NULL
;
861 RtlEnterCriticalSection(&q
->cs
);
862 if (list_head(&q
->timers
))
865 t
= LIST_ENTRY(list_head(&q
->timers
), struct queue_timer
, entry
);
866 if (!t
->destroy
&& t
->expire
<= ((now
= queue_current_time())))
871 next
= t
->expire
+ t
->period
;
872 /* avoid trigger cascade if overloaded / hibernated */
874 next
= now
+ t
->period
;
878 queue_move_timer(t
, next
, FALSE
);
883 RtlLeaveCriticalSection(&q
->cs
);
887 if (t
->flags
& WT_EXECUTEINTIMERTHREAD
)
888 timer_callback_wrapper(t
);
893 & (WT_EXECUTEINIOTHREAD
| WT_EXECUTEINPERSISTENTTHREAD
894 | WT_EXECUTELONGFUNCTION
| WT_TRANSFER_IMPERSONATION
));
895 NTSTATUS status
= RtlQueueWorkItem(timer_callback_wrapper
, t
, flags
);
896 if (status
!= STATUS_SUCCESS
)
897 timer_cleanup_callback(t
);
902 static ULONG
queue_get_timeout(struct timer_queue
*q
)
904 struct queue_timer
*t
;
905 ULONG timeout
= INFINITE
;
907 RtlEnterCriticalSection(&q
->cs
);
908 if (list_head(&q
->timers
))
910 t
= LIST_ENTRY(list_head(&q
->timers
), struct queue_timer
, entry
);
911 assert(!t
->destroy
|| t
->expire
== EXPIRE_NEVER
);
913 if (t
->expire
!= EXPIRE_NEVER
)
915 ULONGLONG time
= queue_current_time();
916 timeout
= t
->expire
< time
? 0 : t
->expire
- time
;
919 RtlLeaveCriticalSection(&q
->cs
);
924 static void WINAPI
timer_queue_thread_proc(LPVOID p
)
926 struct timer_queue
*q
= p
;
929 timeout_ms
= INFINITE
;
932 LARGE_INTEGER timeout
;
936 status
= NtWaitForSingleObject(
937 q
->event
, FALSE
, get_nt_timeout(&timeout
, timeout_ms
));
939 if (status
== STATUS_WAIT_0
)
941 /* There are two possible ways to trigger the event. Either
942 we are quitting and the last timer got removed, or a new
943 timer got put at the head of the list so we need to adjust
945 RtlEnterCriticalSection(&q
->cs
);
946 if (q
->quit
&& list_empty(&q
->timers
))
948 RtlLeaveCriticalSection(&q
->cs
);
950 else if (status
== STATUS_TIMEOUT
)
951 queue_timer_expire(q
);
956 timeout_ms
= queue_get_timeout(q
);
960 RtlDeleteCriticalSection(&q
->cs
);
962 RtlFreeHeap(GetProcessHeap(), 0, q
);
963 RtlExitUserThread( 0 );
966 static void queue_destroy_timer(struct queue_timer
*t
)
968 /* We MUST hold the queue cs while calling this function. */
970 if (t
->runcount
== 0)
971 /* Ensure a timer is promptly removed. If callbacks are pending,
972 it will be removed after the last one finishes by the callback
974 queue_remove_timer(t
);
976 /* Make sure no destroyed timer masks an active timer at the head
977 of the sorted list. */
978 queue_move_timer(t
, EXPIRE_NEVER
, FALSE
);
981 /***********************************************************************
982 * RtlCreateTimerQueue (NTDLL.@)
984 * Creates a timer queue object and returns a handle to it.
987 * NewTimerQueue [O] The newly created queue.
990 * Success: STATUS_SUCCESS.
991 * Failure: Any NTSTATUS code.
993 NTSTATUS WINAPI
RtlCreateTimerQueue(PHANDLE NewTimerQueue
)
996 struct timer_queue
*q
= RtlAllocateHeap(GetProcessHeap(), 0, sizeof *q
);
998 return STATUS_NO_MEMORY
;
1000 RtlInitializeCriticalSection(&q
->cs
);
1001 list_init(&q
->timers
);
1003 q
->magic
= TIMER_QUEUE_MAGIC
;
1004 status
= NtCreateEvent(&q
->event
, EVENT_ALL_ACCESS
, NULL
, SynchronizationEvent
, FALSE
);
1005 if (status
!= STATUS_SUCCESS
)
1007 RtlFreeHeap(GetProcessHeap(), 0, q
);
1010 status
= RtlCreateUserThread(GetCurrentProcess(), NULL
, FALSE
, NULL
, 0, 0,
1011 timer_queue_thread_proc
, q
, &q
->thread
, NULL
);
1012 if (status
!= STATUS_SUCCESS
)
1015 RtlFreeHeap(GetProcessHeap(), 0, q
);
1020 return STATUS_SUCCESS
;
1023 /***********************************************************************
1024 * RtlDeleteTimerQueueEx (NTDLL.@)
1026 * Deletes a timer queue object.
1029 * TimerQueue [I] The timer queue to destroy.
1030 * CompletionEvent [I] If NULL, return immediately. If INVALID_HANDLE_VALUE,
1031 * wait until all timers are finished firing before
1032 * returning. Otherwise, return immediately and set the
1033 * event when all timers are done.
1036 * Success: STATUS_SUCCESS if synchronous, STATUS_PENDING if not.
1037 * Failure: Any NTSTATUS code.
1039 NTSTATUS WINAPI
RtlDeleteTimerQueueEx(HANDLE TimerQueue
, HANDLE CompletionEvent
)
1041 struct timer_queue
*q
= TimerQueue
;
1042 struct queue_timer
*t
, *temp
;
1046 if (!q
|| q
->magic
!= TIMER_QUEUE_MAGIC
)
1047 return STATUS_INVALID_HANDLE
;
1051 RtlEnterCriticalSection(&q
->cs
);
1053 if (list_head(&q
->timers
))
1054 /* When the last timer is removed, it will signal the timer thread to
1056 LIST_FOR_EACH_ENTRY_SAFE(t
, temp
, &q
->timers
, struct queue_timer
, entry
)
1057 queue_destroy_timer(t
);
1059 /* However if we have none, we must do it ourselves. */
1060 NtSetEvent(q
->event
, NULL
);
1061 RtlLeaveCriticalSection(&q
->cs
);
1063 if (CompletionEvent
== INVALID_HANDLE_VALUE
)
1065 NtWaitForSingleObject(thread
, FALSE
, NULL
);
1066 status
= STATUS_SUCCESS
;
1070 if (CompletionEvent
)
1072 FIXME("asynchronous return on completion event unimplemented\n");
1073 NtWaitForSingleObject(thread
, FALSE
, NULL
);
1074 NtSetEvent(CompletionEvent
, NULL
);
1076 status
= STATUS_PENDING
;
1083 static struct timer_queue
*get_timer_queue(HANDLE TimerQueue
)
1085 static struct timer_queue
*default_timer_queue
;
1091 if (!default_timer_queue
)
1094 NTSTATUS status
= RtlCreateTimerQueue(&q
);
1095 if (status
== STATUS_SUCCESS
)
1097 PVOID p
= InterlockedCompareExchangePointer( (void **) &default_timer_queue
, q
, NULL
);
1099 /* Got beat to the punch. */
1100 RtlDeleteTimerQueueEx(q
, NULL
);
1103 return default_timer_queue
;
1107 /***********************************************************************
1108 * RtlCreateTimer (NTDLL.@)
1110 * Creates a new timer associated with the given queue.
1113 * NewTimer [O] The newly created timer.
1114 * TimerQueue [I] The queue to hold the timer.
1115 * Callback [I] The callback to fire.
1116 * Parameter [I] The argument for the callback.
1117 * DueTime [I] The delay, in milliseconds, before first firing the
1119 * Period [I] The period, in milliseconds, at which to fire the timer
1120 * after the first callback. If zero, the timer will only
1121 * fire once. It still needs to be deleted with
1123 * Flags [I] Flags controlling the execution of the callback. In
1124 * addition to the WT_* thread pool flags (see
1125 * RtlQueueWorkItem), WT_EXECUTEINTIMERTHREAD and
1126 * WT_EXECUTEONLYONCE are supported.
1129 * Success: STATUS_SUCCESS.
1130 * Failure: Any NTSTATUS code.
1132 NTSTATUS WINAPI
RtlCreateTimer(PHANDLE NewTimer
, HANDLE TimerQueue
,
1133 RTL_WAITORTIMERCALLBACKFUNC Callback
,
1134 PVOID Parameter
, DWORD DueTime
, DWORD Period
,
1138 struct queue_timer
*t
;
1139 struct timer_queue
*q
= get_timer_queue(TimerQueue
);
1141 if (!q
) return STATUS_NO_MEMORY
;
1142 if (q
->magic
!= TIMER_QUEUE_MAGIC
) return STATUS_INVALID_HANDLE
;
1144 t
= RtlAllocateHeap(GetProcessHeap(), 0, sizeof *t
);
1146 return STATUS_NO_MEMORY
;
1150 t
->callback
= Callback
;
1151 t
->param
= Parameter
;
1157 status
= STATUS_SUCCESS
;
1158 RtlEnterCriticalSection(&q
->cs
);
1160 status
= STATUS_INVALID_HANDLE
;
1162 queue_add_timer(t
, queue_current_time() + DueTime
, TRUE
);
1163 RtlLeaveCriticalSection(&q
->cs
);
1165 if (status
== STATUS_SUCCESS
)
1168 RtlFreeHeap(GetProcessHeap(), 0, t
);
1173 /***********************************************************************
1174 * RtlUpdateTimer (NTDLL.@)
1176 * Changes the time at which a timer expires.
1179 * TimerQueue [I] The queue that holds the timer.
1180 * Timer [I] The timer to update.
1181 * DueTime [I] The delay, in milliseconds, before next firing the timer.
1182 * Period [I] The period, in milliseconds, at which to fire the timer
1183 * after the first callback. If zero, the timer will not
1184 * refire once. It still needs to be deleted with
1188 * Success: STATUS_SUCCESS.
1189 * Failure: Any NTSTATUS code.
1191 NTSTATUS WINAPI
RtlUpdateTimer(HANDLE TimerQueue
, HANDLE Timer
,
1192 DWORD DueTime
, DWORD Period
)
1194 struct queue_timer
*t
= Timer
;
1195 struct timer_queue
*q
= t
->q
;
1197 RtlEnterCriticalSection(&q
->cs
);
1198 /* Can't change a timer if it was once-only or destroyed. */
1199 if (t
->expire
!= EXPIRE_NEVER
)
1202 queue_move_timer(t
, queue_current_time() + DueTime
, TRUE
);
1204 RtlLeaveCriticalSection(&q
->cs
);
1206 return STATUS_SUCCESS
;
1209 /***********************************************************************
1210 * RtlDeleteTimer (NTDLL.@)
1212 * Cancels a timer-queue timer.
1215 * TimerQueue [I] The queue that holds the timer.
1216 * Timer [I] The timer to update.
1217 * CompletionEvent [I] If NULL, return immediately. If INVALID_HANDLE_VALUE,
1218 * wait until the timer is finished firing all pending
1219 * callbacks before returning. Otherwise, return
1220 * immediately and set the timer is done.
1223 * Success: STATUS_SUCCESS if the timer is done, STATUS_PENDING if not,
1224 or if the completion event is NULL.
1225 * Failure: Any NTSTATUS code.
1227 NTSTATUS WINAPI
RtlDeleteTimer(HANDLE TimerQueue
, HANDLE Timer
,
1228 HANDLE CompletionEvent
)
1230 struct queue_timer
*t
= Timer
;
1231 struct timer_queue
*q
;
1232 NTSTATUS status
= STATUS_PENDING
;
1233 HANDLE event
= NULL
;
1236 return STATUS_INVALID_PARAMETER_1
;
1238 if (CompletionEvent
== INVALID_HANDLE_VALUE
)
1240 status
= NtCreateEvent(&event
, EVENT_ALL_ACCESS
, NULL
, SynchronizationEvent
, FALSE
);
1241 if (status
== STATUS_SUCCESS
)
1242 status
= STATUS_PENDING
;
1244 else if (CompletionEvent
)
1245 event
= CompletionEvent
;
1247 RtlEnterCriticalSection(&q
->cs
);
1249 if (t
->runcount
== 0 && event
)
1250 status
= STATUS_SUCCESS
;
1251 queue_destroy_timer(t
);
1252 RtlLeaveCriticalSection(&q
->cs
);
1254 if (CompletionEvent
== INVALID_HANDLE_VALUE
&& event
)
1256 if (status
== STATUS_PENDING
)
1258 NtWaitForSingleObject(event
, FALSE
, NULL
);
1259 status
= STATUS_SUCCESS
;
1267 /***********************************************************************
1268 * timerqueue_thread_proc (internal)
1270 static void CALLBACK
timerqueue_thread_proc( void *param
)
1272 ULONGLONG timeout_lower
, timeout_upper
, new_timeout
;
1273 struct threadpool_object
*other_timer
;
1274 LARGE_INTEGER now
, timeout
;
1277 TRACE( "starting timer queue thread\n" );
1279 RtlEnterCriticalSection( &timerqueue
.cs
);
1282 NtQuerySystemTime( &now
);
1284 /* Check for expired timers. */
1285 while ((ptr
= list_head( &timerqueue
.pending_timers
)))
1287 struct threadpool_object
*timer
= LIST_ENTRY( ptr
, struct threadpool_object
, u
.timer
.timer_entry
);
1288 assert( timer
->type
== TP_OBJECT_TYPE_TIMER
);
1289 assert( timer
->u
.timer
.timer_pending
);
1290 if (timer
->u
.timer
.timeout
> now
.QuadPart
)
1293 /* Queue a new callback in one of the worker threads. */
1294 list_remove( &timer
->u
.timer
.timer_entry
);
1295 timer
->u
.timer
.timer_pending
= FALSE
;
1296 tp_object_submit( timer
, FALSE
);
1298 /* Insert the timer back into the queue, except it's marked for shutdown. */
1299 if (timer
->u
.timer
.period
&& !timer
->shutdown
)
1301 timer
->u
.timer
.timeout
+= (ULONGLONG
)timer
->u
.timer
.period
* 10000;
1302 if (timer
->u
.timer
.timeout
<= now
.QuadPart
)
1303 timer
->u
.timer
.timeout
= now
.QuadPart
+ 1;
1305 LIST_FOR_EACH_ENTRY( other_timer
, &timerqueue
.pending_timers
,
1306 struct threadpool_object
, u
.timer
.timer_entry
)
1308 assert( other_timer
->type
== TP_OBJECT_TYPE_TIMER
);
1309 if (timer
->u
.timer
.timeout
< other_timer
->u
.timer
.timeout
)
1312 list_add_before( &other_timer
->u
.timer
.timer_entry
, &timer
->u
.timer
.timer_entry
);
1313 timer
->u
.timer
.timer_pending
= TRUE
;
1317 timeout_lower
= TIMEOUT_INFINITE
;
1318 timeout_upper
= TIMEOUT_INFINITE
;
1320 /* Determine next timeout and use the window length to optimize wakeup times. */
1321 LIST_FOR_EACH_ENTRY( other_timer
, &timerqueue
.pending_timers
,
1322 struct threadpool_object
, u
.timer
.timer_entry
)
1324 assert( other_timer
->type
== TP_OBJECT_TYPE_TIMER
);
1325 if (other_timer
->u
.timer
.timeout
>= timeout_upper
)
1328 timeout_lower
= other_timer
->u
.timer
.timeout
;
1329 new_timeout
= timeout_lower
+ (ULONGLONG
)other_timer
->u
.timer
.window_length
* 10000;
1330 if (new_timeout
< timeout_upper
)
1331 timeout_upper
= new_timeout
;
1334 /* Wait for timer update events or until the next timer expires. */
1335 if (timerqueue
.objcount
)
1337 timeout
.QuadPart
= timeout_lower
;
1338 RtlSleepConditionVariableCS( &timerqueue
.update_event
, &timerqueue
.cs
, &timeout
);
1342 /* All timers have been destroyed, if no new timers are created
1343 * within some amount of time, then we can shutdown this thread. */
1344 timeout
.QuadPart
= (ULONGLONG
)THREADPOOL_WORKER_TIMEOUT
* -10000;
1345 if (RtlSleepConditionVariableCS( &timerqueue
.update_event
, &timerqueue
.cs
,
1346 &timeout
) == STATUS_TIMEOUT
&& !timerqueue
.objcount
)
1352 timerqueue
.thread_running
= FALSE
;
1353 RtlLeaveCriticalSection( &timerqueue
.cs
);
1355 TRACE( "terminating timer queue thread\n" );
1356 RtlExitUserThread( 0 );
1359 /***********************************************************************
1360 * tp_new_worker_thread (internal)
1362 * Create and account a new worker thread for the desired pool.
1364 static NTSTATUS
tp_new_worker_thread( struct threadpool
*pool
)
1369 status
= RtlCreateUserThread( GetCurrentProcess(), NULL
, FALSE
, NULL
, 0, 0,
1370 threadpool_worker_proc
, pool
, &thread
, NULL
);
1371 if (status
== STATUS_SUCCESS
)
1373 InterlockedIncrement( &pool
->refcount
);
1374 pool
->num_workers
++;
1380 /***********************************************************************
1381 * tp_timerqueue_lock (internal)
1383 * Acquires a lock on the global timerqueue. When the lock is acquired
1384 * successfully, it is guaranteed that the timer thread is running.
1386 static NTSTATUS
tp_timerqueue_lock( struct threadpool_object
*timer
)
1388 NTSTATUS status
= STATUS_SUCCESS
;
1389 assert( timer
->type
== TP_OBJECT_TYPE_TIMER
);
1391 timer
->u
.timer
.timer_initialized
= FALSE
;
1392 timer
->u
.timer
.timer_pending
= FALSE
;
1393 timer
->u
.timer
.timer_set
= FALSE
;
1394 timer
->u
.timer
.timeout
= 0;
1395 timer
->u
.timer
.period
= 0;
1396 timer
->u
.timer
.window_length
= 0;
1398 RtlEnterCriticalSection( &timerqueue
.cs
);
1400 /* Make sure that the timerqueue thread is running. */
1401 if (!timerqueue
.thread_running
)
1404 status
= RtlCreateUserThread( GetCurrentProcess(), NULL
, FALSE
, NULL
, 0, 0,
1405 timerqueue_thread_proc
, NULL
, &thread
, NULL
);
1406 if (status
== STATUS_SUCCESS
)
1408 timerqueue
.thread_running
= TRUE
;
1413 if (status
== STATUS_SUCCESS
)
1415 timer
->u
.timer
.timer_initialized
= TRUE
;
1416 timerqueue
.objcount
++;
1419 RtlLeaveCriticalSection( &timerqueue
.cs
);
1423 /***********************************************************************
1424 * tp_timerqueue_unlock (internal)
1426 * Releases a lock on the global timerqueue.
1428 static void tp_timerqueue_unlock( struct threadpool_object
*timer
)
1430 assert( timer
->type
== TP_OBJECT_TYPE_TIMER
);
1432 RtlEnterCriticalSection( &timerqueue
.cs
);
1433 if (timer
->u
.timer
.timer_initialized
)
1435 /* If timer was pending, remove it. */
1436 if (timer
->u
.timer
.timer_pending
)
1438 list_remove( &timer
->u
.timer
.timer_entry
);
1439 timer
->u
.timer
.timer_pending
= FALSE
;
1442 /* If the last timer object was destroyed, then wake up the thread. */
1443 if (!--timerqueue
.objcount
)
1445 assert( list_empty( &timerqueue
.pending_timers
) );
1446 RtlWakeAllConditionVariable( &timerqueue
.update_event
);
1449 timer
->u
.timer
.timer_initialized
= FALSE
;
1451 RtlLeaveCriticalSection( &timerqueue
.cs
);
1454 /***********************************************************************
1455 * waitqueue_thread_proc (internal)
1457 static void CALLBACK
waitqueue_thread_proc( void *param
)
1459 struct threadpool_object
*objects
[MAXIMUM_WAITQUEUE_OBJECTS
];
1460 HANDLE handles
[MAXIMUM_WAITQUEUE_OBJECTS
+ 1];
1461 struct waitqueue_bucket
*bucket
= param
;
1462 struct threadpool_object
*wait
, *next
;
1463 LARGE_INTEGER now
, timeout
;
1467 TRACE( "starting wait queue thread\n" );
1469 RtlEnterCriticalSection( &waitqueue
.cs
);
1473 NtQuerySystemTime( &now
);
1474 timeout
.QuadPart
= TIMEOUT_INFINITE
;
1477 LIST_FOR_EACH_ENTRY_SAFE( wait
, next
, &bucket
->waiting
, struct threadpool_object
,
1480 assert( wait
->type
== TP_OBJECT_TYPE_WAIT
);
1481 if (wait
->u
.wait
.timeout
<= now
.QuadPart
)
1483 /* Wait object timed out. */
1484 list_remove( &wait
->u
.wait
.wait_entry
);
1485 list_add_tail( &bucket
->reserved
, &wait
->u
.wait
.wait_entry
);
1486 tp_object_submit( wait
, FALSE
);
1490 if (wait
->u
.wait
.timeout
< timeout
.QuadPart
)
1491 timeout
.QuadPart
= wait
->u
.wait
.timeout
;
1493 assert( num_handles
< MAXIMUM_WAITQUEUE_OBJECTS
);
1494 InterlockedIncrement( &wait
->refcount
);
1495 objects
[num_handles
] = wait
;
1496 handles
[num_handles
] = wait
->u
.wait
.handle
;
1501 if (!bucket
->objcount
)
1503 /* All wait objects have been destroyed, if no new wait objects are created
1504 * within some amount of time, then we can shutdown this thread. */
1505 assert( num_handles
== 0 );
1506 RtlLeaveCriticalSection( &waitqueue
.cs
);
1507 timeout
.QuadPart
= (ULONGLONG
)THREADPOOL_WORKER_TIMEOUT
* -10000;
1508 status
= NtWaitForMultipleObjects( 1, &bucket
->update_event
, TRUE
, FALSE
, &timeout
);
1509 RtlEnterCriticalSection( &waitqueue
.cs
);
1511 if (status
== STATUS_TIMEOUT
&& !bucket
->objcount
)
1516 handles
[num_handles
] = bucket
->update_event
;
1517 RtlLeaveCriticalSection( &waitqueue
.cs
);
1518 status
= NtWaitForMultipleObjects( num_handles
+ 1, handles
, TRUE
, FALSE
, &timeout
);
1519 RtlEnterCriticalSection( &waitqueue
.cs
);
1521 if (status
>= STATUS_WAIT_0
&& status
< STATUS_WAIT_0
+ num_handles
)
1523 wait
= objects
[status
- STATUS_WAIT_0
];
1524 assert( wait
->type
== TP_OBJECT_TYPE_WAIT
);
1525 if (wait
->u
.wait
.bucket
)
1527 /* Wait object signaled. */
1528 assert( wait
->u
.wait
.bucket
== bucket
);
1529 list_remove( &wait
->u
.wait
.wait_entry
);
1530 list_add_tail( &bucket
->reserved
, &wait
->u
.wait
.wait_entry
);
1531 tp_object_submit( wait
, TRUE
);
1534 WARN("wait object %p triggered while object was destroyed\n", wait
);
1537 /* Release temporary references to wait objects. */
1540 wait
= objects
[--num_handles
];
1541 assert( wait
->type
== TP_OBJECT_TYPE_WAIT
);
1542 tp_object_release( wait
);
1546 /* Try to merge bucket with other threads. */
1547 if (waitqueue
.num_buckets
> 1 && bucket
->objcount
&&
1548 bucket
->objcount
<= MAXIMUM_WAITQUEUE_OBJECTS
* 1 / 3)
1550 struct waitqueue_bucket
*other_bucket
;
1551 LIST_FOR_EACH_ENTRY( other_bucket
, &waitqueue
.buckets
, struct waitqueue_bucket
, bucket_entry
)
1553 if (other_bucket
!= bucket
&& other_bucket
->objcount
&&
1554 other_bucket
->objcount
+ bucket
->objcount
<= MAXIMUM_WAITQUEUE_OBJECTS
* 2 / 3)
1556 other_bucket
->objcount
+= bucket
->objcount
;
1557 bucket
->objcount
= 0;
1559 /* Update reserved list. */
1560 LIST_FOR_EACH_ENTRY( wait
, &bucket
->reserved
, struct threadpool_object
, u
.wait
.wait_entry
)
1562 assert( wait
->type
== TP_OBJECT_TYPE_WAIT
);
1563 wait
->u
.wait
.bucket
= other_bucket
;
1565 list_move_tail( &other_bucket
->reserved
, &bucket
->reserved
);
1567 /* Update waiting list. */
1568 LIST_FOR_EACH_ENTRY( wait
, &bucket
->waiting
, struct threadpool_object
, u
.wait
.wait_entry
)
1570 assert( wait
->type
== TP_OBJECT_TYPE_WAIT
);
1571 wait
->u
.wait
.bucket
= other_bucket
;
1573 list_move_tail( &other_bucket
->waiting
, &bucket
->waiting
);
1575 /* Move bucket to the end, to keep the probability of
1576 * newly added wait objects as small as possible. */
1577 list_remove( &bucket
->bucket_entry
);
1578 list_add_tail( &waitqueue
.buckets
, &bucket
->bucket_entry
);
1580 NtSetEvent( other_bucket
->update_event
, NULL
);
1587 /* Remove this bucket from the list. */
1588 list_remove( &bucket
->bucket_entry
);
1589 if (!--waitqueue
.num_buckets
)
1590 assert( list_empty( &waitqueue
.buckets
) );
1592 RtlLeaveCriticalSection( &waitqueue
.cs
);
1594 TRACE( "terminating wait queue thread\n" );
1596 assert( bucket
->objcount
== 0 );
1597 assert( list_empty( &bucket
->reserved
) );
1598 assert( list_empty( &bucket
->waiting
) );
1599 NtClose( bucket
->update_event
);
1601 RtlFreeHeap( GetProcessHeap(), 0, bucket
);
1602 RtlExitUserThread( 0 );
1605 /***********************************************************************
1606 * tp_waitqueue_lock (internal)
1608 static NTSTATUS
tp_waitqueue_lock( struct threadpool_object
*wait
)
1610 struct waitqueue_bucket
*bucket
;
1613 assert( wait
->type
== TP_OBJECT_TYPE_WAIT
);
1615 wait
->u
.wait
.signaled
= 0;
1616 wait
->u
.wait
.bucket
= NULL
;
1617 wait
->u
.wait
.wait_pending
= FALSE
;
1618 wait
->u
.wait
.timeout
= 0;
1619 wait
->u
.wait
.handle
= INVALID_HANDLE_VALUE
;
1621 RtlEnterCriticalSection( &waitqueue
.cs
);
1623 /* Try to assign to existing bucket if possible. */
1624 LIST_FOR_EACH_ENTRY( bucket
, &waitqueue
.buckets
, struct waitqueue_bucket
, bucket_entry
)
1626 if (bucket
->objcount
< MAXIMUM_WAITQUEUE_OBJECTS
)
1628 list_add_tail( &bucket
->reserved
, &wait
->u
.wait
.wait_entry
);
1629 wait
->u
.wait
.bucket
= bucket
;
1632 status
= STATUS_SUCCESS
;
1637 /* Create a new bucket and corresponding worker thread. */
1638 bucket
= RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*bucket
) );
1641 status
= STATUS_NO_MEMORY
;
1645 bucket
->objcount
= 0;
1646 list_init( &bucket
->reserved
);
1647 list_init( &bucket
->waiting
);
1649 status
= NtCreateEvent( &bucket
->update_event
, EVENT_ALL_ACCESS
,
1650 NULL
, SynchronizationEvent
, FALSE
);
1653 RtlFreeHeap( GetProcessHeap(), 0, bucket
);
1657 status
= RtlCreateUserThread( GetCurrentProcess(), NULL
, FALSE
, NULL
, 0, 0,
1658 waitqueue_thread_proc
, bucket
, &thread
, NULL
);
1659 if (status
== STATUS_SUCCESS
)
1661 list_add_tail( &waitqueue
.buckets
, &bucket
->bucket_entry
);
1662 waitqueue
.num_buckets
++;
1664 list_add_tail( &bucket
->reserved
, &wait
->u
.wait
.wait_entry
);
1665 wait
->u
.wait
.bucket
= bucket
;
1672 NtClose( bucket
->update_event
);
1673 RtlFreeHeap( GetProcessHeap(), 0, bucket
);
1677 RtlLeaveCriticalSection( &waitqueue
.cs
);
1681 /***********************************************************************
1682 * tp_waitqueue_unlock (internal)
1684 static void tp_waitqueue_unlock( struct threadpool_object
*wait
)
1686 assert( wait
->type
== TP_OBJECT_TYPE_WAIT
);
1688 RtlEnterCriticalSection( &waitqueue
.cs
);
1689 if (wait
->u
.wait
.bucket
)
1691 struct waitqueue_bucket
*bucket
= wait
->u
.wait
.bucket
;
1692 assert( bucket
->objcount
> 0 );
1694 list_remove( &wait
->u
.wait
.wait_entry
);
1695 wait
->u
.wait
.bucket
= NULL
;
1698 NtSetEvent( bucket
->update_event
, NULL
);
1700 RtlLeaveCriticalSection( &waitqueue
.cs
);
1703 static void CALLBACK
ioqueue_thread_proc( void *param
)
1705 struct io_completion
*completion
;
1706 struct threadpool_object
*io
;
1707 IO_STATUS_BLOCK iosb
;
1708 ULONG_PTR key
, value
;
1711 TRACE( "starting I/O completion thread\n" );
1713 RtlEnterCriticalSection( &ioqueue
.cs
);
1717 RtlLeaveCriticalSection( &ioqueue
.cs
);
1718 if ((status
= NtRemoveIoCompletion( ioqueue
.port
, &key
, &value
, &iosb
, NULL
)))
1719 ERR("NtRemoveIoCompletion failed, status %#x.\n", status
);
1720 RtlEnterCriticalSection( &ioqueue
.cs
);
1724 io
= (struct threadpool_object
*)key
;
1726 RtlEnterCriticalSection( &io
->pool
->cs
);
1728 if (!array_reserve((void **)&io
->u
.io
.completions
, &io
->u
.io
.completion_max
,
1729 io
->u
.io
.completion_count
+ 1, sizeof(*io
->u
.io
.completions
)))
1731 ERR("Failed to allocate memory.\n");
1732 RtlLeaveCriticalSection( &io
->pool
->cs
);
1736 completion
= &io
->u
.io
.completions
[io
->u
.io
.completion_count
++];
1737 completion
->iosb
= iosb
;
1738 completion
->cvalue
= value
;
1740 tp_object_submit( io
, FALSE
);
1742 RtlLeaveCriticalSection( &io
->pool
->cs
);
1745 if (!ioqueue
.objcount
)
1747 /* All I/O objects have been destroyed; if no new objects are
1748 * created within some amount of time, then we can shutdown this
1750 LARGE_INTEGER timeout
= {.QuadPart
= (ULONGLONG
)THREADPOOL_WORKER_TIMEOUT
* -10000};
1751 if (RtlSleepConditionVariableCS( &ioqueue
.update_event
, &ioqueue
.cs
,
1752 &timeout
) == STATUS_TIMEOUT
&& !ioqueue
.objcount
)
1757 RtlLeaveCriticalSection( &ioqueue
.cs
);
1759 TRACE( "terminating I/O completion thread\n" );
1761 RtlExitUserThread( 0 );
1764 static NTSTATUS
tp_ioqueue_lock( struct threadpool_object
*io
, HANDLE file
)
1766 NTSTATUS status
= STATUS_SUCCESS
;
1768 assert( io
->type
== TP_OBJECT_TYPE_IO
);
1770 RtlEnterCriticalSection( &ioqueue
.cs
);
1772 if (!ioqueue
.port
&& (status
= NtCreateIoCompletion( &ioqueue
.port
,
1773 IO_COMPLETION_ALL_ACCESS
, NULL
, 0 )))
1775 RtlLeaveCriticalSection( &ioqueue
.cs
);
1779 if (!ioqueue
.thread_running
)
1783 if (!(status
= RtlCreateUserThread( GetCurrentProcess(), NULL
, FALSE
,
1784 NULL
, 0, 0, ioqueue_thread_proc
, NULL
, &thread
, NULL
)))
1786 ioqueue
.thread_running
= TRUE
;
1791 if (status
== STATUS_SUCCESS
)
1793 FILE_COMPLETION_INFORMATION info
;
1794 IO_STATUS_BLOCK iosb
;
1796 info
.CompletionPort
= ioqueue
.port
;
1797 info
.CompletionKey
= (ULONG_PTR
)io
;
1799 status
= NtSetInformationFile( file
, &iosb
, &info
, sizeof(info
), FileCompletionInformation
);
1802 if (status
== STATUS_SUCCESS
)
1804 if (!ioqueue
.objcount
++)
1805 RtlWakeConditionVariable( &ioqueue
.update_event
);
1808 RtlLeaveCriticalSection( &ioqueue
.cs
);
1812 static void tp_ioqueue_unlock( struct threadpool_object
*io
)
1814 assert( io
->type
== TP_OBJECT_TYPE_IO
);
1816 RtlEnterCriticalSection( &ioqueue
.cs
);
1818 if (!--ioqueue
.objcount
)
1819 NtSetIoCompletion( ioqueue
.port
, 0, 0, STATUS_SUCCESS
, 0 );
1821 RtlLeaveCriticalSection( &ioqueue
.cs
);
1824 /***********************************************************************
1825 * tp_threadpool_alloc (internal)
1827 * Allocates a new threadpool object.
1829 static NTSTATUS
tp_threadpool_alloc( struct threadpool
**out
)
1831 IMAGE_NT_HEADERS
*nt
= RtlImageNtHeader( NtCurrentTeb()->Peb
->ImageBaseAddress
);
1832 struct threadpool
*pool
;
1835 pool
= RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*pool
) );
1837 return STATUS_NO_MEMORY
;
1841 pool
->shutdown
= FALSE
;
1843 RtlInitializeCriticalSection( &pool
->cs
);
1844 pool
->cs
.DebugInfo
->Spare
[0] = (DWORD_PTR
)(__FILE__
": threadpool.cs");
1846 for (i
= 0; i
< ARRAY_SIZE(pool
->pools
); ++i
)
1847 list_init( &pool
->pools
[i
] );
1848 RtlInitializeConditionVariable( &pool
->update_event
);
1850 pool
->max_workers
= 500;
1851 pool
->min_workers
= 0;
1852 pool
->num_workers
= 0;
1853 pool
->num_busy_workers
= 0;
1854 pool
->stack_info
.StackReserve
= nt
->OptionalHeader
.SizeOfStackReserve
;
1855 pool
->stack_info
.StackCommit
= nt
->OptionalHeader
.SizeOfStackCommit
;
1857 TRACE( "allocated threadpool %p\n", pool
);
1860 return STATUS_SUCCESS
;
1863 /***********************************************************************
1864 * tp_threadpool_shutdown (internal)
1866 * Prepares the shutdown of a threadpool object and notifies all worker
1867 * threads to terminate (after all remaining work items have been
1870 static void tp_threadpool_shutdown( struct threadpool
*pool
)
1872 assert( pool
!= default_threadpool
);
1874 pool
->shutdown
= TRUE
;
1875 RtlWakeAllConditionVariable( &pool
->update_event
);
1878 /***********************************************************************
1879 * tp_threadpool_release (internal)
1881 * Releases a reference to a threadpool object.
1883 static BOOL
tp_threadpool_release( struct threadpool
*pool
)
1887 if (InterlockedDecrement( &pool
->refcount
))
1890 TRACE( "destroying threadpool %p\n", pool
);
1892 assert( pool
->shutdown
);
1893 assert( !pool
->objcount
);
1894 for (i
= 0; i
< ARRAY_SIZE(pool
->pools
); ++i
)
1895 assert( list_empty( &pool
->pools
[i
] ) );
1897 pool
->cs
.DebugInfo
->Spare
[0] = 0;
1898 RtlDeleteCriticalSection( &pool
->cs
);
1900 RtlFreeHeap( GetProcessHeap(), 0, pool
);
1904 /***********************************************************************
1905 * tp_threadpool_lock (internal)
1907 * Acquires a lock on a threadpool, specified with an TP_CALLBACK_ENVIRON
1908 * block. When the lock is acquired successfully, it is guaranteed that
1909 * there is at least one worker thread to process tasks.
1911 static NTSTATUS
tp_threadpool_lock( struct threadpool
**out
, TP_CALLBACK_ENVIRON
*environment
)
1913 struct threadpool
*pool
= NULL
;
1914 NTSTATUS status
= STATUS_SUCCESS
;
1918 /* Validate environment parameters. */
1919 if (environment
->Version
== 3)
1921 TP_CALLBACK_ENVIRON_V3
*environment3
= (TP_CALLBACK_ENVIRON_V3
*)environment
;
1923 switch (environment3
->CallbackPriority
)
1925 case TP_CALLBACK_PRIORITY_HIGH
:
1926 case TP_CALLBACK_PRIORITY_NORMAL
:
1927 case TP_CALLBACK_PRIORITY_LOW
:
1930 return STATUS_INVALID_PARAMETER
;
1934 pool
= (struct threadpool
*)environment
->Pool
;
1939 if (!default_threadpool
)
1941 status
= tp_threadpool_alloc( &pool
);
1942 if (status
!= STATUS_SUCCESS
)
1945 if (InterlockedCompareExchangePointer( (void *)&default_threadpool
, pool
, NULL
) != NULL
)
1947 tp_threadpool_shutdown( pool
);
1948 tp_threadpool_release( pool
);
1952 pool
= default_threadpool
;
1955 RtlEnterCriticalSection( &pool
->cs
);
1957 /* Make sure that the threadpool has at least one thread. */
1958 if (!pool
->num_workers
)
1959 status
= tp_new_worker_thread( pool
);
1961 /* Keep a reference, and increment objcount to ensure that the
1962 * last thread doesn't terminate. */
1963 if (status
== STATUS_SUCCESS
)
1965 InterlockedIncrement( &pool
->refcount
);
1969 RtlLeaveCriticalSection( &pool
->cs
);
1971 if (status
!= STATUS_SUCCESS
)
1975 return STATUS_SUCCESS
;
1978 /***********************************************************************
1979 * tp_threadpool_unlock (internal)
1981 * Releases a lock on a threadpool.
1983 static void tp_threadpool_unlock( struct threadpool
*pool
)
1985 RtlEnterCriticalSection( &pool
->cs
);
1987 RtlLeaveCriticalSection( &pool
->cs
);
1988 tp_threadpool_release( pool
);
1991 /***********************************************************************
1992 * tp_group_alloc (internal)
1994 * Allocates a new threadpool group object.
1996 static NTSTATUS
tp_group_alloc( struct threadpool_group
**out
)
1998 struct threadpool_group
*group
;
2000 group
= RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*group
) );
2002 return STATUS_NO_MEMORY
;
2004 group
->refcount
= 1;
2005 group
->shutdown
= FALSE
;
2007 RtlInitializeCriticalSection( &group
->cs
);
2008 group
->cs
.DebugInfo
->Spare
[0] = (DWORD_PTR
)(__FILE__
": threadpool_group.cs");
2010 list_init( &group
->members
);
2012 TRACE( "allocated group %p\n", group
);
2015 return STATUS_SUCCESS
;
2018 /***********************************************************************
2019 * tp_group_shutdown (internal)
2021 * Marks the group object for shutdown.
2023 static void tp_group_shutdown( struct threadpool_group
*group
)
2025 group
->shutdown
= TRUE
;
2028 /***********************************************************************
2029 * tp_group_release (internal)
2031 * Releases a reference to a group object.
2033 static BOOL
tp_group_release( struct threadpool_group
*group
)
2035 if (InterlockedDecrement( &group
->refcount
))
2038 TRACE( "destroying group %p\n", group
);
2040 assert( group
->shutdown
);
2041 assert( list_empty( &group
->members
) );
2043 group
->cs
.DebugInfo
->Spare
[0] = 0;
2044 RtlDeleteCriticalSection( &group
->cs
);
2046 RtlFreeHeap( GetProcessHeap(), 0, group
);
2050 /***********************************************************************
2051 * tp_object_initialize (internal)
2053 * Initializes members of a threadpool object.
2055 static void tp_object_initialize( struct threadpool_object
*object
, struct threadpool
*pool
,
2056 PVOID userdata
, TP_CALLBACK_ENVIRON
*environment
)
2058 BOOL is_simple_callback
= (object
->type
== TP_OBJECT_TYPE_SIMPLE
);
2060 object
->refcount
= 1;
2061 object
->shutdown
= FALSE
;
2063 object
->pool
= pool
;
2064 object
->group
= NULL
;
2065 object
->userdata
= userdata
;
2066 object
->group_cancel_callback
= NULL
;
2067 object
->finalization_callback
= NULL
;
2068 object
->may_run_long
= 0;
2069 object
->race_dll
= NULL
;
2070 object
->priority
= TP_CALLBACK_PRIORITY_NORMAL
;
2072 memset( &object
->group_entry
, 0, sizeof(object
->group_entry
) );
2073 object
->is_group_member
= FALSE
;
2075 memset( &object
->pool_entry
, 0, sizeof(object
->pool_entry
) );
2076 RtlInitializeConditionVariable( &object
->finished_event
);
2077 RtlInitializeConditionVariable( &object
->group_finished_event
);
2078 object
->num_pending_callbacks
= 0;
2079 object
->num_running_callbacks
= 0;
2080 object
->num_associated_callbacks
= 0;
2084 if (environment
->Version
!= 1 && environment
->Version
!= 3)
2085 FIXME( "unsupported environment version %u\n", environment
->Version
);
2087 object
->group
= impl_from_TP_CLEANUP_GROUP( environment
->CleanupGroup
);
2088 object
->group_cancel_callback
= environment
->CleanupGroupCancelCallback
;
2089 object
->finalization_callback
= environment
->FinalizationCallback
;
2090 object
->may_run_long
= environment
->u
.s
.LongFunction
!= 0;
2091 object
->race_dll
= environment
->RaceDll
;
2092 if (environment
->Version
== 3)
2094 TP_CALLBACK_ENVIRON_V3
*environment_v3
= (TP_CALLBACK_ENVIRON_V3
*)environment
;
2096 object
->priority
= environment_v3
->CallbackPriority
;
2097 assert( object
->priority
< ARRAY_SIZE(pool
->pools
) );
2100 if (environment
->ActivationContext
)
2101 FIXME( "activation context not supported yet\n" );
2103 if (environment
->u
.s
.Persistent
)
2104 FIXME( "persistent threads not supported yet\n" );
2107 if (object
->race_dll
)
2108 LdrAddRefDll( 0, object
->race_dll
);
2110 TRACE( "allocated object %p of type %u\n", object
, object
->type
);
2112 /* For simple callbacks we have to run tp_object_submit before adding this object
2113 * to the cleanup group. As soon as the cleanup group members are released ->shutdown
2114 * will be set, and tp_object_submit would fail with an assertion. */
2116 if (is_simple_callback
)
2117 tp_object_submit( object
, FALSE
);
2121 struct threadpool_group
*group
= object
->group
;
2122 InterlockedIncrement( &group
->refcount
);
2124 RtlEnterCriticalSection( &group
->cs
);
2125 list_add_tail( &group
->members
, &object
->group_entry
);
2126 object
->is_group_member
= TRUE
;
2127 RtlLeaveCriticalSection( &group
->cs
);
2130 if (is_simple_callback
)
2131 tp_object_release( object
);
2134 static void tp_object_prio_queue( struct threadpool_object
*object
)
2136 ++object
->pool
->num_busy_workers
;
2137 list_add_tail( &object
->pool
->pools
[object
->priority
], &object
->pool_entry
);
2140 /***********************************************************************
2141 * tp_object_submit (internal)
2143 * Submits a threadpool object to the associated threadpool. This
2144 * function has to be VOID because TpPostWork can never fail on Windows.
2146 static void tp_object_submit( struct threadpool_object
*object
, BOOL signaled
)
2148 struct threadpool
*pool
= object
->pool
;
2149 NTSTATUS status
= STATUS_UNSUCCESSFUL
;
2151 assert( !object
->shutdown
);
2152 assert( !pool
->shutdown
);
2154 RtlEnterCriticalSection( &pool
->cs
);
2156 /* Start new worker threads if required. */
2157 if (pool
->num_busy_workers
>= pool
->num_workers
&&
2158 pool
->num_workers
< pool
->max_workers
)
2159 status
= tp_new_worker_thread( pool
);
2161 /* Queue work item and increment refcount. */
2162 InterlockedIncrement( &object
->refcount
);
2163 if (!object
->num_pending_callbacks
++)
2164 tp_object_prio_queue( object
);
2166 /* Count how often the object was signaled. */
2167 if (object
->type
== TP_OBJECT_TYPE_WAIT
&& signaled
)
2168 object
->u
.wait
.signaled
++;
2170 /* No new thread started - wake up one existing thread. */
2171 if (status
!= STATUS_SUCCESS
)
2173 assert( pool
->num_workers
> 0 );
2174 RtlWakeConditionVariable( &pool
->update_event
);
2177 RtlLeaveCriticalSection( &pool
->cs
);
2180 /***********************************************************************
2181 * tp_object_cancel (internal)
2183 * Cancels all currently pending callbacks for a specific object.
2185 static void tp_object_cancel( struct threadpool_object
*object
)
2187 struct threadpool
*pool
= object
->pool
;
2188 LONG pending_callbacks
= 0;
2190 RtlEnterCriticalSection( &pool
->cs
);
2191 if (object
->num_pending_callbacks
)
2193 pending_callbacks
= object
->num_pending_callbacks
;
2194 object
->num_pending_callbacks
= 0;
2195 list_remove( &object
->pool_entry
);
2197 if (object
->type
== TP_OBJECT_TYPE_WAIT
)
2198 object
->u
.wait
.signaled
= 0;
2200 if (object
->type
== TP_OBJECT_TYPE_IO
)
2201 object
->u
.io
.pending_count
= 0;
2202 RtlLeaveCriticalSection( &pool
->cs
);
2204 while (pending_callbacks
--)
2205 tp_object_release( object
);
2208 static BOOL
object_is_finished( struct threadpool_object
*object
, BOOL group
)
2210 if (object
->num_pending_callbacks
)
2212 if (object
->type
== TP_OBJECT_TYPE_IO
&& object
->u
.io
.pending_count
)
2216 return !object
->num_running_callbacks
;
2218 return !object
->num_associated_callbacks
;
2221 /***********************************************************************
2222 * tp_object_wait (internal)
2224 * Waits until all pending and running callbacks of a specific object
2225 * have been processed.
2227 static void tp_object_wait( struct threadpool_object
*object
, BOOL group_wait
)
2229 struct threadpool
*pool
= object
->pool
;
2231 RtlEnterCriticalSection( &pool
->cs
);
2232 while (!object_is_finished( object
, group_wait
))
2235 RtlSleepConditionVariableCS( &object
->group_finished_event
, &pool
->cs
, NULL
);
2237 RtlSleepConditionVariableCS( &object
->finished_event
, &pool
->cs
, NULL
);
2239 RtlLeaveCriticalSection( &pool
->cs
);
2242 /***********************************************************************
2243 * tp_object_prepare_shutdown (internal)
2245 * Prepares a threadpool object for shutdown.
2247 static void tp_object_prepare_shutdown( struct threadpool_object
*object
)
2249 if (object
->type
== TP_OBJECT_TYPE_TIMER
)
2250 tp_timerqueue_unlock( object
);
2251 else if (object
->type
== TP_OBJECT_TYPE_WAIT
)
2252 tp_waitqueue_unlock( object
);
2253 else if (object
->type
== TP_OBJECT_TYPE_IO
)
2254 tp_ioqueue_unlock( object
);
2257 /***********************************************************************
2258 * tp_object_release (internal)
2260 * Releases a reference to a threadpool object.
2262 static BOOL
tp_object_release( struct threadpool_object
*object
)
2264 if (InterlockedDecrement( &object
->refcount
))
2267 TRACE( "destroying object %p of type %u\n", object
, object
->type
);
2269 assert( object
->shutdown
);
2270 assert( !object
->num_pending_callbacks
);
2271 assert( !object
->num_running_callbacks
);
2272 assert( !object
->num_associated_callbacks
);
2274 /* release reference to the group */
2277 struct threadpool_group
*group
= object
->group
;
2279 RtlEnterCriticalSection( &group
->cs
);
2280 if (object
->is_group_member
)
2282 list_remove( &object
->group_entry
);
2283 object
->is_group_member
= FALSE
;
2285 RtlLeaveCriticalSection( &group
->cs
);
2287 tp_group_release( group
);
2290 tp_threadpool_unlock( object
->pool
);
2292 if (object
->race_dll
)
2293 LdrUnloadDll( object
->race_dll
);
2295 RtlFreeHeap( GetProcessHeap(), 0, object
);
2299 static struct list
*threadpool_get_next_item( const struct threadpool
*pool
)
2304 for (i
= 0; i
< ARRAY_SIZE(pool
->pools
); ++i
)
2306 if ((ptr
= list_head( &pool
->pools
[i
] )))
2313 /***********************************************************************
2314 * threadpool_worker_proc (internal)
2316 static void CALLBACK
threadpool_worker_proc( void *param
)
2318 TP_CALLBACK_INSTANCE
*callback_instance
;
2319 struct threadpool_instance instance
;
2320 struct io_completion completion
;
2321 struct threadpool
*pool
= param
;
2322 TP_WAIT_RESULT wait_result
= 0;
2323 LARGE_INTEGER timeout
;
2327 TRACE( "starting worker thread for pool %p\n", pool
);
2329 RtlEnterCriticalSection( &pool
->cs
);
2332 while ((ptr
= threadpool_get_next_item( pool
)))
2334 struct threadpool_object
*object
= LIST_ENTRY( ptr
, struct threadpool_object
, pool_entry
);
2335 assert( object
->num_pending_callbacks
> 0 );
2337 /* If further pending callbacks are queued, move the work item to
2338 * the end of the pool list. Otherwise remove it from the pool. */
2339 list_remove( &object
->pool_entry
);
2340 if (--object
->num_pending_callbacks
)
2341 tp_object_prio_queue( object
);
2343 /* For wait objects check if they were signaled or have timed out. */
2344 if (object
->type
== TP_OBJECT_TYPE_WAIT
)
2346 wait_result
= object
->u
.wait
.signaled
? WAIT_OBJECT_0
: WAIT_TIMEOUT
;
2347 if (wait_result
== WAIT_OBJECT_0
) object
->u
.wait
.signaled
--;
2349 else if (object
->type
== TP_OBJECT_TYPE_IO
)
2351 assert( object
->u
.io
.completion_count
);
2352 completion
= object
->u
.io
.completions
[--object
->u
.io
.completion_count
];
2353 object
->u
.io
.pending_count
--;
2356 /* Leave critical section and do the actual callback. */
2357 object
->num_associated_callbacks
++;
2358 object
->num_running_callbacks
++;
2359 RtlLeaveCriticalSection( &pool
->cs
);
2361 /* Initialize threadpool instance struct. */
2362 callback_instance
= (TP_CALLBACK_INSTANCE
*)&instance
;
2363 instance
.object
= object
;
2364 instance
.threadid
= GetCurrentThreadId();
2365 instance
.associated
= TRUE
;
2366 instance
.may_run_long
= object
->may_run_long
;
2367 instance
.cleanup
.critical_section
= NULL
;
2368 instance
.cleanup
.mutex
= NULL
;
2369 instance
.cleanup
.semaphore
= NULL
;
2370 instance
.cleanup
.semaphore_count
= 0;
2371 instance
.cleanup
.event
= NULL
;
2372 instance
.cleanup
.library
= NULL
;
2374 switch (object
->type
)
2376 case TP_OBJECT_TYPE_SIMPLE
:
2378 TRACE( "executing simple callback %p(%p, %p)\n",
2379 object
->u
.simple
.callback
, callback_instance
, object
->userdata
);
2380 object
->u
.simple
.callback( callback_instance
, object
->userdata
);
2381 TRACE( "callback %p returned\n", object
->u
.simple
.callback
);
2385 case TP_OBJECT_TYPE_WORK
:
2387 TRACE( "executing work callback %p(%p, %p, %p)\n",
2388 object
->u
.work
.callback
, callback_instance
, object
->userdata
, object
);
2389 object
->u
.work
.callback( callback_instance
, object
->userdata
, (TP_WORK
*)object
);
2390 TRACE( "callback %p returned\n", object
->u
.work
.callback
);
2394 case TP_OBJECT_TYPE_TIMER
:
2396 TRACE( "executing timer callback %p(%p, %p, %p)\n",
2397 object
->u
.timer
.callback
, callback_instance
, object
->userdata
, object
);
2398 object
->u
.timer
.callback( callback_instance
, object
->userdata
, (TP_TIMER
*)object
);
2399 TRACE( "callback %p returned\n", object
->u
.timer
.callback
);
2403 case TP_OBJECT_TYPE_WAIT
:
2405 TRACE( "executing wait callback %p(%p, %p, %p, %u)\n",
2406 object
->u
.wait
.callback
, callback_instance
, object
->userdata
, object
, wait_result
);
2407 object
->u
.wait
.callback( callback_instance
, object
->userdata
, (TP_WAIT
*)object
, wait_result
);
2408 TRACE( "callback %p returned\n", object
->u
.wait
.callback
);
2412 case TP_OBJECT_TYPE_IO
:
2414 TRACE( "executing I/O callback %p(%p, %p, %#lx, %p, %p)\n",
2415 object
->u
.io
.callback
, callback_instance
, object
->userdata
,
2416 completion
.cvalue
, &completion
.iosb
, (TP_IO
*)object
);
2417 object
->u
.io
.callback( callback_instance
, object
->userdata
,
2418 (void *)completion
.cvalue
, &completion
.iosb
, (TP_IO
*)object
);
2419 TRACE( "callback %p returned\n", object
->u
.io
.callback
);
2428 /* Execute finalization callback. */
2429 if (object
->finalization_callback
)
2431 TRACE( "executing finalization callback %p(%p, %p)\n",
2432 object
->finalization_callback
, callback_instance
, object
->userdata
);
2433 object
->finalization_callback( callback_instance
, object
->userdata
);
2434 TRACE( "callback %p returned\n", object
->finalization_callback
);
2437 /* Execute cleanup tasks. */
2438 if (instance
.cleanup
.critical_section
)
2440 RtlLeaveCriticalSection( instance
.cleanup
.critical_section
);
2442 if (instance
.cleanup
.mutex
)
2444 status
= NtReleaseMutant( instance
.cleanup
.mutex
, NULL
);
2445 if (status
!= STATUS_SUCCESS
) goto skip_cleanup
;
2447 if (instance
.cleanup
.semaphore
)
2449 status
= NtReleaseSemaphore( instance
.cleanup
.semaphore
, instance
.cleanup
.semaphore_count
, NULL
);
2450 if (status
!= STATUS_SUCCESS
) goto skip_cleanup
;
2452 if (instance
.cleanup
.event
)
2454 status
= NtSetEvent( instance
.cleanup
.event
, NULL
);
2455 if (status
!= STATUS_SUCCESS
) goto skip_cleanup
;
2457 if (instance
.cleanup
.library
)
2459 LdrUnloadDll( instance
.cleanup
.library
);
2463 RtlEnterCriticalSection( &pool
->cs
);
2464 assert(pool
->num_busy_workers
);
2465 pool
->num_busy_workers
--;
2467 /* Simple callbacks are automatically shutdown after execution. */
2468 if (object
->type
== TP_OBJECT_TYPE_SIMPLE
)
2470 tp_object_prepare_shutdown( object
);
2471 object
->shutdown
= TRUE
;
2474 object
->num_running_callbacks
--;
2475 if (object_is_finished( object
, TRUE
))
2476 RtlWakeAllConditionVariable( &object
->group_finished_event
);
2478 if (instance
.associated
)
2480 object
->num_associated_callbacks
--;
2481 if (object_is_finished( object
, FALSE
))
2482 RtlWakeAllConditionVariable( &object
->finished_event
);
2485 tp_object_release( object
);
2488 /* Shutdown worker thread if requested. */
2492 /* Wait for new tasks or until the timeout expires. A thread only terminates
2493 * when no new tasks are available, and the number of threads can be
2494 * decreased without violating the min_workers limit. An exception is when
2495 * min_workers == 0, then objcount is used to detect if the last thread
2496 * can be terminated. */
2497 timeout
.QuadPart
= (ULONGLONG
)THREADPOOL_WORKER_TIMEOUT
* -10000;
2498 if (RtlSleepConditionVariableCS( &pool
->update_event
, &pool
->cs
, &timeout
) == STATUS_TIMEOUT
&&
2499 !threadpool_get_next_item( pool
) && (pool
->num_workers
> max( pool
->min_workers
, 1 ) ||
2500 (!pool
->min_workers
&& !pool
->objcount
)))
2505 pool
->num_workers
--;
2506 RtlLeaveCriticalSection( &pool
->cs
);
2508 TRACE( "terminating worker thread for pool %p\n", pool
);
2509 tp_threadpool_release( pool
);
2510 RtlExitUserThread( 0 );
2513 /***********************************************************************
2514 * TpAllocCleanupGroup (NTDLL.@)
2516 NTSTATUS WINAPI
TpAllocCleanupGroup( TP_CLEANUP_GROUP
**out
)
2518 TRACE( "%p\n", out
);
2520 return tp_group_alloc( (struct threadpool_group
**)out
);
2523 /***********************************************************************
2524 * TpAllocIoCompletion (NTDLL.@)
2526 NTSTATUS WINAPI
TpAllocIoCompletion( TP_IO
**out
, HANDLE file
, PTP_IO_CALLBACK callback
,
2527 void *userdata
, TP_CALLBACK_ENVIRON
*environment
)
2529 struct threadpool_object
*object
;
2530 struct threadpool
*pool
;
2533 TRACE( "%p %p %p %p %p\n", out
, file
, callback
, userdata
, environment
);
2535 if (!(object
= RtlAllocateHeap( GetProcessHeap(), HEAP_ZERO_MEMORY
, sizeof(*object
) )))
2536 return STATUS_NO_MEMORY
;
2538 if ((status
= tp_threadpool_lock( &pool
, environment
)))
2540 RtlFreeHeap( GetProcessHeap(), 0, object
);
2544 object
->type
= TP_OBJECT_TYPE_IO
;
2545 object
->u
.io
.callback
= callback
;
2546 if (!(object
->u
.io
.completions
= RtlAllocateHeap( GetProcessHeap(), 0, 8 * sizeof(*object
->u
.io
.completions
) )))
2548 tp_threadpool_unlock( pool
);
2549 RtlFreeHeap( GetProcessHeap(), 0, object
);
2553 if ((status
= tp_ioqueue_lock( object
, file
)))
2555 tp_threadpool_unlock( pool
);
2556 RtlFreeHeap( GetProcessHeap(), 0, object
->u
.io
.completions
);
2557 RtlFreeHeap( GetProcessHeap(), 0, object
);
2561 tp_object_initialize( object
, pool
, userdata
, environment
);
2563 *out
= (TP_IO
*)object
;
2564 return STATUS_SUCCESS
;
2567 /***********************************************************************
2568 * TpAllocPool (NTDLL.@)
2570 NTSTATUS WINAPI
TpAllocPool( TP_POOL
**out
, PVOID reserved
)
2572 TRACE( "%p %p\n", out
, reserved
);
2575 FIXME( "reserved argument is nonzero (%p)\n", reserved
);
2577 return tp_threadpool_alloc( (struct threadpool
**)out
);
2580 /***********************************************************************
2581 * TpAllocTimer (NTDLL.@)
2583 NTSTATUS WINAPI
TpAllocTimer( TP_TIMER
**out
, PTP_TIMER_CALLBACK callback
, PVOID userdata
,
2584 TP_CALLBACK_ENVIRON
*environment
)
2586 struct threadpool_object
*object
;
2587 struct threadpool
*pool
;
2590 TRACE( "%p %p %p %p\n", out
, callback
, userdata
, environment
);
2592 object
= RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*object
) );
2594 return STATUS_NO_MEMORY
;
2596 status
= tp_threadpool_lock( &pool
, environment
);
2599 RtlFreeHeap( GetProcessHeap(), 0, object
);
2603 object
->type
= TP_OBJECT_TYPE_TIMER
;
2604 object
->u
.timer
.callback
= callback
;
2606 status
= tp_timerqueue_lock( object
);
2609 tp_threadpool_unlock( pool
);
2610 RtlFreeHeap( GetProcessHeap(), 0, object
);
2614 tp_object_initialize( object
, pool
, userdata
, environment
);
2616 *out
= (TP_TIMER
*)object
;
2617 return STATUS_SUCCESS
;
2620 /***********************************************************************
2621 * TpAllocWait (NTDLL.@)
2623 NTSTATUS WINAPI
TpAllocWait( TP_WAIT
**out
, PTP_WAIT_CALLBACK callback
, PVOID userdata
,
2624 TP_CALLBACK_ENVIRON
*environment
)
2626 struct threadpool_object
*object
;
2627 struct threadpool
*pool
;
2630 TRACE( "%p %p %p %p\n", out
, callback
, userdata
, environment
);
2632 object
= RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*object
) );
2634 return STATUS_NO_MEMORY
;
2636 status
= tp_threadpool_lock( &pool
, environment
);
2639 RtlFreeHeap( GetProcessHeap(), 0, object
);
2643 object
->type
= TP_OBJECT_TYPE_WAIT
;
2644 object
->u
.wait
.callback
= callback
;
2646 status
= tp_waitqueue_lock( object
);
2649 tp_threadpool_unlock( pool
);
2650 RtlFreeHeap( GetProcessHeap(), 0, object
);
2654 tp_object_initialize( object
, pool
, userdata
, environment
);
2656 *out
= (TP_WAIT
*)object
;
2657 return STATUS_SUCCESS
;
2660 /***********************************************************************
2661 * TpAllocWork (NTDLL.@)
2663 NTSTATUS WINAPI
TpAllocWork( TP_WORK
**out
, PTP_WORK_CALLBACK callback
, PVOID userdata
,
2664 TP_CALLBACK_ENVIRON
*environment
)
2666 struct threadpool_object
*object
;
2667 struct threadpool
*pool
;
2670 TRACE( "%p %p %p %p\n", out
, callback
, userdata
, environment
);
2672 object
= RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*object
) );
2674 return STATUS_NO_MEMORY
;
2676 status
= tp_threadpool_lock( &pool
, environment
);
2679 RtlFreeHeap( GetProcessHeap(), 0, object
);
2683 object
->type
= TP_OBJECT_TYPE_WORK
;
2684 object
->u
.work
.callback
= callback
;
2685 tp_object_initialize( object
, pool
, userdata
, environment
);
2687 *out
= (TP_WORK
*)object
;
2688 return STATUS_SUCCESS
;
2691 /***********************************************************************
2692 * TpCancelAsyncIoOperation (NTDLL.@)
2694 void WINAPI
TpCancelAsyncIoOperation( TP_IO
*io
)
2696 struct threadpool_object
*this = impl_from_TP_IO( io
);
2698 TRACE( "%p\n", io
);
2700 RtlEnterCriticalSection( &this->pool
->cs
);
2702 this->u
.io
.pending_count
--;
2703 if (object_is_finished( this, TRUE
))
2704 RtlWakeAllConditionVariable( &this->group_finished_event
);
2705 if (object_is_finished( this, FALSE
))
2706 RtlWakeAllConditionVariable( &this->finished_event
);
2708 RtlLeaveCriticalSection( &this->pool
->cs
);
2711 /***********************************************************************
2712 * TpCallbackLeaveCriticalSectionOnCompletion (NTDLL.@)
2714 VOID WINAPI
TpCallbackLeaveCriticalSectionOnCompletion( TP_CALLBACK_INSTANCE
*instance
, CRITICAL_SECTION
*crit
)
2716 struct threadpool_instance
*this = impl_from_TP_CALLBACK_INSTANCE( instance
);
2718 TRACE( "%p %p\n", instance
, crit
);
2720 if (!this->cleanup
.critical_section
)
2721 this->cleanup
.critical_section
= crit
;
2724 /***********************************************************************
2725 * TpCallbackMayRunLong (NTDLL.@)
2727 NTSTATUS WINAPI
TpCallbackMayRunLong( TP_CALLBACK_INSTANCE
*instance
)
2729 struct threadpool_instance
*this = impl_from_TP_CALLBACK_INSTANCE( instance
);
2730 struct threadpool_object
*object
= this->object
;
2731 struct threadpool
*pool
;
2732 NTSTATUS status
= STATUS_SUCCESS
;
2734 TRACE( "%p\n", instance
);
2736 if (this->threadid
!= GetCurrentThreadId())
2738 ERR("called from wrong thread, ignoring\n");
2739 return STATUS_UNSUCCESSFUL
; /* FIXME */
2742 if (this->may_run_long
)
2743 return STATUS_SUCCESS
;
2745 pool
= object
->pool
;
2746 RtlEnterCriticalSection( &pool
->cs
);
2748 /* Start new worker threads if required. */
2749 if (pool
->num_busy_workers
>= pool
->num_workers
)
2751 if (pool
->num_workers
< pool
->max_workers
)
2753 status
= tp_new_worker_thread( pool
);
2757 status
= STATUS_TOO_MANY_THREADS
;
2761 RtlLeaveCriticalSection( &pool
->cs
);
2762 this->may_run_long
= TRUE
;
2766 /***********************************************************************
2767 * TpCallbackReleaseMutexOnCompletion (NTDLL.@)
2769 VOID WINAPI
TpCallbackReleaseMutexOnCompletion( TP_CALLBACK_INSTANCE
*instance
, HANDLE mutex
)
2771 struct threadpool_instance
*this = impl_from_TP_CALLBACK_INSTANCE( instance
);
2773 TRACE( "%p %p\n", instance
, mutex
);
2775 if (!this->cleanup
.mutex
)
2776 this->cleanup
.mutex
= mutex
;
2779 /***********************************************************************
2780 * TpCallbackReleaseSemaphoreOnCompletion (NTDLL.@)
2782 VOID WINAPI
TpCallbackReleaseSemaphoreOnCompletion( TP_CALLBACK_INSTANCE
*instance
, HANDLE semaphore
, DWORD count
)
2784 struct threadpool_instance
*this = impl_from_TP_CALLBACK_INSTANCE( instance
);
2786 TRACE( "%p %p %u\n", instance
, semaphore
, count
);
2788 if (!this->cleanup
.semaphore
)
2790 this->cleanup
.semaphore
= semaphore
;
2791 this->cleanup
.semaphore_count
= count
;
2795 /***********************************************************************
2796 * TpCallbackSetEventOnCompletion (NTDLL.@)
2798 VOID WINAPI
TpCallbackSetEventOnCompletion( TP_CALLBACK_INSTANCE
*instance
, HANDLE event
)
2800 struct threadpool_instance
*this = impl_from_TP_CALLBACK_INSTANCE( instance
);
2802 TRACE( "%p %p\n", instance
, event
);
2804 if (!this->cleanup
.event
)
2805 this->cleanup
.event
= event
;
2808 /***********************************************************************
2809 * TpCallbackUnloadDllOnCompletion (NTDLL.@)
2811 VOID WINAPI
TpCallbackUnloadDllOnCompletion( TP_CALLBACK_INSTANCE
*instance
, HMODULE module
)
2813 struct threadpool_instance
*this = impl_from_TP_CALLBACK_INSTANCE( instance
);
2815 TRACE( "%p %p\n", instance
, module
);
2817 if (!this->cleanup
.library
)
2818 this->cleanup
.library
= module
;
2821 /***********************************************************************
2822 * TpDisassociateCallback (NTDLL.@)
2824 VOID WINAPI
TpDisassociateCallback( TP_CALLBACK_INSTANCE
*instance
)
2826 struct threadpool_instance
*this = impl_from_TP_CALLBACK_INSTANCE( instance
);
2827 struct threadpool_object
*object
= this->object
;
2828 struct threadpool
*pool
;
2830 TRACE( "%p\n", instance
);
2832 if (this->threadid
!= GetCurrentThreadId())
2834 ERR("called from wrong thread, ignoring\n");
2838 if (!this->associated
)
2841 pool
= object
->pool
;
2842 RtlEnterCriticalSection( &pool
->cs
);
2844 object
->num_associated_callbacks
--;
2845 if (object_is_finished( object
, FALSE
))
2846 RtlWakeAllConditionVariable( &object
->finished_event
);
2848 RtlLeaveCriticalSection( &pool
->cs
);
2849 this->associated
= FALSE
;
2852 /***********************************************************************
2853 * TpIsTimerSet (NTDLL.@)
2855 BOOL WINAPI
TpIsTimerSet( TP_TIMER
*timer
)
2857 struct threadpool_object
*this = impl_from_TP_TIMER( timer
);
2859 TRACE( "%p\n", timer
);
2861 return this->u
.timer
.timer_set
;
2864 /***********************************************************************
2865 * TpPostWork (NTDLL.@)
2867 VOID WINAPI
TpPostWork( TP_WORK
*work
)
2869 struct threadpool_object
*this = impl_from_TP_WORK( work
);
2871 TRACE( "%p\n", work
);
2873 tp_object_submit( this, FALSE
);
2876 /***********************************************************************
2877 * TpReleaseCleanupGroup (NTDLL.@)
2879 VOID WINAPI
TpReleaseCleanupGroup( TP_CLEANUP_GROUP
*group
)
2881 struct threadpool_group
*this = impl_from_TP_CLEANUP_GROUP( group
);
2883 TRACE( "%p\n", group
);
2885 tp_group_shutdown( this );
2886 tp_group_release( this );
2889 /***********************************************************************
2890 * TpReleaseCleanupGroupMembers (NTDLL.@)
2892 VOID WINAPI
TpReleaseCleanupGroupMembers( TP_CLEANUP_GROUP
*group
, BOOL cancel_pending
, PVOID userdata
)
2894 struct threadpool_group
*this = impl_from_TP_CLEANUP_GROUP( group
);
2895 struct threadpool_object
*object
, *next
;
2896 struct list members
;
2898 TRACE( "%p %u %p\n", group
, cancel_pending
, userdata
);
2900 RtlEnterCriticalSection( &this->cs
);
2902 /* Unset group, increase references, and mark objects for shutdown */
2903 LIST_FOR_EACH_ENTRY_SAFE( object
, next
, &this->members
, struct threadpool_object
, group_entry
)
2905 assert( object
->group
== this );
2906 assert( object
->is_group_member
);
2908 if (InterlockedIncrement( &object
->refcount
) == 1)
2910 /* Object is basically already destroyed, but group reference
2911 * was not deleted yet. We can safely ignore this object. */
2912 InterlockedDecrement( &object
->refcount
);
2913 list_remove( &object
->group_entry
);
2914 object
->is_group_member
= FALSE
;
2918 object
->is_group_member
= FALSE
;
2919 tp_object_prepare_shutdown( object
);
2922 /* Move members to a new temporary list */
2923 list_init( &members
);
2924 list_move_tail( &members
, &this->members
);
2926 RtlLeaveCriticalSection( &this->cs
);
2928 /* Cancel pending callbacks if requested */
2931 LIST_FOR_EACH_ENTRY( object
, &members
, struct threadpool_object
, group_entry
)
2933 tp_object_cancel( object
);
2937 /* Wait for remaining callbacks to finish */
2938 LIST_FOR_EACH_ENTRY_SAFE( object
, next
, &members
, struct threadpool_object
, group_entry
)
2940 tp_object_wait( object
, TRUE
);
2942 if (!object
->shutdown
)
2944 /* Execute group cancellation callback if defined, and if this was actually a group cancel. */
2945 if (cancel_pending
&& object
->group_cancel_callback
)
2947 TRACE( "executing group cancel callback %p(%p, %p)\n",
2948 object
->group_cancel_callback
, object
->userdata
, userdata
);
2949 object
->group_cancel_callback( object
->userdata
, userdata
);
2950 TRACE( "callback %p returned\n", object
->group_cancel_callback
);
2953 if (object
->type
!= TP_OBJECT_TYPE_SIMPLE
)
2954 tp_object_release( object
);
2957 object
->shutdown
= TRUE
;
2958 tp_object_release( object
);
2962 /***********************************************************************
2963 * TpReleaseIoCompletion (NTDLL.@)
2965 void WINAPI
TpReleaseIoCompletion( TP_IO
*io
)
2967 struct threadpool_object
*this = impl_from_TP_IO( io
);
2969 TRACE( "%p\n", io
);
2971 tp_object_prepare_shutdown( this );
2972 this->shutdown
= TRUE
;
2973 tp_object_release( this );
2976 /***********************************************************************
2977 * TpReleasePool (NTDLL.@)
2979 VOID WINAPI
TpReleasePool( TP_POOL
*pool
)
2981 struct threadpool
*this = impl_from_TP_POOL( pool
);
2983 TRACE( "%p\n", pool
);
2985 tp_threadpool_shutdown( this );
2986 tp_threadpool_release( this );
2989 /***********************************************************************
2990 * TpReleaseTimer (NTDLL.@)
2992 VOID WINAPI
TpReleaseTimer( TP_TIMER
*timer
)
2994 struct threadpool_object
*this = impl_from_TP_TIMER( timer
);
2996 TRACE( "%p\n", timer
);
2998 tp_object_prepare_shutdown( this );
2999 this->shutdown
= TRUE
;
3000 tp_object_release( this );
3003 /***********************************************************************
3004 * TpReleaseWait (NTDLL.@)
3006 VOID WINAPI
TpReleaseWait( TP_WAIT
*wait
)
3008 struct threadpool_object
*this = impl_from_TP_WAIT( wait
);
3010 TRACE( "%p\n", wait
);
3012 tp_object_prepare_shutdown( this );
3013 this->shutdown
= TRUE
;
3014 tp_object_release( this );
3017 /***********************************************************************
3018 * TpReleaseWork (NTDLL.@)
3020 VOID WINAPI
TpReleaseWork( TP_WORK
*work
)
3022 struct threadpool_object
*this = impl_from_TP_WORK( work
);
3024 TRACE( "%p\n", work
);
3026 tp_object_prepare_shutdown( this );
3027 this->shutdown
= TRUE
;
3028 tp_object_release( this );
3031 /***********************************************************************
3032 * TpSetPoolMaxThreads (NTDLL.@)
3034 VOID WINAPI
TpSetPoolMaxThreads( TP_POOL
*pool
, DWORD maximum
)
3036 struct threadpool
*this = impl_from_TP_POOL( pool
);
3038 TRACE( "%p %u\n", pool
, maximum
);
3040 RtlEnterCriticalSection( &this->cs
);
3041 this->max_workers
= max( maximum
, 1 );
3042 this->min_workers
= min( this->min_workers
, this->max_workers
);
3043 RtlLeaveCriticalSection( &this->cs
);
3046 /***********************************************************************
3047 * TpSetPoolMinThreads (NTDLL.@)
3049 BOOL WINAPI
TpSetPoolMinThreads( TP_POOL
*pool
, DWORD minimum
)
3051 struct threadpool
*this = impl_from_TP_POOL( pool
);
3052 NTSTATUS status
= STATUS_SUCCESS
;
3054 TRACE( "%p %u\n", pool
, minimum
);
3056 RtlEnterCriticalSection( &this->cs
);
3058 while (this->num_workers
< minimum
)
3060 status
= tp_new_worker_thread( this );
3061 if (status
!= STATUS_SUCCESS
)
3065 if (status
== STATUS_SUCCESS
)
3067 this->min_workers
= minimum
;
3068 this->max_workers
= max( this->min_workers
, this->max_workers
);
3071 RtlLeaveCriticalSection( &this->cs
);
3075 /***********************************************************************
3076 * TpSetTimer (NTDLL.@)
3078 VOID WINAPI
TpSetTimer( TP_TIMER
*timer
, LARGE_INTEGER
*timeout
, LONG period
, LONG window_length
)
3080 struct threadpool_object
*this = impl_from_TP_TIMER( timer
);
3081 struct threadpool_object
*other_timer
;
3082 BOOL submit_timer
= FALSE
;
3083 ULONGLONG timestamp
;
3085 TRACE( "%p %p %u %u\n", timer
, timeout
, period
, window_length
);
3087 RtlEnterCriticalSection( &timerqueue
.cs
);
3089 assert( this->u
.timer
.timer_initialized
);
3090 this->u
.timer
.timer_set
= timeout
!= NULL
;
3092 /* Convert relative timeout to absolute timestamp and handle a timeout
3093 * of zero, which means that the timer is submitted immediately. */
3096 timestamp
= timeout
->QuadPart
;
3097 if ((LONGLONG
)timestamp
< 0)
3100 NtQuerySystemTime( &now
);
3101 timestamp
= now
.QuadPart
- timestamp
;
3103 else if (!timestamp
)
3110 NtQuerySystemTime( &now
);
3111 timestamp
= now
.QuadPart
+ (ULONGLONG
)period
* 10000;
3113 submit_timer
= TRUE
;
3117 /* First remove existing timeout. */
3118 if (this->u
.timer
.timer_pending
)
3120 list_remove( &this->u
.timer
.timer_entry
);
3121 this->u
.timer
.timer_pending
= FALSE
;
3124 /* If the timer was enabled, then add it back to the queue. */
3127 this->u
.timer
.timeout
= timestamp
;
3128 this->u
.timer
.period
= period
;
3129 this->u
.timer
.window_length
= window_length
;
3131 LIST_FOR_EACH_ENTRY( other_timer
, &timerqueue
.pending_timers
,
3132 struct threadpool_object
, u
.timer
.timer_entry
)
3134 assert( other_timer
->type
== TP_OBJECT_TYPE_TIMER
);
3135 if (this->u
.timer
.timeout
< other_timer
->u
.timer
.timeout
)
3138 list_add_before( &other_timer
->u
.timer
.timer_entry
, &this->u
.timer
.timer_entry
);
3140 /* Wake up the timer thread when the timeout has to be updated. */
3141 if (list_head( &timerqueue
.pending_timers
) == &this->u
.timer
.timer_entry
)
3142 RtlWakeAllConditionVariable( &timerqueue
.update_event
);
3144 this->u
.timer
.timer_pending
= TRUE
;
3147 RtlLeaveCriticalSection( &timerqueue
.cs
);
3150 tp_object_submit( this, FALSE
);
3153 /***********************************************************************
3154 * TpSetWait (NTDLL.@)
3156 VOID WINAPI
TpSetWait( TP_WAIT
*wait
, HANDLE handle
, LARGE_INTEGER
*timeout
)
3158 struct threadpool_object
*this = impl_from_TP_WAIT( wait
);
3159 ULONGLONG timestamp
= TIMEOUT_INFINITE
;
3160 BOOL submit_wait
= FALSE
;
3162 TRACE( "%p %p %p\n", wait
, handle
, timeout
);
3164 RtlEnterCriticalSection( &waitqueue
.cs
);
3166 assert( this->u
.wait
.bucket
);
3167 this->u
.wait
.handle
= handle
;
3169 if (handle
|| this->u
.wait
.wait_pending
)
3171 struct waitqueue_bucket
*bucket
= this->u
.wait
.bucket
;
3172 list_remove( &this->u
.wait
.wait_entry
);
3174 /* Convert relative timeout to absolute timestamp. */
3175 if (handle
&& timeout
)
3177 timestamp
= timeout
->QuadPart
;
3178 if ((LONGLONG
)timestamp
< 0)
3181 NtQuerySystemTime( &now
);
3182 timestamp
= now
.QuadPart
- timestamp
;
3184 else if (!timestamp
)
3191 /* Add wait object back into one of the queues. */
3194 list_add_tail( &bucket
->waiting
, &this->u
.wait
.wait_entry
);
3195 this->u
.wait
.wait_pending
= TRUE
;
3196 this->u
.wait
.timeout
= timestamp
;
3200 list_add_tail( &bucket
->reserved
, &this->u
.wait
.wait_entry
);
3201 this->u
.wait
.wait_pending
= FALSE
;
3204 /* Wake up the wait queue thread. */
3205 NtSetEvent( bucket
->update_event
, NULL
);
3208 RtlLeaveCriticalSection( &waitqueue
.cs
);
3211 tp_object_submit( this, FALSE
);
3214 /***********************************************************************
3215 * TpSimpleTryPost (NTDLL.@)
3217 NTSTATUS WINAPI
TpSimpleTryPost( PTP_SIMPLE_CALLBACK callback
, PVOID userdata
,
3218 TP_CALLBACK_ENVIRON
*environment
)
3220 struct threadpool_object
*object
;
3221 struct threadpool
*pool
;
3224 TRACE( "%p %p %p\n", callback
, userdata
, environment
);
3226 object
= RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*object
) );
3228 return STATUS_NO_MEMORY
;
3230 status
= tp_threadpool_lock( &pool
, environment
);
3233 RtlFreeHeap( GetProcessHeap(), 0, object
);
3237 object
->type
= TP_OBJECT_TYPE_SIMPLE
;
3238 object
->u
.simple
.callback
= callback
;
3239 tp_object_initialize( object
, pool
, userdata
, environment
);
3241 return STATUS_SUCCESS
;
3244 /***********************************************************************
3245 * TpStartAsyncIoOperation (NTDLL.@)
3247 void WINAPI
TpStartAsyncIoOperation( TP_IO
*io
)
3249 struct threadpool_object
*this = impl_from_TP_IO( io
);
3251 TRACE( "%p\n", io
);
3253 RtlEnterCriticalSection( &this->pool
->cs
);
3255 this->u
.io
.pending_count
++;
3257 RtlLeaveCriticalSection( &this->pool
->cs
);
3260 /***********************************************************************
3261 * TpWaitForIoCompletion (NTDLL.@)
3263 void WINAPI
TpWaitForIoCompletion( TP_IO
*io
, BOOL cancel_pending
)
3265 struct threadpool_object
*this = impl_from_TP_IO( io
);
3267 TRACE( "%p %d\n", io
, cancel_pending
);
3270 tp_object_cancel( this );
3271 tp_object_wait( this, FALSE
);
3274 /***********************************************************************
3275 * TpWaitForTimer (NTDLL.@)
3277 VOID WINAPI
TpWaitForTimer( TP_TIMER
*timer
, BOOL cancel_pending
)
3279 struct threadpool_object
*this = impl_from_TP_TIMER( timer
);
3281 TRACE( "%p %d\n", timer
, cancel_pending
);
3284 tp_object_cancel( this );
3285 tp_object_wait( this, FALSE
);
3288 /***********************************************************************
3289 * TpWaitForWait (NTDLL.@)
3291 VOID WINAPI
TpWaitForWait( TP_WAIT
*wait
, BOOL cancel_pending
)
3293 struct threadpool_object
*this = impl_from_TP_WAIT( wait
);
3295 TRACE( "%p %d\n", wait
, cancel_pending
);
3298 tp_object_cancel( this );
3299 tp_object_wait( this, FALSE
);
3302 /***********************************************************************
3303 * TpWaitForWork (NTDLL.@)
3305 VOID WINAPI
TpWaitForWork( TP_WORK
*work
, BOOL cancel_pending
)
3307 struct threadpool_object
*this = impl_from_TP_WORK( work
);
3309 TRACE( "%p %u\n", work
, cancel_pending
);
3312 tp_object_cancel( this );
3313 tp_object_wait( this, FALSE
);
3316 /***********************************************************************
3317 * TpSetPoolStackInformation (NTDLL.@)
3319 NTSTATUS WINAPI
TpSetPoolStackInformation( TP_POOL
*pool
, TP_POOL_STACK_INFORMATION
*stack_info
)
3321 struct threadpool
*this = impl_from_TP_POOL( pool
);
3323 TRACE( "%p %p\n", pool
, stack_info
);
3326 return STATUS_INVALID_PARAMETER
;
3328 RtlEnterCriticalSection( &this->cs
);
3329 this->stack_info
= *stack_info
;
3330 RtlLeaveCriticalSection( &this->cs
);
3332 return STATUS_SUCCESS
;
3335 /***********************************************************************
3336 * TpQueryPoolStackInformation (NTDLL.@)
3338 NTSTATUS WINAPI
TpQueryPoolStackInformation( TP_POOL
*pool
, TP_POOL_STACK_INFORMATION
*stack_info
)
3340 struct threadpool
*this = impl_from_TP_POOL( pool
);
3342 TRACE( "%p %p\n", pool
, stack_info
);
3345 return STATUS_INVALID_PARAMETER
;
3347 RtlEnterCriticalSection( &this->cs
);
3348 *stack_info
= this->stack_info
;
3349 RtlLeaveCriticalSection( &this->cs
);
3351 return STATUS_SUCCESS
;