EV-Memcached
view release on metacpan or search on metacpan
src/EV__Memcached.xs view on Meta::CPAN
int fd;
int connected;
int connecting;
/* IO watchers */
ev_io rio, wio;
int reading, writing;
/* Buffers */
char *rbuf;
size_t rbuf_len, rbuf_cap;
char *wbuf;
size_t wbuf_len, wbuf_off, wbuf_cap;
/* Callbacks */
SV *on_error;
SV *on_connect;
SV *on_disconnect;
/* Command queue */
ngx_queue_t cb_queue;
ngx_queue_t wait_queue;
int pending_count;
int waiting_count;
int max_pending; /* 0 = unlimited */
uint32_t next_opaque;
/* Reconnection */
char *host;
int port;
char *path;
int reconnect;
int reconnect_delay_ms;
int max_reconnect_attempts;
int reconnect_attempts;
ev_timer reconnect_timer;
int reconnect_timer_active;
int intentional_disconnect;
int resume_waiting_on_reconnect;
/* Timeouts */
int connect_timeout_ms;
ev_timer connect_timer;
int connect_timer_active;
int command_timeout_ms;
ev_timer cmd_timer;
int cmd_timer_active;
/* Flow control */
int waiting_timeout_ms;
ev_timer waiting_timer;
int waiting_timer_active;
/* Safety */
int callback_depth;
int in_cb_cleanup;
int in_wait_cleanup;
/* Options */
int priority;
int keepalive;
/* SASL auth */
char *username;
char *password;
};
/* ================================================================
* Shared error strings (initialized in BOOT)
*
* All three start out NULL; the BOOT section creates each SV once and
* flags it read-only.
* ================================================================ */
/* "skipped" - handed to the cancel helpers by skip_pending()/skip_waiting() */
static SV *err_skipped = NULL;
/* "disconnected" */
static SV *err_disconnected = NULL;
/* "waiting timeout" */
static SV *err_waiting_timeout = NULL;
/* ================================================================
* Forward declarations
* ================================================================ */
/* libev callbacks. io_cb is installed on both the read and the write
* ev_io watcher; connect_timeout_cb drives the connect timer. The other
* three have the ev_timer callback signature. */
static void io_cb(EV_P_ ev_io *w, int revents);
static void reconnect_timer_cb(EV_P_ ev_timer *w, int revents);
static void waiting_timer_cb(EV_P_ ev_timer *w, int revents);
static void connect_timeout_cb(EV_P_ ev_timer *w, int revents);
static void cmd_timeout_cb(EV_P_ ev_timer *w, int revents);
/* Command-timeout timer control */
static void arm_cmd_timer(ev_mc_t *self);
static void disarm_cmd_timer(ev_mc_t *self);
/* Request submission.
* NOTE(review): argument semantics and the meaning of the uint32_t result
* are defined by the implementation, which is outside this excerpt. */
static uint32_t mc_enqueue_cmd(pTHX_ ev_mc_t *self,
uint8_t opcode, const char *key, STRLEN key_len,
const char *value, STRLEN value_len,
const char *extras, uint8_t extras_len,
uint64_t cas, int cmd, int quiet, SV *cb);
/* Read/write watcher control */
static void start_reading(ev_mc_t *self);
static void stop_reading(ev_mc_t *self);
static void start_writing(ev_mc_t *self);
static void stop_writing(ev_mc_t *self);
/* Connection lifecycle, error reporting and reconnection */
static void start_connect(pTHX_ ev_mc_t *self);
static void cleanup_connection(pTHX_ ev_mc_t *self);
static void emit_error(pTHX_ ev_mc_t *self, const char *msg);
static void handle_disconnect(pTHX_ ev_mc_t *self, const char *reason);
static void schedule_reconnect(pTHX_ ev_mc_t *self);
/* Queue handling and deferred destruction.
* check_destroyed() returns non-zero when the caller must stop touching
* self (callers in this file return immediately in that case). */
static void send_next_waiting(pTHX_ ev_mc_t *self);
static int check_destroyed(ev_mc_t *self);
static void cancel_pending(pTHX_ ev_mc_t *self, SV *err_sv);
static void cancel_waiting(pTHX_ ev_mc_t *self, SV *err_sv);
/* ================================================================
* Binary protocol helpers (portable, no unaligned access)
* ================================================================ */
/* Store a 16-bit value at buf in network (big-endian) byte order.
 * The bytes are written one at a time, so buf needs no alignment. */
