Alien-uv

 view release on metacpan or  search on metacpan

libuv/src/win/pipe.c  view on Meta::CPAN

/* Copyright Joyent, Inc. and other Node contributors. All rights reserved.
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to
 * deal in the Software without restriction, including without limitation the
 * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
 * sell copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in
 * all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
 * IN THE SOFTWARE.
 */

#include <assert.h>
#include <io.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#include "handle-inl.h"
#include "internal.h"
#include "req-inl.h"
#include "stream-inl.h"
#include "uv-common.h"
#include "uv.h"

#include <aclapi.h>
#include <accctrl.h>

/* A zero-size buffer for use by uv_pipe_read. */
static char uv_zero_[] = "";

/* Null uv_buf_t (zero length, NULL base). Handed to the read error/EOF
 * reporting path when no user-supplied buffer is involved. */
static const uv_buf_t uv_null_buf_ = { 0, NULL };

/* The timeout that the pipe will wait for the remote end to write data when
 * the local end wants to shut it down. */
static const int64_t eof_timeout = 50; /* ms */

/* NOTE(review): presumably the default number of pending server pipe
 * instances; its use is not visible in this part of the file -- confirm. */
static const int default_pending_pipe_instances = 4;

/* Pipe prefix, and its length in characters excluding the terminating NUL
 * (hence the `sizeof - 1`). */
static char pipe_prefix[] = "\\\\?\\pipe";
static const int pipe_prefix_len = sizeof(pipe_prefix) - 1;

/* IPC incoming xfer queue item: one received socket transfer, consisting of
 * its type and the associated transfer information, plus the QUEUE node that
 * links the item into a queue. */
typedef struct {
  uv__ipc_socket_xfer_type_t xfer_type;
  uv__ipc_socket_xfer_info_t xfer_info;
  QUEUE member;
} uv__ipc_xfer_queue_item_t;

/* IPC frame header flags. The first three values are individual bits that
 * may be set in the `flags` field of an IPC frame header; the last two are
 * masks built from them. */
/* clang-format off */
enum {
  UV__IPC_FRAME_HAS_DATA                = 0x01,
  UV__IPC_FRAME_HAS_SOCKET_XFER         = 0x02,
  UV__IPC_FRAME_XFER_IS_TCP_CONNECTION  = 0x04,
  /* These are combinations of the flags above: XFER_FLAGS is
   * HAS_SOCKET_XFER | XFER_IS_TCP_CONNECTION, and VALID_FLAGS is the union of
   * all three individual flags. */
  UV__IPC_FRAME_XFER_FLAGS              = 0x06,
  UV__IPC_FRAME_VALID_FLAGS             = 0x07
};
/* clang-format on */

/* IPC frame header. Every IPC frame starts with this fixed-size structure of
 * four 32-bit fields. */
typedef struct {
  uint32_t flags;       /* Combination of UV__IPC_FRAME_* flag bits. */
  uint32_t reserved1;   /* Ignored. */
  uint32_t data_length; /* Must be zero if there is no data. */
  uint32_t reserved2;   /* Must be zero. */
} uv__ipc_frame_header_t;

/* To implement the IPC protocol correctly, these structures must have exactly
 * the right size. The build fails if either size differs from the expected
 * value. */
STATIC_ASSERT(sizeof(uv__ipc_frame_header_t) == 16);
STATIC_ASSERT(sizeof(uv__ipc_socket_xfer_info_t) == 632);

/* Coalesced write request. Wraps an internal write request together with a
 * pointer back to the request the user originally supplied, so the user's
 * request can be recovered from the internal one (via container_of) when the
 * write completes. */
typedef struct {
  uv_write_t req;       /* Internal heap-allocated write request. */
  uv_write_t* user_req; /* Pointer to user-specified uv_write_t. */
} uv__coalesced_write_t;


/* Forward declarations of the file-local EOF timer helpers; they are used by
 * code that appears before their definitions. */
