This source file includes following definitions.
- is_vista_or_higher
- SetFileCompletionNotificationModes
- CancelIoEx
- cancel_io_ex_
- handle
- pending_write_
- pending_read
- read_context
- OnPendingReadStarted
- pending_write_no_lock
- write_context_no_lock
- OnPendingWriteStartedNoLock
- OnIOCompleted
- DetachFromOwnerNoLock
- ShouldSelfDestruct
- OnReadCompleted
- OnWriteCompleted
- skip_completion_port_on_success_
- Read
- ScheduleRead
- WriteNoLock
- ScheduleWriteNoLock
- OnInit
- OnShutdownNoLock
- Create
#include "mojo/system/raw_channel.h"
#include <windows.h>
#include "base/basictypes.h"
#include "base/bind.h"
#include "base/compiler_specific.h"
#include "base/lazy_instance.h"
#include "base/location.h"
#include "base/logging.h"
#include "base/memory/scoped_ptr.h"
#include "base/message_loop/message_loop.h"
#include "base/synchronization/lock.h"
#include "base/win/windows_version.h"
#include "mojo/embedder/platform_handle.h"
namespace mojo {
namespace system {
namespace {
class VistaOrHigherFunctions {
public:
VistaOrHigherFunctions();
bool is_vista_or_higher() const { return is_vista_or_higher_; }
BOOL SetFileCompletionNotificationModes(HANDLE handle, UCHAR flags) {
return set_file_completion_notification_modes_(handle, flags);
}
BOOL CancelIoEx(HANDLE handle, LPOVERLAPPED overlapped) {
return cancel_io_ex_(handle, overlapped);
}
private:
typedef BOOL (WINAPI *SetFileCompletionNotificationModesFunc)(HANDLE, UCHAR);
typedef BOOL (WINAPI *CancelIoExFunc)(HANDLE, LPOVERLAPPED);
bool is_vista_or_higher_;
SetFileCompletionNotificationModesFunc
set_file_completion_notification_modes_;
CancelIoExFunc cancel_io_ex_;
};
VistaOrHigherFunctions::VistaOrHigherFunctions()
: is_vista_or_higher_(base::win::GetVersion() >= base::win::VERSION_VISTA),
set_file_completion_notification_modes_(NULL),
cancel_io_ex_(NULL) {
if (!is_vista_or_higher_)
return;
HMODULE module = GetModuleHandleW(L"kernel32.dll");
set_file_completion_notification_modes_ =
reinterpret_cast<SetFileCompletionNotificationModesFunc>(
GetProcAddress(module, "SetFileCompletionNotificationModes"));
DCHECK(set_file_completion_notification_modes_);
cancel_io_ex_ = reinterpret_cast<CancelIoExFunc>(
GetProcAddress(module, "CancelIoEx"));
DCHECK(cancel_io_ex_);
}
base::LazyInstance<VistaOrHigherFunctions> g_vista_or_higher_functions =
LAZY_INSTANCE_INITIALIZER;
class RawChannelWin : public RawChannel {
public:
RawChannelWin(embedder::ScopedPlatformHandle handle);
virtual ~RawChannelWin();
private:
class RawChannelIOHandler : public base::MessageLoopForIO::IOHandler {
public:
RawChannelIOHandler(RawChannelWin* owner,
embedder::ScopedPlatformHandle handle);
HANDLE handle() const { return handle_.get().handle; }
bool pending_read() const;
base::MessageLoopForIO::IOContext* read_context();
void OnPendingReadStarted();
bool pending_write_no_lock() const;
base::MessageLoopForIO::IOContext* write_context_no_lock();
void OnPendingWriteStartedNoLock();
virtual void OnIOCompleted(base::MessageLoopForIO::IOContext* context,
DWORD bytes_transferred,
DWORD error) OVERRIDE;
void DetachFromOwnerNoLock(scoped_ptr<ReadBuffer> read_buffer,
scoped_ptr<WriteBuffer> write_buffer);
private:
virtual ~RawChannelIOHandler();
bool ShouldSelfDestruct() const;
void OnReadCompleted(DWORD bytes_read, DWORD error);
void OnWriteCompleted(DWORD bytes_written, DWORD error);
embedder::ScopedPlatformHandle handle_;
RawChannelWin* owner_;
scoped_ptr<ReadBuffer> preserved_read_buffer_after_detach_;
scoped_ptr<WriteBuffer> preserved_write_buffer_after_detach_;
bool pending_read_;
base::MessageLoopForIO::IOContext read_context_;
bool pending_write_;
base::MessageLoopForIO::IOContext write_context_;
DISALLOW_COPY_AND_ASSIGN(RawChannelIOHandler);
};
virtual IOResult Read(size_t* bytes_read) OVERRIDE;
virtual IOResult ScheduleRead() OVERRIDE;
virtual IOResult WriteNoLock(size_t* bytes_written) OVERRIDE;
virtual IOResult ScheduleWriteNoLock() OVERRIDE;
virtual bool OnInit() OVERRIDE;
virtual void OnShutdownNoLock(
scoped_ptr<ReadBuffer> read_buffer,
scoped_ptr<WriteBuffer> write_buffer) OVERRIDE;
embedder::ScopedPlatformHandle handle_;
RawChannelIOHandler* io_handler_;
const bool skip_completion_port_on_success_;
DISALLOW_COPY_AND_ASSIGN(RawChannelWin);
};
RawChannelWin::RawChannelIOHandler::RawChannelIOHandler(
RawChannelWin* owner,
embedder::ScopedPlatformHandle handle) : handle_(handle.Pass()),
owner_(owner),
pending_read_(false),
pending_write_(false) {
memset(&read_context_.overlapped, 0, sizeof(read_context_.overlapped));
read_context_.handler = this;
memset(&write_context_.overlapped, 0, sizeof(write_context_.overlapped));
write_context_.handler = this;
owner_->message_loop_for_io()->RegisterIOHandler(handle_.get().handle, this);
}
RawChannelWin::RawChannelIOHandler::~RawChannelIOHandler() {
DCHECK(ShouldSelfDestruct());
}
bool RawChannelWin::RawChannelIOHandler::pending_read() const {
DCHECK(owner_);
DCHECK_EQ(base::MessageLoop::current(), owner_->message_loop_for_io());
return pending_read_;
}
base::MessageLoopForIO::IOContext*
RawChannelWin::RawChannelIOHandler::read_context() {
DCHECK(owner_);
DCHECK_EQ(base::MessageLoop::current(), owner_->message_loop_for_io());
return &read_context_;
}
void RawChannelWin::RawChannelIOHandler::OnPendingReadStarted() {
DCHECK(owner_);
DCHECK_EQ(base::MessageLoop::current(), owner_->message_loop_for_io());
DCHECK(!pending_read_);
pending_read_ = true;
}
bool RawChannelWin::RawChannelIOHandler::pending_write_no_lock() const {
DCHECK(owner_);
owner_->write_lock().AssertAcquired();
return pending_write_;
}
base::MessageLoopForIO::IOContext*
RawChannelWin::RawChannelIOHandler::write_context_no_lock() {
DCHECK(owner_);
owner_->write_lock().AssertAcquired();
return &write_context_;
}
void RawChannelWin::RawChannelIOHandler::OnPendingWriteStartedNoLock() {
DCHECK(owner_);
owner_->write_lock().AssertAcquired();
DCHECK(!pending_write_);
pending_write_ = true;
}
void RawChannelWin::RawChannelIOHandler::OnIOCompleted(
base::MessageLoopForIO::IOContext* context,
DWORD bytes_transferred,
DWORD error) {
DCHECK(!owner_ ||
base::MessageLoop::current() == owner_->message_loop_for_io());
if (context == &read_context_)
OnReadCompleted(bytes_transferred, error);
else if (context == &write_context_)
OnWriteCompleted(bytes_transferred, error);
else
NOTREACHED();
if (ShouldSelfDestruct())
delete this;
}
void RawChannelWin::RawChannelIOHandler::DetachFromOwnerNoLock(
scoped_ptr<ReadBuffer> read_buffer,
scoped_ptr<WriteBuffer> write_buffer) {
DCHECK(owner_);
DCHECK_EQ(base::MessageLoop::current(), owner_->message_loop_for_io());
owner_->write_lock().AssertAcquired();
if (pending_read_)
preserved_read_buffer_after_detach_ = read_buffer.Pass();
if (pending_write_)
preserved_write_buffer_after_detach_ = write_buffer.Pass();
owner_ = NULL;
if (ShouldSelfDestruct())
delete this;
}
bool RawChannelWin::RawChannelIOHandler::ShouldSelfDestruct() const {
if (owner_)
return false;
return !pending_read_ && !pending_write_;
}
void RawChannelWin::RawChannelIOHandler::OnReadCompleted(DWORD bytes_read,
DWORD error) {
DCHECK(!owner_ ||
base::MessageLoop::current() == owner_->message_loop_for_io());
CHECK(pending_read_);
pending_read_ = false;
if (!owner_)
return;
if (error != ERROR_SUCCESS) {
DCHECK_EQ(bytes_read, 0u);
PLOG_IF(ERROR, error != ERROR_BROKEN_PIPE) << "ReadFile";
owner_->OnReadCompleted(false, 0);
} else {
DCHECK_GT(bytes_read, 0u);
owner_->OnReadCompleted(true, bytes_read);
}
}
void RawChannelWin::RawChannelIOHandler::OnWriteCompleted(DWORD bytes_written,
DWORD error) {
DCHECK(!owner_ ||
base::MessageLoop::current() == owner_->message_loop_for_io());
if (!owner_) {
CHECK(pending_write_);
pending_write_ = false;
return;
}
{
base::AutoLock locker(owner_->write_lock());
CHECK(pending_write_);
pending_write_ = false;
}
if (error != ERROR_SUCCESS) {
LOG(ERROR) << "WriteFile failed: " << error;
owner_->OnWriteCompleted(false, 0);
} else {
owner_->OnWriteCompleted(true, bytes_written);
}
}
RawChannelWin::RawChannelWin(embedder::ScopedPlatformHandle handle)
: handle_(handle.Pass()),
io_handler_(NULL),
skip_completion_port_on_success_(
g_vista_or_higher_functions.Get().is_vista_or_higher()) {
DCHECK(handle_.is_valid());
}
RawChannelWin::~RawChannelWin() {
DCHECK(!io_handler_);
}
RawChannel::IOResult RawChannelWin::Read(size_t* bytes_read) {
DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io());
DCHECK(io_handler_);
DCHECK(!io_handler_->pending_read());
char* buffer = NULL;
size_t bytes_to_read = 0;
read_buffer()->GetBuffer(&buffer, &bytes_to_read);
DWORD bytes_read_dword = 0;
BOOL result = ReadFile(io_handler_->handle(),
buffer,
static_cast<DWORD>(bytes_to_read),
&bytes_read_dword,
&io_handler_->read_context()->overlapped);
if (!result) {
DCHECK_EQ(bytes_read_dword, 0u);
DWORD error = GetLastError();
if (error != ERROR_IO_PENDING) {
LOG_IF(ERROR, error != ERROR_BROKEN_PIPE) << "ReadFile failed: " << error;
return IO_FAILED;
}
}
if (result && skip_completion_port_on_success_) {
*bytes_read = bytes_read_dword;
return IO_SUCCEEDED;
}
io_handler_->OnPendingReadStarted();
return IO_PENDING;
}
RawChannel::IOResult RawChannelWin::ScheduleRead() {
DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io());
DCHECK(io_handler_);
DCHECK(!io_handler_->pending_read());
size_t bytes_read = 0;
IOResult io_result = Read(&bytes_read);
if (io_result == IO_SUCCEEDED) {
DCHECK(skip_completion_port_on_success_);
io_handler_->OnPendingReadStarted();
message_loop_for_io()->PostTask(
FROM_HERE,
base::Bind(&RawChannelIOHandler::OnIOCompleted,
base::Unretained(io_handler_),
base::Unretained(io_handler_->read_context()),
static_cast<DWORD>(bytes_read),
ERROR_SUCCESS));
return IO_PENDING;
}
return io_result;
}
RawChannel::IOResult RawChannelWin::WriteNoLock(size_t* bytes_written) {
write_lock().AssertAcquired();
DCHECK(io_handler_);
DCHECK(!io_handler_->pending_write_no_lock());
std::vector<WriteBuffer::Buffer> buffers;
write_buffer_no_lock()->GetBuffers(&buffers);
DCHECK(!buffers.empty());
DWORD bytes_written_dword = 0;
BOOL result = WriteFile(io_handler_->handle(),
buffers[0].addr,
static_cast<DWORD>(buffers[0].size),
&bytes_written_dword,
&io_handler_->write_context_no_lock()->overlapped);
if (!result && GetLastError() != ERROR_IO_PENDING) {
PLOG(ERROR) << "WriteFile";
return IO_FAILED;
}
if (result && skip_completion_port_on_success_) {
*bytes_written = bytes_written_dword;
return IO_SUCCEEDED;
}
io_handler_->OnPendingWriteStartedNoLock();
return IO_PENDING;
}
RawChannel::IOResult RawChannelWin::ScheduleWriteNoLock() {
write_lock().AssertAcquired();
DCHECK(io_handler_);
DCHECK(!io_handler_->pending_write_no_lock());
size_t bytes_written = 0;
IOResult io_result = WriteNoLock(&bytes_written);
if (io_result == IO_SUCCEEDED) {
DCHECK(skip_completion_port_on_success_);
io_handler_->OnPendingWriteStartedNoLock();
message_loop_for_io()->PostTask(
FROM_HERE,
base::Bind(&RawChannelIOHandler::OnIOCompleted,
base::Unretained(io_handler_),
base::Unretained(io_handler_->write_context_no_lock()),
static_cast<DWORD>(bytes_written),
ERROR_SUCCESS));
return IO_PENDING;
}
return io_result;
}
bool RawChannelWin::OnInit() {
DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io());
DCHECK(handle_.is_valid());
if (skip_completion_port_on_success_ &&
!g_vista_or_higher_functions.Get().SetFileCompletionNotificationModes(
handle_.get().handle, FILE_SKIP_COMPLETION_PORT_ON_SUCCESS)) {
return false;
}
DCHECK(!io_handler_);
io_handler_ = new RawChannelIOHandler(this, handle_.Pass());
return true;
}
void RawChannelWin::OnShutdownNoLock(scoped_ptr<ReadBuffer> read_buffer,
scoped_ptr<WriteBuffer> write_buffer) {
DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io());
DCHECK(io_handler_);
write_lock().AssertAcquired();
if (io_handler_->pending_read() || io_handler_->pending_write_no_lock()) {
if (g_vista_or_higher_functions.Get().is_vista_or_higher())
g_vista_or_higher_functions.Get().CancelIoEx(io_handler_->handle(), NULL);
else
CancelIo(io_handler_->handle());
}
io_handler_->DetachFromOwnerNoLock(read_buffer.Pass(), write_buffer.Pass());
io_handler_ = NULL;
}
}
scoped_ptr<RawChannel> RawChannel::Create(
embedder::ScopedPlatformHandle handle) {
return scoped_ptr<RawChannel>(new RawChannelWin(handle.Pass()));
}
}
}