This source file includes following definitions.
- uv_unique_pipe_name
- uv_pipe_init
- uv_pipe_connection_init
- open_named_pipe
- uv_stdio_pipe_server
- uv_set_pipe_handle
- pipe_shutdown_thread_proc
- uv_pipe_endgame
- uv_pipe_pending_instances
- uv_pipe_bind
- pipe_connect_thread_proc
- uv_pipe_connect
- uv_pipe_cleanup
- uv_pipe_close
- uv_pipe_queue_accept
- uv_pipe_accept
- uv_pipe_listen
- uv_pipe_zero_readfile_thread_proc
- uv_pipe_writefile_thread_proc
- post_completion_read_wait
- post_completion_write_wait
- uv_pipe_queue_read
- uv_pipe_read_start_impl
- uv_pipe_read_start
- uv_pipe_read2_start
- uv_insert_non_overlapped_write_req
- uv_remove_non_overlapped_write_req
- uv_queue_non_overlapped_write
- uv_pipe_write_impl
- uv_pipe_write
- uv_pipe_write2
- uv_pipe_read_eof
- uv_pipe_read_error
- uv_pipe_read_error_or_eof
- uv_process_pipe_read_req
- uv_process_pipe_write_req
- uv_process_pipe_accept_req
- uv_process_pipe_connect_req
- uv_process_pipe_shutdown_req
- eof_timer_init
- eof_timer_start
- eof_timer_stop
- eof_timer_cb
- eof_timer_destroy
- eof_timer_close_cb
- uv_pipe_open
#include <assert.h>
#include <io.h>
#include <string.h>
#include <stdio.h>
#include <stdlib.h>
#include "uv.h"
#include "internal.h"
#include "handle-inl.h"
#include "stream-inl.h"
#include "req-inl.h"
static char uv_zero_[] = "";
static const uv_buf_t uv_null_buf_ = { 0, NULL };
static const int64_t eof_timeout = 50;
static const int default_pending_pipe_instances = 4;
#define UV_IPC_RAW_DATA 0x0001
#define UV_IPC_TCP_SERVER 0x0002
#define UV_IPC_TCP_CONNECTION 0x0004
typedef struct {
int flags;
uint64_t raw_data_length;
} uv_ipc_frame_header_t;
typedef struct {
uv_ipc_frame_header_t header;
WSAPROTOCOL_INFOW socket_info;
} uv_ipc_frame_uv_stream;
static void eof_timer_init(uv_pipe_t* pipe);
static void eof_timer_start(uv_pipe_t* pipe);
static void eof_timer_stop(uv_pipe_t* pipe);
static void eof_timer_cb(uv_timer_t* timer, int status);
static void eof_timer_destroy(uv_pipe_t* pipe);
static void eof_timer_close_cb(uv_handle_t* handle);
static void uv_unique_pipe_name(char* ptr, char* name, size_t size) {
_snprintf(name, size, "\\\\.\\pipe\\uv\\%p-%d", ptr, GetCurrentProcessId());
}
int uv_pipe_init(uv_loop_t* loop, uv_pipe_t* handle, int ipc) {
uv_stream_init(loop, (uv_stream_t*)handle, UV_NAMED_PIPE);
handle->reqs_pending = 0;
handle->handle = INVALID_HANDLE_VALUE;
handle->name = NULL;
handle->ipc_pid = 0;
handle->remaining_ipc_rawdata_bytes = 0;
handle->pending_ipc_info.socket_info = NULL;
handle->pending_ipc_info.tcp_connection = 0;
handle->ipc = ipc;
handle->non_overlapped_writes_tail = NULL;
uv_req_init(loop, (uv_req_t*) &handle->ipc_header_write_req);
return 0;
}
static void uv_pipe_connection_init(uv_pipe_t* handle) {
uv_connection_init((uv_stream_t*) handle);
handle->read_req.data = handle;
handle->eof_timer = NULL;
}
static HANDLE open_named_pipe(WCHAR* name, DWORD* duplex_flags) {
HANDLE pipeHandle;
pipeHandle = CreateFileW(name,
GENERIC_READ | GENERIC_WRITE,
0,
NULL,
OPEN_EXISTING,
FILE_FLAG_OVERLAPPED,
NULL);
if (pipeHandle != INVALID_HANDLE_VALUE) {
*duplex_flags = UV_HANDLE_READABLE | UV_HANDLE_WRITABLE;
return pipeHandle;
}
if (GetLastError() == ERROR_ACCESS_DENIED) {
pipeHandle = CreateFileW(name,
GENERIC_READ | FILE_WRITE_ATTRIBUTES,
0,
NULL,
OPEN_EXISTING,
FILE_FLAG_OVERLAPPED,
NULL);
if (pipeHandle != INVALID_HANDLE_VALUE) {
*duplex_flags = UV_HANDLE_READABLE;
return pipeHandle;
}
}
if (GetLastError() == ERROR_ACCESS_DENIED) {
pipeHandle = CreateFileW(name,
GENERIC_WRITE | FILE_READ_ATTRIBUTES,
0,
NULL,
OPEN_EXISTING,
FILE_FLAG_OVERLAPPED,
NULL);
if (pipeHandle != INVALID_HANDLE_VALUE) {
*duplex_flags = UV_HANDLE_WRITABLE;
return pipeHandle;
}
}
return INVALID_HANDLE_VALUE;
}
int uv_stdio_pipe_server(uv_loop_t* loop, uv_pipe_t* handle, DWORD access,
char* name, size_t nameSize) {
HANDLE pipeHandle;
int err;
char* ptr = (char*)handle;
for (;;) {
uv_unique_pipe_name(ptr, name, nameSize);
pipeHandle = CreateNamedPipeA(name,
access | FILE_FLAG_OVERLAPPED | FILE_FLAG_FIRST_PIPE_INSTANCE,
PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT, 1, 65536, 65536, 0,
NULL);
if (pipeHandle != INVALID_HANDLE_VALUE) {
break;
}
err = GetLastError();
if (err != ERROR_PIPE_BUSY && err != ERROR_ACCESS_DENIED) {
goto error;
}
ptr++;
}
if (CreateIoCompletionPort(pipeHandle,
loop->iocp,
(ULONG_PTR)handle,
0) == NULL) {
err = GetLastError();
goto error;
}
uv_pipe_connection_init(handle);
handle->handle = pipeHandle;
return 0;
error:
if (pipeHandle != INVALID_HANDLE_VALUE) {
CloseHandle(pipeHandle);
}
return err;
}
static int uv_set_pipe_handle(uv_loop_t* loop, uv_pipe_t* handle,
HANDLE pipeHandle, DWORD duplex_flags) {
NTSTATUS nt_status;
IO_STATUS_BLOCK io_status;
FILE_MODE_INFORMATION mode_info;
DWORD mode = PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT;
if (!SetNamedPipeHandleState(pipeHandle, &mode, NULL, NULL)) {
if (GetLastError() == ERROR_INVALID_PARAMETER) {
SetLastError(WSAENOTSOCK);
}
return -1;
}
nt_status = pNtQueryInformationFile(pipeHandle,
&io_status,
&mode_info,
sizeof(mode_info),
FileModeInformation);
if (nt_status != STATUS_SUCCESS) {
return -1;
}
if (mode_info.Mode & FILE_SYNCHRONOUS_IO_ALERT ||
mode_info.Mode & FILE_SYNCHRONOUS_IO_NONALERT) {
handle->flags |= UV_HANDLE_NON_OVERLAPPED_PIPE;
} else {
if (CreateIoCompletionPort(pipeHandle,
loop->iocp,
(ULONG_PTR)handle,
0) == NULL) {
handle->flags |= UV_HANDLE_EMULATE_IOCP;
}
}
handle->handle = pipeHandle;
handle->flags |= duplex_flags;
return 0;
}
static DWORD WINAPI pipe_shutdown_thread_proc(void* parameter) {
uv_loop_t* loop;
uv_pipe_t* handle;
uv_shutdown_t* req;
req = (uv_shutdown_t*) parameter;
assert(req);
handle = (uv_pipe_t*) req->handle;
assert(handle);
loop = handle->loop;
assert(loop);
FlushFileBuffers(handle->handle);
POST_COMPLETION_FOR_REQ(loop, req);
return 0;
}
void uv_pipe_endgame(uv_loop_t* loop, uv_pipe_t* handle) {
int err;
DWORD result;
uv_shutdown_t* req;
NTSTATUS nt_status;
IO_STATUS_BLOCK io_status;
FILE_PIPE_LOCAL_INFORMATION pipe_info;
if ((handle->flags & UV_HANDLE_CONNECTION) &&
handle->shutdown_req != NULL &&
handle->write_reqs_pending == 0) {
req = handle->shutdown_req;
handle->shutdown_req = NULL;
if (handle->flags & UV__HANDLE_CLOSING) {
UNREGISTER_HANDLE_REQ(loop, handle, req);
if (req->cb) {
req->cb(req, UV_ECANCELED);
}
DECREASE_PENDING_REQ_COUNT(handle);
return;
}
nt_status = pNtQueryInformationFile(handle->handle,
&io_status,
&pipe_info,
sizeof pipe_info,
FilePipeLocalInformation);
if (nt_status != STATUS_SUCCESS) {
UNREGISTER_HANDLE_REQ(loop, handle, req);
handle->flags |= UV_HANDLE_WRITABLE;
if (req->cb) {
err = pRtlNtStatusToDosError(nt_status);
req->cb(req, uv_translate_sys_error(err));
}
DECREASE_PENDING_REQ_COUNT(handle);
return;
}
if (pipe_info.OutboundQuota == pipe_info.WriteQuotaAvailable) {
uv_insert_pending_req(loop, (uv_req_t*) req);
return;
}
result = QueueUserWorkItem(pipe_shutdown_thread_proc,
req,
WT_EXECUTELONGFUNCTION);
if (result) {
return;
} else {
UNREGISTER_HANDLE_REQ(loop, handle, req);
handle->flags |= UV_HANDLE_WRITABLE;
if (req->cb) {
err = GetLastError();
req->cb(req, uv_translate_sys_error(err));
}
DECREASE_PENDING_REQ_COUNT(handle);
return;
}
}
if (handle->flags & UV__HANDLE_CLOSING &&
handle->reqs_pending == 0) {
assert(!(handle->flags & UV_HANDLE_CLOSED));
if (handle->flags & UV_HANDLE_CONNECTION) {
if (handle->pending_ipc_info.socket_info) {
free(handle->pending_ipc_info.socket_info);
handle->pending_ipc_info.socket_info = NULL;
}
if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
if (handle->read_req.wait_handle != INVALID_HANDLE_VALUE) {
UnregisterWait(handle->read_req.wait_handle);
handle->read_req.wait_handle = INVALID_HANDLE_VALUE;
}
if (handle->read_req.event_handle) {
CloseHandle(handle->read_req.event_handle);
handle->read_req.event_handle = NULL;
}
}
}
if (handle->flags & UV_HANDLE_PIPESERVER) {
assert(handle->accept_reqs);
free(handle->accept_reqs);
handle->accept_reqs = NULL;
}
uv__handle_close(handle);
}
}
void uv_pipe_pending_instances(uv_pipe_t* handle, int count) {
handle->pending_instances = count;
handle->flags |= UV_HANDLE_PIPESERVER;
}
int uv_pipe_bind(uv_pipe_t* handle, const char* name) {
uv_loop_t* loop = handle->loop;
int i, err, nameSize;
uv_pipe_accept_t* req;
if (handle->flags & UV_HANDLE_BOUND) {
return UV_EINVAL;
}
if (!name) {
return UV_EINVAL;
}
if (!(handle->flags & UV_HANDLE_PIPESERVER)) {
handle->pending_instances = default_pending_pipe_instances;
}
handle->accept_reqs = (uv_pipe_accept_t*)
malloc(sizeof(uv_pipe_accept_t) * handle->pending_instances);
if (!handle->accept_reqs) {
uv_fatal_error(ERROR_OUTOFMEMORY, "malloc");
}
for (i = 0; i < handle->pending_instances; i++) {
req = &handle->accept_reqs[i];
uv_req_init(loop, (uv_req_t*) req);
req->type = UV_ACCEPT;
req->data = handle;
req->pipeHandle = INVALID_HANDLE_VALUE;
req->next_pending = NULL;
}
nameSize = uv_utf8_to_utf16(name, NULL, 0) * sizeof(WCHAR);
handle->name = (WCHAR*)malloc(nameSize);
if (!handle->name) {
uv_fatal_error(ERROR_OUTOFMEMORY, "malloc");
}
if (!uv_utf8_to_utf16(name, handle->name, nameSize / sizeof(WCHAR))) {
return uv_translate_sys_error(GetLastError());
}
handle->accept_reqs[0].pipeHandle = CreateNamedPipeW(handle->name,
PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED |
FILE_FLAG_FIRST_PIPE_INSTANCE,
PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT,
PIPE_UNLIMITED_INSTANCES, 65536, 65536, 0, NULL);
if (handle->accept_reqs[0].pipeHandle == INVALID_HANDLE_VALUE) {
err = GetLastError();
if (err == ERROR_ACCESS_DENIED) {
err = WSAEADDRINUSE;
} else if (err == ERROR_PATH_NOT_FOUND || err == ERROR_INVALID_NAME) {
err = WSAEACCES;
}
goto error;
}
if (uv_set_pipe_handle(loop, handle, handle->accept_reqs[0].pipeHandle, 0)) {
err = GetLastError();
goto error;
}
handle->pending_accepts = NULL;
handle->flags |= UV_HANDLE_PIPESERVER;
handle->flags |= UV_HANDLE_BOUND;
return 0;
error:
if (handle->name) {
free(handle->name);
handle->name = NULL;
}
if (handle->accept_reqs[0].pipeHandle != INVALID_HANDLE_VALUE) {
CloseHandle(handle->accept_reqs[0].pipeHandle);
handle->accept_reqs[0].pipeHandle = INVALID_HANDLE_VALUE;
}
return uv_translate_sys_error(err);
}
static DWORD WINAPI pipe_connect_thread_proc(void* parameter) {
uv_loop_t* loop;
uv_pipe_t* handle;
uv_connect_t* req;
HANDLE pipeHandle = INVALID_HANDLE_VALUE;
DWORD duplex_flags;
req = (uv_connect_t*) parameter;
assert(req);
handle = (uv_pipe_t*) req->handle;
assert(handle);
loop = handle->loop;
assert(loop);
while (WaitNamedPipeW(handle->name, 30000)) {
pipeHandle = open_named_pipe(handle->name, &duplex_flags);
if (pipeHandle != INVALID_HANDLE_VALUE) {
break;
}
SwitchToThread();
}
if (pipeHandle != INVALID_HANDLE_VALUE &&
!uv_set_pipe_handle(loop, handle, pipeHandle, duplex_flags)) {
SET_REQ_SUCCESS(req);
} else {
SET_REQ_ERROR(req, GetLastError());
}
POST_COMPLETION_FOR_REQ(loop, req);
return 0;
}
void uv_pipe_connect(uv_connect_t* req, uv_pipe_t* handle,
const char* name, uv_connect_cb cb) {
uv_loop_t* loop = handle->loop;
int err, nameSize;
HANDLE pipeHandle = INVALID_HANDLE_VALUE;
DWORD duplex_flags;
uv_req_init(loop, (uv_req_t*) req);
req->type = UV_CONNECT;
req->handle = (uv_stream_t*) handle;
req->cb = cb;
nameSize = uv_utf8_to_utf16(name, NULL, 0) * sizeof(WCHAR);
handle->name = (WCHAR*)malloc(nameSize);
if (!handle->name) {
uv_fatal_error(ERROR_OUTOFMEMORY, "malloc");
}
if (!uv_utf8_to_utf16(name, handle->name, nameSize / sizeof(WCHAR))) {
err = GetLastError();
goto error;
}
pipeHandle = open_named_pipe(handle->name, &duplex_flags);
if (pipeHandle == INVALID_HANDLE_VALUE) {
if (GetLastError() == ERROR_PIPE_BUSY) {
if (!QueueUserWorkItem(&pipe_connect_thread_proc,
req,
WT_EXECUTELONGFUNCTION)) {
err = GetLastError();
goto error;
}
REGISTER_HANDLE_REQ(loop, handle, req);
handle->reqs_pending++;
return;
}
err = GetLastError();
goto error;
}
assert(pipeHandle != INVALID_HANDLE_VALUE);
if (uv_set_pipe_handle(loop,
(uv_pipe_t*) req->handle,
pipeHandle,
duplex_flags)) {
err = GetLastError();
goto error;
}
SET_REQ_SUCCESS(req);
uv_insert_pending_req(loop, (uv_req_t*) req);
handle->reqs_pending++;
REGISTER_HANDLE_REQ(loop, handle, req);
return;
error:
if (handle->name) {
free(handle->name);
handle->name = NULL;
}
if (pipeHandle != INVALID_HANDLE_VALUE) {
CloseHandle(pipeHandle);
}
SET_REQ_ERROR(req, err);
uv_insert_pending_req(loop, (uv_req_t*) req);
handle->reqs_pending++;
REGISTER_HANDLE_REQ(loop, handle, req);
return;
}
void uv_pipe_cleanup(uv_loop_t* loop, uv_pipe_t* handle) {
int i;
HANDLE pipeHandle;
if (handle->name) {
free(handle->name);
handle->name = NULL;
}
if (handle->flags & UV_HANDLE_PIPESERVER) {
for (i = 0; i < handle->pending_instances; i++) {
pipeHandle = handle->accept_reqs[i].pipeHandle;
if (pipeHandle != INVALID_HANDLE_VALUE) {
CloseHandle(pipeHandle);
handle->accept_reqs[i].pipeHandle = INVALID_HANDLE_VALUE;
}
}
}
if (handle->flags & UV_HANDLE_CONNECTION) {
handle->flags &= ~UV_HANDLE_WRITABLE;
eof_timer_destroy(handle);
}
if ((handle->flags & UV_HANDLE_CONNECTION)
&& handle->handle != INVALID_HANDLE_VALUE) {
CloseHandle(handle->handle);
handle->handle = INVALID_HANDLE_VALUE;
}
}
void uv_pipe_close(uv_loop_t* loop, uv_pipe_t* handle) {
if (handle->flags & UV_HANDLE_READING) {
handle->flags &= ~UV_HANDLE_READING;
DECREASE_ACTIVE_COUNT(loop, handle);
}
if (handle->flags & UV_HANDLE_LISTENING) {
handle->flags &= ~UV_HANDLE_LISTENING;
DECREASE_ACTIVE_COUNT(loop, handle);
}
uv_pipe_cleanup(loop, handle);
if (handle->reqs_pending == 0) {
uv_want_endgame(loop, (uv_handle_t*) handle);
}
handle->flags &= ~(UV_HANDLE_READABLE | UV_HANDLE_WRITABLE);
uv__handle_closing(handle);
}
static void uv_pipe_queue_accept(uv_loop_t* loop, uv_pipe_t* handle,
uv_pipe_accept_t* req, BOOL firstInstance) {
assert(handle->flags & UV_HANDLE_LISTENING);
if (!firstInstance) {
assert(req->pipeHandle == INVALID_HANDLE_VALUE);
req->pipeHandle = CreateNamedPipeW(handle->name,
PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED,
PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT,
PIPE_UNLIMITED_INSTANCES, 65536, 65536, 0, NULL);
if (req->pipeHandle == INVALID_HANDLE_VALUE) {
SET_REQ_ERROR(req, GetLastError());
uv_insert_pending_req(loop, (uv_req_t*) req);
handle->reqs_pending++;
return;
}
if (uv_set_pipe_handle(loop, handle, req->pipeHandle, 0)) {
CloseHandle(req->pipeHandle);
req->pipeHandle = INVALID_HANDLE_VALUE;
SET_REQ_ERROR(req, GetLastError());
uv_insert_pending_req(loop, (uv_req_t*) req);
handle->reqs_pending++;
return;
}
}
assert(req->pipeHandle != INVALID_HANDLE_VALUE);
memset(&(req->overlapped), 0, sizeof(req->overlapped));
if (!ConnectNamedPipe(req->pipeHandle, &req->overlapped) &&
GetLastError() != ERROR_IO_PENDING) {
if (GetLastError() == ERROR_PIPE_CONNECTED) {
SET_REQ_SUCCESS(req);
} else {
CloseHandle(req->pipeHandle);
req->pipeHandle = INVALID_HANDLE_VALUE;
SET_REQ_ERROR(req, GetLastError());
}
uv_insert_pending_req(loop, (uv_req_t*) req);
handle->reqs_pending++;
return;
}
handle->reqs_pending++;
}
int uv_pipe_accept(uv_pipe_t* server, uv_stream_t* client) {
uv_loop_t* loop = server->loop;
uv_pipe_t* pipe_client;
uv_pipe_accept_t* req;
if (server->ipc) {
if (!server->pending_ipc_info.socket_info) {
return WSAEWOULDBLOCK;
}
return uv_tcp_import((uv_tcp_t*)client, server->pending_ipc_info.socket_info,
server->pending_ipc_info.tcp_connection);
} else {
pipe_client = (uv_pipe_t*)client;
req = server->pending_accepts;
if (!req) {
return WSAEWOULDBLOCK;
}
uv_pipe_connection_init(pipe_client);
pipe_client->handle = req->pipeHandle;
pipe_client->flags |= UV_HANDLE_READABLE | UV_HANDLE_WRITABLE;
server->pending_accepts = req->next_pending;
req->next_pending = NULL;
req->pipeHandle = INVALID_HANDLE_VALUE;
if (!(server->flags & UV__HANDLE_CLOSING)) {
uv_pipe_queue_accept(loop, server, req, FALSE);
}
}
return 0;
}
int uv_pipe_listen(uv_pipe_t* handle, int backlog, uv_connection_cb cb) {
uv_loop_t* loop = handle->loop;
int i;
if (handle->flags & UV_HANDLE_LISTENING) {
handle->connection_cb = cb;
}
if (!(handle->flags & UV_HANDLE_BOUND)) {
return WSAEINVAL;
}
if (handle->flags & UV_HANDLE_READING) {
return WSAEISCONN;
}
if (!(handle->flags & UV_HANDLE_PIPESERVER)) {
return ERROR_NOT_SUPPORTED;
}
handle->flags |= UV_HANDLE_LISTENING;
INCREASE_ACTIVE_COUNT(loop, handle);
handle->connection_cb = cb;
assert(handle->accept_reqs[0].pipeHandle != INVALID_HANDLE_VALUE);
for (i = 0; i < handle->pending_instances; i++) {
uv_pipe_queue_accept(loop, handle, &handle->accept_reqs[i], i == 0);
}
return 0;
}
static DWORD WINAPI uv_pipe_zero_readfile_thread_proc(void* parameter) {
int result;
DWORD bytes;
uv_read_t* req = (uv_read_t*) parameter;
uv_pipe_t* handle = (uv_pipe_t*) req->data;
uv_loop_t* loop = handle->loop;
assert(req != NULL);
assert(req->type == UV_READ);
assert(handle->type == UV_NAMED_PIPE);
result = ReadFile(handle->handle,
&uv_zero_,
0,
&bytes,
NULL);
if (!result) {
SET_REQ_ERROR(req, GetLastError());
}
POST_COMPLETION_FOR_REQ(loop, req);
return 0;
}
static DWORD WINAPI uv_pipe_writefile_thread_proc(void* parameter) {
int result;
DWORD bytes;
uv_write_t* req = (uv_write_t*) parameter;
uv_pipe_t* handle = (uv_pipe_t*) req->handle;
uv_loop_t* loop = handle->loop;
assert(req != NULL);
assert(req->type == UV_WRITE);
assert(handle->type == UV_NAMED_PIPE);
assert(req->write_buffer.base);
result = WriteFile(handle->handle,
req->write_buffer.base,
req->write_buffer.len,
&bytes,
NULL);
if (!result) {
SET_REQ_ERROR(req, GetLastError());
}
POST_COMPLETION_FOR_REQ(loop, req);
return 0;
}
static void CALLBACK post_completion_read_wait(void* context, BOOLEAN timed_out) {
uv_read_t* req;
uv_tcp_t* handle;
req = (uv_read_t*) context;
assert(req != NULL);
handle = (uv_tcp_t*)req->data;
assert(handle != NULL);
assert(!timed_out);
if (!PostQueuedCompletionStatus(handle->loop->iocp,
req->overlapped.InternalHigh,
0,
&req->overlapped)) {
uv_fatal_error(GetLastError(), "PostQueuedCompletionStatus");
}
}
static void CALLBACK post_completion_write_wait(void* context, BOOLEAN timed_out) {
uv_write_t* req;
uv_tcp_t* handle;
req = (uv_write_t*) context;
assert(req != NULL);
handle = (uv_tcp_t*)req->handle;
assert(handle != NULL);
assert(!timed_out);
if (!PostQueuedCompletionStatus(handle->loop->iocp,
req->overlapped.InternalHigh,
0,
&req->overlapped)) {
uv_fatal_error(GetLastError(), "PostQueuedCompletionStatus");
}
}
static void uv_pipe_queue_read(uv_loop_t* loop, uv_pipe_t* handle) {
uv_read_t* req;
int result;
assert(handle->flags & UV_HANDLE_READING);
assert(!(handle->flags & UV_HANDLE_READ_PENDING));
assert(handle->handle != INVALID_HANDLE_VALUE);
req = &handle->read_req;
if (handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE) {
if (!QueueUserWorkItem(&uv_pipe_zero_readfile_thread_proc,
req,
WT_EXECUTELONGFUNCTION)) {
SET_REQ_ERROR(req, GetLastError());
goto error;
}
} else {
memset(&req->overlapped, 0, sizeof(req->overlapped));
if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
req->overlapped.hEvent = (HANDLE) ((uintptr_t) req->event_handle | 1);
}
result = ReadFile(handle->handle,
&uv_zero_,
0,
NULL,
&req->overlapped);
if (!result && GetLastError() != ERROR_IO_PENDING) {
SET_REQ_ERROR(req, GetLastError());
goto error;
}
if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
if (!req->event_handle) {
req->event_handle = CreateEvent(NULL, 0, 0, NULL);
if (!req->event_handle) {
uv_fatal_error(GetLastError(), "CreateEvent");
}
}
if (req->wait_handle == INVALID_HANDLE_VALUE) {
if (!RegisterWaitForSingleObject(&req->wait_handle,
req->overlapped.hEvent, post_completion_read_wait, (void*) req,
INFINITE, WT_EXECUTEINWAITTHREAD)) {
SET_REQ_ERROR(req, GetLastError());
goto error;
}
}
}
}
eof_timer_start(handle);
handle->flags |= UV_HANDLE_READ_PENDING;
handle->reqs_pending++;
return;
error:
uv_insert_pending_req(loop, (uv_req_t*)req);
handle->flags |= UV_HANDLE_READ_PENDING;
handle->reqs_pending++;
}
static int uv_pipe_read_start_impl(uv_pipe_t* handle, uv_alloc_cb alloc_cb,
uv_read_cb read_cb, uv_read2_cb read2_cb) {
uv_loop_t* loop = handle->loop;
handle->flags |= UV_HANDLE_READING;
INCREASE_ACTIVE_COUNT(loop, handle);
handle->read_cb = read_cb;
handle->read2_cb = read2_cb;
handle->alloc_cb = alloc_cb;
if (!(handle->flags & UV_HANDLE_READ_PENDING))
uv_pipe_queue_read(loop, handle);
return 0;
}
int uv_pipe_read_start(uv_pipe_t* handle, uv_alloc_cb alloc_cb,
uv_read_cb read_cb) {
return uv_pipe_read_start_impl(handle, alloc_cb, read_cb, NULL);
}
int uv_pipe_read2_start(uv_pipe_t* handle, uv_alloc_cb alloc_cb,
uv_read2_cb read_cb) {
return uv_pipe_read_start_impl(handle, alloc_cb, NULL, read_cb);
}
static void uv_insert_non_overlapped_write_req(uv_pipe_t* handle,
uv_write_t* req) {
req->next_req = NULL;
if (handle->non_overlapped_writes_tail) {
req->next_req =
handle->non_overlapped_writes_tail->next_req;
handle->non_overlapped_writes_tail->next_req = (uv_req_t*)req;
handle->non_overlapped_writes_tail = req;
} else {
req->next_req = (uv_req_t*)req;
handle->non_overlapped_writes_tail = req;
}
}
static uv_write_t* uv_remove_non_overlapped_write_req(uv_pipe_t* handle) {
uv_write_t* req;
if (handle->non_overlapped_writes_tail) {
req = (uv_write_t*)handle->non_overlapped_writes_tail->next_req;
if (req == handle->non_overlapped_writes_tail) {
handle->non_overlapped_writes_tail = NULL;
} else {
handle->non_overlapped_writes_tail->next_req =
req->next_req;
}
return req;
} else {
return NULL;
}
}
static void uv_queue_non_overlapped_write(uv_pipe_t* handle) {
uv_write_t* req = uv_remove_non_overlapped_write_req(handle);
if (req) {
if (!QueueUserWorkItem(&uv_pipe_writefile_thread_proc,
req,
WT_EXECUTELONGFUNCTION)) {
uv_fatal_error(GetLastError(), "QueueUserWorkItem");
}
}
}
static int uv_pipe_write_impl(uv_loop_t* loop, uv_write_t* req,
uv_pipe_t* handle, uv_buf_t bufs[], int bufcnt,
uv_stream_t* send_handle, uv_write_cb cb) {
int err;
int result;
uv_tcp_t* tcp_send_handle;
uv_write_t* ipc_header_req;
uv_ipc_frame_uv_stream ipc_frame;
if (bufcnt != 1 && (bufcnt != 0 || !send_handle)) {
return ERROR_NOT_SUPPORTED;
}
if (send_handle && ((send_handle->type != UV_TCP) ||
(!(send_handle->flags & UV_HANDLE_BOUND) &&
!(send_handle->flags & UV_HANDLE_CONNECTION)))) {
return ERROR_NOT_SUPPORTED;
}
assert(handle->handle != INVALID_HANDLE_VALUE);
uv_req_init(loop, (uv_req_t*) req);
req->type = UV_WRITE;
req->handle = (uv_stream_t*) handle;
req->cb = cb;
req->ipc_header = 0;
req->event_handle = NULL;
req->wait_handle = INVALID_HANDLE_VALUE;
memset(&req->overlapped, 0, sizeof(req->overlapped));
if (handle->ipc) {
assert(!(handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE));
ipc_frame.header.flags = 0;
if (send_handle) {
tcp_send_handle = (uv_tcp_t*)send_handle;
err = uv_tcp_duplicate_socket(tcp_send_handle, handle->ipc_pid,
&ipc_frame.socket_info);
if (err) {
return err;
}
ipc_frame.header.flags |= UV_IPC_TCP_SERVER;
if (tcp_send_handle->flags & UV_HANDLE_CONNECTION) {
ipc_frame.header.flags |= UV_IPC_TCP_CONNECTION;
}
}
if (bufcnt == 1) {
ipc_frame.header.flags |= UV_IPC_RAW_DATA;
ipc_frame.header.raw_data_length = bufs[0].len;
}
if (!(ipc_frame.header.flags & UV_IPC_RAW_DATA)) {
ipc_header_req = req;
} else {
if (handle->ipc_header_write_req.type != UV_WRITE) {
ipc_header_req = (uv_write_t*)&handle->ipc_header_write_req;
} else {
ipc_header_req = (uv_write_t*)malloc(sizeof(uv_write_t));
if (!ipc_header_req) {
uv_fatal_error(ERROR_OUTOFMEMORY, "malloc");
}
}
uv_req_init(loop, (uv_req_t*) ipc_header_req);
ipc_header_req->type = UV_WRITE;
ipc_header_req->handle = (uv_stream_t*) handle;
ipc_header_req->cb = NULL;
ipc_header_req->ipc_header = 1;
}
memset(&ipc_header_req->overlapped, 0, sizeof(ipc_header_req->overlapped));
ipc_header_req->overlapped.hEvent = CreateEvent(NULL, 1, 0, NULL);
if (!ipc_header_req->overlapped.hEvent) {
uv_fatal_error(GetLastError(), "CreateEvent");
}
result = WriteFile(handle->handle,
&ipc_frame,
ipc_frame.header.flags & UV_IPC_TCP_SERVER ?
sizeof(ipc_frame) : sizeof(ipc_frame.header),
NULL,
&ipc_header_req->overlapped);
if (!result && GetLastError() != ERROR_IO_PENDING) {
err = GetLastError();
CloseHandle(ipc_header_req->overlapped.hEvent);
return err;
}
if (!result) {
if (WaitForSingleObject(ipc_header_req->overlapped.hEvent, INFINITE) !=
WAIT_OBJECT_0) {
err = GetLastError();
CloseHandle(ipc_header_req->overlapped.hEvent);
return err;
}
}
ipc_header_req->queued_bytes = 0;
CloseHandle(ipc_header_req->overlapped.hEvent);
ipc_header_req->overlapped.hEvent = NULL;
REGISTER_HANDLE_REQ(loop, handle, ipc_header_req);
handle->reqs_pending++;
handle->write_reqs_pending++;
if (!(ipc_frame.header.flags & UV_IPC_RAW_DATA)) {
return 0;
}
}
if ((handle->flags &
(UV_HANDLE_BLOCKING_WRITES | UV_HANDLE_NON_OVERLAPPED_PIPE)) ==
(UV_HANDLE_BLOCKING_WRITES | UV_HANDLE_NON_OVERLAPPED_PIPE)) {
DWORD bytes;
result = WriteFile(handle->handle,
bufs[0].base,
bufs[0].len,
&bytes,
NULL);
if (!result) {
return err;
} else {
req->queued_bytes = 0;
}
REGISTER_HANDLE_REQ(loop, handle, req);
handle->reqs_pending++;
handle->write_reqs_pending++;
POST_COMPLETION_FOR_REQ(loop, req);
return 0;
} else if (handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE) {
req->write_buffer = bufs[0];
uv_insert_non_overlapped_write_req(handle, req);
if (handle->write_reqs_pending == 0) {
uv_queue_non_overlapped_write(handle);
}
req->queued_bytes = uv_count_bufs(bufs, bufcnt);
handle->write_queue_size += req->queued_bytes;
} else if (handle->flags & UV_HANDLE_BLOCKING_WRITES) {
req->overlapped.hEvent = CreateEvent(NULL, 1, 0, NULL);
if (!req->overlapped.hEvent) {
uv_fatal_error(GetLastError(), "CreateEvent");
}
result = WriteFile(handle->handle,
bufs[0].base,
bufs[0].len,
NULL,
&req->overlapped);
if (!result && GetLastError() != ERROR_IO_PENDING) {
err = GetLastError();
CloseHandle(req->overlapped.hEvent);
return err;
}
if (result) {
req->queued_bytes = 0;
} else {
if (WaitForSingleObject(ipc_header_req->overlapped.hEvent, INFINITE) !=
WAIT_OBJECT_0) {
err = GetLastError();
CloseHandle(ipc_header_req->overlapped.hEvent);
return uv_translate_sys_error(err);
}
}
CloseHandle(req->overlapped.hEvent);
REGISTER_HANDLE_REQ(loop, handle, req);
handle->reqs_pending++;
handle->write_reqs_pending++;
POST_COMPLETION_FOR_REQ(loop, req);
return 0;
} else {
result = WriteFile(handle->handle,
bufs[0].base,
bufs[0].len,
NULL,
&req->overlapped);
if (!result && GetLastError() != ERROR_IO_PENDING) {
return GetLastError();
}
if (result) {
req->queued_bytes = 0;
} else {
req->queued_bytes = uv_count_bufs(bufs, bufcnt);
handle->write_queue_size += req->queued_bytes;
}
if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
req->event_handle = CreateEvent(NULL, 0, 0, NULL);
if (!req->event_handle) {
uv_fatal_error(GetLastError(), "CreateEvent");
}
if (!RegisterWaitForSingleObject(&req->wait_handle,
req->overlapped.hEvent, post_completion_write_wait, (void*) req,
INFINITE, WT_EXECUTEINWAITTHREAD)) {
return GetLastError();
}
}
}
REGISTER_HANDLE_REQ(loop, handle, req);
handle->reqs_pending++;
handle->write_reqs_pending++;
return 0;
}
int uv_pipe_write(uv_loop_t* loop, uv_write_t* req, uv_pipe_t* handle,
uv_buf_t bufs[], int bufcnt, uv_write_cb cb) {
return uv_pipe_write_impl(loop, req, handle, bufs, bufcnt, NULL, cb);
}
int uv_pipe_write2(uv_loop_t* loop, uv_write_t* req, uv_pipe_t* handle,
uv_buf_t bufs[], int bufcnt, uv_stream_t* send_handle, uv_write_cb cb) {
if (!handle->ipc) {
return WSAEINVAL;
}
return uv_pipe_write_impl(loop, req, handle, bufs, bufcnt, send_handle, cb);
}
static void uv_pipe_read_eof(uv_loop_t* loop, uv_pipe_t* handle,
uv_buf_t buf) {
eof_timer_destroy(handle);
handle->flags &= ~UV_HANDLE_READABLE;
uv_read_stop((uv_stream_t*) handle);
if (handle->read2_cb) {
handle->read2_cb(handle, UV_EOF, uv_null_buf_, UV_UNKNOWN_HANDLE);
} else {
handle->read_cb((uv_stream_t*) handle, UV_EOF, uv_null_buf_);
}
}
static void uv_pipe_read_error(uv_loop_t* loop, uv_pipe_t* handle, int error,
uv_buf_t buf) {
eof_timer_destroy(handle);
uv_read_stop((uv_stream_t*) handle);
if (handle->read2_cb) {
handle->read2_cb(handle,
uv_translate_sys_error(error),
buf,
UV_UNKNOWN_HANDLE);
} else {
handle->read_cb((uv_stream_t*)handle, uv_translate_sys_error(error), buf);
}
}
static void uv_pipe_read_error_or_eof(uv_loop_t* loop, uv_pipe_t* handle,
int error, uv_buf_t buf) {
if (error == ERROR_BROKEN_PIPE) {
uv_pipe_read_eof(loop, handle, buf);
} else {
uv_pipe_read_error(loop, handle, error, buf);
}
}
void uv_process_pipe_read_req(uv_loop_t* loop, uv_pipe_t* handle,
uv_req_t* req) {
DWORD bytes, avail;
uv_buf_t buf;
uv_ipc_frame_uv_stream ipc_frame;
assert(handle->type == UV_NAMED_PIPE);
handle->flags &= ~UV_HANDLE_READ_PENDING;
eof_timer_stop(handle);
if (!REQ_SUCCESS(req)) {
if (handle->flags & UV_HANDLE_READING) {
uv_pipe_read_error_or_eof(loop,
handle,
GET_REQ_ERROR(req),
uv_null_buf_);
}
} else {
while (handle->flags & UV_HANDLE_READING) {
if (!PeekNamedPipe(handle->handle,
NULL,
0,
NULL,
&avail,
NULL)) {
uv_pipe_read_error_or_eof(loop, handle, GetLastError(), uv_null_buf_);
break;
}
if (avail == 0) {
break;
}
if (handle->ipc) {
if (handle->remaining_ipc_rawdata_bytes == 0) {
assert(avail >= sizeof(ipc_frame.header));
if (!ReadFile(handle->handle,
&ipc_frame.header,
sizeof(ipc_frame.header),
&bytes,
NULL)) {
uv_pipe_read_error_or_eof(loop, handle, GetLastError(),
uv_null_buf_);
break;
}
assert(bytes == sizeof(ipc_frame.header));
assert(ipc_frame.header.flags <= (UV_IPC_TCP_SERVER | UV_IPC_RAW_DATA |
UV_IPC_TCP_CONNECTION));
if (ipc_frame.header.flags & UV_IPC_TCP_SERVER) {
assert(avail - sizeof(ipc_frame.header) >=
sizeof(ipc_frame.socket_info));
if (!ReadFile(handle->handle,
&ipc_frame.socket_info,
sizeof(ipc_frame) - sizeof(ipc_frame.header),
&bytes,
NULL)) {
uv_pipe_read_error_or_eof(loop, handle, GetLastError(),
uv_null_buf_);
break;
}
assert(bytes == sizeof(ipc_frame) - sizeof(ipc_frame.header));
assert(!handle->pending_ipc_info.socket_info);
handle->pending_ipc_info.socket_info =
(WSAPROTOCOL_INFOW*)malloc(sizeof(*(handle->pending_ipc_info.socket_info)));
if (!handle->pending_ipc_info.socket_info) {
uv_fatal_error(ERROR_OUTOFMEMORY, "malloc");
}
*(handle->pending_ipc_info.socket_info) = ipc_frame.socket_info;
handle->pending_ipc_info.tcp_connection =
ipc_frame.header.flags & UV_IPC_TCP_CONNECTION;
}
if (ipc_frame.header.flags & UV_IPC_RAW_DATA) {
handle->remaining_ipc_rawdata_bytes =
ipc_frame.header.raw_data_length;
continue;
}
} else {
avail = min(avail, (DWORD)handle->remaining_ipc_rawdata_bytes);
}
}
buf = handle->alloc_cb((uv_handle_t*) handle, avail);
if (buf.len == 0) {
if (handle->read2_cb) {
handle->read2_cb(handle, UV_ENOBUFS, buf, UV_UNKNOWN_HANDLE);
} else if (handle->read_cb) {
handle->read_cb((uv_stream_t*) handle, UV_ENOBUFS, buf);
}
break;
}
assert(buf.base != NULL);
if (ReadFile(handle->handle,
buf.base,
buf.len,
&bytes,
NULL)) {
if (handle->ipc) {
assert(handle->remaining_ipc_rawdata_bytes >= bytes);
handle->remaining_ipc_rawdata_bytes =
handle->remaining_ipc_rawdata_bytes - bytes;
if (handle->read2_cb) {
handle->read2_cb(handle, bytes, buf,
handle->pending_ipc_info.socket_info ? UV_TCP : UV_UNKNOWN_HANDLE);
} else if (handle->read_cb) {
handle->read_cb((uv_stream_t*)handle, bytes, buf);
}
if (handle->pending_ipc_info.socket_info) {
free(handle->pending_ipc_info.socket_info);
handle->pending_ipc_info.socket_info = NULL;
}
} else {
handle->read_cb((uv_stream_t*)handle, bytes, buf);
}
if (bytes <= buf.len) {
break;
}
} else {
uv_pipe_read_error_or_eof(loop, handle, GetLastError(), uv_null_buf_);
break;
}
}
if ((handle->flags & UV_HANDLE_READING) &&
!(handle->flags & UV_HANDLE_READ_PENDING)) {
uv_pipe_queue_read(loop, handle);
}
}
DECREASE_PENDING_REQ_COUNT(handle);
}
void uv_process_pipe_write_req(uv_loop_t* loop, uv_pipe_t* handle,
uv_write_t* req) {
int err;
assert(handle->type == UV_NAMED_PIPE);
assert(handle->write_queue_size >= req->queued_bytes);
handle->write_queue_size -= req->queued_bytes;
UNREGISTER_HANDLE_REQ(loop, handle, req);
if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
if (req->wait_handle != INVALID_HANDLE_VALUE) {
UnregisterWait(req->wait_handle);
req->wait_handle = INVALID_HANDLE_VALUE;
}
if (req->event_handle) {
CloseHandle(req->event_handle);
req->event_handle = NULL;
}
}
if (req->ipc_header) {
if (req == &handle->ipc_header_write_req) {
req->type = UV_UNKNOWN_REQ;
} else {
free(req);
}
} else {
if (req->cb) {
err = GET_REQ_ERROR(req);
req->cb(req, uv_translate_sys_error(err));
}
}
handle->write_reqs_pending--;
if (handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE &&
handle->non_overlapped_writes_tail) {
assert(handle->write_reqs_pending > 0);
uv_queue_non_overlapped_write(handle);
}
if (handle->shutdown_req != NULL &&
handle->write_reqs_pending == 0) {
uv_want_endgame(loop, (uv_handle_t*)handle);
}
DECREASE_PENDING_REQ_COUNT(handle);
}
void uv_process_pipe_accept_req(uv_loop_t* loop, uv_pipe_t* handle,
uv_req_t* raw_req) {
uv_pipe_accept_t* req = (uv_pipe_accept_t*) raw_req;
assert(handle->type == UV_NAMED_PIPE);
if (REQ_SUCCESS(req)) {
assert(req->pipeHandle != INVALID_HANDLE_VALUE);
req->next_pending = handle->pending_accepts;
handle->pending_accepts = req;
if (handle->connection_cb) {
handle->connection_cb((uv_stream_t*)handle, 0);
}
} else {
if (req->pipeHandle != INVALID_HANDLE_VALUE) {
CloseHandle(req->pipeHandle);
req->pipeHandle = INVALID_HANDLE_VALUE;
}
if (!(handle->flags & UV__HANDLE_CLOSING)) {
uv_pipe_queue_accept(loop, handle, req, FALSE);
}
}
DECREASE_PENDING_REQ_COUNT(handle);
}
void uv_process_pipe_connect_req(uv_loop_t* loop, uv_pipe_t* handle,
uv_connect_t* req) {
int err;
assert(handle->type == UV_NAMED_PIPE);
UNREGISTER_HANDLE_REQ(loop, handle, req);
if (req->cb) {
err = 0;
if (REQ_SUCCESS(req)) {
uv_pipe_connection_init(handle);
} else {
err = GET_REQ_ERROR(req);
}
req->cb(req, uv_translate_sys_error(err));
}
DECREASE_PENDING_REQ_COUNT(handle);
}
void uv_process_pipe_shutdown_req(uv_loop_t* loop, uv_pipe_t* handle,
uv_shutdown_t* req) {
assert(handle->type == UV_NAMED_PIPE);
UNREGISTER_HANDLE_REQ(loop, handle, req);
if (handle->flags & UV_HANDLE_READABLE) {
eof_timer_init(handle);
if (handle->flags & UV_HANDLE_READ_PENDING) {
eof_timer_start(handle);
}
}
if (req->cb) {
req->cb(req, 0);
}
DECREASE_PENDING_REQ_COUNT(handle);
}
static void eof_timer_init(uv_pipe_t* pipe) {
int r;
assert(pipe->eof_timer == NULL);
assert(pipe->flags & UV_HANDLE_CONNECTION);
pipe->eof_timer = (uv_timer_t*) malloc(sizeof *pipe->eof_timer);
r = uv_timer_init(pipe->loop, pipe->eof_timer);
assert(r == 0);
pipe->eof_timer->data = pipe;
uv_unref((uv_handle_t*) pipe->eof_timer);
}
static void eof_timer_start(uv_pipe_t* pipe) {
assert(pipe->flags & UV_HANDLE_CONNECTION);
if (pipe->eof_timer != NULL) {
uv_timer_start(pipe->eof_timer, eof_timer_cb, eof_timeout, 0);
}
}
static void eof_timer_stop(uv_pipe_t* pipe) {
assert(pipe->flags & UV_HANDLE_CONNECTION);
if (pipe->eof_timer != NULL) {
uv_timer_stop(pipe->eof_timer);
}
}
static void eof_timer_cb(uv_timer_t* timer, int status) {
uv_pipe_t* pipe = (uv_pipe_t*) timer->data;
uv_loop_t* loop = timer->loop;
assert(status == 0);
assert(pipe->type == UV_NAMED_PIPE);
assert(pipe->flags & UV_HANDLE_READ_PENDING);
if ((pipe->flags & UV_HANDLE_READ_PENDING) &&
HasOverlappedIoCompleted(&pipe->read_req.overlapped)) {
return;
}
CloseHandle(pipe->handle);
pipe->handle = INVALID_HANDLE_VALUE;
uv_read_stop((uv_stream_t*) pipe);
uv_pipe_read_eof(loop, pipe, uv_null_buf_);
}
static void eof_timer_destroy(uv_pipe_t* pipe) {
assert(pipe->flags && UV_HANDLE_CONNECTION);
if (pipe->eof_timer) {
uv_close((uv_handle_t*) pipe->eof_timer, eof_timer_close_cb);
pipe->eof_timer = NULL;
}
}
static void eof_timer_close_cb(uv_handle_t* handle) {
assert(handle->type == UV_TIMER);
free(handle);
}
int uv_pipe_open(uv_pipe_t* pipe, uv_file file) {
HANDLE os_handle = (HANDLE)_get_osfhandle(file);
if (os_handle == INVALID_HANDLE_VALUE ||
uv_set_pipe_handle(pipe->loop, pipe, os_handle, 0) == -1) {
return UV_EINVAL;
}
uv_pipe_connection_init(pipe);
pipe->handle = os_handle;
pipe->flags |= UV_HANDLE_READABLE | UV_HANDLE_WRITABLE;
if (pipe->ipc) {
assert(!(pipe->flags & UV_HANDLE_NON_OVERLAPPED_PIPE));
pipe->ipc_pid = uv_parent_pid();
assert(pipe->ipc_pid != -1);
}
return 0;
}