static void eof_timer_init(uv_pipe_t* pipe);
static void eof_timer_start(uv_pipe_t* pipe);
static void eof_timer_stop(uv_pipe_t* pipe);
static void eof_timer_cb(uv_timer_t* timer);
static void eof_timer_destroy(uv_pipe_t* pipe);
static void eof_timer_close_cb(uv_handle_t* handle);


static void uv_unique_pipe_name(char* ptr, char* name, size_t size) {
  snprintf(name, size, "\\\\?\\pipe\\uv\\%p-%lu", ptr, GetCurrentProcessId());
}


/* Initializes `handle` as a named-pipe stream belonging to `loop`.
 *
 * The pipe starts out without an OS handle or name, with no pending requests,
 * and with empty IPC bookkeeping. `ipc` is stored verbatim in `handle->ipc`.
 * Always returns 0. */
int uv_pipe_init(uv_loop_t* loop, uv_pipe_t* handle, int ipc) {
  uv_stream_init(loop, (uv_stream_t*)handle, UV_NAMED_PIPE);

  /* Generic pipe state. */
  handle->handle = INVALID_HANDLE_VALUE;
  handle->name = NULL;
  handle->reqs_pending = 0;
  handle->ipc = ipc;

  /* Connection state, including the IPC receive bookkeeping. */
  handle->pipe.conn.non_overlapped_writes_tail = NULL;
  handle->pipe.conn.ipc_remote_pid = 0;
  handle->pipe.conn.ipc_data_frame.payload_remaining = 0;
  handle->pipe.conn.ipc_xfer_queue_length = 0;
  QUEUE_INIT(&handle->pipe.conn.ipc_xfer_queue);

  return 0;
}


/* Prepares `handle` for use as a pipe connection: runs the generic connection
 * initialization, points the read request back at the handle and clears the
 * EOF timer. Must not be called on a pipe server handle. For non-overlapped
 * pipes the readfile thread handle is cleared and its lock is initialized. */
static void uv_pipe_connection_init(uv_pipe_t* handle) {
  uv_connection_init((uv_stream_t*) handle);
  handle->read_req.data = handle;
  handle->pipe.conn.eof_timer = NULL;
  assert(!(handle->flags & UV_HANDLE_PIPESERVER));

  /* Only non-overlapped pipes need the readfile thread bookkeeping. */
  if (!(handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE))
    return;

  handle->pipe.conn.readfile_thread_handle = NULL;
  InitializeCriticalSection(&handle->pipe.conn.readfile_thread_lock);
}


static HANDLE open_named_pipe(const WCHAR* name, DWORD* duplex_flags) {
  HANDLE pipeHandle;

  /*
   * Assume that we have a duplex pipe first, so attempt to
   * connect with GENERIC_READ | GENERIC_WRITE.
   */
  pipeHandle = CreateFileW(name,
                           GENERIC_READ | GENERIC_WRITE,
                           0,
                           NULL,
                           OPEN_EXISTING,
                           FILE_FLAG_OVERLAPPED,
                           NULL);
  if (pipeHandle != INVALID_HANDLE_VALUE) {
    *duplex_flags = UV_HANDLE_READABLE | UV_HANDLE_WRITABLE;
    return pipeHandle;
  }

  /*
   * If the pipe is not duplex CreateFileW fails with
   * ERROR_ACCESS_DENIED.  In that case try to connect
   * as a read-only or write-only.
   */
  if (GetLastError() == ERROR_ACCESS_DENIED) {
    pipeHandle = CreateFileW(name,
                             GENERIC_READ | FILE_WRITE_ATTRIBUTES,
                             0,
                             NULL,
                             OPEN_EXISTING,
                             FILE_FLAG_OVERLAPPED,
                             NULL);

    if (pipeHandle != INVALID_HANDLE_VALUE) {
      *duplex_flags = UV_HANDLE_READABLE;
      return pipeHandle;
    }
  }

libuv/src/win/pipe.c  view on Meta::CPAN

                       NULL,
                       &req->u.io.overlapped);

    if (!result && GetLastError() != ERROR_IO_PENDING) {
      return GetLastError();
    }

    if (result) {
      /* Request completed immediately. */
      req->u.io.queued_bytes = 0;
    } else {
      /* Request queued by the kernel. */
      req->u.io.queued_bytes = write_buf.len;
      handle->write_queue_size += req->u.io.queued_bytes;
    }

    if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
      req->event_handle = CreateEvent(NULL, 0, 0, NULL);
      if (!req->event_handle) {
        uv_fatal_error(GetLastError(), "CreateEvent");
      }
      if (!RegisterWaitForSingleObject(&req->wait_handle,
          req->u.io.overlapped.hEvent, post_completion_write_wait, (void*) req,
          INFINITE, WT_EXECUTEINWAITTHREAD)) {
        return GetLastError();
      }
    }
  }

  REGISTER_HANDLE_REQ(loop, handle, req);
  handle->reqs_pending++;
  handle->stream.conn.write_reqs_pending++;

  return 0;
}


