4 * Copyright (c) 2006 Robert Shearman
6 * This library is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU Lesser General Public
8 * License as published by the Free Software Foundation; either
9 * version 2.1 of the License, or (at your option) any later version.
11 * This library is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 * Lesser General Public License for more details.
16 * You should have received a copy of the GNU Lesser General Public
17 * License along with this library; if not, write to the Free Software
18 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
22 #include "wine/port.h"
27 #define NONAMELESSUNION
29 #define WIN32_NO_STATUS
32 #include "wine/debug.h"
33 #include "wine/list.h"
35 #include "ntdll_misc.h"
/* Debug channel used by the TRACE/FIXME/ERR messages in this file. */
37 WINE_DEFAULT_DEBUG_CHANNEL(threadpool
);
/* Time in milliseconds that a worker waits on work_item_event for new work
 * (worker_thread_proc multiplies it by 10000 before the wait). */
39 #define WORKER_TIMEOUT 30000 /* 30 seconds */
/* Number of worker threads alive: incremented on entry to
 * worker_thread_proc and decremented just before that thread exits. */
41 static LONG num_workers
;
/* Number of queued work items; decremented whenever an item is taken off
 * work_item_list. */
42 static LONG num_work_items
;
/* Number of workers that are currently inside a work item callback. */
43 static LONG num_busy_workers
;
/* Pending work items, linked through the entry member of struct work_item. */
45 static struct list work_item_list
= LIST_INIT(work_item_list
);
/* Semaphore handle the workers wait on. It is created on demand by
 * add_work_item_to_queue and released by one for later queued items. */
46 static HANDLE work_item_event
;
/* Critical section taken around accesses to work_item_list. This first
 * declaration has no initializer; the initialised definition follows the
 * debug record below. */
48 static RTL_CRITICAL_SECTION threadpool_cs
;
/* Debug record for threadpool_cs. Both ProcessLocksList links point back at
 * the record itself and the last member carries a "<file>: threadpool_cs"
 * name string.
 * NOTE(review): the first initializer line is not visible in this view. */
49 static RTL_CRITICAL_SECTION_DEBUG critsect_debug
=
52 { &critsect_debug
.ProcessLocksList
, &critsect_debug
.ProcessLocksList
},
53 0, 0, { (DWORD_PTR
)(__FILE__
": threadpool_cs") }
/* Static initialisation of the critical section, pointing at its debug record. */
55 static RTL_CRITICAL_SECTION threadpool_cs
= { &critsect_debug
, -1, 0, 0, 0, 0 };
/* Completion port read by iocp_poller and bound to file handles by
 * RtlSetIoCompletionCallback; NULL until it has been set up. */
57 static HANDLE compl_port
= NULL
;
/* Critical section entered by RtlSetIoCompletionCallback around the
 * completion port setup. Declared here without initializer; the initialised
 * definition follows the debug record below. */
58 static RTL_CRITICAL_SECTION threadpool_compl_cs
;
/* Debug record for threadpool_compl_cs: it points back at the critical
 * section, both ProcessLocksList links point at the record itself, and the
 * last member carries a "<file>: threadpool_compl_cs" name string. */
59 static RTL_CRITICAL_SECTION_DEBUG critsect_compl_debug
=
61 0, 0, &threadpool_compl_cs
,
62 { &critsect_compl_debug
.ProcessLocksList
, &critsect_compl_debug
.ProcessLocksList
},
63 0, 0, { (DWORD_PTR
)(__FILE__
": threadpool_compl_cs") }
/* Static initialisation of the critical section, pointing at its debug record. */
65 static RTL_CRITICAL_SECTION threadpool_compl_cs
= { &critsect_compl_debug
, -1, 0, 0, 0, 0 };
/* Member of struct work_item: the routine a worker calls for this item
 * (worker_thread_proc invokes it as function(context)).
 * NOTE(review): the struct header and its other members (entry and context
 * are used elsewhere in this file) are not visible in this view. */
