Alien-uv
view release on metacpan or search on metacpan
libuv/src/win/pipe.c view on Meta::CPAN
/* Copyright Joyent, Inc. and other Node contributors. All rights reserved.
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to
* deal in the Software without restriction, including without limitation the
* rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
* sell copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
* IN THE SOFTWARE.
*/
#include <assert.h>
#include <io.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "handle-inl.h"
#include "internal.h"
#include "req-inl.h"
#include "stream-inl.h"
#include "uv-common.h"
#include "uv.h"
#include <aclapi.h>
#include <accctrl.h>
/* A zero-size buffer for use by uv_pipe_read */
static char uv_zero_[] = "";
/* Null uv_buf_t. Passed to the read error path when no user-supplied buffer
* is associated with the failure. */
static const uv_buf_t uv_null_buf_ = { 0, NULL };
/* The timeout that the pipe will wait for the remote end to write data when
* the local end wants to shut it down. */
static const int64_t eof_timeout = 50; /* ms */
/* NOTE(review): presumably the default number of pending pipe instances for a
* pipe server; it is not used in the visible part of this file -- confirm. */
static const int default_pending_pipe_instances = 4;
/* Pipe prefix */
static char pipe_prefix[] = "\\\\?\\pipe";
/* Length of pipe_prefix, not counting the terminating NUL. */
static const int pipe_prefix_len = sizeof(pipe_prefix) - 1;
/* IPC incoming xfer queue item. Describes one socket that was received over
* an IPC pipe. */
typedef struct {
/* Kind of socket that was transferred (a UV__IPC_SOCKET_XFER_* value). */
uv__ipc_socket_xfer_type_t xfer_type;
/* Socket transfer information as read from the pipe. */
uv__ipc_socket_xfer_info_t xfer_info;
/* Queue link. NOTE(review): presumably chains the item into the pipe's
* ipc_xfer_queue -- confirm against uv__pipe_queue_ipc_xfer_info(). */
QUEUE member;
} uv__ipc_xfer_queue_item_t;
/* IPC frame header flags. These are stored in the `flags` field of
* uv__ipc_frame_header_t. */
/* clang-format off */
enum {
/* The frame carries a data payload of `data_length` bytes. */
UV__IPC_FRAME_HAS_DATA = 0x01,
/* The frame header is followed by socket xfer information. */
UV__IPC_FRAME_HAS_SOCKET_XFER = 0x02,
/* The transferred socket is a TCP connection. When this bit is clear while
* UV__IPC_FRAME_HAS_SOCKET_XFER is set, the socket is a TCP server. */
UV__IPC_FRAME_XFER_IS_TCP_CONNECTION = 0x04,
/* These are combinations of the flags above. */
/* Mask covering the two socket xfer bits. */
UV__IPC_FRAME_XFER_FLAGS = 0x06,
/* Mask covering every defined bit; a frame with any other bit set is
* rejected by the reader. */
UV__IPC_FRAME_VALID_FLAGS = 0x07
};
/* clang-format on */
/* IPC frame header. One header is written in front of every IPC frame; it is
* optionally followed by socket xfer information and/or a data payload, as
* indicated by `flags`. */
typedef struct {
uint32_t flags; /* Combination of UV__IPC_FRAME_* flags. */
uint32_t reserved1; /* Ignored. */
uint32_t data_length; /* Must be zero if there is no data. */
uint32_t reserved2; /* Must be zero. */
} uv__ipc_frame_header_t;
/* To implement the IPC protocol correctly, these structures must have exactly
* the right size. */
STATIC_ASSERT(sizeof(uv__ipc_frame_header_t) == 16);
STATIC_ASSERT(sizeof(uv__ipc_socket_xfer_info_t) == 632);
/* Coalesced write request. Wraps an internal write request together with the
* user's original request; the write completion handler recovers this wrapper
* from `req` with container_of() and reports completion on `user_req`. */
typedef struct {
uv_write_t req; /* Internal heap-allocated write request. */
uv_write_t* user_req; /* Pointer to user-specified uv_write_t. */
} uv__coalesced_write_t;
/* Forward declarations of the file-local EOF timer helpers, which are used
* before their definitions. */
static void eof_timer_init(uv_pipe_t* pipe);
static void eof_timer_start(uv_pipe_t* pipe);
static void eof_timer_stop(uv_pipe_t* pipe);
static void eof_timer_cb(uv_timer_t* timer);
static void eof_timer_destroy(uv_pipe_t* pipe);
static void eof_timer_close_cb(uv_handle_t* handle);
static void uv_unique_pipe_name(char* ptr, char* name, size_t size) {
snprintf(name, size, "\\\\?\\pipe\\uv\\%p-%lu", ptr, GetCurrentProcessId());
}
/* Initialize `handle` as a named pipe stream attached to `loop`.
 *
 * `ipc` is stored on the handle; the write path later checks it to decide
 * whether to use the IPC framing protocol. Always returns 0. */
