EV-Memcached
view release on metacpan or search on metacpan
- Bug fixes, polish, expanded tests
0.01 2026-04-06
- Initial release
- Pure XS binary protocol, no external C library
- Core commands: get/gets, set/add/replace/cas, delete, incr/decr,
append/prepend, touch/gat/gats, flush, version, noop, quit, stats
- Multi-get (mget/mgets), fire-and-forget set/flush (SETQ/FLUSHQ)
- SASL PLAIN auth (manual + auto-auth on connect/reconnect)
- Pipelining, flow control, reconnection, connect/command timeouts
- TCP, Unix socket, keepalive, EV event loop integration
path => $str
Unix socket path. Mutually exclusive with "host".
loop => $ev_loop
EV loop to attach to. Default: "EV::default_loop".
priority => $num (-2 to +2)
EV watcher priority. Higher = serviced before other EV watchers.
keepalive => $seconds
TCP keepalive idle time. Set to 0 to disable. Ignored on Unix sockets.
Timeouts and flow control
connect_timeout => $ms
Abort an in-progress non-blocking connect after this many milliseconds.
0 = no timeout (default). Does not apply to Unix sockets or to
immediately-completing localhost connects.
command_timeout => $ms
Disconnect with "command timeout" error if no response arrives within
this interval. The timer resets on every response from the server. 0 =
pending_count
Number of commands sent and awaiting a response.
waiting_count
Number of commands held in the local waiting queue (because the connection
is not ready, SASL is in progress, or "max_pending" is saturated).
ACCESSORS
Every option from "new" has a getter/setter of the same name. Calling
without arguments reads the current value; with one argument it writes and
(where meaningful, e.g. "keepalive") takes effect immediately.
connect_timeout([$ms])
command_timeout([$ms])
max_pending([$num])
waiting_timeout([$ms])
resume_waiting_on_reconnect([$bool])
priority([$num])
keepalive([$seconds])
"reconnect_enabled"
Read-only; configure via "reconnect".
"reconnect($enable, [$delay_ms], [$max_attempts])"
Reconfigure auto-reconnect at runtime.
on_error([$cb])
on_connect([$cb])
on_disconnect([$cb])
Get/set the corresponding handler. Pass "undef" to clear.
If $mc goes out of scope while commands are in flight or queued, every
pending and waiting callback fires once with "(undef, "disconnected")". This
holds whether you call "disconnect" first or simply drop the reference.
The clean shutdown idiom is:
$mc->disconnect; # drains queues, fires on_disconnect
undef $mc;
If a callback closes over $mc (a common mistake -- every reference inside a
callback closure keeps the object alive), break the cycle before dropping
the outer reference:
$mc->on_error(undef);
$mc->on_connect(undef);
$mc->on_disconnect(undef);
undef $mc;
DESTROY is reentrant-safe: if a callback fired during teardown drops the
last external reference to a separate "EV::Memcached", that object's DESTROY
is correctly deferred and run once unwound.
lib/EV/Memcached.pm view on Meta::CPAN
Unix socket path. Mutually exclusive with C<host>.
=item loop => $ev_loop
EV loop to attach to. Default: C<EV::default_loop>.
=item priority => $num (-2 to +2)
EV watcher priority. Higher = serviced before other EV watchers.
=item keepalive => $seconds
TCP keepalive idle time. Set to 0 to disable. Ignored on Unix sockets.
=back
=head3 Timeouts and flow control
=over
=item connect_timeout => $ms
Abort an in-progress non-blocking connect after this many milliseconds.
lib/EV/Memcached.pm view on Meta::CPAN
=head2 waiting_count
Number of commands held in the local waiting queue (because the
connection is not ready, SASL is in progress, or C<max_pending> is
saturated).
=head1 ACCESSORS
Every option from C<new> has a getter/setter of the same name. Calling
without arguments reads the current value; with one argument it writes
and (where meaningful, e.g. C<keepalive>) takes effect immediately.
=over
=item C<connect_timeout([$ms])>
=item C<command_timeout([$ms])>
=item C<max_pending([$num])>
=item C<waiting_timeout([$ms])>
=item C<resume_waiting_on_reconnect([$bool])>
=item C<priority([$num])>
=item C<keepalive([$seconds])>
=item C<reconnect_enabled>
Read-only; configure via C<reconnect>.
=item C<reconnect($enable, [$delay_ms], [$max_attempts])>
Reconfigure auto-reconnect at runtime.
=item C<on_error([$cb])>
lib/EV/Memcached.pm view on Meta::CPAN
every pending and waiting callback fires once with
C<(undef, "disconnected")>. This holds whether you call C<disconnect>
first or simply drop the reference.
The clean shutdown idiom is:
$mc->disconnect; # drains queues, fires on_disconnect
undef $mc;
If a callback closes over C<$mc> (a common mistake -- every reference
inside a callback closure keeps the object alive), break the cycle
before dropping the outer reference:
$mc->on_error(undef);
$mc->on_connect(undef);
$mc->on_disconnect(undef);
undef $mc;
DESTROY is reentrant-safe: if a callback fired during teardown drops
the last external reference to a separate C<EV::Memcached>, that
object's DESTROY is correctly deferred and run once unwound.
src/EV__Memcached.xs view on Meta::CPAN
ev_timer waiting_timer;
int waiting_timer_active;
/* Safety */
int callback_depth;
int in_cb_cleanup;
int in_wait_cleanup;
/* Options */
int priority;
int keepalive;
/* SASL auth */
char *username;
char *password;
};
/* ================================================================
* Shared error strings (initialized in BOOT)
* ================================================================ */
src/EV__Memcached.xs view on Meta::CPAN
uint64_t cas, int cmd, int quiet, SV *cb);
static void start_reading(ev_mc_t *self);
static void stop_reading(ev_mc_t *self);
static void start_writing(ev_mc_t *self);
static void stop_writing(ev_mc_t *self);
static void start_connect(pTHX_ ev_mc_t *self);
static void cleanup_connection(pTHX_ ev_mc_t *self);
static void emit_error(pTHX_ ev_mc_t *self, const char *msg);
static void handle_disconnect(pTHX_ ev_mc_t *self, const char *reason);
static void schedule_reconnect(pTHX_ ev_mc_t *self);
static void apply_keepalive(ev_mc_t *self);
static void report_connect_error(pTHX_ ev_mc_t *self, const char *errbuf);
static void finish_connect_success(pTHX_ ev_mc_t *self);
static void mc_send_sasl_auth(pTHX_ ev_mc_t *self, SV *cb);
static void stop_connect_timer(ev_mc_t *self);
static void stop_reconnect_timer(ev_mc_t *self);
static void stop_waiting_timer(ev_mc_t *self);
static void send_next_waiting(pTHX_ ev_mc_t *self);
static int check_destroyed(ev_mc_t *self);
static void cancel_pending(pTHX_ ev_mc_t *self, SV *err_sv);
static void cancel_waiting(pTHX_ ev_mc_t *self, SV *err_sv);
src/EV__Memcached.xs view on Meta::CPAN
}
static void emit_connect(pTHX_ ev_mc_t *self) {
invoke_handler(aTHX_ self, self->on_connect, NULL, "on_connect");
}
static void emit_disconnect(pTHX_ ev_mc_t *self) {
invoke_handler(aTHX_ self, self->on_disconnect, NULL, "on_disconnect");
}
static void apply_keepalive(ev_mc_t *self) {
if (self->keepalive <= 0 || self->path) return;
int one = 1;
setsockopt(self->fd, SOL_SOCKET, SO_KEEPALIVE, &one, sizeof(one));
#ifdef TCP_KEEPIDLE
setsockopt(self->fd, IPPROTO_TCP, TCP_KEEPIDLE,
&self->keepalive, sizeof(self->keepalive));
#endif
}
/* Common tail for synchronous connect-failure paths in start_connect:
emit error, run pending callbacks, and arm reconnect if configured.
Caller returns immediately after invoking. */
static void report_connect_error(pTHX_ ev_mc_t *self, const char *errbuf) {
self->callback_depth++;
emit_error(aTHX_ self, errbuf);
self->callback_depth--;
src/EV__Memcached.xs view on Meta::CPAN
}
/* Shared post-connect-success path used by both on_connect_complete (after
async EINPROGRESS resolves) and start_connect (when connect(2) returns
immediately). Caller must already have set self->connected = 1 and
stopped/initialized the io watchers as required by its path. */
static void finish_connect_success(pTHX_ ev_mc_t *self) {
self->reconnect_attempts = 0;
start_reading(self);
apply_keepalive(self);
mc_send_sasl_auth(aTHX_ self, NULL);
emit_connect(aTHX_ self);
if (check_destroyed(self)) return;
/* Drain wait_queue immediately unless we are waiting for SASL_AUTH
to complete; the SASL response handler calls send_next_waiting. */
if (!self->username || !self->password)
send_next_waiting(aTHX_ self);
src/EV__Memcached.xs view on Meta::CPAN
}
else if (strEQ(k, "on_disconnect")) {
if (SvOK(v) && SvROK(v)) RETVAL->on_disconnect = newSVsv(v);
}
else if (strEQ(k, "max_pending")) RETVAL->max_pending = SvIV(v);
else if (strEQ(k, "waiting_timeout")) RETVAL->waiting_timeout_ms = SvIV(v);
else if (strEQ(k, "connect_timeout")) RETVAL->connect_timeout_ms = SvIV(v);
else if (strEQ(k, "command_timeout")) RETVAL->command_timeout_ms = SvIV(v);
else if (strEQ(k, "resume_waiting_on_reconnect")) RETVAL->resume_waiting_on_reconnect = SvTRUE(v) ? 1 : 0;
else if (strEQ(k, "priority")) RETVAL->priority = SvIV(v);
else if (strEQ(k, "keepalive")) RETVAL->keepalive = SvIV(v);
else if (strEQ(k, "reconnect")) do_reconnect = SvTRUE(v) ? 1 : 0;
else if (strEQ(k, "reconnect_delay")) reconnect_delay = SvIV(v);
else if (strEQ(k, "max_reconnect_attempts")) max_reconnect_attempts = SvIV(v);
else if (strEQ(k, "username")) {
if (SvOK(v)) RETVAL->username = savepv(SvPV_nolen(v));
}
else if (strEQ(k, "password")) {
if (SvOK(v)) RETVAL->password = savepv(SvPV_nolen(v));
}
else if (strEQ(k, "loop")) {
src/EV__Memcached.xs view on Meta::CPAN
} else {
ev_set_priority(&self->wio, self->priority);
}
}
RETVAL = self->priority;
}
OUTPUT:
RETVAL
int
keepalive(EV::Memcached self, ...)
CODE:
{
if (items > 1) {
self->keepalive = SvIV(ST(1));
if (self->keepalive < 0) self->keepalive = 0;
if (self->connected && self->fd >= 0)
apply_keepalive(self);
}
RETVAL = self->keepalive;
}
OUTPUT:
RETVAL
void
skip_pending(EV::Memcached self)
CODE:
{
self->callback_depth++;
cancel_pending_impl(aTHX_ self, err_skipped, 1);
t/04_features.t view on Meta::CPAN
run_ev();
$mc->disconnect;
}
# --- new() constructor in XS ---
{
my $mc = EV::Memcached->new(
host => $host,
port => $port,
max_pending => 10,
keepalive => 5,
priority => 1,
connect_timeout => 3000,
on_error => sub { diag "error: @_" },
);
$mc->on_connect(sub { EV::break });
my $t = EV::timer 5, 0, sub { fail("timeout"); EV::break };
EV::run;
ok($mc->is_connected, "XS new() connected");
is($mc->max_pending, 10, "XS new() max_pending");
( run in 1.517 second using v1.01-cache-2.11-cpan-99c4e6809bf )