70 PRTL_WORK_ITEM_ROUTINE function
;
/* Adds 1 to *dest through interlocked_xchg_add() and returns that call's
 * result plus one, i.e. the new value assuming interlocked_xchg_add()
 * returns the value held before the addition. */
74 static inline LONG
interlocked_inc( PLONG dest
)
76 return interlocked_xchg_add( dest
, 1 ) + 1;
/* Adds -1 to *dest through interlocked_xchg_add() and returns that call's
 * result minus one, i.e. the new value assuming interlocked_xchg_add()
 * returns the value held before the addition. */
79 static inline LONG
interlocked_dec( PLONG dest
)
81 return interlocked_xchg_add( dest
, -1 ) - 1;
/* Entry point of a pool worker thread (started by RtlQueueWorkItem).
 * It takes queued items off work_item_list and runs their callbacks; when
 * num_work_items is not positive it waits on work_item_event for up to
 * WORKER_TIMEOUT milliseconds. param is not referenced in the visible code.
 * NOTE(review): the embedded line numbers skip values throughout this
 * function, so the loop construct, braces and branch keywords are not
 * visible in this view. */
84 static void WINAPI
worker_thread_proc(void * param
)
/* Register this thread as a live worker. */
86 interlocked_inc(&num_workers
);
88 /* free the work item memory sooner to reduce memory usage */
91 if (num_work_items
> 0)
/* Take the queue lock and look at the first queued item. */
94 RtlEnterCriticalSection(&threadpool_cs
);
95 item
= list_head(&work_item_list
);
/* Unlink the item and account for it while still holding the lock. */
98 struct work_item
*work_item_ptr
= LIST_ENTRY(item
, struct work_item
, entry
);
99 struct work_item work_item
;
100 list_remove(&work_item_ptr
->entry
);
101 interlocked_dec(&num_work_items
);
103 RtlLeaveCriticalSection(&threadpool_cs
);
/* Copy the item to the stack so the heap block can be released before the
 * callback runs. */
105 work_item
= *work_item_ptr
;
106 RtlFreeHeap(GetProcessHeap(), 0, work_item_ptr
);
108 TRACE("executing %p(%p)\n", work_item
.function
, work_item
.context
);
/* The busy count brackets the callback; RtlQueueWorkItem compares it with
 * num_workers to decide whether another thread is needed. */
110 interlocked_inc(&num_busy_workers
);
113 work_item
.function(work_item
.context
);
115 interlocked_dec(&num_busy_workers
);
/* NOTE(review): presumably the unlock for the case where the list turned
 * out to be empty; the branch keyword is not visible in this view. */
118 RtlLeaveCriticalSection(&threadpool_cs
);
/* No queued work: wait on the semaphore. WORKER_TIMEOUT (ms) * 10000 gives
 * 100ns units; the negation presumably marks the timeout as relative. */
123 LARGE_INTEGER timeout
;
124 timeout
.QuadPart
= -(WORKER_TIMEOUT
* (ULONGLONG
)10000);
125 status
= NtWaitForSingleObject(work_item_event
, FALSE
, &timeout
);
/* Any result other than STATUS_WAIT_0 is handled here; the action is not
 * visible in this view, presumably the worker stops looking for work. */
126 if (status
!= STATUS_WAIT_0
)
/* The worker is going away: drop it from the count and end the thread. */
131 interlocked_dec(&num_workers
);
133 RtlExitUserThread(0);
/* Appends work_item to the tail of work_item_list under threadpool_cs and
 * then takes care of the wake-up semaphore: while work_item_event is still
 * unset a semaphore is created with an initial count of 1, otherwise the
 * existing semaphore is released by one.
 * The NTSTATUS of the semaphore call is stored in status; presumably that is
 * what gets returned (the return statement is not visible in this view). */
138 static NTSTATUS
add_work_item_to_queue(struct work_item
*work_item
)
142 RtlEnterCriticalSection(&threadpool_cs
);
143 list_add_tail(&work_item_list
, &work_item
->entry
);
145 RtlLeaveCriticalSection(&threadpool_cs
);
/* First use: create the semaphore and publish it with a compare-exchange so
 * that only one racing caller installs its handle. */