int uv_pipe_init(uv_loop_t* loop, uv_pipe_t* handle, int ipc) {
  uv_stream_init(loop, (uv_stream_t*) handle, UV_NAMED_PIPE);

  /* Generic pipe state: no OS handle, no name, no requests in flight. */
  handle->handle = INVALID_HANDLE_VALUE;
  handle->name = NULL;
  handle->reqs_pending = 0;
  handle->ipc = ipc;

  /* Connection state: no known remote pid, no partially read frame, an empty
   * queue of received sockets and no queued non-overlapped writes. */
  handle->pipe.conn.ipc_remote_pid = 0;
  handle->pipe.conn.ipc_data_frame.payload_remaining = 0;
  QUEUE_INIT(&handle->pipe.conn.ipc_xfer_queue);
  handle->pipe.conn.ipc_xfer_queue_length = 0;
  handle->pipe.conn.non_overlapped_writes_tail = NULL;

  return 0;
}
/* Set up the connection-specific state of a pipe handle. Must not be called
 * on a pipe server handle (asserted). */
static void uv_pipe_connection_init(uv_pipe_t* handle) {
  uv_connection_init((uv_stream_t*) handle);

  handle->read_req.data = handle;
  handle->pipe.conn.eof_timer = NULL;

  assert(!(handle->flags & UV_HANDLE_PIPESERVER));

  /* Only non-overlapped pipes need the ReadFile thread bookkeeping. */
  if (!(handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE))
    return;

  handle->pipe.conn.readfile_thread_handle = NULL;
  InitializeCriticalSection(&handle->pipe.conn.readfile_thread_lock);
}
static HANDLE open_named_pipe(const WCHAR* name, DWORD* duplex_flags) {
HANDLE pipeHandle;
/*
* Assume that we have a duplex pipe first, so attempt to
* connect with GENERIC_READ | GENERIC_WRITE.
*/
pipeHandle = CreateFileW(name,
GENERIC_READ | GENERIC_WRITE,
0,
NULL,
OPEN_EXISTING,
FILE_FLAG_OVERLAPPED,
NULL);
if (pipeHandle != INVALID_HANDLE_VALUE) {
*duplex_flags = UV_HANDLE_READABLE | UV_HANDLE_WRITABLE;
return pipeHandle;
}
/*
* If the pipe is not duplex CreateFileW fails with
* ERROR_ACCESS_DENIED. In that case try to connect
* as a read-only or write-only.
*/
if (GetLastError() == ERROR_ACCESS_DENIED) {
pipeHandle = CreateFileW(name,
GENERIC_READ | FILE_WRITE_ATTRIBUTES,
0,
NULL,
OPEN_EXISTING,
FILE_FLAG_OVERLAPPED,
NULL);
if (pipeHandle != INVALID_HANDLE_VALUE) {
*duplex_flags = UV_HANDLE_READABLE;
return pipeHandle;
}
}
libuv/src/win/pipe.c view on Meta::CPAN
NULL,
&req->u.io.overlapped);
if (!result && GetLastError() != ERROR_IO_PENDING) {
return GetLastError();
}
if (result) {
/* Request completed immediately. */
req->u.io.queued_bytes = 0;
} else {
/* Request queued by the kernel. */
req->u.io.queued_bytes = write_buf.len;
handle->write_queue_size += req->u.io.queued_bytes;
}
if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
req->event_handle = CreateEvent(NULL, 0, 0, NULL);
if (!req->event_handle) {
uv_fatal_error(GetLastError(), "CreateEvent");
}
if (!RegisterWaitForSingleObject(&req->wait_handle,
req->u.io.overlapped.hEvent, post_completion_write_wait, (void*) req,
INFINITE, WT_EXECUTEINWAITTHREAD)) {
return GetLastError();
}
}
}
REGISTER_HANDLE_REQ(loop, handle, req);
handle->reqs_pending++;
handle->stream.conn.write_reqs_pending++;
return 0;
}
/* Return the process id of the remote end of an IPC pipe.
 *
 * If both ends of the IPC pipe are owned by the same process, the remote pid
 * may not have been set yet. In that case it is set to the current process id
 * here and that value is returned.
 * TODO: this is weird; it would probably be better to use a handshake. */