/* Returns the process id recorded for the remote end of the IPC pipe.
 *
 * If both ends of the IPC pipe are owned by the same process, the remote end
 * pid may not have been set yet. In that case it is filled in with the
 * current process id here, before being returned.
 * TODO: this is weird; it would probably be better to use a handshake. */
static DWORD uv__pipe_get_ipc_remote_pid(uv_pipe_t* handle) {
  if (handle->pipe.conn.ipc_remote_pid == 0)
    handle->pipe.conn.ipc_remote_pid = GetCurrentProcessId();

  return handle->pipe.conn.ipc_remote_pid;
}


/* Writes one IPC frame to `handle`.
 *
 * The frame consists of a frame header, optionally followed by socket
 * transfer information (when `send_handle` is not NULL), optionally followed
 * by the payload formed by concatenating `data_bufs[0..data_buf_count)`.
 *
 * Returns 0 on success or a system error code on failure:
 *   - WSAENOBUFS if the combined payload does not fit in 32 bits;
 *   - ERROR_NOT_SUPPORTED if `send_handle` is not a UV_TCP handle;
 *   - ERROR_NOT_ENOUGH_MEMORY if the buffer array cannot be allocated;
 *   - whatever uv__tcp_xfer_export() or uv__pipe_write_data() report. */
