EV-Memcached

 view release on metacpan or  search on metacpan

src/EV__Memcached.xs  view on Meta::CPAN

    int fd;
    int connected;
    int connecting;

    /* IO watchers */
    ev_io rio, wio;
    int reading, writing;

    /* Buffers */
    char *rbuf;
    size_t rbuf_len, rbuf_cap;
    char *wbuf;
    size_t wbuf_len, wbuf_off, wbuf_cap;

    /* Callbacks */
    SV *on_error;
    SV *on_connect;
    SV *on_disconnect;

    /* Command queue */
    ngx_queue_t cb_queue;
    ngx_queue_t wait_queue;
    int pending_count;
    int waiting_count;
    int max_pending;       /* 0 = unlimited */
    uint32_t next_opaque;

    /* Reconnection */
    char *host;
    int port;
    char *path;
    int reconnect;
    int reconnect_delay_ms;
    int max_reconnect_attempts;
    int reconnect_attempts;
    ev_timer reconnect_timer;
    int reconnect_timer_active;
    int intentional_disconnect;
    int resume_waiting_on_reconnect;

    /* Timeouts */
    int connect_timeout_ms;
    ev_timer connect_timer;
    int connect_timer_active;
    int command_timeout_ms;
    ev_timer cmd_timer;
    int cmd_timer_active;

    /* Flow control */
    int waiting_timeout_ms;
    ev_timer waiting_timer;
    int waiting_timer_active;

    /* Safety */
    int callback_depth;
    int in_cb_cleanup;
    int in_wait_cleanup;

    /* Options */
    int priority;
    int keepalive;

    /* SASL auth */
    char *username;
    char *password;
};

/* ================================================================
 * Shared error strings (initialized in BOOT)
 * ================================================================ */

static SV *err_skipped = NULL;
static SV *err_disconnected = NULL;
static SV *err_waiting_timeout = NULL;

/* ================================================================
 * Forward declarations
 * ================================================================ */

static void io_cb(EV_P_ ev_io *w, int revents);
static void reconnect_timer_cb(EV_P_ ev_timer *w, int revents);
static void waiting_timer_cb(EV_P_ ev_timer *w, int revents);
static void connect_timeout_cb(EV_P_ ev_timer *w, int revents);
static void cmd_timeout_cb(EV_P_ ev_timer *w, int revents);
static void arm_cmd_timer(ev_mc_t *self);
static void disarm_cmd_timer(ev_mc_t *self);
static uint32_t mc_enqueue_cmd(pTHX_ ev_mc_t *self,
    uint8_t opcode, const char *key, STRLEN key_len,
    const char *value, STRLEN value_len,
    const char *extras, uint8_t extras_len,
    uint64_t cas, int cmd, int quiet, SV *cb);
static void start_reading(ev_mc_t *self);
static void stop_reading(ev_mc_t *self);
static void start_writing(ev_mc_t *self);
static void stop_writing(ev_mc_t *self);
static void start_connect(pTHX_ ev_mc_t *self);
static void cleanup_connection(pTHX_ ev_mc_t *self);
static void emit_error(pTHX_ ev_mc_t *self, const char *msg);
static void handle_disconnect(pTHX_ ev_mc_t *self, const char *reason);
static void schedule_reconnect(pTHX_ ev_mc_t *self);
static void send_next_waiting(pTHX_ ev_mc_t *self);
static int check_destroyed(ev_mc_t *self);
static void cancel_pending(pTHX_ ev_mc_t *self, SV *err_sv);
static void cancel_waiting(pTHX_ ev_mc_t *self, SV *err_sv);

/* ================================================================
 * Binary protocol helpers (portable, no unaligned access)
 * ================================================================ */

static void mc_write_u16(char *buf, uint16_t val) {
    val = htons(val);
    memcpy(buf, &val, 2);
}

static void mc_write_u32(char *buf, uint32_t val) {
    val = htonl(val);
    memcpy(buf, &val, 4);
}

static void mc_write_u64(char *buf, uint64_t val) {
    uint32_t hi = htonl((uint32_t)(val >> 32));

src/EV__Memcached.xs  view on Meta::CPAN

}