147 if (!work_item_event
)
150 status
= NtCreateSemaphore(&sem
, SEMAPHORE_ALL_ACCESS
, NULL
, 1, LONG_MAX
);
/* NOTE(review): sem is handed to the compare-exchange without a visible
 * check of the status returned by NtCreateSemaphore - confirm. */
151 if (interlocked_cmpxchg_ptr( &work_item_event
, sem
, 0 ))
152 NtClose(sem
); /* somebody beat us to it */
/* Semaphore already exists: signal one more available item. */
155 status
= NtReleaseSemaphore(work_item_event
, 1, NULL
);
160 /***********************************************************************
161 * RtlQueueWorkItem (NTDLL.@)
163 * Queues a work item into a thread in the thread pool.
166 * Function [I] Work function to execute.
167 * Context [I] Context to pass to the work function when it is executed.
168 * Flags [I] Flags. See notes.
171 * Success: STATUS_SUCCESS.
172 * Failure: Any NTSTATUS code.
175 * Flags can be one or more of the following:
176 *|WT_EXECUTEDEFAULT - Executes the work item in a non-I/O worker thread.
177 *|WT_EXECUTEINIOTHREAD - Executes the work item in an I/O worker thread.
178 *|WT_EXECUTEINPERSISTENTTHREAD - Executes the work item in a thread that is persistent.
179 *|WT_EXECUTELONGFUNCTION - Hints that the execution can take a long time.
180 *|WT_TRANSFER_IMPERSONATION - Executes the function with the current access token.
/* Allocates a work item for Function/Context, queues it with
 * add_work_item_to_queue() and starts an additional worker thread when no
 * worker exists or all workers are busy. Flags other than
 * WT_EXECUTELONGFUNCTION only produce a FIXME message.
 * NOTE(review): the embedded line numbers skip values in this function, so
 * some declarations, braces and short statements are not visible here. */
182 NTSTATUS WINAPI
RtlQueueWorkItem(PRTL_WORK_ITEM_ROUTINE Function
, PVOID Context
, ULONG Flags
)
186 struct work_item
*work_item
= RtlAllocateHeap(GetProcessHeap(), 0, sizeof(struct work_item
));
/* Presumably the allocation-failure path; the guarding test is not visible. */
189 return STATUS_NO_MEMORY
;
191 work_item
->function
= Function
;
192 work_item
->context
= Context
;
194 if (Flags
& ~WT_EXECUTELONGFUNCTION
)
195 FIXME("Flags 0x%x not supported\n", Flags
);
197 status
= add_work_item_to_queue(work_item
);
199 /* FIXME: tune this algorithm to not be as aggressive with creating threads
200 * if WT_EXECUTELONGFUNCTION isn't specified */
201 if ((status
== STATUS_SUCCESS
) &&
202 ((num_workers
== 0) || (num_workers
== num_busy_workers
)))
/* Start a new thread running worker_thread_proc. */
204 status
= RtlCreateUserThread( GetCurrentProcess(), NULL
, FALSE
,
206 worker_thread_proc
, NULL
, &thread
, NULL
);
/* The statement guarded by this test is not visible in this view. */
207 if (status
== STATUS_SUCCESS
)
210 /* NOTE: we don't care if we couldn't create the thread if there is at
211 * least one other available to process the request */
212 if ((num_workers
> 0) && (status
!= STATUS_SUCCESS
))
213 status
= STATUS_SUCCESS
;
/* Failure: undo the queueing under the lock and release the item. */
216 if (status
!= STATUS_SUCCESS
)
218 RtlEnterCriticalSection(&threadpool_cs
);
220 interlocked_dec(&num_work_items
);
221 list_remove(&work_item
->entry
);
222 RtlFreeHeap(GetProcessHeap(), 0, work_item
);
224 RtlLeaveCriticalSection(&threadpool_cs
);
229 return STATUS_SUCCESS
;
232 /***********************************************************************
233 * iocp_poller - get completion events and run callbacks
/* Work item routine queued by RtlSetIoCompletionCallback. It removes
 * completion packets from compl_port and calls the routine delivered with
 * each packet as callback( err, transferred, overlapped ).
 * Arg is not referenced in the visible code.
 * NOTE(review): the loop construct and error branch are not visible in this
 * view (the embedded line numbers skip values). */