static DWORD uv__pipe_get_ipc_remote_pid(uv_pipe_t* handle) {
  if (handle->pipe.conn.ipc_remote_pid == 0)
    handle->pipe.conn.ipc_remote_pid = GetCurrentProcessId();

  return handle->pipe.conn.ipc_remote_pid;
}
/* Write one IPC frame to `handle`.
 *
 * A frame consists of a frame header, optionally followed by socket xfer
 * information (when `send_handle` is not NULL) and optionally followed by the
 * concatenated contents of `data_bufs` (when their combined length is
 * non-zero).
 *
 * Parameters:
 *   loop, req, handle, cb - forwarded to uv__pipe_write_data().
 *   data_bufs, data_buf_count - user data to send; may be empty.
 *   send_handle - handle to transfer to the remote end, or NULL. Only UV_TCP
 *                 handles are supported.
 *
 * Returns 0 on success or a system error code:
 *   WSAENOBUFS               - combined data length exceeds UINT32_MAX.
 *   ERROR_NOT_SUPPORTED      - `send_handle` is not a TCP handle.
 *   ERROR_NOT_ENOUGH_MEMORY  - the buffer array could not be allocated.
 *   anything returned by uv__tcp_xfer_export() or uv__pipe_write_data(). */
int uv__pipe_write_ipc(uv_loop_t* loop,
                       uv_write_t* req,
                       uv_pipe_t* handle,
                       const uv_buf_t data_bufs[],
                       size_t data_buf_count,
                       uv_stream_t* send_handle,
                       uv_write_cb cb) {
  uv_buf_t stack_bufs[6];
  uv_buf_t* bufs;
  size_t buf_count, buf_index;
  uv__ipc_frame_header_t frame_header;
  uv__ipc_socket_xfer_type_t xfer_type = UV__IPC_SOCKET_XFER_NONE;
  uv__ipc_socket_xfer_info_t xfer_info;
  uint64_t data_length;
  size_t i;
  int err;

  /* Compute the combined size of data buffers. */
  data_length = 0;
  for (i = 0; i < data_buf_count; i++)
    data_length += data_bufs[i].len;
  if (data_length > UINT32_MAX)
    return WSAENOBUFS; /* Maps to UV_ENOBUFS. */

  /* Prepare the frame's socket xfer payload. */
  if (send_handle != NULL) {
    uv_tcp_t* send_tcp_handle = (uv_tcp_t*) send_handle;

    /* Verify that `send_handle` is indeed a tcp handle. */
    if (send_tcp_handle->type != UV_TCP)
      return ERROR_NOT_SUPPORTED;

    /* Export the tcp handle. */
    err = uv__tcp_xfer_export(send_tcp_handle,
                              uv__pipe_get_ipc_remote_pid(handle),
                              &xfer_type,
                              &xfer_info);
    if (err != 0)
      return err;
  }

  /* Compute the maximum number of uv_buf_t's required. */
  buf_count = 1 + data_buf_count; /* Frame header and data buffers. */
  if (send_handle != NULL)
    buf_count += 1; /* One extra for the socket xfer information. */

  /* Use the on-stack buffer array if it is big enough; otherwise allocate
   * space for it on the heap. The stack array is usable when `buf_count` is
   * equal to its capacity as well. */
  if (buf_count <= ARRAY_SIZE(stack_bufs)) {
    /* Use on-stack buffer array. */
    bufs = stack_bufs;
  } else {
    /* Use heap-allocated buffer array. */
    bufs = uv__calloc(buf_count, sizeof(uv_buf_t));
    if (bufs == NULL)
      return ERROR_NOT_ENOUGH_MEMORY; /* Maps to UV_ENOMEM. */
  }
  buf_index = 0;

  /* Initialize frame header and add it to the buffers list. */
  memset(&frame_header, 0, sizeof frame_header);
  bufs[buf_index++] = uv_buf_init((char*) &frame_header, sizeof frame_header);

  if (send_handle != NULL) {
    /* Add frame header flags. */
    switch (xfer_type) {
      case UV__IPC_SOCKET_XFER_TCP_CONNECTION:
        frame_header.flags |= UV__IPC_FRAME_HAS_SOCKET_XFER |
                              UV__IPC_FRAME_XFER_IS_TCP_CONNECTION;
        break;
      case UV__IPC_SOCKET_XFER_TCP_SERVER:
        frame_header.flags |= UV__IPC_FRAME_HAS_SOCKET_XFER;
        break;
      default:
        assert(0); /* Unreachable. */
    }
    /* Add xfer info buffer. */
    bufs[buf_index++] = uv_buf_init((char*) &xfer_info, sizeof xfer_info);
  }

  if (data_length > 0) {
    /* Update frame header. */
    frame_header.flags |= UV__IPC_FRAME_HAS_DATA;
    frame_header.data_length = (uint32_t) data_length;
    /* Add data buffers to buffers list. */
    for (i = 0; i < data_buf_count; i++)
      bufs[buf_index++] = data_bufs[i];
  }

  /* Only the first `buf_index` entries of `bufs` have been filled in. When
   * the caller passed data buffers that are all empty, that is fewer than
   * `buf_count`, and the remaining slots (uninitialized when the stack array
   * is used) must not be handed to the writer. */
  assert(buf_index <= buf_count);

  /* Write buffers. We set the `always_copy` flag, so it is not a problem that
   * some of the written data lives on the stack. */
  err = uv__pipe_write_data(loop, req, handle, bufs, buf_index, cb, 1);

  /* If we had to heap-allocate the bufs array, free it now. */
  if (bufs != stack_bufs) {
    uv__free(bufs);
  }

  return err;
}
/* Write `bufs` to the pipe `handle`.
 *
 * IPC pipes go through the framing protocol (and may carry `send_handle`);
 * other pipes have the data put on the wire directly, in which case
 * `send_handle` must be NULL (asserted). Returns whatever the selected write
 * routine returns. */