int uv__pipe_write_ipc(uv_loop_t* loop,
                       uv_write_t* req,
                       uv_pipe_t* handle,
                       const uv_buf_t data_bufs[],
                       size_t data_buf_count,
                       uv_stream_t* send_handle,
                       uv_write_cb cb) {
  uv_buf_t stack_bufs[6];
  uv_buf_t* bufs;
  size_t buf_count, buf_index;
  uv__ipc_frame_header_t frame_header;
  uv__ipc_socket_xfer_type_t xfer_type = UV__IPC_SOCKET_XFER_NONE;
  uv__ipc_socket_xfer_info_t xfer_info;
  uint64_t data_length;
  size_t i;
  int err;

  /* Compute the combined size of data buffers. */
  data_length = 0;
  for (i = 0; i < data_buf_count; i++)
    data_length += data_bufs[i].len;
  if (data_length > UINT32_MAX)
    return WSAENOBUFS; /* Maps to UV_ENOBUFS. */

  /* Prepare the frame's socket xfer payload. */
  if (send_handle != NULL) {
    uv_tcp_t* send_tcp_handle = (uv_tcp_t*) send_handle;

    /* Verify that `send_handle` is indeed a tcp handle. */
    if (send_tcp_handle->type != UV_TCP)
      return ERROR_NOT_SUPPORTED;

    /* Export the tcp handle. */
    err = uv__tcp_xfer_export(send_tcp_handle,
                              uv__pipe_get_ipc_remote_pid(handle),
                              &xfer_type,
                              &xfer_info);
    if (err != 0)
      return err;
  }

  /* Compute the maximum number of uv_buf_t's required. This is an upper
   * bound used for sizing only: the data buffers are skipped below when the
   * payload is empty. */
  buf_count = 1 + data_buf_count; /* Frame header and data buffers. */
  if (send_handle != NULL)
    buf_count += 1; /* One extra for the socket xfer information. */

  /* Use the on-stack buffer array if it is big enough; otherwise allocate
   * space for it on the heap. */
  if (buf_count < ARRAY_SIZE(stack_bufs)) {
    /* Use on-stack buffer array. */
    bufs = stack_bufs;
  } else {
    /* Use heap-allocated buffer array. */
    bufs = uv__calloc(buf_count, sizeof(uv_buf_t));
    if (bufs == NULL)
      return ERROR_NOT_ENOUGH_MEMORY; /* Maps to UV_ENOMEM. */
  }
  buf_index = 0;

  /* Initialize frame header and add it to the buffers list. The header is
   * referenced by pointer, so the flag/length updates below are reflected in
   * what gets written. */
  memset(&frame_header, 0, sizeof frame_header);
  bufs[buf_index++] = uv_buf_init((char*) &frame_header, sizeof frame_header);

  if (send_handle != NULL) {
    /* Add frame header flags. */
    switch (xfer_type) {
      case UV__IPC_SOCKET_XFER_TCP_CONNECTION:
        frame_header.flags |= UV__IPC_FRAME_HAS_SOCKET_XFER |
                              UV__IPC_FRAME_XFER_IS_TCP_CONNECTION;
        break;
      case UV__IPC_SOCKET_XFER_TCP_SERVER:
        frame_header.flags |= UV__IPC_FRAME_HAS_SOCKET_XFER;
        break;
      default:
        assert(0);  /* Unreachable. */
    }
    /* Add xfer info buffer. */
    bufs[buf_index++] = uv_buf_init((char*) &xfer_info, sizeof xfer_info);
  }

  if (data_length > 0) {
    /* Update frame header. */
    frame_header.flags |= UV__IPC_FRAME_HAS_DATA;
    frame_header.data_length = (uint32_t) data_length;
    /* Add data buffers to buffers list. */
    for (i = 0; i < data_buf_count; i++)
      bufs[buf_index++] = data_bufs[i];
  }

  /* Only the first `buf_index` entries of `bufs` have been populated. When
   * the caller passes data buffers whose combined length is zero, that is
   * fewer than `buf_count`; the remaining entries are uninitialized in the
   * on-stack case and must not be handed to the writer. */
  assert(buf_index <= buf_count);

  /* Write buffers. We set the `always_copy` flag, so it is not a problem that
   * some of the written data lives on the stack. */
  err = uv__pipe_write_data(loop, req, handle, bufs, buf_index, cb, 1);

  /* If we had to heap-allocate the bufs array, free it now. */
  if (bufs != stack_bufs) {
    uv__free(bufs);
  }

  return err;
}


/* Entry point for writing to a pipe.
 *
 * IPC pipes go through the framing protocol and may carry `send_handle`.
 * Plain pipes put the data on the wire directly and must be called with
 * `send_handle == NULL`. Returns whatever the selected writer returns. */
int uv__pipe_write(uv_loop_t* loop,
                   uv_write_t* req,
                   uv_pipe_t* handle,
                   const uv_buf_t bufs[],
                   size_t nbufs,
                   uv_stream_t* send_handle,
                   uv_write_cb cb) {
  if (!handle->ipc) {
    /* Non-IPC pipe write: there is no way to transfer a handle. */
    assert(send_handle == NULL);
    return uv__pipe_write_data(loop, req, handle, bufs, nbufs, cb, 0);
  }

  /* IPC pipe write: wrap the data (and handle, if any) in an IPC frame. */
  return uv__pipe_write_ipc(loop, req, handle, bufs, nbufs, send_handle, cb);
}


/* Reports end-of-file on `handle` to the user.
 *
 * Discards the EOF timer, clears the readable flag, stops reading and then
 * invokes the read callback with UV_EOF and `buf`. `loop` is unused. */
