This source file includes the following definitions.
- uv__open_cloexec
- uv__buf_count
- uv__stream_init
- uv__stream_osx_select
- uv__stream_osx_interrupt_select
- uv__stream_osx_select_cb
- uv__stream_osx_cb_close
- uv__stream_try_select
- uv__stream_open
- uv__stream_destroy
- uv__emfile_trick
- uv__server_io
- uv_accept
- uv_listen
- uv__drain
- uv__write_req_size
- uv__write_req_finish
- uv__handle_fd
- uv__getiovmax
- uv__write
- uv__write_callbacks
- uv__handle_type
- uv__stream_read_cb
- uv__stream_eof
- uv__read
- uv_shutdown
- uv__stream_io
- uv__stream_connect
- uv_write2
- uv_write
- uv__read_start_common
- uv_read_start
- uv_read2_start
- uv_read_stop
- uv_is_readable
- uv_is_writable
- uv___stream_fd
- uv__stream_close
- uv_stream_set_blocking
#include "uv.h"
#include "internal.h"
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <assert.h>
#include <errno.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/uio.h>
#include <sys/un.h>
#include <unistd.h>
#include <limits.h>
#if defined(__APPLE__)
# include <sys/event.h>
# include <sys/time.h>
# include <sys/select.h>
/* Per-stream state for the select(2)-based fallback on OS X. An instance is
 * allocated by uv__stream_try_select() and is shared between the event loop
 * and the helper thread that runs uv__stream_osx_select().
 */
typedef struct uv__stream_select_s uv__stream_select_t;
struct uv__stream_select_s {
/* Stream this state belongs to. */
uv_stream_t* stream;
/* Helper thread running uv__stream_osx_select(). */
uv_thread_t thread;
/* Posted by uv__stream_close() to make the helper thread exit. */
uv_sem_t close_sem;
/* Posted once the loop has consumed `events` (and on close). */
uv_sem_t async_sem;
/* Wakes the event loop; its callback is uv__stream_osx_select_cb(). */
uv_async_t async;
/* UV__POLLIN/UV__POLLOUT bits published by the helper thread. */
int events;
/* fds[0] of the socketpair; handed back to the caller in place of `fd`. */
int fake_fd;
/* fds[1] of the socketpair; watched by the helper thread for interrupts. */
int int_fd;
/* The original descriptor that the helper thread select()s on. */
int fd;
};
#endif
static void uv__stream_connect(uv_stream_t*);
static void uv__write(uv_stream_t* stream);
static void uv__read(uv_stream_t* stream);
static void uv__stream_io(uv_loop_t* loop, uv__io_t* w, unsigned int events);
static size_t uv__write_req_size(uv_write_t* req);
/* Open `path` with `flags` and mark the descriptor close-on-exec.
 *
 * Returns the new file descriptor on success, or a negated errno value (or
 * the error reported by uv__cloexec()) on failure.
 */
static int uv__open_cloexec(const char* path, int flags) {
  int rc;
  int handle;

#if defined(__linux__)
  /* Try to set the flag atomically as part of open(). */
  handle = open(path, flags | UV__O_CLOEXEC);
  if (handle != -1)
    return handle;

  /* Only EINVAL falls through to the two-step variant below. */
  if (errno != EINVAL)
    return -errno;
#endif

  handle = open(path, flags);
  if (handle == -1)
    return -errno;

  rc = uv__cloexec(handle, 1);
  if (rc == 0)
    return handle;

  /* Could not set the flag: do not leak the descriptor. */
  close(handle);
  return rc;
}
/* Return the sum of the `len` fields of the first `bufcnt` buffers. */
static size_t uv__buf_count(uv_buf_t bufs[], int bufcnt) {
  size_t sum;

  sum = 0;
  while (bufcnt-- > 0) {
    sum += bufs->len;
    bufs++;
  }

  return sum;
}
/* Initialize the stream-specific part of a handle: no callbacks, no pending
 * requests, empty write queues and an I/O watcher without a descriptor.
 */
void uv__stream_init(uv_loop_t* loop,
                     uv_stream_t* stream,
                     uv_handle_type type) {
  uv__handle_init(loop, (uv_handle_t*) stream, type);

  /* Callbacks. */
  stream->read_cb = NULL;
  stream->read2_cb = NULL;
  stream->alloc_cb = NULL;
  stream->close_cb = NULL;
  stream->connection_cb = NULL;

  /* Pending requests and accept state. */
  stream->connect_req = NULL;
  stream->shutdown_req = NULL;
  stream->accepted_fd = -1;
  stream->delayed_error = 0;

  /* Write bookkeeping. */
  QUEUE_INIT(&stream->write_queue);
  QUEUE_INIT(&stream->write_completed_queue);
  stream->write_queue_size = 0;

  /* Lazily open the loop's reserve descriptor; it is consumed by
   * uv__emfile_trick(). A failure to open it is silently ignored.
   */
  if (loop->emfile_fd == -1) {
    int reserve_fd = uv__open_cloexec("/", O_RDONLY);
    if (reserve_fd >= 0)
      loop->emfile_fd = reserve_fd;
  }

#if defined(__APPLE__)
  stream->select = NULL;
#endif

  uv__io_init(&stream->io_watcher, uv__stream_io, -1);
}
#if defined(__APPLE__)
/* Entry point of the helper thread started by uv__stream_try_select().
 *
 * `arg` is the uv_stream_t whose `select` member holds the shared state.
 * The thread blocks in select() on the real descriptor (s->fd) and on the
 * interrupt socket (s->int_fd), publishes readiness in s->events and wakes
 * the event loop with uv_async_send(). It exits once close_sem is posted.
 */