static int try_write(ev_mc_t *self) {
    while (self->wbuf_off < self->wbuf_len) {
        ssize_t n = write(self->fd, self->wbuf + self->wbuf_off,
                          self->wbuf_len - self->wbuf_off);
        if (n > 0) {
            self->wbuf_off += n;
        } else if (n < 0) {
            if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR)
                return 0; /* try again later */
            return -1; /* error */
        }
    }
    /* All written */
    self->wbuf_len = 0;
    self->wbuf_off = 0;
    stop_writing(self);
    return 1;
}

static void on_connect_complete(pTHX_ ev_mc_t *self) {
    int err = 0;
    socklen_t len = sizeof(err);
    if (getsockopt(self->fd, SOL_SOCKET, SO_ERROR, &err, &len) < 0 || err != 0) {
        char errbuf[128];
        snprintf(errbuf, sizeof(errbuf), "connect failed: %s",
                 strerror(err ? err : errno));
        close(self->fd);
        self->fd = -1;
        self->connecting = 0;
        stop_writing(self);
        if (self->connect_timer_active) {
            ev_timer_stop(self->loop, &self->connect_timer);
            self->connect_timer_active = 0;
        }

        self->callback_depth++;
        emit_error(aTHX_ self, errbuf);
        self->callback_depth--;
        if (check_destroyed(self)) return;

        if (!self->intentional_disconnect && self->reconnect)
            schedule_reconnect(aTHX_ self);
        return;
    }

    self->connecting = 0;
    self->connected = 1;
    self->reconnect_attempts = 0;

    stop_writing(self);
    if (self->connect_timer_active) {
        ev_timer_stop(self->loop, &self->connect_timer);
        self->connect_timer_active = 0;
    }

    /* Setup read watcher */
    start_reading(self);

    /* TCP keepalive */
    if (self->keepalive > 0) {
        int one = 1;
        setsockopt(self->fd, SOL_SOCKET, SO_KEEPALIVE, &one, sizeof(one));
#ifdef TCP_KEEPIDLE
        setsockopt(self->fd, IPPROTO_TCP, TCP_KEEPIDLE,
                   &self->keepalive, sizeof(self->keepalive));
#endif
    }

    mc_send_sasl_auth(aTHX_ self, NULL);

    emit_connect(aTHX_ self);
    if (check_destroyed(self)) return;

    /* Defer waiting queue drain until SASL auth completes (if active) */
    if (!self->username)
        send_next_waiting(aTHX_ self);
}

static void io_cb(EV_P_ ev_io *w, int revents) {
    ev_mc_t *self = (ev_mc_t *)w->data;
    (void)loop;

    if (self->magic != MC_MAGIC_ALIVE) return;

    self->callback_depth++;

    if (self->connecting) {
        if (revents & EV_WRITE) {
            on_connect_complete(aTHX_ self);
        }
        self->callback_depth--;
        check_destroyed(self);
        return;
    }

    if (revents & EV_READ) {
        on_readable(aTHX_ self);
        if (self->magic != MC_MAGIC_ALIVE) {
            self->callback_depth--;
            check_destroyed(self);
            return;
        }
    }

    if (revents & EV_WRITE) {
        int rv = try_write(self);
        if (rv < 0) {
            char errbuf[128];
            snprintf(errbuf, sizeof(errbuf), "write error: %s", strerror(errno));
            handle_disconnect(aTHX_ self, errbuf);
        }
    }

    self->callback_depth--;
    check_destroyed(self);
}

/* ================================================================
 * Connection
 * ================================================================ */

