4 * Copyright (c) 2006 Robert Shearman
5 * Copyright (c) 2014-2015 Sebastian Lackner
7 * This library is free software; you can redistribute it and/or
8 * modify it under the terms of the GNU Lesser General Public
9 * License as published by the Free Software Foundation; either
10 * version 2.1 of the License, or (at your option) any later version.
12 * This library is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Lesser General Public License for more details.
17 * You should have received a copy of the GNU Lesser General Public
18 * License along with this library; if not, write to the Free Software
19 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
23 #include "wine/port.h"
29 #define NONAMELESSUNION
31 #define WIN32_NO_STATUS
34 #include "wine/debug.h"
35 #include "wine/list.h"
37 #include "ntdll_misc.h"
39 WINE_DEFAULT_DEBUG_CHANNEL(threadpool
);
42 * Old thread pooling API
47 PRTL_WORK_ITEM_ROUTINE function
;
51 #define EXPIRE_NEVER (~(ULONGLONG)0)
52 #define TIMER_QUEUE_MAGIC 0x516d6954 /* TimQ */
54 static RTL_CRITICAL_SECTION_DEBUG critsect_compl_debug
;
59 RTL_CRITICAL_SECTION threadpool_compl_cs
;
63 NULL
, /* compl_port */
64 { &critsect_compl_debug
, -1, 0, 0, 0, 0 }, /* threadpool_compl_cs */
67 static RTL_CRITICAL_SECTION_DEBUG critsect_compl_debug
=
69 0, 0, &old_threadpool
.threadpool_compl_cs
,
70 { &critsect_compl_debug
.ProcessLocksList
, &critsect_compl_debug
.ProcessLocksList
},
71 0, 0, { (DWORD_PTR
)(__FILE__
": threadpool_compl_cs") }
78 WAITORTIMERCALLBACK Callback
;
82 HANDLE CompletionEvent
;
84 BOOLEAN CallbackInProgress
;
90 struct timer_queue
*q
;
92 ULONG runcount
; /* number of callbacks pending execution */
93 RTL_WAITORTIMERCALLBACKFUNC callback
;
98 BOOL destroy
; /* timer should be deleted; once set, never unset */
99 HANDLE event
; /* removal event */
105 RTL_CRITICAL_SECTION cs
;
106 struct list timers
; /* sorted by expiration time */
107 BOOL quit
; /* queue should be deleted; once set, never unset */
113 * Object-oriented thread pooling API
116 #define THREADPOOL_WORKER_TIMEOUT 5000
117 #define MAXIMUM_WAITQUEUE_OBJECTS (MAXIMUM_WAIT_OBJECTS - 1)
119 /* internal threadpool representation */
126 /* pool of work items, locked via .cs */
128 RTL_CONDITION_VARIABLE update_event
;
129 /* information about worker threads, locked via .cs */
133 int num_busy_workers
;
136 enum threadpool_objtype
138 TP_OBJECT_TYPE_SIMPLE
,
140 TP_OBJECT_TYPE_TIMER
,
144 /* internal threadpool object representation */
145 struct threadpool_object
149 /* read-only information */
150 enum threadpool_objtype type
;
151 struct threadpool
*pool
;
152 struct threadpool_group
*group
;
154 PTP_CLEANUP_GROUP_CANCEL_CALLBACK group_cancel_callback
;
155 PTP_SIMPLE_CALLBACK finalization_callback
;
158 /* information about the group, locked via .group->cs */
159 struct list group_entry
;
160 BOOL is_group_member
;
161 /* information about the pool, locked via .pool->cs */
162 struct list pool_entry
;
163 RTL_CONDITION_VARIABLE finished_event
;
164 RTL_CONDITION_VARIABLE group_finished_event
;
165 LONG num_pending_callbacks
;
166 LONG num_running_callbacks
;
167 LONG num_associated_callbacks
;
168 /* arguments for callback */
173 PTP_SIMPLE_CALLBACK callback
;
177 PTP_WORK_CALLBACK callback
;
181 PTP_TIMER_CALLBACK callback
;
182 /* information about the timer, locked via timerqueue.cs */
183 BOOL timer_initialized
;
185 struct list timer_entry
;
193 PTP_WAIT_CALLBACK callback
;
195 /* information about the wait object, locked via waitqueue.cs */
196 struct waitqueue_bucket
*bucket
;
198 struct list wait_entry
;
205 /* internal threadpool instance representation */
206 struct threadpool_instance
208 struct threadpool_object
*object
;
214 CRITICAL_SECTION
*critical_section
;
217 LONG semaphore_count
;
223 /* internal threadpool group representation */
224 struct threadpool_group
229 /* list of group members, locked via .cs */
233 /* global timerqueue object */
234 static RTL_CRITICAL_SECTION_DEBUG timerqueue_debug
;
241 struct list pending_timers
;
242 RTL_CONDITION_VARIABLE update_event
;
246 { &timerqueue_debug
, -1, 0, 0, 0, 0 }, /* cs */
248 FALSE
, /* thread_running */
249 LIST_INIT( timerqueue
.pending_timers
), /* pending_timers */
250 RTL_CONDITION_VARIABLE_INIT
/* update_event */
253 static RTL_CRITICAL_SECTION_DEBUG timerqueue_debug
=
255 0, 0, &timerqueue
.cs
,
256 { &timerqueue_debug
.ProcessLocksList
, &timerqueue_debug
.ProcessLocksList
},
257 0, 0, { (DWORD_PTR
)(__FILE__
": timerqueue.cs") }
260 /* global waitqueue object */
261 static RTL_CRITICAL_SECTION_DEBUG waitqueue_debug
;
271 { &waitqueue_debug
, -1, 0, 0, 0, 0 }, /* cs */
273 LIST_INIT( waitqueue
.buckets
) /* buckets */
276 static RTL_CRITICAL_SECTION_DEBUG waitqueue_debug
=
279 { &waitqueue_debug
.ProcessLocksList
, &waitqueue_debug
.ProcessLocksList
},
280 0, 0, { (DWORD_PTR
)(__FILE__
": waitqueue.cs") }
283 struct waitqueue_bucket
285 struct list bucket_entry
;
287 struct list reserved
;
292 static inline struct threadpool
*impl_from_TP_POOL( TP_POOL
*pool
)
294 return (struct threadpool
*)pool
;
297 static inline struct threadpool_object
*impl_from_TP_WORK( TP_WORK
*work
)
299 struct threadpool_object
*object
= (struct threadpool_object
*)work
;
300 assert( object
->type
== TP_OBJECT_TYPE_WORK
);
304 static inline struct threadpool_object
*impl_from_TP_TIMER( TP_TIMER
*timer
)
306 struct threadpool_object
*object
= (struct threadpool_object
*)timer
;
307 assert( object
->type
== TP_OBJECT_TYPE_TIMER
);
311 static inline struct threadpool_object
*impl_from_TP_WAIT( TP_WAIT
*wait
)
313 struct threadpool_object
*object
= (struct threadpool_object
*)wait
;
314 assert( object
->type
== TP_OBJECT_TYPE_WAIT
);
318 static inline struct threadpool_group
*impl_from_TP_CLEANUP_GROUP( TP_CLEANUP_GROUP
*group
)
320 return (struct threadpool_group
*)group
;
323 static inline struct threadpool_instance
*impl_from_TP_CALLBACK_INSTANCE( TP_CALLBACK_INSTANCE
*instance
)
325 return (struct threadpool_instance
*)instance
;
328 static void CALLBACK
threadpool_worker_proc( void *param
);
329 static void tp_object_submit( struct threadpool_object
*object
, BOOL signaled
);
330 static void tp_object_shutdown( struct threadpool_object
*object
);
331 static BOOL
tp_object_release( struct threadpool_object
*object
);
332 static struct threadpool
*default_threadpool
= NULL
;
334 static inline LONG
interlocked_inc( PLONG dest
)
336 return interlocked_xchg_add( dest
, 1 ) + 1;
339 static inline LONG
interlocked_dec( PLONG dest
)
341 return interlocked_xchg_add( dest
, -1 ) - 1;
344 static void CALLBACK
process_rtl_work_item( TP_CALLBACK_INSTANCE
*instance
, void *userdata
)
346 struct rtl_work_item
*item
= userdata
;
348 TRACE("executing %p(%p)\n", item
->function
, item
->context
);
349 item
->function( item
->context
);
351 RtlFreeHeap( GetProcessHeap(), 0, item
);
354 /***********************************************************************
355 * RtlQueueWorkItem (NTDLL.@)
357 * Queues a work item into a thread in the thread pool.
360 * function [I] Work function to execute.
361 * context [I] Context to pass to the work function when it is executed.
362 * flags [I] Flags. See notes.
365 * Success: STATUS_SUCCESS.
366 * Failure: Any NTSTATUS code.
369 * Flags can be one or more of the following:
370 *|WT_EXECUTEDEFAULT - Executes the work item in a non-I/O worker thread.
371 *|WT_EXECUTEINIOTHREAD - Executes the work item in an I/O worker thread.
372 *|WT_EXECUTEINPERSISTENTTHREAD - Executes the work item in a thread that is persistent.
373 *|WT_EXECUTELONGFUNCTION - Hints that the execution can take a long time.
374 *|WT_TRANSFER_IMPERSONATION - Executes the function with the current access token.
376 NTSTATUS WINAPI
RtlQueueWorkItem( PRTL_WORK_ITEM_ROUTINE function
, PVOID context
, ULONG flags
)
378 TP_CALLBACK_ENVIRON environment
;
379 struct rtl_work_item
*item
;
382 TRACE( "%p %p %u\n", function
, context
, flags
);
384 item
= RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*item
) );
386 return STATUS_NO_MEMORY
;
388 memset( &environment
, 0, sizeof(environment
) );
389 environment
.Version
= 1;
390 environment
.u
.s
.LongFunction
= (flags
& WT_EXECUTELONGFUNCTION
) != 0;
391 environment
.u
.s
.Persistent
= (flags
& WT_EXECUTEINPERSISTENTTHREAD
) != 0;
393 item
->function
= function
;
394 item
->context
= context
;
396 status
= TpSimpleTryPost( process_rtl_work_item
, item
, &environment
);
397 if (status
) RtlFreeHeap( GetProcessHeap(), 0, item
);
401 /***********************************************************************
402 * iocp_poller - get completion events and run callbacks
404 static DWORD CALLBACK
iocp_poller(LPVOID Arg
)
410 PRTL_OVERLAPPED_COMPLETION_ROUTINE callback
;
412 IO_STATUS_BLOCK iosb
;
413 NTSTATUS res
= NtRemoveIoCompletion( cport
, (PULONG_PTR
)&callback
, (PULONG_PTR
)&overlapped
, &iosb
, NULL
);
416 ERR("NtRemoveIoCompletion failed: 0x%x\n", res
);
420 DWORD transferred
= 0;
423 if (iosb
.u
.Status
== STATUS_SUCCESS
)
424 transferred
= iosb
.Information
;
426 err
= RtlNtStatusToDosError(iosb
.u
.Status
);
428 callback( err
, transferred
, overlapped
);
434 /***********************************************************************
435 * RtlSetIoCompletionCallback (NTDLL.@)
437 * Binds a handle to a thread pool's completion port, and possibly
438 * starts a non-I/O thread to monitor this port and call functions back.
441 * FileHandle [I] Handle to bind to a completion port.
442 * Function [I] Callback function to call on I/O completions.
443 * Flags [I] Not used.
446 * Success: STATUS_SUCCESS.
447 * Failure: Any NTSTATUS code.
450 NTSTATUS WINAPI
RtlSetIoCompletionCallback(HANDLE FileHandle
, PRTL_OVERLAPPED_COMPLETION_ROUTINE Function
, ULONG Flags
)
452 IO_STATUS_BLOCK iosb
;
453 FILE_COMPLETION_INFORMATION info
;
455 if (Flags
) FIXME("Unknown value Flags=0x%x\n", Flags
);
457 if (!old_threadpool
.compl_port
)
459 NTSTATUS res
= STATUS_SUCCESS
;
461 RtlEnterCriticalSection(&old_threadpool
.threadpool_compl_cs
);
462 if (!old_threadpool
.compl_port
)
466 res
= NtCreateIoCompletion( &cport
, IO_COMPLETION_ALL_ACCESS
, NULL
, 0 );
469 /* FIXME native can start additional threads in case of e.g. hung callback function. */
470 res
= RtlQueueWorkItem( iocp_poller
, cport
, WT_EXECUTEDEFAULT
);
472 old_threadpool
.compl_port
= cport
;
477 RtlLeaveCriticalSection(&old_threadpool
.threadpool_compl_cs
);
481 info
.CompletionPort
= old_threadpool
.compl_port
;
482 info
.CompletionKey
= (ULONG_PTR
)Function
;
484 return NtSetInformationFile( FileHandle
, &iosb
, &info
, sizeof(info
), FileCompletionInformation
);
487 static inline PLARGE_INTEGER
get_nt_timeout( PLARGE_INTEGER pTime
, ULONG timeout
)
489 if (timeout
== INFINITE
) return NULL
;
490 pTime
->QuadPart
= (ULONGLONG
)timeout
* -10000;
494 static void delete_wait_work_item(struct wait_work_item
*wait_work_item
)
496 NtClose( wait_work_item
->CancelEvent
);
497 RtlFreeHeap( GetProcessHeap(), 0, wait_work_item
);
500 static DWORD CALLBACK
wait_thread_proc(LPVOID Arg
)
502 struct wait_work_item
*wait_work_item
= Arg
;
504 BOOLEAN alertable
= (wait_work_item
->Flags
& WT_EXECUTEINIOTHREAD
) != 0;
505 HANDLE handles
[2] = { wait_work_item
->Object
, wait_work_item
->CancelEvent
};
506 LARGE_INTEGER timeout
;
507 HANDLE completion_event
;
513 status
= NtWaitForMultipleObjects( 2, handles
, TRUE
, alertable
,
514 get_nt_timeout( &timeout
, wait_work_item
->Milliseconds
) );
515 if (status
== STATUS_WAIT_0
|| status
== STATUS_TIMEOUT
)
517 BOOLEAN TimerOrWaitFired
;
519 if (status
== STATUS_WAIT_0
)
521 TRACE( "object %p signaled, calling callback %p with context %p\n",
522 wait_work_item
->Object
, wait_work_item
->Callback
,
523 wait_work_item
->Context
);
524 TimerOrWaitFired
= FALSE
;
528 TRACE( "wait for object %p timed out, calling callback %p with context %p\n",
529 wait_work_item
->Object
, wait_work_item
->Callback
,
530 wait_work_item
->Context
);
531 TimerOrWaitFired
= TRUE
;
533 wait_work_item
->CallbackInProgress
= TRUE
;
534 wait_work_item
->Callback( wait_work_item
->Context
, TimerOrWaitFired
);
535 wait_work_item
->CallbackInProgress
= FALSE
;
537 if (wait_work_item
->Flags
& WT_EXECUTEONLYONCE
)
544 completion_event
= wait_work_item
->CompletionEvent
;
545 if (completion_event
) NtSetEvent( completion_event
, NULL
);
547 if (interlocked_inc( &wait_work_item
->DeleteCount
) == 2 )
548 delete_wait_work_item( wait_work_item
);
553 /***********************************************************************
554 * RtlRegisterWait (NTDLL.@)
556 * Registers a wait for a handle to become signaled.
559 * NewWaitObject [O] Handle to the new wait object. Use RtlDeregisterWait() to free it.
560 * Object [I] Object to wait to become signaled.
561 * Callback [I] Callback function to execute when the wait times out or the handle is signaled.
562 * Context [I] Context to pass to the callback function when it is executed.
563 * Milliseconds [I] Number of milliseconds to wait before timing out.
564 * Flags [I] Flags. See notes.
567 * Success: STATUS_SUCCESS.
568 * Failure: Any NTSTATUS code.
571 * Flags can be one or more of the following:
572 *|WT_EXECUTEDEFAULT - Executes the work item in a non-I/O worker thread.
573 *|WT_EXECUTEINIOTHREAD - Executes the work item in an I/O worker thread.
574 *|WT_EXECUTEINPERSISTENTTHREAD - Executes the work item in a thread that is persistent.
575 *|WT_EXECUTELONGFUNCTION - Hints that the execution can take a long time.
576 *|WT_TRANSFER_IMPERSONATION - Executes the function with the current access token.
578 NTSTATUS WINAPI
RtlRegisterWait(PHANDLE NewWaitObject
, HANDLE Object
,
579 RTL_WAITORTIMERCALLBACKFUNC Callback
,
580 PVOID Context
, ULONG Milliseconds
, ULONG Flags
)
582 struct wait_work_item
*wait_work_item
;
585 TRACE( "(%p, %p, %p, %p, %d, 0x%x)\n", NewWaitObject
, Object
, Callback
, Context
, Milliseconds
, Flags
);
587 wait_work_item
= RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*wait_work_item
) );
589 return STATUS_NO_MEMORY
;
591 wait_work_item
->Object
= Object
;
592 wait_work_item
->Callback
= Callback
;
593 wait_work_item
->Context
= Context
;
594 wait_work_item
->Milliseconds
= Milliseconds
;
595 wait_work_item
->Flags
= Flags
;
596 wait_work_item
->CallbackInProgress
= FALSE
;
597 wait_work_item
->DeleteCount
= 0;
598 wait_work_item
->CompletionEvent
= NULL
;
600 status
= NtCreateEvent( &wait_work_item
->CancelEvent
, EVENT_ALL_ACCESS
, NULL
, NotificationEvent
, FALSE
);
601 if (status
!= STATUS_SUCCESS
)
603 RtlFreeHeap( GetProcessHeap(), 0, wait_work_item
);
607 Flags
= Flags
& (WT_EXECUTEINIOTHREAD
| WT_EXECUTEINPERSISTENTTHREAD
|
608 WT_EXECUTELONGFUNCTION
| WT_TRANSFER_IMPERSONATION
);
609 status
= RtlQueueWorkItem( wait_thread_proc
, wait_work_item
, Flags
);
610 if (status
!= STATUS_SUCCESS
)
612 delete_wait_work_item( wait_work_item
);
616 *NewWaitObject
= wait_work_item
;
620 /***********************************************************************
621 * RtlDeregisterWaitEx (NTDLL.@)
623 * Cancels a wait operation and frees the resources associated with calling
627 * WaitHandle [I] Handle to the wait object to free.
630 * Success: STATUS_SUCCESS.
631 * Failure: Any NTSTATUS code.
633 NTSTATUS WINAPI
RtlDeregisterWaitEx(HANDLE WaitHandle
, HANDLE CompletionEvent
)
635 struct wait_work_item
*wait_work_item
= WaitHandle
;
636 NTSTATUS status
= STATUS_SUCCESS
;
638 TRACE( "(%p)\n", WaitHandle
);
640 NtSetEvent( wait_work_item
->CancelEvent
, NULL
);
641 if (wait_work_item
->CallbackInProgress
)
643 if (CompletionEvent
!= NULL
)
645 if (CompletionEvent
== INVALID_HANDLE_VALUE
)
647 status
= NtCreateEvent( &CompletionEvent
, EVENT_ALL_ACCESS
, NULL
, NotificationEvent
, FALSE
);
648 if (status
!= STATUS_SUCCESS
)
650 interlocked_xchg_ptr( &wait_work_item
->CompletionEvent
, CompletionEvent
);
651 if (wait_work_item
->CallbackInProgress
)
652 NtWaitForSingleObject( CompletionEvent
, FALSE
, NULL
);
653 NtClose( CompletionEvent
);
657 interlocked_xchg_ptr( &wait_work_item
->CompletionEvent
, CompletionEvent
);
658 if (wait_work_item
->CallbackInProgress
)
659 status
= STATUS_PENDING
;
663 status
= STATUS_PENDING
;
666 if (interlocked_inc( &wait_work_item
->DeleteCount
) == 2 )
668 status
= STATUS_SUCCESS
;
669 delete_wait_work_item( wait_work_item
);
675 /***********************************************************************
676 * RtlDeregisterWait (NTDLL.@)
678 * Cancels a wait operation and frees the resources associated with calling
682 * WaitHandle [I] Handle to the wait object to free.
685 * Success: STATUS_SUCCESS.
686 * Failure: Any NTSTATUS code.
688 NTSTATUS WINAPI
RtlDeregisterWait(HANDLE WaitHandle
)
690 return RtlDeregisterWaitEx(WaitHandle
, NULL
);
694 /************************** Timer Queue Impl **************************/
696 static void queue_remove_timer(struct queue_timer
*t
)
698 /* We MUST hold the queue cs while calling this function. This ensures
699 that we cannot queue another callback for this timer. The runcount
700 being zero makes sure we don't have any already queued. */
701 struct timer_queue
*q
= t
->q
;
703 assert(t
->runcount
== 0);
706 list_remove(&t
->entry
);
708 NtSetEvent(t
->event
, NULL
);
709 RtlFreeHeap(GetProcessHeap(), 0, t
);
711 if (q
->quit
&& list_empty(&q
->timers
))
712 NtSetEvent(q
->event
, NULL
);
715 static void timer_cleanup_callback(struct queue_timer
*t
)
717 struct timer_queue
*q
= t
->q
;
718 RtlEnterCriticalSection(&q
->cs
);
720 assert(0 < t
->runcount
);
723 if (t
->destroy
&& t
->runcount
== 0)
724 queue_remove_timer(t
);
726 RtlLeaveCriticalSection(&q
->cs
);
729 static DWORD WINAPI
timer_callback_wrapper(LPVOID p
)
731 struct queue_timer
*t
= p
;
732 t
->callback(t
->param
, TRUE
);
733 timer_cleanup_callback(t
);
737 static inline ULONGLONG
queue_current_time(void)
739 LARGE_INTEGER now
, freq
;
740 NtQueryPerformanceCounter(&now
, &freq
);
741 return now
.QuadPart
* 1000 / freq
.QuadPart
;
744 static void queue_add_timer(struct queue_timer
*t
, ULONGLONG time
,
747 /* We MUST hold the queue cs while calling this function. */
748 struct timer_queue
*q
= t
->q
;
749 struct list
*ptr
= &q
->timers
;
751 assert(!q
->quit
|| (t
->destroy
&& time
== EXPIRE_NEVER
));
753 if (time
!= EXPIRE_NEVER
)
754 LIST_FOR_EACH(ptr
, &q
->timers
)
756 struct queue_timer
*cur
= LIST_ENTRY(ptr
, struct queue_timer
, entry
);
757 if (time
< cur
->expire
)
760 list_add_before(ptr
, &t
->entry
);
764 /* If we insert at the head of the list, we need to expire sooner
766 if (set_event
&& &t
->entry
== list_head(&q
->timers
))
767 NtSetEvent(q
->event
, NULL
);
770 static inline void queue_move_timer(struct queue_timer
*t
, ULONGLONG time
,
773 /* We MUST hold the queue cs while calling this function. */
774 list_remove(&t
->entry
);
775 queue_add_timer(t
, time
, set_event
);
778 static void queue_timer_expire(struct timer_queue
*q
)
780 struct queue_timer
*t
= NULL
;
782 RtlEnterCriticalSection(&q
->cs
);
783 if (list_head(&q
->timers
))
786 t
= LIST_ENTRY(list_head(&q
->timers
), struct queue_timer
, entry
);
787 if (!t
->destroy
&& t
->expire
<= ((now
= queue_current_time())))
792 next
= t
->expire
+ t
->period
;
793 /* avoid trigger cascade if overloaded / hibernated */
795 next
= now
+ t
->period
;
799 queue_move_timer(t
, next
, FALSE
);
804 RtlLeaveCriticalSection(&q
->cs
);
808 if (t
->flags
& WT_EXECUTEINTIMERTHREAD
)
809 timer_callback_wrapper(t
);
814 & (WT_EXECUTEINIOTHREAD
| WT_EXECUTEINPERSISTENTTHREAD
815 | WT_EXECUTELONGFUNCTION
| WT_TRANSFER_IMPERSONATION
));
816 NTSTATUS status
= RtlQueueWorkItem(timer_callback_wrapper
, t
, flags
);
817 if (status
!= STATUS_SUCCESS
)
818 timer_cleanup_callback(t
);
823 static ULONG
queue_get_timeout(struct timer_queue
*q
)
825 struct queue_timer
*t
;
826 ULONG timeout
= INFINITE
;
828 RtlEnterCriticalSection(&q
->cs
);
829 if (list_head(&q
->timers
))
831 t
= LIST_ENTRY(list_head(&q
->timers
), struct queue_timer
, entry
);
832 assert(!t
->destroy
|| t
->expire
== EXPIRE_NEVER
);
834 if (t
->expire
!= EXPIRE_NEVER
)
836 ULONGLONG time
= queue_current_time();
837 timeout
= t
->expire
< time
? 0 : t
->expire
- time
;
840 RtlLeaveCriticalSection(&q
->cs
);
845 static void WINAPI
timer_queue_thread_proc(LPVOID p
)
847 struct timer_queue
*q
= p
;
850 timeout_ms
= INFINITE
;
853 LARGE_INTEGER timeout
;
857 status
= NtWaitForSingleObject(
858 q
->event
, FALSE
, get_nt_timeout(&timeout
, timeout_ms
));
860 if (status
== STATUS_WAIT_0
)
862 /* There are two possible ways to trigger the event. Either
863 we are quitting and the last timer got removed, or a new
864 timer got put at the head of the list so we need to adjust
866 RtlEnterCriticalSection(&q
->cs
);
867 if (q
->quit
&& list_empty(&q
->timers
))
869 RtlLeaveCriticalSection(&q
->cs
);
871 else if (status
== STATUS_TIMEOUT
)
872 queue_timer_expire(q
);
877 timeout_ms
= queue_get_timeout(q
);
881 RtlDeleteCriticalSection(&q
->cs
);
883 RtlFreeHeap(GetProcessHeap(), 0, q
);
884 RtlExitUserThread( 0 );
887 static void queue_destroy_timer(struct queue_timer
*t
)
889 /* We MUST hold the queue cs while calling this function. */
891 if (t
->runcount
== 0)
892 /* Ensure a timer is promptly removed. If callbacks are pending,
893 it will be removed after the last one finishes by the callback
895 queue_remove_timer(t
);
897 /* Make sure no destroyed timer masks an active timer at the head
898 of the sorted list. */
899 queue_move_timer(t
, EXPIRE_NEVER
, FALSE
);
902 /***********************************************************************
903 * RtlCreateTimerQueue (NTDLL.@)
905 * Creates a timer queue object and returns a handle to it.
908 * NewTimerQueue [O] The newly created queue.
911 * Success: STATUS_SUCCESS.
912 * Failure: Any NTSTATUS code.
914 NTSTATUS WINAPI
RtlCreateTimerQueue(PHANDLE NewTimerQueue
)
917 struct timer_queue
*q
= RtlAllocateHeap(GetProcessHeap(), 0, sizeof *q
);
919 return STATUS_NO_MEMORY
;
921 RtlInitializeCriticalSection(&q
->cs
);
922 list_init(&q
->timers
);
924 q
->magic
= TIMER_QUEUE_MAGIC
;
925 status
= NtCreateEvent(&q
->event
, EVENT_ALL_ACCESS
, NULL
, SynchronizationEvent
, FALSE
);
926 if (status
!= STATUS_SUCCESS
)
928 RtlFreeHeap(GetProcessHeap(), 0, q
);
931 status
= RtlCreateUserThread(GetCurrentProcess(), NULL
, FALSE
, NULL
, 0, 0,
932 timer_queue_thread_proc
, q
, &q
->thread
, NULL
);
933 if (status
!= STATUS_SUCCESS
)
936 RtlFreeHeap(GetProcessHeap(), 0, q
);
941 return STATUS_SUCCESS
;
944 /***********************************************************************
945 * RtlDeleteTimerQueueEx (NTDLL.@)
947 * Deletes a timer queue object.
950 * TimerQueue [I] The timer queue to destroy.
951 * CompletionEvent [I] If NULL, return immediately. If INVALID_HANDLE_VALUE,
952 * wait until all timers are finished firing before
953 * returning. Otherwise, return immediately and set the
954 * event when all timers are done.
957 * Success: STATUS_SUCCESS if synchronous, STATUS_PENDING if not.
958 * Failure: Any NTSTATUS code.
960 NTSTATUS WINAPI
RtlDeleteTimerQueueEx(HANDLE TimerQueue
, HANDLE CompletionEvent
)
962 struct timer_queue
*q
= TimerQueue
;
963 struct queue_timer
*t
, *temp
;
967 if (!q
|| q
->magic
!= TIMER_QUEUE_MAGIC
)
968 return STATUS_INVALID_HANDLE
;
972 RtlEnterCriticalSection(&q
->cs
);
974 if (list_head(&q
->timers
))
975 /* When the last timer is removed, it will signal the timer thread to
977 LIST_FOR_EACH_ENTRY_SAFE(t
, temp
, &q
->timers
, struct queue_timer
, entry
)
978 queue_destroy_timer(t
);
980 /* However if we have none, we must do it ourselves. */
981 NtSetEvent(q
->event
, NULL
);
982 RtlLeaveCriticalSection(&q
->cs
);
984 if (CompletionEvent
== INVALID_HANDLE_VALUE
)
986 NtWaitForSingleObject(thread
, FALSE
, NULL
);
987 status
= STATUS_SUCCESS
;
993 FIXME("asynchronous return on completion event unimplemented\n");
994 NtWaitForSingleObject(thread
, FALSE
, NULL
);
995 NtSetEvent(CompletionEvent
, NULL
);
997 status
= STATUS_PENDING
;
1004 static struct timer_queue
*get_timer_queue(HANDLE TimerQueue
)
1006 static struct timer_queue
*default_timer_queue
;
1012 if (!default_timer_queue
)
1015 NTSTATUS status
= RtlCreateTimerQueue(&q
);
1016 if (status
== STATUS_SUCCESS
)
1018 PVOID p
= interlocked_cmpxchg_ptr(
1019 (void **) &default_timer_queue
, q
, NULL
);
1021 /* Got beat to the punch. */
1022 RtlDeleteTimerQueueEx(q
, NULL
);
1025 return default_timer_queue
;
1029 /***********************************************************************
1030 * RtlCreateTimer (NTDLL.@)
1032 * Creates a new timer associated with the given queue.
1035 * NewTimer [O] The newly created timer.
1036 * TimerQueue [I] The queue to hold the timer.
1037 * Callback [I] The callback to fire.
1038 * Parameter [I] The argument for the callback.
1039 * DueTime [I] The delay, in milliseconds, before first firing the
1041 * Period [I] The period, in milliseconds, at which to fire the timer
1042 * after the first callback. If zero, the timer will only
1043 * fire once. It still needs to be deleted with
1045 * Flags [I] Flags controlling the execution of the callback. In
1046 * addition to the WT_* thread pool flags (see
1047 * RtlQueueWorkItem), WT_EXECUTEINTIMERTHREAD and
1048 * WT_EXECUTEONLYONCE are supported.
1051 * Success: STATUS_SUCCESS.
1052 * Failure: Any NTSTATUS code.
1054 NTSTATUS WINAPI
RtlCreateTimer(PHANDLE NewTimer
, HANDLE TimerQueue
,
1055 RTL_WAITORTIMERCALLBACKFUNC Callback
,
1056 PVOID Parameter
, DWORD DueTime
, DWORD Period
,
1060 struct queue_timer
*t
;
1061 struct timer_queue
*q
= get_timer_queue(TimerQueue
);
1063 if (!q
) return STATUS_NO_MEMORY
;
1064 if (q
->magic
!= TIMER_QUEUE_MAGIC
) return STATUS_INVALID_HANDLE
;
1066 t
= RtlAllocateHeap(GetProcessHeap(), 0, sizeof *t
);
1068 return STATUS_NO_MEMORY
;
1072 t
->callback
= Callback
;
1073 t
->param
= Parameter
;
1079 status
= STATUS_SUCCESS
;
1080 RtlEnterCriticalSection(&q
->cs
);
1082 status
= STATUS_INVALID_HANDLE
;
1084 queue_add_timer(t
, queue_current_time() + DueTime
, TRUE
);
1085 RtlLeaveCriticalSection(&q
->cs
);
1087 if (status
== STATUS_SUCCESS
)
1090 RtlFreeHeap(GetProcessHeap(), 0, t
);
1095 /***********************************************************************
1096 * RtlUpdateTimer (NTDLL.@)
1098 * Changes the time at which a timer expires.
1101 * TimerQueue [I] The queue that holds the timer.
1102 * Timer [I] The timer to update.
1103 * DueTime [I] The delay, in milliseconds, before next firing the timer.
1104 * Period [I] The period, in milliseconds, at which to fire the timer
1105 * after the first callback. If zero, the timer will not
1106 * refire. It still needs to be deleted with
1110 * Success: STATUS_SUCCESS.
1111 * Failure: Any NTSTATUS code.
1113 NTSTATUS WINAPI
RtlUpdateTimer(HANDLE TimerQueue
, HANDLE Timer
,
1114 DWORD DueTime
, DWORD Period
)
1116 struct queue_timer
*t
= Timer
;
1117 struct timer_queue
*q
= t
->q
;
1119 RtlEnterCriticalSection(&q
->cs
);
1120 /* Can't change a timer if it was once-only or destroyed. */
1121 if (t
->expire
!= EXPIRE_NEVER
)
1124 queue_move_timer(t
, queue_current_time() + DueTime
, TRUE
);
1126 RtlLeaveCriticalSection(&q
->cs
);
1128 return STATUS_SUCCESS
;
1131 /***********************************************************************
1132 * RtlDeleteTimer (NTDLL.@)
1134 * Cancels a timer-queue timer.
1137 * TimerQueue [I] The queue that holds the timer.
1138 * Timer [I] The timer to cancel.
1139 * CompletionEvent [I] If NULL, return immediately. If INVALID_HANDLE_VALUE,
1140 * wait until the timer is finished firing all pending
1141 * callbacks before returning. Otherwise, return
1142 * immediately and set the event when the timer is done.
1145 * Success: STATUS_SUCCESS if the timer is done, STATUS_PENDING if not,
1146 or if the completion event is NULL.
1147 * Failure: Any NTSTATUS code.
1149 NTSTATUS WINAPI
RtlDeleteTimer(HANDLE TimerQueue
, HANDLE Timer
,
1150 HANDLE CompletionEvent
)
1152 struct queue_timer
*t
= Timer
;
1153 struct timer_queue
*q
;
1154 NTSTATUS status
= STATUS_PENDING
;
1155 HANDLE event
= NULL
;
1158 return STATUS_INVALID_PARAMETER_1
;
1160 if (CompletionEvent
== INVALID_HANDLE_VALUE
)
1162 status
= NtCreateEvent(&event
, EVENT_ALL_ACCESS
, NULL
, SynchronizationEvent
, FALSE
);
1163 if (status
== STATUS_SUCCESS
)
1164 status
= STATUS_PENDING
;
1166 else if (CompletionEvent
)
1167 event
= CompletionEvent
;
1169 RtlEnterCriticalSection(&q
->cs
);
1171 if (t
->runcount
== 0 && event
)
1172 status
= STATUS_SUCCESS
;
1173 queue_destroy_timer(t
);
1174 RtlLeaveCriticalSection(&q
->cs
);
1176 if (CompletionEvent
== INVALID_HANDLE_VALUE
&& event
)
1178 if (status
== STATUS_PENDING
)
1180 NtWaitForSingleObject(event
, FALSE
, NULL
);
1181 status
= STATUS_SUCCESS
;
1189 /***********************************************************************
1190 * timerqueue_thread_proc (internal)
1192 static void CALLBACK
timerqueue_thread_proc( void *param
)
1194 ULONGLONG timeout_lower
, timeout_upper
, new_timeout
;
1195 struct threadpool_object
*other_timer
;
1196 LARGE_INTEGER now
, timeout
;
1199 TRACE( "starting timer queue thread\n" );
1201 RtlEnterCriticalSection( &timerqueue
.cs
);
1204 NtQuerySystemTime( &now
);
1206 /* Check for expired timers. */
1207 while ((ptr
= list_head( &timerqueue
.pending_timers
)))
1209 struct threadpool_object
*timer
= LIST_ENTRY( ptr
, struct threadpool_object
, u
.timer
.timer_entry
);
1210 assert( timer
->type
== TP_OBJECT_TYPE_TIMER
);
1211 assert( timer
->u
.timer
.timer_pending
);
1212 if (timer
->u
.timer
.timeout
> now
.QuadPart
)
1215 /* Queue a new callback in one of the worker threads. */
1216 list_remove( &timer
->u
.timer
.timer_entry
);
1217 timer
->u
.timer
.timer_pending
= FALSE
;
1218 tp_object_submit( timer
, FALSE
);
1220 /* Insert the timer back into the queue, unless it's marked for shutdown. */
1221 if (timer
->u
.timer
.period
&& !timer
->shutdown
)
1223 timer
->u
.timer
.timeout
+= (ULONGLONG
)timer
->u
.timer
.period
* 10000;
1224 if (timer
->u
.timer
.timeout
<= now
.QuadPart
)
1225 timer
->u
.timer
.timeout
= now
.QuadPart
+ 1;
1227 LIST_FOR_EACH_ENTRY( other_timer
, &timerqueue
.pending_timers
,
1228 struct threadpool_object
, u
.timer
.timer_entry
)
1230 assert( other_timer
->type
== TP_OBJECT_TYPE_TIMER
);
1231 if (timer
->u
.timer
.timeout
< other_timer
->u
.timer
.timeout
)
1234 list_add_before( &other_timer
->u
.timer
.timer_entry
, &timer
->u
.timer
.timer_entry
);
1235 timer
->u
.timer
.timer_pending
= TRUE
;
1239 timeout_lower
= TIMEOUT_INFINITE
;
1240 timeout_upper
= TIMEOUT_INFINITE
;
1242 /* Determine next timeout and use the window length to optimize wakeup times. */
1243 LIST_FOR_EACH_ENTRY( other_timer
, &timerqueue
.pending_timers
,
1244 struct threadpool_object
, u
.timer
.timer_entry
)
1246 assert( other_timer
->type
== TP_OBJECT_TYPE_TIMER
);
1247 if (other_timer
->u
.timer
.timeout
>= timeout_upper
)
1250 timeout_lower
= other_timer
->u
.timer
.timeout
;
1251 new_timeout
= timeout_lower
+ (ULONGLONG
)other_timer
->u
.timer
.window_length
* 10000;
1252 if (new_timeout
< timeout_upper
)
1253 timeout_upper
= new_timeout
;
1256 /* Wait for timer update events or until the next timer expires. */
1257 if (timerqueue
.objcount
)
1259 timeout
.QuadPart
= timeout_lower
;
1260 RtlSleepConditionVariableCS( &timerqueue
.update_event
, &timerqueue
.cs
, &timeout
);
1264 /* All timers have been destroyed, if no new timers are created
1265 * within some amount of time, then we can shutdown this thread. */
1266 timeout
.QuadPart
= (ULONGLONG
)THREADPOOL_WORKER_TIMEOUT
* -10000;
1267 if (RtlSleepConditionVariableCS( &timerqueue
.update_event
, &timerqueue
.cs
,
1268 &timeout
) == STATUS_TIMEOUT
&& !timerqueue
.objcount
)
1274 timerqueue
.thread_running
= FALSE
;
1275 RtlLeaveCriticalSection( &timerqueue
.cs
);
1277 TRACE( "terminating timer queue thread\n" );
1278 RtlExitUserThread( 0 );
1281 /***********************************************************************
1282 * tp_timerqueue_lock (internal)
1284 * Acquires a lock on the global timerqueue. When the lock is acquired
1285 * successfully, it is guaranteed that the timer thread is running.
1287 static NTSTATUS
tp_timerqueue_lock( struct threadpool_object
*timer
)
1289 NTSTATUS status
= STATUS_SUCCESS
;
1290 assert( timer
->type
== TP_OBJECT_TYPE_TIMER
);
1292 timer
->u
.timer
.timer_initialized
= FALSE
;
1293 timer
->u
.timer
.timer_pending
= FALSE
;
1294 timer
->u
.timer
.timer_set
= FALSE
;
1295 timer
->u
.timer
.timeout
= 0;
1296 timer
->u
.timer
.period
= 0;
1297 timer
->u
.timer
.window_length
= 0;
1299 RtlEnterCriticalSection( &timerqueue
.cs
);
1301 /* Make sure that the timerqueue thread is running. */
1302 if (!timerqueue
.thread_running
)
1305 status
= RtlCreateUserThread( GetCurrentProcess(), NULL
, FALSE
, NULL
, 0, 0,
1306 timerqueue_thread_proc
, NULL
, &thread
, NULL
);
1307 if (status
== STATUS_SUCCESS
)
1309 timerqueue
.thread_running
= TRUE
;
1314 if (status
== STATUS_SUCCESS
)
1316 timer
->u
.timer
.timer_initialized
= TRUE
;
1317 timerqueue
.objcount
++;
1320 RtlLeaveCriticalSection( &timerqueue
.cs
);
1324 /***********************************************************************
1325 * tp_timerqueue_unlock (internal)
1327 * Releases a lock on the global timerqueue.
1329 static void tp_timerqueue_unlock( struct threadpool_object
*timer
)
1331 assert( timer
->type
== TP_OBJECT_TYPE_TIMER
);
1333 RtlEnterCriticalSection( &timerqueue
.cs
);
1334 if (timer
->u
.timer
.timer_initialized
)
1336 /* If timer was pending, remove it. */
1337 if (timer
->u
.timer
.timer_pending
)
1339 list_remove( &timer
->u
.timer
.timer_entry
);
1340 timer
->u
.timer
.timer_pending
= FALSE
;
1343 /* If the last timer object was destroyed, then wake up the thread. */
1344 if (!--timerqueue
.objcount
)
1346 assert( list_empty( &timerqueue
.pending_timers
) );
1347 RtlWakeAllConditionVariable( &timerqueue
.update_event
);
1350 timer
->u
.timer
.timer_initialized
= FALSE
;
1352 RtlLeaveCriticalSection( &timerqueue
.cs
);
1355 /***********************************************************************
1356 * waitqueue_thread_proc (internal)
1358 static void CALLBACK
waitqueue_thread_proc( void *param
)
1360 struct threadpool_object
*objects
[MAXIMUM_WAITQUEUE_OBJECTS
];
1361 HANDLE handles
[MAXIMUM_WAITQUEUE_OBJECTS
+ 1];
1362 struct waitqueue_bucket
*bucket
= param
;
1363 struct threadpool_object
*wait
, *next
;
1364 LARGE_INTEGER now
, timeout
;
1368 TRACE( "starting wait queue thread\n" );
1370 RtlEnterCriticalSection( &waitqueue
.cs
);
1374 NtQuerySystemTime( &now
);
1375 timeout
.QuadPart
= TIMEOUT_INFINITE
;
1378 LIST_FOR_EACH_ENTRY_SAFE( wait
, next
, &bucket
->waiting
, struct threadpool_object
,
1381 assert( wait
->type
== TP_OBJECT_TYPE_WAIT
);
1382 if (wait
->u
.wait
.timeout
<= now
.QuadPart
)
1384 /* Wait object timed out. */
1385 list_remove( &wait
->u
.wait
.wait_entry
);
1386 list_add_tail( &bucket
->reserved
, &wait
->u
.wait
.wait_entry
);
1387 tp_object_submit( wait
, FALSE
);
1391 if (wait
->u
.wait
.timeout
< timeout
.QuadPart
)
1392 timeout
.QuadPart
= wait
->u
.wait
.timeout
;
1394 assert( num_handles
< MAXIMUM_WAITQUEUE_OBJECTS
);
1395 interlocked_inc( &wait
->refcount
);
1396 objects
[num_handles
] = wait
;
1397 handles
[num_handles
] = wait
->u
.wait
.handle
;
1402 if (!bucket
->objcount
)
1404 /* All wait objects have been destroyed, if no new wait objects are created
1405 * within some amount of time, then we can shutdown this thread. */
1406 assert( num_handles
== 0 );
1407 RtlLeaveCriticalSection( &waitqueue
.cs
);
1408 timeout
.QuadPart
= (ULONGLONG
)THREADPOOL_WORKER_TIMEOUT
* -10000;
1409 status
= NtWaitForMultipleObjects( 1, &bucket
->update_event
, TRUE
, FALSE
, &timeout
);
1410 RtlEnterCriticalSection( &waitqueue
.cs
);
1412 if (status
== STATUS_TIMEOUT
&& !bucket
->objcount
)
1417 handles
[num_handles
] = bucket
->update_event
;
1418 RtlLeaveCriticalSection( &waitqueue
.cs
);
1419 status
= NtWaitForMultipleObjects( num_handles
+ 1, handles
, TRUE
, FALSE
, &timeout
);
1420 RtlEnterCriticalSection( &waitqueue
.cs
);
1422 if (status
>= STATUS_WAIT_0
&& status
< STATUS_WAIT_0
+ num_handles
)
1424 wait
= objects
[status
- STATUS_WAIT_0
];
1425 assert( wait
->type
== TP_OBJECT_TYPE_WAIT
);
1426 if (wait
->u
.wait
.bucket
)
1428 /* Wait object signaled. */
1429 assert( wait
->u
.wait
.bucket
== bucket
);
1430 list_remove( &wait
->u
.wait
.wait_entry
);
1431 list_add_tail( &bucket
->reserved
, &wait
->u
.wait
.wait_entry
);
1432 tp_object_submit( wait
, TRUE
);
1435 ERR("wait object %p triggered while object was destroyed\n", wait
);
1438 /* Release temporary references to wait objects. */
1441 wait
= objects
[--num_handles
];
1442 assert( wait
->type
== TP_OBJECT_TYPE_WAIT
);
1443 tp_object_release( wait
);
1447 /* Try to merge bucket with other threads. */
1448 if (waitqueue
.num_buckets
> 1 && bucket
->objcount
&&
1449 bucket
->objcount
<= MAXIMUM_WAITQUEUE_OBJECTS
* 1 / 3)
1451 struct waitqueue_bucket
*other_bucket
;
1452 LIST_FOR_EACH_ENTRY( other_bucket
, &waitqueue
.buckets
, struct waitqueue_bucket
, bucket_entry
)
1454 if (other_bucket
!= bucket
&& other_bucket
->objcount
&&
1455 other_bucket
->objcount
+ bucket
->objcount
<= MAXIMUM_WAITQUEUE_OBJECTS
* 2 / 3)
1457 other_bucket
->objcount
+= bucket
->objcount
;
1458 bucket
->objcount
= 0;
1460 /* Update reserved list. */
1461 LIST_FOR_EACH_ENTRY( wait
, &bucket
->reserved
, struct threadpool_object
, u
.wait
.wait_entry
)
1463 assert( wait
->type
== TP_OBJECT_TYPE_WAIT
);
1464 wait
->u
.wait
.bucket
= other_bucket
;
1466 list_move_tail( &other_bucket
->reserved
, &bucket
->reserved
);
1468 /* Update waiting list. */
1469 LIST_FOR_EACH_ENTRY( wait
, &bucket
->waiting
, struct threadpool_object
, u
.wait
.wait_entry
)
1471 assert( wait
->type
== TP_OBJECT_TYPE_WAIT
);
1472 wait
->u
.wait
.bucket
= other_bucket
;
1474 list_move_tail( &other_bucket
->waiting
, &bucket
->waiting
);
1476 /* Move bucket to the end, to keep the probability of
1477 * newly added wait objects as small as possible. */
1478 list_remove( &bucket
->bucket_entry
);
1479 list_add_tail( &waitqueue
.buckets
, &bucket
->bucket_entry
);
1481 NtSetEvent( other_bucket
->update_event
, NULL
);
1488 /* Remove this bucket from the list. */
1489 list_remove( &bucket
->bucket_entry
);
1490 if (!--waitqueue
.num_buckets
)
1491 assert( list_empty( &waitqueue
.buckets
) );
1493 RtlLeaveCriticalSection( &waitqueue
.cs
);
1495 TRACE( "terminating wait queue thread\n" );
1497 assert( bucket
->objcount
== 0 );
1498 assert( list_empty( &bucket
->reserved
) );
1499 assert( list_empty( &bucket
->waiting
) );
1500 NtClose( bucket
->update_event
);
1502 RtlFreeHeap( GetProcessHeap(), 0, bucket
);
1503 RtlExitUserThread( 0 );
1506 /***********************************************************************
1507 * tp_waitqueue_lock (internal)
1509 static NTSTATUS
tp_waitqueue_lock( struct threadpool_object
*wait
)
1511 struct waitqueue_bucket
*bucket
;
1514 assert( wait
->type
== TP_OBJECT_TYPE_WAIT
);
1516 wait
->u
.wait
.signaled
= 0;
1517 wait
->u
.wait
.bucket
= NULL
;
1518 wait
->u
.wait
.wait_pending
= FALSE
;
1519 wait
->u
.wait
.timeout
= 0;
1520 wait
->u
.wait
.handle
= INVALID_HANDLE_VALUE
;
1522 RtlEnterCriticalSection( &waitqueue
.cs
);
1524 /* Try to assign to existing bucket if possible. */
1525 LIST_FOR_EACH_ENTRY( bucket
, &waitqueue
.buckets
, struct waitqueue_bucket
, bucket_entry
)
1527 if (bucket
->objcount
< MAXIMUM_WAITQUEUE_OBJECTS
)
1529 list_add_tail( &bucket
->reserved
, &wait
->u
.wait
.wait_entry
);
1530 wait
->u
.wait
.bucket
= bucket
;
1533 status
= STATUS_SUCCESS
;
1538 /* Create a new bucket and corresponding worker thread. */
1539 bucket
= RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*bucket
) );
1542 status
= STATUS_NO_MEMORY
;
1546 bucket
->objcount
= 0;
1547 list_init( &bucket
->reserved
);
1548 list_init( &bucket
->waiting
);
1550 status
= NtCreateEvent( &bucket
->update_event
, EVENT_ALL_ACCESS
,
1551 NULL
, SynchronizationEvent
, FALSE
);
1554 RtlFreeHeap( GetProcessHeap(), 0, bucket
);
1558 status
= RtlCreateUserThread( GetCurrentProcess(), NULL
, FALSE
, NULL
, 0, 0,
1559 waitqueue_thread_proc
, bucket
, &thread
, NULL
);
1560 if (status
== STATUS_SUCCESS
)
1562 list_add_tail( &waitqueue
.buckets
, &bucket
->bucket_entry
);
1563 waitqueue
.num_buckets
++;
1565 list_add_tail( &bucket
->reserved
, &wait
->u
.wait
.wait_entry
);
1566 wait
->u
.wait
.bucket
= bucket
;
1573 NtClose( bucket
->update_event
);
1574 RtlFreeHeap( GetProcessHeap(), 0, bucket
);
1578 RtlLeaveCriticalSection( &waitqueue
.cs
);
1582 /***********************************************************************
1583 * tp_waitqueue_unlock (internal)
1585 static void tp_waitqueue_unlock( struct threadpool_object
*wait
)
1587 assert( wait
->type
== TP_OBJECT_TYPE_WAIT
);
1589 RtlEnterCriticalSection( &waitqueue
.cs
);
1590 if (wait
->u
.wait
.bucket
)
1592 struct waitqueue_bucket
*bucket
= wait
->u
.wait
.bucket
;
1593 assert( bucket
->objcount
> 0 );
1595 list_remove( &wait
->u
.wait
.wait_entry
);
1596 wait
->u
.wait
.bucket
= NULL
;
1599 NtSetEvent( bucket
->update_event
, NULL
);
1601 RtlLeaveCriticalSection( &waitqueue
.cs
);
1604 /***********************************************************************
1605 * tp_threadpool_alloc (internal)
1607 * Allocates a new threadpool object.
1609 static NTSTATUS
tp_threadpool_alloc( struct threadpool
**out
)
1611 struct threadpool
*pool
;
1613 pool
= RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*pool
) );
1615 return STATUS_NO_MEMORY
;
1619 pool
->shutdown
= FALSE
;
1621 RtlInitializeCriticalSection( &pool
->cs
);
1622 pool
->cs
.DebugInfo
->Spare
[0] = (DWORD_PTR
)(__FILE__
": threadpool.cs");
1624 list_init( &pool
->pool
);
1625 RtlInitializeConditionVariable( &pool
->update_event
);
1627 pool
->max_workers
= 500;
1628 pool
->min_workers
= 0;
1629 pool
->num_workers
= 0;
1630 pool
->num_busy_workers
= 0;
1632 TRACE( "allocated threadpool %p\n", pool
);
1635 return STATUS_SUCCESS
;
1638 /***********************************************************************
1639 * tp_threadpool_shutdown (internal)
1641 * Prepares the shutdown of a threadpool object and notifies all worker
1642 * threads to terminate (after all remaining work items have been
1645 static void tp_threadpool_shutdown( struct threadpool
*pool
)
1647 assert( pool
!= default_threadpool
);
1649 pool
->shutdown
= TRUE
;
1650 RtlWakeAllConditionVariable( &pool
->update_event
);
1653 /***********************************************************************
1654 * tp_threadpool_release (internal)
1656 * Releases a reference to a threadpool object.
1658 static BOOL
tp_threadpool_release( struct threadpool
*pool
)
1660 if (interlocked_dec( &pool
->refcount
))
1663 TRACE( "destroying threadpool %p\n", pool
);
1665 assert( pool
->shutdown
);
1666 assert( !pool
->objcount
);
1667 assert( list_empty( &pool
->pool
) );
1669 pool
->cs
.DebugInfo
->Spare
[0] = 0;
1670 RtlDeleteCriticalSection( &pool
->cs
);
1672 RtlFreeHeap( GetProcessHeap(), 0, pool
);
1676 /***********************************************************************
1677 * tp_threadpool_lock (internal)
1679 * Acquires a lock on a threadpool, specified with an TP_CALLBACK_ENVIRON
1680 * block. When the lock is acquired successfully, it is guaranteed that
1681 * there is at least one worker thread to process tasks.
1683 static NTSTATUS
tp_threadpool_lock( struct threadpool
**out
, TP_CALLBACK_ENVIRON
*environment
)
1685 struct threadpool
*pool
= NULL
;
1686 NTSTATUS status
= STATUS_SUCCESS
;
1689 pool
= (struct threadpool
*)environment
->Pool
;
1693 if (!default_threadpool
)
1695 status
= tp_threadpool_alloc( &pool
);
1696 if (status
!= STATUS_SUCCESS
)
1699 if (interlocked_cmpxchg_ptr( (void *)&default_threadpool
, pool
, NULL
) != NULL
)
1701 tp_threadpool_shutdown( pool
);
1702 tp_threadpool_release( pool
);
1706 pool
= default_threadpool
;
1709 RtlEnterCriticalSection( &pool
->cs
);
1711 /* Make sure that the threadpool has at least one thread. */
1712 if (!pool
->num_workers
)
1715 status
= RtlCreateUserThread( GetCurrentProcess(), NULL
, FALSE
, NULL
, 0, 0,
1716 threadpool_worker_proc
, pool
, &thread
, NULL
);
1717 if (status
== STATUS_SUCCESS
)
1719 interlocked_inc( &pool
->refcount
);
1720 pool
->num_workers
++;
1721 pool
->num_busy_workers
++;
1726 /* Keep a reference, and increment objcount to ensure that the
1727 * last thread doesn't terminate. */
1728 if (status
== STATUS_SUCCESS
)
1730 interlocked_inc( &pool
->refcount
);
1734 RtlLeaveCriticalSection( &pool
->cs
);
1736 if (status
!= STATUS_SUCCESS
)
1740 return STATUS_SUCCESS
;
1743 /***********************************************************************
1744 * tp_threadpool_unlock (internal)
1746 * Releases a lock on a threadpool.
1748 static void tp_threadpool_unlock( struct threadpool
*pool
)
1750 RtlEnterCriticalSection( &pool
->cs
);
1752 RtlLeaveCriticalSection( &pool
->cs
);
1753 tp_threadpool_release( pool
);
1756 /***********************************************************************
1757 * tp_group_alloc (internal)
1759 * Allocates a new threadpool group object.
1761 static NTSTATUS
tp_group_alloc( struct threadpool_group
**out
)
1763 struct threadpool_group
*group
;
1765 group
= RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*group
) );
1767 return STATUS_NO_MEMORY
;
1769 group
->refcount
= 1;
1770 group
->shutdown
= FALSE
;
1772 RtlInitializeCriticalSection( &group
->cs
);
1773 group
->cs
.DebugInfo
->Spare
[0] = (DWORD_PTR
)(__FILE__
": threadpool_group.cs");
1775 list_init( &group
->members
);
1777 TRACE( "allocated group %p\n", group
);
1780 return STATUS_SUCCESS
;
1783 /***********************************************************************
1784 * tp_group_shutdown (internal)
1786 * Marks the group object for shutdown.
1788 static void tp_group_shutdown( struct threadpool_group
*group
)
1790 group
->shutdown
= TRUE
;
1793 /***********************************************************************
1794 * tp_group_release (internal)
1796 * Releases a reference to a group object.
1798 static BOOL
tp_group_release( struct threadpool_group
*group
)
1800 if (interlocked_dec( &group
->refcount
))
1803 TRACE( "destroying group %p\n", group
);
1805 assert( group
->shutdown
);
1806 assert( list_empty( &group
->members
) );
1808 group
->cs
.DebugInfo
->Spare
[0] = 0;
1809 RtlDeleteCriticalSection( &group
->cs
);
1811 RtlFreeHeap( GetProcessHeap(), 0, group
);
1815 /***********************************************************************
1816 * tp_object_initialize (internal)
1818 * Initializes members of a threadpool object.
1820 static void tp_object_initialize( struct threadpool_object
*object
, struct threadpool
*pool
,
1821 PVOID userdata
, TP_CALLBACK_ENVIRON
*environment
)
1823 BOOL is_simple_callback
= (object
->type
== TP_OBJECT_TYPE_SIMPLE
);
1825 object
->refcount
= 1;
1826 object
->shutdown
= FALSE
;
1828 object
->pool
= pool
;
1829 object
->group
= NULL
;
1830 object
->userdata
= userdata
;
1831 object
->group_cancel_callback
= NULL
;
1832 object
->finalization_callback
= NULL
;
1833 object
->may_run_long
= 0;
1834 object
->race_dll
= NULL
;
1836 memset( &object
->group_entry
, 0, sizeof(object
->group_entry
) );
1837 object
->is_group_member
= FALSE
;
1839 memset( &object
->pool_entry
, 0, sizeof(object
->pool_entry
) );
1840 RtlInitializeConditionVariable( &object
->finished_event
);
1841 RtlInitializeConditionVariable( &object
->group_finished_event
);
1842 object
->num_pending_callbacks
= 0;
1843 object
->num_running_callbacks
= 0;
1844 object
->num_associated_callbacks
= 0;
1848 if (environment
->Version
!= 1)
1849 FIXME( "unsupported environment version %u\n", environment
->Version
);
1851 object
->group
= impl_from_TP_CLEANUP_GROUP( environment
->CleanupGroup
);
1852 object
->group_cancel_callback
= environment
->CleanupGroupCancelCallback
;
1853 object
->finalization_callback
= environment
->FinalizationCallback
;
1854 object
->may_run_long
= environment
->u
.s
.LongFunction
!= 0;
1855 object
->race_dll
= environment
->RaceDll
;
1857 if (environment
->ActivationContext
)
1858 FIXME( "activation context not supported yet\n" );
1860 if (environment
->u
.s
.Persistent
)
1861 FIXME( "persistent threads not supported yet\n" );
1864 if (object
->race_dll
)
1865 LdrAddRefDll( 0, object
->race_dll
);
1867 TRACE( "allocated object %p of type %u\n", object
, object
->type
);
1869 /* For simple callbacks we have to run tp_object_submit before adding this object
1870 * to the cleanup group. As soon as the cleanup group members are released ->shutdown
1871 * will be set, and tp_object_submit would fail with an assertion. */
1873 if (is_simple_callback
)
1874 tp_object_submit( object
, FALSE
);
1878 struct threadpool_group
*group
= object
->group
;
1879 interlocked_inc( &group
->refcount
);
1881 RtlEnterCriticalSection( &group
->cs
);
1882 list_add_tail( &group
->members
, &object
->group_entry
);
1883 object
->is_group_member
= TRUE
;
1884 RtlLeaveCriticalSection( &group
->cs
);
1887 if (is_simple_callback
)
1889 tp_object_shutdown( object
);
1890 tp_object_release( object
);
1894 /***********************************************************************
1895 * tp_object_submit (internal)
1897 * Submits a threadpool object to the associcated threadpool. This
1898 * function has to be VOID because TpPostWork can never fail on Windows.
1900 static void tp_object_submit( struct threadpool_object
*object
, BOOL signaled
)
1902 struct threadpool
*pool
= object
->pool
;
1903 NTSTATUS status
= STATUS_UNSUCCESSFUL
;
1905 assert( !object
->shutdown
);
1906 assert( !pool
->shutdown
);
1908 RtlEnterCriticalSection( &pool
->cs
);
1910 /* Start new worker threads if required. */
1911 if (pool
->num_busy_workers
>= pool
->num_workers
&&
1912 pool
->num_workers
< pool
->max_workers
)
1915 status
= RtlCreateUserThread( GetCurrentProcess(), NULL
, FALSE
, NULL
, 0, 0,
1916 threadpool_worker_proc
, pool
, &thread
, NULL
);
1917 if (status
== STATUS_SUCCESS
)
1919 interlocked_inc( &pool
->refcount
);
1920 pool
->num_workers
++;
1921 pool
->num_busy_workers
++;
1926 /* Queue work item and increment refcount. */
1927 interlocked_inc( &object
->refcount
);
1928 if (!object
->num_pending_callbacks
++)
1929 list_add_tail( &pool
->pool
, &object
->pool_entry
);
1931 /* Count how often the object was signaled. */
1932 if (object
->type
== TP_OBJECT_TYPE_WAIT
&& signaled
)
1933 object
->u
.wait
.signaled
++;
1935 /* No new thread started - wake up one existing thread. */
1936 if (status
!= STATUS_SUCCESS
)
1938 assert( pool
->num_workers
> 0 );
1939 RtlWakeConditionVariable( &pool
->update_event
);
1942 RtlLeaveCriticalSection( &pool
->cs
);
1945 /***********************************************************************
1946 * tp_object_cancel (internal)
1948 * Cancels all currently pending callbacks for a specific object.
1950 static void tp_object_cancel( struct threadpool_object
*object
, BOOL group_cancel
, PVOID userdata
)
1952 struct threadpool
*pool
= object
->pool
;
1953 LONG pending_callbacks
= 0;
1955 RtlEnterCriticalSection( &pool
->cs
);
1956 if (object
->num_pending_callbacks
)
1958 pending_callbacks
= object
->num_pending_callbacks
;
1959 object
->num_pending_callbacks
= 0;
1960 list_remove( &object
->pool_entry
);
1962 if (object
->type
== TP_OBJECT_TYPE_WAIT
)
1963 object
->u
.wait
.signaled
= 0;
1965 RtlLeaveCriticalSection( &pool
->cs
);
1967 /* Execute group cancellation callback if defined, and if this was actually a group cancel. */
1968 if (pending_callbacks
&& group_cancel
&& object
->group_cancel_callback
)
1970 TRACE( "executing group cancel callback %p(%p, %p)\n", object
->group_cancel_callback
, object
, userdata
);
1971 object
->group_cancel_callback( object
, userdata
);
1972 TRACE( "callback %p returned\n", object
->group_cancel_callback
);
1975 while (pending_callbacks
--)
1976 tp_object_release( object
);
1979 /***********************************************************************
1980 * tp_object_wait (internal)
1982 * Waits until all pending and running callbacks of a specific object
1983 * have been processed.
1985 static void tp_object_wait( struct threadpool_object
*object
, BOOL group_wait
)
1987 struct threadpool
*pool
= object
->pool
;
1989 RtlEnterCriticalSection( &pool
->cs
);
1992 while (object
->num_pending_callbacks
|| object
->num_running_callbacks
)
1993 RtlSleepConditionVariableCS( &object
->group_finished_event
, &pool
->cs
, NULL
);
1997 while (object
->num_pending_callbacks
|| object
->num_associated_callbacks
)
1998 RtlSleepConditionVariableCS( &object
->finished_event
, &pool
->cs
, NULL
);
2000 RtlLeaveCriticalSection( &pool
->cs
);
2003 /***********************************************************************
2004 * tp_object_shutdown (internal)
2006 * Marks a threadpool object for shutdown (which means that no further
2007 * tasks can be submitted).
2009 static void tp_object_shutdown( struct threadpool_object
*object
)
2011 if (object
->type
== TP_OBJECT_TYPE_TIMER
)
2012 tp_timerqueue_unlock( object
);
2013 else if (object
->type
== TP_OBJECT_TYPE_WAIT
)
2014 tp_waitqueue_unlock( object
);
2016 object
->shutdown
= TRUE
;
2019 /***********************************************************************
2020 * tp_object_release (internal)
2022 * Releases a reference to a threadpool object.
2024 static BOOL
tp_object_release( struct threadpool_object
*object
)
2026 if (interlocked_dec( &object
->refcount
))
2029 TRACE( "destroying object %p of type %u\n", object
, object
->type
);
2031 assert( object
->shutdown
);
2032 assert( !object
->num_pending_callbacks
);
2033 assert( !object
->num_running_callbacks
);
2034 assert( !object
->num_associated_callbacks
);
2036 /* release reference to the group */
2039 struct threadpool_group
*group
= object
->group
;
2041 RtlEnterCriticalSection( &group
->cs
);
2042 if (object
->is_group_member
)
2044 list_remove( &object
->group_entry
);
2045 object
->is_group_member
= FALSE
;
2047 RtlLeaveCriticalSection( &group
->cs
);
2049 tp_group_release( group
);
2052 tp_threadpool_unlock( object
->pool
);
2054 if (object
->race_dll
)
2055 LdrUnloadDll( object
->race_dll
);
2057 RtlFreeHeap( GetProcessHeap(), 0, object
);
2061 /***********************************************************************
2062 * threadpool_worker_proc (internal)
2064 static void CALLBACK
threadpool_worker_proc( void *param
)
2066 TP_CALLBACK_INSTANCE
*callback_instance
;
2067 struct threadpool_instance instance
;
2068 struct threadpool
*pool
= param
;
2069 TP_WAIT_RESULT wait_result
= 0;
2070 LARGE_INTEGER timeout
;
2074 TRACE( "starting worker thread for pool %p\n", pool
);
2076 RtlEnterCriticalSection( &pool
->cs
);
2077 pool
->num_busy_workers
--;
2080 while ((ptr
= list_head( &pool
->pool
)))
2082 struct threadpool_object
*object
= LIST_ENTRY( ptr
, struct threadpool_object
, pool_entry
);
2083 assert( object
->num_pending_callbacks
> 0 );
2085 /* If further pending callbacks are queued, move the work item to
2086 * the end of the pool list. Otherwise remove it from the pool. */
2087 list_remove( &object
->pool_entry
);
2088 if (--object
->num_pending_callbacks
)
2089 list_add_tail( &pool
->pool
, &object
->pool_entry
);
2091 /* For wait objects check if they were signaled or have timed out. */
2092 if (object
->type
== TP_OBJECT_TYPE_WAIT
)
2094 wait_result
= object
->u
.wait
.signaled
? WAIT_OBJECT_0
: WAIT_TIMEOUT
;
2095 if (wait_result
== WAIT_OBJECT_0
) object
->u
.wait
.signaled
--;
2098 /* Leave critical section and do the actual callback. */
2099 object
->num_associated_callbacks
++;
2100 object
->num_running_callbacks
++;
2101 pool
->num_busy_workers
++;
2102 RtlLeaveCriticalSection( &pool
->cs
);
2104 /* Initialize threadpool instance struct. */
2105 callback_instance
= (TP_CALLBACK_INSTANCE
*)&instance
;
2106 instance
.object
= object
;
2107 instance
.threadid
= GetCurrentThreadId();
2108 instance
.associated
= TRUE
;
2109 instance
.may_run_long
= object
->may_run_long
;
2110 instance
.cleanup
.critical_section
= NULL
;
2111 instance
.cleanup
.mutex
= NULL
;
2112 instance
.cleanup
.semaphore
= NULL
;
2113 instance
.cleanup
.semaphore_count
= 0;
2114 instance
.cleanup
.event
= NULL
;
2115 instance
.cleanup
.library
= NULL
;
2117 switch (object
->type
)
2119 case TP_OBJECT_TYPE_SIMPLE
:
2121 TRACE( "executing simple callback %p(%p, %p)\n",
2122 object
->u
.simple
.callback
, callback_instance
, object
->userdata
);
2123 object
->u
.simple
.callback( callback_instance
, object
->userdata
);
2124 TRACE( "callback %p returned\n", object
->u
.simple
.callback
);
2128 case TP_OBJECT_TYPE_WORK
:
2130 TRACE( "executing work callback %p(%p, %p, %p)\n",
2131 object
->u
.work
.callback
, callback_instance
, object
->userdata
, object
);
2132 object
->u
.work
.callback( callback_instance
, object
->userdata
, (TP_WORK
*)object
);
2133 TRACE( "callback %p returned\n", object
->u
.work
.callback
);
2137 case TP_OBJECT_TYPE_TIMER
:
2139 TRACE( "executing timer callback %p(%p, %p, %p)\n",
2140 object
->u
.timer
.callback
, callback_instance
, object
->userdata
, object
);
2141 object
->u
.timer
.callback( callback_instance
, object
->userdata
, (TP_TIMER
*)object
);
2142 TRACE( "callback %p returned\n", object
->u
.timer
.callback
);
2146 case TP_OBJECT_TYPE_WAIT
:
2148 TRACE( "executing wait callback %p(%p, %p, %p, %u)\n",
2149 object
->u
.wait
.callback
, callback_instance
, object
->userdata
, object
, wait_result
);
2150 object
->u
.wait
.callback( callback_instance
, object
->userdata
, (TP_WAIT
*)object
, wait_result
);
2151 TRACE( "callback %p returned\n", object
->u
.wait
.callback
);
2160 /* Execute finalization callback. */
2161 if (object
->finalization_callback
)
2163 TRACE( "executing finalization callback %p(%p, %p)\n",
2164 object
->finalization_callback
, callback_instance
, object
->userdata
);
2165 object
->finalization_callback( callback_instance
, object
->userdata
);
2166 TRACE( "callback %p returned\n", object
->finalization_callback
);
2169 /* Execute cleanup tasks. */
2170 if (instance
.cleanup
.critical_section
)
2172 RtlLeaveCriticalSection( instance
.cleanup
.critical_section
);
2174 if (instance
.cleanup
.mutex
)
2176 status
= NtReleaseMutant( instance
.cleanup
.mutex
, NULL
);
2177 if (status
!= STATUS_SUCCESS
) goto skip_cleanup
;
2179 if (instance
.cleanup
.semaphore
)
2181 status
= NtReleaseSemaphore( instance
.cleanup
.semaphore
, instance
.cleanup
.semaphore_count
, NULL
);
2182 if (status
!= STATUS_SUCCESS
) goto skip_cleanup
;
2184 if (instance
.cleanup
.event
)
2186 status
= NtSetEvent( instance
.cleanup
.event
, NULL
);
2187 if (status
!= STATUS_SUCCESS
) goto skip_cleanup
;
2189 if (instance
.cleanup
.library
)
2191 LdrUnloadDll( instance
.cleanup
.library
);
2195 RtlEnterCriticalSection( &pool
->cs
);
2196 pool
->num_busy_workers
--;
2198 object
->num_running_callbacks
--;
2199 if (!object
->num_pending_callbacks
&& !object
->num_running_callbacks
)
2200 RtlWakeAllConditionVariable( &object
->group_finished_event
);
2202 if (instance
.associated
)
2204 object
->num_associated_callbacks
--;
2205 if (!object
->num_pending_callbacks
&& !object
->num_associated_callbacks
)
2206 RtlWakeAllConditionVariable( &object
->finished_event
);
2209 tp_object_release( object
);
2212 /* Shutdown worker thread if requested. */
2216 /* Wait for new tasks or until the timeout expires. A thread only terminates
2217 * when no new tasks are available, and the number of threads can be
2218 * decreased without violating the min_workers limit. An exception is when
2219 * min_workers == 0, then objcount is used to detect if the last thread
2220 * can be terminated. */
2221 timeout
.QuadPart
= (ULONGLONG
)THREADPOOL_WORKER_TIMEOUT
* -10000;
2222 if (RtlSleepConditionVariableCS( &pool
->update_event
, &pool
->cs
, &timeout
) == STATUS_TIMEOUT
&&
2223 !list_head( &pool
->pool
) && (pool
->num_workers
> max( pool
->min_workers
, 1 ) ||
2224 (!pool
->min_workers
&& !pool
->objcount
)))
2229 pool
->num_workers
--;
2230 RtlLeaveCriticalSection( &pool
->cs
);
2232 TRACE( "terminating worker thread for pool %p\n", pool
);
2233 tp_threadpool_release( pool
);
2234 RtlExitUserThread( 0 );
2237 /***********************************************************************
2238 * TpAllocCleanupGroup (NTDLL.@)
2240 NTSTATUS WINAPI
TpAllocCleanupGroup( TP_CLEANUP_GROUP
**out
)
2242 TRACE( "%p\n", out
);
2244 return tp_group_alloc( (struct threadpool_group
**)out
);
2247 /***********************************************************************
2248 * TpAllocPool (NTDLL.@)
2250 NTSTATUS WINAPI
TpAllocPool( TP_POOL
**out
, PVOID reserved
)
2252 TRACE( "%p %p\n", out
, reserved
);
2255 FIXME( "reserved argument is nonzero (%p)", reserved
);
2257 return tp_threadpool_alloc( (struct threadpool
**)out
);
2260 /***********************************************************************
2261 * TpAllocTimer (NTDLL.@)
2263 NTSTATUS WINAPI
TpAllocTimer( TP_TIMER
**out
, PTP_TIMER_CALLBACK callback
, PVOID userdata
,
2264 TP_CALLBACK_ENVIRON
*environment
)
2266 struct threadpool_object
*object
;
2267 struct threadpool
*pool
;
2270 TRACE( "%p %p %p %p\n", out
, callback
, userdata
, environment
);
2272 object
= RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*object
) );
2274 return STATUS_NO_MEMORY
;
2276 status
= tp_threadpool_lock( &pool
, environment
);
2279 RtlFreeHeap( GetProcessHeap(), 0, object
);
2283 object
->type
= TP_OBJECT_TYPE_TIMER
;
2284 object
->u
.timer
.callback
= callback
;
2286 status
= tp_timerqueue_lock( object
);
2289 tp_threadpool_unlock( pool
);
2290 RtlFreeHeap( GetProcessHeap(), 0, object
);
2294 tp_object_initialize( object
, pool
, userdata
, environment
);
2296 *out
= (TP_TIMER
*)object
;
2297 return STATUS_SUCCESS
;
2300 /***********************************************************************
2301 * TpAllocWait (NTDLL.@)
2303 NTSTATUS WINAPI
TpAllocWait( TP_WAIT
**out
, PTP_WAIT_CALLBACK callback
, PVOID userdata
,
2304 TP_CALLBACK_ENVIRON
*environment
)
2306 struct threadpool_object
*object
;
2307 struct threadpool
*pool
;
2310 TRACE( "%p %p %p %p\n", out
, callback
, userdata
, environment
);
2312 object
= RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*object
) );
2314 return STATUS_NO_MEMORY
;
2316 status
= tp_threadpool_lock( &pool
, environment
);
2319 RtlFreeHeap( GetProcessHeap(), 0, object
);
2323 object
->type
= TP_OBJECT_TYPE_WAIT
;
2324 object
->u
.wait
.callback
= callback
;
2326 status
= tp_waitqueue_lock( object
);
2329 tp_threadpool_unlock( pool
);
2330 RtlFreeHeap( GetProcessHeap(), 0, object
);
2334 tp_object_initialize( object
, pool
, userdata
, environment
);
2336 *out
= (TP_WAIT
*)object
;
2337 return STATUS_SUCCESS
;
2340 /***********************************************************************
2341 * TpAllocWork (NTDLL.@)
2343 NTSTATUS WINAPI
TpAllocWork( TP_WORK
**out
, PTP_WORK_CALLBACK callback
, PVOID userdata
,
2344 TP_CALLBACK_ENVIRON
*environment
)
2346 struct threadpool_object
*object
;
2347 struct threadpool
*pool
;
2350 TRACE( "%p %p %p %p\n", out
, callback
, userdata
, environment
);
2352 object
= RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*object
) );
2354 return STATUS_NO_MEMORY
;
2356 status
= tp_threadpool_lock( &pool
, environment
);
2359 RtlFreeHeap( GetProcessHeap(), 0, object
);
2363 object
->type
= TP_OBJECT_TYPE_WORK
;
2364 object
->u
.work
.callback
= callback
;
2365 tp_object_initialize( object
, pool
, userdata
, environment
);
2367 *out
= (TP_WORK
*)object
;
2368 return STATUS_SUCCESS
;
2371 /***********************************************************************
2372 * TpCallbackLeaveCriticalSectionOnCompletion (NTDLL.@)
2374 VOID WINAPI
TpCallbackLeaveCriticalSectionOnCompletion( TP_CALLBACK_INSTANCE
*instance
, CRITICAL_SECTION
*crit
)
2376 struct threadpool_instance
*this = impl_from_TP_CALLBACK_INSTANCE( instance
);
2378 TRACE( "%p %p\n", instance
, crit
);
2380 if (!this->cleanup
.critical_section
)
2381 this->cleanup
.critical_section
= crit
;
2384 /***********************************************************************
2385 * TpCallbackMayRunLong (NTDLL.@)
2387 NTSTATUS WINAPI
TpCallbackMayRunLong( TP_CALLBACK_INSTANCE
*instance
)
2389 struct threadpool_instance
*this = impl_from_TP_CALLBACK_INSTANCE( instance
);
2390 struct threadpool_object
*object
= this->object
;
2391 struct threadpool
*pool
;
2392 NTSTATUS status
= STATUS_SUCCESS
;
2394 TRACE( "%p\n", instance
);
2396 if (this->threadid
!= GetCurrentThreadId())
2398 ERR("called from wrong thread, ignoring\n");
2399 return STATUS_UNSUCCESSFUL
; /* FIXME */
2402 if (this->may_run_long
)
2403 return STATUS_SUCCESS
;
2405 pool
= object
->pool
;
2406 RtlEnterCriticalSection( &pool
->cs
);
2408 /* Start new worker threads if required. */
2409 if (pool
->num_busy_workers
>= pool
->num_workers
)
2411 if (pool
->num_workers
< pool
->max_workers
)
2414 status
= RtlCreateUserThread( GetCurrentProcess(), NULL
, FALSE
, NULL
, 0, 0,
2415 threadpool_worker_proc
, pool
, &thread
, NULL
);
2416 if (status
== STATUS_SUCCESS
)
2418 interlocked_inc( &pool
->refcount
);
2419 pool
->num_workers
++;
2420 pool
->num_busy_workers
++;
2426 status
= STATUS_TOO_MANY_THREADS
;
2430 RtlLeaveCriticalSection( &pool
->cs
);
2431 this->may_run_long
= TRUE
;
2435 /***********************************************************************
2436 * TpCallbackReleaseMutexOnCompletion (NTDLL.@)
2438 VOID WINAPI
TpCallbackReleaseMutexOnCompletion( TP_CALLBACK_INSTANCE
*instance
, HANDLE mutex
)
2440 struct threadpool_instance
*this = impl_from_TP_CALLBACK_INSTANCE( instance
);
2442 TRACE( "%p %p\n", instance
, mutex
);
2444 if (!this->cleanup
.mutex
)
2445 this->cleanup
.mutex
= mutex
;
2448 /***********************************************************************
2449 * TpCallbackReleaseSemaphoreOnCompletion (NTDLL.@)
2451 VOID WINAPI
TpCallbackReleaseSemaphoreOnCompletion( TP_CALLBACK_INSTANCE
*instance
, HANDLE semaphore
, DWORD count
)
2453 struct threadpool_instance
*this = impl_from_TP_CALLBACK_INSTANCE( instance
);
2455 TRACE( "%p %p %u\n", instance
, semaphore
, count
);
2457 if (!this->cleanup
.semaphore
)
2459 this->cleanup
.semaphore
= semaphore
;
2460 this->cleanup
.semaphore_count
= count
;
2464 /***********************************************************************
2465 * TpCallbackSetEventOnCompletion (NTDLL.@)
2467 VOID WINAPI
TpCallbackSetEventOnCompletion( TP_CALLBACK_INSTANCE
*instance
, HANDLE event
)
2469 struct threadpool_instance
*this = impl_from_TP_CALLBACK_INSTANCE( instance
);
2471 TRACE( "%p %p\n", instance
, event
);
2473 if (!this->cleanup
.event
)
2474 this->cleanup
.event
= event
;
2477 /***********************************************************************
2478 * TpCallbackUnloadDllOnCompletion (NTDLL.@)
2480 VOID WINAPI
TpCallbackUnloadDllOnCompletion( TP_CALLBACK_INSTANCE
*instance
, HMODULE module
)
2482 struct threadpool_instance
*this = impl_from_TP_CALLBACK_INSTANCE( instance
);
2484 TRACE( "%p %p\n", instance
, module
);
2486 if (!this->cleanup
.library
)
2487 this->cleanup
.library
= module
;
2490 /***********************************************************************
2491 * TpDisassociateCallback (NTDLL.@)
2493 VOID WINAPI
TpDisassociateCallback( TP_CALLBACK_INSTANCE
*instance
)
2495 struct threadpool_instance
*this = impl_from_TP_CALLBACK_INSTANCE( instance
);
2496 struct threadpool_object
*object
= this->object
;
2497 struct threadpool
*pool
;
2499 TRACE( "%p\n", instance
);
2501 if (this->threadid
!= GetCurrentThreadId())
2503 ERR("called from wrong thread, ignoring\n");
2507 if (!this->associated
)
2510 pool
= object
->pool
;
2511 RtlEnterCriticalSection( &pool
->cs
);
2513 object
->num_associated_callbacks
--;
2514 if (!object
->num_pending_callbacks
&& !object
->num_associated_callbacks
)
2515 RtlWakeAllConditionVariable( &object
->finished_event
);
2517 RtlLeaveCriticalSection( &pool
->cs
);
2518 this->associated
= FALSE
;
2521 /***********************************************************************
2522 * TpIsTimerSet (NTDLL.@)
2524 BOOL WINAPI
TpIsTimerSet( TP_TIMER
*timer
)
2526 struct threadpool_object
*this = impl_from_TP_TIMER( timer
);
2528 TRACE( "%p\n", timer
);
2530 return this->u
.timer
.timer_set
;
2533 /***********************************************************************
2534 * TpPostWork (NTDLL.@)
2536 VOID WINAPI
TpPostWork( TP_WORK
*work
)
2538 struct threadpool_object
*this = impl_from_TP_WORK( work
);
2540 TRACE( "%p\n", work
);
2542 tp_object_submit( this, FALSE
);
2545 /***********************************************************************
2546 * TpReleaseCleanupGroup (NTDLL.@)
2548 VOID WINAPI
TpReleaseCleanupGroup( TP_CLEANUP_GROUP
*group
)
2550 struct threadpool_group
*this = impl_from_TP_CLEANUP_GROUP( group
);
2552 TRACE( "%p\n", group
);
2554 tp_group_shutdown( this );
2555 tp_group_release( this );
2558 /***********************************************************************
2559 * TpReleaseCleanupGroupMembers (NTDLL.@)
2561 VOID WINAPI
TpReleaseCleanupGroupMembers( TP_CLEANUP_GROUP
*group
, BOOL cancel_pending
, PVOID userdata
)
2563 struct threadpool_group
*this = impl_from_TP_CLEANUP_GROUP( group
);
2564 struct threadpool_object
*object
, *next
;
2565 struct list members
;
2567 TRACE( "%p %u %p\n", group
, cancel_pending
, userdata
);
2569 RtlEnterCriticalSection( &this->cs
);
2571 /* Unset group, increase references, and mark objects for shutdown */
2572 LIST_FOR_EACH_ENTRY_SAFE( object
, next
, &this->members
, struct threadpool_object
, group_entry
)
2574 assert( object
->group
== this );
2575 assert( object
->is_group_member
);
2577 /* Simple callbacks are very special. The user doesn't hold any reference, so
2578 * they would be released too early. Add one additional temporary reference. */
2579 if (object
->type
== TP_OBJECT_TYPE_SIMPLE
)
2581 if (interlocked_inc( &object
->refcount
) == 1)
2583 /* Object is basically already destroyed, but group reference
2584 * was not deleted yet. We can safely ignore this object. */
2585 interlocked_dec( &object
->refcount
);
2586 list_remove( &object
->group_entry
);
2587 object
->is_group_member
= FALSE
;
2592 object
->is_group_member
= FALSE
;
2593 tp_object_shutdown( object
);
2596 /* Move members to a new temporary list */
2597 list_init( &members
);
2598 list_move_tail( &members
, &this->members
);
2600 RtlLeaveCriticalSection( &this->cs
);
2602 /* Cancel pending callbacks if requested */
2605 LIST_FOR_EACH_ENTRY( object
, &members
, struct threadpool_object
, group_entry
)
2607 tp_object_cancel( object
, TRUE
, userdata
);
2611 /* Wait for remaining callbacks to finish */
2612 LIST_FOR_EACH_ENTRY_SAFE( object
, next
, &members
, struct threadpool_object
, group_entry
)
2614 tp_object_wait( object
, TRUE
);
2615 tp_object_release( object
);
2619 /***********************************************************************
2620 * TpReleasePool (NTDLL.@)
2622 VOID WINAPI
TpReleasePool( TP_POOL
*pool
)
2624 struct threadpool
*this = impl_from_TP_POOL( pool
);
2626 TRACE( "%p\n", pool
);
2628 tp_threadpool_shutdown( this );
2629 tp_threadpool_release( this );
2632 /***********************************************************************
2633 * TpReleaseTimer (NTDLL.@)
2635 VOID WINAPI
TpReleaseTimer( TP_TIMER
*timer
)
2637 struct threadpool_object
*this = impl_from_TP_TIMER( timer
);
2639 TRACE( "%p\n", timer
);
2641 tp_object_shutdown( this );
2642 tp_object_release( this );
2645 /***********************************************************************
2646 * TpReleaseWait (NTDLL.@)
2648 VOID WINAPI
TpReleaseWait( TP_WAIT
*wait
)
2650 struct threadpool_object
*this = impl_from_TP_WAIT( wait
);
2652 TRACE( "%p\n", wait
);
2654 tp_object_shutdown( this );
2655 tp_object_release( this );
2658 /***********************************************************************
2659 * TpReleaseWork (NTDLL.@)
2661 VOID WINAPI
TpReleaseWork( TP_WORK
*work
)
2663 struct threadpool_object
*this = impl_from_TP_WORK( work
);
2665 TRACE( "%p\n", work
);
2667 tp_object_shutdown( this );
2668 tp_object_release( this );
2671 /***********************************************************************
2672 * TpSetPoolMaxThreads (NTDLL.@)
2674 VOID WINAPI
TpSetPoolMaxThreads( TP_POOL
*pool
, DWORD maximum
)
2676 struct threadpool
*this = impl_from_TP_POOL( pool
);
2678 TRACE( "%p %u\n", pool
, maximum
);
2680 RtlEnterCriticalSection( &this->cs
);
2681 this->max_workers
= max( maximum
, 1 );
2682 this->min_workers
= min( this->min_workers
, this->max_workers
);
2683 RtlLeaveCriticalSection( &this->cs
);
2686 /***********************************************************************
2687 * TpSetPoolMinThreads (NTDLL.@)
2689 BOOL WINAPI
TpSetPoolMinThreads( TP_POOL
*pool
, DWORD minimum
)
2691 struct threadpool
*this = impl_from_TP_POOL( pool
);
2692 NTSTATUS status
= STATUS_SUCCESS
;
2694 TRACE( "%p %u\n", pool
, minimum
);
2696 RtlEnterCriticalSection( &this->cs
);
2698 while (this->num_workers
< minimum
)
2701 status
= RtlCreateUserThread( GetCurrentProcess(), NULL
, FALSE
, NULL
, 0, 0,
2702 threadpool_worker_proc
, this, &thread
, NULL
);
2703 if (status
!= STATUS_SUCCESS
)
2706 interlocked_inc( &this->refcount
);
2707 this->num_workers
++;
2708 this->num_busy_workers
++;
2712 if (status
== STATUS_SUCCESS
)
2714 this->min_workers
= minimum
;
2715 this->max_workers
= max( this->min_workers
, this->max_workers
);
2718 RtlLeaveCriticalSection( &this->cs
);
2722 /***********************************************************************
2723 * TpSetTimer (NTDLL.@)
2725 VOID WINAPI
TpSetTimer( TP_TIMER
*timer
, LARGE_INTEGER
*timeout
, LONG period
, LONG window_length
)
2727 struct threadpool_object
*this = impl_from_TP_TIMER( timer
);
2728 struct threadpool_object
*other_timer
;
2729 BOOL submit_timer
= FALSE
;
2730 ULONGLONG timestamp
;
2732 TRACE( "%p %p %u %u\n", timer
, timeout
, period
, window_length
);
2734 RtlEnterCriticalSection( &timerqueue
.cs
);
2736 assert( this->u
.timer
.timer_initialized
);
2737 this->u
.timer
.timer_set
= timeout
!= NULL
;
2739 /* Convert relative timeout to absolute timestamp and handle a timeout
2740 * of zero, which means that the timer is submitted immediately. */
2743 timestamp
= timeout
->QuadPart
;
2744 if ((LONGLONG
)timestamp
< 0)
2747 NtQuerySystemTime( &now
);
2748 timestamp
= now
.QuadPart
- timestamp
;
2750 else if (!timestamp
)
2757 NtQuerySystemTime( &now
);
2758 timestamp
= now
.QuadPart
+ (ULONGLONG
)period
* 10000;
2760 submit_timer
= TRUE
;
2764 /* First remove existing timeout. */
2765 if (this->u
.timer
.timer_pending
)
2767 list_remove( &this->u
.timer
.timer_entry
);
2768 this->u
.timer
.timer_pending
= FALSE
;
2771 /* If the timer was enabled, then add it back to the queue. */
2774 this->u
.timer
.timeout
= timestamp
;
2775 this->u
.timer
.period
= period
;
2776 this->u
.timer
.window_length
= window_length
;
2778 LIST_FOR_EACH_ENTRY( other_timer
, &timerqueue
.pending_timers
,
2779 struct threadpool_object
, u
.timer
.timer_entry
)
2781 assert( other_timer
->type
== TP_OBJECT_TYPE_TIMER
);
2782 if (this->u
.timer
.timeout
< other_timer
->u
.timer
.timeout
)
2785 list_add_before( &other_timer
->u
.timer
.timer_entry
, &this->u
.timer
.timer_entry
);
2787 /* Wake up the timer thread when the timeout has to be updated. */
2788 if (list_head( &timerqueue
.pending_timers
) == &this->u
.timer
.timer_entry
)
2789 RtlWakeAllConditionVariable( &timerqueue
.update_event
);
2791 this->u
.timer
.timer_pending
= TRUE
;
2794 RtlLeaveCriticalSection( &timerqueue
.cs
);
2797 tp_object_submit( this, FALSE
);
2800 /***********************************************************************
2801 * TpSetWait (NTDLL.@)
2803 VOID WINAPI
TpSetWait( TP_WAIT
*wait
, HANDLE handle
, LARGE_INTEGER
*timeout
)
2805 struct threadpool_object
*this = impl_from_TP_WAIT( wait
);
2806 ULONGLONG timestamp
= TIMEOUT_INFINITE
;
2807 BOOL submit_wait
= FALSE
;
2809 TRACE( "%p %p %p\n", wait
, handle
, timeout
);
2811 RtlEnterCriticalSection( &waitqueue
.cs
);
2813 assert( this->u
.wait
.bucket
);
2814 this->u
.wait
.handle
= handle
;
2816 if (handle
|| this->u
.wait
.wait_pending
)
2818 struct waitqueue_bucket
*bucket
= this->u
.wait
.bucket
;
2819 list_remove( &this->u
.wait
.wait_entry
);
2821 /* Convert relative timeout to absolute timestamp. */
2822 if (handle
&& timeout
)
2824 timestamp
= timeout
->QuadPart
;
2825 if ((LONGLONG
)timestamp
< 0)
2828 NtQuerySystemTime( &now
);
2829 timestamp
= now
.QuadPart
- timestamp
;
2831 else if (!timestamp
)
2838 /* Add wait object back into one of the queues. */
2841 list_add_tail( &bucket
->waiting
, &this->u
.wait
.wait_entry
);
2842 this->u
.wait
.wait_pending
= TRUE
;
2843 this->u
.wait
.timeout
= timestamp
;
2847 list_add_tail( &bucket
->reserved
, &this->u
.wait
.wait_entry
);
2848 this->u
.wait
.wait_pending
= FALSE
;
2851 /* Wake up the wait queue thread. */
2852 NtSetEvent( bucket
->update_event
, NULL
);
2855 RtlLeaveCriticalSection( &waitqueue
.cs
);
2858 tp_object_submit( this, FALSE
);
2861 /***********************************************************************
2862 * TpSimpleTryPost (NTDLL.@)
2864 NTSTATUS WINAPI
TpSimpleTryPost( PTP_SIMPLE_CALLBACK callback
, PVOID userdata
,
2865 TP_CALLBACK_ENVIRON
*environment
)
2867 struct threadpool_object
*object
;
2868 struct threadpool
*pool
;
2871 TRACE( "%p %p %p\n", callback
, userdata
, environment
);
2873 object
= RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*object
) );
2875 return STATUS_NO_MEMORY
;
2877 status
= tp_threadpool_lock( &pool
, environment
);
2880 RtlFreeHeap( GetProcessHeap(), 0, object
);
2884 object
->type
= TP_OBJECT_TYPE_SIMPLE
;
2885 object
->u
.simple
.callback
= callback
;
2886 tp_object_initialize( object
, pool
, userdata
, environment
);
2888 return STATUS_SUCCESS
;
2891 /***********************************************************************
2892 * TpWaitForTimer (NTDLL.@)
2894 VOID WINAPI
TpWaitForTimer( TP_TIMER
*timer
, BOOL cancel_pending
)
2896 struct threadpool_object
*this = impl_from_TP_TIMER( timer
);
2898 TRACE( "%p %d\n", timer
, cancel_pending
);
2901 tp_object_cancel( this, FALSE
, NULL
);
2902 tp_object_wait( this, FALSE
);
2905 /***********************************************************************
2906 * TpWaitForWait (NTDLL.@)
2908 VOID WINAPI
TpWaitForWait( TP_WAIT
*wait
, BOOL cancel_pending
)
2910 struct threadpool_object
*this = impl_from_TP_WAIT( wait
);
2912 TRACE( "%p %d\n", wait
, cancel_pending
);
2915 tp_object_cancel( this, FALSE
, NULL
);
2916 tp_object_wait( this, FALSE
);
2919 /***********************************************************************
2920 * TpWaitForWork (NTDLL.@)
2922 VOID WINAPI
TpWaitForWork( TP_WORK
*work
, BOOL cancel_pending
)
2924 struct threadpool_object
*this = impl_from_TP_WORK( work
);
2926 TRACE( "%p %u\n", work
, cancel_pending
);
2929 tp_object_cancel( this, FALSE
, NULL
);
2930 tp_object_wait( this, FALSE
);