static void uv__stream_osx_select(void* arg) {
uv_stream_t* stream;
uv__stream_select_t* s;
char buf[1024];
fd_set sread;
fd_set swrite;
int events;
int fd;
int r;
int max_fd;
stream = arg;
s = stream->select;
fd = s->fd;
/* select() needs the highest descriptor number of both watched fds. */
if (fd > s->int_fd)
max_fd = fd;
else
max_fd = s->int_fd;
while (1) {
/* Terminate as soon as close_sem has been posted. */
if (uv_sem_trywait(&s->close_sem) == 0)
break;
/* Watch the real fd only for the directions the stream supports. */
FD_ZERO(&sread);
FD_ZERO(&swrite);
if (uv_is_readable(stream))
FD_SET(fd, &sread);
if (uv_is_writable(stream))
FD_SET(fd, &swrite);
/* The interrupt socket is always watched so the loop can wake us up. */
FD_SET(s->int_fd, &sread);
/* No timeout: block until one of the descriptors becomes ready. */
r = select(max_fd + 1, &sread, &swrite, NULL, NULL);
if (r == -1) {
if (errno == EINTR)
continue;
/* Any other select() failure is treated as fatal. */
abort();
}
if (r == 0)
continue;
/* Drain the interrupt socket: keep reading while the buffer is filled
 * completely or the read was interrupted; stop on a short read or when
 * no more data is available.
 */
if (FD_ISSET(s->int_fd, &sread))
while (1) {
r = read(s->int_fd, buf, sizeof(buf));
if (r == sizeof(buf))
continue;
if (r != -1)
break;
if (errno == EAGAIN || errno == EWOULDBLOCK)
break;
if (errno == EINTR)
continue;
abort();
}
/* Translate the select() result into UV__POLL* bits. */
events = 0;
if (FD_ISSET(fd, &sread))
events |= UV__POLLIN;
if (FD_ISSET(fd, &swrite))
events |= UV__POLLOUT;
assert(events != 0 || FD_ISSET(s->int_fd, &sread));
if (events != 0) {
/* Publish the events, wake the loop and wait until async_sem is posted
 * (by uv__stream_osx_select_cb() or uv__stream_close()).
 */
ACCESS_ONCE(int, s->events) = events;
uv_async_send(&s->async);
uv_sem_wait(&s->async_sem);
assert((s->events == 0) || (stream->flags & UV_CLOSING));
}
}
}
/* Wake up the helper thread by writing one byte to the fake_fd end of the
 * socketpair. The write is retried while it is interrupted by a signal.
 */
static void uv__stream_osx_interrupt_select(uv_stream_t* stream) {
  uv__stream_select_t* sel;
  int nwritten;

  sel = stream->select;

  for (;;) {
    nwritten = write(sel->fake_fd, "x", 1);
    if (nwritten != -1 || errno != EINTR)
      break;
  }

  assert(nwritten == 1);
}
/* Async callback, invoked after the helper thread has published events.
 *
 * Takes the pending event bits, clears them, posts async_sem so the helper
 * thread can continue, and then dispatches each event to uv__stream_io() if
 * the I/O watcher is still active for it.
 */
static void uv__stream_osx_select_cb(uv_async_t* handle, int status) {
  uv__stream_select_t* sel;
  uv_stream_t* owner;
  int pending;

  sel = container_of(handle, uv__stream_select_t, async);
  owner = sel->stream;

  /* Consume the events and release the helper thread. */
  pending = sel->events;
  ACCESS_ONCE(int, sel->events) = 0;
  uv_sem_post(&sel->async_sem);

  assert(pending != 0);
  assert((pending & ~(UV__POLLIN | UV__POLLOUT)) == 0);

  if ((pending & UV__POLLIN) != 0 &&
      uv__io_active(&owner->io_watcher, UV__POLLIN)) {
    uv__stream_io(owner->loop, &owner->io_watcher, UV__POLLIN);
  }

  if ((pending & UV__POLLOUT) != 0 &&
      uv__io_active(&owner->io_watcher, UV__POLLOUT)) {
    uv__stream_io(owner->loop, &owner->io_watcher, UV__POLLOUT);
  }
}
/* Close callback of the embedded async handle: releases the select state
 * that contains it.
 */
static void uv__stream_osx_cb_close(uv_handle_t* async) {
  free(container_of(async, uv__stream_select_t, async));
}
/* Probe whether `*fd` can be watched with kqueue and, if not, set up the
 * select(2)-based fallback for `stream`.
 *
 * When kevent() reports EV_ERROR with EINVAL for the descriptor, a helper
 * thread is started that select()s on the original descriptor, and `*fd` is
 * replaced with one end of a socketpair (fake_fd).
 *
 * Returns 0 on success (with or without the fallback) or a negated errno
 * value on failure. On failure `*fd` and `stream->select` are left as they
 * were on entry.
 */
int uv__stream_try_select(uv_stream_t* stream, int* fd) {
  struct kevent filter[1];
  struct kevent events[1];
  struct timespec timeout;
  uv__stream_select_t* s;
  int fds[2];
  int old_fd;
  int err;
  int ret;
  int kq;

  kq = kqueue();
  if (kq == -1) {
    /* Save errno first: fprintf() may overwrite it. */
    err = -errno;
    fprintf(stderr, "(libuv) Failed to create kqueue (%d)\n", -err);
    return err;
  }

  EV_SET(&filter[0], *fd, EVFILT_READ, EV_ADD | EV_ENABLE, 0, 0, 0);

  /* Use a small timeout: we only want to learn whether the fd is accepted. */
  timeout.tv_sec = 0;
  timeout.tv_nsec = 1;

  ret = kevent(kq, filter, 1, events, 1, &timeout);
  SAVE_ERRNO(close(kq));

  if (ret == -1)
    return -errno;

  /* kqueue can handle this descriptor, no fallback needed. */
  if (ret == 0 || (events[0].flags & EV_ERROR) == 0 || events[0].data != EINVAL)
    return 0;

  s = malloc(sizeof(*s));
  if (s == NULL)
    return -ENOMEM;

  s->events = 0;
  s->fd = *fd;

  err = uv_async_init(stream->loop, &s->async, uv__stream_osx_select_cb);
  if (err) {
    free(s);
    return err;
  }

  s->async.flags |= UV__HANDLE_INTERNAL;
  uv__handle_unref(&s->async);

  /* NOTE(review): as in the original code, the failure reason of the calls
   * below is taken from errno; it is now captured immediately so that the
   * cleanup calls cannot overwrite it.
   */
  if (uv_sem_init(&s->close_sem, 0)) {
    err = -errno;
    goto fatal1;
  }

  if (uv_sem_init(&s->async_sem, 0)) {
    err = -errno;
    goto fatal2;
  }

  if (socketpair(AF_UNIX, SOCK_STREAM, 0, fds)) {
    err = -errno;
    goto fatal3;
  }

  s->fake_fd = fds[0];
  s->int_fd = fds[1];

  /* Publish the state BEFORE starting the thread: uv__stream_osx_select()
   * reads stream->select as its first action.
   */
  old_fd = *fd;
  s->stream = stream;
  stream->select = s;
  *fd = s->fake_fd;

  if (uv_thread_create(&s->thread, uv__stream_osx_select, stream)) {
    err = -errno;
    goto fatal4;
  }

  return 0;

fatal4:
  /* Roll back what was published above. */
  s->stream = NULL;
  stream->select = NULL;
  *fd = old_fd;
  close(s->fake_fd);
  close(s->int_fd);
  s->fake_fd = -1;
  s->int_fd = -1;
fatal3:
  uv_sem_destroy(&s->async_sem);
fatal2:
  uv_sem_destroy(&s->close_sem);
fatal1:
  /* `s` is freed by uv__stream_osx_cb_close() once the handle is closed. */
  uv_close((uv_handle_t*) &s->async, uv__stream_osx_cb_close);
  return err;
}
#endif
/* Attach descriptor `fd` to `stream` and merge `flags` into the stream flags.
 *
 * For TCP streams, the no-delay and keep-alive options are applied to the
 * descriptor when the corresponding flags are set. Returns 0 on success or
 * -errno when applying one of those options fails (the descriptor is then
 * not attached).
 */