int uv__pipe_write(uv_loop_t* loop,
                   uv_write_t* req,
                   uv_pipe_t* handle,
                   const uv_buf_t bufs[],
                   size_t nbufs,
                   uv_stream_t* send_handle,
                   uv_write_cb cb) {
  if (!handle->ipc) {
    /* Plain pipe: no framing, and no handle can be sent along. */
    assert(send_handle == NULL);
    return uv__pipe_write_data(loop, req, handle, bufs, nbufs, cb, 0);
  }

  /* IPC pipe: wrap the data in an IPC frame. */
  return uv__pipe_write_ipc(loop, req, handle, bufs, nbufs, send_handle, cb);
}
/* Report end-of-file to the user: destroy the eof timer, mark the handle as
 * no longer readable, stop reading and invoke the read callback with UV_EOF
 * and `buf`. */
static void uv_pipe_read_eof(uv_loop_t* loop, uv_pipe_t* handle,
    uv_buf_t buf) {
  uv_stream_t* stream;

  stream = (uv_stream_t*) handle;

  /* A running eof timer has no purpose any more once EOF is seen. */
  eof_timer_destroy(handle);

  handle->flags &= ~UV_HANDLE_READABLE;
  uv_read_stop(stream);

  handle->read_cb(stream, UV_EOF, &buf);
}
/* Report a read failure to the user: destroy the eof timer, stop reading and
 * invoke the read callback with the system error `error` translated to a
 * libuv error code, together with `buf`. */