static void uv_pipe_read_eof(uv_loop_t* loop, uv_pipe_t* handle,
    uv_buf_t buf) {
  uv_stream_t* stream = (uv_stream_t*) handle;

  /* An eof timer that may still be running has no purpose any more. */
  eof_timer_destroy(handle);

  handle->flags &= ~UV_HANDLE_READABLE;
  uv_read_stop(stream);

  handle->read_cb(stream, UV_EOF, &buf);
}


/* Reports a read failure on `handle` to the user.
 *
 * Discards the EOF timer, stops reading and then invokes the read callback
 * with the system error `error` translated by uv_translate_sys_error(),
 * together with `buf`. `loop` is unused. */
static void uv_pipe_read_error(uv_loop_t* loop, uv_pipe_t* handle, int error,
    uv_buf_t buf) {
  uv_stream_t* stream = (uv_stream_t*) handle;

  /* An eof timer that may still be running has no purpose any more. */
  eof_timer_destroy(handle);

  uv_read_stop(stream);

  handle->read_cb(stream, uv_translate_sys_error(error), &buf);
}

libuv/src/win/pipe.c  view on Meta::CPAN

/* Reads exactly `count` bytes from `h` into `buffer`, issuing as many
 * ReadFile() calls as it takes. Returns 0 once all bytes have been read. If
 * any ReadFile() call fails first, its GetLastError() code is returned
 * instead. */
static int uv__pipe_read_exactly(HANDLE h, void* buffer, DWORD count) {
  char* dest = (char*) buffer;
  DWORD total = 0;

  while (total < count) {
    DWORD chunk;

    /* Ask only for the part that is still missing. */
    if (!ReadFile(h, dest + total, count - total, &chunk, NULL))
      return GetLastError();

    total += chunk;
  }

  assert(total == count);
  return 0;
}


/* Reads raw data from the pipe into a buffer obtained from the user's alloc
 * callback and delivers it through the read callback.
 *
 * `suggested_bytes` is passed to the alloc callback as the size hint. At most
 * `max_bytes` bytes are read, and never more than the buffer can hold.
 *
 * Returns the number of bytes read. Returns 0 when the read loop should stop:
 * either no usable buffer was provided (UV_ENOBUFS is reported) or ReadFile()
 * failed (reported via uv_pipe_read_error_or_eof()). */
static DWORD uv__pipe_read_data(uv_loop_t* loop,
                                uv_pipe_t* handle,
                                DWORD suggested_bytes,
                                DWORD max_bytes) {
  uv_buf_t user_buf;
  DWORD nread;

  /* Let the user supply the destination buffer. */
  user_buf = uv_buf_init(NULL, 0);
  handle->alloc_cb((uv_handle_t*) handle, suggested_bytes, &user_buf);
  if (user_buf.base == NULL || user_buf.len == 0) {
    handle->read_cb((uv_stream_t*) handle, UV_ENOBUFS, &user_buf);
    return 0; /* Break out of read loop. */
  }

  /* Clamp the read size to the capacity of the user buffer. */
  if (max_bytes > user_buf.len)
    max_bytes = user_buf.len;

  /* Read into the user buffer; any failure goes to the error/eof handler. */
  if (!ReadFile(handle->handle, user_buf.base, max_bytes, &nread, NULL)) {
    uv_pipe_read_error_or_eof(loop, handle, GetLastError(), user_buf);
    return 0; /* Break out of read loop. */
  }

  /* Hand the data to the user. */
  handle->read_cb((uv_stream_t*) handle, nread, &user_buf);

  return nread;
}


/* Performs one step of reading from an IPC pipe.
 *
 * If payload bytes of the current frame are still outstanding, reads (part
 * of) that payload via uv__pipe_read_data(). Otherwise reads and validates a
 * new frame header and, when the header announces a socket transfer, the
 * socket xfer info that follows it; the payload itself is left for later
 * invocations.
 *
 * Returns the number of bytes consumed from the pipe, or 0 to tell the caller
 * to break out of its read loop. Header/xfer read failures and invalid frames
 * are reported through uv_pipe_read_error_or_eof() with a null buffer. */