int uv__stream_open(uv_stream_t* stream, int fd, int flags) {
  assert(fd >= 0);

  stream->flags |= flags;

  if (stream->type == UV_TCP) {
    if (stream->flags & UV_TCP_NODELAY) {
      if (uv__tcp_nodelay(fd, 1))
        return -errno;
    }

    if (stream->flags & UV_TCP_KEEPALIVE) {
      if (uv__tcp_keepalive(fd, 1, 60))
        return -errno;
    }
  }

  stream->io_watcher.fd = fd;
  return 0;
}
/* Final teardown of a closed stream: every outstanding request is
 * unregistered and its callback is invoked.
 *
 * The connect request, queued writes and the shutdown request are reported
 * with -ECANCELED; writes that had already completed are reported with
 * their recorded status.
 */
void uv__stream_destroy(uv_stream_t* stream) {
  uv_write_t* wr;
  QUEUE* node;

  assert(!uv__io_active(&stream->io_watcher, UV__POLLIN | UV__POLLOUT));
  assert(stream->flags & UV_CLOSED);

  if (stream->connect_req) {
    uv__req_unregister(stream->loop, stream->connect_req);
    stream->connect_req->cb(stream->connect_req, -ECANCELED);
    stream->connect_req = NULL;
  }

  /* Writes that never made it out. */
  for (;;) {
    if (QUEUE_EMPTY(&stream->write_queue))
      break;

    node = QUEUE_HEAD(&stream->write_queue);
    QUEUE_REMOVE(node);

    wr = QUEUE_DATA(node, uv_write_t, queue);
    uv__req_unregister(stream->loop, wr);

    if (wr->bufs != wr->bufsml)
      free(wr->bufs);
    wr->bufs = NULL;

    if (wr->cb != NULL)
      wr->cb(wr, -ECANCELED);
  }

  /* Writes that finished but whose callback has not run yet. */
  for (;;) {
    if (QUEUE_EMPTY(&stream->write_completed_queue))
      break;

    node = QUEUE_HEAD(&stream->write_completed_queue);
    QUEUE_REMOVE(node);

    wr = QUEUE_DATA(node, uv_write_t, queue);
    uv__req_unregister(stream->loop, wr);

    /* Buffers are still attached when the write failed part-way. */
    if (wr->bufs != NULL) {
      stream->write_queue_size -= uv__write_req_size(wr);
      if (wr->bufs != wr->bufsml)
        free(wr->bufs);
      wr->bufs = NULL;
    }

    if (wr->cb)
      wr->cb(wr, wr->error);
  }

  if (stream->shutdown_req) {
    uv__req_unregister(stream->loop, stream->shutdown_req);
    stream->shutdown_req->cb(stream->shutdown_req, -ECANCELED);
    stream->shutdown_req = NULL;
  }
}
/* Recover from running out of file descriptors while accepting.
 *
 * The loop's reserve descriptor is closed to free one slot, then pending
 * connections on `accept_fd` are accepted and closed immediately until
 * uv__accept() fails with something other than -EINTR. Finally the reserve
 * descriptor is reopened.
 *
 * Returns -EMFILE when no reserve descriptor is available, otherwise the
 * (negative) error that ended the accept loop.
 *
 * uv__accept() is treated as returning a descriptor >= 0 or a negated errno
 * value, matching how uv__server_io() interprets its result.
 */
static int uv__emfile_trick(uv_loop_t* loop, int accept_fd) {
  int err;
  int fd;

  if (loop->emfile_fd == -1)
    return -EMFILE;

  close(loop->emfile_fd);
  loop->emfile_fd = -1;

  do {
    err = uv__accept(accept_fd);
    if (err >= 0)
      close(err);
  } while (err >= 0 || err == -EINTR);

  /* Re-arm the reserve; keep -1 on failure so the guard above stays valid. */
  SAVE_ERRNO(fd = uv__open_cloexec("/", O_RDONLY));
  if (fd >= 0)
    loop->emfile_fd = fd;

  return err;
}
/* With kqueue, the watcher's rcount is decremented for every accepted
 * connection; elsewhere the macro expands to nothing.
 */
#if defined(UV_HAVE_KQUEUE)
# define UV_DEC_BACKLOG(w) w->rcount--;
#else
# define UV_DEC_BACKLOG(w)
#endif
/* I/O callback of a listening stream: accepts connections and reports each
 * one (or each error) through stream->connection_cb.
 *
 * The accepted descriptor is parked in stream->accepted_fd while the
 * callback runs; uv_accept() is expected to take it from there.
 */
void uv__server_io(uv_loop_t* loop, uv__io_t* w, unsigned int events) {
uv_stream_t* stream;
int err;
stream = container_of(w, uv_stream_t, io_watcher);
assert(events == UV__POLLIN);
assert(stream->accepted_fd == -1);
assert(!(stream->flags & UV_CLOSING));
uv__io_start(stream->loop, &stream->io_watcher, UV__POLLIN);
/* Keep accepting for as long as the stream still has a descriptor. */
while (uv__stream_fd(stream) != -1) {
assert(stream->accepted_fd == -1);
#if defined(UV_HAVE_KQUEUE)
/* Nothing left to accept according to the watcher's counter. */
if (w->rcount <= 0)
return;
#endif
/* uv__accept() yields a descriptor or a negated errno value. */
err = uv__accept(uv__stream_fd(stream));
if (err < 0) {
/* No more pending connections. */
if (err == -EAGAIN || err == -EWOULDBLOCK)
return;
/* Connection vanished before it could be accepted: try the next one. */
if (err == -ECONNABORTED)
continue;
/* Out of descriptors: try uv__emfile_trick(); when it reports
 * -EAGAIN/-EWOULDBLOCK, stop accepting without notifying the user.
 */
if (err == -EMFILE || err == -ENFILE) {
err = uv__emfile_trick(loop, uv__stream_fd(stream));
if (err == -EAGAIN || err == -EWOULDBLOCK)
break;
}
stream->connection_cb(stream, err);
continue;
}
UV_DEC_BACKLOG(w)
stream->accepted_fd = err;
stream->connection_cb(stream, 0);
/* The callback did not consume the descriptor: stop watching for new
 * connections; uv_accept() restarts the watcher.
 */
if (stream->accepted_fd != -1) {
uv__io_stop(loop, &stream->io_watcher, UV__POLLIN);
return;
}
/* NOTE(review): the 1ns sleep presumably gives other processes sharing
 * the listen socket a chance to accept - confirm intent.
 */
if (stream->type == UV_TCP && (stream->flags & UV_TCP_SINGLE_ACCEPT)) {
struct timespec timeout = { 0, 1 };
nanosleep(&timeout, NULL);
}
}
}
#undef UV_DEC_BACKLOG
/* Hand the connection parked in server->accepted_fd over to `client`.
 *
 * Returns 0 on success, -EAGAIN when no connection is pending, -EINVAL when
 * `client` is of a type that cannot adopt a descriptor (the pending
 * descriptor is then left in place), or the error reported while opening
 * the client, in which case the descriptor is closed.
 *
 * On success the server's watcher is restarted so further connections can
 * be accepted.
 */