static void mc_write_u16(char *buf, uint16_t val) {
    unsigned char *out = (unsigned char *)buf;

    out[0] = (unsigned char)(val >> 8);
    out[1] = (unsigned char)(val & 0xFFu);
}
/* Store a 32-bit value at buf in network (big-endian) byte order.
 * The bytes are written one at a time, so buf needs no alignment. */
static void mc_write_u32(char *buf, uint32_t val) {
    unsigned char *out = (unsigned char *)buf;

    out[0] = (unsigned char)(val >> 24);
    out[1] = (unsigned char)((val >> 16) & 0xFFu);
    out[2] = (unsigned char)((val >> 8) & 0xFFu);
    out[3] = (unsigned char)(val & 0xFFu);
}
static void mc_write_u64(char *buf, uint64_t val) {
uint32_t hi = htonl((uint32_t)(val >> 32));
src/EV__Memcached.xs view on Meta::CPAN
}
/* Push the unsent part of the write buffer (wbuf[wbuf_off..wbuf_len)) to
 * the socket.
 *
 * Returns:
 *    1  everything was written; the buffer offsets are reset and
 *       stop_writing() has been called
 *    0  no further progress is possible right now (EAGAIN, EWOULDBLOCK,
 *       EINTR, or a zero-byte write); the caller should retry on the next
 *       write event
 *   -1  write() failed with any other error; errno is left as write() set it
 */
static int try_write(ev_mc_t *self) {
    while (self->wbuf_off < self->wbuf_len) {
        ssize_t n = write(self->fd, self->wbuf + self->wbuf_off,
                          self->wbuf_len - self->wbuf_off);
        if (n > 0) {
            self->wbuf_off += (size_t)n;
            continue;
        }
        if (n == 0) {
            /* No bytes accepted and no error reported: looping again here
             * would spin forever inside the event loop, so treat it like a
             * temporary condition and wait for the next write event. */
            return 0;
        }
        if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR)
            return 0;   /* try again later */
        return -1;      /* error */
    }

    /* All written */
    self->wbuf_len = 0;
    self->wbuf_off = 0;
    stop_writing(self);
    return 1;
}
/* Finish a non-blocking connect() after the socket became writable.
 *
 * Reads SO_ERROR to learn the outcome. In both cases the "connecting" state
 * is torn down first (flag cleared, write watcher stopped, connect timer
 * stopped).
 *
 * On failure the descriptor is closed, the error is reported through
 * emit_error() and, unless the disconnect was intentional, a reconnect is
 * scheduled when reconnection is enabled.
 *
 * On success the connection is marked established, reading starts,
 * keepalive is configured for non-UNIX sockets, and the connect
 * notification is emitted.
 */
static void on_connect_complete(pTHX_ ev_mc_t *self) {
    char errbuf[128];
    int err = 0;
    int failed = 0;
    socklen_t len = sizeof(err);

    if (getsockopt(self->fd, SOL_SOCKET, SO_ERROR, &err, &len) < 0 || err != 0) {
        failed = 1;
        /* Format immediately: errno must not be clobbered by later calls. */
        snprintf(errbuf, sizeof(errbuf), "connect failed: %s",
                 strerror(err ? err : errno));
    }

    /* Leave the "connecting" state. The write watcher is stopped while the
     * descriptor is still open, i.e. before any close() below. */
    self->connecting = 0;
    stop_writing(self);
    if (self->connect_timer_active) {
        ev_timer_stop(self->loop, &self->connect_timer);
        self->connect_timer_active = 0;
    }

    if (failed) {
        close(self->fd);
        self->fd = -1;

        /* Hold callback_depth across the user callback; check_destroyed()
         * then tells us whether self may still be used. */
        self->callback_depth++;
        emit_error(aTHX_ self, errbuf);
        self->callback_depth--;
        if (check_destroyed(self)) return;

        if (!self->intentional_disconnect && self->reconnect)
            schedule_reconnect(aTHX_ self);
        return;
    }

    self->connected = 1;
    self->reconnect_attempts = 0;

    /* Setup read watcher */
    start_reading(self);

    /* TCP keepalive - skipped for UNIX-socket connections (self->path set),
     * matching the immediate-connect path in start_connect(). */
    if (self->keepalive > 0 && !self->path) {
        int one = 1;
        setsockopt(self->fd, SOL_SOCKET, SO_KEEPALIVE, &one, sizeof(one));
#ifdef TCP_KEEPIDLE
        setsockopt(self->fd, IPPROTO_TCP, TCP_KEEPIDLE,
                   &self->keepalive, sizeof(self->keepalive));
#endif
    }

    mc_send_sasl_auth(aTHX_ self, NULL);
    emit_connect(aTHX_ self);
    if (check_destroyed(self)) return;

    /* Defer waiting queue drain until SASL auth completes (if active) */
    if (!self->username)
        send_next_waiting(aTHX_ self);
}
/* libev I/O callback shared by the read and the write watcher.
 *
 * While a connect is in flight, a write event completes the connect and
 * nothing else is done. Otherwise read events are handled before write
 * events. callback_depth is held for the whole call, and every exit path
 * after the increment drops it and then calls check_destroyed().
 */