static DWORD uv__pipe_read_ipc(uv_loop_t* loop, uv_pipe_t* handle) {
  uint32_t* data_remaining = &handle->pipe.conn.ipc_data_frame.payload_remaining;
  int err;

  if (*data_remaining > 0) {
    /* Read frame data payload. The remaining payload size serves both as the
     * size hint and as the upper bound, so the read never goes beyond the
     * payload of the current frame. */
    DWORD bytes_read =
        uv__pipe_read_data(loop, handle, *data_remaining, *data_remaining);
    *data_remaining -= bytes_read;
    return bytes_read;

  } else {
    /* Start of a new IPC frame. */
    uv__ipc_frame_header_t frame_header;
    uint32_t xfer_flags;
    uv__ipc_socket_xfer_type_t xfer_type;
    uv__ipc_socket_xfer_info_t xfer_info;

    /* Read the IPC frame header. */
    err = uv__pipe_read_exactly(
        handle->handle, &frame_header, sizeof frame_header);
    if (err)
      goto error;

    /* Validate that flags are valid. */
    if ((frame_header.flags & ~UV__IPC_FRAME_VALID_FLAGS) != 0)
      goto invalid;
    /* Validate that reserved2 is zero. */
    if (frame_header.reserved2 != 0)
      goto invalid;

    /* Parse xfer flags. */
    xfer_flags = frame_header.flags & UV__IPC_FRAME_XFER_FLAGS;
    if (xfer_flags & UV__IPC_FRAME_HAS_SOCKET_XFER) {
      /* Socket coming -- determine the type. */
      xfer_type = xfer_flags & UV__IPC_FRAME_XFER_IS_TCP_CONNECTION
                      ? UV__IPC_SOCKET_XFER_TCP_CONNECTION
                      : UV__IPC_SOCKET_XFER_TCP_SERVER;
    } else if (xfer_flags == 0) {
      /* No socket. */
      xfer_type = UV__IPC_SOCKET_XFER_NONE;
    } else {
      /* Invalid flags: the "is TCP connection" bit is set without the "has
       * socket xfer" bit. */
      goto invalid;
    }

    /* Parse data frame information. */
    if (frame_header.flags & UV__IPC_FRAME_HAS_DATA) {
      *data_remaining = frame_header.data_length;
    } else if (frame_header.data_length != 0) {
      /* Data length greater than zero but data flag not set -- invalid. */
      goto invalid;
    }

    /* If no socket xfer info follows, return here. Data will be read in a
     * subsequent invocation of uv__pipe_read_ipc(). */
    if (xfer_type == UV__IPC_SOCKET_XFER_NONE)
      return sizeof frame_header; /* Number of bytes read. */

    /* Read transferred socket information. */
    err = uv__pipe_read_exactly(handle->handle, &xfer_info, sizeof xfer_info);
    if (err)
      goto error;

    /* Store the pending socket info. */
    uv__pipe_queue_ipc_xfer_info(handle, xfer_type, &xfer_info);

    /* Return number of bytes read. */
    return sizeof frame_header + sizeof xfer_info;
  }

invalid:
  /* Invalid frame. Falls through to the common error reporting below. */
  err = WSAECONNABORTED; /* Maps to UV_ECONNABORTED. */

error:
  uv_pipe_read_error_or_eof(loop, handle, err, uv_null_buf_);
  return 0; /* Break out of read loop. */
}


/* Handles completion of the zero-read request `req` on pipe `handle`.
 *
 * Clears the read-pending and cancellation-pending flags, decrements the
 * pending request count and stops the EOF timer. If the user is still
 * reading, a failed zero-read is reported (except for
 * ERROR_OPERATION_ABORTED), while a successful one causes the available data
 * to be read -- as IPC frames for IPC pipes, as raw data otherwise. Finally a
 * new zero-read is queued if the handle is still reading and none is
 * pending. */
