Arcus-Client

 view release on metacpan or  search on metacpan

Base.xs  view on Meta::CPAN

#define PERL_NO_GET_CONTEXT
#include "EXTERN.h"
#include "perl.h"
#include "XSUB.h"

#include "ppport.h"
#include "libmemcached/memcached.h"

#define MIN_THREAD 1

/*
 * Native state wrapped by Arcus::Base objects (the pointer is stored in the
 * blessed reference built by new()).
 */
typedef struct arcus_st {
  /* Handle passed to arcus_proxy_create() and arcus_proxy_connect(). */
  memcached_st *proxy;
  /* Handle that receives the behavior settings and is used to build the pool. */
  memcached_st *global;
  /* Pool created from `global` with memcached_pool_create(). */
  memcached_pool_st *pool;
  /* Upper bound handed to memcached_pool_create(); validated to be >= MIN_THREAD. */
  int max_thread;
  /* Reference count: incremented by new(), decremented by DESTROY(); teardown runs at 0. */
  int cur_thread;
  /* Set by new(), cleared by connect_proxy(); destroy_arcus_api() only closes
   * and frees `proxy` while this is true. */
  bool main_proxy;
} Arcus_API;

/*
 * Operation codes. The numeric values start at 0 and are used as indices
 * into ARCUS_OP_NAME, so the two definitions must stay in step.
 */
enum arcus_op {
  ARCUS_SET = 0,
  ARCUS_CAS,
  ARCUS_ADD,
  ARCUS_REPLACE,
  ARCUS_APPEND,
  ARCUS_PREPEND,
  ARCUS_INCR,
  ARCUS_DECR,
  ARCUS_GET,
  ARCUS_GETS
};

/* Printable name of each operation, indexed by enum arcus_op. */
const char *ARCUS_OP_NAME[] = {
  [ARCUS_SET]     = "set",
  [ARCUS_CAS]     = "cas",
  [ARCUS_ADD]     = "add",
  [ARCUS_REPLACE] = "replace",
  [ARCUS_APPEND]  = "append",
  [ARCUS_PREPEND] = "prepend",
  [ARCUS_INCR]    = "increment",
  [ARCUS_DECR]    = "decrement",
  [ARCUS_GET]     = "get",
  [ARCUS_GETS]    = "gets"
};

/*
 * Apply get-magic to `sv`, then return it if it holds a defined value.
 * Returns NULL when the value is undef.
 */
static inline SV *safe_sv(pTHX_ SV *sv)
{
  SvGETMAGIC(sv);
  return SvOK(sv) ? sv : NULL;
}

/*
 * av_fetch() wrapper that filters out unusable slots: returns the slot
 * pointer only when the fetch succeeded and the element is defined,
 * otherwise NULL. No get-magic is applied to the element here.
 */
static inline SV **safe_av_fetch(pTHX_ AV *av, SSize_t key, I32 lval)
{
  SV **slot = av_fetch(av, key, lval);

  if (slot == NULL || !SvOK(*slot)) {
    return NULL;
  }
  return slot;
}

/*
 * Tear down whatever `arcus` currently holds, in the order pool -> global ->
 * proxy, clearing each pointer once it has been released. The proxy handle is
 * only closed and freed when this struct is flagged as the main proxy.
 * The Arcus_API struct itself is not released here.
 */
static inline void destroy_arcus_api(Arcus_API *arcus)
{
  if (arcus->pool != NULL) {
    memcached_pool_st *pool = arcus->pool;

    arcus_pool_close(pool);
    memcached_pool_destroy(pool);
    arcus->pool = NULL;
  }

  if (arcus->global != NULL) {
    memcached_free(arcus->global);
    arcus->global = NULL;
  }

  /* Non-main handles (see connect_proxy) leave the proxy pointer untouched. */
  if (!arcus->main_proxy || arcus->proxy == NULL) {
    return;
  }

  memcached_st *proxy = arcus->proxy;

  arcus_proxy_close(proxy);
  memcached_free(proxy);
  arcus->proxy = NULL;
}

