This source file includes following definitions.
- uv__tcp_nodelay
- uv__tcp_keepalive
- uv_tcp_set_socket
- uv_tcp_init
- uv_tcp_endgame
- uv__bind
- uv__tcp_bind
- uv__tcp_bind6
- post_completion
- post_write_completion
- uv_tcp_queue_accept
- uv_tcp_queue_read
- uv_tcp_listen
- uv_tcp_accept
- uv_tcp_read_start
- uv__tcp_connect
- uv__tcp_connect6
- uv_tcp_getsockname
- uv_tcp_getpeername
- uv_tcp_write
- uv_process_tcp_read_req
- uv_process_tcp_write_req
- uv_process_tcp_accept_req
- uv_process_tcp_connect_req
- uv_tcp_import
- uv_tcp_nodelay
- uv_tcp_keepalive
- uv_tcp_duplicate_socket
- uv_tcp_simultaneous_accepts
- uv_tcp_try_cancel_io
- uv_tcp_close
- uv_tcp_open
#include <assert.h>
#include <stdlib.h>
#include "uv.h"
#include "internal.h"
#include "handle-inl.h"
#include "stream-inl.h"
#include "req-inl.h"
const unsigned int uv_active_tcp_streams_threshold = 0;
const unsigned int uv_simultaneous_server_accepts = 32;
static char uv_zero_[] = "";
static int uv__tcp_nodelay(uv_tcp_t* handle, SOCKET socket, int enable) {
if (setsockopt(socket,
IPPROTO_TCP,
TCP_NODELAY,
(const char*)&enable,
sizeof enable) == -1) {
return WSAGetLastError();
}
return 0;
}
static int uv__tcp_keepalive(uv_tcp_t* handle, SOCKET socket, int enable, unsigned int delay) {
if (setsockopt(socket,
SOL_SOCKET,
SO_KEEPALIVE,
(const char*)&enable,
sizeof enable) == -1) {
return WSAGetLastError();
}
if (enable && setsockopt(socket,
IPPROTO_TCP,
TCP_KEEPALIVE,
(const char*)&delay,
sizeof delay) == -1) {
return WSAGetLastError();
}
return 0;
}
static int uv_tcp_set_socket(uv_loop_t* loop, uv_tcp_t* handle,
SOCKET socket, int family, int imported) {
DWORD yes = 1;
int non_ifs_lsp;
int err;
assert(handle->socket == INVALID_SOCKET);
if (ioctlsocket(socket, FIONBIO, &yes) == SOCKET_ERROR) {
return WSAGetLastError();
}
if (CreateIoCompletionPort((HANDLE)socket,
loop->iocp,
(ULONG_PTR)socket,
0) == NULL) {
if (imported) {
handle->flags |= UV_HANDLE_EMULATE_IOCP;
} else {
return GetLastError();
}
}
if (family == AF_INET6) {
non_ifs_lsp = uv_tcp_non_ifs_lsp_ipv6;
} else {
non_ifs_lsp = uv_tcp_non_ifs_lsp_ipv4;
}
if (pSetFileCompletionNotificationModes &&
!(handle->flags & UV_HANDLE_EMULATE_IOCP) && !non_ifs_lsp) {
if (pSetFileCompletionNotificationModes((HANDLE) socket,
FILE_SKIP_SET_EVENT_ON_HANDLE |
FILE_SKIP_COMPLETION_PORT_ON_SUCCESS)) {
handle->flags |= UV_HANDLE_SYNC_BYPASS_IOCP;
} else if (GetLastError() != ERROR_INVALID_FUNCTION) {
return GetLastError();
}
}
if (handle->flags & UV_HANDLE_TCP_NODELAY) {
err = uv__tcp_nodelay(handle, socket, 1);
if (err)
return err;
}
if (handle->flags & UV_HANDLE_TCP_KEEPALIVE) {
err = uv__tcp_keepalive(handle, socket, 1, 60);
if (err)
return err;
}
handle->socket = socket;
if (family == AF_INET6) {
handle->flags |= UV_HANDLE_IPV6;
} else {
assert(!(handle->flags & UV_HANDLE_IPV6));
}
return 0;
}
int uv_tcp_init(uv_loop_t* loop, uv_tcp_t* handle) {
uv_stream_init(loop, (uv_stream_t*) handle, UV_TCP);
handle->accept_reqs = NULL;
handle->pending_accepts = NULL;
handle->socket = INVALID_SOCKET;
handle->reqs_pending = 0;
handle->func_acceptex = NULL;
handle->func_connectex = NULL;
handle->processed_accepts = 0;
return 0;
}
void uv_tcp_endgame(uv_loop_t* loop, uv_tcp_t* handle) {
int err;
unsigned int i;
uv_tcp_accept_t* req;
if (handle->flags & UV_HANDLE_CONNECTION &&
handle->shutdown_req != NULL &&
handle->write_reqs_pending == 0) {
UNREGISTER_HANDLE_REQ(loop, handle, handle->shutdown_req);
err = 0;
if (handle->flags & UV__HANDLE_CLOSING) {
err = ERROR_OPERATION_ABORTED;
} else if (shutdown(handle->socket, SD_SEND) == SOCKET_ERROR) {
err = WSAGetLastError();
}
if (handle->shutdown_req->cb) {
handle->shutdown_req->cb(handle->shutdown_req,
uv_translate_sys_error(err));
}
handle->shutdown_req = NULL;
DECREASE_PENDING_REQ_COUNT(handle);
return;
}
if (handle->flags & UV__HANDLE_CLOSING &&
handle->reqs_pending == 0) {
assert(!(handle->flags & UV_HANDLE_CLOSED));
if (!(handle->flags & UV_HANDLE_TCP_SOCKET_CLOSED)) {
closesocket(handle->socket);
handle->flags |= UV_HANDLE_TCP_SOCKET_CLOSED;
}
if (!(handle->flags & UV_HANDLE_CONNECTION) && handle->accept_reqs) {
if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
for (i = 0; i < uv_simultaneous_server_accepts; i++) {
req = &handle->accept_reqs[i];
if (req->wait_handle != INVALID_HANDLE_VALUE) {
UnregisterWait(req->wait_handle);
req->wait_handle = INVALID_HANDLE_VALUE;
}
if (req->event_handle) {
CloseHandle(req->event_handle);
req->event_handle = NULL;
}
}
}
free(handle->accept_reqs);
handle->accept_reqs = NULL;
}
if (handle->flags & UV_HANDLE_CONNECTION &&
handle->flags & UV_HANDLE_EMULATE_IOCP) {
if (handle->read_req.wait_handle != INVALID_HANDLE_VALUE) {
UnregisterWait(handle->read_req.wait_handle);
handle->read_req.wait_handle = INVALID_HANDLE_VALUE;
}
if (handle->read_req.event_handle) {
CloseHandle(handle->read_req.event_handle);
handle->read_req.event_handle = NULL;
}
}
uv__handle_close(handle);
loop->active_tcp_streams--;
}
}
static int uv__bind(uv_tcp_t* handle,
int family,
struct sockaddr* addr,
int addrsize) {
DWORD err;
int r;
if (handle->socket == INVALID_SOCKET) {
SOCKET sock = socket(family, SOCK_STREAM, 0);
if (sock == INVALID_SOCKET) {
return WSAGetLastError();
}
if (!SetHandleInformation((HANDLE) sock, HANDLE_FLAG_INHERIT, 0)) {
err = GetLastError();
closesocket(sock);
return err;
}
err = uv_tcp_set_socket(handle->loop, handle, sock, family, 0);
if (err) {
closesocket(sock);
return err;
}
}
r = bind(handle->socket, addr, addrsize);
if (r == SOCKET_ERROR) {
err = WSAGetLastError();
if (err == WSAEADDRINUSE) {
handle->bind_error = err;
handle->flags |= UV_HANDLE_BIND_ERROR;
} else {
return err;
}
}
handle->flags |= UV_HANDLE_BOUND;
return 0;
}
int uv__tcp_bind(uv_tcp_t* handle, struct sockaddr_in addr) {
return uv_translate_sys_error(
uv__bind(handle, AF_INET, (struct sockaddr*) &addr, sizeof(addr)));
}
int uv__tcp_bind6(uv_tcp_t* handle, struct sockaddr_in6 addr) {
return uv_translate_sys_error(
uv__bind(handle, AF_INET6, (struct sockaddr*) &addr, sizeof(addr)));
}
static void CALLBACK post_completion(void* context, BOOLEAN timed_out) {
uv_req_t* req;
uv_tcp_t* handle;
req = (uv_req_t*) context;
assert(req != NULL);
handle = (uv_tcp_t*)req->data;
assert(handle != NULL);
assert(!timed_out);
if (!PostQueuedCompletionStatus(handle->loop->iocp,
req->overlapped.InternalHigh,
0,
&req->overlapped)) {
uv_fatal_error(GetLastError(), "PostQueuedCompletionStatus");
}
}
static void CALLBACK post_write_completion(void* context, BOOLEAN timed_out) {
uv_write_t* req;
uv_tcp_t* handle;
req = (uv_write_t*) context;
assert(req != NULL);
handle = (uv_tcp_t*)req->handle;
assert(handle != NULL);
assert(!timed_out);
if (!PostQueuedCompletionStatus(handle->loop->iocp,
req->overlapped.InternalHigh,
0,
&req->overlapped)) {
uv_fatal_error(GetLastError(), "PostQueuedCompletionStatus");
}
}
static void uv_tcp_queue_accept(uv_tcp_t* handle, uv_tcp_accept_t* req) {
uv_loop_t* loop = handle->loop;
BOOL success;
DWORD bytes;
SOCKET accept_socket;
short family;
assert(handle->flags & UV_HANDLE_LISTENING);
assert(req->accept_socket == INVALID_SOCKET);
if (handle->flags & UV_HANDLE_IPV6) {
family = AF_INET6;
} else {
family = AF_INET;
}
accept_socket = socket(family, SOCK_STREAM, 0);
if (accept_socket == INVALID_SOCKET) {
SET_REQ_ERROR(req, WSAGetLastError());
uv_insert_pending_req(loop, (uv_req_t*)req);
handle->reqs_pending++;
return;
}
if (!SetHandleInformation((HANDLE) accept_socket, HANDLE_FLAG_INHERIT, 0)) {
SET_REQ_ERROR(req, GetLastError());
uv_insert_pending_req(loop, (uv_req_t*)req);
handle->reqs_pending++;
closesocket(accept_socket);
return;
}
memset(&(req->overlapped), 0, sizeof(req->overlapped));
if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
req->overlapped.hEvent = (HANDLE) ((ULONG_PTR) req->event_handle | 1);
}
success = handle->func_acceptex(handle->socket,
accept_socket,
(void*)req->accept_buffer,
0,
sizeof(struct sockaddr_storage),
sizeof(struct sockaddr_storage),
&bytes,
&req->overlapped);
if (UV_SUCCEEDED_WITHOUT_IOCP(success)) {
req->accept_socket = accept_socket;
handle->reqs_pending++;
uv_insert_pending_req(loop, (uv_req_t*)req);
} else if (UV_SUCCEEDED_WITH_IOCP(success)) {
req->accept_socket = accept_socket;
handle->reqs_pending++;
if (handle->flags & UV_HANDLE_EMULATE_IOCP &&
req->wait_handle == INVALID_HANDLE_VALUE &&
!RegisterWaitForSingleObject(&req->wait_handle,
req->event_handle, post_completion, (void*) req,
INFINITE, WT_EXECUTEINWAITTHREAD)) {
SET_REQ_ERROR(req, GetLastError());
uv_insert_pending_req(loop, (uv_req_t*)req);
handle->reqs_pending++;
return;
}
} else {
SET_REQ_ERROR(req, WSAGetLastError());
uv_insert_pending_req(loop, (uv_req_t*)req);
handle->reqs_pending++;
closesocket(accept_socket);
if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
CloseHandle(req->overlapped.hEvent);
req->event_handle = NULL;
}
}
}
static void uv_tcp_queue_read(uv_loop_t* loop, uv_tcp_t* handle) {
uv_read_t* req;
uv_buf_t buf;
int result;
DWORD bytes, flags;
assert(handle->flags & UV_HANDLE_READING);
assert(!(handle->flags & UV_HANDLE_READ_PENDING));
req = &handle->read_req;
memset(&req->overlapped, 0, sizeof(req->overlapped));
if (loop->active_tcp_streams < uv_active_tcp_streams_threshold) {
handle->flags &= ~UV_HANDLE_ZERO_READ;
handle->read_buffer = handle->alloc_cb((uv_handle_t*) handle, 65536);
if (handle->read_buffer.len == 0) {
handle->read_cb((uv_stream_t*) handle, UV_ENOBUFS, handle->read_buffer);
return;
}
assert(handle->read_buffer.base != NULL);
buf = handle->read_buffer;
} else {
handle->flags |= UV_HANDLE_ZERO_READ;
buf.base = (char*) &uv_zero_;
buf.len = 0;
}
memset(&(req->overlapped), 0, sizeof(req->overlapped));
if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
assert(req->event_handle);
req->overlapped.hEvent = (HANDLE) ((ULONG_PTR) req->event_handle | 1);
}
flags = 0;
result = WSARecv(handle->socket,
(WSABUF*)&buf,
1,
&bytes,
&flags,
&req->overlapped,
NULL);
if (UV_SUCCEEDED_WITHOUT_IOCP(result == 0)) {
handle->flags |= UV_HANDLE_READ_PENDING;
req->overlapped.InternalHigh = bytes;
handle->reqs_pending++;
uv_insert_pending_req(loop, (uv_req_t*)req);
} else if (UV_SUCCEEDED_WITH_IOCP(result == 0)) {
handle->flags |= UV_HANDLE_READ_PENDING;
handle->reqs_pending++;
if (handle->flags & UV_HANDLE_EMULATE_IOCP &&
req->wait_handle == INVALID_HANDLE_VALUE &&
!RegisterWaitForSingleObject(&req->wait_handle,
req->event_handle, post_completion, (void*) req,
INFINITE, WT_EXECUTEINWAITTHREAD)) {
SET_REQ_ERROR(req, GetLastError());
uv_insert_pending_req(loop, (uv_req_t*)req);
}
} else {
SET_REQ_ERROR(req, WSAGetLastError());
uv_insert_pending_req(loop, (uv_req_t*)req);
handle->reqs_pending++;
}
}
int uv_tcp_listen(uv_tcp_t* handle, int backlog, uv_connection_cb cb) {
uv_loop_t* loop = handle->loop;
unsigned int i, simultaneous_accepts;
uv_tcp_accept_t* req;
int err;
assert(backlog > 0);
if (handle->flags & UV_HANDLE_LISTENING) {
handle->connection_cb = cb;
}
if (handle->flags & UV_HANDLE_READING) {
return WSAEISCONN;
}
if (handle->flags & UV_HANDLE_BIND_ERROR) {
return handle->bind_error;
}
if (!(handle->flags & UV_HANDLE_BOUND)) {
err = uv_tcp_bind(handle, uv_addr_ip4_any_);
if (err)
return err;
}
if (!handle->func_acceptex) {
if (!uv_get_acceptex_function(handle->socket, &handle->func_acceptex)) {
return WSAEAFNOSUPPORT;
}
}
if (!(handle->flags & UV_HANDLE_SHARED_TCP_SOCKET) &&
listen(handle->socket, backlog) == SOCKET_ERROR) {
return WSAGetLastError();
}
handle->flags |= UV_HANDLE_LISTENING;
handle->connection_cb = cb;
INCREASE_ACTIVE_COUNT(loop, handle);
simultaneous_accepts = handle->flags & UV_HANDLE_TCP_SINGLE_ACCEPT ? 1
: uv_simultaneous_server_accepts;
if(!handle->accept_reqs) {
handle->accept_reqs = (uv_tcp_accept_t*)
malloc(uv_simultaneous_server_accepts * sizeof(uv_tcp_accept_t));
if (!handle->accept_reqs) {
uv_fatal_error(ERROR_OUTOFMEMORY, "malloc");
}
for (i = 0; i < simultaneous_accepts; i++) {
req = &handle->accept_reqs[i];
uv_req_init(loop, (uv_req_t*)req);
req->type = UV_ACCEPT;
req->accept_socket = INVALID_SOCKET;
req->data = handle;
req->wait_handle = INVALID_HANDLE_VALUE;
if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
req->event_handle = CreateEvent(NULL, 0, 0, NULL);
if (!req->event_handle) {
uv_fatal_error(GetLastError(), "CreateEvent");
}
} else {
req->event_handle = NULL;
}
uv_tcp_queue_accept(handle, req);
}
for (i = simultaneous_accepts; i < uv_simultaneous_server_accepts; i++) {
req = &handle->accept_reqs[i];
uv_req_init(loop, (uv_req_t*) req);
req->type = UV_ACCEPT;
req->accept_socket = INVALID_SOCKET;
req->data = handle;
req->wait_handle = INVALID_HANDLE_VALUE;
}
}
return 0;
}
int uv_tcp_accept(uv_tcp_t* server, uv_tcp_t* client) {
uv_loop_t* loop = server->loop;
int err = 0;
int family;
uv_tcp_accept_t* req = server->pending_accepts;
if (!req) {
return WSAEWOULDBLOCK;
}
if (req->accept_socket == INVALID_SOCKET) {
return WSAENOTCONN;
}
if (server->flags & UV_HANDLE_IPV6) {
family = AF_INET6;
} else {
family = AF_INET;
}
err = uv_tcp_set_socket(client->loop,
client,
req->accept_socket,
family,
0);
if (err) {
closesocket(req->accept_socket);
} else {
uv_connection_init((uv_stream_t*) client);
client->flags |= UV_HANDLE_BOUND | UV_HANDLE_READABLE | UV_HANDLE_WRITABLE;
}
server->pending_accepts = req->next_pending;
req->next_pending = NULL;
req->accept_socket = INVALID_SOCKET;
if (!(server->flags & UV__HANDLE_CLOSING)) {
if (!(server->flags & UV_HANDLE_TCP_ACCEPT_STATE_CHANGING)) {
uv_tcp_queue_accept(server, req);
} else {
assert(server->flags & UV_HANDLE_TCP_SINGLE_ACCEPT);
server->processed_accepts++;
if (server->processed_accepts >= uv_simultaneous_server_accepts) {
server->processed_accepts = 0;
uv_tcp_queue_accept(server, &server->accept_reqs[0]);
server->flags &= ~UV_HANDLE_TCP_ACCEPT_STATE_CHANGING;
server->flags |= UV_HANDLE_TCP_SINGLE_ACCEPT;
}
}
}
loop->active_tcp_streams++;
return err;
}
int uv_tcp_read_start(uv_tcp_t* handle, uv_alloc_cb alloc_cb,
uv_read_cb read_cb) {
uv_loop_t* loop = handle->loop;
handle->flags |= UV_HANDLE_READING;
handle->read_cb = read_cb;
handle->alloc_cb = alloc_cb;
INCREASE_ACTIVE_COUNT(loop, handle);
if (!(handle->flags & UV_HANDLE_READ_PENDING)) {
if (handle->flags & UV_HANDLE_EMULATE_IOCP &&
!handle->read_req.event_handle) {
handle->read_req.event_handle = CreateEvent(NULL, 0, 0, NULL);
if (!handle->read_req.event_handle) {
uv_fatal_error(GetLastError(), "CreateEvent");
}
}
uv_tcp_queue_read(loop, handle);
}
return 0;
}
int uv__tcp_connect(uv_connect_t* req,
uv_tcp_t* handle,
struct sockaddr_in address,
uv_connect_cb cb) {
uv_loop_t* loop = handle->loop;
int addrsize = sizeof(struct sockaddr_in);
BOOL success;
DWORD bytes;
int err;
if (handle->flags & UV_HANDLE_BIND_ERROR) {
return handle->bind_error;
}
if (!(handle->flags & UV_HANDLE_BOUND)) {
err = uv_tcp_bind(handle, uv_addr_ip4_any_);
if (err)
return err;
}
if (!handle->func_connectex) {
if (!uv_get_connectex_function(handle->socket, &handle->func_connectex)) {
return WSAEAFNOSUPPORT;
}
}
uv_req_init(loop, (uv_req_t*) req);
req->type = UV_CONNECT;
req->handle = (uv_stream_t*) handle;
req->cb = cb;
memset(&req->overlapped, 0, sizeof(req->overlapped));
success = handle->func_connectex(handle->socket,
(struct sockaddr*) &address,
addrsize,
NULL,
0,
&bytes,
&req->overlapped);
if (UV_SUCCEEDED_WITHOUT_IOCP(success)) {
handle->reqs_pending++;
REGISTER_HANDLE_REQ(loop, handle, req);
uv_insert_pending_req(loop, (uv_req_t*)req);
} else if (UV_SUCCEEDED_WITH_IOCP(success)) {
handle->reqs_pending++;
REGISTER_HANDLE_REQ(loop, handle, req);
} else {
return WSAGetLastError();
}
return 0;
}
int uv__tcp_connect6(uv_connect_t* req,
uv_tcp_t* handle,
struct sockaddr_in6 address,
uv_connect_cb cb) {
uv_loop_t* loop = handle->loop;
int addrsize = sizeof(struct sockaddr_in6);
BOOL success;
DWORD bytes;
int err;
if (handle->flags & UV_HANDLE_BIND_ERROR) {
return handle->bind_error;
}
if (!(handle->flags & UV_HANDLE_BOUND)) {
err = uv_tcp_bind6(handle, uv_addr_ip6_any_);
if (err)
return err;
}
if (!handle->func_connectex) {
if (!uv_get_connectex_function(handle->socket, &handle->func_connectex)) {
return WSAEAFNOSUPPORT;
}
}
uv_req_init(loop, (uv_req_t*) req);
req->type = UV_CONNECT;
req->handle = (uv_stream_t*) handle;
req->cb = cb;
memset(&req->overlapped, 0, sizeof(req->overlapped));
success = handle->func_connectex(handle->socket,
(struct sockaddr*) &address,
addrsize,
NULL,
0,
&bytes,
&req->overlapped);
if (UV_SUCCEEDED_WITHOUT_IOCP(success)) {
handle->reqs_pending++;
REGISTER_HANDLE_REQ(loop, handle, req);
uv_insert_pending_req(loop, (uv_req_t*)req);
} else if (UV_SUCCEEDED_WITH_IOCP(success)) {
handle->reqs_pending++;
REGISTER_HANDLE_REQ(loop, handle, req);
} else {
return WSAGetLastError();
}
return 0;
}
int uv_tcp_getsockname(uv_tcp_t* handle, struct sockaddr* name,
int* namelen) {
int result;
if (!(handle->flags & UV_HANDLE_BOUND)) {
return UV_EINVAL;
}
if (handle->flags & UV_HANDLE_BIND_ERROR) {
return uv_translate_sys_error(handle->bind_error);
}
result = getsockname(handle->socket, name, namelen);
if (result != 0) {
return uv_translate_sys_error(WSAGetLastError());
}
return 0;
}
int uv_tcp_getpeername(uv_tcp_t* handle, struct sockaddr* name,
int* namelen) {
int result;
if (!(handle->flags & UV_HANDLE_BOUND)) {
return UV_EINVAL;
}
if (handle->flags & UV_HANDLE_BIND_ERROR) {
return uv_translate_sys_error(handle->bind_error);
}
result = getpeername(handle->socket, name, namelen);
if (result != 0) {
return uv_translate_sys_error(WSAGetLastError());
}
return 0;
}
int uv_tcp_write(uv_loop_t* loop, uv_write_t* req, uv_tcp_t* handle,
uv_buf_t bufs[], int bufcnt, uv_write_cb cb) {
int result;
DWORD bytes;
uv_req_init(loop, (uv_req_t*) req);
req->type = UV_WRITE;
req->handle = (uv_stream_t*) handle;
req->cb = cb;
memset(&req->overlapped, 0, sizeof(req->overlapped));
memset(&(req->overlapped), 0, sizeof(req->overlapped));
if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
req->event_handle = CreateEvent(NULL, 0, 0, NULL);
if (!req->event_handle) {
uv_fatal_error(GetLastError(), "CreateEvent");
}
req->overlapped.hEvent = (HANDLE) ((ULONG_PTR) req->event_handle | 1);
req->wait_handle = INVALID_HANDLE_VALUE;
}
result = WSASend(handle->socket,
(WSABUF*)bufs,
bufcnt,
&bytes,
0,
&req->overlapped,
NULL);
if (UV_SUCCEEDED_WITHOUT_IOCP(result == 0)) {
req->queued_bytes = 0;
handle->reqs_pending++;
handle->write_reqs_pending++;
REGISTER_HANDLE_REQ(loop, handle, req);
uv_insert_pending_req(loop, (uv_req_t*) req);
} else if (UV_SUCCEEDED_WITH_IOCP(result == 0)) {
req->queued_bytes = uv_count_bufs(bufs, bufcnt);
handle->reqs_pending++;
handle->write_reqs_pending++;
REGISTER_HANDLE_REQ(loop, handle, req);
handle->write_queue_size += req->queued_bytes;
if (handle->flags & UV_HANDLE_EMULATE_IOCP &&
!RegisterWaitForSingleObject(&req->wait_handle,
req->event_handle, post_write_completion, (void*) req,
INFINITE, WT_EXECUTEINWAITTHREAD | WT_EXECUTEONLYONCE)) {
SET_REQ_ERROR(req, GetLastError());
uv_insert_pending_req(loop, (uv_req_t*)req);
}
} else {
return WSAGetLastError();
}
return 0;
}
void uv_process_tcp_read_req(uv_loop_t* loop, uv_tcp_t* handle,
uv_req_t* req) {
DWORD bytes, flags, err;
uv_buf_t buf;
assert(handle->type == UV_TCP);
handle->flags &= ~UV_HANDLE_READ_PENDING;
if (!REQ_SUCCESS(req)) {
if ((handle->flags & UV_HANDLE_READING) ||
!(handle->flags & UV_HANDLE_ZERO_READ)) {
handle->flags &= ~UV_HANDLE_READING;
DECREASE_ACTIVE_COUNT(loop, handle);
buf = (handle->flags & UV_HANDLE_ZERO_READ) ?
uv_buf_init(NULL, 0) : handle->read_buffer;
err = GET_REQ_SOCK_ERROR(req);
if (err == WSAECONNABORTED) {
err = WSAECONNRESET;
}
handle->read_cb((uv_stream_t*)handle,
uv_translate_sys_error(err),
buf);
}
} else {
if (!(handle->flags & UV_HANDLE_ZERO_READ)) {
if (req->overlapped.InternalHigh > 0) {
handle->read_cb((uv_stream_t*)handle,
req->overlapped.InternalHigh,
handle->read_buffer);
if (req->overlapped.InternalHigh < handle->read_buffer.len) {
goto done;
}
} else {
if (handle->flags & UV_HANDLE_READING) {
handle->flags &= ~UV_HANDLE_READING;
DECREASE_ACTIVE_COUNT(loop, handle);
}
handle->flags &= ~UV_HANDLE_READABLE;
buf.base = 0;
buf.len = 0;
handle->read_cb((uv_stream_t*)handle, UV_EOF, handle->read_buffer);
goto done;
}
}
while (handle->flags & UV_HANDLE_READING) {
buf = handle->alloc_cb((uv_handle_t*) handle, 65536);
if (buf.len == 0) {
handle->read_cb(handle, UV_ENOBUFS, buf);
break;
}
assert(buf.base != NULL);
flags = 0;
if (WSARecv(handle->socket,
(WSABUF*)&buf,
1,
&bytes,
&flags,
NULL,
NULL) != SOCKET_ERROR) {
if (bytes > 0) {
handle->read_cb((uv_stream_t*)handle, bytes, buf);
if (bytes < buf.len) {
break;
}
} else {
handle->flags &= ~(UV_HANDLE_READING | UV_HANDLE_READABLE);
DECREASE_ACTIVE_COUNT(loop, handle);
handle->read_cb((uv_stream_t*)handle, UV_EOF, buf);
break;
}
} else {
err = WSAGetLastError();
if (err == WSAEWOULDBLOCK) {
handle->read_cb((uv_stream_t*)handle, 0, buf);
} else {
handle->flags &= ~UV_HANDLE_READING;
DECREASE_ACTIVE_COUNT(loop, handle);
if (err == WSAECONNABORTED) {
err = WSAECONNRESET;
}
handle->read_cb((uv_stream_t*)handle,
uv_translate_sys_error(err),
buf);
}
break;
}
}
done:
if ((handle->flags & UV_HANDLE_READING) &&
!(handle->flags & UV_HANDLE_READ_PENDING)) {
uv_tcp_queue_read(loop, handle);
}
}
DECREASE_PENDING_REQ_COUNT(handle);
}
void uv_process_tcp_write_req(uv_loop_t* loop, uv_tcp_t* handle,
uv_write_t* req) {
int err;
assert(handle->type == UV_TCP);
assert(handle->write_queue_size >= req->queued_bytes);
handle->write_queue_size -= req->queued_bytes;
UNREGISTER_HANDLE_REQ(loop, handle, req);
if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
if (req->wait_handle != INVALID_HANDLE_VALUE) {
UnregisterWait(req->wait_handle);
}
if (req->event_handle) {
CloseHandle(req->event_handle);
}
}
if (req->cb) {
err = GET_REQ_SOCK_ERROR(req);
req->cb(req, uv_translate_sys_error(err));
}
handle->write_reqs_pending--;
if (handle->shutdown_req != NULL &&
handle->write_reqs_pending == 0) {
uv_want_endgame(loop, (uv_handle_t*)handle);
}
DECREASE_PENDING_REQ_COUNT(handle);
}
void uv_process_tcp_accept_req(uv_loop_t* loop, uv_tcp_t* handle,
uv_req_t* raw_req) {
uv_tcp_accept_t* req = (uv_tcp_accept_t*) raw_req;
int err;
assert(handle->type == UV_TCP);
if (req->accept_socket == INVALID_SOCKET) {
if (handle->flags & UV_HANDLE_LISTENING) {
handle->flags &= ~UV_HANDLE_LISTENING;
DECREASE_ACTIVE_COUNT(loop, handle);
if (handle->connection_cb) {
err = GET_REQ_SOCK_ERROR(req);
handle->connection_cb((uv_stream_t*)handle,
uv_translate_sys_error(err));
}
}
} else if (REQ_SUCCESS(req) &&
setsockopt(req->accept_socket,
SOL_SOCKET,
SO_UPDATE_ACCEPT_CONTEXT,
(char*)&handle->socket,
sizeof(handle->socket)) == 0) {
req->next_pending = handle->pending_accepts;
handle->pending_accepts = req;
if (handle->connection_cb) {
handle->connection_cb((uv_stream_t*)handle, 0);
}
} else {
closesocket(req->accept_socket);
req->accept_socket = INVALID_SOCKET;
if (handle->flags & UV_HANDLE_LISTENING) {
uv_tcp_queue_accept(handle, req);
}
}
DECREASE_PENDING_REQ_COUNT(handle);
}
void uv_process_tcp_connect_req(uv_loop_t* loop, uv_tcp_t* handle,
uv_connect_t* req) {
int err;
assert(handle->type == UV_TCP);
UNREGISTER_HANDLE_REQ(loop, handle, req);
err = 0;
if (REQ_SUCCESS(req)) {
if (setsockopt(handle->socket,
SOL_SOCKET,
SO_UPDATE_CONNECT_CONTEXT,
NULL,
0) == 0) {
uv_connection_init((uv_stream_t*)handle);
handle->flags |= UV_HANDLE_READABLE | UV_HANDLE_WRITABLE;
loop->active_tcp_streams++;
} else {
err = WSAGetLastError();
}
} else {
err = GET_REQ_SOCK_ERROR(req);
}
req->cb(req, uv_translate_sys_error(err));
DECREASE_PENDING_REQ_COUNT(handle);
}
int uv_tcp_import(uv_tcp_t* tcp, WSAPROTOCOL_INFOW* socket_protocol_info,
int tcp_connection) {
int err;
SOCKET socket = WSASocketW(AF_INET,
SOCK_STREAM,
IPPROTO_IP,
socket_protocol_info,
0,
WSA_FLAG_OVERLAPPED);
if (socket == INVALID_SOCKET) {
return WSAGetLastError();
}
if (!SetHandleInformation((HANDLE) socket, HANDLE_FLAG_INHERIT, 0)) {
err = GetLastError();
closesocket(socket);
return err;
}
err = uv_tcp_set_socket(tcp->loop,
tcp,
socket,
socket_protocol_info->iAddressFamily,
1);
if (err) {
closesocket(socket);
return err;
}
if (tcp_connection) {
uv_connection_init((uv_stream_t*)tcp);
tcp->flags |= UV_HANDLE_READABLE | UV_HANDLE_WRITABLE;
}
tcp->flags |= UV_HANDLE_BOUND;
tcp->flags |= UV_HANDLE_SHARED_TCP_SOCKET;
tcp->loop->active_tcp_streams++;
return 0;
}
int uv_tcp_nodelay(uv_tcp_t* handle, int enable) {
int err;
if (handle->socket != INVALID_SOCKET) {
err = uv__tcp_nodelay(handle, handle->socket, enable);
if (err)
return err;
}
if (enable) {
handle->flags |= UV_HANDLE_TCP_NODELAY;
} else {
handle->flags &= ~UV_HANDLE_TCP_NODELAY;
}
return 0;
}
int uv_tcp_keepalive(uv_tcp_t* handle, int enable, unsigned int delay) {
int err;
if (handle->socket != INVALID_SOCKET) {
err = uv__tcp_keepalive(handle, handle->socket, enable, delay);
if (err)
return err;
}
if (enable) {
handle->flags |= UV_HANDLE_TCP_KEEPALIVE;
} else {
handle->flags &= ~UV_HANDLE_TCP_KEEPALIVE;
}
return 0;
}
int uv_tcp_duplicate_socket(uv_tcp_t* handle, int pid,
LPWSAPROTOCOL_INFOW protocol_info) {
if (!(handle->flags & UV_HANDLE_CONNECTION)) {
if (!(handle->flags & UV_HANDLE_LISTENING)) {
if (!(handle->flags & UV_HANDLE_BOUND)) {
return ERROR_INVALID_PARAMETER;
}
if (listen(handle->socket, SOMAXCONN) == SOCKET_ERROR) {
return WSAGetLastError();
}
}
}
if (WSADuplicateSocketW(handle->socket, pid, protocol_info)) {
return WSAGetLastError();
}
handle->flags |= UV_HANDLE_SHARED_TCP_SOCKET;
return 0;
}
int uv_tcp_simultaneous_accepts(uv_tcp_t* handle, int enable) {
if (handle->flags & UV_HANDLE_CONNECTION) {
return UV_EINVAL;
}
if ((enable && !(handle->flags & UV_HANDLE_TCP_SINGLE_ACCEPT)) ||
(!enable && handle->flags & UV_HANDLE_TCP_SINGLE_ACCEPT)) {
return 0;
}
if (enable) {
return UV_ENOTSUP;
}
if (handle->flags & UV_HANDLE_TCP_ACCEPT_STATE_CHANGING) {
return 0;
}
handle->flags |= UV_HANDLE_TCP_SINGLE_ACCEPT;
if (handle->flags & UV_HANDLE_LISTENING) {
handle->flags |= UV_HANDLE_TCP_ACCEPT_STATE_CHANGING;
}
return 0;
}
static int uv_tcp_try_cancel_io(uv_tcp_t* tcp) {
SOCKET socket = tcp->socket;
int non_ifs_lsp;
non_ifs_lsp = (tcp->flags & UV_HANDLE_IPV6) ? uv_tcp_non_ifs_lsp_ipv6 :
uv_tcp_non_ifs_lsp_ipv4;
if (non_ifs_lsp) {
DWORD bytes;
if (WSAIoctl(socket,
SIO_BASE_HANDLE,
NULL,
0,
&socket,
sizeof socket,
&bytes,
NULL,
NULL) != 0) {
return -1;
}
}
assert(socket != 0 && socket != INVALID_SOCKET);
if (!CancelIo((HANDLE) socket)) {
return GetLastError();
}
return 0;
}
void uv_tcp_close(uv_loop_t* loop, uv_tcp_t* tcp) {
int close_socket = 1;
if (tcp->flags & UV_HANDLE_READ_PENDING) {
if (!(tcp->flags & UV_HANDLE_SHARED_TCP_SOCKET)) {
shutdown(tcp->socket, SD_SEND);
} else if (uv_tcp_try_cancel_io(tcp) == 0) {
close_socket = 0;
} else {
}
} else if ((tcp->flags & UV_HANDLE_SHARED_TCP_SOCKET) &&
tcp->accept_reqs != NULL) {
if (uv_tcp_try_cancel_io(tcp) != 0) {
unsigned int i;
for (i = 0; i < uv_simultaneous_server_accepts; i++) {
uv_tcp_accept_t* req = &tcp->accept_reqs[i];
if (req->accept_socket != INVALID_SOCKET &&
!HasOverlappedIoCompleted(&req->overlapped)) {
closesocket(req->accept_socket);
req->accept_socket = INVALID_SOCKET;
}
}
}
}
if (tcp->flags & UV_HANDLE_READING) {
tcp->flags &= ~UV_HANDLE_READING;
DECREASE_ACTIVE_COUNT(loop, tcp);
}
if (tcp->flags & UV_HANDLE_LISTENING) {
tcp->flags &= ~UV_HANDLE_LISTENING;
DECREASE_ACTIVE_COUNT(loop, tcp);
}
if (close_socket) {
closesocket(tcp->socket);
tcp->flags |= UV_HANDLE_TCP_SOCKET_CLOSED;
}
tcp->flags &= ~(UV_HANDLE_READABLE | UV_HANDLE_WRITABLE);
uv__handle_closing(tcp);
if (tcp->reqs_pending == 0) {
uv_want_endgame(tcp->loop, (uv_handle_t*)tcp);
}
}
int uv_tcp_open(uv_tcp_t* handle, uv_os_sock_t sock) {
WSAPROTOCOL_INFOW protocol_info;
int opt_len;
int err;
opt_len = (int) sizeof protocol_info;
if (getsockopt(sock,
SOL_SOCKET,
SO_PROTOCOL_INFOW,
(char*) &protocol_info,
&opt_len) == SOCKET_ERROR) {
return uv_translate_sys_error(GetLastError());
}
if (!SetHandleInformation((HANDLE) sock, HANDLE_FLAG_INHERIT, 0)) {
return uv_translate_sys_error(GetLastError());
}
err = uv_tcp_set_socket(handle->loop,
handle,
sock,
protocol_info.iAddressFamily,
1);
if (err) {
return uv_translate_sys_error(err);
}
return 0;
}