This source file includes following definitions.
- weak_factory_
- Initialize
- Read
- Reset
- Stop
- CanReadWithoutStalling
- CanReadWithoutStalling
- OnDecoderSelected
- SatisfyRead
- AbortRead
- FlushDecoder
- OnDecodeOutputReady
- ReadFromDemuxerStream
- OnBufferReady
- ReinitializeDecoder
- OnDecoderReinitialized
- ResetDecoder
- OnDecoderReset
- StopDecoder
#include "media/filters/decoder_stream.h"
#include "base/bind.h"
#include "base/callback_helpers.h"
#include "base/debug/trace_event.h"
#include "base/location.h"
#include "base/logging.h"
#include "base/single_thread_task_runner.h"
#include "media/base/audio_decoder.h"
#include "media/base/bind_to_current_loop.h"
#include "media/base/decoder_buffer.h"
#include "media/base/demuxer_stream.h"
#include "media/base/video_decoder.h"
#include "media/filters/decrypting_demuxer_stream.h"
namespace media {
template <DemuxerStream::Type StreamType>
static const char* GetTraceString();
#define FUNCTION_DVLOG(level) \
DVLOG(level) << __FUNCTION__ << \
"<" << DecoderStreamTraits<StreamType>::ToString() << ">"
template <>
const char* GetTraceString<DemuxerStream::VIDEO>() {
return "DecoderStream<VIDEO>::Decode";
}
template <>
const char* GetTraceString<DemuxerStream::AUDIO>() {
return "DecoderStream<AUDIO>::Decode";
}
template <DemuxerStream::Type StreamType>
DecoderStream<StreamType>::DecoderStream(
const scoped_refptr<base::SingleThreadTaskRunner>& task_runner,
ScopedVector<Decoder> decoders,
const SetDecryptorReadyCB& set_decryptor_ready_cb)
: task_runner_(task_runner),
state_(STATE_UNINITIALIZED),
stream_(NULL),
decoder_selector_(
new DecoderSelector<StreamType>(task_runner,
decoders.Pass(),
set_decryptor_ready_cb)),
weak_factory_(this) {}
template <DemuxerStream::Type StreamType>
DecoderStream<StreamType>::~DecoderStream() {
DCHECK(state_ == STATE_UNINITIALIZED || state_ == STATE_STOPPED) << state_;
}
template <DemuxerStream::Type StreamType>
void DecoderStream<StreamType>::Initialize(DemuxerStream* stream,
const StatisticsCB& statistics_cb,
const InitCB& init_cb) {
FUNCTION_DVLOG(2);
DCHECK(task_runner_->BelongsToCurrentThread());
DCHECK_EQ(state_, STATE_UNINITIALIZED) << state_;
DCHECK(init_cb_.is_null());
DCHECK(!init_cb.is_null());
statistics_cb_ = statistics_cb;
init_cb_ = init_cb;
stream_ = stream;
state_ = STATE_INITIALIZING;
decoder_selector_->SelectDecoder(
stream,
base::Bind(&DecoderStream<StreamType>::OnDecoderSelected,
weak_factory_.GetWeakPtr()));
}
template <DemuxerStream::Type StreamType>
void DecoderStream<StreamType>::Read(const ReadCB& read_cb) {
FUNCTION_DVLOG(2);
DCHECK(task_runner_->BelongsToCurrentThread());
DCHECK(state_ == STATE_NORMAL || state_ == STATE_FLUSHING_DECODER ||
state_ == STATE_ERROR) << state_;
DCHECK(read_cb_.is_null());
DCHECK(reset_cb_.is_null());
DCHECK(stop_cb_.is_null());
if (state_ == STATE_ERROR) {
task_runner_->PostTask(FROM_HERE, base::Bind(
read_cb, DECODE_ERROR, scoped_refptr<Output>()));
return;
}
read_cb_ = read_cb;
if (state_ == STATE_FLUSHING_DECODER) {
FlushDecoder();
return;
}
scoped_refptr<Output> output = decoder_->GetDecodeOutput();
if (output) {
task_runner_->PostTask(
FROM_HERE, base::Bind(base::ResetAndReturn(&read_cb_), OK, output));
return;
}
ReadFromDemuxerStream();
}
template <DemuxerStream::Type StreamType>
void DecoderStream<StreamType>::Reset(const base::Closure& closure) {
FUNCTION_DVLOG(2);
DCHECK(task_runner_->BelongsToCurrentThread());
DCHECK(state_ != STATE_UNINITIALIZED && state_ != STATE_STOPPED) << state_;
DCHECK(reset_cb_.is_null());
DCHECK(stop_cb_.is_null());
reset_cb_ = closure;
if (state_ == STATE_REINITIALIZING_DECODER)
return;
if (state_ == STATE_PENDING_DEMUXER_READ && !decrypting_demuxer_stream_)
return;
if (decrypting_demuxer_stream_) {
decrypting_demuxer_stream_->Reset(base::Bind(
&DecoderStream<StreamType>::ResetDecoder, weak_factory_.GetWeakPtr()));
return;
}
ResetDecoder();
}
template <DemuxerStream::Type StreamType>
void DecoderStream<StreamType>::Stop(const base::Closure& closure) {
FUNCTION_DVLOG(2);
DCHECK(task_runner_->BelongsToCurrentThread());
DCHECK_NE(state_, STATE_STOPPED) << state_;
DCHECK(stop_cb_.is_null());
stop_cb_ = closure;
if (state_ == STATE_INITIALIZING) {
decoder_selector_->Abort();
return;
}
DCHECK(init_cb_.is_null());
weak_factory_.InvalidateWeakPtrs();
if (!read_cb_.is_null())
task_runner_->PostTask(FROM_HERE, base::Bind(
base::ResetAndReturn(&read_cb_), ABORTED, scoped_refptr<Output>()));
if (!reset_cb_.is_null())
task_runner_->PostTask(FROM_HERE, base::ResetAndReturn(&reset_cb_));
if (decrypting_demuxer_stream_) {
decrypting_demuxer_stream_->Stop(base::Bind(
&DecoderStream<StreamType>::StopDecoder, weak_factory_.GetWeakPtr()));
return;
}
if (decoder_) {
StopDecoder();
return;
}
state_ = STATE_STOPPED;
stream_ = NULL;
decoder_.reset();
decrypting_demuxer_stream_.reset();
task_runner_->PostTask(FROM_HERE, base::ResetAndReturn(&stop_cb_));
}
template <DemuxerStream::Type StreamType>
bool DecoderStream<StreamType>::CanReadWithoutStalling() const {
DCHECK(task_runner_->BelongsToCurrentThread());
return decoder_->CanReadWithoutStalling();
}
template <>
bool DecoderStream<DemuxerStream::AUDIO>::CanReadWithoutStalling() const {
DCHECK(task_runner_->BelongsToCurrentThread());
return true;
}
template <DemuxerStream::Type StreamType>
void DecoderStream<StreamType>::OnDecoderSelected(
scoped_ptr<Decoder> selected_decoder,
scoped_ptr<DecryptingDemuxerStream> decrypting_demuxer_stream) {
FUNCTION_DVLOG(2);
DCHECK(task_runner_->BelongsToCurrentThread());
DCHECK_EQ(state_, STATE_INITIALIZING) << state_;
DCHECK(!init_cb_.is_null());
DCHECK(read_cb_.is_null());
DCHECK(reset_cb_.is_null());
decoder_selector_.reset();
if (decrypting_demuxer_stream)
stream_ = decrypting_demuxer_stream.get();
if (!selected_decoder) {
state_ = STATE_UNINITIALIZED;
StreamTraits::FinishInitialization(
base::ResetAndReturn(&init_cb_), selected_decoder.get(), stream_);
} else {
state_ = STATE_NORMAL;
decoder_ = selected_decoder.Pass();
decrypting_demuxer_stream_ = decrypting_demuxer_stream.Pass();
StreamTraits::FinishInitialization(
base::ResetAndReturn(&init_cb_), decoder_.get(), stream_);
}
if (!stop_cb_.is_null()) {
Stop(base::ResetAndReturn(&stop_cb_));
return;
}
}
template <DemuxerStream::Type StreamType>
void DecoderStream<StreamType>::SatisfyRead(
Status status,
const scoped_refptr<Output>& output) {
DCHECK(!read_cb_.is_null());
base::ResetAndReturn(&read_cb_).Run(status, output);
}
template <DemuxerStream::Type StreamType>
void DecoderStream<StreamType>::AbortRead() {
DCHECK(!reset_cb_.is_null());
SatisfyRead(ABORTED, NULL);
}
template <DemuxerStream::Type StreamType>
void DecoderStream<StreamType>::Decode(
const scoped_refptr<DecoderBuffer>& buffer) {
FUNCTION_DVLOG(2);
DCHECK(state_ == STATE_NORMAL || state_ == STATE_FLUSHING_DECODER) << state_;
DCHECK(!read_cb_.is_null());
DCHECK(reset_cb_.is_null());
DCHECK(stop_cb_.is_null());
DCHECK(buffer);
int buffer_size = buffer->end_of_stream() ? 0 : buffer->data_size();
TRACE_EVENT_ASYNC_BEGIN0("media", GetTraceString<StreamType>(), this);
decoder_->Decode(buffer,
base::Bind(&DecoderStream<StreamType>::OnDecodeOutputReady,
weak_factory_.GetWeakPtr(),
buffer_size));
}
template <DemuxerStream::Type StreamType>
void DecoderStream<StreamType>::FlushDecoder() {
Decode(DecoderBuffer::CreateEOSBuffer());
}
template <DemuxerStream::Type StreamType>
void DecoderStream<StreamType>::OnDecodeOutputReady(
int buffer_size,
typename Decoder::Status status,
const scoped_refptr<Output>& output) {
FUNCTION_DVLOG(2);
DCHECK(state_ == STATE_NORMAL || state_ == STATE_FLUSHING_DECODER) << state_;
DCHECK(!read_cb_.is_null());
DCHECK(stop_cb_.is_null());
DCHECK_EQ(status == Decoder::kOk, output != NULL);
TRACE_EVENT_ASYNC_END0("media", GetTraceString<StreamType>(), this);
if (status == Decoder::kDecodeError) {
state_ = STATE_ERROR;
SatisfyRead(DECODE_ERROR, NULL);
return;
}
if (status == Decoder::kDecryptError) {
state_ = STATE_ERROR;
SatisfyRead(DECRYPT_ERROR, NULL);
return;
}
if (status == Decoder::kAborted) {
SatisfyRead(ABORTED, NULL);
return;
}
if (buffer_size > 0) {
StreamTraits::ReportStatistics(statistics_cb_, buffer_size);
}
if (!reset_cb_.is_null()) {
AbortRead();
return;
}
if (state_ == STATE_FLUSHING_DECODER &&
status == Decoder::kOk && output->end_of_stream()) {
ReinitializeDecoder();
return;
}
if (status == Decoder::kNotEnoughData) {
if (state_ == STATE_NORMAL)
ReadFromDemuxerStream();
else if (state_ == STATE_FLUSHING_DECODER)
FlushDecoder();
return;
}
DCHECK(output);
SatisfyRead(OK, output);
}
template <DemuxerStream::Type StreamType>
void DecoderStream<StreamType>::ReadFromDemuxerStream() {
FUNCTION_DVLOG(2);
DCHECK_EQ(state_, STATE_NORMAL) << state_;
DCHECK(!read_cb_.is_null());
DCHECK(reset_cb_.is_null());
DCHECK(stop_cb_.is_null());
state_ = STATE_PENDING_DEMUXER_READ;
stream_->Read(base::Bind(&DecoderStream<StreamType>::OnBufferReady,
weak_factory_.GetWeakPtr()));
}
template <DemuxerStream::Type StreamType>
void DecoderStream<StreamType>::OnBufferReady(
DemuxerStream::Status status,
const scoped_refptr<DecoderBuffer>& buffer) {
FUNCTION_DVLOG(2) << ": " << status;
DCHECK(task_runner_->BelongsToCurrentThread());
DCHECK_EQ(state_, STATE_PENDING_DEMUXER_READ) << state_;
DCHECK_EQ(buffer.get() != NULL, status == DemuxerStream::kOk) << status;
DCHECK(!read_cb_.is_null());
DCHECK(stop_cb_.is_null());
state_ = STATE_NORMAL;
if (status == DemuxerStream::kConfigChanged) {
FUNCTION_DVLOG(2) << ": " << "ConfigChanged";
DCHECK(stream_->SupportsConfigChanges());
if (!config_change_observer_cb_.is_null())
config_change_observer_cb_.Run();
state_ = STATE_FLUSHING_DECODER;
if (!reset_cb_.is_null()) {
AbortRead();
if (!decrypting_demuxer_stream_)
Reset(base::ResetAndReturn(&reset_cb_));
} else {
FlushDecoder();
}
return;
}
if (!reset_cb_.is_null()) {
AbortRead();
if (!decrypting_demuxer_stream_)
Reset(base::ResetAndReturn(&reset_cb_));
return;
}
if (status == DemuxerStream::kAborted) {
SatisfyRead(DEMUXER_READ_ABORTED, NULL);
return;
}
if (!splice_observer_cb_.is_null() && !buffer->end_of_stream() &&
buffer->splice_timestamp() != kNoTimestamp()) {
splice_observer_cb_.Run(buffer->splice_timestamp());
}
DCHECK(status == DemuxerStream::kOk) << status;
Decode(buffer);
}
template <DemuxerStream::Type StreamType>
void DecoderStream<StreamType>::ReinitializeDecoder() {
FUNCTION_DVLOG(2);
DCHECK(task_runner_->BelongsToCurrentThread());
DCHECK_EQ(state_, STATE_FLUSHING_DECODER) << state_;
DCHECK(StreamTraits::GetDecoderConfig(*stream_).IsValidConfig());
state_ = STATE_REINITIALIZING_DECODER;
decoder_->Initialize(
StreamTraits::GetDecoderConfig(*stream_),
base::Bind(&DecoderStream<StreamType>::OnDecoderReinitialized,
weak_factory_.GetWeakPtr()));
}
template <DemuxerStream::Type StreamType>
void DecoderStream<StreamType>::OnDecoderReinitialized(PipelineStatus status) {
FUNCTION_DVLOG(2);
DCHECK(task_runner_->BelongsToCurrentThread());
DCHECK_EQ(state_, STATE_REINITIALIZING_DECODER) << state_;
DCHECK(stop_cb_.is_null());
state_ = (status == PIPELINE_OK) ? STATE_NORMAL : STATE_ERROR;
if (!reset_cb_.is_null()) {
if (!read_cb_.is_null())
AbortRead();
base::ResetAndReturn(&reset_cb_).Run();
}
if (read_cb_.is_null())
return;
if (state_ == STATE_ERROR) {
SatisfyRead(DECODE_ERROR, NULL);
return;
}
ReadFromDemuxerStream();
}
template <DemuxerStream::Type StreamType>
void DecoderStream<StreamType>::ResetDecoder() {
FUNCTION_DVLOG(2);
DCHECK(task_runner_->BelongsToCurrentThread());
DCHECK(state_ == STATE_NORMAL || state_ == STATE_FLUSHING_DECODER ||
state_ == STATE_ERROR) << state_;
DCHECK(!reset_cb_.is_null());
decoder_->Reset(base::Bind(&DecoderStream<StreamType>::OnDecoderReset,
weak_factory_.GetWeakPtr()));
}
template <DemuxerStream::Type StreamType>
void DecoderStream<StreamType>::OnDecoderReset() {
FUNCTION_DVLOG(2);
DCHECK(task_runner_->BelongsToCurrentThread());
DCHECK(state_ == STATE_NORMAL || state_ == STATE_FLUSHING_DECODER ||
state_ == STATE_ERROR) << state_;
DCHECK(read_cb_.is_null());
DCHECK(!reset_cb_.is_null());
DCHECK(stop_cb_.is_null());
if (state_ != STATE_FLUSHING_DECODER) {
base::ResetAndReturn(&reset_cb_).Run();
return;
}
ReinitializeDecoder();
}
template <DemuxerStream::Type StreamType>
void DecoderStream<StreamType>::StopDecoder() {
FUNCTION_DVLOG(2);
DCHECK(task_runner_->BelongsToCurrentThread());
DCHECK(state_ != STATE_UNINITIALIZED && state_ != STATE_STOPPED) << state_;
DCHECK(!stop_cb_.is_null());
state_ = STATE_STOPPED;
decoder_->Stop();
stream_ = NULL;
decoder_.reset();
decrypting_demuxer_stream_.reset();
task_runner_->PostTask(FROM_HERE, base::ResetAndReturn(&stop_cb_));
}
template class DecoderStream<DemuxerStream::VIDEO>;
template class DecoderStream<DemuxerStream::AUDIO>;
}