int uv_accept(uv_stream_t* server, uv_stream_t* client) {
  int err;

  assert(server->loop == client->loop);

  if (server->accepted_fd == -1)
    return -EAGAIN;

  switch (client->type) {
    case UV_NAMED_PIPE:
    case UV_TCP:
      err = uv__stream_open(client,
                            server->accepted_fd,
                            UV_STREAM_READABLE | UV_STREAM_WRITABLE);
      break;

    case UV_UDP:
      err = uv_udp_open((uv_udp_t*) client, server->accepted_fd);
      break;

    default:
      assert(0);
      /* With NDEBUG the assert vanishes: do not drop the descriptor. */
      return -EINVAL;
  }

  if (err) {
    /* The client could not take ownership; don't leak the descriptor. */
    close(server->accepted_fd);
    server->accepted_fd = -1;
    return err;
  }

  uv__io_start(server->loop, &server->io_watcher, UV__POLLIN);
  server->accepted_fd = -1;
  return 0;
}
/* Start listening on `stream`, dispatching on the stream type.
 *
 * Returns the result of uv_tcp_listen() or uv_pipe_listen(); other types
 * trip an assertion and yield -EINVAL. The handle is marked as started when
 * listening succeeded.
 */
int uv_listen(uv_stream_t* stream, int backlog, uv_connection_cb cb) {
  int rc;

  rc = -EINVAL;

  if (stream->type == UV_TCP) {
    rc = uv_tcp_listen((uv_tcp_t*) stream, backlog, cb);
  } else if (stream->type == UV_NAMED_PIPE) {
    rc = uv_pipe_listen((uv_pipe_t*) stream, backlog, cb);
  } else {
    assert(0);
  }

  if (rc == 0)
    uv__handle_start(stream);

  return rc;
}
/* Called once the write queue is empty: stops watching for writability and,
 * if a shutdown was requested and the stream is neither closing nor already
 * shut down, performs shutdown(SHUT_WR) and runs the shutdown callback.
 */
static void uv__drain(uv_stream_t* stream) {
  uv_shutdown_t* pending;
  int status;

  assert(QUEUE_EMPTY(&stream->write_queue));
  uv__io_stop(stream->loop, &stream->io_watcher, UV__POLLOUT);

  /* Nothing more to do unless a shutdown is waiting for the drain. */
  if (!(stream->flags & UV_STREAM_SHUTTING))
    return;
  if (stream->flags & (UV_CLOSING | UV_STREAM_SHUT))
    return;

  assert(stream->shutdown_req);

  pending = stream->shutdown_req;
  stream->shutdown_req = NULL;
  stream->flags &= ~UV_STREAM_SHUTTING;
  uv__req_unregister(stream->loop, pending);

  status = shutdown(uv__stream_fd(stream), SHUT_WR) ? -errno : 0;
  if (status == 0)
    stream->flags |= UV_STREAM_SHUT;

  if (pending->cb != NULL)
    pending->cb(pending, status);
}
/* Return the number of bytes of `req` that have not been written yet, i.e.
 * the total length of the buffers from write_index onwards.
 */
static size_t uv__write_req_size(uv_write_t* req) {
  size_t remaining;
  int first;

  assert(req->bufs != NULL);

  first = req->write_index;
  remaining = uv__buf_count(req->bufs + first, req->bufcnt - first);

  assert(req->handle->write_queue_size >= remaining);
  return remaining;
}
/* Move `req` from the write queue to the completed queue and make sure the
 * I/O watcher is fed so the completion callbacks get to run.
 *
 * The buffers are released here only when the write succeeded; for a failed
 * write they stay attached so the unwritten size can still be computed when
 * the callback is dispatched.
 */
static void uv__write_req_finish(uv_write_t* req) {
  uv_stream_t* owner;

  owner = req->handle;

  QUEUE_REMOVE(&req->queue);

  if (req->error == 0) {
    if (req->bufs != req->bufsml)
      free(req->bufs);
    req->bufs = NULL;
  }

  QUEUE_INSERT_TAIL(&owner->write_completed_queue, &req->queue);
  uv__io_feed(owner->loop, &owner->io_watcher);
}
/* Return the descriptor of the I/O watcher of a pipe, TCP or UDP handle,
 * or -1 for any other handle type.
 */
static int uv__handle_fd(uv_handle_t* handle) {
  if (handle->type == UV_NAMED_PIPE || handle->type == UV_TCP)
    return ((uv_stream_t*) handle)->io_watcher.fd;

  if (handle->type == UV_UDP)
    return ((uv_udp_t*) handle)->io_watcher.fd;

  return -1;
}
/* Return the maximum number of iovec entries to pass to a single
 * writev()/sendmsg() call.
 *
 * Uses the compile-time IOV_MAX when available, otherwise queries
 * sysconf(_SC_IOV_MAX) once and caches the answer, otherwise assumes 1024.
 * The result is always >= 1: if sysconf() fails or reports no usable limit,
 * the value degrades to 1 so callers never end up with a negative count.
 */
static int uv__getiovmax(void) {
#if defined(IOV_MAX)
  return IOV_MAX;
#elif defined(_SC_IOV_MAX)
  static int iovmax = -1;

  if (iovmax == -1) {
    long limit = sysconf(_SC_IOV_MAX);

    if (limit < 1)
      iovmax = 1;           /* Query failed or limit is indeterminate. */
    else if (limit > INT_MAX)
      iovmax = INT_MAX;
    else
      iovmax = (int) limit;
  }

  return iovmax;
#else
  return 1024;
#endif
}
/* Try to write the request at the head of the stream's write queue.
 *
 * At most one request is processed per call, except for streams flagged
 * UV_STREAM_BLOCKING, where the function loops (via `start`) until the
 * request is done. A finished or failed request is handed to
 * uv__write_req_finish(); otherwise the watcher is armed for UV__POLLOUT so
 * the write is resumed later.
 */
