EV-Gearman

 view release on metacpan or  search on metacpan

src/EV__Gearman.xs  view on Meta::CPAN

CODE:
{
    if (self->connected || self->connecting) croak("already connected");
    stop_reconnect_timer(self);
    Safefree(self->path); self->path = savepv(path);
    Safefree(self->host); self->host = NULL;
    self->intentional_disconnect = 0;
    start_connect(aTHX_ self);
}

void
disconnect(EV::Gearman self)
CODE:
{
    /* Record that this teardown was requested by the caller, clear the
     * worker_active flag and stop the reconnect timer before anything
     * else happens. */
    self->intentional_disconnect = 1;
    self->worker_active = 0;
    stop_reconnect_timer(self);

    /* callback_depth is raised for the duration of the teardown and
     * check_destroyed() runs once it has been lowered again. This looks
     * like a guard against the object being freed while callbacks are
     * firing -- TODO confirm against check_destroyed(). */
    self->callback_depth++;
    if (!self->connected && !self->connecting) {
        /* Neither connected nor connecting: just cancel the waiting
         * requests with err_disconnected. */
        cancel_waiting(aTHX_ self, err_disconnected);
    } else {
        handle_disconnect(aTHX_ self, NULL);
    }
    self->callback_depth--;
    check_destroyed(self);
}

int
is_connected(EV::Gearman self)
CODE:
{
    /* 1 when either the connected or the connecting flag is set,
     * 0 otherwise. */
    if (self->connected || self->connecting)
        RETVAL = 1;
    else
        RETVAL = 0;
}
OUTPUT:
    RETVAL

int
pending_count(EV::Gearman self)
CODE:
{
    /* Plain read of the object's pending_count field. */
    RETVAL = self->pending_count;
}
OUTPUT:
    RETVAL

int
waiting_count(EV::Gearman self)
CODE:
{
    /* Plain read of the object's waiting_count field. */
    RETVAL = self->waiting_count;
}
OUTPUT:
    RETVAL

int
active_count(EV::Gearman self)
CODE:
{
    /* Reports the stored active_count_cached field; nothing is
     * recomputed here. */
    RETVAL = self->active_count_cached;
}
OUTPUT:
    RETVAL

# Internal helper, not part of the public API: returns the current
# (read, write) buffer capacities in bytes as a two-element list. The
# test suite uses it to assert that oversized buffers are released
# after a large packet drains.
void
_buf_caps(EV::Gearman self)
PPCODE:
{
    /* Reserve both stack slots up front, then push each capacity as a
     * fresh mortal unsigned-integer SV: read buffer first, write buffer
     * second. */
    EXTEND(SP, 2);
    PUSHs(sv_2mortal(newSVuv((UV)self->rbuf_cap)));
    PUSHs(sv_2mortal(newSVuv((UV)self->wbuf_cap)));
}

SV *
on_error(EV::Gearman self, ...)
CODE:
{
    /* Hand the on_error slot, the argument count and the optional second
     * argument (undef when absent) to handler_accessor() and return
     * whatever it yields. Presumably a combined getter/setter -- confirm
     * against handler_accessor(). */
    SV *handler_arg = &PL_sv_undef;
    if (items > 1)
        handler_arg = ST(1);
    RETVAL = handler_accessor(aTHX_ &self->on_error, items, handler_arg);
}
OUTPUT:
    RETVAL

SV *
on_connect(EV::Gearman self, ...)
CODE:
{
    /* Hand the on_connect slot, the argument count and the optional
     * second argument (undef when absent) to handler_accessor() and
     * return whatever it yields. Presumably a combined getter/setter --
     * confirm against handler_accessor(). */
    SV *handler_arg = &PL_sv_undef;
    if (items > 1)
        handler_arg = ST(1);
    RETVAL = handler_accessor(aTHX_ &self->on_connect, items, handler_arg);
}
OUTPUT:
    RETVAL

SV *
on_disconnect(EV::Gearman self, ...)
CODE:
{
    /* Hand the on_disconnect slot, the argument count and the optional
     * second argument (undef when absent) to handler_accessor() and
     * return whatever it yields. Presumably a combined getter/setter --
     * confirm against handler_accessor(). */
    SV *handler_arg = &PL_sv_undef;
    if (items > 1)
        handler_arg = ST(1);
    RETVAL = handler_accessor(aTHX_ &self->on_disconnect, items, handler_arg);
}
OUTPUT:
    RETVAL

void
echo(EV::Gearman self, SV *data_sv, SV *cb_sv = &PL_sv_undef)
CODE:
{
    /* Queue a GM_CMD_ECHO_REQ packet whose payload is the string value
     * of data_sv. cb_sv (undef by default) is attached to a CB_ECHO
     * request record that travels with the packet. */
    GM_CROAK_UNLESS_ALIVE(self);
    STRLEN payload_len;
    const char *payload = SvPV(data_sv, payload_len);
    /* The request record is allocated only after the payload has been
     * fetched, same order as before. */
    enqueue_packet(aTHX_ self, GM_CMD_ECHO_REQ, payload, payload_len,
                   alloc_req(CB_ECHO, cb_sv));
}

void
_submit_internal(EV::Gearman self, int cmd_idx, SV *func_sv, SV *workload_sv, SV *unique_sv, SV *opts_sv, SV *cb_sv)
CODE:
{
    /* cmd_idx mapping: 0=submit_job, 1=high, 2=low,
                        3=submit_job_bg, 4=high_bg, 5=low_bg */
    static const uint32_t cmd_map[] = {
        GM_CMD_SUBMIT_JOB, GM_CMD_SUBMIT_JOB_HIGH, GM_CMD_SUBMIT_JOB_LOW,
        GM_CMD_SUBMIT_JOB_BG, GM_CMD_SUBMIT_JOB_HIGH_BG, GM_CMD_SUBMIT_JOB_LOW_BG
    };
    if (cmd_idx < 0 || cmd_idx > 5) croak("invalid submit cmd index");
    GM_CROAK_UNLESS_ALIVE(self);
    int is_bg = (cmd_idx >= 3);

    STRLEN flen, wlen, ulen = 0;
    const char *fname = SvPV(func_sv, flen);
    const char *wload = SvPV(workload_sv, wlen);
    const char *uniq = "";
    if (SvOK(unique_sv)) uniq = SvPV(unique_sv, ulen);



( run in 1.164 second using v1.01-cache-2.11-cpan-5511b514fd6 )