235 static DWORD CALLBACK
iocp_poller(LPVOID Arg
)
239 PRTL_OVERLAPPED_COMPLETION_ROUTINE callback
;
241 IO_STATUS_BLOCK iosb
;
/* The first value read back lands in callback; it is presumably the
 * completion key, which RtlSetIoCompletionCallback sets to the user's
 * Function. The second value lands in overlapped. */
242 NTSTATUS res
= NtRemoveIoCompletion( compl_port
, (PULONG_PTR
)&callback
, (PULONG_PTR
)&overlapped
, &iosb
, NULL
);
245 ERR("NtRemoveIoCompletion failed: 0x%x\n", res
);
/* Byte count passed to the callback; stays 0 unless the I/O succeeded. */
249 DWORD transferred
= 0;
252 if (iosb
.u
.Status
== STATUS_SUCCESS
)
253 transferred
= iosb
.Information
;
/* The callback receives the status translated by RtlNtStatusToDosError. */
255 err
= RtlNtStatusToDosError(iosb
.u
.Status
);
257 callback( err
, transferred
, overlapped
);
262 /***********************************************************************
263 * RtlSetIoCompletionCallback (NTDLL.@)
265 * Binds a handle to a thread pool's completion port, and possibly
266 * starts a non-I/O thread to monitor this port and call functions back.
269 * FileHandle [I] Handle to bind to a completion port.
270 * Function [I] Callback function to call on I/O completions.
271 * Flags [I] Not used.
274 * Success: STATUS_SUCCESS.
275 * Failure: Any NTSTATUS code.
/* Associates FileHandle with the shared completion port, using Function as
 * the completion key so that iocp_poller can call it for each completion.
 * The visible setup code creates a completion port and queues iocp_poller
 * as a work item inside threadpool_compl_cs. A non-zero Flags only produces
 * a FIXME message. Returns the status of NtSetInformationFile on the
 * visible return path.
 * NOTE(review): the conditions guarding the setup code and its error
 * handling are not visible in this view. */
278 NTSTATUS WINAPI
RtlSetIoCompletionCallback(HANDLE FileHandle
, PRTL_OVERLAPPED_COMPLETION_ROUTINE Function
, ULONG Flags
)
280 IO_STATUS_BLOCK iosb
;
281 FILE_COMPLETION_INFORMATION info
;
283 if (Flags
) FIXME("Unknown value Flags=0x%x\n", Flags
);
287 NTSTATUS res
= STATUS_SUCCESS
;
/* Port creation and poller start-up happen under the lock. */
289 RtlEnterCriticalSection(&threadpool_compl_cs
);
294 res
= NtCreateIoCompletion( &cport
, IO_COMPLETION_ALL_ACCESS
, NULL
, 0 );
297 /* FIXME native can start additional threads in case of e.g. hung callback function. */
298 res
= RtlQueueWorkItem( iocp_poller
, NULL
, WT_EXECUTEDEFAULT
);
305 RtlLeaveCriticalSection(&threadpool_compl_cs
);
/* Bind the file to the port; the key carries the user's callback pointer. */
309 info
.CompletionPort
= compl_port
;
310 info
.CompletionKey
= (ULONG_PTR
)Function
;
312 return NtSetInformationFile( FileHandle
, &iosb
, &info
, sizeof(info
), FileCompletionInformation
);
/* Converts a timeout in milliseconds into the LARGE_INTEGER pointer form
 * that wait_thread_proc passes to NtWaitForMultipleObjects.
 * INFINITE yields NULL; otherwise pTime->QuadPart is set to
 * timeout * -10000 (milliseconds to 100ns units, the negative sign
 * presumably marking a relative timeout).
 * NOTE(review): the return statement for the non-INFINITE case is not
 * visible in this view; presumably pTime is returned. */
