This source file includes following definitions.
- ReadDataOnReaderThread
- main_message_loop_
- weak_ptr_factory_
- peer_pid
- Connect
- Close
- Send
- DidRecvMsg
- ReadDidFail
- CreatePipe
- ProcessOutgoingMessages
- CallOnChannelConnected
- ReadData
- WillDispatchInputMessage
- DidEmptyInputBuffers
- HandleInternalMessage
- Connect
- Close
- peer_pid
- Send
#include "ipc/ipc_channel_nacl.h"
#include <errno.h>
#include <stddef.h>
#include <sys/types.h>
#include <algorithm>
#include "base/bind.h"
#include "base/logging.h"
#include "base/message_loop/message_loop_proxy.h"
#include "base/synchronization/lock.h"
#include "base/task_runner_util.h"
#include "base/threading/simple_thread.h"
#include "ipc/file_descriptor_set_posix.h"
#include "ipc/ipc_listener.h"
#include "ipc/ipc_logging.h"
#include "native_client/src/public/imc_syscalls.h"
#include "native_client/src/public/imc_types.h"
namespace IPC {
struct MessageContents {
std::vector<char> data;
std::vector<int> fds;
};
namespace {
bool ReadDataOnReaderThread(int pipe, MessageContents* contents) {
DCHECK(pipe >= 0);
if (pipe < 0)
return false;
contents->data.resize(Channel::kReadBufferSize);
contents->fds.resize(FileDescriptorSet::kMaxDescriptorsPerMessage);
NaClAbiNaClImcMsgIoVec iov = { &contents->data[0], contents->data.size() };
NaClAbiNaClImcMsgHdr msg = {
&iov, 1, &contents->fds[0], contents->fds.size()
};
int bytes_read = imc_recvmsg(pipe, &msg, 0);
if (bytes_read <= 0) {
contents->data.clear();
contents->fds.clear();
return false;
}
DCHECK(bytes_read);
contents->data.resize(bytes_read);
contents->fds.resize(msg.desc_length);
return true;
}
}
class Channel::ChannelImpl::ReaderThreadRunner
: public base::DelegateSimpleThread::Delegate {
public:
ReaderThreadRunner(
int pipe,
base::Callback<void (scoped_ptr<MessageContents>)> data_read_callback,
base::Callback<void ()> failure_callback,
scoped_refptr<base::MessageLoopProxy> main_message_loop);
virtual void Run() OVERRIDE;
private:
int pipe_;
base::Callback<void (scoped_ptr<MessageContents>)> data_read_callback_;
base::Callback<void ()> failure_callback_;
scoped_refptr<base::MessageLoopProxy> main_message_loop_;
DISALLOW_COPY_AND_ASSIGN(ReaderThreadRunner);
};
Channel::ChannelImpl::ReaderThreadRunner::ReaderThreadRunner(
int pipe,
base::Callback<void (scoped_ptr<MessageContents>)> data_read_callback,
base::Callback<void ()> failure_callback,
scoped_refptr<base::MessageLoopProxy> main_message_loop)
: pipe_(pipe),
data_read_callback_(data_read_callback),
failure_callback_(failure_callback),
main_message_loop_(main_message_loop) {
}
void Channel::ChannelImpl::ReaderThreadRunner::Run() {
while (true) {
scoped_ptr<MessageContents> msg_contents(new MessageContents);
bool success = ReadDataOnReaderThread(pipe_, msg_contents.get());
if (success) {
main_message_loop_->PostTask(FROM_HERE,
base::Bind(data_read_callback_, base::Passed(&msg_contents)));
} else {
main_message_loop_->PostTask(FROM_HERE, failure_callback_);
return;
}
}
}
Channel::ChannelImpl::ChannelImpl(const IPC::ChannelHandle& channel_handle,
Mode mode,
Listener* listener)
: ChannelReader(listener),
mode_(mode),
waiting_connect_(true),
pipe_(-1),
pipe_name_(channel_handle.name),
weak_ptr_factory_(this) {
if (!CreatePipe(channel_handle)) {
const char *modestr = (mode_ & MODE_SERVER_FLAG) ? "server" : "client";
LOG(WARNING) << "Unable to create pipe named \"" << channel_handle.name
<< "\" in " << modestr << " mode";
}
}
Channel::ChannelImpl::~ChannelImpl() {
Close();
}
base::ProcessId Channel::ChannelImpl::peer_pid() const {
return -1;
}
bool Channel::ChannelImpl::Connect() {
if (pipe_ == -1) {
DLOG(WARNING) << "Channel creation failed: " << pipe_name_;
return false;
}
reader_thread_runner_.reset(
new ReaderThreadRunner(
pipe_,
base::Bind(&Channel::ChannelImpl::DidRecvMsg,
weak_ptr_factory_.GetWeakPtr()),
base::Bind(&Channel::ChannelImpl::ReadDidFail,
weak_ptr_factory_.GetWeakPtr()),
base::MessageLoopProxy::current()));
reader_thread_.reset(
new base::DelegateSimpleThread(reader_thread_runner_.get(),
"ipc_channel_nacl reader thread"));
reader_thread_->Start();
waiting_connect_ = false;
ProcessOutgoingMessages();
base::MessageLoopProxy::current()->PostTask(FROM_HERE,
base::Bind(&Channel::ChannelImpl::CallOnChannelConnected,
weak_ptr_factory_.GetWeakPtr()));
return true;
}
void Channel::ChannelImpl::Close() {
reader_thread_->Join();
close(pipe_);
pipe_ = -1;
reader_thread_runner_.reset();
reader_thread_.reset();
read_queue_.clear();
output_queue_.clear();
}
bool Channel::ChannelImpl::Send(Message* message) {
DVLOG(2) << "sending message @" << message << " on channel @" << this
<< " with type " << message->type();
scoped_ptr<Message> message_ptr(message);
#ifdef IPC_MESSAGE_LOG_ENABLED
Logging::GetInstance()->OnSendMessage(message_ptr.get(), "");
#endif
message->TraceMessageBegin();
output_queue_.push_back(linked_ptr<Message>(message_ptr.release()));
if (!waiting_connect_)
return ProcessOutgoingMessages();
return true;
}
void Channel::ChannelImpl::DidRecvMsg(scoped_ptr<MessageContents> contents) {
if (pipe_ == -1)
return;
linked_ptr<std::vector<char> > data(new std::vector<char>);
data->swap(contents->data);
read_queue_.push_back(data);
input_fds_.insert(input_fds_.end(),
contents->fds.begin(), contents->fds.end());
contents->fds.clear();
ProcessIncomingMessages();
}
void Channel::ChannelImpl::ReadDidFail() {
Close();
}
bool Channel::ChannelImpl::CreatePipe(
const IPC::ChannelHandle& channel_handle) {
DCHECK(pipe_ == -1);
if (channel_handle.socket.fd == -1) {
NOTIMPLEMENTED();
return false;
}
pipe_ = channel_handle.socket.fd;
return true;
}
bool Channel::ChannelImpl::ProcessOutgoingMessages() {
DCHECK(!waiting_connect_);
if (output_queue_.empty())
return true;
if (pipe_ == -1)
return false;
while (!output_queue_.empty()) {
linked_ptr<Message> msg = output_queue_.front();
output_queue_.pop_front();
int fds[FileDescriptorSet::kMaxDescriptorsPerMessage];
const size_t num_fds = msg->file_descriptor_set()->size();
DCHECK(num_fds <= FileDescriptorSet::kMaxDescriptorsPerMessage);
msg->file_descriptor_set()->GetDescriptors(fds);
NaClAbiNaClImcMsgIoVec iov = {
const_cast<void*>(msg->data()), msg->size()
};
NaClAbiNaClImcMsgHdr msgh = { &iov, 1, fds, num_fds };
ssize_t bytes_written = imc_sendmsg(pipe_, &msgh, 0);
DCHECK(bytes_written);
if (bytes_written < 0) {
DCHECK(errno == EPIPE);
Close();
PLOG(ERROR) << "pipe_ error on "
<< pipe_
<< " Currently writing message of size: "
<< msg->size();
return false;
} else {
msg->file_descriptor_set()->CommitAll();
}
DVLOG(2) << "sent message @" << msg.get() << " with type " << msg->type()
<< " on fd " << pipe_;
}
return true;
}
void Channel::ChannelImpl::CallOnChannelConnected() {
listener()->OnChannelConnected(peer_pid());
}
Channel::ChannelImpl::ReadState Channel::ChannelImpl::ReadData(
char* buffer,
int buffer_len,
int* bytes_read) {
*bytes_read = 0;
if (pipe_ == -1)
return READ_FAILED;
if (read_queue_.empty())
return READ_PENDING;
while (!read_queue_.empty() && *bytes_read < buffer_len) {
linked_ptr<std::vector<char> > vec(read_queue_.front());
size_t bytes_to_read = buffer_len - *bytes_read;
if (vec->size() <= bytes_to_read) {
std::copy(vec->begin(), vec->end(), buffer + *bytes_read);
*bytes_read += vec->size();
read_queue_.pop_front();
} else {
std::copy(vec->begin(), vec->begin() + bytes_to_read,
buffer + *bytes_read);
vec->erase(vec->begin(), vec->begin() + bytes_to_read);
*bytes_read += bytes_to_read;
}
}
return READ_SUCCEEDED;
}
bool Channel::ChannelImpl::WillDispatchInputMessage(Message* msg) {
uint16 header_fds = msg->header()->num_fds;
CHECK(header_fds == input_fds_.size());
if (header_fds == 0)
return true;
msg->file_descriptor_set()->SetDescriptors(&input_fds_.front(),
header_fds);
input_fds_.clear();
return true;
}
bool Channel::ChannelImpl::DidEmptyInputBuffers() {
return input_fds_.empty();
}
void Channel::ChannelImpl::HandleInternalMessage(const Message& msg) {
NOTREACHED();
}
Channel::Channel(const IPC::ChannelHandle& channel_handle,
Mode mode,
Listener* listener)
: channel_impl_(new ChannelImpl(channel_handle, mode, listener)) {
}
Channel::~Channel() {
delete channel_impl_;
}
bool Channel::Connect() {
return channel_impl_->Connect();
}
void Channel::Close() {
channel_impl_->Close();
}
base::ProcessId Channel::peer_pid() const {
return channel_impl_->peer_pid();
}
bool Channel::Send(Message* message) {
return channel_impl_->Send(message);
}
}