static void start_connect(pTHX_ ev_mc_t *self) {
    int fd, ret;

    if (self->path) {

src/EV__Memcached.xs  view on Meta::CPAN

        if (ret != 0) {
            char errbuf[256];
            snprintf(errbuf, sizeof(errbuf), "getaddrinfo: %s", gai_strerror(ret));
            self->callback_depth++;
            emit_error(aTHX_ self, errbuf);
            self->callback_depth--;
            if (check_destroyed(self)) return;
            if (!self->intentional_disconnect && self->reconnect)
                schedule_reconnect(aTHX_ self);
            return;
        }

        fd = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
        if (fd < 0) {
            freeaddrinfo(res);
            char errbuf[128];
            snprintf(errbuf, sizeof(errbuf), "socket: %s", strerror(errno));
            self->callback_depth++;
            emit_error(aTHX_ self, errbuf);
            self->callback_depth--;
            if (check_destroyed(self)) return;
            if (!self->intentional_disconnect && self->reconnect)
                schedule_reconnect(aTHX_ self);
            return;
        }

        /* non-blocking */
        {
            int fl = fcntl(fd, F_GETFL);
            if (fl < 0 || fcntl(fd, F_SETFL, fl | O_NONBLOCK) < 0) {
                freeaddrinfo(res);
                close(fd);
                emit_error(aTHX_ self, "fcntl O_NONBLOCK failed");
                return;
            }
        }

        /* TCP_NODELAY */
        {
            int one = 1;
            setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &one, sizeof(one));
        }

        self->fd = fd;
        ret = connect(fd, res->ai_addr, res->ai_addrlen);
        freeaddrinfo(res);
    }

    if (ret == 0) {
        /* Connected immediately (localhost) */
        self->connected = 1;
        ev_io_init(&self->rio, io_cb, self->fd, EV_READ);
        self->rio.data = (void *)self;
        ev_io_init(&self->wio, io_cb, self->fd, EV_WRITE);
        self->wio.data = (void *)self;
        ev_set_priority(&self->rio, self->priority);
        ev_set_priority(&self->wio, self->priority);

        start_reading(self);

        /* TCP keepalive */
        if (self->keepalive > 0 && !self->path) {
            int one = 1;
            setsockopt(self->fd, SOL_SOCKET, SO_KEEPALIVE, &one, sizeof(one));
#ifdef TCP_KEEPIDLE
            setsockopt(self->fd, IPPROTO_TCP, TCP_KEEPIDLE,
                       &self->keepalive, sizeof(self->keepalive));
#endif
        }

        self->reconnect_attempts = 0;

        mc_send_sasl_auth(aTHX_ self, NULL);

        emit_connect(aTHX_ self);
        if (check_destroyed(self)) return;
        if (!self->username)
            send_next_waiting(aTHX_ self);
        return;
    }

    if (errno != EINPROGRESS) {
        char errbuf[128];
        snprintf(errbuf, sizeof(errbuf), "connect: %s", strerror(errno));
        close(self->fd);
        self->fd = -1;
        self->callback_depth++;
        emit_error(aTHX_ self, errbuf);
        self->callback_depth--;
        if (check_destroyed(self)) return;
        if (!self->intentional_disconnect && self->reconnect)
            schedule_reconnect(aTHX_ self);
        return;
    }

    /* In progress - wait for writability */
    self->connecting = 1;
    ev_io_init(&self->rio, io_cb, self->fd, EV_READ);
    self->rio.data = (void *)self;
    ev_io_init(&self->wio, io_cb, self->fd, EV_WRITE);
    self->wio.data = (void *)self;
    ev_set_priority(&self->rio, self->priority);
    ev_set_priority(&self->wio, self->priority);
    start_writing(self);

    if (self->connect_timeout_ms > 0) {
        ev_tstamp delay = (ev_tstamp)self->connect_timeout_ms / 1000.0;
        ev_timer_init(&self->connect_timer, connect_timeout_cb, delay, 0.0);
        self->connect_timer.data = (void *)self;
        ev_timer_start(self->loop, &self->connect_timer);
        self->connect_timer_active = 1;
    }
}

/* ================================================================
 * XS interface
 * ================================================================ */

MODULE = EV::Memcached  PACKAGE = EV::Memcached

BOOT:
{
    I_EV_API("EV::Memcached");
    err_skipped = newSVpvs("skipped");
    SvREADONLY_on(err_skipped);
    err_disconnected = newSVpvs("disconnected");
    SvREADONLY_on(err_disconnected);
    err_waiting_timeout = newSVpvs("waiting timeout");
    SvREADONLY_on(err_waiting_timeout);
}

