Redis-Jet

 view release on metacpan or  search on metacpan

lib/Redis/Jet.xs  view on Meta::CPAN

disconnect_socket (pTHX_ Redis_Jet * self) {
  /* Mark the instance as not connected; command() treats fileno == 0 as
     the signal to (re)connect. */
  self->fileno = 0;
  /* Remove the "socket" entry from the instance's bucket hash when one is
     present. */
  if ( hv_exists(self->bucket, "socket", strlen("socket")) ) {
    (void)hv_delete(self->bucket, "socket", strlen("socket"), 0);
  }
}

MODULE = Redis::Jet    PACKAGE = Redis::Jet

PROTOTYPES: DISABLE

Redis_Jet *
_new(class, args)
    char * class
    SV * args
  PREINIT:
    Redis_Jet * self;
    HV * opts;
    SV ** server_ssv;
  CODE:
    /*
     * Build a Redis_Jet instance from a hash reference of options.
     *
     *   class - invocant class name (not used here)
     *   args  - reference to a hash that may contain: server, utf8,
     *           connect_timeout, io_timeout, noreply, reconnect_attempts,
     *           reconnect_delay
     *
     * Croaks with "Not a hash reference" when args is not a hash reference.
     * The check happens before anything is allocated so that croak() cannot
     * leak the structure, and SvROK() is tested first because SvRV() is
     * only meaningful on a reference.
     */
    if ( !SvROK(args) || SvTYPE(SvRV(args)) != SVt_PVHV ) {
      croak("Not a hash reference");
    }
    opts = (HV *)SvRV(args);

    /* Newxz() takes an element count, not a byte count: one zeroed struct. */
    Newxz(self, 1, Redis_Jet);

    /* Copy the caller's server value, or fall back to the local default. */
    server_ssv = hv_fetch(opts, "server", strlen("server"), 0);
    if ( server_ssv ) {
      self->server = newSVsv(*server_ssv);
    }
    else {
      self->server = newSVpvs("127.0.0.1:6379");
    }

    /* The last argument of each helper is presumably the fallback value
       used when the key is missing or unusable -- confirm against the
       helper definitions. */
    self->utf8 = hv_fetch_iv_positive_number(aTHX_ opts, "utf8", 0);
    self->connect_timeout = hv_fetch_nv_positive_number(aTHX_ opts, "connect_timeout", 10);
    self->io_timeout = hv_fetch_nv_positive_number(aTHX_ opts, "io_timeout", 10);
    self->noreply = hv_fetch_iv_positive_number(aTHX_ opts, "noreply", 0);
    self->reconnect_attempts = hv_fetch_iv_positive_number(aTHX_ opts, "reconnect_attempts", 0);
    self->reconnect_delay = hv_fetch_nv_positive_number(aTHX_ opts, "reconnect_delay", 10);
    self->bucket = newHV();

    /* A zero length marks a work buffer as "not allocated yet"; command()
       allocates the buffers on first use. */
    self->request_buf_len = 0;
    self->read_buf_len = 0;
    self->response_st_len = 0;
    RETVAL = self;
  OUTPUT:
    RETVAL

HV *
get_bucket(Redis_Jet * self)
  PREINIT:
    HV * bucket_hv;
  CODE:
    /* Expose the instance's bucket hash (created in _new) to the caller. */
    bucket_hv = self->bucket;
    RETVAL = bucket_hv;
  OUTPUT:
    RETVAL

SV *
get_server(Redis_Jet * self)
  PPCODE:
    /* Push the stored server SV itself onto the Perl stack (no copy is
       made here). */
    XPUSHs(self->server);


double
get_connect_timeout(Redis_Jet * self)
  CODE:
    /* Report the connect_timeout value recorded on the instance. */
    RETVAL = self->connect_timeout;
  OUTPUT:
    RETVAL

double
get_io_timeout(Redis_Jet * self)
  CODE:
    /* Report the io_timeout value recorded on the instance. */
    RETVAL = self->io_timeout;
  OUTPUT:
    RETVAL

int
get_utf8(Redis_Jet * self)
  CODE:
    /* Report the utf8 setting recorded on the instance. */
    RETVAL = self->utf8;
  OUTPUT:
    RETVAL

int
get_noreply(Redis_Jet * self)
  CODE:
    /* Report the noreply setting recorded on the instance. */
    RETVAL = self->noreply;
  OUTPUT:
    RETVAL

int
set_fileno(Redis_Jet * self, int fileno)
  CODE:
    /* Record the descriptor number on the instance, then hand the stored
       value back to the caller. */
    self->fileno = fileno;
    RETVAL = self->fileno;
  OUTPUT:
    RETVAL

SV *
_destroy(self)
    Redis_Jet * self
  CODE:
    /* Release the work buffers. A non-zero *_len field is the marker that
       command() has allocated the matching buffer. */
    if ( self->request_buf_len != 0 ) {
      Safefree(self->request_buf);
    }
    if ( self->response_st_len != 0 ) {
      Safefree(self->response_st);
    }
    if ( self->read_buf_len != 0 ) {
      Safefree(self->read_buf);
    }
    /* Reset fileno and drop the "socket" entry while the bucket hash is
       still alive. */
    disconnect_socket(aTHX_ self);
    /* Give up this object's references to the server SV and bucket hash. */
    SvREFCNT_dec((SV*)self->server);
    SvREFCNT_dec((SV*)self->bucket);

