This source file includes following definitions.
- port
- Init
- Shutdown
- AttachMessagePipeEndpoint
- RunMessagePipeEndpoint
- RunRemoteMessagePipeEndpoint
- WriteMessage
- IsWriteBufferEmpty
- DetachMessagePipeEndpoint
- OnReadMessage
- OnFatalError
- ValidateReadMessage
- OnReadMessageForDownstream
- OnReadMessageForChannel
- HandleRemoteError
- HandleLocalError
#include "mojo/system/channel.h"
#include "base/basictypes.h"
#include "base/bind.h"
#include "base/compiler_specific.h"
#include "base/logging.h"
#include "base/strings/stringprintf.h"
#include "mojo/system/message_pipe_endpoint.h"
namespace mojo {
namespace system {
COMPILE_ASSERT(Channel::kBootstrapEndpointId !=
MessageInTransit::kInvalidEndpointId,
kBootstrapEndpointId_is_invalid);
STATIC_CONST_MEMBER_DEFINITION const MessageInTransit::EndpointId
Channel::kBootstrapEndpointId;
Channel::EndpointInfo::EndpointInfo() {
}
Channel::EndpointInfo::EndpointInfo(scoped_refptr<MessagePipe> message_pipe,
unsigned port)
: message_pipe(message_pipe),
port(port) {
}
Channel::EndpointInfo::~EndpointInfo() {
}
Channel::Channel()
: next_local_id_(kBootstrapEndpointId) {
}
bool Channel::Init(scoped_ptr<RawChannel> raw_channel) {
DCHECK(creation_thread_checker_.CalledOnValidThread());
DCHECK(raw_channel);
DCHECK(!raw_channel_);
raw_channel_ = raw_channel.Pass();
if (!raw_channel_->Init(this)) {
raw_channel_.reset();
return false;
}
return true;
}
void Channel::Shutdown() {
DCHECK(creation_thread_checker_.CalledOnValidThread());
base::AutoLock locker(lock_);
DCHECK(raw_channel_.get());
raw_channel_->Shutdown();
raw_channel_.reset();
}
MessageInTransit::EndpointId Channel::AttachMessagePipeEndpoint(
scoped_refptr<MessagePipe> message_pipe, unsigned port) {
DCHECK(port == 0 || port == 1);
DCHECK_EQ(message_pipe->GetType(port), MessagePipeEndpoint::kTypeProxy);
MessageInTransit::EndpointId local_id;
{
base::AutoLock locker(lock_);
while (next_local_id_ == MessageInTransit::kInvalidEndpointId ||
local_id_to_endpoint_info_map_.find(next_local_id_) !=
local_id_to_endpoint_info_map_.end())
next_local_id_++;
local_id = next_local_id_;
next_local_id_++;
local_id_to_endpoint_info_map_[local_id] = EndpointInfo(message_pipe, port);
}
message_pipe->Attach(port, scoped_refptr<Channel>(this), local_id);
return local_id;
}
void Channel::RunMessagePipeEndpoint(MessageInTransit::EndpointId local_id,
MessageInTransit::EndpointId remote_id) {
EndpointInfo endpoint_info;
{
base::AutoLock locker(lock_);
IdToEndpointInfoMap::const_iterator it =
local_id_to_endpoint_info_map_.find(local_id);
CHECK(it != local_id_to_endpoint_info_map_.end());
endpoint_info = it->second;
}
endpoint_info.message_pipe->Run(endpoint_info.port, remote_id);
}
void Channel::RunRemoteMessagePipeEndpoint(
MessageInTransit::EndpointId local_id,
MessageInTransit::EndpointId remote_id) {
base::AutoLock locker(lock_);
DCHECK(local_id_to_endpoint_info_map_.find(local_id) !=
local_id_to_endpoint_info_map_.end());
scoped_ptr<MessageInTransit> message(new MessageInTransit(
MessageInTransit::kTypeChannel,
MessageInTransit::kSubtypeChannelRunMessagePipeEndpoint,
0, 0, NULL));
message->set_source_id(local_id);
message->set_destination_id(remote_id);
if (!raw_channel_->WriteMessage(message.Pass())) {
CHECK(false) << "Not yet handled";
}
}
bool Channel::WriteMessage(scoped_ptr<MessageInTransit> message) {
base::AutoLock locker(lock_);
if (!raw_channel_.get()) {
LOG(WARNING) << "WriteMessage() after shutdown";
return false;
}
return raw_channel_->WriteMessage(message.Pass());
}
bool Channel::IsWriteBufferEmpty() {
base::AutoLock locker(lock_);
DCHECK(raw_channel_.get());
return raw_channel_->IsWriteBufferEmpty();
}
void Channel::DetachMessagePipeEndpoint(MessageInTransit::EndpointId local_id) {
DCHECK_NE(local_id, MessageInTransit::kInvalidEndpointId);
base::AutoLock locker_(lock_);
local_id_to_endpoint_info_map_.erase(local_id);
}
Channel::~Channel() {
DCHECK(!raw_channel_.get());
DLOG_IF(WARNING, !local_id_to_endpoint_info_map_.empty())
<< "Destroying Channel with " << local_id_to_endpoint_info_map_.size()
<< " endpoints still present";
}
void Channel::OnReadMessage(const MessageInTransit::View& message_view) {
if (!ValidateReadMessage(message_view))
return;
switch (message_view.type()) {
case MessageInTransit::kTypeMessagePipeEndpoint:
case MessageInTransit::kTypeMessagePipe:
OnReadMessageForDownstream(message_view);
break;
case MessageInTransit::kTypeChannel:
OnReadMessageForChannel(message_view);
break;
default:
HandleRemoteError(base::StringPrintf(
"Received message of invalid type %u",
static_cast<unsigned>(message_view.type())));
break;
}
}
void Channel::OnFatalError(FatalError fatal_error) {
NOTIMPLEMENTED();
}
bool Channel::ValidateReadMessage(const MessageInTransit::View& message_view) {
const char* error_message = NULL;
if (!message_view.IsValid(&error_message)) {
DCHECK(error_message);
HandleRemoteError(error_message);
return false;
}
return true;
}
void Channel::OnReadMessageForDownstream(
const MessageInTransit::View& message_view) {
DCHECK(message_view.type() == MessageInTransit::kTypeMessagePipeEndpoint ||
message_view.type() == MessageInTransit::kTypeMessagePipe);
MessageInTransit::EndpointId local_id = message_view.destination_id();
if (local_id == MessageInTransit::kInvalidEndpointId) {
HandleRemoteError("Received message with no destination ID");
return;
}
EndpointInfo endpoint_info;
{
base::AutoLock locker(lock_);
DCHECK(raw_channel_.get());
IdToEndpointInfoMap::const_iterator it =
local_id_to_endpoint_info_map_.find(local_id);
if (it == local_id_to_endpoint_info_map_.end()) {
HandleRemoteError(base::StringPrintf(
"Received a message for nonexistent local destination ID %u",
static_cast<unsigned>(local_id)));
DLOG(ERROR) << "This should not happen under normal operation.";
return;
}
endpoint_info = it->second;
}
scoped_ptr<MessageInTransit> message(new MessageInTransit(message_view));
message->DeserializeDispatchers(this);
MojoResult result = endpoint_info.message_pipe->EnqueueMessage(
MessagePipe::GetPeerPort(endpoint_info.port), message.Pass(), NULL);
if (result != MOJO_RESULT_OK) {
HandleLocalError(base::StringPrintf(
"Failed to enqueue message to local destination ID %u (result %d)",
static_cast<unsigned>(local_id), static_cast<int>(result)));
return;
}
}
void Channel::OnReadMessageForChannel(
const MessageInTransit::View& message_view) {
DCHECK_EQ(message_view.type(), MessageInTransit::kTypeChannel);
switch (message_view.subtype()) {
case MessageInTransit::kSubtypeChannelRunMessagePipeEndpoint:
DVLOG(2) << "Handling channel message to run message pipe (local ID = "
<< message_view.destination_id() << ", remote ID = "
<< message_view.source_id() << ")";
RunMessagePipeEndpoint(message_view.destination_id(),
message_view.source_id());
break;
default:
HandleRemoteError("Received invalid channel message");
NOTREACHED();
break;
}
}
void Channel::HandleRemoteError(const base::StringPiece& error_message) {
LOG(WARNING) << error_message;
}
void Channel::HandleLocalError(const base::StringPiece& error_message) {
LOG(WARNING) << error_message;
}
}
}