EV::Memcached
new(char *class, ...)
CODE:
{
    PERL_UNUSED_VAR(class);
    if ((items - 1) % 2 != 0) croak("odd number of arguments");

    Newxz(RETVAL, 1, ev_mc_t);
    RETVAL->magic = MC_MAGIC_ALIVE;
    RETVAL->fd = -1;
    RETVAL->port = 11211;
    RETVAL->next_opaque = 1; /* reserve 0 for fire-and-forget quiet ops */
    ngx_queue_init(&RETVAL->cb_queue);
    ngx_queue_init(&RETVAL->wait_queue);
    Newx(RETVAL->rbuf, BUF_INIT_SIZE, char);
    RETVAL->rbuf_cap = BUF_INIT_SIZE;
    Newx(RETVAL->wbuf, BUF_INIT_SIZE, char);
    RETVAL->wbuf_cap = BUF_INIT_SIZE;

    /* Default error handler: die */
    RETVAL->on_error = eval_pv("sub { die @_ }", TRUE);
    SvREFCNT_inc_simple_void_NN(RETVAL->on_error);

    /* Parse options */
    SV *host_sv = NULL, *path_sv = NULL;
    int port = 11211;
    int do_reconnect = 0, reconnect_delay = 1000, max_reconnect_attempts = 0;
    RETVAL->loop = EV_DEFAULT;
    int i;

    for (i = 1; i < items; i += 2) {
        const char *k = SvPV_nolen(ST(i));
        SV *v = ST(i + 1);

        if (strEQ(k, "host"))                        host_sv = v;
        else if (strEQ(k, "port"))                   port = SvIV(v);
        else if (strEQ(k, "path"))                   path_sv = v;
        else if (strEQ(k, "on_error")) {
            CLEAR_HANDLER(RETVAL->on_error);
            if (SvOK(v) && SvROK(v)) RETVAL->on_error = newSVsv(v);
        }
        else if (strEQ(k, "on_connect")) {
            if (SvOK(v) && SvROK(v)) RETVAL->on_connect = newSVsv(v);
        }
        else if (strEQ(k, "on_disconnect")) {
            if (SvOK(v) && SvROK(v)) RETVAL->on_disconnect = newSVsv(v);
        }
        else if (strEQ(k, "max_pending"))            RETVAL->max_pending = SvIV(v);
        else if (strEQ(k, "waiting_timeout"))        RETVAL->waiting_timeout_ms = SvIV(v);
        else if (strEQ(k, "connect_timeout"))        RETVAL->connect_timeout_ms = SvIV(v);
        else if (strEQ(k, "command_timeout"))        RETVAL->command_timeout_ms = SvIV(v);
        else if (strEQ(k, "resume_waiting_on_reconnect")) RETVAL->resume_waiting_on_reconnect = SvTRUE(v) ? 1 : 0;
        else if (strEQ(k, "priority"))               RETVAL->priority = SvIV(v);
        else if (strEQ(k, "keepalive"))              RETVAL->keepalive = SvIV(v);
        else if (strEQ(k, "reconnect"))              do_reconnect = SvTRUE(v) ? 1 : 0;
        else if (strEQ(k, "reconnect_delay"))        reconnect_delay = SvIV(v);
        else if (strEQ(k, "max_reconnect_attempts")) max_reconnect_attempts = SvIV(v);
        else if (strEQ(k, "username")) {
            if (SvOK(v)) RETVAL->username = savepv(SvPV_nolen(v));
        }
        else if (strEQ(k, "password")) {
            if (SvOK(v)) RETVAL->password = savepv(SvPV_nolen(v));
        }
        else if (strEQ(k, "loop")) {
            RETVAL->loop = (struct ev_loop *)SvPVX(SvRV(v));
        }
    }

    if (host_sv && path_sv) {
        Safefree(RETVAL->rbuf);
        Safefree(RETVAL->wbuf);
        CLEAR_HANDLER(RETVAL->on_error);
        CLEAR_HANDLER(RETVAL->on_connect);
        CLEAR_HANDLER(RETVAL->on_disconnect);
        if (RETVAL->username) Safefree(RETVAL->username);
        if (RETVAL->password) Safefree(RETVAL->password);
        Safefree(RETVAL);
        croak("cannot specify both 'host' and 'path'");
    }

    RETVAL->port = port;
    if (do_reconnect) {
        RETVAL->reconnect = 1;
        RETVAL->reconnect_delay_ms = reconnect_delay >= 0 ? reconnect_delay : 0;
        RETVAL->max_reconnect_attempts = max_reconnect_attempts >= 0 ? max_reconnect_attempts : 0;
    }

    if (host_sv && SvOK(host_sv)) {
        RETVAL->host = savepv(SvPV_nolen(host_sv));
        start_connect(aTHX_ RETVAL);
    }
    else if (path_sv && SvOK(path_sv)) {
        RETVAL->path = savepv(SvPV_nolen(path_sv));
        start_connect(aTHX_ RETVAL);
    }
}
OUTPUT:
    RETVAL