static void uv__write(uv_stream_t* stream) {
struct iovec* iov;
QUEUE* q;
uv_write_t* req;
int iovmax;
int iovcnt;
ssize_t n;
start:
assert(uv__stream_fd(stream) >= 0);
if (QUEUE_EMPTY(&stream->write_queue))
return;
q = QUEUE_HEAD(&stream->write_queue);
req = QUEUE_DATA(q, uv_write_t, queue);
assert(req->handle == stream);
/* The cast below reinterprets uv_buf_t as struct iovec; the assert only
 * checks that the two have the same size.
 */
assert(sizeof(uv_buf_t) == sizeof(struct iovec));
iov = (struct iovec*) &(req->bufs[req->write_index]);
iovcnt = req->bufcnt - req->write_index;
/* Never pass more buffers than the system accepts in one call. */
iovmax = uv__getiovmax();
if (iovcnt > iovmax)
iovcnt = iovmax;
if (req->send_handle) {
/* A handle is attached: send its descriptor along with the data as
 * SCM_RIGHTS ancillary data.
 */
struct msghdr msg;
char scratch[64];
struct cmsghdr *cmsg;
int fd_to_send = uv__handle_fd((uv_handle_t*) req->send_handle);
assert(fd_to_send >= 0);
msg.msg_name = NULL;
msg.msg_namelen = 0;
msg.msg_iov = iov;
msg.msg_iovlen = iovcnt;
msg.msg_flags = 0;
msg.msg_control = (void*) scratch;
msg.msg_controllen = CMSG_LEN(sizeof(fd_to_send));
cmsg = CMSG_FIRSTHDR(&msg);
cmsg->cmsg_level = SOL_SOCKET;
cmsg->cmsg_type = SCM_RIGHTS;
cmsg->cmsg_len = msg.msg_controllen;
{
/* Go through void* to store the descriptor in the control data. */
void* pv = CMSG_DATA(cmsg);
int* pi = pv;
*pi = fd_to_send;
}
do {
n = sendmsg(uv__stream_fd(stream), &msg, 0);
}
while (n == -1 && errno == EINTR);
} else {
/* Plain data: write() for a single buffer, writev() for several. */
do {
if (iovcnt == 1) {
n = write(uv__stream_fd(stream), iov[0].iov_base, iov[0].iov_len);
} else {
n = writev(uv__stream_fd(stream), iov, iovcnt);
}
}
while (n == -1 && errno == EINTR);
}
if (n < 0) {
if (errno != EAGAIN && errno != EWOULDBLOCK) {
/* Hard error: record it, finish the request and stop watching for
 * writability; the handle is stopped if it is not reading either.
 */
req->error = -errno;
uv__write_req_finish(req);
uv__io_stop(stream->loop, &stream->io_watcher, UV__POLLOUT);
if (!uv__io_active(&stream->io_watcher, UV__POLLIN))
uv__handle_stop(stream);
return;
} else if (stream->flags & UV_STREAM_BLOCKING) {
/* Blocking streams retry immediately instead of waiting for POLLOUT. */
goto start;
}
} else {
/* Account for the n bytes written, buffer by buffer. */
while (n >= 0) {
uv_buf_t* buf = &(req->bufs[req->write_index]);
size_t len = buf->len;
assert(req->write_index < req->bufcnt);
if ((size_t)n < len) {
/* Partial write of this buffer: advance inside it. */
buf->base += n;
buf->len -= n;
stream->write_queue_size -= n;
n = 0;
if (stream->flags & UV_STREAM_BLOCKING) {
goto start;
} else {
break;
}
} else {
/* This buffer was written completely: move on to the next one. */
req->write_index++;
assert((size_t)n >= len);
n -= len;
assert(stream->write_queue_size >= len);
stream->write_queue_size -= len;
if (req->write_index == req->bufcnt) {
/* The whole request has been written. */
assert(n == 0);
uv__write_req_finish(req);
return;
}
}
}
}
/* Either nothing could be written (n == -1) or only part of the request
 * was: wait until the descriptor becomes writable again.
 */
assert(n == 0 || n == -1);
assert(!(stream->flags & UV_STREAM_BLOCKING));
uv__io_start(stream->loop, &stream->io_watcher, UV__POLLOUT);
}
/* Run the callbacks of all completed write requests, then call uv__drain()
 * if no writes remain queued.
 */
static void uv__write_callbacks(uv_stream_t* stream) {
  uv_write_t* done;
  QUEUE* node;

  for (;;) {
    if (QUEUE_EMPTY(&stream->write_completed_queue))
      break;

    /* Pop the oldest completed request. */
    node = QUEUE_HEAD(&stream->write_completed_queue);
    done = QUEUE_DATA(node, uv_write_t, queue);
    QUEUE_REMOVE(node);
    uv__req_unregister(stream->loop, done);

    /* Buffers still attached: subtract the unwritten remainder from the
     * queue size and release them.
     */
    if (done->bufs != NULL) {
      stream->write_queue_size -= uv__write_req_size(done);
      if (done->bufs != done->bufsml)
        free(done->bufs);
      done->bufs = NULL;
    }

    if (done->cb)
      done->cb(done, done->error);
  }

  assert(QUEUE_EMPTY(&stream->write_completed_queue));

  if (QUEUE_EMPTY(&stream->write_queue))
    uv__drain(stream);
}
/* Guess the libuv handle type of socket `fd` from its address family and
 * socket type.
 *
 * Stream sockets map to UV_NAMED_PIPE (AF_UNIX) or UV_TCP (AF_INET/AF_INET6),
 * datagram sockets over AF_INET/AF_INET6 map to UV_UDP. Everything else,
 * including descriptors that cannot be queried, yields UV_UNKNOWN_HANDLE.
 */
static uv_handle_type uv__handle_type(int fd) {
  struct sockaddr_storage addr;
  socklen_t optlen;
  int socktype;
  int is_inet;

  memset(&addr, 0, sizeof(addr));
  optlen = sizeof(addr);
  if (getsockname(fd, (struct sockaddr*) &addr, &optlen))
    return UV_UNKNOWN_HANDLE;

  optlen = sizeof socktype;
  if (getsockopt(fd, SOL_SOCKET, SO_TYPE, &socktype, &optlen))
    return UV_UNKNOWN_HANDLE;

  is_inet = (addr.ss_family == AF_INET || addr.ss_family == AF_INET6);

  if (socktype == SOCK_STREAM) {
    if (addr.ss_family == AF_UNIX)
      return UV_NAMED_PIPE;
    if (is_inet)
      return UV_TCP;
    return UV_UNKNOWN_HANDLE;
  }

  if (socktype == SOCK_DGRAM && is_inet)
    return UV_UDP;

  return UV_UNKNOWN_HANDLE;
}
/* Deliver a read result to whichever read callback is installed: read_cb
 * takes precedence, otherwise read2_cb is invoked with the handle `type`.
 */
