EV-Memcached


Changes

    - Bug fixes, polish, expanded tests

0.01  2026-04-06
    - Initial release
    - Pure XS binary protocol, no external C library
    - Core commands: get/gets, set/add/replace/cas, delete, incr/decr,
      append/prepend, touch/gat/gats, flush, version, noop, quit, stats
    - Multi-get (mget/mgets), fire-and-forget set/flush (SETQ/FLUSHQ)
    - SASL PLAIN auth (manual + auto-auth on connect/reconnect)
    - Pipelining, flow control, reconnection, connect/command timeouts
    - TCP, Unix socket, keepalive, EV event loop integration

README


    path => $str
        Unix socket path. Mutually exclusive with "host".

    loop => $ev_loop
        EV loop to attach to. Default: "EV::default_loop".

    priority => $num (-2 to +2)
        EV watcher priority. Watchers with a higher priority are serviced
        before lower-priority EV watchers.

    keepalive => $seconds
        TCP keepalive idle time. Set to 0 to disable. Ignored on Unix sockets.

   Timeouts and flow control
    connect_timeout => $ms
        Abort an in-progress non-blocking connect after this many milliseconds.
        0 = no timeout (default). Does not apply to Unix sockets or to
        immediately-completing localhost connects.

    command_timeout => $ms
        Disconnect with "command timeout" error if no response arrives within
        this interval. The timer resets on every response from the server. 0 =

README

  pending_count
    Number of commands sent and awaiting a response.

  waiting_count
    Number of commands held in the local waiting queue (because the connection
    is not ready, SASL is in progress, or "max_pending" is saturated).
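
    The relationship between the two counters can be pictured as a pair of
    queues. The following C sketch is a simplified model under stated
    assumptions, not the module's actual code: the names model_t,
    model_submit, and model_on_response are hypothetical, only the
    "max_pending" saturation case is modelled, and the limit is assumed to
    be positive.

```c
#include <assert.h>

/* Simplified model of the two counters (hypothetical names throughout). */
typedef struct {
    int max_pending;   /* assumed > 0 in this sketch */
    int pending;       /* sent, awaiting a response  */
    int waiting;       /* held locally, not yet sent */
} model_t;

/* Submit one command: send it if there is room, otherwise hold it. */
void model_submit(model_t *m) {
    if (m->pending < m->max_pending)
        m->pending++;
    else
        m->waiting++;
}

/* One response arrived: retire a pending command, then promote a
   waiting one into the freed slot, if any is held. */
void model_on_response(model_t *m) {
    assert(m->pending > 0);
    m->pending--;
    if (m->waiting > 0) {
        m->waiting--;
        m->pending++;
    }
}
```

    In this model, three submissions against a limit of 2 leave two commands
    pending and one waiting; each response frees a slot that the next
    waiting command takes.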

ACCESSORS
    Every option from "new" has a getter/setter of the same name. Called
    without arguments, an accessor returns the current value; called with one
    argument, it sets the value, which (where meaningful, e.g. "keepalive")
    takes effect immediately.

    connect_timeout([$ms])
    command_timeout([$ms])
    max_pending([$num])
    waiting_timeout([$ms])
    resume_waiting_on_reconnect([$bool])
    priority([$num])
    keepalive([$seconds])
    "reconnect_enabled"
        Read-only; configure via "reconnect".

    "reconnect($enable, [$delay_ms], [$max_attempts])"
        Reconfigure auto-reconnect at runtime.

    on_error([$cb])
    on_connect([$cb])
    on_disconnect([$cb])
        Get/set the corresponding handler. Pass "undef" to clear.

README

    If $mc goes out of scope while commands are in flight or queued, every
    pending and waiting callback fires once with "(undef, "disconnected")". This
    holds whether you call "disconnect" first or simply drop the reference.

    The clean shutdown idiom is:

        $mc->disconnect;   # drains queues, fires on_disconnect
        undef $mc;

    If a callback closes over $mc (a common mistake -- every reference inside a
    callback closure keeps the object alive), break the cycle before dropping
    the outer reference:

        $mc->on_error(undef);
        $mc->on_connect(undef);
        $mc->on_disconnect(undef);
        undef $mc;

    DESTROY is reentrant-safe: if a callback fired during teardown drops the
    last external reference to a separate "EV::Memcached", that object's DESTROY
    is deferred until the teardown in progress has unwound, and then runs.
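
    The README does not show how the deferral is implemented. One common way
    to get this kind of behaviour in C is a depth counter around callback
    invocations plus a "destroy requested" flag. The sketch below illustrates
    that general pattern only; it is not taken from the module, and obj_t,
    obj_new, obj_destroy, obj_invoke, and destroy_self_cb are hypothetical
    names.

```c
#include <assert.h>
#include <stdlib.h>

typedef struct obj obj_t;
typedef void (*obj_cb)(obj_t *self, void *arg);

struct obj {
    int callback_depth;     /* > 0 while a callback is running */
    int destroy_requested;  /* set if destroy arrived mid-callback */
    int *freed_flag;        /* lets the caller observe the free (demo only) */
};

obj_t *obj_new(int *freed_flag) {
    obj_t *o = calloc(1, sizeof(*o));
    if (o) o->freed_flag = freed_flag;
    return o;
}

static void obj_free(obj_t *o) {
    assert(o->callback_depth == 0);  /* never free under a running callback */
    if (o->freed_flag) *o->freed_flag = 1;
    free(o);
}

/* Request destruction: free now if idle, otherwise defer. */
void obj_destroy(obj_t *o) {
    if (o->callback_depth > 0) {
        o->destroy_requested = 1;
        return;
    }
    obj_free(o);
}

/* Run a callback; perform any deferred destroy once the outermost
   invocation has unwound. Returns 1 if the object was freed, else 0. */
int obj_invoke(obj_t *o, obj_cb cb, void *arg) {
    o->callback_depth++;
    cb(o, arg);
    o->callback_depth--;
    if (o->callback_depth == 0 && o->destroy_requested) {
        obj_free(o);
        return 1;
    }
    return 0;
}

/* Example callback that asks to destroy the object it is running on. */
void destroy_self_cb(obj_t *self, void *arg) {
    (void)arg;
    obj_destroy(self);
}
```

    Here a destroy request made from inside a callback only sets a flag; the
    memory is released by obj_invoke after the callback has returned, so the
    callback never runs on freed state.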

lib/EV/Memcached.pm

Unix socket path. Mutually exclusive with C<host>.

=item loop => $ev_loop

EV loop to attach to. Default: C<EV::default_loop>.

=item priority => $num (-2 to +2)

EV watcher priority. Watchers with a higher priority are serviced before
lower-priority EV watchers.

=item keepalive => $seconds

TCP keepalive idle time. Set to 0 to disable. Ignored on Unix sockets.

=back

=head3 Timeouts and flow control

=over

=item connect_timeout => $ms

Abort an in-progress non-blocking connect after this many milliseconds.

lib/EV/Memcached.pm

=head2 waiting_count

Number of commands held in the local waiting queue (because the
connection is not ready, SASL is in progress, or C<max_pending> is
saturated).

=head1 ACCESSORS

Every option from C<new> has a getter/setter of the same name. Called
without arguments, an accessor returns the current value; called with one
argument, it sets the value, which (where meaningful, e.g. C<keepalive>)
takes effect immediately.

=over

=item C<connect_timeout([$ms])>

=item C<command_timeout([$ms])>

=item C<max_pending([$num])>

=item C<waiting_timeout([$ms])>

=item C<resume_waiting_on_reconnect([$bool])>

=item C<priority([$num])>

=item C<keepalive([$seconds])>

=item C<reconnect_enabled>

Read-only; configure via C<reconnect>.

=item C<reconnect($enable, [$delay_ms], [$max_attempts])>

Reconfigure auto-reconnect at runtime.

=item C<on_error([$cb])>

lib/EV/Memcached.pm

every pending and waiting callback fires once with
C<(undef, "disconnected")>. This holds whether you call C<disconnect>
first or simply drop the reference.

The clean shutdown idiom is:

    $mc->disconnect;   # drains queues, fires on_disconnect
    undef $mc;

If a callback closes over C<$mc> (a common mistake -- every reference
inside a callback closure keeps the object alive), break the cycle
before dropping the outer reference:

    $mc->on_error(undef);
    $mc->on_connect(undef);
    $mc->on_disconnect(undef);
    undef $mc;

DESTROY is reentrant-safe: if a callback fired during teardown drops
the last external reference to a separate C<EV::Memcached>, that
object's DESTROY is deferred until the teardown in progress has
unwound, and then runs.

src/EV__Memcached.xs

    ev_timer waiting_timer;
    int waiting_timer_active;

    /* Safety */
    int callback_depth;
    int in_cb_cleanup;
    int in_wait_cleanup;

    /* Options */
    int priority;
    int keepalive;

    /* SASL auth */
    char *username;
    char *password;
};

/* ================================================================
 * Shared error strings (initialized in BOOT)
 * ================================================================ */

src/EV__Memcached.xs

    uint64_t cas, int cmd, int quiet, SV *cb);
static void start_reading(ev_mc_t *self);
static void stop_reading(ev_mc_t *self);
static void start_writing(ev_mc_t *self);
static void stop_writing(ev_mc_t *self);
static void start_connect(pTHX_ ev_mc_t *self);
static void cleanup_connection(pTHX_ ev_mc_t *self);
static void emit_error(pTHX_ ev_mc_t *self, const char *msg);
static void handle_disconnect(pTHX_ ev_mc_t *self, const char *reason);
static void schedule_reconnect(pTHX_ ev_mc_t *self);
static void apply_keepalive(ev_mc_t *self);
static void report_connect_error(pTHX_ ev_mc_t *self, const char *errbuf);
static void finish_connect_success(pTHX_ ev_mc_t *self);
static void mc_send_sasl_auth(pTHX_ ev_mc_t *self, SV *cb);
static void stop_connect_timer(ev_mc_t *self);
static void stop_reconnect_timer(ev_mc_t *self);
static void stop_waiting_timer(ev_mc_t *self);
static void send_next_waiting(pTHX_ ev_mc_t *self);
static int check_destroyed(ev_mc_t *self);
static void cancel_pending(pTHX_ ev_mc_t *self, SV *err_sv);
static void cancel_waiting(pTHX_ ev_mc_t *self, SV *err_sv);

src/EV__Memcached.xs

}

static void emit_connect(pTHX_ ev_mc_t *self) {
    invoke_handler(aTHX_ self, self->on_connect, NULL, "on_connect");
}

static void emit_disconnect(pTHX_ ev_mc_t *self) {
    invoke_handler(aTHX_ self, self->on_disconnect, NULL, "on_disconnect");
}

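/* Enable TCP keepalive on the connected socket. A keepalive value <= 0
   leaves the socket untouched, and Unix sockets (self->path set) are
   skipped. The idle time is applied only where TCP_KEEPIDLE exists. */
static void apply_keepalive(ev_mc_t *self) {
    if (self->keepalive <= 0 || self->path) return;
    int one = 1;
    setsockopt(self->fd, SOL_SOCKET, SO_KEEPALIVE, &one, sizeof(one));
#ifdef TCP_KEEPIDLE
    setsockopt(self->fd, IPPROTO_TCP, TCP_KEEPIDLE,
               &self->keepalive, sizeof(self->keepalive));
#endif
}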

/* Common tail for synchronous connect-failure paths in start_connect:
   emit error, run pending callbacks, and arm reconnect if configured.
   Caller returns immediately after invoking. */
static void report_connect_error(pTHX_ ev_mc_t *self, const char *errbuf) {
    self->callback_depth++;
    emit_error(aTHX_ self, errbuf);
    self->callback_depth--;

src/EV__Memcached.xs

}

/* Shared post-connect-success path used by both on_connect_complete (after
   async EINPROGRESS resolves) and start_connect (when connect(2) returns
   immediately). Caller must already have set self->connected = 1 and
   stopped/initialized the io watchers as required by its path. */
static void finish_connect_success(pTHX_ ev_mc_t *self) {
    self->reconnect_attempts = 0;

    start_reading(self);
    apply_keepalive(self);

    mc_send_sasl_auth(aTHX_ self, NULL);

    emit_connect(aTHX_ self);
    if (check_destroyed(self)) return;

    /* Drain wait_queue immediately unless we are waiting for SASL_AUTH
       to complete; the SASL response handler calls send_next_waiting. */
    if (!self->username || !self->password)
        send_next_waiting(aTHX_ self);

src/EV__Memcached.xs

        }
        else if (strEQ(k, "on_disconnect")) {
            if (SvOK(v) && SvROK(v)) RETVAL->on_disconnect = newSVsv(v);
        }
        else if (strEQ(k, "max_pending"))            RETVAL->max_pending = SvIV(v);
        else if (strEQ(k, "waiting_timeout"))        RETVAL->waiting_timeout_ms = SvIV(v);
        else if (strEQ(k, "connect_timeout"))        RETVAL->connect_timeout_ms = SvIV(v);
        else if (strEQ(k, "command_timeout"))        RETVAL->command_timeout_ms = SvIV(v);
        else if (strEQ(k, "resume_waiting_on_reconnect")) RETVAL->resume_waiting_on_reconnect = SvTRUE(v) ? 1 : 0;
        else if (strEQ(k, "priority"))               RETVAL->priority = SvIV(v);
        else if (strEQ(k, "keepalive"))              RETVAL->keepalive = SvIV(v);
        else if (strEQ(k, "reconnect"))              do_reconnect = SvTRUE(v) ? 1 : 0;
        else if (strEQ(k, "reconnect_delay"))        reconnect_delay = SvIV(v);
        else if (strEQ(k, "max_reconnect_attempts")) max_reconnect_attempts = SvIV(v);
        else if (strEQ(k, "username")) {
            if (SvOK(v)) RETVAL->username = savepv(SvPV_nolen(v));
        }
        else if (strEQ(k, "password")) {
            if (SvOK(v)) RETVAL->password = savepv(SvPV_nolen(v));
        }
        else if (strEQ(k, "loop")) {

src/EV__Memcached.xs

        } else {
            ev_set_priority(&self->wio, self->priority);
        }
    }
    RETVAL = self->priority;
}
OUTPUT:
    RETVAL

int
keepalive(EV::Memcached self, ...)
CODE:
{
    if (items > 1) {
        self->keepalive = SvIV(ST(1));
        if (self->keepalive < 0) self->keepalive = 0;
        if (self->connected && self->fd >= 0)
            apply_keepalive(self);
    }
    RETVAL = self->keepalive;
}
OUTPUT:
    RETVAL

void
skip_pending(EV::Memcached self)
CODE:
{
    self->callback_depth++;
    cancel_pending_impl(aTHX_ self, err_skipped, 1);

t/04_features.t

    run_ev();
    $mc->disconnect;
}

# --- new() constructor in XS ---
{
    my $mc = EV::Memcached->new(
        host            => $host,
        port            => $port,
        max_pending     => 10,
        keepalive       => 5,
        priority        => 1,
        connect_timeout => 3000,
        on_error        => sub { diag "error: @_" },
    );
    $mc->on_connect(sub { EV::break });
    my $t = EV::timer 5, 0, sub { fail("timeout"); EV::break };
    EV::run;

    ok($mc->is_connected, "XS new() connected");
    is($mc->max_pending, 10, "XS new() max_pending");