315 static inline PLARGE_INTEGER
get_nt_timeout( PLARGE_INTEGER pTime
, ULONG timeout
)
317 if (timeout
== INFINITE
) return NULL
;
318 pTime
->QuadPart
= (ULONGLONG
)timeout
* -10000;
/* State of one registered wait, shared between RtlRegisterWait,
 * wait_thread_proc and RtlDeregisterWaitEx.
 * NOTE(review): only three members are visible in this view; Object,
 * Context, Milliseconds, Flags, CancelEvent and DeleteCount are used
 * elsewhere in this file but their declarations are not shown here. */
322 struct wait_work_item
/* User callback, invoked by wait_thread_proc with Context and a flag that
 * tells whether the wait timed out. */
326 WAITORTIMERCALLBACK Callback
;
/* Event stored by RtlDeregisterWaitEx; wait_thread_proc sets it when it is
 * finished if it is non-NULL. */
330 HANDLE CompletionEvent
;
/* TRUE only while wait_thread_proc is inside Callback. */
332 BOOLEAN CallbackInProgress
;
/* Releases a wait work item: closes its CancelEvent handle and frees the
 * item's memory back to the process heap. */
335 static void delete_wait_work_item(struct wait_work_item
*wait_work_item
)
337 NtClose( wait_work_item
->CancelEvent
);
338 RtlFreeHeap( GetProcessHeap(), 0, wait_work_item
);
/* Work item routine queued by RtlRegisterWait; Arg is the wait_work_item.
 * It waits on the registered object together with the item's cancel event,
 * calls the user callback when the object is signaled or the wait times
 * out, and afterwards signals the completion event (if any) and takes part
 * in the two-party deletion of the item.
 * NOTE(review): the loop construct, braces and branch keywords are not
 * visible in this view (the embedded line numbers skip values). */
341 static DWORD CALLBACK
wait_thread_proc(LPVOID Arg
)
343 struct wait_work_item
*wait_work_item
= Arg
;
/* The wait is alertable only when WT_EXECUTEINIOTHREAD was requested. */
345 BOOLEAN alertable
= (wait_work_item
->Flags
& WT_EXECUTEINIOTHREAD
) ? TRUE
: FALSE
;
/* Index 0 is the user's object, index 1 the cancel event that
 * RtlDeregisterWaitEx sets. */
346 HANDLE handles
[2] = { wait_work_item
->Object
, wait_work_item
->CancelEvent
};
347 LARGE_INTEGER timeout
;
348 HANDLE completion_event
;
/* Wait for either handle, bounded by the registered Milliseconds. */
354 status
= NtWaitForMultipleObjects( 2, handles
, FALSE
, alertable
,
355 get_nt_timeout( &timeout
, wait_work_item
->Milliseconds
) );
/* Only the object being signaled (STATUS_WAIT_0) or a timeout leads to the
 * callback. */
356 if (status
== STATUS_WAIT_0
|| status
== STATUS_TIMEOUT
)
358 BOOLEAN TimerOrWaitFired
;
360 if (status
== STATUS_WAIT_0
)
362 TRACE( "object %p signaled, calling callback %p with context %p\n",
363 wait_work_item
->Object
, wait_work_item
->Callback
,
364 wait_work_item
->Context
);
365 TimerOrWaitFired
= FALSE
;
/* Presumably the timeout branch; its keyword is not visible in this view. */
369 TRACE( "wait for object %p timed out, calling callback %p with context %p\n",
370 wait_work_item
->Object
, wait_work_item
->Callback
,
371 wait_work_item
->Context
);
372 TimerOrWaitFired
= TRUE
;
/* CallbackInProgress brackets the user callback so that
 * RtlDeregisterWaitEx can tell whether it is still running. */
374 wait_work_item
->CallbackInProgress
= TRUE
;
375 wait_work_item
->Callback( wait_work_item
->Context
, TimerOrWaitFired
);
376 wait_work_item
->CallbackInProgress
= FALSE
;
/* The statement guarded by this one-shot test is not visible in this view;
 * presumably the wait is not repeated. */
