1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "media/filters/decoder_stream.h"
8 #include "base/callback_helpers.h"
9 #include "base/location.h"
10 #include "base/logging.h"
11 #include "base/single_thread_task_runner.h"
12 #include "base/trace_event/trace_event.h"
13 #include "media/base/bind_to_current_loop.h"
14 #include "media/base/decoder_buffer.h"
15 #include "media/base/media_log.h"
16 #include "media/base/timestamp_constants.h"
17 #include "media/base/video_decoder.h"
18 #include "media/base/video_frame.h"
19 #include "media/filters/decrypting_demuxer_stream.h"
23 // TODO(rileya): Devise a better way of specifying trace/UMA/etc strings for
24 // templated classes such as this.
// Maps a stream type to the name used for its decode trace events. The VIDEO
// and AUDIO forms are defined below.
25 template <DemuxerStream::Type StreamType
>
26 static const char* GetTraceString();
// Debug-logs the name of the calling function followed by the stream type in
// angle brackets. Callers append their own message with <<.
28 #define FUNCTION_DVLOG(level) \
29 DVLOG(level) << __FUNCTION__ << "<" << GetStreamTypeString() << ">"
// Trace-event name used for video streams.
32 const char* GetTraceString
<DemuxerStream::VIDEO
>() {
33 return "DecoderStream<VIDEO>::Decode";
// Trace-event name used for audio streams.
37 const char* GetTraceString
<DemuxerStream::AUDIO
>() {
38 return "DecoderStream<AUDIO>::Decode";
// Constructs a stream in STATE_UNINITIALIZED with no active splice and no
// pending decode requests. A DecoderSelector for this stream type is created
// up front and receives |task_runner|.
// NOTE(review): the initializer list is only partially visible in this view
// (the remaining DecoderSelector arguments and some members are missing).
41 template <DemuxerStream::Type StreamType
>
42 DecoderStream
<StreamType
>::DecoderStream(
43 const scoped_refptr
<base::SingleThreadTaskRunner
>& task_runner
,
44 ScopedVector
<Decoder
> decoders
,
45 const scoped_refptr
<MediaLog
>& media_log
)
46 : task_runner_(task_runner
),
47 media_log_(media_log
),
48 state_(STATE_UNINITIALIZED
),
50 decoder_selector_(new DecoderSelector
<StreamType
>(task_runner
,
53 active_splice_(false),
55 pending_decode_requests_(0),
// Tears the stream down on |task_runner_| (DCHECKed). Callbacks that are still
// pending are not run inline; each one is posted to |task_runner_|:
//   - |init_cb_| with false,
//   - |read_cb_| with ABORTED and a null output,
//   - |reset_cb_| with no arguments.
// |decoder_selector_| is released first and |decrypting_demuxer_stream_| last.
59 template <DemuxerStream::Type StreamType
>
60 DecoderStream
<StreamType
>::~DecoderStream() {
62 DCHECK(task_runner_
->BelongsToCurrentThread());
64 decoder_selector_
.reset();
66 if (!init_cb_
.is_null()) {
67 task_runner_
->PostTask(FROM_HERE
,
68 base::Bind(base::ResetAndReturn(&init_cb_
), false));
70 if (!read_cb_
.is_null()) {
71 task_runner_
->PostTask(FROM_HERE
, base::Bind(
72 base::ResetAndReturn(&read_cb_
), ABORTED
, scoped_refptr
<Output
>()));
74 if (!reset_cb_
.is_null())
75 task_runner_
->PostTask(FROM_HERE
, base::ResetAndReturn(&reset_cb_
));
79 decrypting_demuxer_stream_
.reset();
// Returns the stream-type string supplied by DecoderStreamTraits; it is used
// in this file for debug logging and media-log messages.
82 template <DemuxerStream::Type StreamType
>
83 std::string DecoderStream
<StreamType
>::GetStreamTypeString() {
84 return DecoderStreamTraits
<StreamType
>::ToString();
// Starts initialization. Must be called on |task_runner_| while in
// STATE_UNINITIALIZED, with no initialization already pending and a non-null
// |init_cb| (all DCHECKed). Stores the statistics and waiting-for-key
// callbacks, moves to STATE_INITIALIZING and starts decoder selection with
// |set_decryptor_ready_cb|.
// NOTE(review): the statements that record |init_cb| and |stream| are not
// visible in this view.
87 template <DemuxerStream::Type StreamType
>
88 void DecoderStream
<StreamType
>::Initialize(
89 DemuxerStream
* stream
,
90 const InitCB
& init_cb
,
91 const SetDecryptorReadyCB
& set_decryptor_ready_cb
,
92 const StatisticsCB
& statistics_cb
,
93 const base::Closure
& waiting_for_decryption_key_cb
) {
95 DCHECK(task_runner_
->BelongsToCurrentThread());
96 DCHECK_EQ(state_
, STATE_UNINITIALIZED
);
97 DCHECK(init_cb_
.is_null());
98 DCHECK(!init_cb
.is_null());
100 statistics_cb_
= statistics_cb
;
102 waiting_for_decryption_key_cb_
= waiting_for_decryption_key_cb
;
105 state_
= STATE_INITIALIZING
;
106 SelectDecoder(set_decryptor_ready_cb
);
// Requests one decoded output through |read_cb|. In every branch visible here
// the callback is posted to |task_runner_| rather than run inline:
//   - STATE_ERROR: DECODE_ERROR with a null output.
//   - STATE_END_OF_STREAM with nothing queued: OK with an end-of-stream output.
//   - |ready_outputs_| non-empty: OK with the oldest queued output, which is
//     then removed from the queue.
// A demuxer read is issued when the stream is in STATE_NORMAL and
// CanDecodeMore() holds.
// NOTE(review): the early returns / else branches separating these cases and
// the statement that stores |read_cb| are not visible in this view.
109 template <DemuxerStream::Type StreamType
>
110 void DecoderStream
<StreamType
>::Read(const ReadCB
& read_cb
) {
112 DCHECK(task_runner_
->BelongsToCurrentThread());
113 DCHECK(state_
!= STATE_UNINITIALIZED
&& state_
!= STATE_INITIALIZING
)
115 // No two reads in flight at any time.
116 DCHECK(read_cb_
.is_null());
117 // No read during resetting or stopping process.
118 DCHECK(reset_cb_
.is_null());
120 if (state_
== STATE_ERROR
) {
121 task_runner_
->PostTask(
122 FROM_HERE
, base::Bind(read_cb
, DECODE_ERROR
, scoped_refptr
<Output
>()));
126 if (state_
== STATE_END_OF_STREAM
&& ready_outputs_
.empty()) {
127 task_runner_
->PostTask(
128 FROM_HERE
, base::Bind(read_cb
, OK
, StreamTraits::CreateEOSOutput()));
132 if (!ready_outputs_
.empty()) {
133 task_runner_
->PostTask(FROM_HERE
,
134 base::Bind(read_cb
, OK
, ready_outputs_
.front()));
135 ready_outputs_
.pop_front();
140 if (state_
== STATE_NORMAL
&& CanDecodeMore())
141 ReadFromDemuxerStream();
// Resets the stream. Must be called on |task_runner_|, after initialization
// has started, and with no other reset pending (DCHECKed). A pending read is
// completed asynchronously with ABORTED and a null output, and all queued
// outputs are discarded. When a DecryptingDemuxerStream is in use it is reset
// with ResetDecoder() bound (through a weak pointer) as its completion
// callback.
// NOTE(review): the statement storing |closure| and the bodies of the
// STATE_REINITIALIZING_DECODER / STATE_PENDING_DEMUXER_READ checks are not
// visible in this view.
144 template <DemuxerStream::Type StreamType
>
145 void DecoderStream
<StreamType
>::Reset(const base::Closure
& closure
) {
147 DCHECK(task_runner_
->BelongsToCurrentThread());
148 DCHECK_NE(state_
, STATE_UNINITIALIZED
);
149 DCHECK(reset_cb_
.is_null());
153 if (!read_cb_
.is_null()) {
154 task_runner_
->PostTask(FROM_HERE
, base::Bind(
155 base::ResetAndReturn(&read_cb_
), ABORTED
, scoped_refptr
<Output
>()));
158 ready_outputs_
.clear();
160 // During decoder reinitialization, the Decoder does not need to be and
161 // cannot be Reset(). |decrypting_demuxer_stream_| was reset before decoder
163 if (state_
== STATE_REINITIALIZING_DECODER
)
166 // During pending demuxer read and when not using DecryptingDemuxerStream,
167 // the Decoder will be reset after demuxer read is returned
168 // (in OnBufferReady()).
169 if (state_
== STATE_PENDING_DEMUXER_READ
&& !decrypting_demuxer_stream_
)
172 if (decrypting_demuxer_stream_
) {
173 decrypting_demuxer_stream_
->Reset(base::Bind(
174 &DecoderStream
<StreamType
>::ResetDecoder
, weak_factory_
.GetWeakPtr()));
// Returns true when an output is already queued in |ready_outputs_| or the
// current decoder itself reports that it can be read without stalling. Must
// be called on |task_runner_| (DCHECKed).
181 template <DemuxerStream::Type StreamType
>
182 bool DecoderStream
<StreamType
>::CanReadWithoutStalling() const {
183 DCHECK(task_runner_
->BelongsToCurrentThread());
184 return !ready_outputs_
.empty() || decoder_
->CanReadWithoutStalling();
// AUDIO-specific variant of CanReadWithoutStalling().
// NOTE(review): only the thread check is visible in this view; the return
// statement is missing.
188 bool DecoderStream
<DemuxerStream::AUDIO
>::CanReadWithoutStalling() const {
189 DCHECK(task_runner_
->BelongsToCurrentThread());
// Returns the number of parallel decode requests the current decoder reports
// it can accept.
193 template <DemuxerStream::Type StreamType
>
194 int DecoderStream
<StreamType
>::GetMaxDecodeRequests() const {
195 return decoder_
->GetMaxDecodeRequests();
// AUDIO-specific variant of GetMaxDecodeRequests().
// NOTE(review): the body is not visible in this view.
199 int DecoderStream
<DemuxerStream::AUDIO
>::GetMaxDecodeRequests() const {
// Returns true when another buffer may be sent to the decoder: no
// end-of-stream decode is outstanding (|decoding_eos_| is false) and
// |num_decodes| is below GetMaxDecodeRequests().
// NOTE(review): the declaration of |num_decodes| is cut off in this view;
// presumably it is the queued-outputs-plus-pending-requests sum below.
203 template <DemuxerStream::Type StreamType
>
204 bool DecoderStream
<StreamType
>::CanDecodeMore() const {
205 DCHECK(task_runner_
->BelongsToCurrentThread());
207 // Limit total number of outputs stored in |ready_outputs_| and being decoded.
208 // It only makes sense to saturate decoder completely when output queue is
211 static_cast<int>(ready_outputs_
.size()) + pending_decode_requests_
;
212 return !decoding_eos_
&& num_decodes
< GetMaxDecodeRequests();
// Asks |decoder_selector_| to choose a decoder for |stream_|. The selector is
// given |set_decryptor_ready_cb|, OnDecoderSelected() as the completion
// callback, OnDecodeOutputReady() as the output callback (both bound through
// weak pointers) and the stored waiting-for-decryption-key callback.
215 template <DemuxerStream::Type StreamType
>
216 void DecoderStream
<StreamType
>::SelectDecoder(
217 const SetDecryptorReadyCB
& set_decryptor_ready_cb
) {
218 decoder_selector_
->SelectDecoder(
219 stream_
, set_decryptor_ready_cb
,
220 base::Bind(&DecoderStream
<StreamType
>::OnDecoderSelected
,
221 weak_factory_
.GetWeakPtr()),
222 base::Bind(&DecoderStream
<StreamType
>::OnDecodeOutputReady
,
223 weak_factory_
.GetWeakPtr()),
224 waiting_for_decryption_key_cb_
);
// Receives the result of decoder selection. It is valid only in
// STATE_INITIALIZING or STATE_REINITIALIZING_DECODER (DCHECKed). The decoder
// that was active so far is kept in |previous_decoder_| and |selected_decoder|
// becomes |decoder_|. A non-null |decrypting_demuxer_stream| is adopted and
// becomes the stream that is read from.
//
// Failure path: during initialization the state returns to
// STATE_UNINITIALIZED and |init_cb_| runs with false; otherwise
// CompleteDecoderReinitialization(false) is called.
// Success path: the media log records whether a DecryptingDemuxerStream is in
// use and the decoder's display name. Reinitialization then finishes through
// CompleteDecoderReinitialization(true); initialization moves to STATE_NORMAL,
// enables the bitstream converter when the traits ask for it, and runs
// |init_cb_| with true.
// NOTE(review): the condition guarding the failure path and the returns
// between the paths are not visible in this view; presumably the guard tests
// for a null |decoder_| — confirm.
227 template <DemuxerStream::Type StreamType
>
228 void DecoderStream
<StreamType
>::OnDecoderSelected(
229 scoped_ptr
<Decoder
> selected_decoder
,
230 scoped_ptr
<DecryptingDemuxerStream
> decrypting_demuxer_stream
) {
231 FUNCTION_DVLOG(2) << ": "
232 << (selected_decoder
? selected_decoder
->GetDisplayName()
233 : "No decoder selected.");
234 DCHECK(task_runner_
->BelongsToCurrentThread());
235 DCHECK(state_
== STATE_INITIALIZING
|| state_
== STATE_REINITIALIZING_DECODER
)
237 if (state_
== STATE_INITIALIZING
) {
238 DCHECK(!init_cb_
.is_null());
239 DCHECK(read_cb_
.is_null());
240 DCHECK(reset_cb_
.is_null());
245 previous_decoder_
= decoder_
.Pass();
246 decoder_
= selected_decoder
.Pass();
247 if (decrypting_demuxer_stream
) {
248 decrypting_demuxer_stream_
= decrypting_demuxer_stream
.Pass();
249 stream_
= decrypting_demuxer_stream_
.get();
253 if (state_
== STATE_INITIALIZING
) {
254 state_
= STATE_UNINITIALIZED
;
255 MEDIA_LOG(ERROR
, media_log_
) << GetStreamTypeString()
256 << " decoder initialization failed";
257 base::ResetAndReturn(&init_cb_
).Run(false);
259 CompleteDecoderReinitialization(false);
264 media_log_
->SetBooleanProperty(GetStreamTypeString() + "_dds",
265 decrypting_demuxer_stream_
);
266 media_log_
->SetStringProperty(GetStreamTypeString() + "_decoder",
267 decoder_
->GetDisplayName());
269 if (state_
== STATE_REINITIALIZING_DECODER
) {
270 CompleteDecoderReinitialization(true);
274 // Initialization succeeded.
275 state_
= STATE_NORMAL
;
276 if (StreamTraits::NeedsBitstreamConversion(decoder_
.get()))
277 stream_
->EnableBitstreamConverter();
278 base::ResetAndReturn(&init_cb_
).Run(true);
// Hands |status| and |output| to the pending read callback via
// ResetAndReturn(). A read must be pending (DCHECKed).
// NOTE(review): the |status| parameter declaration is not visible in this
// view.
281 template <DemuxerStream::Type StreamType
>
282 void DecoderStream
<StreamType
>::SatisfyRead(
284 const scoped_refptr
<Output
>& output
) {
285 DCHECK(!read_cb_
.is_null());
286 base::ResetAndReturn(&read_cb_
).Run(status
, output
);
// Sends |buffer| to the decoder. Preconditions (DCHECKed): state is
// STATE_NORMAL or STATE_FLUSHING_DECODER, fewer than GetMaxDecodeRequests()
// decodes are pending, no reset is pending and |buffer| is non-null.
//
// An async trace event is opened for the decode, tagged with whether the
// buffer is a key frame and with its timestamp in ms (false / 0 for an
// end-of-stream buffer). An end-of-stream buffer sets |decoding_eos_|. The
// pending-request count is incremented before the decoder is called, and
// OnDecodeDone() is bound through a weak pointer as the completion callback.
289 template <DemuxerStream::Type StreamType
>
290 void DecoderStream
<StreamType
>::Decode(
291 const scoped_refptr
<DecoderBuffer
>& buffer
) {
293 DCHECK(state_
== STATE_NORMAL
|| state_
== STATE_FLUSHING_DECODER
) << state_
;
294 DCHECK_LT(pending_decode_requests_
, GetMaxDecodeRequests());
295 DCHECK(reset_cb_
.is_null());
296 DCHECK(buffer
.get());
// End-of-stream buffers are accounted as zero bytes.
298 int buffer_size
= buffer
->end_of_stream() ? 0 : buffer
->data_size();
300 TRACE_EVENT_ASYNC_BEGIN2(
301 "media", GetTraceString
<StreamType
>(), this, "key frame",
302 !buffer
->end_of_stream() && buffer
->is_key_frame(), "timestamp (ms)",
303 !buffer
->end_of_stream() ? buffer
->timestamp().InMilliseconds() : 0);
305 if (buffer
->end_of_stream())
306 decoding_eos_
= true;
308 ++pending_decode_requests_
;
309 decoder_
->Decode(buffer
,
310 base::Bind(&DecoderStream
<StreamType
>::OnDecodeDone
,
311 weak_factory_
.GetWeakPtr(),
313 buffer
->end_of_stream()));
// Flushes the decoder by submitting an end-of-stream buffer through Decode().
316 template <DemuxerStream::Type StreamType
>
317 void DecoderStream
<StreamType
>::FlushDecoder() {
318 Decode(DecoderBuffer::CreateEOSBuffer());
// Completion callback for one Decode() request. Decrements the pending-request
// count and closes the async trace event opened in Decode().
//
// Visible handling by outcome:
//   - Decoder::kDecodeError: enter STATE_ERROR, log the error, drop queued
//     outputs and fail a pending read with DECODE_ERROR.
//   - Success: report |buffer_size| through the statistics callback. In
//     STATE_NORMAL the stream may move to STATE_END_OF_STREAM (answering a
//     pending read with an end-of-stream output when nothing is queued) or
//     issue a further demuxer read. In STATE_FLUSHING_DECODER, once no decode
//     is pending, decoder reinitialization starts.
// NOTE(review): the switch header, the end-of-stream parameter and its guards,
// and the early returns are not visible in this view; the grouping above is
// inferred from the surviving statements — confirm against the full file.
321 template <DemuxerStream::Type StreamType
>
322 void DecoderStream
<StreamType
>::OnDecodeDone(int buffer_size
,
324 typename
Decoder::Status status
) {
325 FUNCTION_DVLOG(2) << ": " << status
;
326 DCHECK(state_
== STATE_NORMAL
|| state_
== STATE_FLUSHING_DECODER
||
327 state_
== STATE_PENDING_DEMUXER_READ
|| state_
== STATE_ERROR
)
329 DCHECK_GT(pending_decode_requests_
, 0);
331 --pending_decode_requests_
;
333 TRACE_EVENT_ASYNC_END0("media", GetTraceString
<StreamType
>(), this);
336 DCHECK(!pending_decode_requests_
);
337 decoding_eos_
= false;
340 if (state_
== STATE_ERROR
) {
341 DCHECK(read_cb_
.is_null());
345 // Drop decoding result if Reset() was called during decoding.
346 // The resetting process will be handled when the decoder is reset.
347 if (!reset_cb_
.is_null())
351 case Decoder::kDecodeError
:
352 state_
= STATE_ERROR
;
353 MEDIA_LOG(ERROR
, media_log_
) << GetStreamTypeString() << " decode error";
354 ready_outputs_
.clear();
355 if (!read_cb_
.is_null())
356 SatisfyRead(DECODE_ERROR
, NULL
);
359 case Decoder::kAborted
:
360 // Decoder can return kAborted during Reset() or during destruction.
364 // Any successful decode counts!
366 StreamTraits::ReportStatistics(statistics_cb_
, buffer_size
);
368 if (state_
== STATE_NORMAL
) {
370 state_
= STATE_END_OF_STREAM
;
371 if (ready_outputs_
.empty() && !read_cb_
.is_null())
372 SatisfyRead(OK
, StreamTraits::CreateEOSOutput());
377 ReadFromDemuxerStream();
381 if (state_
== STATE_FLUSHING_DECODER
&& !pending_decode_requests_
)
382 ReinitializeDecoder();
// Output callback handed to the decoder: receives one decoded |output|, which
// must be non-null (DCHECKed). A pending read is satisfied immediately with OK
// and |output|; the remaining visible path appends it to |ready_outputs_|.
// NOTE(review): the returns after the STATE_ERROR check, the pending-reset
// check and the satisfied read are not visible in this view.
387 template <DemuxerStream::Type StreamType
>
388 void DecoderStream
<StreamType
>::OnDecodeOutputReady(
389 const scoped_refptr
<Output
>& output
) {
390 FUNCTION_DVLOG(2) << ": " << output
->timestamp().InMilliseconds() << " ms";
391 DCHECK(output
.get());
392 DCHECK(state_
== STATE_NORMAL
|| state_
== STATE_FLUSHING_DECODER
||
393 state_
== STATE_PENDING_DEMUXER_READ
|| state_
== STATE_ERROR
)
396 if (state_
== STATE_ERROR
) {
397 DCHECK(read_cb_
.is_null());
401 // Drop decoding result if Reset() was called during decoding.
402 // The resetting process will be handled when the decoder is reset.
403 if (!reset_cb_
.is_null())
406 if (!read_cb_
.is_null()) {
407 // If |ready_outputs_| was non-empty, the read would have already been
408 // satisfied by Read().
409 DCHECK(ready_outputs_
.empty());
410 SatisfyRead(OK
, output
);
414 // Store decoded output.
415 ready_outputs_
.push_back(output
);
// Issues one read on |stream_|. Requires STATE_NORMAL, CanDecodeMore() and no
// pending reset (DCHECKed). The state becomes STATE_PENDING_DEMUXER_READ until
// OnBufferReady(), bound through a weak pointer, is invoked.
418 template <DemuxerStream::Type StreamType
>
419 void DecoderStream
<StreamType
>::ReadFromDemuxerStream() {
421 DCHECK_EQ(state_
, STATE_NORMAL
);
422 DCHECK(CanDecodeMore());
423 DCHECK(reset_cb_
.is_null());
425 state_
= STATE_PENDING_DEMUXER_READ
;
426 stream_
->Read(base::Bind(&DecoderStream
<StreamType
>::OnBufferReady
,
427 weak_factory_
.GetWeakPtr()));
// Completion callback for the demuxer read started in ReadFromDemuxerStream().
// |buffer| is non-null exactly when |status| is DemuxerStream::kOk (DCHECKed).
//
// Visible handling:
//   - State no longer STATE_PENDING_DEMUXER_READ: it must be STATE_ERROR with
//     no pending read.
//   - Otherwise the state first returns to STATE_NORMAL.
//   - kConfigChanged: notify |config_change_observer_cb_| if set and move to
//     STATE_FLUSHING_DECODER. If a reset is pending and no
//     DecryptingDemuxerStream is used, the reset is re-issued from here.
//   - Pending reset on other statuses: same re-issue rule.
//   - kAborted: a pending read is completed with DEMUXER_READ_ABORTED.
//   - kOk: splice observers are told the buffer's splice timestamp while a
//     splice is active or starts, and a further demuxer read may be issued.
// NOTE(review): the returns between these cases and the calls that hand the
// buffer to Decode() / FlushDecoder() are not visible in this view.
430 template <DemuxerStream::Type StreamType
>
431 void DecoderStream
<StreamType
>::OnBufferReady(
432 DemuxerStream::Status status
,
433 const scoped_refptr
<DecoderBuffer
>& buffer
) {
434 FUNCTION_DVLOG(2) << ": " << status
<< ", "
435 << (buffer
.get() ? buffer
->AsHumanReadableString()
438 DCHECK(task_runner_
->BelongsToCurrentThread());
439 DCHECK(state_
== STATE_PENDING_DEMUXER_READ
|| state_
== STATE_ERROR
)
441 DCHECK_EQ(buffer
.get() != NULL
, status
== DemuxerStream::kOk
) << status
;
443 // Decoding has been stopped (e.g. due to an error).
444 if (state_
!= STATE_PENDING_DEMUXER_READ
) {
445 DCHECK(state_
== STATE_ERROR
);
446 DCHECK(read_cb_
.is_null());
450 state_
= STATE_NORMAL
;
452 if (status
== DemuxerStream::kConfigChanged
) {
453 FUNCTION_DVLOG(2) << ": " << "ConfigChanged";
454 DCHECK(stream_
->SupportsConfigChanges());
456 if (!config_change_observer_cb_
.is_null())
457 config_change_observer_cb_
.Run();
459 state_
= STATE_FLUSHING_DECODER
;
460 if (!reset_cb_
.is_null()) {
461 // If we are using DecryptingDemuxerStream, we already called DDS::Reset()
462 // which will continue the resetting process in its callback.
463 if (!decrypting_demuxer_stream_
)
464 Reset(base::ResetAndReturn(&reset_cb_
));
465 // Reinitialization will continue after Reset() is done.
472 if (!reset_cb_
.is_null()) {
473 // If we are using DecryptingDemuxerStream, we already called DDS::Reset()
474 // which will continue the resetting process in its callback.
475 if (!decrypting_demuxer_stream_
)
476 Reset(base::ResetAndReturn(&reset_cb_
));
480 if (status
== DemuxerStream::kAborted
) {
481 if (!read_cb_
.is_null())
482 SatisfyRead(DEMUXER_READ_ABORTED
, NULL
);
486 if (!splice_observer_cb_
.is_null() && !buffer
->end_of_stream()) {
487 const bool has_splice_ts
= buffer
->splice_timestamp() != kNoTimestamp();
488 if (active_splice_
|| has_splice_ts
) {
489 splice_observer_cb_
.Run(buffer
->splice_timestamp());
490 active_splice_
= has_splice_ts
;
494 DCHECK(status
== DemuxerStream::kOk
) << status
;
497 // Read more data if the decoder supports multiple parallel decoding requests.
499 ReadFromDemuxerStream();
// Reinitializes the current decoder for |stream_|. Requires
// STATE_FLUSHING_DECODER with no decode pending (DCHECKed). The state becomes
// STATE_REINITIALIZING_DECODER, and the traits' InitializeDecoder() is given
// OnDecoderReinitialized() as the completion callback and
// OnDecodeOutputReady() as the output callback, both bound through weak
// pointers.
502 template <DemuxerStream::Type StreamType
>
503 void DecoderStream
<StreamType
>::ReinitializeDecoder() {
505 DCHECK(task_runner_
->BelongsToCurrentThread());
506 DCHECK_EQ(state_
, STATE_FLUSHING_DECODER
);
507 DCHECK_EQ(pending_decode_requests_
, 0);
509 state_
= STATE_REINITIALIZING_DECODER
;
510 DecoderStreamTraits
<StreamType
>::InitializeDecoder(
511 decoder_
.get(), stream_
,
512 base::Bind(&DecoderStream
<StreamType
>::OnDecoderReinitialized
,
513 weak_factory_
.GetWeakPtr()),
514 base::Bind(&DecoderStream
<StreamType
>::OnDecodeOutputReady
,
515 weak_factory_
.GetWeakPtr()));
// Completion callback for ReinitializeDecoder(). On failure it falls back to
// selecting another decoder, passing a null SetDecryptorReadyCB; on success it
// finishes through CompleteDecoderReinitialization(true).
// NOTE(review): the if/else on |success| separating the two paths is not
// visible in this view.
518 template <DemuxerStream::Type StreamType
>
519 void DecoderStream
<StreamType
>::OnDecoderReinitialized(bool success
) {
521 DCHECK(task_runner_
->BelongsToCurrentThread());
522 DCHECK_EQ(state_
, STATE_REINITIALIZING_DECODER
);
524 // ReinitializeDecoder() can be called in two cases:
525 // 1, Flushing decoder finished (see OnDecodeDone()).
526 // 2, Reset() was called during flushing decoder (see OnDecoderReset()).
527 // Also, Reset() can be called during pending ReinitializeDecoder().
528 // This function needs to handle them all!
531 // Reinitialization failed. Try to fall back to one of the remaining
532 // decoders. This will consume at least one decoder so doing it more than
534 // For simplicity, don't attempt to fall back to a decryptor. Calling this
535 // with a null callback ensures that one won't be selected.
536 SelectDecoder(SetDecryptorReadyCB());
538 CompleteDecoderReinitialization(true);
// Finishes decoder reinitialization. The state becomes STATE_NORMAL when
// |success| is true and STATE_ERROR otherwise. A pending reset callback is run
// first. With a read pending, a failed reinitialization is logged and the read
// is failed with DECODE_ERROR; the remaining visible path issues a new demuxer
// read.
// NOTE(review): the returns after the reset callback, the null-read check and
// the error branch are not visible in this view.
542 template <DemuxerStream::Type StreamType
>
543 void DecoderStream
<StreamType
>::CompleteDecoderReinitialization(bool success
) {
545 DCHECK(task_runner_
->BelongsToCurrentThread());
546 DCHECK_EQ(state_
, STATE_REINITIALIZING_DECODER
);
548 state_
= success
? STATE_NORMAL
: STATE_ERROR
;
550 if (!reset_cb_
.is_null()) {
551 base::ResetAndReturn(&reset_cb_
).Run();
555 if (read_cb_
.is_null())
558 if (state_
== STATE_ERROR
) {
559 MEDIA_LOG(ERROR
, media_log_
) << GetStreamTypeString()
560 << " decoder reinitialization failed";
561 SatisfyRead(DECODE_ERROR
, NULL
);
565 ReadFromDemuxerStream();
// Resets |decoder_| as part of a pending Reset(). Requires a pending reset
// callback and one of STATE_NORMAL, STATE_FLUSHING_DECODER, STATE_ERROR or
// STATE_END_OF_STREAM (DCHECKed). OnDecoderReset() is bound through a weak
// pointer as the completion callback.
568 template <DemuxerStream::Type StreamType
>
569 void DecoderStream
<StreamType
>::ResetDecoder() {
571 DCHECK(task_runner_
->BelongsToCurrentThread());
572 DCHECK(state_
== STATE_NORMAL
|| state_
== STATE_FLUSHING_DECODER
||
573 state_
== STATE_ERROR
|| state_
== STATE_END_OF_STREAM
) << state_
;
574 DCHECK(!reset_cb_
.is_null());
576 decoder_
->Reset(base::Bind(&DecoderStream
<StreamType
>::OnDecoderReset
,
577 weak_factory_
.GetWeakPtr()));
// Completion callback for the decoder reset started in ResetDecoder(). Unless
// a decoder flush is in progress, the stream returns to STATE_NORMAL, clears
// |active_splice_| and runs the reset callback. The remaining visible path
// calls ReinitializeDecoder(), which leaves the reset callback pending.
// NOTE(review): the return that separates the two paths is not visible in
// this view.
580 template <DemuxerStream::Type StreamType
>
581 void DecoderStream
<StreamType
>::OnDecoderReset() {
583 DCHECK(task_runner_
->BelongsToCurrentThread());
584 DCHECK(state_
== STATE_NORMAL
|| state_
== STATE_FLUSHING_DECODER
||
585 state_
== STATE_ERROR
|| state_
== STATE_END_OF_STREAM
) << state_
;
586 // If Reset() was called during pending read, read callback should be fired
587 // before the reset callback is fired.
588 DCHECK(read_cb_
.is_null());
589 DCHECK(!reset_cb_
.is_null());
591 if (state_
!= STATE_FLUSHING_DECODER
) {
592 state_
= STATE_NORMAL
;
593 active_splice_
= false;
594 base::ResetAndReturn(&reset_cb_
).Run();
598 // The resetting process will be continued in OnDecoderReinitialized().
599 ReinitializeDecoder();
// Explicitly instantiate DecoderStream for the video and audio stream types.
602 template class DecoderStream
<DemuxerStream::VIDEO
>;
603 template class DecoderStream
<DemuxerStream::AUDIO
>;