void uv_process_pipe_read_req(uv_loop_t* loop,
                              uv_pipe_t* handle,
                              uv_req_t* req) {
  assert(handle->type == UV_NAMED_PIPE);

  handle->flags &= ~(UV_HANDLE_READ_PENDING | UV_HANDLE_CANCELLATION_PENDING);
  DECREASE_PENDING_REQ_COUNT(handle);
  eof_timer_stop(handle);

  /* At this point, we're done with bookkeeping. If the user has stopped
   * reading the pipe in the meantime, there is nothing left to do, since there
   * is no callback that we can call. */
  if (!(handle->flags & UV_HANDLE_READING))
    return;

  if (!REQ_SUCCESS(req)) {
    /* An error occurred doing the zero-read. */
    DWORD err = GET_REQ_ERROR(req);

    /* If the read was cancelled by uv__pipe_interrupt_read(), the request may
     * indicate an ERROR_OPERATION_ABORTED error. This error isn't relevant to
     * the user; we'll start a new zero-read at the end of this function. */
    if (err != ERROR_OPERATION_ABORTED)
      uv_pipe_read_error_or_eof(loop, handle, err, uv_null_buf_);

  } else {
    /* The zero-read completed without error, indicating there is data
     * available in the kernel buffer. */
    DWORD avail;

    /* Get the number of bytes available. `avail` is zeroed first, so a failed
     * query is expected to leave it at zero and the read loop below is then
     * skipped. */
    avail = 0;
    if (!PeekNamedPipe(handle->handle, NULL, 0, NULL, &avail, NULL))
      uv_pipe_read_error_or_eof(loop, handle, GetLastError(), uv_null_buf_);

    /* Read until we've either read all the bytes available, or the 'reading'
     * flag is cleared. */
    while (avail > 0 && handle->flags & UV_HANDLE_READING) {
      /* Depending on the type of pipe, read either IPC frames or raw data.
       * For raw data, `(DWORD) -1` places no extra limit on the read size
       * beyond the size of the user's buffer. */
      DWORD bytes_read =
          handle->ipc ? uv__pipe_read_ipc(loop, handle)
                      : uv__pipe_read_data(loop, handle, avail, (DWORD) -1);

      /* If no bytes were read, treat this as an indication that an error
       * occurred, and break out of the read loop. */
      if (bytes_read == 0)
        break;

      /* It is possible that more bytes were read than we thought were
       * available. To prevent `avail` from underflowing, break out of the loop
       * if this is the case. */
      if (bytes_read > avail)
        break;

      /* Recompute the number of bytes available. */
      avail -= bytes_read;
    }
  }

  /* Start another zero-read request if necessary. */
  if ((handle->flags & UV_HANDLE_READING) &&
      !(handle->flags & UV_HANDLE_READ_PENDING)) {
    uv_pipe_queue_read(loop, handle);
  }
}


void uv_process_pipe_write_req(uv_loop_t* loop, uv_pipe_t* handle,
    uv_write_t* req) {
  int err;

  assert(handle->type == UV_NAMED_PIPE);

  assert(handle->write_queue_size >= req->u.io.queued_bytes);
  handle->write_queue_size -= req->u.io.queued_bytes;

  UNREGISTER_HANDLE_REQ(loop, handle, req);

  if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
    if (req->wait_handle != INVALID_HANDLE_VALUE) {
      UnregisterWait(req->wait_handle);
      req->wait_handle = INVALID_HANDLE_VALUE;
    }
    if (req->event_handle) {
      CloseHandle(req->event_handle);
      req->event_handle = NULL;
    }
  }

  err = GET_REQ_ERROR(req);

  /* If this was a coalesced write, extract pointer to the user_provided
   * uv_write_t structure so we can pass the expected pointer to the callback,
   * then free the heap-allocated write req. */
  if (req->coalesced) {
    uv__coalesced_write_t* coalesced_write =
        container_of(req, uv__coalesced_write_t, req);
    req = coalesced_write->user_req;
    uv__free(coalesced_write);



( run in 0.752 second using v1.01-cache-2.11-cpan-df04353d9ac )