static void uv__stream_read_cb(uv_stream_t* stream,
                               int status,
                               uv_buf_t buf,
                               uv_handle_type type) {
  if (stream->read_cb == NULL) {
    stream->read2_cb((uv_pipe_t*) stream, status, buf, type);
    return;
  }

  stream->read_cb(stream, status, buf);
}
/* Handle end-of-file on `stream`: flag it, stop watching for readability,
 * stop the handle unless it is still waiting to write, and report UV_EOF to
 * the read callback together with `buf`.
 */
static void uv__stream_eof(uv_stream_t* stream, uv_buf_t buf) {
  int still_writing;

  stream->flags |= UV_STREAM_READ_EOF;
  uv__io_stop(stream->loop, &stream->io_watcher, UV__POLLIN);

  still_writing = uv__io_active(&stream->io_watcher, UV__POLLOUT);
  if (!still_writing)
    uv__handle_stop(stream);

  uv__stream_read_cb(stream, UV_EOF, buf, UV_UNKNOWN_HANDLE);
}
/* Read from the stream and feed the data to the user's read callback.
 *
 * Up to 32 reads are performed per call, for as long as a read callback is
 * installed and the stream is flagged UV_STREAM_READING. The loop ends early
 * on EOF, on error, when no more data is available, or after a read that
 * did not fill the buffer (flagged as UV_STREAM_READ_PARTIAL).
 */
static void uv__read(uv_stream_t* stream) {
uv_buf_t buf;
ssize_t nread;
struct msghdr msg;
struct cmsghdr* cmsg;
char cmsg_space[64];
int count;
stream->flags &= ~UV_STREAM_READ_PARTIAL;
/* Upper bound on the number of reads done in one call. */
count = 32;
while ((stream->read_cb || stream->read2_cb)
&& (stream->flags & UV_STREAM_READING)
&& (count-- > 0)) {
assert(stream->alloc_cb != NULL);
/* Ask the user for a buffer, suggesting 64 KB. */
buf = stream->alloc_cb((uv_handle_t*)stream, 64 * 1024);
if (buf.len == 0) {
/* The user did not provide any buffer space. */
uv__stream_read_cb(stream, UV_ENOBUFS, buf, UV_UNKNOWN_HANDLE);
return;
}
assert(buf.base != NULL);
assert(uv__stream_fd(stream) >= 0);
if (stream->read_cb) {
/* Plain read, retried when interrupted by a signal. */
do {
nread = read(uv__stream_fd(stream), buf.base, buf.len);
}
while (nread < 0 && errno == EINTR);
} else {
/* read2: use recvmsg() so ancillary data (passed descriptors) can be
 * received along with the payload.
 */
assert(stream->read2_cb);
msg.msg_flags = 0;
msg.msg_iov = (struct iovec*) &buf;
msg.msg_iovlen = 1;
msg.msg_name = NULL;
msg.msg_namelen = 0;
msg.msg_controllen = 64;
msg.msg_control = (void*) cmsg_space;
do {
nread = recvmsg(uv__stream_fd(stream), &msg, 0);
}
while (nread < 0 && errno == EINTR);
}
if (nread < 0) {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
/* Nothing to read right now: keep watching if still reading and hand
 * the unused buffer back with status 0.
 */
if (stream->flags & UV_STREAM_READING) {
uv__io_start(stream->loop, &stream->io_watcher, UV__POLLIN);
}
uv__stream_read_cb(stream, 0, buf, UV_UNKNOWN_HANDLE);
} else {
/* Real error: report it; the callback is expected to have stopped the
 * watcher by the time it returns.
 */
uv__stream_read_cb(stream, -errno, buf, UV_UNKNOWN_HANDLE);
assert(!uv__io_active(&stream->io_watcher, UV__POLLIN) &&
"stream->read_cb(status=-1) did not call uv_close()");
}
return;
} else if (nread == 0) {
uv__stream_eof(stream, buf);
return;
} else {
/* Remember the buffer size before the callback gets hold of `buf`. */
ssize_t buflen = buf.len;
if (stream->read_cb) {
stream->read_cb(stream, nread, buf);
} else {
assert(stream->read2_cb);
/* Walk the control messages looking for a passed descriptor. */
for (cmsg = CMSG_FIRSTHDR(&msg);
msg.msg_controllen > 0 && cmsg != NULL;
cmsg = CMSG_NXTHDR(&msg, cmsg)) {
if (cmsg->cmsg_type == SCM_RIGHTS) {
/* NOTE(review): despite the message, a previously stored accepted_fd
 * is overwritten below rather than the new one being ignored.
 */
if (stream->accepted_fd != -1) {
fprintf(stderr, "(libuv) ignoring extra FD received\n");
}
{
void* pv = CMSG_DATA(cmsg);
int* pi = pv;
stream->accepted_fd = *pi;
}
} else {
fprintf(stderr, "ignoring non-SCM_RIGHTS ancillary data: %d\n",
cmsg->cmsg_type);
}
}
/* Tell the callback what kind of handle was received, if any. */
if (stream->accepted_fd >= 0) {
stream->read2_cb((uv_pipe_t*)stream, nread, buf,
uv__handle_type(stream->accepted_fd));
} else {
stream->read2_cb((uv_pipe_t*)stream, nread, buf, UV_UNKNOWN_HANDLE);
}
}
/* A short read ends the loop and is flagged as partial. */
if (nread < buflen) {
stream->flags |= UV_STREAM_READ_PARTIAL;
return;
}
}
}
}
/* Request a shutdown of the write side of `stream`.
 *
 * Returns -ENOTCONN when the stream is not writable, already shut down,
 * closed or closing. Otherwise the request is registered, remembered on the
 * stream and the watcher is armed for UV__POLLOUT; returns 0.
 */