378 if (wait_work_item
->Flags
& WT_EXECUTEONLYONCE
)
/* Wake whoever handed an event over through RtlDeregisterWaitEx. */
385 completion_event
= wait_work_item
->CompletionEvent
;
386 if (completion_event
) NtSetEvent( completion_event
, NULL
);
/* Both this routine and RtlDeregisterWaitEx increment DeleteCount; the one
 * that brings it to 2 frees the item. */
388 if (interlocked_inc( &wait_work_item
->DeleteCount
) == 2 )
389 delete_wait_work_item( wait_work_item
);
394 /***********************************************************************
395 * RtlRegisterWait (NTDLL.@)
397 * Registers a wait for a handle to become signaled.
400 * NewWaitObject [O] Receives the handle to the new wait object. Use RtlDeregisterWait() to free it.
401 * Object [I] Object to wait to become signaled.
402 * Callback [I] Callback function to execute when the wait times out or the handle is signaled.
403 * Context [I] Context to pass to the callback function when it is executed.
404 * Milliseconds [I] Number of milliseconds to wait before timing out.
405 * Flags [I] Flags. See notes.
408 * Success: STATUS_SUCCESS.
409 * Failure: Any NTSTATUS code.
412 * Flags can be one or more of the following:
413 *|WT_EXECUTEDEFAULT - Executes the work item in a non-I/O worker thread.
414 *|WT_EXECUTEINIOTHREAD - Executes the work item in an I/O worker thread.
415 *|WT_EXECUTEINPERSISTENTTHREAD - Executes the work item in a thread that is persistent.
416 *|WT_EXECUTELONGFUNCTION - Hints that the execution can take a long time.
417 *|WT_TRANSFER_IMPERSONATION - Executes the function with the current access token.
/* Allocates a wait_work_item describing the wait, creates its cancel event
 * and queues wait_thread_proc through RtlQueueWorkItem to perform the wait.
 * On the visible success path the item pointer is stored through
 * NewWaitObject as the wait handle.
 * NOTE(review): the embedded line numbers skip values in this function, so
 * some declarations, tests and return statements are not visible here. */
419 NTSTATUS WINAPI
RtlRegisterWait(PHANDLE NewWaitObject
, HANDLE Object
,
420 RTL_WAITORTIMERCALLBACKFUNC Callback
,
421 PVOID Context
, ULONG Milliseconds
, ULONG Flags
)
423 struct wait_work_item
*wait_work_item
;
426 TRACE( "(%p, %p, %p, %p, %d, 0x%x)\n", NewWaitObject
, Object
, Callback
, Context
, Milliseconds
, Flags
);
428 wait_work_item
= RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*wait_work_item
) );
/* Presumably the allocation-failure path; the guarding test is not visible. */
430 return STATUS_NO_MEMORY
;
/* Record the caller's parameters and reset the bookkeeping fields. */
432 wait_work_item
->Object
= Object
;
433 wait_work_item
->Callback
= Callback
;
434 wait_work_item
->Context
= Context
;
435 wait_work_item
->Milliseconds
= Milliseconds
;
436 wait_work_item
->Flags
= Flags
;
437 wait_work_item
->CallbackInProgress
= FALSE
;
438 wait_work_item
->DeleteCount
= 0;
439 wait_work_item
->CompletionEvent
= NULL
;
/* Event that RtlDeregisterWaitEx sets to cancel the wait. */
441 status
= NtCreateEvent( &wait_work_item
->CancelEvent
, EVENT_ALL_ACCESS
, NULL
, TRUE
, FALSE
);
/* No event was created, so only the memory has to be released. */
442 if (status
!= STATUS_SUCCESS
)
444 RtlFreeHeap( GetProcessHeap(), 0, wait_work_item
);
/* WT_EXECUTEONLYONCE is masked out of the flags passed on to
 * RtlQueueWorkItem; it stays in the item's own Flags for wait_thread_proc. */
