This source file includes the following definitions.
- Create
- StartOnAudioThread
- observers_
- AddObserver
- RemoveObserver
- OnFileCanReadWithoutBlocking
- OnFileCanWriteWithoutBlocking
- StartTimer
- DoCapture
- WaitForPipeReadable
- Destruct
#include "remoting/host/linux/audio_pipe_reader.h"
#include <fcntl.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>
#include "base/files/file_path.h"
#include "base/logging.h"
#include "base/posix/eintr_wrapper.h"
#include "base/stl_util.h"
namespace remoting {
namespace {
// File-local constants describing the audio stream that is read from the pipe
// and how the pipe is polled.

// Stream format: sample rate, channel count and bytes per single-channel
// sample. Their product is the byte rate of the stream.
const int kSamplesPerSecond = 48000;
const int kChannels = 2;
const int kBytesPerSample = 2;
// 48000 * 2 * 2 = 192000 bytes per second.
const int kSampleBytesPerSecond =
kSamplesPerSecond * kChannels * kBytesPerSample;
// Period, in milliseconds, of the timer that drives DoCapture().
const int kCapturingPeriodMs = 40;
// Requested pipe buffer size: two capturing periods' worth of audio
// (80 ms, i.e. 80 * 192000 / 1000 = 15360 bytes). The multiplication is done
// before the division, so the order of the operands below matters.
const int kPipeBufferSizeMs = kCapturingPeriodMs * 2;
const int kPipeBufferSizeBytes = kPipeBufferSizeMs * kSampleBytesPerSecond /
base::Time::kMillisecondsPerSecond;
// Fallback definition of the fcntl() command used to resize the pipe buffer,
// for toolchains whose headers do not provide it. 1031 is the Linux value
// (F_LINUX_SPECIFIC_BASE + 7).
#if !defined(F_SETPIPE_SZ)
#define F_SETPIPE_SZ 1031
#endif
}
// Builds a new AudioPipeReader bound to |task_runner| and returns it right
// away. The pipe named |pipe_name| is not opened here: a StartOnAudioThread()
// task carrying the reader and the pipe name is posted to |task_runner|, so
// the open happens asynchronously on that runner's thread.
scoped_refptr<AudioPipeReader> AudioPipeReader::Create(
    scoped_refptr<base::SingleThreadTaskRunner> task_runner,
    const base::FilePath& pipe_name) {
  scoped_refptr<AudioPipeReader> reader(new AudioPipeReader(task_runner));

  // The reader is bound into the task as a scoped_refptr.
  task_runner->PostTask(
      FROM_HERE,
      base::Bind(&AudioPipeReader::StartOnAudioThread, reader, pipe_name));

  return reader;
}
// Opens the pipe at |pipe_name| read-only and non-blocking, asks the kernel to
// resize its buffer to kPipeBufferSizeBytes, and then starts waiting for the
// pipe to become readable. Must be called on |task_runner_|'s thread.
//
// If the pipe cannot be opened the error is logged and the reader stays idle
// (|pipe_fd_| is left negative). A failure to resize the buffer is logged but
// is not fatal.
void AudioPipeReader::StartOnAudioThread(const base::FilePath& pipe_name) {
  DCHECK(task_runner_->BelongsToCurrentThread());

  pipe_fd_ = HANDLE_EINTR(open(
      pipe_name.value().c_str(), O_RDONLY | O_NONBLOCK));
  if (pipe_fd_ < 0) {
    // PLOG (not LOG) so that the errno explaining why open() failed is
    // recorded together with the path.
    PLOG(ERROR) << "Failed to open " << pipe_name.value();
    return;
  }

  // Set the buffer size for the pipe. Capture still works with the default
  // size, so a failure here is only reported.
  int result = HANDLE_EINTR(
      fcntl(pipe_fd_, F_SETPIPE_SZ, kPipeBufferSizeBytes));
  if (result < 0) {
    PLOG(ERROR) << "fcntl";
  }

  WaitForPipeReadable();
}
// Stores |task_runner| and creates the thread-safe observer list. The pipe
// itself is opened later by StartOnAudioThread().
AudioPipeReader::AudioPipeReader(
    scoped_refptr<base::SingleThreadTaskRunner> task_runner)
    : task_runner_(task_runner),
      observers_(new ObserverListThreadSafe<StreamObserver>()) {
  // Mark the descriptor as "not open" so that the destructor can tell whether
  // StartOnAudioThread() ever opened the pipe. Assigned in the body so that
  // this does not depend on the member declaration order in the header.
  pipe_fd_ = -1;
}

// Releases the pipe descriptor opened by StartOnAudioThread(), if any.
// AudioPipeReaderTraits::Destruct() schedules deletion through
// |task_runner_|, i.e. on the same runner that set up the watch.
AudioPipeReader::~AudioPipeReader() {
  // Stop watching before closing so that the watcher never refers to a
  // descriptor number that has already been released.
  file_descriptor_watcher_.StopWatchingFileDescriptor();

  if (pipe_fd_ >= 0) {
    // close() is deliberately not retried on EINTR: on Linux the descriptor
    // is released even when the call is interrupted.
    if (close(pipe_fd_) < 0)
      PLOG(ERROR) << "close";
    pipe_fd_ = -1;
  }
}
// Adds |observer| to |observers_|, the list that DoCapture() notifies through
// StreamObserver::OnDataRead.
void AudioPipeReader::AddObserver(StreamObserver* observer) {
observers_->AddObserver(observer);
}
// Removes |observer| from |observers_|, the list previously populated via
// AddObserver().
void AudioPipeReader::RemoveObserver(StreamObserver* observer) {
observers_->RemoveObserver(observer);
}
// Read-readiness callback for the watch registered in WaitForPipeReadable()
// (which passes |this| as the delegate). |fd| is expected to be |pipe_fd_|.
// Data is available again, so switch to timer-driven capturing.
void AudioPipeReader::OnFileCanReadWithoutBlocking(int fd) {
DCHECK_EQ(fd, pipe_fd_);
StartTimer();
}
// Write-readiness callback. WaitForPipeReadable() only registers for
// WATCH_READ, so this is not expected to be called.
void AudioPipeReader::OnFileCanWriteWithoutBlocking(int fd) {
NOTREACHED();
}
void AudioPipeReader::StartTimer() {
DCHECK(task_runner_->BelongsToCurrentThread());
started_time_ = base::TimeTicks::Now();
last_capture_position_ = 0;
timer_.Start(FROM_HERE, base::TimeDelta::FromMilliseconds(kCapturingPeriodMs),
this, &AudioPipeReader::DoCapture);
}
void AudioPipeReader::DoCapture() {
DCHECK(task_runner_->BelongsToCurrentThread());
DCHECK_GT(pipe_fd_, 0);
base::TimeDelta stream_position = base::TimeTicks::Now() - started_time_;
int64 stream_position_bytes = stream_position.InMilliseconds() *
kSampleBytesPerSecond / base::Time::kMillisecondsPerSecond;
int64 bytes_to_read = stream_position_bytes - last_capture_position_;
std::string data = left_over_bytes_;
size_t pos = data.size();
left_over_bytes_.clear();
data.resize(pos + bytes_to_read);
while (pos < data.size()) {
int read_result = HANDLE_EINTR(
read(pipe_fd_, string_as_array(&data) + pos, data.size() - pos));
if (read_result > 0) {
pos += read_result;
} else {
if (read_result < 0 && errno != EWOULDBLOCK && errno != EAGAIN)
PLOG(ERROR) << "read";
break;
}
}
if (pos == 0) {
WaitForPipeReadable();
return;
}
int incomplete_samples_bytes = pos % (kChannels * kBytesPerSample);
left_over_bytes_.assign(data, pos - incomplete_samples_bytes,
incomplete_samples_bytes);
data.resize(pos - incomplete_samples_bytes);
last_capture_position_ += data.size();
if (stream_position_bytes - last_capture_position_ > kPipeBufferSizeBytes)
last_capture_position_ = stream_position_bytes - kPipeBufferSizeBytes;
DCHECK_LE(last_capture_position_, stream_position_bytes);
scoped_refptr<base::RefCountedString> data_ref =
base::RefCountedString::TakeString(&data);
observers_->Notify(&StreamObserver::OnDataRead, data_ref);
}
void AudioPipeReader::WaitForPipeReadable() {
timer_.Stop();
base::MessageLoopForIO::current()->WatchFileDescriptor(
pipe_fd_,
false,
base::MessageLoopForIO::WATCH_READ,
&file_descriptor_watcher_,
this);
}
// Hands |audio_pipe_reader| to DeleteSoon() on the reader's own
// |task_runner_| instead of deleting it directly.
// NOTE(review): presumably this is the ref-counting destruction trait for
// AudioPipeReader; confirm against the class declaration in the header.
void AudioPipeReaderTraits::Destruct(const AudioPipeReader* audio_pipe_reader) {
audio_pipe_reader->task_runner_->DeleteSoon(FROM_HERE, audio_pipe_reader);
}
}