int uv_shutdown(uv_shutdown_t* req, uv_stream_t* stream, uv_shutdown_cb cb) {
  assert((stream->type == UV_TCP || stream->type == UV_NAMED_PIPE) &&
         "uv_shutdown (unix) only supports uv_handle_t right now");
  assert(uv__stream_fd(stream) >= 0);

  if (!(stream->flags & UV_STREAM_WRITABLE))
    return -ENOTCONN;

  if (stream->flags & (UV_STREAM_SHUT | UV_CLOSED | UV_CLOSING))
    return -ENOTCONN;

  /* Set up the request. */
  uv__req_init(stream->loop, req, UV_SHUTDOWN);
  req->handle = stream;
  req->cb = cb;

  /* Park it on the stream until the write queue has drained. */
  stream->shutdown_req = req;
  stream->flags |= UV_STREAM_SHUTTING;

  uv__io_start(stream->loop, &stream->io_watcher, UV__POLLOUT);

  return 0;
}
/* I/O watcher callback for connected (or connecting) streams.
 *
 * While a connect request is pending, every event is routed to
 * uv__stream_connect(). Otherwise readable/error events trigger uv__read(),
 * a hang-up after a partial read is turned into EOF, and writable/error/
 * hang-up events trigger uv__write() followed by the write callbacks.
 */
static void uv__stream_io(uv_loop_t* loop, uv__io_t* w, unsigned int events) {
uv_stream_t* stream;
stream = container_of(w, uv_stream_t, io_watcher);
assert(stream->type == UV_TCP ||
stream->type == UV_NAMED_PIPE ||
stream->type == UV_TTY);
assert(!(stream->flags & UV_CLOSING));
if (stream->connect_req) {
uv__stream_connect(stream);
return;
}
assert(uv__stream_fd(stream) >= 0);
if (events & (UV__POLLIN | UV__POLLERR))
uv__read(stream);
/* Bail out if the stream no longer has a descriptor after reading. */
if (uv__stream_fd(stream) == -1)
return;
/* Hang-up while still reading and the last read was short: report EOF
 * (with an empty buffer) unless it has been reported already.
 */
if ((events & UV__POLLHUP) &&
(stream->flags & UV_STREAM_READING) &&
(stream->flags & UV_STREAM_READ_PARTIAL) &&
!(stream->flags & UV_STREAM_READ_EOF)) {
uv_buf_t buf = { NULL, 0 };
uv__stream_eof(stream, buf);
}
/* Same check again after the EOF callback. */
if (uv__stream_fd(stream) == -1)
return;
if (events & (UV__POLLOUT | UV__POLLERR | UV__POLLHUP)) {
uv__write(stream);
uv__write_callbacks(stream);
}
}
/* Complete a pending connect request on `stream`.
 *
 * The outcome is taken from stream->delayed_error when set, otherwise from
 * the socket's SO_ERROR option. If the connection is still in progress the
 * function returns and waits for the next event. Otherwise the request is
 * unregistered, the watcher stops polling for writability and the connect
 * callback is invoked with 0 or a negated errno value.
 */
static void uv__stream_connect(uv_stream_t* stream) {
  int error;
  uv_connect_t* req = stream->connect_req;
  socklen_t errorsize = sizeof(int);

  assert(stream->type == UV_TCP || stream->type == UV_NAMED_PIPE);
  assert(req);

  if (stream->delayed_error) {
    /* An error was recorded earlier and is only reported now. */
    error = stream->delayed_error;
    stream->delayed_error = 0;
  } else {
    assert(uv__stream_fd(stream) >= 0);

    /* If the query itself fails, report that failure instead of reading an
     * uninitialized value.
     */
    error = 0;
    if (getsockopt(uv__stream_fd(stream),
                   SOL_SOCKET,
                   SO_ERROR,
                   &error,
                   &errorsize) != 0) {
      error = -errno;
    } else {
      error = -error;
    }
  }

  if (error == -EINPROGRESS)
    return;

  stream->connect_req = NULL;
  uv__req_unregister(stream->loop, req);
  uv__io_stop(stream->loop, &stream->io_watcher, UV__POLLOUT);

  if (req->cb)
    req->cb(req, error);
}
/* Queue a write of `bufcnt` buffers on `stream`, optionally passing
 * `send_handle` to the peer.
 *
 * The buffer descriptors are copied into the request (inline storage when
 * they fit, heap storage otherwise). The write is attempted right away when
 * nothing else is queued and no connect is pending; otherwise it is picked
 * up later.
 *
 * Returns 0 on success, -EBADF when the stream or the handle to send has no
 * descriptor, -EINVAL when a handle is sent over something other than an
 * IPC pipe, and -ENOMEM when the buffer list cannot be allocated. On error
 * the request is not registered.
 */
int uv_write2(uv_write_t* req,
              uv_stream_t* stream,
              uv_buf_t bufs[],
              int bufcnt,
              uv_stream_t* send_handle,
              uv_write_cb cb) {
  uv_buf_t* req_bufs;
  int empty_queue;

  assert(bufcnt > 0);
  assert((stream->type == UV_TCP ||
          stream->type == UV_NAMED_PIPE ||
          stream->type == UV_TTY) &&
         "uv_write (unix) does not yet support other types of streams");

  if (uv__stream_fd(stream) < 0)
    return -EBADF;

  if (send_handle) {
    if (stream->type != UV_NAMED_PIPE || !((uv_pipe_t*)stream)->ipc)
      return -EINVAL;

    if (uv__handle_fd((uv_handle_t*) send_handle) < 0)
      return -EBADF;
  }

  /* Allocate before registering the request so that an out-of-memory
   * condition can be reported without anything to undo.
   */
  if (bufcnt <= (int) ARRAY_SIZE(req->bufsml)) {
    req_bufs = req->bufsml;
  } else {
    req_bufs = malloc(sizeof(uv_buf_t) * bufcnt);
    if (req_bufs == NULL)
      return -ENOMEM;
  }

  empty_queue = (stream->write_queue_size == 0);

  uv__req_init(stream->loop, req, UV_WRITE);
  req->cb = cb;
  req->handle = stream;
  req->error = 0;
  req->send_handle = send_handle;
  QUEUE_INIT(&req->queue);

  req->bufs = req_bufs;
  memcpy(req->bufs, bufs, bufcnt * sizeof(uv_buf_t));
  req->bufcnt = bufcnt;
  req->write_index = 0;
  stream->write_queue_size += uv__buf_count(bufs, bufcnt);

  QUEUE_INSERT_TAIL(&stream->write_queue, &req->queue);

  if (stream->connect_req) {
    /* Still connecting: nothing is written or scheduled here. */
  }
  else if (empty_queue) {
    uv__write(stream);
  }
  else {
    /* Other writes are pending: make sure we get notified when the
     * descriptor is writable.
     */
    assert(!(stream->flags & UV_STREAM_BLOCKING));
    uv__io_start(stream->loop, &stream->io_watcher, UV__POLLOUT);
  }

  return 0;
}
/* Queue a write on `stream` without passing a handle; see uv_write2(). */
int uv_write(uv_write_t* req,
             uv_stream_t* stream,
             uv_buf_t bufs[],
             int bufcnt,
             uv_write_cb cb) {
  uv_stream_t* const no_send_handle = NULL;

  return uv_write2(req, stream, bufs, bufcnt, no_send_handle, cb);
}
/* Shared implementation of uv_read_start() and uv_read2_start().
 *
 * Flags the stream as reading, installs the callbacks, starts watching for
 * UV__POLLIN and marks the handle as started. Returns -EINVAL if the stream
 * is closing, 0 otherwise.
 */
