Alien-uv
view release on metacpan or search on metacpan
libuv/src/unix/stream.c view on Meta::CPAN
/* Copyright Joyent, Inc. and other Node contributors. All rights reserved.
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to
* deal in the Software without restriction, including without limitation the
* rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
* sell copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
* IN THE SOFTWARE.
*/
#include "uv.h"
#include "internal.h"
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <assert.h>
#include <errno.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/uio.h>
#include <sys/un.h>
#include <unistd.h>
#include <limits.h> /* IOV_MAX */
#if defined(__APPLE__)
# include <sys/event.h>
# include <sys/time.h>
# include <sys/select.h>
/* Forward declaration */
typedef struct uv__stream_select_s uv__stream_select_t;
struct uv__stream_select_s {
uv_stream_t* stream;
uv_thread_t thread;
uv_sem_t close_sem;
uv_sem_t async_sem;
uv_async_t async;
int events;
int fake_fd;
int int_fd;
int fd;
fd_set* sread;
size_t sread_sz;
fd_set* swrite;
size_t swrite_sz;
};
/* Due to a possible kernel bug at least in OS X 10.10 "Yosemite",
* EPROTOTYPE can be returned while trying to write to a socket that is
* shutting down. If we retry the write, we should get the expected EPIPE
* instead.
*/
# define RETRY_ON_WRITE_ERROR(errno) (errno == EINTR || errno == EPROTOTYPE)
# define IS_TRANSIENT_WRITE_ERROR(errno, send_handle) \
(errno == EAGAIN || errno == EWOULDBLOCK || errno == ENOBUFS || \
(errno == EMSGSIZE && send_handle != NULL))
#else
# define RETRY_ON_WRITE_ERROR(errno) (errno == EINTR)
# define IS_TRANSIENT_WRITE_ERROR(errno, send_handle) \
(errno == EAGAIN || errno == EWOULDBLOCK || errno == ENOBUFS)
#endif /* defined(__APPLE__) */
static void uv__stream_connect(uv_stream_t*);
static void uv__write(uv_stream_t* stream);
static void uv__read(uv_stream_t* stream);
static void uv__stream_io(uv_loop_t* loop, uv__io_t* w, unsigned int events);
static void uv__write_callbacks(uv_stream_t* stream);
static size_t uv__write_req_size(uv_write_t* req);
void uv__stream_init(uv_loop_t* loop,
uv_stream_t* stream,
uv_handle_type type) {
int err;
uv__handle_init(loop, (uv_handle_t*)stream, type);
stream->read_cb = NULL;
stream->alloc_cb = NULL;
stream->close_cb = NULL;
stream->connection_cb = NULL;
stream->connect_req = NULL;
stream->shutdown_req = NULL;
stream->accepted_fd = -1;
stream->queued_fds = NULL;
stream->delayed_error = 0;
QUEUE_INIT(&stream->write_queue);
QUEUE_INIT(&stream->write_completed_queue);
stream->write_queue_size = 0;
if (loop->emfile_fd == -1) {
err = uv__open_cloexec("/dev/null", O_RDONLY);
if (err < 0)
/* In the rare case that "/dev/null" isn't mounted open "/"
* instead.
*/
err = uv__open_cloexec("/", O_RDONLY);
if (err >= 0)
loop->emfile_fd = err;
}
#if defined(__APPLE__)
stream->select = NULL;
#endif /* defined(__APPLE_) */
uv__io_init(&stream->io_watcher, uv__stream_io, -1);
}
static void uv__stream_osx_interrupt_select(uv_stream_t* stream) {
#if defined(__APPLE__)
/* Notify select() thread about state change */
uv__stream_select_t* s;
int r;
s = stream->select;
if (s == NULL)
return;
/* Interrupt select() loop
* NOTE: fake_fd and int_fd are socketpair(), thus writing to one will
* emit read event on other side
*/
do
r = write(s->fake_fd, "x", 1);
while (r == -1 && errno == EINTR);
assert(r == 1);
#else /* !defined(__APPLE__) */
/* No-op on any other platform */
#endif /* !defined(__APPLE__) */
}
#if defined(__APPLE__)
static void uv__stream_osx_select(void* arg) {
uv_stream_t* stream;
uv__stream_select_t* s;
char buf[1024];
int events;
int fd;
int r;
int max_fd;
stream = arg;
s = stream->select;
fd = s->fd;
if (fd > s->int_fd)
max_fd = fd;
else
max_fd = s->int_fd;
while (1) {
/* Terminate on semaphore */
if (uv_sem_trywait(&s->close_sem) == 0)
break;
/* Watch fd using select(2) */
memset(s->sread, 0, s->sread_sz);
memset(s->swrite, 0, s->swrite_sz);
if (uv__io_active(&stream->io_watcher, POLLIN))
FD_SET(fd, s->sread);
if (uv__io_active(&stream->io_watcher, POLLOUT))
FD_SET(fd, s->swrite);
FD_SET(s->int_fd, s->sread);
/* Wait indefinitely for fd events */
r = select(max_fd + 1, s->sread, s->swrite, NULL, NULL);
if (r == -1) {
if (errno == EINTR)
continue;
/* XXX: Possible?! */
abort();
}
/* Ignore timeouts */
if (r == 0)
continue;
/* Empty socketpair's buffer in case of interruption */
if (FD_ISSET(s->int_fd, s->sread))
while (1) {
r = read(s->int_fd, buf, sizeof(buf));
if (r == sizeof(buf))
continue;
if (r != -1)
break;
if (errno == EAGAIN || errno == EWOULDBLOCK)
break;
if (errno == EINTR)
continue;
abort();
}
/* Handle events */
events = 0;
if (FD_ISSET(fd, s->sread))
events |= POLLIN;
if (FD_ISSET(fd, s->swrite))
events |= POLLOUT;
assert(events != 0 || FD_ISSET(s->int_fd, s->sread));
if (events != 0) {
ACCESS_ONCE(int, s->events) = events;
uv_async_send(&s->async);
uv_sem_wait(&s->async_sem);
/* Should be processed at this stage */
assert((s->events == 0) || (stream->flags & UV_HANDLE_CLOSING));
}
}
}
static void uv__stream_osx_select_cb(uv_async_t* handle) {
uv__stream_select_t* s;
uv_stream_t* stream;
int events;
s = container_of(handle, uv__stream_select_t, async);
stream = s->stream;
/* Get and reset stream's events */
events = s->events;
ACCESS_ONCE(int, s->events) = 0;
assert(events != 0);
assert(events == (events & (POLLIN | POLLOUT)));
/* Invoke callback on event-loop */
if ((events & POLLIN) && uv__io_active(&stream->io_watcher, POLLIN))
uv__stream_io(stream->loop, &stream->io_watcher, POLLIN);
if ((events & POLLOUT) && uv__io_active(&stream->io_watcher, POLLOUT))
uv__stream_io(stream->loop, &stream->io_watcher, POLLOUT);
if (stream->flags & UV_HANDLE_CLOSING)
return;
/* NOTE: It is important to do it here, otherwise `select()` might be called
* before the actual `uv__read()`, leading to the blocking syscall
*/
uv_sem_post(&s->async_sem);
}
static void uv__stream_osx_cb_close(uv_handle_t* async) {
uv__stream_select_t* s;
s = container_of(async, uv__stream_select_t, async);
uv__free(s);
}
int uv__stream_try_select(uv_stream_t* stream, int* fd) {
/*
* kqueue doesn't work with some files from /dev mount on osx.
* select(2) in separate thread for those fds
*/
struct kevent filter[1];
struct kevent events[1];
struct timespec timeout;
uv__stream_select_t* s;
int fds[2];
int err;
int ret;
int kq;
int old_fd;
int max_fd;
size_t sread_sz;
size_t swrite_sz;
kq = kqueue();
if (kq == -1) {
perror("(libuv) kqueue()");
return UV__ERR(errno);
}
EV_SET(&filter[0], *fd, EVFILT_READ, EV_ADD | EV_ENABLE, 0, 0, 0);
/* Use small timeout, because we only want to capture EINVALs */
timeout.tv_sec = 0;
timeout.tv_nsec = 1;
do
ret = kevent(kq, filter, 1, events, 1, &timeout);
while (ret == -1 && errno == EINTR);
uv__close(kq);
if (ret == -1)
return UV__ERR(errno);
if (ret == 0 || (events[0].flags & EV_ERROR) == 0 || events[0].data != EINVAL)
return 0;
/* At this point we definitely know that this fd won't work with kqueue */
/*
* Create fds for io watcher and to interrupt the select() loop.
* NOTE: do it ahead of malloc below to allocate enough space for fd_sets
*/
if (socketpair(AF_UNIX, SOCK_STREAM, 0, fds))
return UV__ERR(errno);
max_fd = *fd;
if (fds[1] > max_fd)
max_fd = fds[1];
sread_sz = ROUND_UP(max_fd + 1, sizeof(uint32_t) * NBBY) / NBBY;
swrite_sz = sread_sz;
s = uv__malloc(sizeof(*s) + sread_sz + swrite_sz);
if (s == NULL) {
err = UV_ENOMEM;
goto failed_malloc;
}
s->events = 0;
s->fd = *fd;
s->sread = (fd_set*) ((char*) s + sizeof(*s));
s->sread_sz = sread_sz;
s->swrite = (fd_set*) ((char*) s->sread + sread_sz);
s->swrite_sz = swrite_sz;
err = uv_async_init(stream->loop, &s->async, uv__stream_osx_select_cb);
if (err)
goto failed_async_init;
s->async.flags |= UV_HANDLE_INTERNAL;
uv__handle_unref(&s->async);
err = uv_sem_init(&s->close_sem, 0);
if (err != 0)
goto failed_close_sem_init;
err = uv_sem_init(&s->async_sem, 0);
if (err != 0)
goto failed_async_sem_init;
s->fake_fd = fds[0];
s->int_fd = fds[1];
old_fd = *fd;
s->stream = stream;
stream->select = s;
*fd = s->fake_fd;
err = uv_thread_create(&s->thread, uv__stream_osx_select, stream);
if (err != 0)
goto failed_thread_create;
return 0;
failed_thread_create:
s->stream = NULL;
stream->select = NULL;
*fd = old_fd;
uv_sem_destroy(&s->async_sem);
failed_async_sem_init:
uv_sem_destroy(&s->close_sem);
failed_close_sem_init:
uv__close(fds[0]);
uv__close(fds[1]);
uv_close((uv_handle_t*) &s->async, uv__stream_osx_cb_close);
return err;
failed_async_init:
uv__free(s);
failed_malloc:
uv__close(fds[0]);
uv__close(fds[1]);
return err;
}
#endif /* defined(__APPLE__) */
int uv__stream_open(uv_stream_t* stream, int fd, int flags) {
#if defined(__APPLE__)
int enable;
#endif
if (!(stream->io_watcher.fd == -1 || stream->io_watcher.fd == fd))
return UV_EBUSY;
assert(fd >= 0);
stream->flags |= flags;
if (stream->type == UV_TCP) {
if ((stream->flags & UV_HANDLE_TCP_NODELAY) && uv__tcp_nodelay(fd, 1))
return UV__ERR(errno);
/* TODO Use delay the user passed in. */
if ((stream->flags & UV_HANDLE_TCP_KEEPALIVE) &&
uv__tcp_keepalive(fd, 1, 60)) {
return UV__ERR(errno);
}
}
#if defined(__APPLE__)
enable = 1;
if (setsockopt(fd, SOL_SOCKET, SO_OOBINLINE, &enable, sizeof(enable)) &&
errno != ENOTSOCK &&
errno != EINVAL) {
return UV__ERR(errno);
}
#endif
stream->io_watcher.fd = fd;
return 0;
}
libuv/src/unix/stream.c view on Meta::CPAN
done:
/* Process queued fds */
if (server->queued_fds != NULL) {
uv__stream_queued_fds_t* queued_fds;
queued_fds = server->queued_fds;
/* Read first */
server->accepted_fd = queued_fds->fds[0];
/* All read, free */
assert(queued_fds->offset > 0);
if (--queued_fds->offset == 0) {
uv__free(queued_fds);
server->queued_fds = NULL;
} else {
/* Shift rest */
memmove(queued_fds->fds,
queued_fds->fds + 1,
queued_fds->offset * sizeof(*queued_fds->fds));
}
} else {
server->accepted_fd = -1;
if (err == 0)
uv__io_start(server->loop, &server->io_watcher, POLLIN);
}
return err;
}
int uv_listen(uv_stream_t* stream, int backlog, uv_connection_cb cb) {
int err;
switch (stream->type) {
case UV_TCP:
err = uv_tcp_listen((uv_tcp_t*)stream, backlog, cb);
break;
case UV_NAMED_PIPE:
err = uv_pipe_listen((uv_pipe_t*)stream, backlog, cb);
break;
default:
err = UV_EINVAL;
}
if (err == 0)
uv__handle_start(stream);
return err;
}
static void uv__drain(uv_stream_t* stream) {
uv_shutdown_t* req;
int err;
assert(QUEUE_EMPTY(&stream->write_queue));
uv__io_stop(stream->loop, &stream->io_watcher, POLLOUT);
uv__stream_osx_interrupt_select(stream);
/* Shutdown? */
if ((stream->flags & UV_HANDLE_SHUTTING) &&
!(stream->flags & UV_HANDLE_CLOSING) &&
!(stream->flags & UV_HANDLE_SHUT)) {
assert(stream->shutdown_req);
req = stream->shutdown_req;
stream->shutdown_req = NULL;
stream->flags &= ~UV_HANDLE_SHUTTING;
uv__req_unregister(stream->loop, req);
err = 0;
if (shutdown(uv__stream_fd(stream), SHUT_WR))
err = UV__ERR(errno);
if (err == 0)
stream->flags |= UV_HANDLE_SHUT;
if (req->cb != NULL)
req->cb(req, err);
}
}
static ssize_t uv__writev(int fd, struct iovec* vec, size_t n) {
if (n == 1)
return write(fd, vec->iov_base, vec->iov_len);
else
return writev(fd, vec, n);
}
static size_t uv__write_req_size(uv_write_t* req) {
size_t size;
assert(req->bufs != NULL);
size = uv__count_bufs(req->bufs + req->write_index,
req->nbufs - req->write_index);
assert(req->handle->write_queue_size >= size);
return size;
}
/* Returns 1 if all write request data has been written, or 0 if there is still
* more data to write.
*
* Note: the return value only says something about the *current* request.
* There may still be other write requests sitting in the queue.
*/
static int uv__write_req_update(uv_stream_t* stream,
uv_write_t* req,
size_t n) {
uv_buf_t* buf;
size_t len;
assert(n <= stream->write_queue_size);
stream->write_queue_size -= n;
libuv/src/unix/stream.c view on Meta::CPAN
goto error;
}
fd_to_send = uv__handle_fd((uv_handle_t*) req->send_handle);
memset(&scratch, 0, sizeof(scratch));
assert(fd_to_send >= 0);
msg.msg_name = NULL;
msg.msg_namelen = 0;
msg.msg_iov = iov;
msg.msg_iovlen = iovcnt;
msg.msg_flags = 0;
msg.msg_control = &scratch.alias;
msg.msg_controllen = CMSG_SPACE(sizeof(fd_to_send));
cmsg = CMSG_FIRSTHDR(&msg);
cmsg->cmsg_level = SOL_SOCKET;
cmsg->cmsg_type = SCM_RIGHTS;
cmsg->cmsg_len = CMSG_LEN(sizeof(fd_to_send));
/* silence aliasing warning */
{
void* pv = CMSG_DATA(cmsg);
int* pi = pv;
*pi = fd_to_send;
}
do
n = sendmsg(uv__stream_fd(stream), &msg, 0);
while (n == -1 && RETRY_ON_WRITE_ERROR(errno));
/* Ensure the handle isn't sent again in case this is a partial write. */
if (n >= 0)
req->send_handle = NULL;
} else {
do
n = uv__writev(uv__stream_fd(stream), iov, iovcnt);
while (n == -1 && RETRY_ON_WRITE_ERROR(errno));
}
if (n == -1 && !IS_TRANSIENT_WRITE_ERROR(errno, req->send_handle)) {
err = UV__ERR(errno);
goto error;
}
if (n >= 0 && uv__write_req_update(stream, req, n)) {
uv__write_req_finish(req);
return; /* TODO(bnoordhuis) Start trying to write the next request. */
}
/* If this is a blocking stream, try again. */
if (stream->flags & UV_HANDLE_BLOCKING_WRITES)
goto start;
/* We're not done. */
uv__io_start(stream->loop, &stream->io_watcher, POLLOUT);
/* Notify select() thread about state change */
uv__stream_osx_interrupt_select(stream);
return;
error:
req->error = err;
uv__write_req_finish(req);
uv__io_stop(stream->loop, &stream->io_watcher, POLLOUT);
if (!uv__io_active(&stream->io_watcher, POLLIN))
uv__handle_stop(stream);
uv__stream_osx_interrupt_select(stream);
}
static void uv__write_callbacks(uv_stream_t* stream) {
uv_write_t* req;
QUEUE* q;
QUEUE pq;
if (QUEUE_EMPTY(&stream->write_completed_queue))
return;
QUEUE_MOVE(&stream->write_completed_queue, &pq);
while (!QUEUE_EMPTY(&pq)) {
/* Pop a req off write_completed_queue. */
q = QUEUE_HEAD(&pq);
req = QUEUE_DATA(q, uv_write_t, queue);
QUEUE_REMOVE(q);
uv__req_unregister(stream->loop, req);
if (req->bufs != NULL) {
stream->write_queue_size -= uv__write_req_size(req);
if (req->bufs != req->bufsml)
uv__free(req->bufs);
req->bufs = NULL;
}
/* NOTE: call callback AFTER freeing the request data. */
if (req->cb)
req->cb(req, req->error);
}
}
uv_handle_type uv__handle_type(int fd) {
struct sockaddr_storage ss;
socklen_t sslen;
socklen_t len;
int type;
memset(&ss, 0, sizeof(ss));
sslen = sizeof(ss);
if (getsockname(fd, (struct sockaddr*)&ss, &sslen))
return UV_UNKNOWN_HANDLE;
len = sizeof type;
if (getsockopt(fd, SOL_SOCKET, SO_TYPE, &type, &len))
return UV_UNKNOWN_HANDLE;
if (type == SOCK_STREAM) {
#if defined(_AIX) || defined(__DragonFly__)
/* on AIX/DragonFly the getsockname call returns an empty sa structure
* for sockets of type AF_UNIX. For all other types it will
* return a properly filled in structure.
*/
if (sslen == 0)
return UV_NAMED_PIPE;
#endif
switch (ss.ss_family) {
case AF_UNIX:
return UV_NAMED_PIPE;
case AF_INET:
case AF_INET6:
return UV_TCP;
}
}
if (type == SOCK_DGRAM &&
(ss.ss_family == AF_INET || ss.ss_family == AF_INET6))
return UV_UDP;
return UV_UNKNOWN_HANDLE;
}
static void uv__stream_eof(uv_stream_t* stream, const uv_buf_t* buf) {
stream->flags |= UV_HANDLE_READ_EOF;
uv__io_stop(stream->loop, &stream->io_watcher, POLLIN);
if (!uv__io_active(&stream->io_watcher, POLLOUT))
uv__handle_stop(stream);
uv__stream_osx_interrupt_select(stream);
stream->read_cb(stream, UV_EOF, buf);
stream->flags &= ~UV_HANDLE_READING;
}
static int uv__stream_queue_fd(uv_stream_t* stream, int fd) {
uv__stream_queued_fds_t* queued_fds;
unsigned int queue_size;
queued_fds = stream->queued_fds;
if (queued_fds == NULL) {
queue_size = 8;
queued_fds = uv__malloc((queue_size - 1) * sizeof(*queued_fds->fds) +
sizeof(*queued_fds));
if (queued_fds == NULL)
return UV_ENOMEM;
queued_fds->size = queue_size;
queued_fds->offset = 0;
stream->queued_fds = queued_fds;
/* Grow */
} else if (queued_fds->size == queued_fds->offset) {
queue_size = queued_fds->size + 8;
queued_fds = uv__realloc(queued_fds,
(queue_size - 1) * sizeof(*queued_fds->fds) +
sizeof(*queued_fds));
/*
* Allocation failure, report back.
* NOTE: if it is fatal - sockets will be closed in uv__stream_close
*/
if (queued_fds == NULL)
return UV_ENOMEM;
queued_fds->size = queue_size;
stream->queued_fds = queued_fds;
}
/* Put fd in a queue */
queued_fds->fds[queued_fds->offset++] = fd;
return 0;
}
#define UV__CMSG_FD_COUNT 64
#define UV__CMSG_FD_SIZE (UV__CMSG_FD_COUNT * sizeof(int))
static int uv__stream_recv_cmsg(uv_stream_t* stream, struct msghdr* msg) {
struct cmsghdr* cmsg;
for (cmsg = CMSG_FIRSTHDR(msg); cmsg != NULL; cmsg = CMSG_NXTHDR(msg, cmsg)) {
char* start;
char* end;
int err;
void* pv;
int* pi;
unsigned int i;
unsigned int count;
libuv/src/unix/stream.c view on Meta::CPAN
int count;
int err;
int is_ipc;
stream->flags &= ~UV_HANDLE_READ_PARTIAL;
/* Prevent loop starvation when the data comes in as fast as (or faster than)
* we can read it. XXX Need to rearm fd if we switch to edge-triggered I/O.
*/
count = 32;
is_ipc = stream->type == UV_NAMED_PIPE && ((uv_pipe_t*) stream)->ipc;
/* XXX: Maybe instead of having UV_HANDLE_READING we just test if
* tcp->read_cb is NULL or not?
*/
while (stream->read_cb
&& (stream->flags & UV_HANDLE_READING)
&& (count-- > 0)) {
assert(stream->alloc_cb != NULL);
buf = uv_buf_init(NULL, 0);
stream->alloc_cb((uv_handle_t*)stream, 64 * 1024, &buf);
if (buf.base == NULL || buf.len == 0) {
/* User indicates it can't or won't handle the read. */
stream->read_cb(stream, UV_ENOBUFS, &buf);
return;
}
assert(buf.base != NULL);
assert(uv__stream_fd(stream) >= 0);
if (!is_ipc) {
do {
nread = read(uv__stream_fd(stream), buf.base, buf.len);
}
while (nread < 0 && errno == EINTR);
} else {
/* ipc uses recvmsg */
msg.msg_flags = 0;
msg.msg_iov = (struct iovec*) &buf;
msg.msg_iovlen = 1;
msg.msg_name = NULL;
msg.msg_namelen = 0;
/* Set up to receive a descriptor even if one isn't in the message */
msg.msg_controllen = sizeof(cmsg_space);
msg.msg_control = cmsg_space;
do {
nread = uv__recvmsg(uv__stream_fd(stream), &msg, 0);
}
while (nread < 0 && errno == EINTR);
}
if (nread < 0) {
/* Error */
if (errno == EAGAIN || errno == EWOULDBLOCK) {
/* Wait for the next one. */
if (stream->flags & UV_HANDLE_READING) {
uv__io_start(stream->loop, &stream->io_watcher, POLLIN);
uv__stream_osx_interrupt_select(stream);
}
stream->read_cb(stream, 0, &buf);
#if defined(__CYGWIN__) || defined(__MSYS__)
} else if (errno == ECONNRESET && stream->type == UV_NAMED_PIPE) {
uv__stream_eof(stream, &buf);
return;
#endif
} else {
/* Error. User should call uv_close(). */
stream->read_cb(stream, UV__ERR(errno), &buf);
if (stream->flags & UV_HANDLE_READING) {
stream->flags &= ~UV_HANDLE_READING;
uv__io_stop(stream->loop, &stream->io_watcher, POLLIN);
if (!uv__io_active(&stream->io_watcher, POLLOUT))
uv__handle_stop(stream);
uv__stream_osx_interrupt_select(stream);
}
}
return;
} else if (nread == 0) {
uv__stream_eof(stream, &buf);
return;
} else {
/* Successful read */
ssize_t buflen = buf.len;
if (is_ipc) {
err = uv__stream_recv_cmsg(stream, &msg);
if (err != 0) {
stream->read_cb(stream, err, &buf);
return;
}
}
#if defined(__MVS__)
if (is_ipc && msg.msg_controllen > 0) {
uv_buf_t blankbuf;
int nread;
struct iovec *old;
blankbuf.base = 0;
blankbuf.len = 0;
old = msg.msg_iov;
msg.msg_iov = (struct iovec*) &blankbuf;
nread = 0;
do {
nread = uv__recvmsg(uv__stream_fd(stream), &msg, 0);
err = uv__stream_recv_cmsg(stream, &msg);
if (err != 0) {
stream->read_cb(stream, err, &buf);
msg.msg_iov = old;
return;
}
} while (nread == 0 && msg.msg_controllen > 0);
msg.msg_iov = old;
}
#endif
stream->read_cb(stream, nread, &buf);
/* Return if we didn't fill the buffer, there is no more data to read. */
if (nread < buflen) {
stream->flags |= UV_HANDLE_READ_PARTIAL;
return;
}
}
}
}
#ifdef __clang__
# pragma clang diagnostic pop
#endif
#undef UV__CMSG_FD_COUNT
#undef UV__CMSG_FD_SIZE
int uv_shutdown(uv_shutdown_t* req, uv_stream_t* stream, uv_shutdown_cb cb) {
assert(stream->type == UV_TCP ||
stream->type == UV_TTY ||
stream->type == UV_NAMED_PIPE);
if (!(stream->flags & UV_HANDLE_WRITABLE) ||
stream->flags & UV_HANDLE_SHUT ||
stream->flags & UV_HANDLE_SHUTTING ||
uv__is_closing(stream)) {
return UV_ENOTCONN;
}
assert(uv__stream_fd(stream) >= 0);
/* Initialize request */
uv__req_init(stream->loop, req, UV_SHUTDOWN);
req->handle = stream;
req->cb = cb;
stream->shutdown_req = req;
stream->flags |= UV_HANDLE_SHUTTING;
uv__io_start(stream->loop, &stream->io_watcher, POLLOUT);
uv__stream_osx_interrupt_select(stream);
return 0;
}
static void uv__stream_io(uv_loop_t* loop, uv__io_t* w, unsigned int events) {
uv_stream_t* stream;
stream = container_of(w, uv_stream_t, io_watcher);
assert(stream->type == UV_TCP ||
stream->type == UV_NAMED_PIPE ||
stream->type == UV_TTY);
assert(!(stream->flags & UV_HANDLE_CLOSING));
if (stream->connect_req) {
uv__stream_connect(stream);
return;
}
assert(uv__stream_fd(stream) >= 0);
/* Ignore POLLHUP here. Even if it's set, there may still be data to read. */
if (events & (POLLIN | POLLERR | POLLHUP))
uv__read(stream);
if (uv__stream_fd(stream) == -1)
return; /* read_cb closed stream. */
/* Short-circuit iff POLLHUP is set, the user is still interested in read
* events and uv__read() reported a partial read but not EOF. If the EOF
* flag is set, uv__read() called read_cb with err=UV_EOF and we don't
* have to do anything. If the partial read flag is not set, we can't
* report the EOF yet because there is still data to read.
*/
if ((events & POLLHUP) &&
(stream->flags & UV_HANDLE_READING) &&
(stream->flags & UV_HANDLE_READ_PARTIAL) &&
!(stream->flags & UV_HANDLE_READ_EOF)) {
uv_buf_t buf = { NULL, 0 };
uv__stream_eof(stream, &buf);
}
if (uv__stream_fd(stream) == -1)
return; /* read_cb closed stream. */
if (events & (POLLOUT | POLLERR | POLLHUP)) {
uv__write(stream);
uv__write_callbacks(stream);
/* Write queue drained. */
if (QUEUE_EMPTY(&stream->write_queue))
uv__drain(stream);
}
}
/**
* We get called here from directly following a call to connect(2).
* In order to determine if we've errored out or succeeded must call
libuv/src/unix/stream.c view on Meta::CPAN
*/
if (uv__handle_fd((uv_handle_t*) send_handle) < 0)
return UV_EBADF;
#if defined(__CYGWIN__) || defined(__MSYS__)
/* Cygwin recvmsg always sets msg_controllen to zero, so we cannot send it.
See https://github.com/mirror/newlib-cygwin/blob/86fc4bf0/winsup/cygwin/fhandler_socket.cc#L1736-L1743 */
return UV_ENOSYS;
#endif
}
/* It's legal for write_queue_size > 0 even when the write_queue is empty;
* it means there are error-state requests in the write_completed_queue that
* will touch up write_queue_size later, see also uv__write_req_finish().
* We could check that write_queue is empty instead but that implies making
* a write() syscall when we know that the handle is in error mode.
*/
empty_queue = (stream->write_queue_size == 0);
/* Initialize the req */
uv__req_init(stream->loop, req, UV_WRITE);
req->cb = cb;
req->handle = stream;
req->error = 0;
req->send_handle = send_handle;
QUEUE_INIT(&req->queue);
req->bufs = req->bufsml;
if (nbufs > ARRAY_SIZE(req->bufsml))
req->bufs = uv__malloc(nbufs * sizeof(bufs[0]));
if (req->bufs == NULL)
return UV_ENOMEM;
memcpy(req->bufs, bufs, nbufs * sizeof(bufs[0]));
req->nbufs = nbufs;
req->write_index = 0;
stream->write_queue_size += uv__count_bufs(bufs, nbufs);
/* Append the request to write_queue. */
QUEUE_INSERT_TAIL(&stream->write_queue, &req->queue);
/* If the queue was empty when this function began, we should attempt to
* do the write immediately. Otherwise start the write_watcher and wait
* for the fd to become writable.
*/
if (stream->connect_req) {
/* Still connecting, do nothing. */
}
else if (empty_queue) {
uv__write(stream);
}
else {
/*
* blocking streams should never have anything in the queue.
* if this assert fires then somehow the blocking stream isn't being
* sufficiently flushed in uv__write.
*/
assert(!(stream->flags & UV_HANDLE_BLOCKING_WRITES));
uv__io_start(stream->loop, &stream->io_watcher, POLLOUT);
uv__stream_osx_interrupt_select(stream);
}
return 0;
}
/* The buffers to be written must remain valid until the callback is called.
* This is not required for the uv_buf_t array.
*/
int uv_write(uv_write_t* req,
uv_stream_t* handle,
const uv_buf_t bufs[],
unsigned int nbufs,
uv_write_cb cb) {
return uv_write2(req, handle, bufs, nbufs, NULL, cb);
}
void uv_try_write_cb(uv_write_t* req, int status) {
/* Should not be called */
abort();
}
int uv_try_write(uv_stream_t* stream,
const uv_buf_t bufs[],
unsigned int nbufs) {
int r;
int has_pollout;
size_t written;
size_t req_size;
uv_write_t req;
/* Connecting or already writing some data */
if (stream->connect_req != NULL || stream->write_queue_size != 0)
return UV_EAGAIN;
has_pollout = uv__io_active(&stream->io_watcher, POLLOUT);
r = uv_write(&req, stream, bufs, nbufs, uv_try_write_cb);
if (r != 0)
return r;
/* Remove not written bytes from write queue size */
written = uv__count_bufs(bufs, nbufs);
if (req.bufs != NULL)
req_size = uv__write_req_size(&req);
else
req_size = 0;
written -= req_size;
stream->write_queue_size -= req_size;
/* Unqueue request, regardless of immediateness */
QUEUE_REMOVE(&req.queue);
uv__req_unregister(stream->loop, &req);
if (req.bufs != req.bufsml)
uv__free(req.bufs);
req.bufs = NULL;
/* Do not poll for writable, if we wasn't before calling this */
if (!has_pollout) {
uv__io_stop(stream->loop, &stream->io_watcher, POLLOUT);
uv__stream_osx_interrupt_select(stream);
}
if (written == 0 && req_size != 0)
return UV_EAGAIN;
else
return written;
}
int uv_read_start(uv_stream_t* stream,
uv_alloc_cb alloc_cb,
uv_read_cb read_cb) {
assert(stream->type == UV_TCP || stream->type == UV_NAMED_PIPE ||
stream->type == UV_TTY);
if (stream->flags & UV_HANDLE_CLOSING)
return UV_EINVAL;
if (!(stream->flags & UV_HANDLE_READABLE))
return -ENOTCONN;
/* The UV_HANDLE_READING flag is irrelevant of the state of the tcp - it just
* expresses the desired state of the user.
*/
stream->flags |= UV_HANDLE_READING;
/* TODO: try to do the read inline? */
/* TODO: keep track of tcp state. If we've gotten a EOF then we should
* not start the IO watcher.
*/
assert(uv__stream_fd(stream) >= 0);
assert(alloc_cb);
stream->read_cb = read_cb;
stream->alloc_cb = alloc_cb;
uv__io_start(stream->loop, &stream->io_watcher, POLLIN);
uv__handle_start(stream);
uv__stream_osx_interrupt_select(stream);
return 0;
}
int uv_read_stop(uv_stream_t* stream) {
if (!(stream->flags & UV_HANDLE_READING))
return 0;
stream->flags &= ~UV_HANDLE_READING;
uv__io_stop(stream->loop, &stream->io_watcher, POLLIN);
if (!uv__io_active(&stream->io_watcher, POLLOUT))
uv__handle_stop(stream);
uv__stream_osx_interrupt_select(stream);
stream->read_cb = NULL;
stream->alloc_cb = NULL;
return 0;
}
int uv_is_readable(const uv_stream_t* stream) {
return !!(stream->flags & UV_HANDLE_READABLE);
}
int uv_is_writable(const uv_stream_t* stream) {
return !!(stream->flags & UV_HANDLE_WRITABLE);
}
#if defined(__APPLE__)
int uv___stream_fd(const uv_stream_t* handle) {
const uv__stream_select_t* s;
assert(handle->type == UV_TCP ||
handle->type == UV_TTY ||
handle->type == UV_NAMED_PIPE);
s = handle->select;
if (s != NULL)
return s->fd;
return handle->io_watcher.fd;
}
#endif /* defined(__APPLE__) */
void uv__stream_close(uv_stream_t* handle) {
unsigned int i;
uv__stream_queued_fds_t* queued_fds;
#if defined(__APPLE__)
/* Terminate select loop first */
if (handle->select != NULL) {
uv__stream_select_t* s;
s = handle->select;
uv_sem_post(&s->close_sem);
uv_sem_post(&s->async_sem);
uv__stream_osx_interrupt_select(handle);
uv_thread_join(&s->thread);
uv_sem_destroy(&s->close_sem);
uv_sem_destroy(&s->async_sem);
uv__close(s->fake_fd);
uv__close(s->int_fd);
uv_close((uv_handle_t*) &s->async, uv__stream_osx_cb_close);
handle->select = NULL;
}
#endif /* defined(__APPLE__) */
uv__io_close(handle->loop, &handle->io_watcher);
uv_read_stop(handle);
uv__handle_stop(handle);
handle->flags &= ~(UV_HANDLE_READABLE | UV_HANDLE_WRITABLE);
if (handle->io_watcher.fd != -1) {
/* Don't close stdio file descriptors. Nothing good comes from it. */
if (handle->io_watcher.fd > STDERR_FILENO)
uv__close(handle->io_watcher.fd);
handle->io_watcher.fd = -1;
}
if (handle->accepted_fd != -1) {
uv__close(handle->accepted_fd);
handle->accepted_fd = -1;
}
/* Close all queued fds */
if (handle->queued_fds != NULL) {
queued_fds = handle->queued_fds;
for (i = 0; i < queued_fds->offset; i++)
uv__close(queued_fds->fds[i]);
uv__free(handle->queued_fds);
handle->queued_fds = NULL;
}
assert(!uv__io_active(&handle->io_watcher, POLLIN | POLLOUT));
}
int uv_stream_set_blocking(uv_stream_t* handle, int blocking) {
/* Don't need to check the file descriptor, uv__nonblock()
* will fail with EBADF if it's not valid.
*/
return uv__nonblock(uv__stream_fd(handle), !blocking);
}
( run in 1.431 second using v1.01-cache-2.11-cpan-fa01517f264 )