static void initialize_arcus_api(pTHX_ Arcus_API *arcus, HV *conf)
{
  char *zk_address = NULL;
  char *service_code = NULL;
  int max_thread = -1;

  SV **ps = hv_fetchs(conf, "zk_address", 0);
  if (ps) {
    SvGETMAGIC(*ps);
  }
  if (ps && SvOK(*ps)) {
    if (!SvROK(*ps) || SvTYPE(SvRV(*ps)) != SVt_PVAV) {
      croak("zk_address argument is not array reference.");
    }

    AV *av = (AV *) SvRV(*ps);
    SV *zk = newSVpvn("", 0);
    int i, size = av_count(av);

    for (i = 0; i < size; i++) {
      SV **elem = av_fetch(av, i, 0);
      if (elem && SvOK(*elem)) {
        if (i > 0) {
          sv_catpv(zk, ",");
        }
        sv_catpv(zk, SvPV_nolen(*elem));
      }
    }

    zk_address = SvPV_nolen(zk);
  }
  if (zk_address == NULL) {
    memcached_free(arcus->proxy);
    memcached_free(arcus->global);
    croak("zk_address argument is invalid.");
  }

  ps = hv_fetchs(conf, "service_code", 0);
  if (ps) {
    SvGETMAGIC(*ps);
  }
  if (ps && SvOK(*ps)) {
    service_code = SvPV_nolen(*ps);
  }
  if (service_code == NULL) {
    memcached_free(arcus->proxy);
    memcached_free(arcus->global);
    croak("service_code argument is invalid.");
  }

  ps = hv_fetchs(conf, "max_thread", 0);
  if (ps) {
    SvGETMAGIC(*ps);
  }
  if (ps && SvOK(*ps)) {
    if (SvIOK(*ps)) {
      max_thread = SvIV(*ps);
    }
  }
  if (max_thread < MIN_THREAD) {
    croak("max_thread argument is invalid. it should be greater than or equal to %d.", MIN_THREAD);
  }
  arcus->max_thread = max_thread;

  arcus_return_t ret = arcus_proxy_create(arcus->proxy, zk_address, service_code);
  if (ret != ARCUS_SUCCESS) {
    memcached_free(arcus->proxy);
    memcached_free(arcus->global);
    croak("failed to create the arcus proxy object: %d (%s)", ret, arcus_strerror(ret));
  }

  ps = hv_fetchs(conf, "connect_timeout", 0);
  if (ps) {
    SvGETMAGIC(*ps);
  }
  if (ps && SvOK(*ps)) {
    if (SvIOK(*ps) || SvNOK(*ps)) {
      memcached_behavior_set(arcus->global, MEMCACHED_BEHAVIOR_CONNECT_TIMEOUT, SvNV(*ps) * 1000);
    } else {
      warn("connect_timeout argument is invalid. it is ignored.");
    }
  }

  ps = hv_fetchs(conf, "io_timeout", 0);
  if (ps) {
    SvGETMAGIC(*ps);
  }
  if (ps && SvOK(*ps)) {
    if (SvIOK(*ps) || SvNOK(*ps)) {
      memcached_behavior_set(arcus->global, MEMCACHED_BEHAVIOR_POLL_TIMEOUT, (uint64_t) (SvNV(*ps) * 1000));
    } else {
      warn("io_timeout argument is invalid. it is ignored.");
    }
  }

  ps = hv_fetchs(conf, "nowait", 0);
  if (ps) {
    SvGETMAGIC(*ps);
  }
  if (ps && SvOK(*ps)) {
    memcached_behavior_set(arcus->global, MEMCACHED_BEHAVIOR_NOREPLY, SvTRUE(*ps));
  }

  ps = hv_fetchs(conf, "hash_namespace", 0);
  if (ps) {
    SvGETMAGIC(*ps);
  }
  if (ps && SvOK(*ps)) {
    memcached_behavior_set(arcus->global, MEMCACHED_BEHAVIOR_HASH_WITH_PREFIX_KEY, SvTRUE(*ps));
  }

  ps = hv_fetchs(conf, "namespace", 0);
  if (ps) {
    SvGETMAGIC(*ps);
  }
  if (ps && SvOK(*ps)) {
    STRLEN name_len;
    char *namespace = SvPV(*ps, name_len);

    if (namespace && name_len > 0) {
      memcached_set_namespace(arcus->global, namespace, name_len);
    } else if (!namespace && name_len == 0) {
      warn("namespace argument is invalid. it is ignored.");
    }
  }

  arcus->pool = memcached_pool_create(arcus->global, MIN_THREAD, arcus->max_thread);
  if (arcus->pool == NULL) {
    arcus_proxy_close(arcus->proxy);
    memcached_free(arcus->proxy);
    memcached_free(arcus->global);
    croak("failed to create the memcached pool object.");
  }

  ret = arcus_proxy_connect(arcus->global, arcus->pool, arcus->proxy);
  if (ret != ARCUS_SUCCESS) {
    arcus_pool_close(arcus->pool);
    memcached_pool_destroy(arcus->pool);
    arcus_proxy_close(arcus->proxy);
    memcached_free(arcus->proxy);
    memcached_free(arcus->global);
    croak("failed to connect to the proxy: %d (%s)", ret, arcus_strerror(ret));
  }
}

MODULE = Arcus::Base		PACKAGE = Arcus::Base

