1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "media/filters/decoder_stream.h"
8 #include "base/callback_helpers.h"
9 #include "base/debug/trace_event.h"
10 #include "base/location.h"
11 #include "base/logging.h"
12 #include "base/single_thread_task_runner.h"
13 #include "media/base/audio_decoder.h"
14 #include "media/base/bind_to_current_loop.h"
15 #include "media/base/decoder_buffer.h"
16 #include "media/base/demuxer_stream.h"
17 #include "media/base/video_decoder.h"
18 #include "media/filters/decrypting_demuxer_stream.h"
22 // TODO(rileya): Devise a better way of specifying trace/UMA/etc strings for
23 // templated classes such as this.
// Declaration only: one trace-string accessor per stream type. The VIDEO and
// AUDIO definitions appear further down, and the returned string is the event
// name passed to the TRACE_EVENT_ASYNC_* macros in Decode() / OnDecodeDone().
24 template <DemuxerStream::Type StreamType
>
25 static const char* GetTraceString();
// Debug-logging helper. Streams __FUNCTION__ followed by
// "<" DecoderStreamTraits<StreamType>::ToString() ">" into DVLOG(level), so
// callers can append their own text with <<.
// The expansion names |StreamType|, so the macro can only be used where that
// template parameter is in scope.
27 #define FUNCTION_DVLOG(level) \
28 DVLOG(level) << __FUNCTION__ << \
29 "<" << DecoderStreamTraits<StreamType>::ToString() << ">"
// VIDEO variant of GetTraceString(): returns the fixed event name
// "DecoderStream<VIDEO>::Decode".
32 const char* GetTraceString
<DemuxerStream::VIDEO
>() {
33 return "DecoderStream<VIDEO>::Decode";
// AUDIO variant of GetTraceString(): returns the fixed event name
// "DecoderStream<AUDIO>::Decode".
37 const char* GetTraceString
<DemuxerStream::AUDIO
>() {
38 return "DecoderStream<AUDIO>::Decode";
// Constructor.
//
// Stores |task_runner|, starts in STATE_UNINITIALIZED with no active splice
// and zero pending decode requests, and creates a DecoderSelector<StreamType>
// from |task_runner| and |set_decryptor_ready_cb|. |weak_factory_| is bound to
// this object; the callbacks bound elsewhere in this file go through it.
// NOTE(review): how |decoders| is handed to the selector is not visible in
// this listing -- confirm against the complete source.
41 template <DemuxerStream::Type StreamType
>
42 DecoderStream
<StreamType
>::DecoderStream(
43 const scoped_refptr
<base::SingleThreadTaskRunner
>& task_runner
,
44 ScopedVector
<Decoder
> decoders
,
45 const SetDecryptorReadyCB
& set_decryptor_ready_cb
)
46 : task_runner_(task_runner
),
47 state_(STATE_UNINITIALIZED
),
51 new DecoderSelector
<StreamType
>(task_runner
,
53 set_decryptor_ready_cb
)),
54 active_splice_(false),
55 pending_decode_requests_(0),
56 weak_factory_(this) {}
// Destructor. Must run on |task_runner_|'s thread (DCHECKed).
//
// Releases the decoder selector, then completes every callback that is still
// outstanding. Each one is posted to |task_runner_| rather than run inline:
//   - |init_cb_|  is posted with false,
//   - |read_cb_|  is posted with ABORTED and a null output,
//   - |reset_cb_| is posted as-is.
// Finally the DecryptingDemuxerStream is released.
58 template <DemuxerStream::Type StreamType
>
59 DecoderStream
<StreamType
>::~DecoderStream() {
61 DCHECK(task_runner_
->BelongsToCurrentThread());
63 decoder_selector_
.reset();
65 if (!init_cb_
.is_null()) {
66 task_runner_
->PostTask(FROM_HERE
,
67 base::Bind(base::ResetAndReturn(&init_cb_
), false));
69 if (!read_cb_
.is_null()) {
70 task_runner_
->PostTask(FROM_HERE
, base::Bind(
71 base::ResetAndReturn(&read_cb_
), ABORTED
, scoped_refptr
<Output
>()));
73 if (!reset_cb_
.is_null())
74 task_runner_
->PostTask(FROM_HERE
, base::ResetAndReturn(&reset_cb_
));
78 decrypting_demuxer_stream_
.reset();
// Starts asynchronous decoder selection.
//
// Preconditions (DCHECKed): called on |task_runner_|'s thread, the stream is
// in STATE_UNINITIALIZED, no initialization is already pending, and |init_cb|
// is non-null.
//
// Saves |statistics_cb|, switches to STATE_INITIALIZING and asks
// |decoder_selector_| to pick a decoder. Two callbacks are handed to the
// selector, both bound through a weak pointer to this object:
// OnDecoderSelected() and OnDecodeOutputReady().
// NOTE(review): |low_delay| is assigned below, but its declaration is not
// visible in this listing.
81 template <DemuxerStream::Type StreamType
>
82 void DecoderStream
<StreamType
>::Initialize(DemuxerStream
* stream
,
84 const StatisticsCB
& statistics_cb
,
85 const InitCB
& init_cb
) {
87 DCHECK(task_runner_
->BelongsToCurrentThread());
88 DCHECK_EQ(state_
, STATE_UNINITIALIZED
) << state_
;
89 DCHECK(init_cb_
.is_null());
90 DCHECK(!init_cb
.is_null());
92 statistics_cb_
= statistics_cb
;
95 low_delay_
= low_delay
;
97 state_
= STATE_INITIALIZING
;
98 // TODO(xhwang): DecoderSelector only needs a config to select a decoder.
99 decoder_selector_
->SelectDecoder(
101 base::Bind(&DecoderStream
<StreamType
>::OnDecoderSelected
,
102 weak_factory_
.GetWeakPtr()),
103 base::Bind(&DecoderStream
<StreamType
>::OnDecodeOutputReady
,
104 weak_factory_
.GetWeakPtr()));
// Requests one decoded output; the result is delivered through |read_cb|.
//
// Preconditions (DCHECKed): called on |task_runner_|'s thread, initialization
// has finished, no other read is outstanding and no reset is pending.
//
// Every reply produced directly by this function is posted to |task_runner_|
// instead of being run inline:
//   - STATE_ERROR                             -> DECODE_ERROR, null output
//   - STATE_END_OF_STREAM with an empty queue -> OK, end-of-stream output
//   - queued output available                 -> OK, front of |ready_outputs_|
// When the stream is in STATE_NORMAL and CanDecodeMore() holds, another
// demuxer read is started.
107 template <DemuxerStream::Type StreamType
>
108 void DecoderStream
<StreamType
>::Read(const ReadCB
& read_cb
) {
110 DCHECK(task_runner_
->BelongsToCurrentThread());
111 DCHECK(state_
!= STATE_UNINITIALIZED
&& state_
!= STATE_INITIALIZING
)
113 // No two reads in flight at any time.
114 DCHECK(read_cb_
.is_null());
115 // No read during resetting or stopping process.
116 DCHECK(reset_cb_
.is_null());
// Error state: reply with DECODE_ERROR and no output.
118 if (state_
== STATE_ERROR
) {
119 task_runner_
->PostTask(
120 FROM_HERE
, base::Bind(read_cb
, DECODE_ERROR
, scoped_refptr
<Output
>()));
// End of stream and nothing left in the queue: reply with an EOS output.
124 if (state_
== STATE_END_OF_STREAM
&& ready_outputs_
.empty()) {
125 task_runner_
->PostTask(
126 FROM_HERE
, base::Bind(read_cb
, OK
, StreamTraits::CreateEOSOutput()));
// Decoded output is already queued: hand out the oldest entry and remove it.
130 if (!ready_outputs_
.empty()) {
131 task_runner_
->PostTask(FROM_HERE
,
132 base::Bind(read_cb
, OK
, ready_outputs_
.front()));
133 ready_outputs_
.pop_front();
// Keep the decoder fed while there is capacity for more decode requests.
138 if (state_
== STATE_NORMAL
&& CanDecodeMore())
139 ReadFromDemuxerStream();
// Begins resetting the stream.
//
// Preconditions (DCHECKed): called on |task_runner_|'s thread, the stream is
// not in STATE_UNINITIALIZED and no other reset is pending.
//
// A read that is still outstanding is completed with ABORTED and a null
// output (posted to |task_runner_|), and all queued outputs are discarded.
// What happens next depends on the state, as the comments below explain.
// When a DecryptingDemuxerStream is in use it is reset first, and
// ResetDecoder() (weakly bound) is its completion callback.
142 template <DemuxerStream::Type StreamType
>
143 void DecoderStream
<StreamType
>::Reset(const base::Closure
& closure
) {
145 DCHECK(task_runner_
->BelongsToCurrentThread());
146 DCHECK(state_
!= STATE_UNINITIALIZED
)<< state_
;
147 DCHECK(reset_cb_
.is_null());
151 if (!read_cb_
.is_null()) {
152 task_runner_
->PostTask(FROM_HERE
, base::Bind(
153 base::ResetAndReturn(&read_cb_
), ABORTED
, scoped_refptr
<Output
>()));
156 ready_outputs_
.clear();
158 // During decoder reinitialization, the Decoder does not need to be and
159 // cannot be Reset(). |decrypting_demuxer_stream_| was reset before decoder
161 if (state_
== STATE_REINITIALIZING_DECODER
)
164 // During pending demuxer read and when not using DecryptingDemuxerStream,
165 // the Decoder will be reset after demuxer read is returned
166 // (in OnBufferReady()).
167 if (state_
== STATE_PENDING_DEMUXER_READ
&& !decrypting_demuxer_stream_
)
170 if (decrypting_demuxer_stream_
) {
171 decrypting_demuxer_stream_
->Reset(base::Bind(
172 &DecoderStream
<StreamType
>::ResetDecoder
, weak_factory_
.GetWeakPtr()));
// Returns true when decoded output is already queued in |ready_outputs_|, or
// when the decoder reports that it can be read without stalling.
// Must be called on |task_runner_|'s thread (DCHECKed).
179 template <DemuxerStream::Type StreamType
>
180 bool DecoderStream
<StreamType
>::CanReadWithoutStalling() const {
181 DCHECK(task_runner_
->BelongsToCurrentThread());
182 return !ready_outputs_
.empty() || decoder_
->CanReadWithoutStalling();
// AUDIO-specific CanReadWithoutStalling(). Checks that it is called on
// |task_runner_|'s thread.
// NOTE(review): the return statement is not visible in this listing; confirm
// the returned value against the complete source.
186 bool DecoderStream
<DemuxerStream::AUDIO
>::CanReadWithoutStalling() const {
187 DCHECK(task_runner_
->BelongsToCurrentThread());
// Returns the decode-request limit reported by the decoder itself.
// CanDecodeMore() and Decode() compare against this value.
191 template <DemuxerStream::Type StreamType
>
192 int DecoderStream
<StreamType
>::GetMaxDecodeRequests() const {
193 return decoder_
->GetMaxDecodeRequests();
// AUDIO-specific GetMaxDecodeRequests().
// NOTE(review): the body is not visible in this listing; confirm the returned
// limit against the complete source.
197 int DecoderStream
<DemuxerStream::AUDIO
>::GetMaxDecodeRequests() const {
// Reports whether another decode request may be issued: true while
// |num_decodes| is below GetMaxDecodeRequests().
// Must be called on |task_runner_|'s thread (DCHECKed).
// NOTE(review): the declaration of |num_decodes| is not visible here.
// Presumably it is the sum computed below (queued outputs plus in-flight
// decode requests) -- confirm.
201 template <DemuxerStream::Type StreamType
>
202 bool DecoderStream
<StreamType
>::CanDecodeMore() const {
203 DCHECK(task_runner_
->BelongsToCurrentThread());
205 // Limit total number of outputs stored in |ready_outputs_| and being decoded.
206 // It only makes sense to saturate decoder completely when output queue is
209 static_cast<int>(ready_outputs_
.size()) + pending_decode_requests_
;
210 return num_decodes
< GetMaxDecodeRequests();
// Completion callback for the decoder selection started in Initialize().
//
// Preconditions (DCHECKed): called on |task_runner_|'s thread, in
// STATE_INITIALIZING, with an init callback pending and no read or reset
// callback pending.
//
// The selector is no longer needed and is released. If selection produced a
// DecryptingDemuxerStream, |stream_| is repointed at it.
213 template <DemuxerStream::Type StreamType
>
214 void DecoderStream
<StreamType
>::OnDecoderSelected(
215 scoped_ptr
<Decoder
> selected_decoder
,
216 scoped_ptr
<DecryptingDemuxerStream
> decrypting_demuxer_stream
) {
218 DCHECK(task_runner_
->BelongsToCurrentThread());
219 DCHECK_EQ(state_
, STATE_INITIALIZING
) << state_
;
220 DCHECK(!init_cb_
.is_null());
221 DCHECK(read_cb_
.is_null());
222 DCHECK(reset_cb_
.is_null());
224 decoder_selector_
.reset();
225 if (decrypting_demuxer_stream
)
226 stream_
= decrypting_demuxer_stream
.get();
// No decoder was selected: go back to STATE_UNINITIALIZED and report failure.
228 if (!selected_decoder
) {
229 state_
= STATE_UNINITIALIZED
;
230 base::ResetAndReturn(&init_cb_
).Run(false);
// A decoder was selected: take ownership of it and of the decrypting stream,
// enable bitstream conversion on |stream_| when the traits ask for it, and
// report success.
234 state_
= STATE_NORMAL
;
235 decoder_
= selected_decoder
.Pass();
236 decrypting_demuxer_stream_
= decrypting_demuxer_stream
.Pass();
238 if (StreamTraits::NeedsBitstreamConversion(decoder_
.get()))
239 stream_
->EnableBitstreamConverter();
240 base::ResetAndReturn(&init_cb_
).Run(true);
// Completes the outstanding read: clears |read_cb_| and runs the callback it
// held with |status| and |output|. A read must be pending (DCHECKed).
// ResetAndReturn() empties |read_cb_| before the callback runs.
243 template <DemuxerStream::Type StreamType
>
244 void DecoderStream
<StreamType
>::SatisfyRead(
246 const scoped_refptr
<Output
>& output
) {
247 DCHECK(!read_cb_
.is_null());
248 base::ResetAndReturn(&read_cb_
).Run(status
, output
);
// Submits |buffer| to the decoder.
//
// Preconditions (DCHECKed): the stream is in STATE_NORMAL or
// STATE_FLUSHING_DECODER, fewer than GetMaxDecodeRequests() requests are in
// flight, no reset is pending, and |buffer| is non-null.
//
// Opens an async "media" trace event keyed on |this|, increments
// |pending_decode_requests_| and hands the buffer to |decoder_|. The
// completion callback is OnDecodeDone(), bound through a weak pointer; the
// last argument bound to it is whether |buffer| was an end-of-stream buffer.
251 template <DemuxerStream::Type StreamType
>
252 void DecoderStream
<StreamType
>::Decode(
253 const scoped_refptr
<DecoderBuffer
>& buffer
) {
255 DCHECK(state_
== STATE_NORMAL
|| state_
== STATE_FLUSHING_DECODER
) << state_
;
256 DCHECK_LT(pending_decode_requests_
, GetMaxDecodeRequests());
257 DCHECK(reset_cb_
.is_null());
258 DCHECK(buffer
.get());
// An end-of-stream buffer counts as 0 bytes; otherwise use the data size.
260 int buffer_size
= buffer
->end_of_stream() ? 0 : buffer
->data_size();
262 TRACE_EVENT_ASYNC_BEGIN0("media", GetTraceString
<StreamType
>(), this);
263 ++pending_decode_requests_
;
264 decoder_
->Decode(buffer
,
265 base::Bind(&DecoderStream
<StreamType
>::OnDecodeDone
,
266 weak_factory_
.GetWeakPtr(),
268 buffer
->end_of_stream()));
// Flushes the decoder by sending it an end-of-stream buffer through Decode().
271 template <DemuxerStream::Type StreamType
>
272 void DecoderStream
<StreamType
>::FlushDecoder() {
273 Decode(DecoderBuffer::CreateEOSBuffer());
// Completion callback for one Decode() request.
//
// Decrements |pending_decode_requests_| (which must be positive) and closes
// the async trace event opened in Decode(). The visible handling is:
//   - kDecodeError / kDecryptError: enter STATE_ERROR, discard queued outputs
//     and fail a pending read with DECODE_ERROR.
//   - successful decode: report |buffer_size| through |statistics_cb_|, then
//     either move towards end of stream / read more input (STATE_NORMAL), or
//     reinitialize the decoder once a flush has no requests left in flight.
// NOTE(review): several guarding conditions of the success path are not
// visible in this listing; confirm the exact branching in the full source.
276 template <DemuxerStream::Type StreamType
>
277 void DecoderStream
<StreamType
>::OnDecodeDone(int buffer_size
,
279 typename
Decoder::Status status
) {
280 FUNCTION_DVLOG(2) << status
;
281 DCHECK(state_
== STATE_NORMAL
|| state_
== STATE_FLUSHING_DECODER
||
282 state_
== STATE_PENDING_DEMUXER_READ
|| state_
== STATE_ERROR
)
284 DCHECK_GT(pending_decode_requests_
, 0);
286 --pending_decode_requests_
;
288 TRACE_EVENT_ASYNC_END0("media", GetTraceString
<StreamType
>(), this);
290 if (state_
== STATE_ERROR
) {
291 DCHECK(read_cb_
.is_null());
295 // Drop decoding result if Reset() was called during decoding.
296 // The resetting process will be handled when the decoder is reset.
297 if (!reset_cb_
.is_null())
301 case Decoder::kDecodeError
:
302 case Decoder::kDecryptError
:
303 state_
= STATE_ERROR
;
304 ready_outputs_
.clear();
305 if (!read_cb_
.is_null())
306 SatisfyRead(DECODE_ERROR
, NULL
);
309 case Decoder::kAborted
:
310 // Decoder can return kAborted only when Reset is pending.
315 // Any successful decode counts!
317 StreamTraits::ReportStatistics(statistics_cb_
, buffer_size
);
319 if (state_
== STATE_NORMAL
) {
321 state_
= STATE_END_OF_STREAM
;
// Only answer a pending read with EOS once every queued output has been
// handed out.
322 if (ready_outputs_
.empty() && !read_cb_
.is_null())
323 SatisfyRead(OK
, StreamTraits::CreateEOSOutput());
328 ReadFromDemuxerStream();
// The flush is complete once the last in-flight decode request has returned.
332 if (state_
== STATE_FLUSHING_DECODER
&& !pending_decode_requests_
)
333 ReinitializeDecoder();
338 template <DemuxerStream::Type StreamType
>
339 void DecoderStream
<StreamType
>::OnDecodeOutputReady(
340 const scoped_refptr
<Output
>& output
) {
341 FUNCTION_DVLOG(2) << ": " << output
->timestamp().InMilliseconds() << " ms";
342 DCHECK(output
.get());
343 DCHECK(!output
->end_of_stream());
344 DCHECK(state_
== STATE_NORMAL
|| state_
== STATE_FLUSHING_DECODER
||
345 state_
== STATE_PENDING_DEMUXER_READ
|| state_
== STATE_ERROR
)
348 if (state_
== STATE_ERROR
) {
349 DCHECK(read_cb_
.is_null());
353 // Drop decoding result if Reset() was called during decoding.
354 // The resetting process will be handled when the decoder is reset.
355 if (!reset_cb_
.is_null())
358 // TODO(xhwang): VideoDecoder doesn't need to return EOS after it's flushed.
359 // Fix all decoders and remove this block.
360 // Store decoded output.
361 ready_outputs_
.push_back(output
);
363 if (read_cb_
.is_null())
366 // Satisfy outstanding read request, if any.
367 scoped_refptr
<Output
> read_result
= ready_outputs_
.front();
368 ready_outputs_
.pop_front();
369 SatisfyRead(OK
, output
);
// Issues one read on |stream_|.
//
// Preconditions (DCHECKed): the stream is in STATE_NORMAL, CanDecodeMore()
// holds and no reset is pending.
//
// Switches to STATE_PENDING_DEMUXER_READ; the result is delivered to
// OnBufferReady(), bound through a weak pointer.
372 template <DemuxerStream::Type StreamType
>
373 void DecoderStream
<StreamType
>::ReadFromDemuxerStream() {
375 DCHECK_EQ(state_
, STATE_NORMAL
) << state_
;
376 DCHECK(CanDecodeMore());
377 DCHECK(reset_cb_
.is_null());
379 state_
= STATE_PENDING_DEMUXER_READ
;
380 stream_
->Read(base::Bind(&DecoderStream
<StreamType
>::OnBufferReady
,
381 weak_factory_
.GetWeakPtr()));
// Completion callback for the demuxer read issued by ReadFromDemuxerStream().
//
// Preconditions (DCHECKed): called on |task_runner_|'s thread, in
// STATE_PENDING_DEMUXER_READ or STATE_ERROR, and |buffer| is non-null exactly
// when |status| is DemuxerStream::kOk.
//
// Visible handling, in order:
//   - not in STATE_PENDING_DEMUXER_READ: must be STATE_ERROR with no read
//     pending.
//   - otherwise the state returns to STATE_NORMAL, then:
//       * kConfigChanged: notify |config_change_observer_cb_| if set and
//         switch to STATE_FLUSHING_DECODER.
//       * a reset is pending: continue the reset here unless a
//         DecryptingDemuxerStream is handling it.
//       * kAborted: fail a pending read with DEMUXER_READ_ABORTED.
//       * kOk: report splice information to |splice_observer_cb_| if set, and
//         read more input while CanDecodeMore() holds and the buffer is not
//         end-of-stream.
384 template <DemuxerStream::Type StreamType
>
385 void DecoderStream
<StreamType
>::OnBufferReady(
386 DemuxerStream::Status status
,
387 const scoped_refptr
<DecoderBuffer
>& buffer
) {
388 FUNCTION_DVLOG(2) << ": " << status
<< ", "
389 << (buffer
.get() ? buffer
->AsHumanReadableString()
392 DCHECK(task_runner_
->BelongsToCurrentThread());
393 DCHECK(state_
== STATE_PENDING_DEMUXER_READ
|| state_
== STATE_ERROR
)
395 DCHECK_EQ(buffer
.get() != NULL
, status
== DemuxerStream::kOk
) << status
;
397 // Decoding has been stopped (e.g due to an error).
398 if (state_
!= STATE_PENDING_DEMUXER_READ
) {
399 DCHECK(state_
== STATE_ERROR
);
400 DCHECK(read_cb_
.is_null());
404 state_
= STATE_NORMAL
;
406 if (status
== DemuxerStream::kConfigChanged
) {
407 FUNCTION_DVLOG(2) << ": " << "ConfigChanged";
408 DCHECK(stream_
->SupportsConfigChanges());
410 if (!config_change_observer_cb_
.is_null())
411 config_change_observer_cb_
.Run();
413 state_
= STATE_FLUSHING_DECODER
;
414 if (!reset_cb_
.is_null()) {
415 // If we are using DecryptingDemuxerStream, we already called DDS::Reset()
416 // which will continue the resetting process in its callback.
417 if (!decrypting_demuxer_stream_
)
418 Reset(base::ResetAndReturn(&reset_cb_
));
419 // Reinitialization will continue after Reset() is done.
426 if (!reset_cb_
.is_null()) {
427 // If we are using DecryptingDemuxerStream, we already called DDS::Reset()
428 // which will continue the resetting process in its callback.
429 if (!decrypting_demuxer_stream_
)
430 Reset(base::ResetAndReturn(&reset_cb_
));
434 if (status
== DemuxerStream::kAborted
) {
435 if (!read_cb_
.is_null())
436 SatisfyRead(DEMUXER_READ_ABORTED
, NULL
);
// Notify the splice observer when this buffer carries a splice timestamp, and
// once more on the first buffer without one after a splice was active.
440 if (!splice_observer_cb_
.is_null() && !buffer
->end_of_stream()) {
441 const bool has_splice_ts
= buffer
->splice_timestamp() != kNoTimestamp();
442 if (active_splice_
|| has_splice_ts
) {
443 splice_observer_cb_
.Run(buffer
->splice_timestamp());
444 active_splice_
= has_splice_ts
;
448 DCHECK(status
== DemuxerStream::kOk
) << status
;
451 // Read more data if the decoder supports multiple parallel decoding requests.
452 if (CanDecodeMore() && !buffer
->end_of_stream())
453 ReadFromDemuxerStream();
// Starts re-initializing the decoder with the stream's current decoder config.
//
// Preconditions (DCHECKed): called on |task_runner_|'s thread, in
// STATE_FLUSHING_DECODER, with no decode requests in flight and a valid
// decoder config on |stream_|.
//
// Switches to STATE_REINITIALIZING_DECODER and calls
// DecoderStreamTraits<StreamType>::Initialize(). The result is delivered to
// OnDecoderReinitialized(), and OnDecodeOutputReady() is passed again as the
// second callback; both are bound through weak pointers.
456 template <DemuxerStream::Type StreamType
>
457 void DecoderStream
<StreamType
>::ReinitializeDecoder() {
459 DCHECK(task_runner_
->BelongsToCurrentThread());
460 DCHECK_EQ(state_
, STATE_FLUSHING_DECODER
) << state_
;
461 DCHECK_EQ(pending_decode_requests_
, 0);
463 DCHECK(StreamTraits::GetDecoderConfig(*stream_
).IsValidConfig());
464 state_
= STATE_REINITIALIZING_DECODER
;
465 DecoderStreamTraits
<StreamType
>::Initialize(
467 StreamTraits::GetDecoderConfig(*stream_
),
469 base::Bind(&DecoderStream
<StreamType
>::OnDecoderReinitialized
,
470 weak_factory_
.GetWeakPtr()),
471 base::Bind(&DecoderStream
<StreamType
>::OnDecodeOutputReady
,
472 weak_factory_
.GetWeakPtr()));
// Completion callback for ReinitializeDecoder().
//
// Preconditions (DCHECKed): called on |task_runner_|'s thread, in
// STATE_REINITIALIZING_DECODER.
//
// The new state is STATE_NORMAL when |status| is PIPELINE_OK and STATE_ERROR
// otherwise. A pending reset is completed first. A pending read is failed
// with DECODE_ERROR in the error state, and otherwise served by reading from
// the demuxer again.
475 template <DemuxerStream::Type StreamType
>
476 void DecoderStream
<StreamType
>::OnDecoderReinitialized(PipelineStatus status
) {
478 DCHECK(task_runner_
->BelongsToCurrentThread());
479 DCHECK_EQ(state_
, STATE_REINITIALIZING_DECODER
) << state_
;
481 // ReinitializeDecoder() can be called in two cases:
482 // 1, Flushing decoder finished (see OnDecodeDone()).
483 // 2, Reset() was called during flushing decoder (see OnDecoderReset()).
484 // Also, Reset() can be called during pending ReinitializeDecoder().
485 // This function needs to handle them all!
487 state_
= (status
== PIPELINE_OK
) ? STATE_NORMAL
: STATE_ERROR
;
489 if (!reset_cb_
.is_null()) {
490 base::ResetAndReturn(&reset_cb_
).Run();
494 if (read_cb_
.is_null())
497 if (state_
== STATE_ERROR
) {
498 SatisfyRead(DECODE_ERROR
, NULL
);
502 ReadFromDemuxerStream();
// Resets |decoder_| as part of a pending Reset().
//
// Preconditions (DCHECKed): called on |task_runner_|'s thread, in
// STATE_NORMAL, STATE_FLUSHING_DECODER, STATE_ERROR or STATE_END_OF_STREAM,
// with a reset callback pending.
//
// Completion is delivered to OnDecoderReset(), bound through a weak pointer.
505 template <DemuxerStream::Type StreamType
>
506 void DecoderStream
<StreamType
>::ResetDecoder() {
508 DCHECK(task_runner_
->BelongsToCurrentThread());
509 DCHECK(state_
== STATE_NORMAL
|| state_
== STATE_FLUSHING_DECODER
||
510 state_
== STATE_ERROR
|| state_
== STATE_END_OF_STREAM
) << state_
;
511 DCHECK(!reset_cb_
.is_null());
513 decoder_
->Reset(base::Bind(&DecoderStream
<StreamType
>::OnDecoderReset
,
514 weak_factory_
.GetWeakPtr()));
// Completion callback for the decoder reset started in ResetDecoder().
//
// Preconditions (DCHECKed): called on |task_runner_|'s thread, in
// STATE_NORMAL, STATE_FLUSHING_DECODER, STATE_ERROR or STATE_END_OF_STREAM,
// with no read pending and a reset callback pending.
//
// Outside a flush, the stream returns to STATE_NORMAL, the active-splice flag
// is cleared and the reset callback is run. During a flush the decoder is
// reinitialized instead.
517 template <DemuxerStream::Type StreamType
>
518 void DecoderStream
<StreamType
>::OnDecoderReset() {
520 DCHECK(task_runner_
->BelongsToCurrentThread());
521 DCHECK(state_
== STATE_NORMAL
|| state_
== STATE_FLUSHING_DECODER
||
522 state_
== STATE_ERROR
|| state_
== STATE_END_OF_STREAM
) << state_
;
523 // If Reset() was called during pending read, read callback should be fired
524 // before the reset callback is fired.
525 DCHECK(read_cb_
.is_null());
526 DCHECK(!reset_cb_
.is_null());
528 if (state_
!= STATE_FLUSHING_DECODER
) {
529 state_
= STATE_NORMAL
;
530 active_splice_
= false;
531 base::ResetAndReturn(&reset_cb_
).Run();
535 // The resetting process will be continued in OnDecoderReinitialized().
536 ReinitializeDecoder();
// Explicit instantiations: the member templates above are defined in this
// file, so DecoderStream is instantiated here for the VIDEO and AUDIO stream
// types.
539 template class DecoderStream
<DemuxerStream::VIDEO
>;
540 template class DecoderStream
<DemuxerStream::AUDIO
>;