This source file includes following definitions.
- SetUp
- TearDown
- ConnectMessagePipes
- BootstrapMessagePipeNoWait
- RestoreInitialState
- io_thread
- SetUpOnIOThread
- TearDownOnIOThread
- CreateAndInitChannel
- ConnectMessagePipesOnIOThread
- BootstrapMessagePipeOnIOThread
- RestoreInitialStateOnIOThread
- TEST_F
- TEST_F
- TEST_F
- TEST_F
- TEST_F
#include <stdint.h>
#include <string.h>
#include <vector>
#include "base/basictypes.h"
#include "base/bind.h"
#include "base/location.h"
#include "base/logging.h"
#include "base/message_loop/message_loop.h"
#include "base/threading/platform_thread.h"
#include "mojo/embedder/platform_channel_pair.h"
#include "mojo/embedder/scoped_platform_handle.h"
#include "mojo/system/channel.h"
#include "mojo/system/local_message_pipe_endpoint.h"
#include "mojo/system/message_pipe.h"
#include "mojo/system/message_pipe_dispatcher.h"
#include "mojo/system/proxy_message_pipe_endpoint.h"
#include "mojo/system/raw_channel.h"
#include "mojo/system/test_utils.h"
#include "mojo/system/waiter.h"
#include "testing/gtest/include/gtest/gtest.h"
namespace mojo {
namespace system {
namespace {
class RemoteMessagePipeTest : public testing::Test {
public:
RemoteMessagePipeTest() : io_thread_(test::TestIOThread::kAutoStart) {}
virtual ~RemoteMessagePipeTest() {}
virtual void SetUp() OVERRIDE {
io_thread_.PostTaskAndWait(
FROM_HERE,
base::Bind(&RemoteMessagePipeTest::SetUpOnIOThread,
base::Unretained(this)));
}
virtual void TearDown() OVERRIDE {
io_thread_.PostTaskAndWait(
FROM_HERE,
base::Bind(&RemoteMessagePipeTest::TearDownOnIOThread,
base::Unretained(this)));
}
protected:
void ConnectMessagePipes(scoped_refptr<MessagePipe> mp0,
scoped_refptr<MessagePipe> mp1) {
io_thread_.PostTaskAndWait(
FROM_HERE,
base::Bind(&RemoteMessagePipeTest::ConnectMessagePipesOnIOThread,
base::Unretained(this), mp0, mp1));
}
void BootstrapMessagePipeNoWait(unsigned channel_index,
scoped_refptr<MessagePipe> mp) {
io_thread_.PostTask(
FROM_HERE,
base::Bind(&RemoteMessagePipeTest::BootstrapMessagePipeOnIOThread,
base::Unretained(this), channel_index, mp));
}
void RestoreInitialState() {
io_thread_.PostTaskAndWait(
FROM_HERE,
base::Bind(&RemoteMessagePipeTest::RestoreInitialStateOnIOThread,
base::Unretained(this)));
}
test::TestIOThread* io_thread() { return &io_thread_; }
private:
void SetUpOnIOThread() {
CHECK_EQ(base::MessageLoop::current(), io_thread()->message_loop());
embedder::PlatformChannelPair channel_pair;
platform_handles_[0] = channel_pair.PassServerHandle();
platform_handles_[1] = channel_pair.PassClientHandle();
}
void TearDownOnIOThread() {
CHECK_EQ(base::MessageLoop::current(), io_thread()->message_loop());
if (channels_[0].get()) {
channels_[0]->Shutdown();
channels_[0] = NULL;
}
if (channels_[1].get()) {
channels_[1]->Shutdown();
channels_[1] = NULL;
}
}
void CreateAndInitChannel(unsigned channel_index) {
CHECK_EQ(base::MessageLoop::current(), io_thread()->message_loop());
CHECK(channel_index == 0 || channel_index == 1);
CHECK(!channels_[channel_index].get());
channels_[channel_index] = new Channel();
CHECK(channels_[channel_index]->Init(
RawChannel::Create(platform_handles_[channel_index].Pass())));
}
void ConnectMessagePipesOnIOThread(scoped_refptr<MessagePipe> mp0,
scoped_refptr<MessagePipe> mp1) {
CHECK_EQ(base::MessageLoop::current(), io_thread()->message_loop());
if (!channels_[0].get())
CreateAndInitChannel(0);
if (!channels_[1].get())
CreateAndInitChannel(1);
MessageInTransit::EndpointId local_id0 =
channels_[0]->AttachMessagePipeEndpoint(mp0, 1);
MessageInTransit::EndpointId local_id1 =
channels_[1]->AttachMessagePipeEndpoint(mp1, 0);
channels_[0]->RunMessagePipeEndpoint(local_id0, local_id1);
channels_[1]->RunMessagePipeEndpoint(local_id1, local_id0);
}
void BootstrapMessagePipeOnIOThread(unsigned channel_index,
scoped_refptr<MessagePipe> mp) {
CHECK_EQ(base::MessageLoop::current(), io_thread()->message_loop());
CHECK(channel_index == 0 || channel_index == 1);
unsigned port = channel_index ^ 1u;
CreateAndInitChannel(channel_index);
CHECK_EQ(channels_[channel_index]->AttachMessagePipeEndpoint(mp, port),
Channel::kBootstrapEndpointId);
channels_[channel_index]->RunMessagePipeEndpoint(
Channel::kBootstrapEndpointId, Channel::kBootstrapEndpointId);
}
void RestoreInitialStateOnIOThread() {
CHECK_EQ(base::MessageLoop::current(), io_thread()->message_loop());
TearDownOnIOThread();
SetUpOnIOThread();
}
test::TestIOThread io_thread_;
embedder::ScopedPlatformHandle platform_handles_[2];
scoped_refptr<Channel> channels_[2];
DISALLOW_COPY_AND_ASSIGN(RemoteMessagePipeTest);
};
TEST_F(RemoteMessagePipeTest, Basic) {
const char hello[] = "hello";
const char world[] = "world!!!1!!!1!";
char buffer[100] = { 0 };
uint32_t buffer_size = static_cast<uint32_t>(sizeof(buffer));
Waiter waiter;
scoped_refptr<MessagePipe> mp0(new MessagePipe(
scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint()),
scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint())));
scoped_refptr<MessagePipe> mp1(new MessagePipe(
scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint()),
scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint())));
ConnectMessagePipes(mp0, mp1);
waiter.Init();
EXPECT_EQ(MOJO_RESULT_OK,
mp1->AddWaiter(1, &waiter, MOJO_WAIT_FLAG_READABLE, 123));
EXPECT_EQ(MOJO_RESULT_OK,
mp0->WriteMessage(0,
hello, sizeof(hello),
NULL,
MOJO_WRITE_MESSAGE_FLAG_NONE));
EXPECT_EQ(123, waiter.Wait(MOJO_DEADLINE_INDEFINITE));
mp1->RemoveWaiter(1, &waiter);
EXPECT_EQ(MOJO_RESULT_OK,
mp1->ReadMessage(1,
buffer, &buffer_size,
NULL, NULL,
MOJO_READ_MESSAGE_FLAG_NONE));
EXPECT_EQ(sizeof(hello), static_cast<size_t>(buffer_size));
EXPECT_STREQ(hello, buffer);
waiter.Init();
EXPECT_EQ(MOJO_RESULT_OK,
mp0->AddWaiter(0, &waiter, MOJO_WAIT_FLAG_READABLE, 456));
EXPECT_EQ(MOJO_RESULT_OK,
mp1->WriteMessage(1,
world, sizeof(world),
NULL,
MOJO_WRITE_MESSAGE_FLAG_NONE));
EXPECT_EQ(456, waiter.Wait(MOJO_DEADLINE_INDEFINITE));
mp0->RemoveWaiter(0, &waiter);
buffer_size = static_cast<uint32_t>(sizeof(buffer));
EXPECT_EQ(MOJO_RESULT_OK,
mp0->ReadMessage(0,
buffer, &buffer_size,
NULL, NULL,
MOJO_READ_MESSAGE_FLAG_NONE));
EXPECT_EQ(sizeof(world), static_cast<size_t>(buffer_size));
EXPECT_STREQ(world, buffer);
mp0->Close(0);
waiter.Init();
MojoResult result = mp1->AddWaiter(1, &waiter, MOJO_WAIT_FLAG_READABLE, 789);
if (result == MOJO_RESULT_OK) {
EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
waiter.Wait(MOJO_DEADLINE_INDEFINITE));
mp1->RemoveWaiter(1, &waiter);
} else {
EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, result);
}
mp1->Close(1);
}
TEST_F(RemoteMessagePipeTest, Multiplex) {
const char hello[] = "hello";
const char world[] = "world!!!1!!!1!";
char buffer[100] = { 0 };
uint32_t buffer_size = static_cast<uint32_t>(sizeof(buffer));
Waiter waiter;
scoped_refptr<MessagePipe> mp0(new MessagePipe(
scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint()),
scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint())));
scoped_refptr<MessagePipe> mp1(new MessagePipe(
scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint()),
scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint())));
ConnectMessagePipes(mp0, mp1);
scoped_refptr<MessagePipe> mp2(new MessagePipe(
scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint()),
scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint())));
scoped_refptr<MessagePipe> mp3(new MessagePipe(
scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint()),
scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint())));
ConnectMessagePipes(mp2, mp3);
waiter.Init();
EXPECT_EQ(MOJO_RESULT_OK,
mp3->AddWaiter(1, &waiter, MOJO_WAIT_FLAG_READABLE, 789));
EXPECT_EQ(MOJO_RESULT_OK,
mp2->WriteMessage(0,
hello, sizeof(hello),
NULL,
MOJO_WRITE_MESSAGE_FLAG_NONE));
EXPECT_EQ(789, waiter.Wait(MOJO_DEADLINE_INDEFINITE));
mp3->RemoveWaiter(1, &waiter);
buffer_size = static_cast<uint32_t>(sizeof(buffer));
EXPECT_EQ(MOJO_RESULT_SHOULD_WAIT,
mp0->ReadMessage(0,
buffer, &buffer_size,
NULL, NULL,
MOJO_READ_MESSAGE_FLAG_NONE));
buffer_size = static_cast<uint32_t>(sizeof(buffer));
EXPECT_EQ(MOJO_RESULT_SHOULD_WAIT,
mp1->ReadMessage(1,
buffer, &buffer_size,
NULL, NULL,
MOJO_READ_MESSAGE_FLAG_NONE));
buffer_size = static_cast<uint32_t>(sizeof(buffer));
EXPECT_EQ(MOJO_RESULT_SHOULD_WAIT,
mp2->ReadMessage(0,
buffer, &buffer_size,
NULL, NULL,
MOJO_READ_MESSAGE_FLAG_NONE));
buffer_size = static_cast<uint32_t>(sizeof(buffer));
EXPECT_EQ(MOJO_RESULT_OK,
mp3->ReadMessage(1,
buffer, &buffer_size,
NULL, NULL,
MOJO_READ_MESSAGE_FLAG_NONE));
EXPECT_EQ(sizeof(hello), static_cast<size_t>(buffer_size));
EXPECT_STREQ(hello, buffer);
waiter.Init();
EXPECT_EQ(MOJO_RESULT_OK,
mp1->AddWaiter(1, &waiter, MOJO_WAIT_FLAG_READABLE, 123));
EXPECT_EQ(MOJO_RESULT_OK,
mp0->WriteMessage(0,
world, sizeof(world),
NULL,
MOJO_WRITE_MESSAGE_FLAG_NONE));
EXPECT_EQ(123, waiter.Wait(MOJO_DEADLINE_INDEFINITE));
mp1->RemoveWaiter(1, &waiter);
buffer_size = static_cast<uint32_t>(sizeof(buffer));
EXPECT_EQ(MOJO_RESULT_SHOULD_WAIT,
mp0->ReadMessage(0,
buffer, &buffer_size,
NULL, NULL,
MOJO_READ_MESSAGE_FLAG_NONE));
buffer_size = static_cast<uint32_t>(sizeof(buffer));
EXPECT_EQ(MOJO_RESULT_SHOULD_WAIT,
mp2->ReadMessage(0,
buffer, &buffer_size,
NULL, NULL,
MOJO_READ_MESSAGE_FLAG_NONE));
buffer_size = static_cast<uint32_t>(sizeof(buffer));
EXPECT_EQ(MOJO_RESULT_SHOULD_WAIT,
mp3->ReadMessage(1,
buffer, &buffer_size,
NULL, NULL,
MOJO_READ_MESSAGE_FLAG_NONE));
buffer_size = static_cast<uint32_t>(sizeof(buffer));
EXPECT_EQ(MOJO_RESULT_OK,
mp1->ReadMessage(1,
buffer, &buffer_size,
NULL, NULL,
MOJO_READ_MESSAGE_FLAG_NONE));
EXPECT_EQ(sizeof(world), static_cast<size_t>(buffer_size));
EXPECT_STREQ(world, buffer);
}
TEST_F(RemoteMessagePipeTest, CloseBeforeConnect) {
const char hello[] = "hello";
char buffer[100] = { 0 };
uint32_t buffer_size = static_cast<uint32_t>(sizeof(buffer));
Waiter waiter;
scoped_refptr<MessagePipe> mp0(new MessagePipe(
scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint()),
scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint())));
EXPECT_EQ(MOJO_RESULT_OK,
mp0->WriteMessage(0,
hello, sizeof(hello),
NULL,
MOJO_WRITE_MESSAGE_FLAG_NONE));
BootstrapMessagePipeNoWait(0, mp0);
mp0->Close(0);
scoped_refptr<MessagePipe> mp1(new MessagePipe(
scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint()),
scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint())));
waiter.Init();
EXPECT_EQ(MOJO_RESULT_OK,
mp1->AddWaiter(1, &waiter, MOJO_WAIT_FLAG_READABLE, 123));
BootstrapMessagePipeNoWait(1, mp1);
EXPECT_EQ(123, waiter.Wait(MOJO_DEADLINE_INDEFINITE));
mp1->RemoveWaiter(1, &waiter);
EXPECT_EQ(MOJO_RESULT_OK,
mp1->ReadMessage(1,
buffer, &buffer_size,
NULL, NULL,
MOJO_READ_MESSAGE_FLAG_NONE));
EXPECT_EQ(sizeof(hello), static_cast<size_t>(buffer_size));
EXPECT_STREQ(hello, buffer);
mp1->Close(1);
}
TEST_F(RemoteMessagePipeTest, HandlePassing) {
const char hello[] = "hello";
Waiter waiter;
scoped_refptr<MessagePipe> mp0(new MessagePipe(
scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint()),
scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint())));
scoped_refptr<MessagePipe> mp1(new MessagePipe(
scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint()),
scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint())));
ConnectMessagePipes(mp0, mp1);
scoped_refptr<MessagePipeDispatcher> dispatcher(new MessagePipeDispatcher());
scoped_refptr<MessagePipe> local_mp(new MessagePipe());
dispatcher->Init(local_mp, 0);
waiter.Init();
EXPECT_EQ(MOJO_RESULT_OK,
mp1->AddWaiter(1, &waiter, MOJO_WAIT_FLAG_READABLE, 123));
{
DispatcherTransport
transport(test::DispatcherTryStartTransport(dispatcher.get()));
EXPECT_TRUE(transport.is_valid());
std::vector<DispatcherTransport> transports;
transports.push_back(transport);
EXPECT_EQ(MOJO_RESULT_OK,
mp0->WriteMessage(0, hello, sizeof(hello), &transports,
MOJO_WRITE_MESSAGE_FLAG_NONE));
transport.End();
EXPECT_TRUE(dispatcher->HasOneRef());
dispatcher = NULL;
}
EXPECT_EQ(123, waiter.Wait(MOJO_DEADLINE_INDEFINITE));
mp1->RemoveWaiter(1, &waiter);
char read_buffer[100] = { 0 };
uint32_t read_buffer_size = static_cast<uint32_t>(sizeof(read_buffer));
std::vector<scoped_refptr<Dispatcher> > read_dispatchers;
uint32_t read_num_dispatchers = 10;
EXPECT_EQ(MOJO_RESULT_OK,
mp1->ReadMessage(1, read_buffer, &read_buffer_size,
&read_dispatchers, &read_num_dispatchers,
MOJO_READ_MESSAGE_FLAG_NONE));
EXPECT_EQ(sizeof(hello), static_cast<size_t>(read_buffer_size));
EXPECT_STREQ(hello, read_buffer);
EXPECT_EQ(1u, read_dispatchers.size());
EXPECT_EQ(1u, read_num_dispatchers);
ASSERT_TRUE(read_dispatchers[0].get());
EXPECT_TRUE(read_dispatchers[0]->HasOneRef());
EXPECT_EQ(Dispatcher::kTypeMessagePipe, read_dispatchers[0]->GetType());
dispatcher = static_cast<MessagePipeDispatcher*>(read_dispatchers[0].get());
EXPECT_EQ(MOJO_RESULT_OK,
local_mp->WriteMessage(1, hello, sizeof(hello), NULL,
MOJO_WRITE_MESSAGE_FLAG_NONE));
waiter.Init();
EXPECT_EQ(MOJO_RESULT_OK,
dispatcher->AddWaiter(&waiter, MOJO_WAIT_FLAG_READABLE, 456));
EXPECT_EQ(456, waiter.Wait(MOJO_DEADLINE_INDEFINITE));
dispatcher->RemoveWaiter(&waiter);
memset(read_buffer, 0, sizeof(read_buffer));
read_buffer_size = static_cast<uint32_t>(sizeof(read_buffer));
EXPECT_EQ(MOJO_RESULT_OK,
dispatcher->ReadMessage(read_buffer, &read_buffer_size, 0, NULL,
MOJO_READ_MESSAGE_FLAG_NONE));
EXPECT_EQ(sizeof(hello), static_cast<size_t>(read_buffer_size));
EXPECT_STREQ(hello, read_buffer);
waiter.Init();
EXPECT_EQ(MOJO_RESULT_OK,
local_mp->AddWaiter(1, &waiter, MOJO_WAIT_FLAG_READABLE, 789));
EXPECT_EQ(MOJO_RESULT_OK,
dispatcher->WriteMessage(hello, sizeof(hello), NULL,
MOJO_WRITE_MESSAGE_FLAG_NONE));
EXPECT_EQ(789, waiter.Wait(MOJO_DEADLINE_INDEFINITE));
local_mp->RemoveWaiter(1, &waiter);
memset(read_buffer, 0, sizeof(read_buffer));
read_buffer_size = static_cast<uint32_t>(sizeof(read_buffer));
EXPECT_EQ(MOJO_RESULT_OK,
local_mp->ReadMessage(1, read_buffer, &read_buffer_size, NULL, NULL,
MOJO_READ_MESSAGE_FLAG_NONE));
EXPECT_EQ(sizeof(hello), static_cast<size_t>(read_buffer_size));
EXPECT_STREQ(hello, read_buffer);
mp0->Close(0);
mp1->Close(1);
EXPECT_EQ(MOJO_RESULT_OK, dispatcher->Close());
local_mp->Close(1);
}
TEST_F(RemoteMessagePipeTest, RacingClosesStress) {
base::TimeDelta delay = base::TimeDelta::FromMilliseconds(5);
for (unsigned i = 0u; i < 256u; i++) {
scoped_refptr<MessagePipe> mp0(new MessagePipe(
scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint()),
scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint())));
BootstrapMessagePipeNoWait(0, mp0);
scoped_refptr<MessagePipe> mp1(new MessagePipe(
scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint()),
scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint())));
BootstrapMessagePipeNoWait(1, mp1);
if (i & 1u) {
io_thread()->task_runner()->PostTask(
FROM_HERE, base::Bind(&base::PlatformThread::Sleep, delay));
}
if (i & 2u)
base::PlatformThread::Sleep(delay);
mp0->Close(0);
if (i & 4u) {
io_thread()->task_runner()->PostTask(
FROM_HERE, base::Bind(&base::PlatformThread::Sleep, delay));
}
if (i & 8u)
base::PlatformThread::Sleep(delay);
mp1->Close(1);
RestoreInitialState();
}
}
}
}
}