static void uv_pipe_read_error(uv_loop_t* loop, uv_pipe_t* handle, int error,
    uv_buf_t buf) {
  uv_stream_t* stream;

  stream = (uv_stream_t*) handle;

  /* A running eof timer has no purpose any more once the read has failed. */
  eof_timer_destroy(handle);

  uv_read_stop(stream);

  handle->read_cb(stream, uv_translate_sys_error(error), &buf);
}
libuv/src/win/pipe.c view on Meta::CPAN
/* Read exactly `count` bytes from pipe `h` into `buffer`, issuing as many
 * ReadFile() calls as needed. Returns 0 once all bytes have been read, or the
 * GetLastError() value of the first ReadFile() call that fails. */
static int uv__pipe_read_exactly(HANDLE h, void* buffer, DWORD count) {
  DWORD total;
  DWORD chunk;

  for (total = 0; total < count; total += chunk) {
    char* dest = (char*) buffer + total;

    if (!ReadFile(h, dest, count - total, &chunk, NULL))
      return GetLastError();
  }

  assert(bytes_read == count || total == count);
  return 0;
}
/* Read raw data from the pipe into a buffer obtained from the user's alloc
 * callback and hand it to the read callback.
 *
 * `suggested_bytes` is passed to the alloc callback as the suggested size. At
 * most `max_bytes` bytes, and never more than the length of the buffer the
 * user provided, are read.
 *
 * Returns the number of bytes read. Returns 0 when the user supplied no
 * buffer (reported as UV_ENOBUFS) or when ReadFile() failed (reported through
 * uv_pipe_read_error_or_eof()); the caller uses 0 to leave its read loop. */