static int uv__read_start_common(uv_stream_t* stream,
                                 uv_alloc_cb alloc_cb,
                                 uv_read_cb read_cb,
                                 uv_read2_cb read2_cb) {
  assert(stream->type == UV_TCP ||
         stream->type == UV_NAMED_PIPE ||
         stream->type == UV_TTY);

  if (stream->flags & UV_CLOSING) {
    return -EINVAL;
  }

  stream->flags |= UV_STREAM_READING;

#if defined(__APPLE__)
  /* Wake the select() helper thread, if one is in use. */
  if (stream->select != NULL) {
    uv__stream_osx_interrupt_select(stream);
  }
#endif

  assert(uv__stream_fd(stream) >= 0);
  assert(alloc_cb);

  stream->alloc_cb = alloc_cb;
  stream->read_cb = read_cb;
  stream->read2_cb = read2_cb;

  uv__io_start(stream->loop, &stream->io_watcher, UV__POLLIN);
  uv__handle_start(stream);

  return 0;
}
/* Start reading from `stream`, delivering data through `read_cb`. */
int uv_read_start(uv_stream_t* stream,
                  uv_alloc_cb alloc_cb,
                  uv_read_cb read_cb) {
  const uv_read2_cb no_read2_cb = NULL;

  return uv__read_start_common(stream, alloc_cb, read_cb, no_read2_cb);
}
/* Start reading from `stream` with the read2 callback, which additionally
 * receives the type of a handle that arrived with the data.
 */
int uv_read2_start(uv_stream_t* stream,
                   uv_alloc_cb alloc_cb,
                   uv_read2_cb read_cb) {
  const uv_read_cb no_read_cb = NULL;

  return uv__read_start_common(stream, alloc_cb, no_read_cb, read_cb);
}
/* Stop reading from `stream`: clears the reading flag, stops watching for
 * UV__POLLIN, stops the handle unless it is still waiting to write, and
 * removes the read and alloc callbacks. Always returns 0.
 */
int uv_read_stop(uv_stream_t* stream) {
  /* Sanity check: an active UV__POLLOUT watcher implies pending write,
   * shutdown or connect work.
   */
  assert(!uv__io_active(&stream->io_watcher, UV__POLLOUT) ||
         !QUEUE_EMPTY(&stream->write_completed_queue) ||
         !QUEUE_EMPTY(&stream->write_queue) ||
         stream->shutdown_req != NULL ||
         stream->connect_req != NULL);

  stream->flags &= ~UV_STREAM_READING;
  uv__io_stop(stream->loop, &stream->io_watcher, UV__POLLIN);

  if (!uv__io_active(&stream->io_watcher, UV__POLLOUT)) {
    uv__handle_stop(stream);
  }

#if defined(__APPLE__)
  /* Wake the select() helper thread, if one is in use. */
  if (stream->select != NULL) {
    uv__stream_osx_interrupt_select(stream);
  }
#endif

  stream->alloc_cb = NULL;
  stream->read2_cb = NULL;
  stream->read_cb = NULL;

  return 0;
}
/* Non-zero if `stream` has the UV_STREAM_READABLE flag set. The result is
 * the masked flag value, not normalized to 1.
 */
int uv_is_readable(const uv_stream_t* stream) {
  const int mask = UV_STREAM_READABLE;

  return stream->flags & mask;
}
/* Non-zero if `stream` has the UV_STREAM_WRITABLE flag set. The result is
 * the masked flag value, not normalized to 1.
 */
int uv_is_writable(const uv_stream_t* stream) {
  const int mask = UV_STREAM_WRITABLE;

  return stream->flags & mask;
}
#if defined(__APPLE__)
/* Return the real descriptor of a stream: the one stored in the select()
 * fallback state when that is in use, otherwise the I/O watcher's.
 */
int uv___stream_fd(uv_stream_t* handle) {
  assert(handle->type == UV_TCP ||
         handle->type == UV_TTY ||
         handle->type == UV_NAMED_PIPE);

  if (handle->select == NULL)
    return handle->io_watcher.fd;

  return ((uv__stream_select_t*) handle->select)->fd;
}
#endif
/* Close a stream: shut down the select() helper thread (OS X fallback
 * only), stop all I/O watching, and close the stream's descriptor as well
 * as any accepted descriptor that was never picked up.
 */
void uv__stream_close(uv_stream_t* handle) {
#if defined(__APPLE__)
if (handle->select != NULL) {
uv__stream_select_t* s;
s = handle->select;
/* Ask the helper thread to exit, release it if it is waiting for the
 * loop, and interrupt a select() call that may be in progress.
 */
uv_sem_post(&s->close_sem);
uv_sem_post(&s->async_sem);
uv__stream_osx_interrupt_select(handle);
/* Only tear down the shared state once the thread is gone. */
uv_thread_join(&s->thread);
uv_sem_destroy(&s->close_sem);
uv_sem_destroy(&s->async_sem);
close(s->fake_fd);
close(s->int_fd);
/* `s` itself is freed by uv__stream_osx_cb_close(). */
uv_close((uv_handle_t*) &s->async, uv__stream_osx_cb_close);
handle->select = NULL;
}
#endif
uv__io_close(handle->loop, &handle->io_watcher);
uv_read_stop(handle);
uv__handle_stop(handle);
/* NOTE(review): when the select() fallback was active, io_watcher.fd
 * presumably holds fake_fd (uv__stream_try_select() hands it back through
 * *fd), which has already been closed above, while s->fd is not closed
 * here - confirm against the callers of uv__stream_try_select().
 */
close(handle->io_watcher.fd);
handle->io_watcher.fd = -1;
/* Don't leak a connection that was accepted but never handed over. */
if (handle->accepted_fd >= 0) {
close(handle->accepted_fd);
handle->accepted_fd = -1;
}
assert(!uv__io_active(&handle->io_watcher, UV__POLLIN | UV__POLLOUT));
}
/* Not implemented: trips an assertion and aborts the process. The return
 * statement is never reached.
 */
int uv_stream_set_blocking(uv_stream_t* handle, int blocking) {
  (void) handle;
  (void) blocking;

  assert(0 && "implement me");
  abort();

  return 0;
}