448 status
= RtlQueueWorkItem( wait_thread_proc
, wait_work_item
, Flags
& ~WT_EXECUTEONLYONCE
);
/* Queueing failed: release the event and the item. */
449 if (status
!= STATUS_SUCCESS
)
451 delete_wait_work_item( wait_work_item
);
/* Hand the item back to the caller as the wait handle. */
455 *NewWaitObject
= wait_work_item
;
459 /***********************************************************************
460 * RtlDeregisterWaitEx (NTDLL.@)
462 * Cancels a wait operation and frees the resources associated with the wait registered by RtlRegisterWait().
466 * WaitHandle [I] Handle to the wait object to free.
469 * Success: STATUS_SUCCESS.
470 * Failure: Any NTSTATUS code.
/* Cancels the wait identified by WaitHandle, which is the wait_work_item
 * pointer handed out by RtlRegisterWait. It sets the item's cancel event and
 * then takes part in the two-party deletion shared with wait_thread_proc.
 * CompletionEvent is only looked at while a callback is in progress:
 *   - INVALID_HANDLE_VALUE: a temporary event is created, stored in the
 *     item, waited on while the callback is still running, and closed;
 *   - another non-NULL handle: it is stored in the item so that
 *     wait_thread_proc sets it when finished, and status becomes
 *     STATUS_PENDING if the callback is still running.
 * NOTE(review): braces, branch keywords and return statements are not
 * visible in this view (the embedded line numbers skip values). */
472 NTSTATUS WINAPI
RtlDeregisterWaitEx(HANDLE WaitHandle
, HANDLE CompletionEvent
)
474 struct wait_work_item
*wait_work_item
= WaitHandle
;
475 NTSTATUS status
= STATUS_SUCCESS
;
477 TRACE( "(%p)\n", WaitHandle
);
/* Ask wait_thread_proc to stop waiting on the object. */
479 NtSetEvent( wait_work_item
->CancelEvent
, NULL
);
480 if (wait_work_item
->CallbackInProgress
)
482 if (CompletionEvent
!= NULL
)
484 if (CompletionEvent
== INVALID_HANDLE_VALUE
)
/* Blocking variant: use a private event to wait for the callback to end. */
486 status
= NtCreateEvent( &CompletionEvent
, EVENT_ALL_ACCESS
, NULL
, TRUE
, FALSE
);
/* The statement guarded by this failure test is not visible in this view. */
487 if (status
!= STATUS_SUCCESS
)
489 interlocked_xchg_ptr( &wait_work_item
->CompletionEvent
, CompletionEvent
);
/* Re-check after publishing the event: the callback may have finished in
 * the meantime, in which case nobody would signal it. */
490 if (wait_work_item
->CallbackInProgress
)
491 NtWaitForSingleObject( CompletionEvent
, FALSE
, NULL
);
492 NtClose( CompletionEvent
);
/* Presumably the caller-supplied event case; its branch keyword is not
 * visible in this view. */
496 interlocked_xchg_ptr( &wait_work_item
->CompletionEvent
, CompletionEvent
);
497 if (wait_work_item
->CallbackInProgress
)
498 status
= STATUS_PENDING
;
/* Presumably the NULL CompletionEvent case (branch keyword not visible):
 * report that the callback is still pending. */
502 status
= STATUS_PENDING
;
/* Both this function and wait_thread_proc increment DeleteCount; the one
 * that brings it to 2 frees the item. Here that also resets status to
 * STATUS_SUCCESS. */
505 if (interlocked_inc( &wait_work_item
->DeleteCount
) == 2 )
507 status
= STATUS_SUCCESS
;
508 delete_wait_work_item( wait_work_item
);
514 /***********************************************************************
515 * RtlDeregisterWait (NTDLL.@)
517 * Cancels a wait operation and frees the resources associated with the wait registered by RtlRegisterWait().
521 * WaitHandle [I] Handle to the wait object to free.
524 * Success: STATUS_SUCCESS.
525 * Failure: Any NTSTATUS code.
/* Convenience wrapper: deregisters the wait through RtlDeregisterWaitEx with
 * a NULL CompletionEvent and returns that call's status. */
527 NTSTATUS WINAPI
RtlDeregisterWait(HANDLE WaitHandle
)
529 return RtlDeregisterWaitEx(WaitHandle
, NULL
);