2 * Copyright 2019-2020 Nikolay Sivov for CodeWeavers
4 * This library is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU Lesser General Public
6 * License as published by the Free Software Foundation; either
7 * version 2.1 of the License, or (at your option) any later version.
9 * This library is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 * Lesser General Public License for more details.
14 * You should have received a copy of the GNU Lesser General Public
15 * License along with this library; if not, write to the Free Software
16 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
22 #define NONAMELESSUNION
26 #include "wine/debug.h"
27 #include "wine/heap.h"
28 #include "wine/list.h"
30 WINE_DEFAULT_DEBUG_CHANNEL(mfplat
);
32 #define FIRST_USER_QUEUE_HANDLE 5
33 #define MAX_USER_QUEUE_HANDLES 124
35 #define WAIT_ITEM_KEY_MASK (0x82000000)
36 #define SCHEDULED_ITEM_KEY_MASK (0x80000000)
38 static LONG next_item_key
;
40 static RTWQWORKITEM_KEY
get_item_key(DWORD mask
, DWORD key
)
42 return ((RTWQWORKITEM_KEY
)mask
<< 32) | key
;
45 static RTWQWORKITEM_KEY
generate_item_key(DWORD mask
)
47 return get_item_key(mask
, InterlockedIncrement(&next_item_key
));
57 static struct queue_handle user_queues
[MAX_USER_QUEUE_HANDLES
];
58 static struct queue_handle
*next_free_user_queue
;
59 static struct queue_handle
*next_unused_user_queue
= user_queues
;
60 static WORD queue_generation
;
62 static CRITICAL_SECTION queues_section
;
63 static CRITICAL_SECTION_DEBUG queues_critsect_debug
=
65 0, 0, &queues_section
,
66 { &queues_critsect_debug
.ProcessLocksList
, &queues_critsect_debug
.ProcessLocksList
},
67 0, 0, { (DWORD_PTR
)(__FILE__
": queues_section") }
69 static CRITICAL_SECTION queues_section
= { &queues_critsect_debug
, -1, 0, 0, 0, 0 };
71 static LONG platform_lock
;
72 static CO_MTA_USAGE_COOKIE mta_cookie
;
74 static struct queue_handle
*get_queue_obj(DWORD handle
)
76 unsigned int idx
= HIWORD(handle
) - FIRST_USER_QUEUE_HANDLE
;
78 if (idx
< MAX_USER_QUEUE_HANDLES
&& user_queues
[idx
].refcount
)
80 if (LOWORD(handle
) == user_queues
[idx
].generation
)
81 return &user_queues
[idx
];
/* Queue identifiers. Should be kept in sync with corresponding MFASYNC_CALLBACK_ constants. */
enum rtwq_callback_queue_id
{
    RTWQ_CALLBACK_QUEUE_UNDEFINED     = 0,
    RTWQ_CALLBACK_QUEUE_STANDARD      = 1,
    RTWQ_CALLBACK_QUEUE_RT            = 2,
    RTWQ_CALLBACK_QUEUE_IO            = 3,
    RTWQ_CALLBACK_QUEUE_TIMER         = 4,
    RTWQ_CALLBACK_QUEUE_MULTITHREADED = 5,
    RTWQ_CALLBACK_QUEUE_LONG_FUNCTION = 7,
    /* Any bit of the high word set marks a user (private) queue handle. */
    RTWQ_CALLBACK_QUEUE_PRIVATE_MASK  = 0xffff0000,
    RTWQ_CALLBACK_QUEUE_ALL           = 0xffffffff,
};
/* Callback flag bits. Should be kept in sync with corresponding MFASYNC_ constants. */
enum rtwq_callback_flags
{
    RTWQ_FAST_IO_PROCESSING_CALLBACK = 1 << 0,
    RTWQ_SIGNAL_CALLBACK             = 1 << 1,
    RTWQ_BLOCKING_CALLBACK           = 1 << 2,
    RTWQ_REPLY_CALLBACK              = 1 << 3,
    RTWQ_LOCALIZE_REMOTE_CALLBACK    = 1 << 4,
};
111 enum system_queue_index
113 SYS_QUEUE_STANDARD
= 0,
117 SYS_QUEUE_MULTITHREADED
,
118 SYS_QUEUE_DO_NOT_USE
,
119 SYS_QUEUE_LONG_FUNCTION
,
125 IUnknown IUnknown_iface
;
128 IRtwqAsyncResult
*result
;
129 IRtwqAsyncResult
*reply_result
;
131 RTWQWORKITEM_KEY key
;
134 PTP_SIMPLE_CALLBACK finalization_callback
;
137 TP_WAIT
*wait_object
;
138 TP_TIMER
*timer_object
;
142 static struct work_item
*work_item_impl_from_IUnknown(IUnknown
*iface
)
144 return CONTAINING_RECORD(iface
, struct work_item
, IUnknown_iface
);
147 static const TP_CALLBACK_PRIORITY priorities
[] =
149 TP_CALLBACK_PRIORITY_HIGH
,
150 TP_CALLBACK_PRIORITY_NORMAL
,
151 TP_CALLBACK_PRIORITY_LOW
,
159 HRESULT (*init
)(const struct queue_desc
*desc
, struct queue
*queue
);
160 BOOL (*shutdown
)(struct queue
*queue
);
161 void (*submit
)(struct queue
*queue
, struct work_item
*item
);
166 RTWQ_WORKQUEUE_TYPE queue_type
;
167 const struct queue_ops
*ops
;
173 IRtwqAsyncCallback IRtwqAsyncCallback_iface
;
174 const struct queue_ops
*ops
;
176 TP_CALLBACK_ENVIRON_V3 envs
[ARRAY_SIZE(priorities
)];
178 struct list pending_items
;
180 /* Data used for serial queues only. */
181 PTP_SIMPLE_CALLBACK finalization_callback
;
185 static void shutdown_queue(struct queue
*queue
);
187 static HRESULT
lock_user_queue(DWORD queue
)
189 HRESULT hr
= RTWQ_E_INVALID_WORKQUEUE
;
190 struct queue_handle
*entry
;
192 if (!(queue
& RTWQ_CALLBACK_QUEUE_PRIVATE_MASK
))
195 EnterCriticalSection(&queues_section
);
196 entry
= get_queue_obj(queue
);
197 if (entry
&& entry
->refcount
)
202 LeaveCriticalSection(&queues_section
);
206 static HRESULT
unlock_user_queue(DWORD queue
)
208 HRESULT hr
= RTWQ_E_INVALID_WORKQUEUE
;
209 struct queue_handle
*entry
;
211 if (!(queue
& RTWQ_CALLBACK_QUEUE_PRIVATE_MASK
))
214 EnterCriticalSection(&queues_section
);
215 entry
= get_queue_obj(queue
);
216 if (entry
&& entry
->refcount
)
218 if (--entry
->refcount
== 0)
220 shutdown_queue((struct queue
*)entry
->obj
);
221 heap_free(entry
->obj
);
222 entry
->obj
= next_free_user_queue
;
223 next_free_user_queue
= entry
;
227 LeaveCriticalSection(&queues_section
);
231 static struct queue
*queue_impl_from_IRtwqAsyncCallback(IRtwqAsyncCallback
*iface
)
233 return CONTAINING_RECORD(iface
, struct queue
, IRtwqAsyncCallback_iface
);
236 static HRESULT WINAPI
queue_serial_callback_QueryInterface(IRtwqAsyncCallback
*iface
, REFIID riid
, void **obj
)
238 if (IsEqualIID(riid
, &IID_IRtwqAsyncCallback
) ||
239 IsEqualIID(riid
, &IID_IUnknown
))
242 IRtwqAsyncCallback_AddRef(iface
);
247 return E_NOINTERFACE
;
250 static ULONG WINAPI
queue_serial_callback_AddRef(IRtwqAsyncCallback
*iface
)
255 static ULONG WINAPI
queue_serial_callback_Release(IRtwqAsyncCallback
*iface
)
260 static HRESULT WINAPI
queue_serial_callback_GetParameters(IRtwqAsyncCallback
*iface
, DWORD
*flags
, DWORD
*queue_id
)
262 struct queue
*queue
= queue_impl_from_IRtwqAsyncCallback(iface
);
265 *queue_id
= queue
->id
;
270 static HRESULT WINAPI
queue_serial_callback_Invoke(IRtwqAsyncCallback
*iface
, IRtwqAsyncResult
*result
)
272 /* Reply callback won't be called in a regular way, pending items and chained queues will make it
273 unnecessary complicated to reach actual work queue that's able to execute this item. Instead
274 serial queues are cleaned up right away on submit(). */
278 static const IRtwqAsyncCallbackVtbl queue_serial_callback_vtbl
=
280 queue_serial_callback_QueryInterface
,
281 queue_serial_callback_AddRef
,
282 queue_serial_callback_Release
,
283 queue_serial_callback_GetParameters
,
284 queue_serial_callback_Invoke
,
287 static struct queue system_queues
[SYS_QUEUE_COUNT
];
289 static struct queue
*get_system_queue(DWORD queue_id
)
293 case RTWQ_CALLBACK_QUEUE_STANDARD
:
294 case RTWQ_CALLBACK_QUEUE_RT
:
295 case RTWQ_CALLBACK_QUEUE_IO
:
296 case RTWQ_CALLBACK_QUEUE_TIMER
:
297 case RTWQ_CALLBACK_QUEUE_MULTITHREADED
:
298 case RTWQ_CALLBACK_QUEUE_LONG_FUNCTION
:
299 return &system_queues
[queue_id
- 1];
305 static HRESULT
grab_queue(DWORD queue_id
, struct queue
**ret
);
307 static void CALLBACK
standard_queue_cleanup_callback(void *object_data
, void *group_data
)
311 static HRESULT
pool_queue_init(const struct queue_desc
*desc
, struct queue
*queue
)
313 TP_CALLBACK_ENVIRON_V3 env
;
314 unsigned int max_thread
, i
;
316 queue
->pool
= CreateThreadpool(NULL
);
318 memset(&env
, 0, sizeof(env
));
320 env
.Size
= sizeof(env
);
321 env
.Pool
= queue
->pool
;
322 env
.CleanupGroup
= CreateThreadpoolCleanupGroup();
323 env
.CleanupGroupCancelCallback
= standard_queue_cleanup_callback
;
324 env
.CallbackPriority
= TP_CALLBACK_PRIORITY_NORMAL
;
325 for (i
= 0; i
< ARRAY_SIZE(queue
->envs
); ++i
)
327 queue
->envs
[i
] = env
;
328 queue
->envs
[i
].CallbackPriority
= priorities
[i
];
330 list_init(&queue
->pending_items
);
331 InitializeCriticalSection(&queue
->cs
);
333 max_thread
= (desc
->queue_type
== RTWQ_STANDARD_WORKQUEUE
|| desc
->queue_type
== RTWQ_WINDOW_WORKQUEUE
) ? 1 : 4;
335 SetThreadpoolThreadMinimum(queue
->pool
, 1);
336 SetThreadpoolThreadMaximum(queue
->pool
, max_thread
);
338 if (desc
->queue_type
== RTWQ_WINDOW_WORKQUEUE
)
339 FIXME("RTWQ_WINDOW_WORKQUEUE is not supported.\n");
344 static BOOL
pool_queue_shutdown(struct queue
*queue
)
349 CloseThreadpoolCleanupGroupMembers(queue
->envs
[0].CleanupGroup
, TRUE
, NULL
);
350 CloseThreadpool(queue
->pool
);
356 static void CALLBACK
standard_queue_worker(TP_CALLBACK_INSTANCE
*instance
, void *context
, TP_WORK
*work
)
358 struct work_item
*item
= context
;
359 RTWQASYNCRESULT
*result
= (RTWQASYNCRESULT
*)item
->result
;
361 TRACE("result object %p.\n", result
);
363 /* Submitting from serial queue in reply mode, use different result object acting as receipt token.
364 It's submitted to user callback still, but when invoked, special serial queue callback will be used
365 to ensure correct destination queue. */
367 IRtwqAsyncCallback_Invoke(result
->pCallback
, item
->reply_result
? item
->reply_result
: item
->result
);
369 IUnknown_Release(&item
->IUnknown_iface
);
372 static void pool_queue_submit(struct queue
*queue
, struct work_item
*item
)
374 TP_CALLBACK_PRIORITY callback_priority
;
375 TP_CALLBACK_ENVIRON_V3 env
;
376 TP_WORK
*work_object
;
378 if (item
->priority
== 0)
379 callback_priority
= TP_CALLBACK_PRIORITY_NORMAL
;
380 else if (item
->priority
< 0)
381 callback_priority
= TP_CALLBACK_PRIORITY_LOW
;
383 callback_priority
= TP_CALLBACK_PRIORITY_HIGH
;
385 env
= queue
->envs
[callback_priority
];
386 env
.FinalizationCallback
= item
->finalization_callback
;
387 /* Worker pool callback will release one reference. Grab one more to keep object alive when
388 we need finalization callback. */
389 if (item
->finalization_callback
)
390 IUnknown_AddRef(&item
->IUnknown_iface
);
391 work_object
= CreateThreadpoolWork(standard_queue_worker
, item
, (TP_CALLBACK_ENVIRON
*)&env
);
392 SubmitThreadpoolWork(work_object
);
394 TRACE("dispatched %p.\n", item
->result
);
397 static const struct queue_ops pool_queue_ops
=
404 static struct work_item
* serial_queue_get_next(struct queue
*queue
, struct work_item
*item
)
406 struct work_item
*next_item
= NULL
;
408 list_remove(&item
->entry
);
409 if (!list_empty(&item
->queue
->pending_items
))
410 next_item
= LIST_ENTRY(list_head(&item
->queue
->pending_items
), struct work_item
, entry
);
415 static void CALLBACK
serial_queue_finalization_callback(PTP_CALLBACK_INSTANCE instance
, void *user_data
)
417 struct work_item
*item
= (struct work_item
*)user_data
, *next_item
;
418 struct queue
*target_queue
, *queue
= item
->queue
;
421 EnterCriticalSection(&queue
->cs
);
423 if ((next_item
= serial_queue_get_next(queue
, item
)))
425 if (SUCCEEDED(hr
= grab_queue(queue
->target_queue
, &target_queue
)))
426 target_queue
->ops
->submit(target_queue
, next_item
);
428 WARN("Failed to grab queue for id %#x, hr %#x.\n", queue
->target_queue
, hr
);
431 LeaveCriticalSection(&queue
->cs
);
433 IUnknown_Release(&item
->IUnknown_iface
);
436 static HRESULT
serial_queue_init(const struct queue_desc
*desc
, struct queue
*queue
)
438 queue
->IRtwqAsyncCallback_iface
.lpVtbl
= &queue_serial_callback_vtbl
;
439 queue
->target_queue
= desc
->target_queue
;
440 lock_user_queue(queue
->target_queue
);
441 queue
->finalization_callback
= serial_queue_finalization_callback
;
446 static BOOL
serial_queue_shutdown(struct queue
*queue
)
448 unlock_user_queue(queue
->target_queue
);
453 static struct work_item
* serial_queue_is_ack_token(struct queue
*queue
, struct work_item
*item
)
455 RTWQASYNCRESULT
*async_result
= (RTWQASYNCRESULT
*)item
->result
;
456 struct work_item
*head
;
458 if (list_empty(&queue
->pending_items
))
461 head
= LIST_ENTRY(list_head(&queue
->pending_items
), struct work_item
, entry
);
462 if (head
->reply_result
== item
->result
&& async_result
->pCallback
== &queue
->IRtwqAsyncCallback_iface
)
468 static void serial_queue_submit(struct queue
*queue
, struct work_item
*item
)
470 struct work_item
*head
, *next_item
= NULL
;
471 struct queue
*target_queue
;
474 /* In reply mode queue will advance when 'reply_result' is invoked, in regular mode it will advance automatically,
475 via finalization callback. */
477 if (item
->flags
& RTWQ_REPLY_CALLBACK
)
479 if (FAILED(hr
= RtwqCreateAsyncResult(NULL
, &queue
->IRtwqAsyncCallback_iface
, NULL
, &item
->reply_result
)))
480 WARN("Failed to create reply object, hr %#x.\n", hr
);
483 item
->finalization_callback
= queue
->finalization_callback
;
485 /* Serial queues could be chained together, detach from current queue before transitioning item to this one.
486 Items are not detached when submitted to pool queues, because pool queues won't forward them further. */
487 EnterCriticalSection(&item
->queue
->cs
);
488 list_remove(&item
->entry
);
489 LeaveCriticalSection(&item
->queue
->cs
);
491 EnterCriticalSection(&queue
->cs
);
495 if ((head
= serial_queue_is_ack_token(queue
, item
)))
497 /* Ack receipt token - pop waiting item, advance. */
498 next_item
= serial_queue_get_next(queue
, head
);
499 IUnknown_Release(&head
->IUnknown_iface
);
503 if (list_empty(&queue
->pending_items
))
505 list_add_tail(&queue
->pending_items
, &item
->entry
);
506 IUnknown_AddRef(&item
->IUnknown_iface
);
511 if (SUCCEEDED(hr
= grab_queue(queue
->target_queue
, &target_queue
)))
512 target_queue
->ops
->submit(target_queue
, next_item
);
514 WARN("Failed to grab queue for id %#x, hr %#x.\n", queue
->target_queue
, hr
);
517 LeaveCriticalSection(&queue
->cs
);
520 static const struct queue_ops serial_queue_ops
=
523 serial_queue_shutdown
,
527 static HRESULT WINAPI
work_item_QueryInterface(IUnknown
*iface
, REFIID riid
, void **obj
)
529 if (IsEqualIID(riid
, &IID_IUnknown
))
532 IUnknown_AddRef(iface
);
537 return E_NOINTERFACE
;
540 static ULONG WINAPI
work_item_AddRef(IUnknown
*iface
)
542 struct work_item
*item
= work_item_impl_from_IUnknown(iface
);
543 return InterlockedIncrement(&item
->refcount
);
546 static ULONG WINAPI
work_item_Release(IUnknown
*iface
)
548 struct work_item
*item
= work_item_impl_from_IUnknown(iface
);
549 ULONG refcount
= InterlockedDecrement(&item
->refcount
);
553 if (item
->reply_result
)
554 IRtwqAsyncResult_Release(item
->reply_result
);
555 IRtwqAsyncResult_Release(item
->result
);
562 static const IUnknownVtbl work_item_vtbl
=
564 work_item_QueryInterface
,
569 static struct work_item
* alloc_work_item(struct queue
*queue
, LONG priority
, IRtwqAsyncResult
*result
)
571 RTWQASYNCRESULT
*async_result
= (RTWQASYNCRESULT
*)result
;
572 DWORD flags
= 0, queue_id
= 0;
573 struct work_item
*item
;
575 item
= heap_alloc_zero(sizeof(*item
));
577 item
->IUnknown_iface
.lpVtbl
= &work_item_vtbl
;
578 item
->result
= result
;
579 IRtwqAsyncResult_AddRef(item
->result
);
582 list_init(&item
->entry
);
583 item
->priority
= priority
;
585 if (SUCCEEDED(IRtwqAsyncCallback_GetParameters(async_result
->pCallback
, &flags
, &queue_id
)))
591 static void init_work_queue(const struct queue_desc
*desc
, struct queue
*queue
)
593 assert(desc
->ops
!= NULL
);
595 queue
->ops
= desc
->ops
;
596 if (SUCCEEDED(queue
->ops
->init(desc
, queue
)))
598 list_init(&queue
->pending_items
);
599 InitializeCriticalSection(&queue
->cs
);
603 static HRESULT
grab_queue(DWORD queue_id
, struct queue
**ret
)
605 struct queue
*queue
= get_system_queue(queue_id
);
606 RTWQ_WORKQUEUE_TYPE queue_type
;
607 struct queue_handle
*entry
;
611 if (!system_queues
[SYS_QUEUE_STANDARD
].pool
)
612 return RTWQ_E_SHUTDOWN
;
614 if (queue
&& queue
->pool
)
621 struct queue_desc desc
;
623 EnterCriticalSection(&queues_section
);
626 case RTWQ_CALLBACK_QUEUE_IO
:
627 case RTWQ_CALLBACK_QUEUE_MULTITHREADED
:
628 case RTWQ_CALLBACK_QUEUE_LONG_FUNCTION
:
629 queue_type
= RTWQ_MULTITHREADED_WORKQUEUE
;
632 queue_type
= RTWQ_STANDARD_WORKQUEUE
;
635 desc
.queue_type
= queue_type
;
636 desc
.ops
= &pool_queue_ops
;
637 desc
.target_queue
= 0;
638 init_work_queue(&desc
, queue
);
639 LeaveCriticalSection(&queues_section
);
644 /* Handles user queues. */
645 if ((entry
= get_queue_obj(queue_id
)))
648 return *ret
? S_OK
: RTWQ_E_INVALID_WORKQUEUE
;
651 static void shutdown_queue(struct queue
*queue
)
653 struct work_item
*item
, *item2
;
655 if (!queue
->ops
|| !queue
->ops
->shutdown(queue
))
658 EnterCriticalSection(&queue
->cs
);
659 LIST_FOR_EACH_ENTRY_SAFE(item
, item2
, &queue
->pending_items
, struct work_item
, entry
)
661 list_remove(&item
->entry
);
662 IUnknown_Release(&item
->IUnknown_iface
);
664 LeaveCriticalSection(&queue
->cs
);
666 DeleteCriticalSection(&queue
->cs
);
668 memset(queue
, 0, sizeof(*queue
));
671 static HRESULT
queue_submit_item(struct queue
*queue
, LONG priority
, IRtwqAsyncResult
*result
)
673 struct work_item
*item
;
675 if (!(item
= alloc_work_item(queue
, priority
, result
)))
676 return E_OUTOFMEMORY
;
678 queue
->ops
->submit(queue
, item
);
683 static HRESULT
queue_put_work_item(DWORD queue_id
, LONG priority
, IRtwqAsyncResult
*result
)
688 if (FAILED(hr
= grab_queue(queue_id
, &queue
)))
691 return queue_submit_item(queue
, priority
, result
);
694 static HRESULT
invoke_async_callback(IRtwqAsyncResult
*result
)
696 RTWQASYNCRESULT
*result_data
= (RTWQASYNCRESULT
*)result
;
697 DWORD queue
= RTWQ_CALLBACK_QUEUE_STANDARD
, flags
;
700 if (FAILED(IRtwqAsyncCallback_GetParameters(result_data
->pCallback
, &flags
, &queue
)))
701 queue
= RTWQ_CALLBACK_QUEUE_STANDARD
;
703 if (FAILED(lock_user_queue(queue
)))
704 queue
= RTWQ_CALLBACK_QUEUE_STANDARD
;
706 hr
= queue_put_work_item(queue
, 0, result
);
708 unlock_user_queue(queue
);
713 static void queue_release_pending_item(struct work_item
*item
)
715 EnterCriticalSection(&item
->queue
->cs
);
718 list_remove(&item
->entry
);
720 IUnknown_Release(&item
->IUnknown_iface
);
722 LeaveCriticalSection(&item
->queue
->cs
);
725 static void CALLBACK
waiting_item_callback(TP_CALLBACK_INSTANCE
*instance
, void *context
, TP_WAIT
*wait
,
726 TP_WAIT_RESULT wait_result
)
728 struct work_item
*item
= context
;
730 TRACE("result object %p.\n", item
->result
);
732 invoke_async_callback(item
->result
);
734 IUnknown_Release(&item
->IUnknown_iface
);
737 static void CALLBACK
waiting_item_cancelable_callback(TP_CALLBACK_INSTANCE
*instance
, void *context
, TP_WAIT
*wait
,
738 TP_WAIT_RESULT wait_result
)
740 struct work_item
*item
= context
;
742 TRACE("result object %p.\n", item
->result
);
744 queue_release_pending_item(item
);
746 invoke_async_callback(item
->result
);
748 IUnknown_Release(&item
->IUnknown_iface
);
751 static void CALLBACK
scheduled_item_callback(TP_CALLBACK_INSTANCE
*instance
, void *context
, TP_TIMER
*timer
)
753 struct work_item
*item
= context
;
755 TRACE("result object %p.\n", item
->result
);
757 invoke_async_callback(item
->result
);
759 IUnknown_Release(&item
->IUnknown_iface
);
762 static void CALLBACK
scheduled_item_cancelable_callback(TP_CALLBACK_INSTANCE
*instance
, void *context
, TP_TIMER
*timer
)
764 struct work_item
*item
= context
;
766 TRACE("result object %p.\n", item
->result
);
768 queue_release_pending_item(item
);
770 invoke_async_callback(item
->result
);
772 IUnknown_Release(&item
->IUnknown_iface
);
775 static void CALLBACK
periodic_item_callback(TP_CALLBACK_INSTANCE
*instance
, void *context
, TP_TIMER
*timer
)
777 struct work_item
*item
= context
;
779 IUnknown_AddRef(&item
->IUnknown_iface
);
781 invoke_async_callback(item
->result
);
783 IUnknown_Release(&item
->IUnknown_iface
);
786 static void queue_mark_item_pending(DWORD mask
, struct work_item
*item
, RTWQWORKITEM_KEY
*key
)
788 *key
= generate_item_key(mask
);
791 EnterCriticalSection(&item
->queue
->cs
);
792 list_add_tail(&item
->queue
->pending_items
, &item
->entry
);
793 IUnknown_AddRef(&item
->IUnknown_iface
);
794 LeaveCriticalSection(&item
->queue
->cs
);
797 static HRESULT
queue_submit_wait(struct queue
*queue
, HANDLE event
, LONG priority
, IRtwqAsyncResult
*result
,
798 RTWQWORKITEM_KEY
*key
)
800 PTP_WAIT_CALLBACK callback
;
801 struct work_item
*item
;
803 if (!(item
= alloc_work_item(queue
, priority
, result
)))
804 return E_OUTOFMEMORY
;
808 queue_mark_item_pending(WAIT_ITEM_KEY_MASK
, item
, key
);
809 callback
= waiting_item_cancelable_callback
;
812 callback
= waiting_item_callback
;
814 item
->u
.wait_object
= CreateThreadpoolWait(callback
, item
,
815 (TP_CALLBACK_ENVIRON
*)&queue
->envs
[TP_CALLBACK_PRIORITY_NORMAL
]);
816 SetThreadpoolWait(item
->u
.wait_object
, event
, NULL
);
818 TRACE("dispatched %p.\n", result
);
823 static HRESULT
queue_submit_timer(struct queue
*queue
, IRtwqAsyncResult
*result
, INT64 timeout
, DWORD period
,
824 RTWQWORKITEM_KEY
*key
)
826 PTP_TIMER_CALLBACK callback
;
827 struct work_item
*item
;
831 if (!(item
= alloc_work_item(queue
, 0, result
)))
832 return E_OUTOFMEMORY
;
836 queue_mark_item_pending(SCHEDULED_ITEM_KEY_MASK
, item
, key
);
840 callback
= periodic_item_callback
;
842 callback
= key
? scheduled_item_cancelable_callback
: scheduled_item_callback
;
844 t
.QuadPart
= timeout
* 1000 * 10;
845 filetime
.dwLowDateTime
= t
.u
.LowPart
;
846 filetime
.dwHighDateTime
= t
.u
.HighPart
;
848 item
->u
.timer_object
= CreateThreadpoolTimer(callback
, item
,
849 (TP_CALLBACK_ENVIRON
*)&queue
->envs
[TP_CALLBACK_PRIORITY_NORMAL
]);
850 SetThreadpoolTimer(item
->u
.timer_object
, &filetime
, period
, 0);
852 TRACE("dispatched %p.\n", result
);
857 static HRESULT
queue_cancel_item(struct queue
*queue
, RTWQWORKITEM_KEY key
)
859 HRESULT hr
= RTWQ_E_NOT_FOUND
;
860 struct work_item
*item
;
862 EnterCriticalSection(&queue
->cs
);
863 LIST_FOR_EACH_ENTRY(item
, &queue
->pending_items
, struct work_item
, entry
)
865 if (item
->key
== key
)
868 if ((key
& WAIT_ITEM_KEY_MASK
) == WAIT_ITEM_KEY_MASK
)
870 IRtwqAsyncResult_SetStatus(item
->result
, RTWQ_E_OPERATION_CANCELLED
);
871 invoke_async_callback(item
->result
);
872 CloseThreadpoolWait(item
->u
.wait_object
);
874 else if ((key
& SCHEDULED_ITEM_KEY_MASK
) == SCHEDULED_ITEM_KEY_MASK
)
875 CloseThreadpoolTimer(item
->u
.timer_object
);
877 WARN("Unknown item key mask %#x.\n", (DWORD
)key
);
878 queue_release_pending_item(item
);
883 LeaveCriticalSection(&queue
->cs
);
888 static HRESULT
alloc_user_queue(const struct queue_desc
*desc
, DWORD
*queue_id
)
890 struct queue_handle
*entry
;
894 *queue_id
= RTWQ_CALLBACK_QUEUE_UNDEFINED
;
896 if (platform_lock
<= 0)
897 return RTWQ_E_SHUTDOWN
;
899 queue
= heap_alloc_zero(sizeof(*queue
));
901 return E_OUTOFMEMORY
;
903 init_work_queue(desc
, queue
);
905 EnterCriticalSection(&queues_section
);
907 entry
= next_free_user_queue
;
909 next_free_user_queue
= entry
->obj
;
910 else if (next_unused_user_queue
< user_queues
+ MAX_USER_QUEUE_HANDLES
)
911 entry
= next_unused_user_queue
++;
914 LeaveCriticalSection(&queues_section
);
916 WARN("Out of user queue handles.\n");
917 return E_OUTOFMEMORY
;
922 if (++queue_generation
== 0xffff) queue_generation
= 1;
923 entry
->generation
= queue_generation
;
924 idx
= entry
- user_queues
+ FIRST_USER_QUEUE_HANDLE
;
925 *queue_id
= (idx
<< 16) | entry
->generation
;
927 LeaveCriticalSection(&queues_section
);
934 RTWQASYNCRESULT result
;
940 static struct async_result
*impl_from_IRtwqAsyncResult(IRtwqAsyncResult
*iface
)
942 return CONTAINING_RECORD(iface
, struct async_result
, result
.AsyncResult
);
945 static HRESULT WINAPI
async_result_QueryInterface(IRtwqAsyncResult
*iface
, REFIID riid
, void **obj
)
947 TRACE("%p, %s, %p.\n", iface
, debugstr_guid(riid
), obj
);
949 if (IsEqualIID(riid
, &IID_IRtwqAsyncResult
) ||
950 IsEqualIID(riid
, &IID_IUnknown
))
953 IRtwqAsyncResult_AddRef(iface
);
958 WARN("Unsupported interface %s.\n", debugstr_guid(riid
));
959 return E_NOINTERFACE
;
962 static ULONG WINAPI
async_result_AddRef(IRtwqAsyncResult
*iface
)
964 struct async_result
*result
= impl_from_IRtwqAsyncResult(iface
);
965 ULONG refcount
= InterlockedIncrement(&result
->refcount
);
967 TRACE("%p, %u.\n", iface
, refcount
);
972 static ULONG WINAPI
async_result_Release(IRtwqAsyncResult
*iface
)
974 struct async_result
*result
= impl_from_IRtwqAsyncResult(iface
);
975 ULONG refcount
= InterlockedDecrement(&result
->refcount
);
977 TRACE("%p, %u.\n", iface
, refcount
);
981 if (result
->result
.pCallback
)
982 IRtwqAsyncCallback_Release(result
->result
.pCallback
);
984 IUnknown_Release(result
->object
);
986 IUnknown_Release(result
->state
);
987 if (result
->result
.hEvent
)
988 CloseHandle(result
->result
.hEvent
);
991 RtwqUnlockPlatform();
997 static HRESULT WINAPI
async_result_GetState(IRtwqAsyncResult
*iface
, IUnknown
**state
)
999 struct async_result
*result
= impl_from_IRtwqAsyncResult(iface
);
1001 TRACE("%p, %p.\n", iface
, state
);
1006 *state
= result
->state
;
1007 IUnknown_AddRef(*state
);
1012 static HRESULT WINAPI
async_result_GetStatus(IRtwqAsyncResult
*iface
)
1014 struct async_result
*result
= impl_from_IRtwqAsyncResult(iface
);
1016 TRACE("%p.\n", iface
);
1018 return result
->result
.hrStatusResult
;
1021 static HRESULT WINAPI
async_result_SetStatus(IRtwqAsyncResult
*iface
, HRESULT status
)
1023 struct async_result
*result
= impl_from_IRtwqAsyncResult(iface
);
1025 TRACE("%p, %#x.\n", iface
, status
);
1027 result
->result
.hrStatusResult
= status
;
1032 static HRESULT WINAPI
async_result_GetObject(IRtwqAsyncResult
*iface
, IUnknown
**object
)
1034 struct async_result
*result
= impl_from_IRtwqAsyncResult(iface
);
1036 TRACE("%p, %p.\n", iface
, object
);
1038 if (!result
->object
)
1041 *object
= result
->object
;
1042 IUnknown_AddRef(*object
);
1047 static IUnknown
* WINAPI
async_result_GetStateNoAddRef(IRtwqAsyncResult
*iface
)
1049 struct async_result
*result
= impl_from_IRtwqAsyncResult(iface
);
1051 TRACE("%p.\n", iface
);
1053 return result
->state
;
1056 static const IRtwqAsyncResultVtbl async_result_vtbl
=
1058 async_result_QueryInterface
,
1059 async_result_AddRef
,
1060 async_result_Release
,
1061 async_result_GetState
,
1062 async_result_GetStatus
,
1063 async_result_SetStatus
,
1064 async_result_GetObject
,
1065 async_result_GetStateNoAddRef
,
1068 static HRESULT
create_async_result(IUnknown
*object
, IRtwqAsyncCallback
*callback
, IUnknown
*state
, IRtwqAsyncResult
**out
)
1070 struct async_result
*result
;
1073 return E_INVALIDARG
;
1075 result
= heap_alloc_zero(sizeof(*result
));
1077 return E_OUTOFMEMORY
;
1081 result
->result
.AsyncResult
.lpVtbl
= &async_result_vtbl
;
1082 result
->refcount
= 1;
1083 result
->object
= object
;
1085 IUnknown_AddRef(result
->object
);
1086 result
->result
.pCallback
= callback
;
1087 if (result
->result
.pCallback
)
1088 IRtwqAsyncCallback_AddRef(result
->result
.pCallback
);
1089 result
->state
= state
;
1091 IUnknown_AddRef(result
->state
);
1093 *out
= &result
->result
.AsyncResult
;
1095 TRACE("Created async result object %p.\n", *out
);
1100 HRESULT WINAPI
RtwqCreateAsyncResult(IUnknown
*object
, IRtwqAsyncCallback
*callback
, IUnknown
*state
,
1101 IRtwqAsyncResult
**out
)
1103 TRACE("%p, %p, %p, %p.\n", object
, callback
, state
, out
);
1105 return create_async_result(object
, callback
, state
, out
);
1108 HRESULT WINAPI
RtwqLockPlatform(void)
1110 InterlockedIncrement(&platform_lock
);
1115 HRESULT WINAPI
RtwqUnlockPlatform(void)
1117 InterlockedDecrement(&platform_lock
);
1122 static void init_system_queues(void)
1124 struct queue_desc desc
;
1127 /* Always initialize standard queue, keep the rest lazy. */
1129 EnterCriticalSection(&queues_section
);
1131 if (system_queues
[SYS_QUEUE_STANDARD
].pool
)
1133 LeaveCriticalSection(&queues_section
);
1137 if (FAILED(hr
= CoIncrementMTAUsage(&mta_cookie
)))
1138 WARN("Failed to initialize MTA, hr %#x.\n", hr
);
1140 desc
.queue_type
= RTWQ_STANDARD_WORKQUEUE
;
1141 desc
.ops
= &pool_queue_ops
;
1142 desc
.target_queue
= 0;
1143 init_work_queue(&desc
, &system_queues
[SYS_QUEUE_STANDARD
]);
1145 LeaveCriticalSection(&queues_section
);
1148 HRESULT WINAPI
RtwqStartup(void)
1150 if (InterlockedIncrement(&platform_lock
) == 1)
1152 init_system_queues();
1158 static void shutdown_system_queues(void)
1163 EnterCriticalSection(&queues_section
);
1165 for (i
= 0; i
< ARRAY_SIZE(system_queues
); ++i
)
1167 shutdown_queue(&system_queues
[i
]);
1170 if (FAILED(hr
= CoDecrementMTAUsage(mta_cookie
)))
1171 WARN("Failed to uninitialize MTA, hr %#x.\n", hr
);
1173 LeaveCriticalSection(&queues_section
);
1176 HRESULT WINAPI
RtwqShutdown(void)
1178 if (platform_lock
<= 0)
1181 if (InterlockedExchangeAdd(&platform_lock
, -1) == 1)
1183 shutdown_system_queues();
1189 HRESULT WINAPI
RtwqPutWaitingWorkItem(HANDLE event
, LONG priority
, IRtwqAsyncResult
*result
, RTWQWORKITEM_KEY
*key
)
1191 struct queue
*queue
;
1194 TRACE("%p, %d, %p, %p.\n", event
, priority
, result
, key
);
1196 if (FAILED(hr
= grab_queue(RTWQ_CALLBACK_QUEUE_TIMER
, &queue
)))
1199 hr
= queue_submit_wait(queue
, event
, priority
, result
, key
);
1204 static HRESULT
schedule_work_item(IRtwqAsyncResult
*result
, INT64 timeout
, RTWQWORKITEM_KEY
*key
)
1206 struct queue
*queue
;
1209 if (FAILED(hr
= grab_queue(RTWQ_CALLBACK_QUEUE_TIMER
, &queue
)))
1212 TRACE("%p, %s, %p.\n", result
, wine_dbgstr_longlong(timeout
), key
);
1214 return queue_submit_timer(queue
, result
, timeout
, 0, key
);
1217 HRESULT WINAPI
RtwqScheduleWorkItem(IRtwqAsyncResult
*result
, INT64 timeout
, RTWQWORKITEM_KEY
*key
)
1219 TRACE("%p, %s, %p.\n", result
, wine_dbgstr_longlong(timeout
), key
);
1221 return schedule_work_item(result
, timeout
, key
);
1224 struct periodic_callback
1226 IRtwqAsyncCallback IRtwqAsyncCallback_iface
;
1228 RTWQPERIODICCALLBACK callback
;
1231 static struct periodic_callback
*impl_from_IRtwqAsyncCallback(IRtwqAsyncCallback
*iface
)
1233 return CONTAINING_RECORD(iface
, struct periodic_callback
, IRtwqAsyncCallback_iface
);
1236 static HRESULT WINAPI
periodic_callback_QueryInterface(IRtwqAsyncCallback
*iface
, REFIID riid
, void **obj
)
1238 if (IsEqualIID(riid
, &IID_IRtwqAsyncCallback
) ||
1239 IsEqualIID(riid
, &IID_IUnknown
))
1242 IRtwqAsyncCallback_AddRef(iface
);
1247 return E_NOINTERFACE
;
1250 static ULONG WINAPI
periodic_callback_AddRef(IRtwqAsyncCallback
*iface
)
1252 struct periodic_callback
*callback
= impl_from_IRtwqAsyncCallback(iface
);
1253 ULONG refcount
= InterlockedIncrement(&callback
->refcount
);
1255 TRACE("%p, %u.\n", iface
, refcount
);
1260 static ULONG WINAPI
periodic_callback_Release(IRtwqAsyncCallback
*iface
)
1262 struct periodic_callback
*callback
= impl_from_IRtwqAsyncCallback(iface
);
1263 ULONG refcount
= InterlockedDecrement(&callback
->refcount
);
1265 TRACE("%p, %u.\n", iface
, refcount
);
1268 heap_free(callback
);
1273 static HRESULT WINAPI
periodic_callback_GetParameters(IRtwqAsyncCallback
*iface
, DWORD
*flags
, DWORD
*queue
)
1278 static HRESULT WINAPI
periodic_callback_Invoke(IRtwqAsyncCallback
*iface
, IRtwqAsyncResult
*result
)
1280 struct periodic_callback
*callback
= impl_from_IRtwqAsyncCallback(iface
);
1281 IUnknown
*context
= NULL
;
1283 if (FAILED(IRtwqAsyncResult_GetObject(result
, &context
)))
1284 WARN("Expected object to be set for result object.\n");
1286 callback
->callback(context
);
1289 IUnknown_Release(context
);
1294 static const IRtwqAsyncCallbackVtbl periodic_callback_vtbl
=
1296 periodic_callback_QueryInterface
,
1297 periodic_callback_AddRef
,
1298 periodic_callback_Release
,
1299 periodic_callback_GetParameters
,
1300 periodic_callback_Invoke
,
1303 static HRESULT
create_periodic_callback_obj(RTWQPERIODICCALLBACK callback
, IRtwqAsyncCallback
**out
)
1305 struct periodic_callback
*object
;
1307 object
= heap_alloc(sizeof(*object
));
1309 return E_OUTOFMEMORY
;
1311 object
->IRtwqAsyncCallback_iface
.lpVtbl
= &periodic_callback_vtbl
;
1312 object
->refcount
= 1;
1313 object
->callback
= callback
;
1315 *out
= &object
->IRtwqAsyncCallback_iface
;
1320 HRESULT WINAPI
RtwqAddPeriodicCallback(RTWQPERIODICCALLBACK callback
, IUnknown
*context
, DWORD
*key
)
1322 IRtwqAsyncCallback
*periodic_callback
;
1323 RTWQWORKITEM_KEY workitem_key
;
1324 IRtwqAsyncResult
*result
;
1325 struct queue
*queue
;
1328 TRACE("%p, %p, %p.\n", callback
, context
, key
);
1330 if (FAILED(hr
= grab_queue(RTWQ_CALLBACK_QUEUE_TIMER
, &queue
)))
1333 if (FAILED(hr
= create_periodic_callback_obj(callback
, &periodic_callback
)))
1336 hr
= create_async_result(context
, periodic_callback
, NULL
, &result
);
1337 IRtwqAsyncCallback_Release(periodic_callback
);
1341 /* Same period MFGetTimerPeriodicity() returns. */
1342 hr
= queue_submit_timer(queue
, result
, 0, 10, key
? &workitem_key
: NULL
);
1344 IRtwqAsyncResult_Release(result
);
1347 *key
= workitem_key
;
1352 HRESULT WINAPI
RtwqRemovePeriodicCallback(DWORD key
)
1354 struct queue
*queue
;
1357 TRACE("%#x.\n", key
);
1359 if (FAILED(hr
= grab_queue(RTWQ_CALLBACK_QUEUE_TIMER
, &queue
)))
1362 return queue_cancel_item(queue
, get_item_key(SCHEDULED_ITEM_KEY_MASK
, key
));
1365 HRESULT WINAPI
RtwqCancelWorkItem(RTWQWORKITEM_KEY key
)
1367 struct queue
*queue
;
1370 TRACE("%s.\n", wine_dbgstr_longlong(key
));
1372 if (FAILED(hr
= grab_queue(RTWQ_CALLBACK_QUEUE_TIMER
, &queue
)))
1375 return queue_cancel_item(queue
, key
);
1378 HRESULT WINAPI
RtwqInvokeCallback(IRtwqAsyncResult
*result
)
1380 TRACE("%p.\n", result
);
1382 return invoke_async_callback(result
);
1385 HRESULT WINAPI
RtwqPutWorkItem(DWORD queue
, LONG priority
, IRtwqAsyncResult
*result
)
1387 TRACE("%#x, %d, %p.\n", queue
, priority
, result
);
1389 return queue_put_work_item(queue
, priority
, result
);
1392 HRESULT WINAPI
RtwqAllocateWorkQueue(RTWQ_WORKQUEUE_TYPE queue_type
, DWORD
*queue
)
1394 struct queue_desc desc
;
1396 TRACE("%d, %p.\n", queue_type
, queue
);
1398 desc
.queue_type
= queue_type
;
1399 desc
.ops
= &pool_queue_ops
;
1400 desc
.target_queue
= 0;
1401 return alloc_user_queue(&desc
, queue
);
1404 HRESULT WINAPI
RtwqLockWorkQueue(DWORD queue
)
1406 TRACE("%#x.\n", queue
);
1408 return lock_user_queue(queue
);
1411 HRESULT WINAPI
RtwqUnlockWorkQueue(DWORD queue
)
1413 TRACE("%#x.\n", queue
);
1415 return unlock_user_queue(queue
);
1418 HRESULT WINAPI
RtwqSetLongRunning(DWORD queue_id
, BOOL enable
)
1420 struct queue
*queue
;
1424 TRACE("%#x, %d.\n", queue_id
, enable
);
1426 lock_user_queue(queue_id
);
1428 if (SUCCEEDED(hr
= grab_queue(queue_id
, &queue
)))
1430 for (i
= 0; i
< ARRAY_SIZE(queue
->envs
); ++i
)
1431 queue
->envs
[i
].u
.s
.LongFunction
= !!enable
;
1434 unlock_user_queue(queue_id
);
1439 HRESULT WINAPI
RtwqLockSharedWorkQueue(const WCHAR
*usageclass
, LONG priority
, DWORD
*taskid
, DWORD
*queue
)
1441 FIXME("%s, %d, %p, %p.\n", debugstr_w(usageclass
), priority
, taskid
, queue
);
1443 return RtwqAllocateWorkQueue(RTWQ_STANDARD_WORKQUEUE
, queue
);
1446 HRESULT WINAPI
RtwqSetDeadline(DWORD queue_id
, LONGLONG deadline
, HANDLE
*request
)
1448 FIXME("%#x, %s, %p.\n", queue_id
, wine_dbgstr_longlong(deadline
), request
);
1453 HRESULT WINAPI
RtwqSetDeadline2(DWORD queue_id
, LONGLONG deadline
, LONGLONG predeadline
, HANDLE
*request
)
1455 FIXME("%#x, %s, %s, %p.\n", queue_id
, wine_dbgstr_longlong(deadline
), wine_dbgstr_longlong(predeadline
), request
);
1460 HRESULT WINAPI
RtwqCancelDeadline(HANDLE request
)
1462 FIXME("%p.\n", request
);
1467 HRESULT WINAPI
RtwqAllocateSerialWorkQueue(DWORD target_queue
, DWORD
*queue
)
1469 struct queue_desc desc
;
1471 TRACE("%#x, %p.\n", target_queue
, queue
);
1473 desc
.queue_type
= RTWQ_STANDARD_WORKQUEUE
;
1474 desc
.ops
= &serial_queue_ops
;
1475 desc
.target_queue
= target_queue
;
1476 return alloc_user_queue(&desc
, queue
);
1479 HRESULT WINAPI
RtwqJoinWorkQueue(DWORD queue
, HANDLE hFile
, HANDLE
*cookie
)
1481 FIXME("%#x, %p, %p.\n", queue
, hFile
, cookie
);
1486 HRESULT WINAPI
RtwqUnjoinWorkQueue(DWORD queue
, HANDLE cookie
)
1488 FIXME("%#x, %p.\n", queue
, cookie
);
1493 HRESULT WINAPI
RtwqGetWorkQueueMMCSSClass(DWORD queue
, WCHAR
*class, DWORD
*length
)
1495 FIXME("%#x, %p, %p.\n", queue
, class, length
);
1500 HRESULT WINAPI
RtwqGetWorkQueueMMCSSTaskId(DWORD queue
, DWORD
*taskid
)
1502 FIXME("%#x, %p.\n", queue
, taskid
);
1507 HRESULT WINAPI
RtwqGetWorkQueueMMCSSPriority(DWORD queue
, LONG
*priority
)
1509 FIXME("%#x, %p.\n", queue
, priority
);
1514 HRESULT WINAPI
RtwqRegisterPlatformWithMMCSS(const WCHAR
*class, DWORD
*taskid
, LONG priority
)
1516 FIXME("%s, %p, %d.\n", debugstr_w(class), taskid
, priority
);
/* RtwqUnregisterPlatformFromMMCSS: non-static entry point that takes no
 * arguments and returns an HRESULT.
 * NOTE(review): only the signature survives in this extract; the braces and
 * body are not visible here and must be restored from the upstream file. */
1521 HRESULT WINAPI
RtwqUnregisterPlatformFromMMCSS(void)
1528 HRESULT WINAPI
RtwqBeginRegisterWorkQueueWithMMCSS(DWORD queue
, const WCHAR
*class, DWORD taskid
, LONG priority
,
1529 IRtwqAsyncCallback
*callback
, IUnknown
*state
)
1531 FIXME("%#x, %s, %u, %d, %p, %p.\n", queue
, debugstr_w(class), taskid
, priority
, callback
, state
);
1536 HRESULT WINAPI
RtwqRegisterPlatformEvents(IRtwqPlatformEvents
*events
)
1538 FIXME("%p.\n", events
);
1543 HRESULT WINAPI
RtwqUnregisterPlatformEvents(IRtwqPlatformEvents
*events
)
1545 FIXME("%p.\n", events
);