lib/Redis/Jet.xs  view on Meta::CPAN

    /* `readed` accumulates the number of bytes consumed from buf. */
    readed = 0;
    while ( buf_len > 0 ) {
      /* Fresh scalars handed to _parse_message for the value and the error
         (presumably filled in by it -- confirm against _parse_message). */
      data_sv = newSV(0);
      (void)SvUPGRADE(data_sv, SVt_PV);
      error_sv = newSV(0);
      (void)SvUPGRADE(error_sv, SVt_PV);
      ret = _parse_message(aTHX_ buf, buf_len, data_sv, error_sv, ix);
      /* NOTE(review): on the -1 and -2 paths below data_sv and error_sv are
         not released here; looks like a leak unless _parse_message takes
         ownership -- confirm. */
      if ( ret == -1 ) {
        /* -1: give up and return undef to the caller. */
        XSRETURN_UNDEF;
      }
      else if ( ret == -2 ) {
        /* -2: stop parsing and report what was consumed so far
           (presumably an incomplete message -- confirm). */
        break;
      }
      else {
        /* One parsed message: append [data] or [data, error] to res_av. */
        data_av = newAV();
        av_push(data_av, data_sv);
        if ( SvOK(error_sv) ) {
          av_push(data_av, error_sv);
        } else {
          /* Unused error scalar: mortalize so it gets freed. */
          sv_2mortal(error_sv);
        }
        av_push(res_av, newRV_noinc((SV *) data_av));
        /* ret is used as the byte count of the message just parsed. */
        readed += ret;
        buf_len -= ret;
        buf = &buf[ret];
      }
    }
    RETVAL = newSViv(readed);
  OUTPUT:
    RETVAL

SV *
command(self,...)
    Redis_Jet * self
  ALIAS:
    Redis::Jet::command = 0
    Redis::Jet::pipeline = 1
  PREINIT:
    /* Shared entry point for command() (ix == 0) and pipeline() (ix == 1).
       This part allocates the work buffers on first use and makes sure a
       connection exists, retrying as configured, before anything is sent. */
    AV * data_av;
    SV * data_sv;
    SV * error_sv;
    ssize_t i, j;
    long int ret;
    /* build */
    int args_offset = 1;
    int fig;
    int connect_retry = 0;
    ssize_t pipeline_len = 1;
    ssize_t request_len = 0;
    STRLEN request_arg_len = 0;
    char * request_arg;
    AV * request_arg_list;
    /* send */
    ssize_t written;
    char * write_buf;
    /* response */
    ssize_t readed;
    ssize_t parse_offset;
    ssize_t parsed_response;
    long int parse_result;
  PPCODE:
    /* init */
    /* A zero *_len means the buffer has not been allocated yet. */
    if ( self->request_buf_len == 0 ) {
      Newx(self->request_buf, REQUEST_BUF_SIZE, char);
      self->request_buf_len = REQUEST_BUF_SIZE;
    }
    if ( self->read_buf_len == 0 ) {
      Newx(self->read_buf, READ_MAX, char);
      self->read_buf_len = READ_MAX;
    }
    if ( self->response_st_len == 0 ) {
      /* NOTE(review): Newx's second argument is an element count, so this
         reserves sizeof(struct jet_response_st)*10 elements while the
         recorded length is 30 -- confirm which size is intended. */
      Newx(self->response_st, sizeof(struct jet_response_st)*10, struct jet_response_st);
      self->response_st_len = 30;
    }
    DO_CONNECT:
    /* connect */
    if ( self->fileno == 0 ) {
      {
        /* Call the Perl-level "connect" method on the invocant; it is
           expected to store a non-zero fileno on success (presumably via
           set_fileno -- confirm). */
        dSP;
        ENTER;
        SAVETMPS;
        PUSHMARK(SP);
        XPUSHs(ST(0));
        PUTBACK;
        call_method("connect", G_DISCARD);
        FREETMPS;
        LEAVE;
      }
      /* NOTE(review): the outer SP is not refreshed after call_method; if
         the Perl stack can be reallocated during the call, the pushes below
         would use a stale pointer -- confirm. */
      if ( self->fileno == 0 ) {
        /* connection error */
        disconnect_socket(aTHX_ self);
        /* Retry up to reconnect_attempts times, sleeping reconnect_delay
           seconds (converted to microseconds) between attempts. */
        if ( self->reconnect_attempts > connect_retry ) {
          connect_retry++;
          usleep(self->reconnect_delay*1000000); /* micro-sec */
          goto DO_CONNECT;
        }
        /* NOTE(review): errno is read only here, after call_method and
           disconnect_socket have run, so it may no longer describe the
           connect failure -- confirm. */
        if ( PIPELINE(ix) ) {
          /* pipeline: one [undef, message] pair per argument. */
          pipeline_len = items - args_offset;
          EXTEND(SP, pipeline_len);
          for (i=0; i<pipeline_len; i++) {
            data_av = newAV();
            (void)av_push(data_av, &PL_sv_undef);
            (void)av_push(data_av, newSVpvf("failed to connect server: %s",
              ( errno != 0 ) ? strerror(errno) : "timeout"));
            PUSHs( sv_2mortal(newRV_noinc((SV *) data_av)) );
          }
        }
        else {
          /* single command: return the list (undef, message). */
          EXTEND(SP, 2);
          PUSHs(&PL_sv_undef);
          PUSHs(sv_2mortal(newSVpvf("failed to connect server: %s",
              ( errno != 0 ) ? strerror(errno) : "timeout")));
        }
        goto COMMAND_DONE;
      }
    }

    /* connection successful */
    connect_retry = 0;

    /* char * s = SvPV_nolen(ST(1)); */



( run in 0.910 second using v1.01-cache-2.11-cpan-5511b514fd6 )