msi: Downgrade a few messages to WARN.
[wine.git] / dlls / ntdll / threadpool.c
blobe0541368b129c4a135038d06bac5259764435037
1 /*
2 * Thread pooling
4 * Copyright (c) 2006 Robert Shearman
6 * This library is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU Lesser General Public
8 * License as published by the Free Software Foundation; either
9 * version 2.1 of the License, or (at your option) any later version.
11 * This library is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 * Lesser General Public License for more details.
16 * You should have received a copy of the GNU Lesser General Public
17 * License along with this library; if not, write to the Free Software
18 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
21 #include "config.h"
22 #include "wine/port.h"
24 #include <stdarg.h>
25 #include <limits.h>
27 #define NONAMELESSUNION
28 #include "ntstatus.h"
29 #define WIN32_NO_STATUS
30 #include "winternl.h"
32 #include "wine/debug.h"
33 #include "wine/list.h"
35 #include "ntdll_misc.h"
37 WINE_DEFAULT_DEBUG_CHANNEL(threadpool);
39 #define WORKER_TIMEOUT 30000 /* 30 seconds */
41 static LONG num_workers;
42 static LONG num_work_items;
43 static LONG num_busy_workers;
45 static struct list work_item_list = LIST_INIT(work_item_list);
46 static HANDLE work_item_event;
48 static RTL_CRITICAL_SECTION threadpool_cs;
49 static RTL_CRITICAL_SECTION_DEBUG critsect_debug =
51 0, 0, &threadpool_cs,
52 { &critsect_debug.ProcessLocksList, &critsect_debug.ProcessLocksList },
53 0, 0, { (DWORD_PTR)(__FILE__ ": threadpool_cs") }
55 static RTL_CRITICAL_SECTION threadpool_cs = { &critsect_debug, -1, 0, 0, 0, 0 };
57 static HANDLE compl_port = NULL;
58 static RTL_CRITICAL_SECTION threadpool_compl_cs;
59 static RTL_CRITICAL_SECTION_DEBUG critsect_compl_debug =
61 0, 0, &threadpool_compl_cs,
62 { &critsect_compl_debug.ProcessLocksList, &critsect_compl_debug.ProcessLocksList },
63 0, 0, { (DWORD_PTR)(__FILE__ ": threadpool_compl_cs") }
65 static RTL_CRITICAL_SECTION threadpool_compl_cs = { &critsect_compl_debug, -1, 0, 0, 0, 0 };
67 struct work_item
69 struct list entry;
70 PRTL_WORK_ITEM_ROUTINE function;
71 PVOID context;
74 static inline LONG interlocked_inc( PLONG dest )
76 return interlocked_xchg_add( dest, 1 ) + 1;
79 static inline LONG interlocked_dec( PLONG dest )
81 return interlocked_xchg_add( dest, -1 ) - 1;
84 static void WINAPI worker_thread_proc(void * param)
86 interlocked_inc(&num_workers);
88 /* free the work item memory sooner to reduce memory usage */
89 while (TRUE)
91 if (num_work_items > 0)
93 struct list *item;
94 RtlEnterCriticalSection(&threadpool_cs);
95 item = list_head(&work_item_list);
96 if (item)
98 struct work_item *work_item_ptr = LIST_ENTRY(item, struct work_item, entry);
99 struct work_item work_item;
100 list_remove(&work_item_ptr->entry);
101 interlocked_dec(&num_work_items);
103 RtlLeaveCriticalSection(&threadpool_cs);
105 work_item = *work_item_ptr;
106 RtlFreeHeap(GetProcessHeap(), 0, work_item_ptr);
108 TRACE("executing %p(%p)\n", work_item.function, work_item.context);
110 interlocked_inc(&num_busy_workers);
112 /* do the work */
113 work_item.function(work_item.context);
115 interlocked_dec(&num_busy_workers);
117 else
118 RtlLeaveCriticalSection(&threadpool_cs);
120 else
122 NTSTATUS status;
123 LARGE_INTEGER timeout;
124 timeout.QuadPart = -(WORKER_TIMEOUT * (ULONGLONG)10000);
125 status = NtWaitForSingleObject(work_item_event, FALSE, &timeout);
126 if (status != STATUS_WAIT_0)
127 break;
131 interlocked_dec(&num_workers);
133 RtlExitUserThread(0);
135 /* never reached */
138 static NTSTATUS add_work_item_to_queue(struct work_item *work_item)
140 NTSTATUS status;
142 RtlEnterCriticalSection(&threadpool_cs);
143 list_add_tail(&work_item_list, &work_item->entry);
144 num_work_items++;
145 RtlLeaveCriticalSection(&threadpool_cs);
147 if (!work_item_event)
149 HANDLE sem;
150 status = NtCreateSemaphore(&sem, SEMAPHORE_ALL_ACCESS, NULL, 1, LONG_MAX);
151 if (interlocked_cmpxchg_ptr( &work_item_event, sem, 0 ))
152 NtClose(sem); /* somebody beat us to it */
154 else
155 status = NtReleaseSemaphore(work_item_event, 1, NULL);
157 return status;
160 /***********************************************************************
161 * RtlQueueWorkItem (NTDLL.@)
163 * Queues a work item into a thread in the thread pool.
165 * PARAMS
166 * Function [I] Work function to execute.
167 * Context [I] Context to pass to the work function when it is executed.
168 * Flags [I] Flags. See notes.
170 * RETURNS
171 * Success: STATUS_SUCCESS.
172 * Failure: Any NTSTATUS code.
174 * NOTES
175 * Flags can be one or more of the following:
176 *|WT_EXECUTEDEFAULT - Executes the work item in a non-I/O worker thread.
177 *|WT_EXECUTEINIOTHREAD - Executes the work item in an I/O worker thread.
178 *|WT_EXECUTEINPERSISTENTTHREAD - Executes the work item in a thread that is persistent.
179 *|WT_EXECUTELONGFUNCTION - Hints that the execution can take a long time.
180 *|WT_TRANSFER_IMPERSONATION - Executes the function with the current access token.
182 NTSTATUS WINAPI RtlQueueWorkItem(PRTL_WORK_ITEM_ROUTINE Function, PVOID Context, ULONG Flags)
184 HANDLE thread;
185 NTSTATUS status;
186 struct work_item *work_item = RtlAllocateHeap(GetProcessHeap(), 0, sizeof(struct work_item));
188 if (!work_item)
189 return STATUS_NO_MEMORY;
191 work_item->function = Function;
192 work_item->context = Context;
194 if (Flags & ~WT_EXECUTELONGFUNCTION)
195 FIXME("Flags 0x%x not supported\n", Flags);
197 status = add_work_item_to_queue(work_item);
199 /* FIXME: tune this algorithm to not be as aggressive with creating threads
200 * if WT_EXECUTELONGFUNCTION isn't specified */
201 if ((status == STATUS_SUCCESS) &&
202 ((num_workers == 0) || (num_workers == num_busy_workers)))
204 status = RtlCreateUserThread( GetCurrentProcess(), NULL, FALSE,
205 NULL, 0, 0,
206 worker_thread_proc, NULL, &thread, NULL );
207 if (status == STATUS_SUCCESS)
208 NtClose( thread );
210 /* NOTE: we don't care if we couldn't create the thread if there is at
211 * least one other available to process the request */
212 if ((num_workers > 0) && (status != STATUS_SUCCESS))
213 status = STATUS_SUCCESS;
216 if (status != STATUS_SUCCESS)
218 RtlEnterCriticalSection(&threadpool_cs);
220 interlocked_dec(&num_work_items);
221 list_remove(&work_item->entry);
222 RtlFreeHeap(GetProcessHeap(), 0, work_item);
224 RtlLeaveCriticalSection(&threadpool_cs);
226 return status;
229 return STATUS_SUCCESS;
232 /***********************************************************************
233 * iocp_poller - get completion events and run callbacks
235 static DWORD CALLBACK iocp_poller(LPVOID Arg)
237 while( TRUE )
239 PRTL_OVERLAPPED_COMPLETION_ROUTINE callback;
240 LPVOID overlapped;
241 IO_STATUS_BLOCK iosb;
242 NTSTATUS res = NtRemoveIoCompletion( compl_port, (PULONG_PTR)&callback, (PULONG_PTR)&overlapped, &iosb, NULL );
243 if (res)
245 ERR("NtRemoveIoCompletion failed: 0x%x\n", res);
247 else
249 DWORD transferred = 0;
250 DWORD err = 0;
252 if (iosb.u.Status == STATUS_SUCCESS)
253 transferred = iosb.Information;
254 else
255 err = RtlNtStatusToDosError(iosb.u.Status);
257 callback( err, transferred, overlapped );
262 /***********************************************************************
263 * RtlSetIoCompletionCallback (NTDLL.@)
265 * Binds a handle to a thread pool's completion port, and possibly
266 * starts a non-I/O thread to monitor this port and call functions back.
268 * PARAMS
269 * FileHandle [I] Handle to bind to a completion port.
270 * Function [I] Callback function to call on I/O completions.
271 * Flags [I] Not used.
273 * RETURNS
274 * Success: STATUS_SUCCESS.
275 * Failure: Any NTSTATUS code.
278 NTSTATUS WINAPI RtlSetIoCompletionCallback(HANDLE FileHandle, PRTL_OVERLAPPED_COMPLETION_ROUTINE Function, ULONG Flags)
280 IO_STATUS_BLOCK iosb;
281 FILE_COMPLETION_INFORMATION info;
283 if (Flags) FIXME("Unknown value Flags=0x%x\n", Flags);
285 if (!compl_port)
287 NTSTATUS res = STATUS_SUCCESS;
289 RtlEnterCriticalSection(&threadpool_compl_cs);
290 if (!compl_port)
292 HANDLE cport;
294 res = NtCreateIoCompletion( &cport, IO_COMPLETION_ALL_ACCESS, NULL, 0 );
295 if (!res)
297 /* FIXME native can start additional threads in case of e.g. hung callback function. */
298 res = RtlQueueWorkItem( iocp_poller, NULL, WT_EXECUTEDEFAULT );
299 if (!res)
300 compl_port = cport;
301 else
302 NtClose( cport );
305 RtlLeaveCriticalSection(&threadpool_compl_cs);
306 if (res) return res;
309 info.CompletionPort = compl_port;
310 info.CompletionKey = (ULONG_PTR)Function;
312 return NtSetInformationFile( FileHandle, &iosb, &info, sizeof(info), FileCompletionInformation );
315 static inline PLARGE_INTEGER get_nt_timeout( PLARGE_INTEGER pTime, ULONG timeout )
317 if (timeout == INFINITE) return NULL;
318 pTime->QuadPart = (ULONGLONG)timeout * -10000;
319 return pTime;
322 struct wait_work_item
324 HANDLE Object;
325 HANDLE CancelEvent;
326 WAITORTIMERCALLBACK Callback;
327 PVOID Context;
328 ULONG Milliseconds;
329 ULONG Flags;
330 HANDLE CompletionEvent;
331 LONG DeleteCount;
332 BOOLEAN CallbackInProgress;
335 static void delete_wait_work_item(struct wait_work_item *wait_work_item)
337 NtClose( wait_work_item->CancelEvent );
338 RtlFreeHeap( GetProcessHeap(), 0, wait_work_item );
341 static DWORD CALLBACK wait_thread_proc(LPVOID Arg)
343 struct wait_work_item *wait_work_item = Arg;
344 NTSTATUS status;
345 BOOLEAN alertable = (wait_work_item->Flags & WT_EXECUTEINIOTHREAD) ? TRUE : FALSE;
346 HANDLE handles[2] = { wait_work_item->Object, wait_work_item->CancelEvent };
347 LARGE_INTEGER timeout;
348 HANDLE completion_event;
350 TRACE("\n");
352 while (TRUE)
354 status = NtWaitForMultipleObjects( 2, handles, FALSE, alertable,
355 get_nt_timeout( &timeout, wait_work_item->Milliseconds ) );
356 if (status == STATUS_WAIT_0 || status == STATUS_TIMEOUT)
358 BOOLEAN TimerOrWaitFired;
360 if (status == STATUS_WAIT_0)
362 TRACE( "object %p signaled, calling callback %p with context %p\n",
363 wait_work_item->Object, wait_work_item->Callback,
364 wait_work_item->Context );
365 TimerOrWaitFired = FALSE;
367 else
369 TRACE( "wait for object %p timed out, calling callback %p with context %p\n",
370 wait_work_item->Object, wait_work_item->Callback,
371 wait_work_item->Context );
372 TimerOrWaitFired = TRUE;
374 wait_work_item->CallbackInProgress = TRUE;
375 wait_work_item->Callback( wait_work_item->Context, TimerOrWaitFired );
376 wait_work_item->CallbackInProgress = FALSE;
378 if (wait_work_item->Flags & WT_EXECUTEONLYONCE)
379 break;
381 else
382 break;
385 completion_event = wait_work_item->CompletionEvent;
386 if (completion_event) NtSetEvent( completion_event, NULL );
388 if (interlocked_inc( &wait_work_item->DeleteCount ) == 2 )
389 delete_wait_work_item( wait_work_item );
391 return 0;
394 /***********************************************************************
395 * RtlRegisterWait (NTDLL.@)
397 * Registers a wait for a handle to become signaled.
399 * PARAMS
400 * NewWaitObject [I] Handle to the new wait object. Use RtlDeregisterWait() to free it.
401 * Object [I] Object to wait to become signaled.
402 * Callback [I] Callback function to execute when the wait times out or the handle is signaled.
403 * Context [I] Context to pass to the callback function when it is executed.
404 * Milliseconds [I] Number of milliseconds to wait before timing out.
405 * Flags [I] Flags. See notes.
407 * RETURNS
408 * Success: STATUS_SUCCESS.
409 * Failure: Any NTSTATUS code.
411 * NOTES
412 * Flags can be one or more of the following:
413 *|WT_EXECUTEDEFAULT - Executes the work item in a non-I/O worker thread.
414 *|WT_EXECUTEINIOTHREAD - Executes the work item in an I/O worker thread.
415 *|WT_EXECUTEINPERSISTENTTHREAD - Executes the work item in a thread that is persistent.
416 *|WT_EXECUTELONGFUNCTION - Hints that the execution can take a long time.
417 *|WT_TRANSFER_IMPERSONATION - Executes the function with the current access token.
419 NTSTATUS WINAPI RtlRegisterWait(PHANDLE NewWaitObject, HANDLE Object,
420 RTL_WAITORTIMERCALLBACKFUNC Callback,
421 PVOID Context, ULONG Milliseconds, ULONG Flags)
423 struct wait_work_item *wait_work_item;
424 NTSTATUS status;
426 TRACE( "(%p, %p, %p, %p, %d, 0x%x)\n", NewWaitObject, Object, Callback, Context, Milliseconds, Flags );
428 wait_work_item = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*wait_work_item) );
429 if (!wait_work_item)
430 return STATUS_NO_MEMORY;
432 wait_work_item->Object = Object;
433 wait_work_item->Callback = Callback;
434 wait_work_item->Context = Context;
435 wait_work_item->Milliseconds = Milliseconds;
436 wait_work_item->Flags = Flags;
437 wait_work_item->CallbackInProgress = FALSE;
438 wait_work_item->DeleteCount = 0;
439 wait_work_item->CompletionEvent = NULL;
441 status = NtCreateEvent( &wait_work_item->CancelEvent, EVENT_ALL_ACCESS, NULL, TRUE, FALSE );
442 if (status != STATUS_SUCCESS)
444 RtlFreeHeap( GetProcessHeap(), 0, wait_work_item );
445 return status;
448 status = RtlQueueWorkItem( wait_thread_proc, wait_work_item, Flags & ~WT_EXECUTEONLYONCE );
449 if (status != STATUS_SUCCESS)
451 delete_wait_work_item( wait_work_item );
452 return status;
455 *NewWaitObject = wait_work_item;
456 return status;
459 /***********************************************************************
460 * RtlDeregisterWaitEx (NTDLL.@)
462 * Cancels a wait operation and frees the resources associated with calling
463 * RtlRegisterWait().
465 * PARAMS
466 * WaitObject [I] Handle to the wait object to free.
468 * RETURNS
469 * Success: STATUS_SUCCESS.
470 * Failure: Any NTSTATUS code.
472 NTSTATUS WINAPI RtlDeregisterWaitEx(HANDLE WaitHandle, HANDLE CompletionEvent)
474 struct wait_work_item *wait_work_item = WaitHandle;
475 NTSTATUS status = STATUS_SUCCESS;
477 TRACE( "(%p)\n", WaitHandle );
479 NtSetEvent( wait_work_item->CancelEvent, NULL );
480 if (wait_work_item->CallbackInProgress)
482 if (CompletionEvent != NULL)
484 if (CompletionEvent == INVALID_HANDLE_VALUE)
486 status = NtCreateEvent( &CompletionEvent, EVENT_ALL_ACCESS, NULL, TRUE, FALSE );
487 if (status != STATUS_SUCCESS)
488 return status;
489 interlocked_xchg_ptr( &wait_work_item->CompletionEvent, CompletionEvent );
490 if (wait_work_item->CallbackInProgress)
491 NtWaitForSingleObject( CompletionEvent, FALSE, NULL );
492 NtClose( CompletionEvent );
494 else
496 interlocked_xchg_ptr( &wait_work_item->CompletionEvent, CompletionEvent );
497 if (wait_work_item->CallbackInProgress)
498 status = STATUS_PENDING;
501 else
502 status = STATUS_PENDING;
505 if (interlocked_inc( &wait_work_item->DeleteCount ) == 2 )
507 status = STATUS_SUCCESS;
508 delete_wait_work_item( wait_work_item );
511 return status;
514 /***********************************************************************
515 * RtlDeregisterWait (NTDLL.@)
517 * Cancels a wait operation and frees the resources associated with calling
518 * RtlRegisterWait().
520 * PARAMS
521 * WaitObject [I] Handle to the wait object to free.
523 * RETURNS
524 * Success: STATUS_SUCCESS.
525 * Failure: Any NTSTATUS code.
527 NTSTATUS WINAPI RtlDeregisterWait(HANDLE WaitHandle)
529 return RtlDeregisterWaitEx(WaitHandle, NULL);