void
new(class, self)
  char *class
  SV *self
  PPCODE:
  /*
   * Two call shapes:
   *   - `self` is a hash reference: allocate a new Arcus_API, create its
   *     global and proxy handles, configure and connect it from the hash,
   *     and return it as a reference blessed into `class`.
   *   - otherwise `self` is read as an unsigned integer holding the address
   *     of an existing Arcus_API; nothing is returned and only its
   *     reference count is bumped.
   * NOTE(review): if initialize_arcus_api() croaks, the Arcus_API allocation
   * is not released by this function.
   */
  Arcus_API *arcus = NULL;
  if (SvROK(self) && SvTYPE(SvRV(self)) == SVt_PVHV) {
    EXTEND(SP, 1);
    /* Zero-filled so pool/max_thread never hold indeterminate values. */
    Newxz(arcus, 1, Arcus_API);
    arcus->global = memcached_create(NULL);
    if (arcus->global == NULL) {
      Safefree(arcus);
      croak("Failed to create the global memcached object");
    }
    arcus->proxy = memcached_create(NULL);
    if (arcus->proxy == NULL) {
      memcached_free(arcus->global);
      Safefree(arcus);
      croak("Failed to create the proxy memcached object");
    }
    /* Starts at 0; the shared increment at the end brings it to 1. */
    arcus->cur_thread = 0;
    arcus->main_proxy = true;
    initialize_arcus_api(aTHX_ arcus, (HV *) SvRV(self));
    SV* sv = newSV(0);
    sv_setref_pv(sv, class, (void*)arcus);
    mXPUSHs(sv);
  } else {
    arcus = INT2PTR(Arcus_API *, SvUV(self));
    if (arcus == NULL) {
      croak("Invalid Arcus handle");
    }
  }
  arcus->cur_thread++;

void
DESTROY(arcus)
  Arcus_API *arcus
  CODE:
  /*
   * Drop one reference; when the count reaches zero, release the pool and
   * memcached handles via destroy_arcus_api().
   * NOTE(review): the Arcus_API allocation itself is not released here.
   * Confirm whether that is intentional.
   */
  if (--arcus->cur_thread == 0) {
    destroy_arcus_api(arcus);
  }

void
connect_proxy(arcus)
  Arcus_API *arcus
  CODE:
  /*
   * Rebuild this handle's connection state on top of the existing proxy:
   * reset the reference count to 1, mark the handle as not being the main
   * proxy (so destroy_arcus_api() leaves arcus->proxy alone), replace
   * arcus->global with a clone of itself, create a new pool from that clone
   * and connect both to arcus->proxy. The previous global/pool pointers are
   * overwritten without being released. Croaks on any failure.
   */
  arcus->main_proxy = false;
  arcus->cur_thread = 1;
  arcus->global = memcached_clone(NULL, arcus->global);
  if (!arcus->global) {
    croak("Failed to create the global memcached object");
  }
  arcus->pool = memcached_pool_create(arcus->global, MIN_THREAD, arcus->max_thread);
  if (!arcus->pool) {
    memcached_free(arcus->global);
    arcus->global = NULL;
    croak("Failed to create the memcached pool object");
  }
  arcus_return_t rc = arcus_proxy_connect(arcus->global, arcus->pool, arcus->proxy);
  if (rc != ARCUS_SUCCESS) {
    /* Forget the proxy pointer, then release the new pool and clone. */
    arcus->proxy = NULL;
    destroy_arcus_api(arcus);
    croak("Failed to connect : %d (%s)", rc, arcus_strerror(rc));
  }

SV *
cas(arcus, key, cas, value, ...)
  Arcus_API *arcus
  SV *key
  SV *cas
  SV *value
  PREINIT:
  time_t exptime = 0;
  int flags = 0;
  int arg = 4;
  CODE:
  RETVAL = NULL;
  memcached_return_t ret;
  STRLEN key_length, value_length;
  char *key_ptr = NULL;
  uint64_t cas_value = 0;
  char *value_ptr = NULL;
  SV *sv;

  memcached_st *mc = memcached_pool_pop(arcus->pool, true, &ret);
  if (mc == NULL) {
    warn("Failed to create the memcached object : %d (%s)", ret, memcached_strerror(NULL, ret));
    goto do_return;
  }

  if (!SvOK(key) || (key_ptr = SvPV(key, key_length)) == NULL) {
    warn("key argument is invalid.");
    goto do_return;
  }
  if (!SvIOK(cas) || (cas_value = SvIV(cas)) == 0) {
    warn("cas argument is invalid.");
    goto do_return;
  }
  if (!SvOK(value) || (value_ptr = SvPV(value, value_length)) == NULL) {
    warn("value argument is invalid.");
    goto do_return;
  }

  if (items > arg && (sv = safe_sv(aTHX_ ST(arg++))) != NULL) {
    if (!SvIOK(sv)) {
      warn("exptime argument is invalid.");
      goto do_return;
    }
    exptime = (time_t) SvIV(sv);
  }
  if (items > arg && (sv = safe_sv(aTHX_ ST(arg++))) != NULL) {
    if (!SvIOK(sv)) {
      warn("flags argument is invalid.");



( run in 0.877 second using v1.01-cache-2.11-cpan-39bf76dae61 )