2 * Generic Implementation of COutputQueue
4 * Copyright 2011 Aric Stewart, CodeWeavers
6 * This library is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU Lesser General Public
8 * License as published by the Free Software Foundation; either
9 * version 2.1 of the License, or (at your option) any later version.
11 * This library is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 * Lesser General Public License for more details.
16 * You should have received a copy of the GNU Lesser General Public
17 * License along with this library; if not, write to the Free Software
18 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
24 #include "wine/debug.h"
25 #include "wine/unicode.h"
26 #include "wine/list.h"
27 #include "wine/strmbase.h"
32 WINE_DEFAULT_DEBUG_CHANNEL(strmbase
);
34 enum {SAMPLE_PACKET
, EOS_PACKET
};
36 typedef struct tagQueuedEvent
{
40 IMediaSample
*pSample
;
43 static DWORD WINAPI
OutputQueue_InitialThreadProc(LPVOID data
)
45 OutputQueue
*This
= (OutputQueue
*)data
;
46 return This
->pFuncsTable
->pfnThreadProc(This
);
49 static void OutputQueue_FreeSamples(OutputQueue
*pOutputQueue
)
51 struct list
*cursor
, *cursor2
;
52 LIST_FOR_EACH_SAFE(cursor
, cursor2
, pOutputQueue
->SampleList
)
54 QueuedEvent
*qev
= LIST_ENTRY(cursor
, QueuedEvent
, entry
);
56 HeapFree(GetProcessHeap(),0,qev
);
60 HRESULT WINAPI
OutputQueue_Construct(
61 BaseOutputPin
*pInputPin
,
67 const OutputQueueFuncTable
* pFuncsTable
,
68 OutputQueue
**ppOutputQueue
)
72 BOOL threaded
= FALSE
;
77 if (!pInputPin
|| !pFuncsTable
|| !ppOutputQueue
)
80 *ppOutputQueue
= HeapAlloc(GetProcessHeap(),HEAP_ZERO_MEMORY
,sizeof(OutputQueue
));
84 This
= *ppOutputQueue
;
85 This
->pFuncsTable
= pFuncsTable
;
86 This
->lBatchSize
= lBatchSize
;
87 This
->bBatchExact
= bBatchExact
;
88 InitializeCriticalSection(&This
->csQueue
);
89 This
->csQueue
.DebugInfo
->Spare
[0] = (DWORD_PTR
)(__FILE__
": OutputQueue.csQueue");
90 This
->SampleList
= HeapAlloc(GetProcessHeap(),0,sizeof(struct list
));
91 if (!This
->SampleList
)
93 OutputQueue_Destroy(This
);
94 *ppOutputQueue
= NULL
;
97 list_init(This
->SampleList
);
99 This
->pInputPin
= pInputPin
;
100 IPin_AddRef((IPin
*)pInputPin
);
102 EnterCriticalSection(&This
->csQueue
);
103 if (bAuto
&& pInputPin
->pMemInputPin
)
104 threaded
= IMemInputPin_ReceiveCanBlock(pInputPin
->pMemInputPin
);
110 This
->hThread
= CreateThread(NULL
, 0, OutputQueue_InitialThreadProc
, This
, 0, &tid
);
113 SetThreadPriority(This
->hThread
, dwPriority
);
114 This
->hProcessQueue
= CreateEventW(NULL
, 0, 0, NULL
);
117 LeaveCriticalSection(&This
->csQueue
);
122 HRESULT WINAPI
OutputQueue_Destroy(OutputQueue
*pOutputQueue
)
124 EnterCriticalSection(&pOutputQueue
->csQueue
);
125 OutputQueue_FreeSamples(pOutputQueue
);
126 pOutputQueue
->bTerminate
= TRUE
;
127 SetEvent(pOutputQueue
->hProcessQueue
);
128 LeaveCriticalSection(&pOutputQueue
->csQueue
);
130 pOutputQueue
->csQueue
.DebugInfo
->Spare
[0] = 0;
131 DeleteCriticalSection(&pOutputQueue
->csQueue
);
132 CloseHandle(pOutputQueue
->hProcessQueue
);
134 HeapFree(GetProcessHeap(),0,pOutputQueue
->SampleList
);
136 IPin_Release((IPin
*)pOutputQueue
->pInputPin
);
137 HeapFree(GetProcessHeap(),0,pOutputQueue
);
141 HRESULT WINAPI
OutputQueue_ReceiveMultiple(OutputQueue
*pOutputQueue
, IMediaSample
**ppSamples
, LONG nSamples
, LONG
*nSamplesProcessed
)
146 if (!pOutputQueue
->pInputPin
->pin
.pConnectedTo
|| !pOutputQueue
->pInputPin
->pMemInputPin
)
147 return VFW_E_NOT_CONNECTED
;
149 if (!pOutputQueue
->hThread
)
151 IMemInputPin_AddRef(pOutputQueue
->pInputPin
->pMemInputPin
);
152 hr
= IMemInputPin_ReceiveMultiple(pOutputQueue
->pInputPin
->pMemInputPin
,ppSamples
, nSamples
, nSamplesProcessed
);
153 IMemInputPin_Release(pOutputQueue
->pInputPin
->pMemInputPin
);
157 EnterCriticalSection(&pOutputQueue
->csQueue
);
158 *nSamplesProcessed
= 0;
160 for (i
= 0; i
< nSamples
; i
++)
162 QueuedEvent
*qev
= HeapAlloc(GetProcessHeap(),0,sizeof(QueuedEvent
));
165 ERR("Out of Memory\n");
169 qev
->type
= SAMPLE_PACKET
;
170 qev
->pSample
= ppSamples
[i
];
171 IMediaSample_AddRef(ppSamples
[i
]);
172 list_add_tail(pOutputQueue
->SampleList
, &qev
->entry
);
173 (*nSamplesProcessed
)++;
176 if (!pOutputQueue
->bBatchExact
|| list_count(pOutputQueue
->SampleList
) >= pOutputQueue
->lBatchSize
)
177 SetEvent(pOutputQueue
->hProcessQueue
);
178 LeaveCriticalSection(&pOutputQueue
->csQueue
);
183 HRESULT WINAPI
OutputQueue_Receive(OutputQueue
*pOutputQueue
, IMediaSample
*pSample
)
186 return OutputQueue_ReceiveMultiple(pOutputQueue
,&pSample
,1,&processed
);
189 VOID WINAPI
OutputQueue_SendAnyway(OutputQueue
*pOutputQueue
)
191 if (pOutputQueue
->hThread
)
193 EnterCriticalSection(&pOutputQueue
->csQueue
);
194 if (!list_empty(pOutputQueue
->SampleList
))
196 pOutputQueue
->bSendAnyway
= TRUE
;
197 SetEvent(pOutputQueue
->hProcessQueue
);
199 LeaveCriticalSection(&pOutputQueue
->csQueue
);
203 VOID WINAPI
OutputQueue_EOS(OutputQueue
*pOutputQueue
)
205 EnterCriticalSection(&pOutputQueue
->csQueue
);
206 if (pOutputQueue
->hThread
)
208 QueuedEvent
*qev
= HeapAlloc(GetProcessHeap(),0,sizeof(QueuedEvent
));
211 ERR("Out of Memory\n");
212 LeaveCriticalSection(&pOutputQueue
->csQueue
);
215 qev
->type
= EOS_PACKET
;
217 list_add_tail(pOutputQueue
->SampleList
, &qev
->entry
);
222 IPin_ConnectedTo((IPin
*)pOutputQueue
->pInputPin
, &ppin
);
225 IPin_EndOfStream(ppin
);
229 LeaveCriticalSection(&pOutputQueue
->csQueue
);
230 /* Covers sending the Event to the worker Thread */
231 OutputQueue_SendAnyway(pOutputQueue
);
234 DWORD WINAPI
OutputQueueImpl_ThreadProc(OutputQueue
*pOutputQueue
)
238 EnterCriticalSection(&pOutputQueue
->csQueue
);
239 if (!list_empty(pOutputQueue
->SampleList
) &&
240 (!pOutputQueue
->bBatchExact
||
241 list_count(pOutputQueue
->SampleList
) >= pOutputQueue
->lBatchSize
||
242 pOutputQueue
->bSendAnyway
246 while (!list_empty(pOutputQueue
->SampleList
))
248 IMediaSample
**ppSamples
;
250 LONG nSamplesProcessed
;
251 struct list
*cursor
, *cursor2
;
254 /* First Pass Process Samples */
255 i
= list_count(pOutputQueue
->SampleList
);
256 ppSamples
= HeapAlloc(GetProcessHeap(),0,sizeof(IMediaSample
*) * i
);
258 LIST_FOR_EACH_SAFE(cursor
, cursor2
, pOutputQueue
->SampleList
)
260 QueuedEvent
*qev
= LIST_ENTRY(cursor
, QueuedEvent
, entry
);
261 if (qev
->type
== SAMPLE_PACKET
)
262 ppSamples
[nSamples
++] = qev
->pSample
;
266 HeapFree(GetProcessHeap(),0,qev
);
269 if (pOutputQueue
->pInputPin
->pin
.pConnectedTo
&& pOutputQueue
->pInputPin
->pMemInputPin
)
271 IMemInputPin_AddRef(pOutputQueue
->pInputPin
->pMemInputPin
);
272 LeaveCriticalSection(&pOutputQueue
->csQueue
);
273 IMemInputPin_ReceiveMultiple(pOutputQueue
->pInputPin
->pMemInputPin
, ppSamples
, nSamples
, &nSamplesProcessed
);
274 EnterCriticalSection(&pOutputQueue
->csQueue
);
275 IMemInputPin_Release(pOutputQueue
->pInputPin
->pMemInputPin
);
277 for (i
= 0; i
< nSamples
; i
++)
278 IUnknown_Release(ppSamples
[i
]);
279 HeapFree(GetProcessHeap(),0,ppSamples
);
281 /* Process Non-Samples */
282 LIST_FOR_EACH_SAFE(cursor
, cursor2
, pOutputQueue
->SampleList
)
284 QueuedEvent
*qev
= LIST_ENTRY(cursor
, QueuedEvent
, entry
);
285 if (qev
->type
== EOS_PACKET
)
288 IPin_ConnectedTo((IPin
*)pOutputQueue
->pInputPin
, &ppin
);
291 IPin_EndOfStream(ppin
);
295 else if (qev
->type
== SAMPLE_PACKET
)
298 FIXME("Unhandled Event type %i\n",qev
->type
);
300 HeapFree(GetProcessHeap(),0,qev
);
303 pOutputQueue
->bSendAnyway
= FALSE
;
305 LeaveCriticalSection(&pOutputQueue
->csQueue
);
306 WaitForSingleObject(pOutputQueue
->hProcessQueue
, INFINITE
);
308 while (!pOutputQueue
->bTerminate
);