2 * Generic Implementation of COutputQueue
4 * Copyright 2011 Aric Stewart, CodeWeavers
6 * This library is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU Lesser General Public
8 * License as published by the Free Software Foundation; either
9 * version 2.1 of the License, or (at your option) any later version.
11 * This library is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 * Lesser General Public License for more details.
16 * You should have received a copy of the GNU Lesser General Public
17 * License along with this library; if not, write to the Free Software
18 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
21 #include "strmbase_private.h"
23 WINE_DEFAULT_DEBUG_CHANNEL(strmbase
);
25 enum {SAMPLE_PACKET
, EOS_PACKET
};
27 typedef struct tagQueuedEvent
{
31 IMediaSample
*pSample
;
34 static DWORD WINAPI
OutputQueue_InitialThreadProc(LPVOID data
)
36 OutputQueue
*This
= (OutputQueue
*)data
;
37 return This
->pFuncsTable
->pfnThreadProc(This
);
40 static void OutputQueue_FreeSamples(OutputQueue
*pOutputQueue
)
42 struct list
*cursor
, *cursor2
;
43 LIST_FOR_EACH_SAFE(cursor
, cursor2
, &pOutputQueue
->SampleList
)
45 QueuedEvent
*qev
= LIST_ENTRY(cursor
, QueuedEvent
, entry
);
47 HeapFree(GetProcessHeap(),0,qev
);
51 HRESULT WINAPI
OutputQueue_Construct(struct strmbase_source
*pInputPin
, BOOL bAuto
,
52 BOOL bQueue
, LONG lBatchSize
, BOOL bBatchExact
, DWORD dwPriority
,
53 const OutputQueueFuncTable
*pFuncsTable
, OutputQueue
**ppOutputQueue
)
56 BOOL threaded
= FALSE
;
61 if (!pInputPin
|| !pFuncsTable
|| !ppOutputQueue
)
64 *ppOutputQueue
= HeapAlloc(GetProcessHeap(),HEAP_ZERO_MEMORY
,sizeof(OutputQueue
));
68 This
= *ppOutputQueue
;
69 This
->pFuncsTable
= pFuncsTable
;
70 This
->lBatchSize
= lBatchSize
;
71 This
->bBatchExact
= bBatchExact
;
72 InitializeCriticalSection(&This
->csQueue
);
73 This
->csQueue
.DebugInfo
->Spare
[0] = (DWORD_PTR
)(__FILE__
": OutputQueue.csQueue");
74 list_init(&This
->SampleList
);
76 This
->pInputPin
= pInputPin
;
77 IPin_AddRef(&pInputPin
->pin
.IPin_iface
);
79 EnterCriticalSection(&This
->csQueue
);
80 if (bAuto
&& pInputPin
->pMemInputPin
)
81 threaded
= IMemInputPin_ReceiveCanBlock(pInputPin
->pMemInputPin
) == S_OK
;
87 This
->hThread
= CreateThread(NULL
, 0, OutputQueue_InitialThreadProc
, This
, 0, &tid
);
90 SetThreadPriority(This
->hThread
, dwPriority
);
91 This
->hProcessQueue
= CreateEventW(NULL
, 0, 0, NULL
);
94 LeaveCriticalSection(&This
->csQueue
);
99 HRESULT WINAPI
OutputQueue_Destroy(OutputQueue
*pOutputQueue
)
101 EnterCriticalSection(&pOutputQueue
->csQueue
);
102 OutputQueue_FreeSamples(pOutputQueue
);
103 pOutputQueue
->bTerminate
= TRUE
;
104 SetEvent(pOutputQueue
->hProcessQueue
);
105 LeaveCriticalSection(&pOutputQueue
->csQueue
);
107 pOutputQueue
->csQueue
.DebugInfo
->Spare
[0] = 0;
108 DeleteCriticalSection(&pOutputQueue
->csQueue
);
109 CloseHandle(pOutputQueue
->hProcessQueue
);
111 IPin_Release(&pOutputQueue
->pInputPin
->pin
.IPin_iface
);
112 HeapFree(GetProcessHeap(),0,pOutputQueue
);
116 HRESULT WINAPI
OutputQueue_ReceiveMultiple(OutputQueue
*pOutputQueue
, IMediaSample
**ppSamples
, LONG nSamples
, LONG
*nSamplesProcessed
)
121 if (!pOutputQueue
->pInputPin
->pin
.peer
|| !pOutputQueue
->pInputPin
->pMemInputPin
)
122 return VFW_E_NOT_CONNECTED
;
124 if (!pOutputQueue
->hThread
)
126 IMemInputPin_AddRef(pOutputQueue
->pInputPin
->pMemInputPin
);
127 hr
= IMemInputPin_ReceiveMultiple(pOutputQueue
->pInputPin
->pMemInputPin
,ppSamples
, nSamples
, nSamplesProcessed
);
128 IMemInputPin_Release(pOutputQueue
->pInputPin
->pMemInputPin
);
132 EnterCriticalSection(&pOutputQueue
->csQueue
);
133 *nSamplesProcessed
= 0;
135 for (i
= 0; i
< nSamples
; i
++)
137 QueuedEvent
*qev
= HeapAlloc(GetProcessHeap(),0,sizeof(QueuedEvent
));
140 ERR("Out of Memory\n");
144 qev
->type
= SAMPLE_PACKET
;
145 qev
->pSample
= ppSamples
[i
];
146 IMediaSample_AddRef(ppSamples
[i
]);
147 list_add_tail(&pOutputQueue
->SampleList
, &qev
->entry
);
148 (*nSamplesProcessed
)++;
151 if (!pOutputQueue
->bBatchExact
|| list_count(&pOutputQueue
->SampleList
) >= pOutputQueue
->lBatchSize
)
152 SetEvent(pOutputQueue
->hProcessQueue
);
153 LeaveCriticalSection(&pOutputQueue
->csQueue
);
158 HRESULT WINAPI
OutputQueue_Receive(OutputQueue
*pOutputQueue
, IMediaSample
*pSample
)
161 return OutputQueue_ReceiveMultiple(pOutputQueue
,&pSample
,1,&processed
);
164 VOID WINAPI
OutputQueue_SendAnyway(OutputQueue
*pOutputQueue
)
166 if (pOutputQueue
->hThread
)
168 EnterCriticalSection(&pOutputQueue
->csQueue
);
169 if (!list_empty(&pOutputQueue
->SampleList
))
171 pOutputQueue
->bSendAnyway
= TRUE
;
172 SetEvent(pOutputQueue
->hProcessQueue
);
174 LeaveCriticalSection(&pOutputQueue
->csQueue
);
178 VOID WINAPI
OutputQueue_EOS(OutputQueue
*pOutputQueue
)
182 EnterCriticalSection(&pOutputQueue
->csQueue
);
183 if (pOutputQueue
->hThread
)
185 QueuedEvent
*qev
= HeapAlloc(GetProcessHeap(),0,sizeof(QueuedEvent
));
188 ERR("Out of Memory\n");
189 LeaveCriticalSection(&pOutputQueue
->csQueue
);
192 qev
->type
= EOS_PACKET
;
194 list_add_tail(&pOutputQueue
->SampleList
, &qev
->entry
);
196 else if ((peer
= pOutputQueue
->pInputPin
->pin
.peer
))
197 IPin_EndOfStream(peer
);
198 LeaveCriticalSection(&pOutputQueue
->csQueue
);
199 /* Covers sending the Event to the worker Thread */
200 OutputQueue_SendAnyway(pOutputQueue
);
203 DWORD WINAPI
OutputQueueImpl_ThreadProc(OutputQueue
*pOutputQueue
)
207 EnterCriticalSection(&pOutputQueue
->csQueue
);
208 if (!list_empty(&pOutputQueue
->SampleList
) &&
209 (!pOutputQueue
->bBatchExact
||
210 list_count(&pOutputQueue
->SampleList
) >= pOutputQueue
->lBatchSize
||
211 pOutputQueue
->bSendAnyway
215 while (!list_empty(&pOutputQueue
->SampleList
))
217 IMediaSample
**ppSamples
;
219 LONG nSamplesProcessed
;
220 struct list
*cursor
, *cursor2
;
223 /* First Pass Process Samples */
224 i
= list_count(&pOutputQueue
->SampleList
);
225 ppSamples
= HeapAlloc(GetProcessHeap(),0,sizeof(IMediaSample
*) * i
);
227 LIST_FOR_EACH_SAFE(cursor
, cursor2
, &pOutputQueue
->SampleList
)
229 QueuedEvent
*qev
= LIST_ENTRY(cursor
, QueuedEvent
, entry
);
230 if (qev
->type
== SAMPLE_PACKET
)
231 ppSamples
[nSamples
++] = qev
->pSample
;
235 HeapFree(GetProcessHeap(),0,qev
);
238 if (pOutputQueue
->pInputPin
->pin
.peer
&& pOutputQueue
->pInputPin
->pMemInputPin
)
240 IMemInputPin_AddRef(pOutputQueue
->pInputPin
->pMemInputPin
);
241 LeaveCriticalSection(&pOutputQueue
->csQueue
);
242 IMemInputPin_ReceiveMultiple(pOutputQueue
->pInputPin
->pMemInputPin
, ppSamples
, nSamples
, &nSamplesProcessed
);
243 EnterCriticalSection(&pOutputQueue
->csQueue
);
244 IMemInputPin_Release(pOutputQueue
->pInputPin
->pMemInputPin
);
246 for (i
= 0; i
< nSamples
; i
++)
247 IMediaSample_Release(ppSamples
[i
]);
248 HeapFree(GetProcessHeap(),0,ppSamples
);
250 /* Process Non-Samples */
251 LIST_FOR_EACH_SAFE(cursor
, cursor2
, &pOutputQueue
->SampleList
)
253 QueuedEvent
*qev
= LIST_ENTRY(cursor
, QueuedEvent
, entry
);
254 if (qev
->type
== EOS_PACKET
)
257 if ((peer
= pOutputQueue
->pInputPin
->pin
.peer
))
258 IPin_EndOfStream(peer
);
260 else if (qev
->type
== SAMPLE_PACKET
)
263 FIXME("Unhandled Event type %i\n",qev
->type
);
265 HeapFree(GetProcessHeap(),0,qev
);
268 pOutputQueue
->bSendAnyway
= FALSE
;
270 LeaveCriticalSection(&pOutputQueue
->csQueue
);
271 WaitForSingleObject(pOutputQueue
->hProcessQueue
, INFINITE
);
273 while (!pOutputQueue
->bTerminate
);