static void io_cb(EV_P_ ev_io *w, int revents) {
    ev_mc_t *self = (ev_mc_t *)w->data;
    (void)loop;

    if (self->magic != MC_MAGIC_ALIVE)
        return;

    self->callback_depth++;

    if (self->connecting) {
        if (revents & EV_WRITE)
            on_connect_complete(aTHX_ self);
        goto unwind;
    }

    if (revents & EV_READ) {
        on_readable(aTHX_ self);
        /* The object may have been marked as no longer alive while reading. */
        if (self->magic != MC_MAGIC_ALIVE)
            goto unwind;
    }

    if ((revents & EV_WRITE) && try_write(self) < 0) {
        char errbuf[128];
        snprintf(errbuf, sizeof(errbuf), "write error: %s", strerror(errno));
        handle_disconnect(aTHX_ self, errbuf);
    }

unwind:
    self->callback_depth--;
    check_destroyed(self);
}
/* ================================================================
* Connection
* ================================================================ */
static void start_connect(pTHX_ ev_mc_t *self) {
int fd, ret;
if (self->path) {
src/EV__Memcached.xs view on Meta::CPAN
if (ret != 0) {
char errbuf[256];
snprintf(errbuf, sizeof(errbuf), "getaddrinfo: %s", gai_strerror(ret));
self->callback_depth++;
emit_error(aTHX_ self, errbuf);
self->callback_depth--;
if (check_destroyed(self)) return;
if (!self->intentional_disconnect && self->reconnect)
schedule_reconnect(aTHX_ self);
return;
}
fd = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
if (fd < 0) {
freeaddrinfo(res);
char errbuf[128];
snprintf(errbuf, sizeof(errbuf), "socket: %s", strerror(errno));
self->callback_depth++;
emit_error(aTHX_ self, errbuf);
self->callback_depth--;
if (check_destroyed(self)) return;
if (!self->intentional_disconnect && self->reconnect)
schedule_reconnect(aTHX_ self);
return;
}
/* non-blocking */
{
int fl = fcntl(fd, F_GETFL);
if (fl < 0 || fcntl(fd, F_SETFL, fl | O_NONBLOCK) < 0) {
freeaddrinfo(res);
close(fd);
emit_error(aTHX_ self, "fcntl O_NONBLOCK failed");
return;
}
}
/* TCP_NODELAY */
{
int one = 1;
setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &one, sizeof(one));
}
self->fd = fd;
ret = connect(fd, res->ai_addr, res->ai_addrlen);
freeaddrinfo(res);
}
if (ret == 0) {
/* Connected immediately (localhost) */
self->connected = 1;
ev_io_init(&self->rio, io_cb, self->fd, EV_READ);
self->rio.data = (void *)self;
ev_io_init(&self->wio, io_cb, self->fd, EV_WRITE);
self->wio.data = (void *)self;
ev_set_priority(&self->rio, self->priority);
ev_set_priority(&self->wio, self->priority);
start_reading(self);
/* TCP keepalive */
if (self->keepalive > 0 && !self->path) {
int one = 1;
setsockopt(self->fd, SOL_SOCKET, SO_KEEPALIVE, &one, sizeof(one));
#ifdef TCP_KEEPIDLE
setsockopt(self->fd, IPPROTO_TCP, TCP_KEEPIDLE,
&self->keepalive, sizeof(self->keepalive));
#endif
}
self->reconnect_attempts = 0;
mc_send_sasl_auth(aTHX_ self, NULL);
emit_connect(aTHX_ self);
if (check_destroyed(self)) return;
if (!self->username)
send_next_waiting(aTHX_ self);
return;
}
if (errno != EINPROGRESS) {
char errbuf[128];
snprintf(errbuf, sizeof(errbuf), "connect: %s", strerror(errno));
close(self->fd);
self->fd = -1;
self->callback_depth++;
emit_error(aTHX_ self, errbuf);
self->callback_depth--;
if (check_destroyed(self)) return;
if (!self->intentional_disconnect && self->reconnect)
schedule_reconnect(aTHX_ self);
return;
}
/* In progress - wait for writability */
self->connecting = 1;
ev_io_init(&self->rio, io_cb, self->fd, EV_READ);
self->rio.data = (void *)self;
ev_io_init(&self->wio, io_cb, self->fd, EV_WRITE);
self->wio.data = (void *)self;
ev_set_priority(&self->rio, self->priority);
ev_set_priority(&self->wio, self->priority);
start_writing(self);
if (self->connect_timeout_ms > 0) {
ev_tstamp delay = (ev_tstamp)self->connect_timeout_ms / 1000.0;
ev_timer_init(&self->connect_timer, connect_timeout_cb, delay, 0.0);
self->connect_timer.data = (void *)self;
ev_timer_start(self->loop, &self->connect_timer);
self->connect_timer_active = 1;
}
}
/* ================================================================
* XS interface
* ================================================================ */
MODULE = EV::Memcached PACKAGE = EV::Memcached
BOOT:
{
    /* Bind the EV C API, then create the shared error SVs and mark each
     * of them read-only. */
    I_EV_API("EV::Memcached");
    err_skipped = newSVpvs("skipped");
    err_disconnected = newSVpvs("disconnected");
    err_waiting_timeout = newSVpvs("waiting timeout");
    SvREADONLY_on(err_skipped);
    SvREADONLY_on(err_disconnected);
    SvREADONLY_on(err_waiting_timeout);
}
EV::Memcached
new(char *class, ...)
CODE:
{
    /* Constructor: EV::Memcached->new(key => value, ...).
     * Allocates the client with default state, applies the options and,
     * when 'host' or 'path' is given, calls start_connect() right away.
     * Croaks on an odd argument count, on a 'loop' value that is not an
     * EV::Loop object, or when both 'host' and 'path' are supplied. When an
     * option is repeated, the last occurrence wins. */
    const char *bad_arg = NULL; /* first fatal option problem; reported after cleanup */
    SV *host_sv = NULL, *path_sv = NULL;
    int port = 11211;
    int do_reconnect = 0, reconnect_delay = 1000, max_reconnect_attempts = 0;
    int i;
    PERL_UNUSED_VAR(class);
    if ((items - 1) % 2 != 0) croak("odd number of arguments");
    Newxz(RETVAL, 1, ev_mc_t);
    RETVAL->magic = MC_MAGIC_ALIVE;
    RETVAL->fd = -1;
    RETVAL->port = 11211;
    RETVAL->next_opaque = 1; /* reserve 0 for fire-and-forget quiet ops */
    RETVAL->loop = EV_DEFAULT;
    ngx_queue_init(&RETVAL->cb_queue);
    ngx_queue_init(&RETVAL->wait_queue);
    Newx(RETVAL->rbuf, BUF_INIT_SIZE, char);
    RETVAL->rbuf_cap = BUF_INIT_SIZE;
    Newx(RETVAL->wbuf, BUF_INIT_SIZE, char);
    RETVAL->wbuf_cap = BUF_INIT_SIZE;
    /* Default error handler: die */
    RETVAL->on_error = eval_pv("sub { die @_ }", TRUE);
    SvREFCNT_inc_simple_void_NN(RETVAL->on_error);
    /* Parse options */
    for (i = 1; i < items && !bad_arg; i += 2) {
        const char *k = SvPV_nolen(ST(i));
        SV *v = ST(i + 1);
        if (strEQ(k, "host")) host_sv = v;
        else if (strEQ(k, "port")) port = SvIV(v);
        else if (strEQ(k, "path")) path_sv = v;
        else if (strEQ(k, "on_error")) {
            CLEAR_HANDLER(RETVAL->on_error);
            if (SvOK(v) && SvROK(v)) RETVAL->on_error = newSVsv(v);
        }
        else if (strEQ(k, "on_connect")) {
            /* Drop any handler from an earlier occurrence so it is not leaked. */
            CLEAR_HANDLER(RETVAL->on_connect);
            if (SvOK(v) && SvROK(v)) RETVAL->on_connect = newSVsv(v);
        }
        else if (strEQ(k, "on_disconnect")) {
            CLEAR_HANDLER(RETVAL->on_disconnect);
            if (SvOK(v) && SvROK(v)) RETVAL->on_disconnect = newSVsv(v);
        }
        else if (strEQ(k, "max_pending")) RETVAL->max_pending = SvIV(v);
        else if (strEQ(k, "waiting_timeout")) RETVAL->waiting_timeout_ms = SvIV(v);
        else if (strEQ(k, "connect_timeout")) RETVAL->connect_timeout_ms = SvIV(v);
        else if (strEQ(k, "command_timeout")) RETVAL->command_timeout_ms = SvIV(v);
        else if (strEQ(k, "resume_waiting_on_reconnect")) RETVAL->resume_waiting_on_reconnect = SvTRUE(v) ? 1 : 0;
        else if (strEQ(k, "priority")) RETVAL->priority = SvIV(v);
        else if (strEQ(k, "keepalive")) RETVAL->keepalive = SvIV(v);
        else if (strEQ(k, "reconnect")) do_reconnect = SvTRUE(v) ? 1 : 0;
        else if (strEQ(k, "reconnect_delay")) reconnect_delay = SvIV(v);
        else if (strEQ(k, "max_reconnect_attempts")) max_reconnect_attempts = SvIV(v);
        else if (strEQ(k, "username")) {
            if (RETVAL->username) { Safefree(RETVAL->username); RETVAL->username = NULL; }
            if (SvOK(v)) RETVAL->username = savepv(SvPV_nolen(v));
        }
        else if (strEQ(k, "password")) {
            if (RETVAL->password) { Safefree(RETVAL->password); RETVAL->password = NULL; }
            if (SvOK(v)) RETVAL->password = savepv(SvPV_nolen(v));
        }
        else if (strEQ(k, "loop")) {
            /* An undefined value keeps the default loop. Anything else must
             * be an EV::Loop object; EV keeps the struct ev_loop pointer in
             * the IV slot of the referenced scalar. */
            if (SvOK(v)) {
                if (SvROK(v) && sv_derived_from(v, "EV::Loop"))
                    RETVAL->loop = INT2PTR(struct ev_loop *, SvIVX(SvRV(v)));
                else
                    bad_arg = "'loop' must be an EV::Loop object";
            }
        }
    }
    if (!bad_arg && host_sv && path_sv)
        bad_arg = "cannot specify both 'host' and 'path'";
    if (bad_arg) {
        /* croak() does not return, so release everything allocated above first. */
        Safefree(RETVAL->rbuf);
        Safefree(RETVAL->wbuf);
        CLEAR_HANDLER(RETVAL->on_error);
        CLEAR_HANDLER(RETVAL->on_connect);
        CLEAR_HANDLER(RETVAL->on_disconnect);
        if (RETVAL->username) Safefree(RETVAL->username);
        if (RETVAL->password) Safefree(RETVAL->password);
        Safefree(RETVAL);
        croak("%s", bad_arg);
    }
    /* Same bounds as the priority() and keepalive() accessors apply. */
    if (RETVAL->priority < -2) RETVAL->priority = -2;
    if (RETVAL->priority > 2) RETVAL->priority = 2;
    if (RETVAL->keepalive < 0) RETVAL->keepalive = 0;
    RETVAL->port = port;
    if (do_reconnect) {
        RETVAL->reconnect = 1;
        RETVAL->reconnect_delay_ms = reconnect_delay >= 0 ? reconnect_delay : 0;
        RETVAL->max_reconnect_attempts = max_reconnect_attempts >= 0 ? max_reconnect_attempts : 0;
    }
    if (host_sv && SvOK(host_sv)) {
        RETVAL->host = savepv(SvPV_nolen(host_sv));
        start_connect(aTHX_ RETVAL);
    }
    else if (path_sv && SvOK(path_sv)) {
        RETVAL->path = savepv(SvPV_nolen(path_sv));
        start_connect(aTHX_ RETVAL);
    }
}
OUTPUT:
RETVAL
void
DESTROY(EV::Memcached self)
CODE:
{
if (self->magic == MC_MAGIC_FREED) return;
/* If we're inside a callback, defer destruction.
* check_destroyed() in io_cb/timer_cb will Safefree after unwind. */
if (self->callback_depth > 0) {
self->magic = MC_MAGIC_FREED;
/* Stop watchers */
stop_reading(self);
stop_writing(self);
if (self->connect_timer_active) {
src/EV__Memcached.xs view on Meta::CPAN
arm_cmd_timer(self);
}
RETVAL = self->command_timeout_ms;
}
OUTPUT:
RETVAL
void
reconnect(EV::Memcached self, int enable, int delay_ms = 1000, int max_attempts = 0)
CODE:
{
    /* Configure automatic reconnection. Negative delay_ms / max_attempts
     * are stored as 0. Disabling also resets the attempt counter and stops
     * a reconnect timer that is currently active. */
    int timer_running = self->reconnect_timer_active;
    self->reconnect = !!enable;
    self->reconnect_delay_ms = delay_ms < 0 ? 0 : delay_ms;
    self->max_reconnect_attempts = max_attempts < 0 ? 0 : max_attempts;
    if (!self->reconnect) {
        self->reconnect_attempts = 0;
        if (timer_running) {
            ev_timer_stop(self->loop, &self->reconnect_timer);
            self->reconnect_timer_active = 0;
        }
    }
}
int
reconnect_enabled(EV::Memcached self)
CODE:
/* Return the current reconnect flag, as set by new() or reconnect(). */
RETVAL = self->reconnect;
OUTPUT:
RETVAL
int
priority(EV::Memcached self, ...)
CODE:
{
    /* Get or set the watcher priority. With an argument, the value is
     * clamped to [-2, 2], stored, and applied to both I/O watchers; a
     * watcher that is running is stopped around the change and restarted.
     * Always returns the stored priority. */
    if (items > 1) {
        int prio = SvIV(ST(1));
        if (prio < -2)
            prio = -2;
        else if (prio > 2)
            prio = 2;
        self->priority = prio;
        /* Read watcher */
        if (self->reading)
            ev_io_stop(self->loop, &self->rio);
        ev_set_priority(&self->rio, self->priority);
        if (self->reading)
            ev_io_start(self->loop, &self->rio);
        /* Write watcher */
        if (self->writing)
            ev_io_stop(self->loop, &self->wio);
        ev_set_priority(&self->wio, self->priority);
        if (self->writing)
            ev_io_start(self->loop, &self->wio);
    }
    RETVAL = self->priority;
}
OUTPUT:
RETVAL
int
keepalive(EV::Memcached self, ...)
CODE:
{
    /* Get or set the keepalive setting. With an argument, the value is
     * stored (negative values become 0) and applied to the live connection:
     * a positive value enables SO_KEEPALIVE and is passed as TCP_KEEPIDLE
     * where that option exists; 0 disables SO_KEEPALIVE. UNIX-socket
     * connections (self->path set) are left untouched, as in
     * start_connect(). Always returns the stored value. */
    if (items > 1) {
        self->keepalive = SvIV(ST(1));
        if (self->keepalive < 0) self->keepalive = 0;
        /* Apply to current connection if active */
        if (self->connected && self->fd >= 0 && !self->path) {
            int enable = self->keepalive > 0 ? 1 : 0;
            setsockopt(self->fd, SOL_SOCKET, SO_KEEPALIVE, &enable, sizeof(enable));
#ifdef TCP_KEEPIDLE
            if (enable)
                setsockopt(self->fd, IPPROTO_TCP, TCP_KEEPIDLE,
                           &self->keepalive, sizeof(self->keepalive));
#endif
        }
    }
    RETVAL = self->keepalive;
}
OUTPUT:
RETVAL
void
skip_pending(EV::Memcached self)
CODE:
{
/* Delegate to cancel_pending_impl() with the shared err_skipped SV
* ("skipped", created in BOOT) as the error value.
* NOTE(review): the meaning of the trailing flag (1) is defined by
* cancel_pending_impl(), which is outside this excerpt - confirm there. */
cancel_pending_impl(aTHX_ self, err_skipped, 1);
}
void
skip_waiting(EV::Memcached self)
CODE:
{
/* Delegate to cancel_waiting() with the shared err_skipped SV
* ("skipped", created in BOOT) as the error value. */
cancel_waiting(aTHX_ self, err_skipped);
}
( run in 0.848 second using v1.01-cache-2.11-cpan-39bf76dae61 )