Release 8.16.
[wine.git] / dlls / rtworkq / queue.c
blob1f82eb7ad1264795a436a5cb444a2f785e59def0
1 /*
2 * Copyright 2019-2020 Nikolay Sivov for CodeWeavers
4 * This library is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU Lesser General Public
6 * License as published by the Free Software Foundation; either
7 * version 2.1 of the License, or (at your option) any later version.
9 * This library is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 * Lesser General Public License for more details.
14 * You should have received a copy of the GNU Lesser General Public
15 * License along with this library; if not, write to the Free Software
16 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
19 #include <assert.h>
21 #define COBJMACROS
22 #include "initguid.h"
23 #include "rtworkq.h"
24 #include "wine/debug.h"
25 #include "wine/list.h"
27 WINE_DEFAULT_DEBUG_CHANNEL(mfplat);
29 #define FIRST_USER_QUEUE_HANDLE 5
30 #define MAX_USER_QUEUE_HANDLES 124
32 #define WAIT_ITEM_KEY_MASK (0x82000000)
33 #define SCHEDULED_ITEM_KEY_MASK (0x80000000)
35 static LONG next_item_key;
37 static RTWQWORKITEM_KEY get_item_key(DWORD mask, DWORD key)
39 return ((RTWQWORKITEM_KEY)mask << 32) | key;
42 static RTWQWORKITEM_KEY generate_item_key(DWORD mask)
44 return get_item_key(mask, InterlockedIncrement(&next_item_key));
47 struct queue_handle
49 void *obj;
50 LONG refcount;
51 WORD generation;
54 static struct queue_handle user_queues[MAX_USER_QUEUE_HANDLES];
55 static struct queue_handle *next_free_user_queue;
56 static struct queue_handle *next_unused_user_queue = user_queues;
57 static WORD queue_generation;
58 static DWORD shared_mt_queue;
60 static CRITICAL_SECTION queues_section;
61 static CRITICAL_SECTION_DEBUG queues_critsect_debug =
63 0, 0, &queues_section,
64 { &queues_critsect_debug.ProcessLocksList, &queues_critsect_debug.ProcessLocksList },
65 0, 0, { (DWORD_PTR)(__FILE__ ": queues_section") }
67 static CRITICAL_SECTION queues_section = { &queues_critsect_debug, -1, 0, 0, 0, 0 };
69 static LONG platform_lock;
70 static CO_MTA_USAGE_COOKIE mta_cookie;
72 static struct queue_handle *get_queue_obj(DWORD handle)
74 unsigned int idx = HIWORD(handle) - FIRST_USER_QUEUE_HANDLE;
76 if (idx < MAX_USER_QUEUE_HANDLES && user_queues[idx].refcount)
78 if (LOWORD(handle) == user_queues[idx].generation)
79 return &user_queues[idx];
82 return NULL;
85 /* Should be kept in sync with corresponding MFASYNC_CALLBACK_ constants. */
86 enum rtwq_callback_queue_id
88 RTWQ_CALLBACK_QUEUE_UNDEFINED = 0x00000000,
89 RTWQ_CALLBACK_QUEUE_STANDARD = 0x00000001,
90 RTWQ_CALLBACK_QUEUE_RT = 0x00000002,
91 RTWQ_CALLBACK_QUEUE_IO = 0x00000003,
92 RTWQ_CALLBACK_QUEUE_TIMER = 0x00000004,
93 RTWQ_CALLBACK_QUEUE_MULTITHREADED = 0x00000005,
94 RTWQ_CALLBACK_QUEUE_LONG_FUNCTION = 0x00000007,
95 RTWQ_CALLBACK_QUEUE_PRIVATE_MASK = 0xffff0000,
96 RTWQ_CALLBACK_QUEUE_ALL = 0xffffffff,
99 /* Should be kept in sync with corresponding MFASYNC_ constants. */
100 enum rtwq_callback_flags
102 RTWQ_FAST_IO_PROCESSING_CALLBACK = 0x00000001,
103 RTWQ_SIGNAL_CALLBACK = 0x00000002,
104 RTWQ_BLOCKING_CALLBACK = 0x00000004,
105 RTWQ_REPLY_CALLBACK = 0x00000008,
106 RTWQ_LOCALIZE_REMOTE_CALLBACK = 0x00000010,
109 enum system_queue_index
111 SYS_QUEUE_STANDARD = 0,
112 SYS_QUEUE_RT,
113 SYS_QUEUE_IO,
114 SYS_QUEUE_TIMER,
115 SYS_QUEUE_MULTITHREADED,
116 SYS_QUEUE_DO_NOT_USE,
117 SYS_QUEUE_LONG_FUNCTION,
118 SYS_QUEUE_COUNT,
121 struct work_item
123 IUnknown IUnknown_iface;
124 LONG refcount;
125 struct list entry;
126 IRtwqAsyncResult *result;
127 IRtwqAsyncResult *reply_result;
128 struct queue *queue;
129 RTWQWORKITEM_KEY key;
130 LONG priority;
131 DWORD flags;
132 TP_WORK *work_object;
133 PTP_SIMPLE_CALLBACK finalization_callback;
134 union
136 TP_WAIT *wait_object;
137 TP_TIMER *timer_object;
138 } u;
141 static struct work_item *work_item_impl_from_IUnknown(IUnknown *iface)
143 return CONTAINING_RECORD(iface, struct work_item, IUnknown_iface);
146 static const TP_CALLBACK_PRIORITY priorities[] =
148 TP_CALLBACK_PRIORITY_HIGH,
149 TP_CALLBACK_PRIORITY_NORMAL,
150 TP_CALLBACK_PRIORITY_LOW,
153 struct queue;
154 struct queue_desc;
156 struct queue_ops
158 HRESULT (*init)(const struct queue_desc *desc, struct queue *queue);
159 BOOL (*shutdown)(struct queue *queue);
160 void (*submit)(struct queue *queue, struct work_item *item);
163 struct queue_desc
165 RTWQ_WORKQUEUE_TYPE queue_type;
166 const struct queue_ops *ops;
167 DWORD target_queue;
170 struct queue
172 IRtwqAsyncCallback IRtwqAsyncCallback_iface;
173 const struct queue_ops *ops;
174 TP_POOL *pool;
175 TP_CALLBACK_ENVIRON_V3 envs[ARRAY_SIZE(priorities)];
176 CRITICAL_SECTION cs;
177 struct list pending_items;
178 DWORD id;
179 /* Data used for serial queues only. */
180 PTP_SIMPLE_CALLBACK finalization_callback;
181 DWORD target_queue;
184 static void shutdown_queue(struct queue *queue);
186 static HRESULT lock_user_queue(DWORD queue)
188 HRESULT hr = RTWQ_E_INVALID_WORKQUEUE;
189 struct queue_handle *entry;
191 if (!(queue & RTWQ_CALLBACK_QUEUE_PRIVATE_MASK))
192 return S_OK;
194 EnterCriticalSection(&queues_section);
195 entry = get_queue_obj(queue);
196 if (entry && entry->refcount)
198 entry->refcount++;
199 hr = S_OK;
201 LeaveCriticalSection(&queues_section);
202 return hr;
205 static HRESULT unlock_user_queue(DWORD queue)
207 HRESULT hr = RTWQ_E_INVALID_WORKQUEUE;
208 struct queue_handle *entry;
210 if (!(queue & RTWQ_CALLBACK_QUEUE_PRIVATE_MASK))
211 return S_OK;
213 EnterCriticalSection(&queues_section);
214 entry = get_queue_obj(queue);
215 if (entry && entry->refcount)
217 if (--entry->refcount == 0)
219 if (shared_mt_queue == queue) shared_mt_queue = 0;
220 shutdown_queue((struct queue *)entry->obj);
221 free(entry->obj);
222 entry->obj = next_free_user_queue;
223 next_free_user_queue = entry;
225 hr = S_OK;
227 LeaveCriticalSection(&queues_section);
228 return hr;
231 static struct queue *queue_impl_from_IRtwqAsyncCallback(IRtwqAsyncCallback *iface)
233 return CONTAINING_RECORD(iface, struct queue, IRtwqAsyncCallback_iface);
236 static HRESULT WINAPI queue_serial_callback_QueryInterface(IRtwqAsyncCallback *iface, REFIID riid, void **obj)
238 if (IsEqualIID(riid, &IID_IRtwqAsyncCallback) ||
239 IsEqualIID(riid, &IID_IUnknown))
241 *obj = iface;
242 IRtwqAsyncCallback_AddRef(iface);
243 return S_OK;
246 *obj = NULL;
247 return E_NOINTERFACE;
250 static ULONG WINAPI queue_serial_callback_AddRef(IRtwqAsyncCallback *iface)
252 return 2;
255 static ULONG WINAPI queue_serial_callback_Release(IRtwqAsyncCallback *iface)
257 return 1;
260 static HRESULT WINAPI queue_serial_callback_GetParameters(IRtwqAsyncCallback *iface, DWORD *flags, DWORD *queue_id)
262 struct queue *queue = queue_impl_from_IRtwqAsyncCallback(iface);
264 *flags = 0;
265 *queue_id = queue->id;
267 return S_OK;
270 static HRESULT WINAPI queue_serial_callback_Invoke(IRtwqAsyncCallback *iface, IRtwqAsyncResult *result)
272 /* Reply callback won't be called in a regular way, pending items and chained queues will make it
273 unnecessary complicated to reach actual work queue that's able to execute this item. Instead
274 serial queues are cleaned up right away on submit(). */
275 return S_OK;
278 static const IRtwqAsyncCallbackVtbl queue_serial_callback_vtbl =
280 queue_serial_callback_QueryInterface,
281 queue_serial_callback_AddRef,
282 queue_serial_callback_Release,
283 queue_serial_callback_GetParameters,
284 queue_serial_callback_Invoke,
287 static struct queue system_queues[SYS_QUEUE_COUNT];
289 static struct queue *get_system_queue(DWORD queue_id)
291 switch (queue_id)
293 case RTWQ_CALLBACK_QUEUE_STANDARD:
294 case RTWQ_CALLBACK_QUEUE_RT:
295 case RTWQ_CALLBACK_QUEUE_IO:
296 case RTWQ_CALLBACK_QUEUE_TIMER:
297 case RTWQ_CALLBACK_QUEUE_MULTITHREADED:
298 case RTWQ_CALLBACK_QUEUE_LONG_FUNCTION:
299 return &system_queues[queue_id - 1];
300 default:
301 return NULL;
305 static HRESULT grab_queue(DWORD queue_id, struct queue **ret);
307 static void CALLBACK standard_queue_cleanup_callback(void *object_data, void *group_data)
311 static HRESULT pool_queue_init(const struct queue_desc *desc, struct queue *queue)
313 TP_CALLBACK_ENVIRON_V3 env;
314 unsigned int max_thread, i;
316 queue->pool = CreateThreadpool(NULL);
318 memset(&env, 0, sizeof(env));
319 env.Version = 3;
320 env.Size = sizeof(env);
321 env.Pool = queue->pool;
322 env.CleanupGroup = CreateThreadpoolCleanupGroup();
323 env.CleanupGroupCancelCallback = standard_queue_cleanup_callback;
324 env.CallbackPriority = TP_CALLBACK_PRIORITY_NORMAL;
325 for (i = 0; i < ARRAY_SIZE(queue->envs); ++i)
327 queue->envs[i] = env;
328 queue->envs[i].CallbackPriority = priorities[i];
330 list_init(&queue->pending_items);
331 InitializeCriticalSection(&queue->cs);
333 max_thread = (desc->queue_type == RTWQ_STANDARD_WORKQUEUE || desc->queue_type == RTWQ_WINDOW_WORKQUEUE) ? 1 : 4;
335 SetThreadpoolThreadMinimum(queue->pool, 1);
336 SetThreadpoolThreadMaximum(queue->pool, max_thread);
338 if (desc->queue_type == RTWQ_WINDOW_WORKQUEUE)
339 FIXME("RTWQ_WINDOW_WORKQUEUE is not supported.\n");
341 return S_OK;
344 static BOOL pool_queue_shutdown(struct queue *queue)
346 if (!queue->pool)
347 return FALSE;
349 CloseThreadpoolCleanupGroupMembers(queue->envs[0].CleanupGroup, TRUE, NULL);
350 CloseThreadpool(queue->pool);
351 queue->pool = NULL;
353 return TRUE;
356 static void CALLBACK standard_queue_worker(TP_CALLBACK_INSTANCE *instance, void *context, TP_WORK *work)
358 struct work_item *item = context;
359 RTWQASYNCRESULT *result = (RTWQASYNCRESULT *)item->result;
361 TRACE("result object %p.\n", result);
363 /* Submitting from serial queue in reply mode, use different result object acting as receipt token.
364 It's submitted to user callback still, but when invoked, special serial queue callback will be used
365 to ensure correct destination queue. */
367 IRtwqAsyncCallback_Invoke(result->pCallback, item->reply_result ? item->reply_result : item->result);
369 IUnknown_Release(&item->IUnknown_iface);
372 static void pool_queue_submit(struct queue *queue, struct work_item *item)
374 TP_CALLBACK_PRIORITY callback_priority;
375 TP_CALLBACK_ENVIRON_V3 env;
377 if (item->priority == 0)
378 callback_priority = TP_CALLBACK_PRIORITY_NORMAL;
379 else if (item->priority < 0)
380 callback_priority = TP_CALLBACK_PRIORITY_LOW;
381 else
382 callback_priority = TP_CALLBACK_PRIORITY_HIGH;
384 env = queue->envs[callback_priority];
385 env.FinalizationCallback = item->finalization_callback;
386 /* Worker pool callback will release one reference. Grab one more to keep object alive when
387 we need finalization callback. */
388 if (item->finalization_callback)
389 IUnknown_AddRef(&item->IUnknown_iface);
390 item->work_object = CreateThreadpoolWork(standard_queue_worker, item, (TP_CALLBACK_ENVIRON *)&env);
391 SubmitThreadpoolWork(item->work_object);
393 TRACE("dispatched %p.\n", item->result);
396 static const struct queue_ops pool_queue_ops =
398 pool_queue_init,
399 pool_queue_shutdown,
400 pool_queue_submit,
403 static struct work_item * serial_queue_get_next(struct queue *queue, struct work_item *item)
405 struct work_item *next_item = NULL;
407 list_remove(&item->entry);
408 if (!list_empty(&item->queue->pending_items))
409 next_item = LIST_ENTRY(list_head(&item->queue->pending_items), struct work_item, entry);
411 return next_item;
414 static void CALLBACK serial_queue_finalization_callback(PTP_CALLBACK_INSTANCE instance, void *user_data)
416 struct work_item *item = (struct work_item *)user_data, *next_item;
417 struct queue *target_queue, *queue = item->queue;
418 HRESULT hr;
420 EnterCriticalSection(&queue->cs);
422 if ((next_item = serial_queue_get_next(queue, item)))
424 if (SUCCEEDED(hr = grab_queue(queue->target_queue, &target_queue)))
425 target_queue->ops->submit(target_queue, next_item);
426 else
427 WARN("Failed to grab queue for id %#lx, hr %#lx.\n", queue->target_queue, hr);
430 LeaveCriticalSection(&queue->cs);
432 IUnknown_Release(&item->IUnknown_iface);
435 static HRESULT serial_queue_init(const struct queue_desc *desc, struct queue *queue)
437 queue->IRtwqAsyncCallback_iface.lpVtbl = &queue_serial_callback_vtbl;
438 queue->target_queue = desc->target_queue;
439 lock_user_queue(queue->target_queue);
440 queue->finalization_callback = serial_queue_finalization_callback;
442 return S_OK;
445 static BOOL serial_queue_shutdown(struct queue *queue)
447 unlock_user_queue(queue->target_queue);
449 return TRUE;
452 static struct work_item * serial_queue_is_ack_token(struct queue *queue, struct work_item *item)
454 RTWQASYNCRESULT *async_result = (RTWQASYNCRESULT *)item->result;
455 struct work_item *head;
457 if (list_empty(&queue->pending_items))
458 return NULL;
460 head = LIST_ENTRY(list_head(&queue->pending_items), struct work_item, entry);
461 if (head->reply_result == item->result && async_result->pCallback == &queue->IRtwqAsyncCallback_iface)
462 return head;
464 return NULL;
467 static void serial_queue_submit(struct queue *queue, struct work_item *item)
469 struct work_item *head, *next_item = NULL;
470 struct queue *target_queue;
471 HRESULT hr;
473 /* In reply mode queue will advance when 'reply_result' is invoked, in regular mode it will advance automatically,
474 via finalization callback. */
476 if (item->flags & RTWQ_REPLY_CALLBACK)
478 if (FAILED(hr = RtwqCreateAsyncResult(NULL, &queue->IRtwqAsyncCallback_iface, NULL, &item->reply_result)))
479 WARN("Failed to create reply object, hr %#lx.\n", hr);
481 else
482 item->finalization_callback = queue->finalization_callback;
484 /* Serial queues could be chained together, detach from current queue before transitioning item to this one.
485 Items are not detached when submitted to pool queues, because pool queues won't forward them further. */
486 EnterCriticalSection(&item->queue->cs);
487 list_remove(&item->entry);
488 LeaveCriticalSection(&item->queue->cs);
490 EnterCriticalSection(&queue->cs);
492 item->queue = queue;
494 if ((head = serial_queue_is_ack_token(queue, item)))
496 /* Ack receipt token - pop waiting item, advance. */
497 next_item = serial_queue_get_next(queue, head);
498 IUnknown_Release(&head->IUnknown_iface);
500 else
502 if (list_empty(&queue->pending_items))
503 next_item = item;
504 list_add_tail(&queue->pending_items, &item->entry);
505 IUnknown_AddRef(&item->IUnknown_iface);
508 if (next_item)
510 if (SUCCEEDED(hr = grab_queue(queue->target_queue, &target_queue)))
511 target_queue->ops->submit(target_queue, next_item);
512 else
513 WARN("Failed to grab queue for id %#lx, hr %#lx.\n", queue->target_queue, hr);
516 LeaveCriticalSection(&queue->cs);
519 static const struct queue_ops serial_queue_ops =
521 serial_queue_init,
522 serial_queue_shutdown,
523 serial_queue_submit,
526 static HRESULT WINAPI work_item_QueryInterface(IUnknown *iface, REFIID riid, void **obj)
528 if (IsEqualIID(riid, &IID_IUnknown))
530 *obj = iface;
531 IUnknown_AddRef(iface);
532 return S_OK;
535 *obj = NULL;
536 return E_NOINTERFACE;
539 static ULONG WINAPI work_item_AddRef(IUnknown *iface)
541 struct work_item *item = work_item_impl_from_IUnknown(iface);
542 return InterlockedIncrement(&item->refcount);
545 static ULONG WINAPI work_item_Release(IUnknown *iface)
547 struct work_item *item = work_item_impl_from_IUnknown(iface);
548 ULONG refcount = InterlockedDecrement(&item->refcount);
550 if (!refcount)
552 if (item->work_object)
553 CloseThreadpoolWork(item->work_object);
554 if (item->reply_result)
555 IRtwqAsyncResult_Release(item->reply_result);
556 IRtwqAsyncResult_Release(item->result);
557 free(item);
560 return refcount;
563 static const IUnknownVtbl work_item_vtbl =
565 work_item_QueryInterface,
566 work_item_AddRef,
567 work_item_Release,
570 static struct work_item * alloc_work_item(struct queue *queue, LONG priority, IRtwqAsyncResult *result)
572 RTWQASYNCRESULT *async_result = (RTWQASYNCRESULT *)result;
573 DWORD flags = 0, queue_id = 0;
574 struct work_item *item;
576 item = calloc(1, sizeof(*item));
578 item->IUnknown_iface.lpVtbl = &work_item_vtbl;
579 item->result = result;
580 IRtwqAsyncResult_AddRef(item->result);
581 item->refcount = 1;
582 item->queue = queue;
583 list_init(&item->entry);
584 item->priority = priority;
586 if (SUCCEEDED(IRtwqAsyncCallback_GetParameters(async_result->pCallback, &flags, &queue_id)))
587 item->flags = flags;
589 return item;
592 static void init_work_queue(const struct queue_desc *desc, struct queue *queue)
594 assert(desc->ops != NULL);
596 queue->ops = desc->ops;
597 if (SUCCEEDED(queue->ops->init(desc, queue)))
599 list_init(&queue->pending_items);
600 InitializeCriticalSection(&queue->cs);
604 static HRESULT grab_queue(DWORD queue_id, struct queue **ret)
606 struct queue *queue = get_system_queue(queue_id);
607 RTWQ_WORKQUEUE_TYPE queue_type;
608 struct queue_handle *entry;
610 *ret = NULL;
612 if (!system_queues[SYS_QUEUE_STANDARD].pool)
613 return RTWQ_E_SHUTDOWN;
615 if (queue && queue->pool)
617 *ret = queue;
618 return S_OK;
620 else if (queue)
622 struct queue_desc desc;
624 EnterCriticalSection(&queues_section);
625 switch (queue_id)
627 case RTWQ_CALLBACK_QUEUE_IO:
628 case RTWQ_CALLBACK_QUEUE_MULTITHREADED:
629 case RTWQ_CALLBACK_QUEUE_LONG_FUNCTION:
630 queue_type = RTWQ_MULTITHREADED_WORKQUEUE;
631 break;
632 default:
633 queue_type = RTWQ_STANDARD_WORKQUEUE;
636 desc.queue_type = queue_type;
637 desc.ops = &pool_queue_ops;
638 desc.target_queue = 0;
639 init_work_queue(&desc, queue);
640 LeaveCriticalSection(&queues_section);
641 *ret = queue;
642 return S_OK;
645 /* Handles user queues. */
646 if ((entry = get_queue_obj(queue_id)))
647 *ret = entry->obj;
649 return *ret ? S_OK : RTWQ_E_INVALID_WORKQUEUE;
652 static void shutdown_queue(struct queue *queue)
654 struct work_item *item, *item2;
656 if (!queue->ops || !queue->ops->shutdown(queue))
657 return;
659 EnterCriticalSection(&queue->cs);
660 LIST_FOR_EACH_ENTRY_SAFE(item, item2, &queue->pending_items, struct work_item, entry)
662 list_remove(&item->entry);
663 IUnknown_Release(&item->IUnknown_iface);
665 LeaveCriticalSection(&queue->cs);
667 DeleteCriticalSection(&queue->cs);
669 memset(queue, 0, sizeof(*queue));
672 static HRESULT queue_submit_item(struct queue *queue, LONG priority, IRtwqAsyncResult *result)
674 struct work_item *item;
676 if (!(item = alloc_work_item(queue, priority, result)))
677 return E_OUTOFMEMORY;
679 queue->ops->submit(queue, item);
681 return S_OK;
684 static HRESULT queue_put_work_item(DWORD queue_id, LONG priority, IRtwqAsyncResult *result)
686 struct queue *queue;
687 HRESULT hr;
689 if (FAILED(hr = grab_queue(queue_id, &queue)))
690 return hr;
692 return queue_submit_item(queue, priority, result);
695 static HRESULT invoke_async_callback(IRtwqAsyncResult *result)
697 RTWQASYNCRESULT *result_data = (RTWQASYNCRESULT *)result;
698 DWORD queue = RTWQ_CALLBACK_QUEUE_STANDARD, flags;
699 HRESULT hr;
701 if (FAILED(IRtwqAsyncCallback_GetParameters(result_data->pCallback, &flags, &queue)))
702 queue = RTWQ_CALLBACK_QUEUE_STANDARD;
704 if (FAILED(lock_user_queue(queue)))
705 queue = RTWQ_CALLBACK_QUEUE_STANDARD;
707 hr = queue_put_work_item(queue, 0, result);
709 unlock_user_queue(queue);
711 return hr;
714 static void queue_release_pending_item(struct work_item *item)
716 EnterCriticalSection(&item->queue->cs);
717 if (item->key)
719 list_remove(&item->entry);
720 item->key = 0;
721 IUnknown_Release(&item->IUnknown_iface);
723 LeaveCriticalSection(&item->queue->cs);
726 static void CALLBACK waiting_item_callback(TP_CALLBACK_INSTANCE *instance, void *context, TP_WAIT *wait,
727 TP_WAIT_RESULT wait_result)
729 struct work_item *item = context;
731 TRACE("result object %p.\n", item->result);
733 invoke_async_callback(item->result);
735 IUnknown_Release(&item->IUnknown_iface);
738 static void CALLBACK waiting_item_cancelable_callback(TP_CALLBACK_INSTANCE *instance, void *context, TP_WAIT *wait,
739 TP_WAIT_RESULT wait_result)
741 struct work_item *item = context;
743 TRACE("result object %p.\n", item->result);
745 queue_release_pending_item(item);
747 invoke_async_callback(item->result);
749 IUnknown_Release(&item->IUnknown_iface);
752 static void CALLBACK scheduled_item_callback(TP_CALLBACK_INSTANCE *instance, void *context, TP_TIMER *timer)
754 struct work_item *item = context;
756 TRACE("result object %p.\n", item->result);
758 invoke_async_callback(item->result);
760 IUnknown_Release(&item->IUnknown_iface);
763 static void CALLBACK scheduled_item_cancelable_callback(TP_CALLBACK_INSTANCE *instance, void *context, TP_TIMER *timer)
765 struct work_item *item = context;
767 TRACE("result object %p.\n", item->result);
769 queue_release_pending_item(item);
771 invoke_async_callback(item->result);
773 IUnknown_Release(&item->IUnknown_iface);
776 static void CALLBACK periodic_item_callback(TP_CALLBACK_INSTANCE *instance, void *context, TP_TIMER *timer)
778 struct work_item *item = context;
780 IUnknown_AddRef(&item->IUnknown_iface);
782 invoke_async_callback(item->result);
784 IUnknown_Release(&item->IUnknown_iface);
787 static void queue_mark_item_pending(DWORD mask, struct work_item *item, RTWQWORKITEM_KEY *key)
789 *key = generate_item_key(mask);
790 item->key = *key;
792 EnterCriticalSection(&item->queue->cs);
793 list_add_tail(&item->queue->pending_items, &item->entry);
794 IUnknown_AddRef(&item->IUnknown_iface);
795 LeaveCriticalSection(&item->queue->cs);
798 static HRESULT queue_submit_wait(struct queue *queue, HANDLE event, LONG priority, IRtwqAsyncResult *result,
799 RTWQWORKITEM_KEY *key)
801 PTP_WAIT_CALLBACK callback;
802 struct work_item *item;
804 if (!(item = alloc_work_item(queue, priority, result)))
805 return E_OUTOFMEMORY;
807 if (key)
809 queue_mark_item_pending(WAIT_ITEM_KEY_MASK, item, key);
810 callback = waiting_item_cancelable_callback;
812 else
813 callback = waiting_item_callback;
815 item->u.wait_object = CreateThreadpoolWait(callback, item,
816 (TP_CALLBACK_ENVIRON *)&queue->envs[TP_CALLBACK_PRIORITY_NORMAL]);
817 SetThreadpoolWait(item->u.wait_object, event, NULL);
819 TRACE("dispatched %p.\n", result);
821 return S_OK;
824 static HRESULT queue_submit_timer(struct queue *queue, IRtwqAsyncResult *result, INT64 timeout, DWORD period,
825 RTWQWORKITEM_KEY *key)
827 PTP_TIMER_CALLBACK callback;
828 struct work_item *item;
829 FILETIME filetime;
830 LARGE_INTEGER t;
832 if (!(item = alloc_work_item(queue, 0, result)))
833 return E_OUTOFMEMORY;
835 if (key)
837 queue_mark_item_pending(SCHEDULED_ITEM_KEY_MASK, item, key);
840 if (period)
841 callback = periodic_item_callback;
842 else
843 callback = key ? scheduled_item_cancelable_callback : scheduled_item_callback;
845 t.QuadPart = timeout * 1000 * 10;
846 filetime.dwLowDateTime = t.u.LowPart;
847 filetime.dwHighDateTime = t.u.HighPart;
849 item->u.timer_object = CreateThreadpoolTimer(callback, item,
850 (TP_CALLBACK_ENVIRON *)&queue->envs[TP_CALLBACK_PRIORITY_NORMAL]);
851 SetThreadpoolTimer(item->u.timer_object, &filetime, period, 0);
853 TRACE("dispatched %p.\n", result);
855 return S_OK;
858 static HRESULT queue_cancel_item(struct queue *queue, RTWQWORKITEM_KEY key)
860 HRESULT hr = RTWQ_E_NOT_FOUND;
861 struct work_item *item;
863 EnterCriticalSection(&queue->cs);
864 LIST_FOR_EACH_ENTRY(item, &queue->pending_items, struct work_item, entry)
866 if (item->key == key)
868 key >>= 32;
869 if ((key & WAIT_ITEM_KEY_MASK) == WAIT_ITEM_KEY_MASK)
871 IRtwqAsyncResult_SetStatus(item->result, RTWQ_E_OPERATION_CANCELLED);
872 invoke_async_callback(item->result);
873 CloseThreadpoolWait(item->u.wait_object);
875 else if ((key & SCHEDULED_ITEM_KEY_MASK) == SCHEDULED_ITEM_KEY_MASK)
876 CloseThreadpoolTimer(item->u.timer_object);
877 else
878 WARN("Unknown item key mask %#I64x.\n", key);
879 queue_release_pending_item(item);
880 hr = S_OK;
881 break;
884 LeaveCriticalSection(&queue->cs);
886 return hr;
889 static HRESULT alloc_user_queue(const struct queue_desc *desc, DWORD *queue_id)
891 struct queue_handle *entry;
892 struct queue *queue;
893 unsigned int idx;
895 *queue_id = RTWQ_CALLBACK_QUEUE_UNDEFINED;
897 if (platform_lock <= 0)
898 return RTWQ_E_SHUTDOWN;
900 if (!(queue = calloc(1, sizeof(*queue))))
901 return E_OUTOFMEMORY;
903 init_work_queue(desc, queue);
905 EnterCriticalSection(&queues_section);
907 entry = next_free_user_queue;
908 if (entry)
909 next_free_user_queue = entry->obj;
910 else if (next_unused_user_queue < user_queues + MAX_USER_QUEUE_HANDLES)
911 entry = next_unused_user_queue++;
912 else
914 LeaveCriticalSection(&queues_section);
915 free(queue);
916 WARN("Out of user queue handles.\n");
917 return E_OUTOFMEMORY;
920 entry->refcount = 1;
921 entry->obj = queue;
922 if (++queue_generation == 0xffff) queue_generation = 1;
923 entry->generation = queue_generation;
924 idx = entry - user_queues + FIRST_USER_QUEUE_HANDLE;
925 *queue_id = (idx << 16) | entry->generation;
927 LeaveCriticalSection(&queues_section);
929 return S_OK;
932 struct async_result
934 RTWQASYNCRESULT result;
935 LONG refcount;
936 IUnknown *object;
937 IUnknown *state;
940 static struct async_result *impl_from_IRtwqAsyncResult(IRtwqAsyncResult *iface)
942 return CONTAINING_RECORD(iface, struct async_result, result.AsyncResult);
945 static HRESULT WINAPI async_result_QueryInterface(IRtwqAsyncResult *iface, REFIID riid, void **obj)
947 TRACE("%p, %s, %p.\n", iface, debugstr_guid(riid), obj);
949 if (IsEqualIID(riid, &IID_IRtwqAsyncResult) ||
950 IsEqualIID(riid, &IID_IUnknown))
952 *obj = iface;
953 IRtwqAsyncResult_AddRef(iface);
954 return S_OK;
957 *obj = NULL;
958 WARN("Unsupported interface %s.\n", debugstr_guid(riid));
959 return E_NOINTERFACE;
962 static ULONG WINAPI async_result_AddRef(IRtwqAsyncResult *iface)
964 struct async_result *result = impl_from_IRtwqAsyncResult(iface);
965 ULONG refcount = InterlockedIncrement(&result->refcount);
967 TRACE("%p, %lu.\n", iface, refcount);
969 return refcount;
972 static ULONG WINAPI async_result_Release(IRtwqAsyncResult *iface)
974 struct async_result *result = impl_from_IRtwqAsyncResult(iface);
975 ULONG refcount = InterlockedDecrement(&result->refcount);
977 TRACE("%p, %lu.\n", iface, refcount);
979 if (!refcount)
981 if (result->result.pCallback)
982 IRtwqAsyncCallback_Release(result->result.pCallback);
983 if (result->object)
984 IUnknown_Release(result->object);
985 if (result->state)
986 IUnknown_Release(result->state);
987 if (result->result.hEvent)
988 CloseHandle(result->result.hEvent);
989 free(result);
991 RtwqUnlockPlatform();
994 return refcount;
997 static HRESULT WINAPI async_result_GetState(IRtwqAsyncResult *iface, IUnknown **state)
999 struct async_result *result = impl_from_IRtwqAsyncResult(iface);
1001 TRACE("%p, %p.\n", iface, state);
1003 if (!result->state)
1004 return E_POINTER;
1006 *state = result->state;
1007 IUnknown_AddRef(*state);
1009 return S_OK;
1012 static HRESULT WINAPI async_result_GetStatus(IRtwqAsyncResult *iface)
1014 struct async_result *result = impl_from_IRtwqAsyncResult(iface);
1016 TRACE("%p.\n", iface);
1018 return result->result.hrStatusResult;
1021 static HRESULT WINAPI async_result_SetStatus(IRtwqAsyncResult *iface, HRESULT status)
1023 struct async_result *result = impl_from_IRtwqAsyncResult(iface);
1025 TRACE("%p, %#lx.\n", iface, status);
1027 result->result.hrStatusResult = status;
1029 return S_OK;
1032 static HRESULT WINAPI async_result_GetObject(IRtwqAsyncResult *iface, IUnknown **object)
1034 struct async_result *result = impl_from_IRtwqAsyncResult(iface);
1036 TRACE("%p, %p.\n", iface, object);
1038 if (!result->object)
1039 return E_POINTER;
1041 *object = result->object;
1042 IUnknown_AddRef(*object);
1044 return S_OK;
1047 static IUnknown * WINAPI async_result_GetStateNoAddRef(IRtwqAsyncResult *iface)
1049 struct async_result *result = impl_from_IRtwqAsyncResult(iface);
1051 TRACE("%p.\n", iface);
1053 return result->state;
1056 static const IRtwqAsyncResultVtbl async_result_vtbl =
1058 async_result_QueryInterface,
1059 async_result_AddRef,
1060 async_result_Release,
1061 async_result_GetState,
1062 async_result_GetStatus,
1063 async_result_SetStatus,
1064 async_result_GetObject,
1065 async_result_GetStateNoAddRef,
1068 static HRESULT create_async_result(IUnknown *object, IRtwqAsyncCallback *callback, IUnknown *state, IRtwqAsyncResult **out)
1070 struct async_result *result;
1072 if (!out)
1073 return E_INVALIDARG;
1075 if (!(result = calloc(1, sizeof(*result))))
1076 return E_OUTOFMEMORY;
1078 RtwqLockPlatform();
1080 result->result.AsyncResult.lpVtbl = &async_result_vtbl;
1081 result->refcount = 1;
1082 result->object = object;
1083 if (result->object)
1084 IUnknown_AddRef(result->object);
1085 result->result.pCallback = callback;
1086 if (result->result.pCallback)
1087 IRtwqAsyncCallback_AddRef(result->result.pCallback);
1088 result->state = state;
1089 if (result->state)
1090 IUnknown_AddRef(result->state);
1092 *out = &result->result.AsyncResult;
1094 TRACE("Created async result object %p.\n", *out);
1096 return S_OK;
1099 HRESULT WINAPI RtwqCreateAsyncResult(IUnknown *object, IRtwqAsyncCallback *callback, IUnknown *state,
1100 IRtwqAsyncResult **out)
1102 TRACE("%p, %p, %p, %p.\n", object, callback, state, out);
1104 return create_async_result(object, callback, state, out);
1107 HRESULT WINAPI RtwqLockPlatform(void)
1109 InterlockedIncrement(&platform_lock);
1111 return S_OK;
1114 HRESULT WINAPI RtwqUnlockPlatform(void)
1116 InterlockedDecrement(&platform_lock);
1118 return S_OK;
1121 static void init_system_queues(void)
1123 struct queue_desc desc;
1124 HRESULT hr;
1126 /* Always initialize standard queue, keep the rest lazy. */
1128 EnterCriticalSection(&queues_section);
1130 if (system_queues[SYS_QUEUE_STANDARD].pool)
1132 LeaveCriticalSection(&queues_section);
1133 return;
1136 if (FAILED(hr = CoIncrementMTAUsage(&mta_cookie)))
1137 WARN("Failed to initialize MTA, hr %#lx.\n", hr);
1139 desc.queue_type = RTWQ_STANDARD_WORKQUEUE;
1140 desc.ops = &pool_queue_ops;
1141 desc.target_queue = 0;
1142 init_work_queue(&desc, &system_queues[SYS_QUEUE_STANDARD]);
1144 LeaveCriticalSection(&queues_section);
1147 HRESULT WINAPI RtwqStartup(void)
1149 if (InterlockedIncrement(&platform_lock) == 1)
1151 init_system_queues();
1154 return S_OK;
1157 static void shutdown_system_queues(void)
1159 unsigned int i;
1160 HRESULT hr;
1162 EnterCriticalSection(&queues_section);
1164 for (i = 0; i < ARRAY_SIZE(system_queues); ++i)
1166 shutdown_queue(&system_queues[i]);
1169 if (FAILED(hr = CoDecrementMTAUsage(mta_cookie)))
1170 WARN("Failed to uninitialize MTA, hr %#lx.\n", hr);
1172 LeaveCriticalSection(&queues_section);
1175 HRESULT WINAPI RtwqShutdown(void)
1177 if (platform_lock <= 0)
1178 return S_OK;
1180 if (InterlockedExchangeAdd(&platform_lock, -1) == 1)
1182 shutdown_system_queues();
1185 return S_OK;
1188 HRESULT WINAPI RtwqPutWaitingWorkItem(HANDLE event, LONG priority, IRtwqAsyncResult *result, RTWQWORKITEM_KEY *key)
1190 struct queue *queue;
1191 HRESULT hr;
1193 TRACE("%p, %ld, %p, %p.\n", event, priority, result, key);
1195 if (FAILED(hr = grab_queue(RTWQ_CALLBACK_QUEUE_TIMER, &queue)))
1196 return hr;
1198 hr = queue_submit_wait(queue, event, priority, result, key);
1200 return hr;
1203 static HRESULT schedule_work_item(IRtwqAsyncResult *result, INT64 timeout, RTWQWORKITEM_KEY *key)
1205 struct queue *queue;
1206 HRESULT hr;
1208 if (FAILED(hr = grab_queue(RTWQ_CALLBACK_QUEUE_TIMER, &queue)))
1209 return hr;
1211 TRACE("%p, %s, %p.\n", result, wine_dbgstr_longlong(timeout), key);
1213 return queue_submit_timer(queue, result, timeout, 0, key);
1216 HRESULT WINAPI RtwqScheduleWorkItem(IRtwqAsyncResult *result, INT64 timeout, RTWQWORKITEM_KEY *key)
1218 TRACE("%p, %s, %p.\n", result, wine_dbgstr_longlong(timeout), key);
1220 return schedule_work_item(result, timeout, key);
1223 struct periodic_callback
1225 IRtwqAsyncCallback IRtwqAsyncCallback_iface;
1226 LONG refcount;
1227 RTWQPERIODICCALLBACK callback;
1230 static struct periodic_callback *impl_from_IRtwqAsyncCallback(IRtwqAsyncCallback *iface)
1232 return CONTAINING_RECORD(iface, struct periodic_callback, IRtwqAsyncCallback_iface);
1235 static HRESULT WINAPI periodic_callback_QueryInterface(IRtwqAsyncCallback *iface, REFIID riid, void **obj)
1237 if (IsEqualIID(riid, &IID_IRtwqAsyncCallback) ||
1238 IsEqualIID(riid, &IID_IUnknown))
1240 *obj = iface;
1241 IRtwqAsyncCallback_AddRef(iface);
1242 return S_OK;
1245 *obj = NULL;
1246 return E_NOINTERFACE;
1249 static ULONG WINAPI periodic_callback_AddRef(IRtwqAsyncCallback *iface)
1251 struct periodic_callback *callback = impl_from_IRtwqAsyncCallback(iface);
1252 ULONG refcount = InterlockedIncrement(&callback->refcount);
1254 TRACE("%p, %lu.\n", iface, refcount);
1256 return refcount;
1259 static ULONG WINAPI periodic_callback_Release(IRtwqAsyncCallback *iface)
1261 struct periodic_callback *callback = impl_from_IRtwqAsyncCallback(iface);
1262 ULONG refcount = InterlockedDecrement(&callback->refcount);
1264 TRACE("%p, %lu.\n", iface, refcount);
1266 if (!refcount)
1267 free(callback);
1269 return refcount;
1272 static HRESULT WINAPI periodic_callback_GetParameters(IRtwqAsyncCallback *iface, DWORD *flags, DWORD *queue)
1274 return E_NOTIMPL;
1277 static HRESULT WINAPI periodic_callback_Invoke(IRtwqAsyncCallback *iface, IRtwqAsyncResult *result)
1279 struct periodic_callback *callback = impl_from_IRtwqAsyncCallback(iface);
1280 IUnknown *context = NULL;
1282 if (FAILED(IRtwqAsyncResult_GetObject(result, &context)))
1283 WARN("Expected object to be set for result object.\n");
1285 callback->callback(context);
1287 if (context)
1288 IUnknown_Release(context);
1290 return S_OK;
1293 static const IRtwqAsyncCallbackVtbl periodic_callback_vtbl =
1295 periodic_callback_QueryInterface,
1296 periodic_callback_AddRef,
1297 periodic_callback_Release,
1298 periodic_callback_GetParameters,
1299 periodic_callback_Invoke,
1302 static HRESULT create_periodic_callback_obj(RTWQPERIODICCALLBACK callback, IRtwqAsyncCallback **out)
1304 struct periodic_callback *object;
1306 if (!(object = calloc(1, sizeof(*object))))
1307 return E_OUTOFMEMORY;
1309 object->IRtwqAsyncCallback_iface.lpVtbl = &periodic_callback_vtbl;
1310 object->refcount = 1;
1311 object->callback = callback;
1313 *out = &object->IRtwqAsyncCallback_iface;
1315 return S_OK;
1318 HRESULT WINAPI RtwqAddPeriodicCallback(RTWQPERIODICCALLBACK callback, IUnknown *context, DWORD *key)
1320 IRtwqAsyncCallback *periodic_callback;
1321 RTWQWORKITEM_KEY workitem_key;
1322 IRtwqAsyncResult *result;
1323 struct queue *queue;
1324 HRESULT hr;
1326 TRACE("%p, %p, %p.\n", callback, context, key);
1328 if (FAILED(hr = grab_queue(RTWQ_CALLBACK_QUEUE_TIMER, &queue)))
1329 return hr;
1331 if (FAILED(hr = create_periodic_callback_obj(callback, &periodic_callback)))
1332 return hr;
1334 hr = create_async_result(context, periodic_callback, NULL, &result);
1335 IRtwqAsyncCallback_Release(periodic_callback);
1336 if (FAILED(hr))
1337 return hr;
1339 /* Same period MFGetTimerPeriodicity() returns. */
1340 hr = queue_submit_timer(queue, result, 0, 10, key ? &workitem_key : NULL);
1342 IRtwqAsyncResult_Release(result);
1344 if (key)
1345 *key = workitem_key;
1347 return S_OK;
1350 HRESULT WINAPI RtwqRemovePeriodicCallback(DWORD key)
1352 struct queue *queue;
1353 HRESULT hr;
1355 TRACE("%#lx.\n", key);
1357 if (FAILED(hr = grab_queue(RTWQ_CALLBACK_QUEUE_TIMER, &queue)))
1358 return hr;
1360 return queue_cancel_item(queue, get_item_key(SCHEDULED_ITEM_KEY_MASK, key));
1363 HRESULT WINAPI RtwqCancelWorkItem(RTWQWORKITEM_KEY key)
1365 struct queue *queue;
1366 HRESULT hr;
1368 TRACE("%s.\n", wine_dbgstr_longlong(key));
1370 if (FAILED(hr = grab_queue(RTWQ_CALLBACK_QUEUE_TIMER, &queue)))
1371 return hr;
1373 return queue_cancel_item(queue, key);
1376 HRESULT WINAPI RtwqInvokeCallback(IRtwqAsyncResult *result)
1378 TRACE("%p.\n", result);
1380 return invoke_async_callback(result);
1383 HRESULT WINAPI RtwqPutWorkItem(DWORD queue, LONG priority, IRtwqAsyncResult *result)
1385 TRACE("%#lx, %ld, %p.\n", queue, priority, result);
1387 return queue_put_work_item(queue, priority, result);
1390 HRESULT WINAPI RtwqAllocateWorkQueue(RTWQ_WORKQUEUE_TYPE queue_type, DWORD *queue)
1392 struct queue_desc desc;
1394 TRACE("%d, %p.\n", queue_type, queue);
1396 desc.queue_type = queue_type;
1397 desc.ops = &pool_queue_ops;
1398 desc.target_queue = 0;
1399 return alloc_user_queue(&desc, queue);
1402 HRESULT WINAPI RtwqLockWorkQueue(DWORD queue)
1404 TRACE("%#lx.\n", queue);
1406 return lock_user_queue(queue);
1409 HRESULT WINAPI RtwqUnlockWorkQueue(DWORD queue)
1411 TRACE("%#lx.\n", queue);
1413 return unlock_user_queue(queue);
1416 HRESULT WINAPI RtwqSetLongRunning(DWORD queue_id, BOOL enable)
1418 struct queue *queue;
1419 HRESULT hr;
1420 int i;
1422 TRACE("%#lx, %d.\n", queue_id, enable);
1424 lock_user_queue(queue_id);
1426 if (SUCCEEDED(hr = grab_queue(queue_id, &queue)))
1428 for (i = 0; i < ARRAY_SIZE(queue->envs); ++i)
1429 queue->envs[i].u.s.LongFunction = !!enable;
1432 unlock_user_queue(queue_id);
1434 return hr;
1437 HRESULT WINAPI RtwqLockSharedWorkQueue(const WCHAR *usageclass, LONG priority, DWORD *taskid, DWORD *queue)
1439 struct queue_desc desc;
1440 HRESULT hr;
1442 TRACE("%s, %ld, %p, %p.\n", debugstr_w(usageclass), priority, taskid, queue);
1444 if (!usageclass)
1445 return E_POINTER;
1447 if (!*usageclass && taskid)
1448 return E_INVALIDARG;
1450 if (*usageclass)
1451 FIXME("Class name is ignored.\n");
1453 EnterCriticalSection(&queues_section);
1455 if (shared_mt_queue)
1456 hr = lock_user_queue(shared_mt_queue);
1457 else
1459 desc.queue_type = RTWQ_MULTITHREADED_WORKQUEUE;
1460 desc.ops = &pool_queue_ops;
1461 desc.target_queue = 0;
1462 hr = alloc_user_queue(&desc, &shared_mt_queue);
1465 *queue = shared_mt_queue;
1467 LeaveCriticalSection(&queues_section);
1469 return hr;
1472 HRESULT WINAPI RtwqSetDeadline(DWORD queue_id, LONGLONG deadline, HANDLE *request)
1474 FIXME("%#lx, %s, %p.\n", queue_id, wine_dbgstr_longlong(deadline), request);
1476 return E_NOTIMPL;
1479 HRESULT WINAPI RtwqSetDeadline2(DWORD queue_id, LONGLONG deadline, LONGLONG predeadline, HANDLE *request)
1481 FIXME("%#lx, %s, %s, %p.\n", queue_id, wine_dbgstr_longlong(deadline), wine_dbgstr_longlong(predeadline), request);
1483 return E_NOTIMPL;
1486 HRESULT WINAPI RtwqCancelDeadline(HANDLE request)
1488 FIXME("%p.\n", request);
1490 return E_NOTIMPL;
1493 HRESULT WINAPI RtwqAllocateSerialWorkQueue(DWORD target_queue, DWORD *queue)
1495 struct queue_desc desc;
1497 TRACE("%#lx, %p.\n", target_queue, queue);
1499 desc.queue_type = RTWQ_STANDARD_WORKQUEUE;
1500 desc.ops = &serial_queue_ops;
1501 desc.target_queue = target_queue;
1502 return alloc_user_queue(&desc, queue);
1505 HRESULT WINAPI RtwqJoinWorkQueue(DWORD queue, HANDLE hFile, HANDLE *cookie)
1507 FIXME("%#lx, %p, %p.\n", queue, hFile, cookie);
1509 return E_NOTIMPL;
1512 HRESULT WINAPI RtwqUnjoinWorkQueue(DWORD queue, HANDLE cookie)
1514 FIXME("%#lx, %p.\n", queue, cookie);
1516 return E_NOTIMPL;
1519 HRESULT WINAPI RtwqGetWorkQueueMMCSSClass(DWORD queue, WCHAR *class, DWORD *length)
1521 FIXME("%#lx, %p, %p.\n", queue, class, length);
1523 return E_NOTIMPL;
1526 HRESULT WINAPI RtwqGetWorkQueueMMCSSTaskId(DWORD queue, DWORD *taskid)
1528 FIXME("%#lx, %p.\n", queue, taskid);
1530 return E_NOTIMPL;
1533 HRESULT WINAPI RtwqGetWorkQueueMMCSSPriority(DWORD queue, LONG *priority)
1535 FIXME("%#lx, %p.\n", queue, priority);
1537 return E_NOTIMPL;
1540 HRESULT WINAPI RtwqRegisterPlatformWithMMCSS(const WCHAR *class, DWORD *taskid, LONG priority)
1542 FIXME("%s, %p, %ld.\n", debugstr_w(class), taskid, priority);
1544 return E_NOTIMPL;
1547 HRESULT WINAPI RtwqUnregisterPlatformFromMMCSS(void)
1549 FIXME("\n");
1551 return E_NOTIMPL;
1554 HRESULT WINAPI RtwqBeginRegisterWorkQueueWithMMCSS(DWORD queue, const WCHAR *class, DWORD taskid, LONG priority,
1555 IRtwqAsyncCallback *callback, IUnknown *state)
1557 FIXME("%#lx, %s, %lu, %ld, %p, %p.\n", queue, debugstr_w(class), taskid, priority, callback, state);
1559 return E_NOTIMPL;
1562 HRESULT WINAPI RtwqEndRegisterWorkQueueWithMMCSS(IRtwqAsyncResult *result, DWORD *taskid)
1564 FIXME("%p, %p.\n", result, taskid);
1566 return E_NOTIMPL;
1569 HRESULT WINAPI RtwqBeginUnregisterWorkQueueWithMMCSS(DWORD queue, IRtwqAsyncCallback *callback, IUnknown *state)
1571 FIXME("%#lx, %p, %p.\n", queue, callback, state);
1573 return E_NOTIMPL;
1576 HRESULT WINAPI RtwqEndUnregisterWorkQueueWithMMCSS(IRtwqAsyncResult *result)
1578 FIXME("%p.\n", result);
1580 return E_NOTIMPL;
1583 HRESULT WINAPI RtwqRegisterPlatformEvents(IRtwqPlatformEvents *events)
1585 FIXME("%p.\n", events);
1587 return E_NOTIMPL;
1590 HRESULT WINAPI RtwqUnregisterPlatformEvents(IRtwqPlatformEvents *events)
1592 FIXME("%p.\n", events);
1594 return E_NOTIMPL;