This source file includes the following definitions.
- read_task_runner_
- ReadMessage
- NotifyEof
- weak_factory_
- Start
- InvokeMessageCallback
- InvokeEofCallback
#include "remoting/host/native_messaging/native_messaging_reader.h"
#include <string>
#include "base/bind.h"
#include "base/json/json_reader.h"
#include "base/location.h"
#include "base/sequenced_task_runner.h"
#include "base/single_thread_task_runner.h"
#include "base/stl_util.h"
#include "base/thread_task_runner_handle.h"
#include "base/threading/sequenced_worker_pool.h"
#include "base/values.h"
#include "net/base/file_stream.h"
namespace {

// Integer type of the length prefix that precedes every message body.
typedef uint32 MessageLengthType;

// Number of bytes occupied by the length prefix.
const int kMessageHeaderSize = static_cast<int>(sizeof(MessageLengthType));

// Largest message body accepted, in bytes (1024 * 1024). ReadMessage() treats
// any larger declared length as an error.
const MessageLengthType kMaximumMessageSize = 1024 * 1024;

}  // namespace
namespace remoting {
// Helper that performs the actual reads for a NativeMessagingReader.
// ReadMessage() and NotifyEof() must run on |read_task_runner_|; results are
// delivered by posting NativeMessagingReader::InvokeMessageCallback() or
// NativeMessagingReader::InvokeEofCallback() to |caller_task_runner_|.
class NativeMessagingReader::Core {
 public:
  // |handle| is wrapped in a read-only file stream. |reader| is the object
  // whose Invoke*Callback() methods are posted to |caller_task_runner|.
  Core(base::PlatformFile handle,
       scoped_refptr<base::SingleThreadTaskRunner> caller_task_runner,
       scoped_refptr<base::SequencedTaskRunner> read_task_runner,
       base::WeakPtr<NativeMessagingReader> reader);
  ~Core();

  // Reads length-prefixed JSON messages in a loop, posting each parsed message
  // to the caller. Returns after the first read or parse failure, once EOF has
  // been signalled to the caller.
  void ReadMessage();

 private:
  // Posts NativeMessagingReader::InvokeEofCallback() to |caller_task_runner_|.
  void NotifyEof();

  // Stream the messages are read from.
  net::FileStream read_stream_;

  // Receiver of the tasks posted to |caller_task_runner_|.
  base::WeakPtr<NativeMessagingReader> reader_;

  // Task runner on which the reader's callbacks are invoked.
  scoped_refptr<base::SingleThreadTaskRunner> caller_task_runner_;

  // Task runner on which the reads are performed.
  scoped_refptr<base::SequencedTaskRunner> read_task_runner_;

  DISALLOW_COPY_AND_ASSIGN(Core);
};
// Wraps |handle| in a read-only file stream and stores the weak reader pointer
// and both task runners for later use. Nothing is read here.
NativeMessagingReader::Core::Core(
    base::PlatformFile handle,
    scoped_refptr<base::SingleThreadTaskRunner> caller_task_runner,
    scoped_refptr<base::SequencedTaskRunner> read_task_runner,
    base::WeakPtr<NativeMessagingReader> reader)
    : read_stream_(handle, base::PLATFORM_FILE_READ, NULL),
      reader_(reader),
      caller_task_runner_(caller_task_runner),
      read_task_runner_(read_task_runner) {}
// No explicit teardown: the members are destroyed by their own destructors.
NativeMessagingReader::Core::~Core() {
}
void NativeMessagingReader::Core::ReadMessage() {
DCHECK(read_task_runner_->RunsTasksOnCurrentThread());
while (true) {
MessageLengthType message_length;
int read_result = read_stream_.ReadUntilComplete(
reinterpret_cast<char*>(&message_length), kMessageHeaderSize);
if (read_result != kMessageHeaderSize) {
if (read_result != 0) {
LOG(ERROR) << "Failed to read message header, read returned "
<< read_result;
}
NotifyEof();
return;
}
if (message_length > kMaximumMessageSize) {
LOG(ERROR) << "Message size too large: " << message_length;
NotifyEof();
return;
}
std::string message_json(message_length, '\0');
read_result = read_stream_.ReadUntilComplete(string_as_array(&message_json),
message_length);
if (read_result != static_cast<int>(message_length)) {
LOG(ERROR) << "Failed to read message body, read returned "
<< read_result;
NotifyEof();
return;
}
scoped_ptr<base::Value> message(base::JSONReader::Read(message_json));
if (!message) {
LOG(ERROR) << "Failed to parse JSON message: " << message;
NotifyEof();
return;
}
caller_task_runner_->PostTask(
FROM_HERE, base::Bind(&NativeMessagingReader::InvokeMessageCallback,
reader_, base::Passed(&message)));
}
}
void NativeMessagingReader::Core::NotifyEof() {
DCHECK(read_task_runner_->RunsTasksOnCurrentThread());
caller_task_runner_->PostTask(
FROM_HERE,
base::Bind(&NativeMessagingReader::InvokeEofCallback, reader_));
}
// Starts |reader_thread_| (named "Reader") and creates the Core that will read
// from |handle| on that thread. Callbacks are routed back through the task
// runner returned by base::ThreadTaskRunnerHandle::Get() at construction time.
// No reading happens until Start() is called.
NativeMessagingReader::NativeMessagingReader(base::PlatformFile handle)
    : reader_thread_("Reader"),
      weak_factory_(this) {
  reader_thread_.Start();
  read_task_runner_ = reader_thread_.message_loop_proxy();

  scoped_refptr<base::SingleThreadTaskRunner> caller_task_runner =
      base::ThreadTaskRunnerHandle::Get();
  core_.reset(new Core(handle, caller_task_runner, read_task_runner_,
                       weak_factory_.GetWeakPtr()));
}
// Gives up ownership of the Core and schedules its deletion on
// |read_task_runner_|, the task runner that ReadMessage() runs on.
NativeMessagingReader::~NativeMessagingReader() {
  Core* released_core = core_.release();
  read_task_runner_->DeleteSoon(FROM_HERE, released_core);
}
// Stores the two callbacks and kicks off reading by posting
// Core::ReadMessage() to |read_task_runner_|.
//
// |message_callback| is run for every parsed message; |eof_callback| is run
// when the Core reports that reading has stopped.
void NativeMessagingReader::Start(MessageCallback message_callback,
                                  base::Closure eof_callback) {
  message_callback_ = message_callback;
  eof_callback_ = eof_callback;

  // The Core is passed as an unretained raw pointer; it is deleted via
  // DeleteSoon() on this same task runner in the destructor.
  const base::Closure read_task =
      base::Bind(&NativeMessagingReader::Core::ReadMessage,
                 base::Unretained(core_.get()));
  read_task_runner_->PostTask(FROM_HERE, read_task);
}
// Target of the task posted by Core::ReadMessage(): forwards ownership of
// |message| to the message callback registered in Start().
void NativeMessagingReader::InvokeMessageCallback(
    scoped_ptr<base::Value> message) {
  message_callback_.Run(message.Pass());
}
// Target of the task posted by Core::NotifyEof(): runs the EOF callback
// registered in Start().
void NativeMessagingReader::InvokeEofCallback() {
  eof_callback_.Run();
}
}