static DWORD uv__pipe_read_data(uv_loop_t* loop,
                                uv_pipe_t* handle,
                                DWORD suggested_bytes,
                                DWORD max_bytes) {
  uv_stream_t* stream;
  uv_buf_t buf;
  DWORD nread;

  stream = (uv_stream_t*) handle;

  /* Let the user provide the destination buffer. */
  buf = uv_buf_init(NULL, 0);
  handle->alloc_cb((uv_handle_t*) handle, suggested_bytes, &buf);
  if (buf.base == NULL || buf.len == 0) {
    handle->read_cb(stream, UV_ENOBUFS, &buf);
    return 0;
  }

  /* Never read more than fits in the user's buffer. */
  if (max_bytes > buf.len)
    max_bytes = buf.len;

  if (ReadFile(handle->handle, buf.base, max_bytes, &nread, NULL)) {
    /* Success: deliver the data. */
    handle->read_cb(stream, nread, &buf);
    return nread;
  }

  /* Failure: report it and tell the caller to stop reading. */
  uv_pipe_read_error_or_eof(loop, handle, GetLastError(), buf);
  return 0;
}
/* Perform one read step on an IPC pipe.
*
* If a previously parsed frame still has data payload outstanding, part or all
* of that payload is read and delivered to the user via uv__pipe_read_data().
* Otherwise a new frame header is read and validated, and, when the header
* announces a socket transfer, the socket xfer information is read and queued
* on the handle.
*
* Returns the number of bytes consumed from the pipe. Returns 0 after a read
* error or an invalid frame (reported as WSAECONNABORTED) has been passed to
* uv_pipe_read_error_or_eof(), or when uv__pipe_read_data() returned 0; the
* caller uses 0 to leave its read loop. */
static DWORD uv__pipe_read_ipc(uv_loop_t* loop, uv_pipe_t* handle) {
/* Number of payload bytes of the current frame that are still unread. */
uint32_t* data_remaining = &handle->pipe.conn.ipc_data_frame.payload_remaining;
int err;
if (*data_remaining > 0) {
/* Read frame data payload. Both the suggested size and the upper bound are
* the remaining payload length, so the read never crosses into the next
* frame. */
DWORD bytes_read =
uv__pipe_read_data(loop, handle, *data_remaining, *data_remaining);
*data_remaining -= bytes_read;
return bytes_read;
} else {
/* Start of a new IPC frame. */
uv__ipc_frame_header_t frame_header;
uint32_t xfer_flags;
uv__ipc_socket_xfer_type_t xfer_type;
uv__ipc_socket_xfer_info_t xfer_info;
/* Read the IPC frame header. */
err = uv__pipe_read_exactly(
handle->handle, &frame_header, sizeof frame_header);
if (err)
goto error;
/* Validate that flags are valid. */
if ((frame_header.flags & ~UV__IPC_FRAME_VALID_FLAGS) != 0)
goto invalid;
/* Validate that reserved2 is zero. */
if (frame_header.reserved2 != 0)
goto invalid;
/* Parse xfer flags. */
xfer_flags = frame_header.flags & UV__IPC_FRAME_XFER_FLAGS;
if (xfer_flags & UV__IPC_FRAME_HAS_SOCKET_XFER) {
/* Socket coming -- determine the type. */
xfer_type = xfer_flags & UV__IPC_FRAME_XFER_IS_TCP_CONNECTION
? UV__IPC_SOCKET_XFER_TCP_CONNECTION
: UV__IPC_SOCKET_XFER_TCP_SERVER;
} else if (xfer_flags == 0) {
/* No socket. */
xfer_type = UV__IPC_SOCKET_XFER_NONE;
} else {
/* Invalid flags: the connection bit is set without the xfer bit. */
goto invalid;
}
/* Parse data frame information. */
if (frame_header.flags & UV__IPC_FRAME_HAS_DATA) {
*data_remaining = frame_header.data_length;
} else if (frame_header.data_length != 0) {
/* Data length greater than zero but data flag not set -- invalid. */
goto invalid;
}
/* If no socket xfer info follows, return here. Data will be read in a
* subsequent invocation of uv__pipe_read_ipc(). */
if (xfer_type == UV__IPC_SOCKET_XFER_NONE)
return sizeof frame_header; /* Number of bytes read. */
/* Read transferred socket information. */
err = uv__pipe_read_exactly(handle->handle, &xfer_info, sizeof xfer_info);
if (err)
goto error;
/* Store the pending socket info. */
uv__pipe_queue_ipc_xfer_info(handle, xfer_type, &xfer_info);
/* Return number of bytes read. */
return sizeof frame_header + sizeof xfer_info;
}
invalid:
/* Invalid frame. Falls through to the common error path below. */
err = WSAECONNABORTED; /* Maps to UV_ECONNABORTED. */
error:
/* Report the failure; no user buffer is involved at this point. */
uv_pipe_read_error_or_eof(loop, handle, err, uv_null_buf_);
return 0; /* Break out of read loop. */
}
/* Handle completion of the zero-read request that was queued on `handle`.
 *
 * Clears the pending-read bookkeeping first. If the user is still reading,
 * either reports the error carried by `req` or drains the bytes that are
 * available on the pipe through the read callbacks, and finally queues a new
 * zero-read if the handle is still reading and none is pending. */