void
DESTROY(EV::Memcached self)
CODE:
{
    if (self->magic == MC_MAGIC_FREED) return;

    /* If we're inside a callback, defer destruction.
     * check_destroyed() in io_cb/timer_cb will Safefree after unwind. */
    if (self->callback_depth > 0) {
        self->magic = MC_MAGIC_FREED;

        /* Stop watchers */
        stop_reading(self);
        stop_writing(self);
        if (self->connect_timer_active) {

src/EV__Memcached.xs  view on Meta::CPAN

            arm_cmd_timer(self);
    }
    RETVAL = self->command_timeout_ms;
}
OUTPUT:
    RETVAL

void
reconnect(EV::Memcached self, int enable, int delay_ms = 1000, int max_attempts = 0)
CODE:
{
    self->reconnect = enable ? 1 : 0;
    self->reconnect_delay_ms = delay_ms >= 0 ? delay_ms : 0;
    self->max_reconnect_attempts = max_attempts >= 0 ? max_attempts : 0;
    if (!enable) {
        self->reconnect_attempts = 0;
        if (self->reconnect_timer_active) {
            ev_timer_stop(self->loop, &self->reconnect_timer);
            self->reconnect_timer_active = 0;
        }
    }
}

int
reconnect_enabled(EV::Memcached self)
CODE:
    RETVAL = self->reconnect;
OUTPUT:
    RETVAL

int
priority(EV::Memcached self, ...)
CODE:
{
    if (items > 1) {
        self->priority = SvIV(ST(1));
        if (self->priority < -2) self->priority = -2;
        if (self->priority > 2) self->priority = 2;
        /* Apply to active watchers */
        if (self->reading) {
            ev_io_stop(self->loop, &self->rio);
            ev_set_priority(&self->rio, self->priority);
            ev_io_start(self->loop, &self->rio);
        } else {
            ev_set_priority(&self->rio, self->priority);
        }
        if (self->writing) {
            ev_io_stop(self->loop, &self->wio);
            ev_set_priority(&self->wio, self->priority);
            ev_io_start(self->loop, &self->wio);
        } else {
            ev_set_priority(&self->wio, self->priority);
        }
    }
    RETVAL = self->priority;
}
OUTPUT:
    RETVAL

int
keepalive(EV::Memcached self, ...)
CODE:
{
    if (items > 1) {
        self->keepalive = SvIV(ST(1));
        if (self->keepalive < 0) self->keepalive = 0;
        /* Apply to current connection if active */
        if (self->connected && self->fd >= 0 && self->keepalive > 0) {
            int one = 1;
            setsockopt(self->fd, SOL_SOCKET, SO_KEEPALIVE, &one, sizeof(one));
#ifdef TCP_KEEPIDLE
            setsockopt(self->fd, IPPROTO_TCP, TCP_KEEPIDLE,
                       &self->keepalive, sizeof(self->keepalive));
#endif
        }
    }
    RETVAL = self->keepalive;
}
OUTPUT:
    RETVAL

void
skip_pending(EV::Memcached self)
CODE:
{
    cancel_pending_impl(aTHX_ self, err_skipped, 1);
}

void
skip_waiting(EV::Memcached self)
CODE:
{
    cancel_waiting(aTHX_ self, err_skipped);
}



( run in 0.848 second using v1.01-cache-2.11-cpan-39bf76dae61 )