void uv_process_pipe_read_req(uv_loop_t* loop,
                              uv_pipe_t* handle,
                              uv_req_t* req) {
  assert(handle->type == UV_NAMED_PIPE);

  /* Bookkeeping: the zero-read is no longer outstanding. */
  handle->flags &= ~(UV_HANDLE_READ_PENDING | UV_HANDLE_CANCELLATION_PENDING);
  DECREASE_PENDING_REQ_COUNT(handle);
  eof_timer_stop(handle);

  /* If the user stopped reading in the meantime there is no callback left to
   * call, so there is nothing more to do. */
  if (!(handle->flags & UV_HANDLE_READING))
    return;

  if (REQ_SUCCESS(req)) {
    /* A successful zero-read means the kernel buffer holds data. */
    DWORD pending;

    /* Find out how many bytes are available. */
    pending = 0;
    if (!PeekNamedPipe(handle->handle, NULL, 0, NULL, &pending, NULL))
      uv_pipe_read_error_or_eof(loop, handle, GetLastError(), uv_null_buf_);

    /* Keep reading until everything available has been consumed or the
     * 'reading' flag gets cleared. */
    while (pending > 0 && handle->flags & UV_HANDLE_READING) {
      DWORD consumed;

      /* IPC pipes deliver frames, other pipes deliver raw data. */
      if (handle->ipc)
        consumed = uv__pipe_read_ipc(loop, handle);
      else
        consumed = uv__pipe_read_data(loop, handle, pending, (DWORD) -1);

      /* Zero bytes signals that an error occurred; stop reading. */
      if (consumed == 0)
        break;

      /* More may have been read than was thought to be available; stop here
       * rather than letting the counter underflow. */
      if (consumed > pending)
        break;

      pending -= consumed;
    }
  } else {
    /* The zero-read failed. */
    DWORD err = GET_REQ_ERROR(req);

    /* A read cancelled by uv__pipe_interrupt_read() may show up as
     * ERROR_OPERATION_ABORTED. That is of no interest to the user; a fresh
     * zero-read is started below instead. */
    if (err != ERROR_OPERATION_ABORTED)
      uv_pipe_read_error_or_eof(loop, handle, err, uv_null_buf_);
  }

  /* Queue the next zero-read when still reading and none is pending. */
  if ((handle->flags & UV_HANDLE_READING) &&
      !(handle->flags & UV_HANDLE_READ_PENDING)) {
    uv_pipe_queue_read(loop, handle);
  }
}
void uv_process_pipe_write_req(uv_loop_t* loop, uv_pipe_t* handle,
uv_write_t* req) {
int err;
assert(handle->type == UV_NAMED_PIPE);
assert(handle->write_queue_size >= req->u.io.queued_bytes);
handle->write_queue_size -= req->u.io.queued_bytes;
UNREGISTER_HANDLE_REQ(loop, handle, req);
if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
if (req->wait_handle != INVALID_HANDLE_VALUE) {
UnregisterWait(req->wait_handle);
req->wait_handle = INVALID_HANDLE_VALUE;
}
if (req->event_handle) {
CloseHandle(req->event_handle);
req->event_handle = NULL;
}
}
err = GET_REQ_ERROR(req);
/* If this was a coalesced write, extract pointer to the user_provided
* uv_write_t structure so we can pass the expected pointer to the callback,
* then free the heap-allocated write req. */
if (req->coalesced) {
uv__coalesced_write_t* coalesced_write =
container_of(req, uv__coalesced_write_t, req);
req = coalesced_write->user_req;
uv__free(coalesced_write);
( run in 0.752 second using v1.01-cache-2.11-cpan-df04353d9ac )