Arcus-Client
view release on metacpan or search on metacpan
#define PERL_NO_GET_CONTEXT
#include "EXTERN.h"
#include "perl.h"
#include "XSUB.h"
#include "ppport.h"
#include "libmemcached/memcached.h"
#define MIN_THREAD 1
/*
 * Per-client state shared by the XSUBs in this file.
 */
typedef struct arcus_st {
/* Handle passed to arcus_proxy_create()/arcus_proxy_connect(). */
memcached_st *proxy;
/* Handle that receives the behavior settings and from which the pool is created. */
memcached_st *global;
/* Connection pool created from `global`; operations pop handles from it. */
memcached_pool_st *pool;
/* Upper bound handed to memcached_pool_create(). */
int max_thread;
/* Incremented in new(), decremented in DESTROY(); teardown happens at zero. */
int cur_thread;
/* When true, destroy_arcus_api() also closes and frees `proxy`. */
bool main_proxy;
} Arcus_API;
/* Operation identifiers; each value is also an index into ARCUS_OP_NAME. */
enum arcus_op {
    ARCUS_SET = 0,
    ARCUS_CAS,
    ARCUS_ADD,
    ARCUS_REPLACE,
    ARCUS_APPEND,
    ARCUS_PREPEND,
    ARCUS_INCR,
    ARCUS_DECR,
    ARCUS_GET,
    ARCUS_GETS
};

/* Printable name of every arcus_op, keyed by the enum value itself. */
const char *ARCUS_OP_NAME[] = {
    [ARCUS_SET]     = "set",
    [ARCUS_CAS]     = "cas",
    [ARCUS_ADD]     = "add",
    [ARCUS_REPLACE] = "replace",
    [ARCUS_APPEND]  = "append",
    [ARCUS_PREPEND] = "prepend",
    [ARCUS_INCR]    = "increment",
    [ARCUS_DECR]    = "decrement",
    [ARCUS_GET]     = "get",
    [ARCUS_GETS]    = "gets"
};
/*
 * Run get-magic on sv, then hand it back only if it holds a defined value.
 * Returns NULL for an undefined value.
 */
static inline SV *safe_sv(pTHX_ SV *sv)
{
    SvGETMAGIC(sv);
    return SvOK(sv) ? sv : NULL;
}
/*
 * av_fetch() wrapper that treats a missing slot and an undefined element
 * alike: both yield NULL. Otherwise the slot pointer from av_fetch() is
 * returned unchanged. No get-magic is invoked here.
 */
static inline SV **safe_av_fetch(pTHX_ AV *av, SSize_t key, I32 lval)
{
    SV **slot = av_fetch(av, key, lval);

    if (slot == NULL || !SvOK(*slot)) {
        return NULL;
    }
    return slot;
}
/*
 * Release the handles owned by arcus and reset the released fields to NULL.
 *
 * The pool is closed and destroyed first, then the global handle is freed.
 * The proxy handle is closed and freed only when main_proxy is set; otherwise
 * the proxy pointer is left as it is.
 */
static inline void destroy_arcus_api(Arcus_API *arcus)
{
    if (arcus->pool != NULL) {
        arcus_pool_close(arcus->pool);
        memcached_pool_destroy(arcus->pool);
        arcus->pool = NULL;
    }

    if (arcus->global != NULL) {
        memcached_free(arcus->global);
        arcus->global = NULL;
    }

    if (arcus->main_proxy) {
        if (arcus->proxy != NULL) {
            arcus_proxy_close(arcus->proxy);
            memcached_free(arcus->proxy);
            arcus->proxy = NULL;
        }
    }
}
static void initialize_arcus_api(pTHX_ Arcus_API *arcus, HV *conf)
{
char *zk_address = NULL;
char *service_code = NULL;
int max_thread = -1;
SV **ps = hv_fetchs(conf, "zk_address", 0);
if (ps) {
SvGETMAGIC(*ps);
}
if (ps && SvOK(*ps)) {
if (!SvROK(*ps) || SvTYPE(SvRV(*ps)) != SVt_PVAV) {
croak("zk_address argument is not array reference.");
}
AV *av = (AV *) SvRV(*ps);
SV *zk = newSVpvn("", 0);
int i, size = av_count(av);
for (i = 0; i < size; i++) {
SV **elem = av_fetch(av, i, 0);
if (elem && SvOK(*elem)) {
if (i > 0) {
sv_catpv(zk, ",");
}
sv_catpv(zk, SvPV_nolen(*elem));
}
}
zk_address = SvPV_nolen(zk);
}
if (zk_address == NULL) {
memcached_free(arcus->proxy);
memcached_free(arcus->global);
croak("zk_address argument is invalid.");
}
ps = hv_fetchs(conf, "service_code", 0);
if (ps) {
SvGETMAGIC(*ps);
}
if (ps && SvOK(*ps)) {
service_code = SvPV_nolen(*ps);
}
if (service_code == NULL) {
memcached_free(arcus->proxy);
memcached_free(arcus->global);
croak("service_code argument is invalid.");
}
ps = hv_fetchs(conf, "max_thread", 0);
if (ps) {
SvGETMAGIC(*ps);
}
if (ps && SvOK(*ps)) {
if (SvIOK(*ps)) {
max_thread = SvIV(*ps);
}
}
if (max_thread < MIN_THREAD) {
croak("max_thread argument is invalid. it should be greater than or equal to %d.", MIN_THREAD);
}
arcus->max_thread = max_thread;
arcus_return_t ret = arcus_proxy_create(arcus->proxy, zk_address, service_code);
if (ret != ARCUS_SUCCESS) {
memcached_free(arcus->proxy);
memcached_free(arcus->global);
croak("failed to create the arcus proxy object: %d (%s)", ret, arcus_strerror(ret));
}
ps = hv_fetchs(conf, "connect_timeout", 0);
if (ps) {
SvGETMAGIC(*ps);
}
if (ps && SvOK(*ps)) {
if (SvIOK(*ps) || SvNOK(*ps)) {
memcached_behavior_set(arcus->global, MEMCACHED_BEHAVIOR_CONNECT_TIMEOUT, SvNV(*ps) * 1000);
} else {
warn("connect_timeout argument is invalid. it is ignored.");
}
}
ps = hv_fetchs(conf, "io_timeout", 0);
if (ps) {
SvGETMAGIC(*ps);
}
if (ps && SvOK(*ps)) {
if (SvIOK(*ps) || SvNOK(*ps)) {
memcached_behavior_set(arcus->global, MEMCACHED_BEHAVIOR_POLL_TIMEOUT, (uint64_t) (SvNV(*ps) * 1000));
} else {
warn("io_timeout argument is invalid. it is ignored.");
}
}
ps = hv_fetchs(conf, "nowait", 0);
if (ps) {
SvGETMAGIC(*ps);
}
if (ps && SvOK(*ps)) {
memcached_behavior_set(arcus->global, MEMCACHED_BEHAVIOR_NOREPLY, SvTRUE(*ps));
}
ps = hv_fetchs(conf, "hash_namespace", 0);
if (ps) {
SvGETMAGIC(*ps);
}
if (ps && SvOK(*ps)) {
memcached_behavior_set(arcus->global, MEMCACHED_BEHAVIOR_HASH_WITH_PREFIX_KEY, SvTRUE(*ps));
}
ps = hv_fetchs(conf, "namespace", 0);
if (ps) {
SvGETMAGIC(*ps);
}
if (ps && SvOK(*ps)) {
STRLEN name_len;
char *namespace = SvPV(*ps, name_len);
if (namespace && name_len > 0) {
memcached_set_namespace(arcus->global, namespace, name_len);
} else if (!namespace && name_len == 0) {
warn("namespace argument is invalid. it is ignored.");
}
}
arcus->pool = memcached_pool_create(arcus->global, MIN_THREAD, arcus->max_thread);
if (arcus->pool == NULL) {
arcus_proxy_close(arcus->proxy);
memcached_free(arcus->proxy);
memcached_free(arcus->global);
croak("failed to create the memcached pool object.");
}
ret = arcus_proxy_connect(arcus->global, arcus->pool, arcus->proxy);
if (ret != ARCUS_SUCCESS) {
arcus_pool_close(arcus->pool);
memcached_pool_destroy(arcus->pool);
arcus_proxy_close(arcus->proxy);
memcached_free(arcus->proxy);
memcached_free(arcus->global);
croak("failed to connect to the proxy: %d (%s)", ret, arcus_strerror(ret));
}
}
MODULE = Arcus::Base PACKAGE = Arcus::Base
void
new(class, self)
    char *class
    SV *self
  PPCODE:
    /*
     * Two call shapes:
     *  - self is a hash reference: allocate a new Arcus_API, configure it
     *    from the hash and push a reference blessed into `class`.
     *  - anything else: self is taken as the numeric address of an existing
     *    Arcus_API; nothing is pushed, only its counter is bumped.
     * Either way cur_thread is incremented once before returning.
     */
    Arcus_API *arcus = NULL;
    if (SvROK(self) && SvTYPE(SvRV(self)) == SVt_PVHV) {
        EXTEND(SP, 1);
        /* Zero-filled so pool/max_thread never hold indeterminate values. */
        Newxz(arcus, 1, Arcus_API);
        arcus->global = memcached_create(NULL);
        if (arcus->global == NULL) {
            Safefree(arcus);
            croak("Failed to create the global memcached object");
        }
        arcus->proxy = memcached_create(NULL);
        if (arcus->proxy == NULL) {
            memcached_free(arcus->global);
            Safefree(arcus);
            croak("Failed to create the proxy memcached object");
        }
        /* global is created exactly once; a second memcached_create() here
         * would overwrite the pointer and leak the first handle. */
        arcus->cur_thread = 0;
        arcus->main_proxy = true;
        initialize_arcus_api(aTHX_ arcus, (HV *) SvRV(self));
        SV *sv = newSV(0);
        sv_setref_pv(sv, class, (void *) arcus);
        mXPUSHs(sv);
    } else {
        arcus = INT2PTR(Arcus_API *, SvUV(self));
        if (arcus == NULL) {
            croak("self argument is neither a hash reference nor an existing Arcus handle");
        }
    }
    arcus->cur_thread++;
void
DESTROY(arcus)
    Arcus_API *arcus
  CODE:
    /* Give up one reference; whoever brings the counter to zero tears the
     * handles down via destroy_arcus_api(). */
    if (--arcus->cur_thread == 0) {
        destroy_arcus_api(arcus);
    }
void
connect_proxy(arcus)
Arcus_API *arcus
CODE:
arcus->cur_thread = 1;
arcus->main_proxy = false;
arcus->global = memcached_clone(NULL, arcus->global);
if (arcus->global == NULL) {
croak("Failed to create the global memcached object");
}
arcus->pool = memcached_pool_create(arcus->global, MIN_THREAD, arcus->max_thread);
if (arcus->pool == NULL) {
memcached_free(arcus->global);
arcus->global = NULL;
croak("Failed to create the memcached pool object");
}
arcus_return_t arcus_ret = arcus_proxy_connect(arcus->global, arcus->pool, arcus->proxy);
if (arcus_ret != ARCUS_SUCCESS) {
arcus->proxy = NULL;
destroy_arcus_api(arcus);
croak("Failed to connect : %d (%s)", arcus_ret, arcus_strerror(arcus_ret));
}
SV *
cas(arcus, key, cas, value, ...)
Arcus_API *arcus
SV *key
SV *cas
SV *value
PREINIT:
time_t exptime = 0;
int flags = 0;
int arg = 4;
CODE:
RETVAL = NULL;
memcached_return_t ret;
STRLEN key_length, value_length;
char *key_ptr = NULL;
uint64_t cas_value = 0;
char *value_ptr = NULL;
SV *sv;
memcached_st *mc = memcached_pool_pop(arcus->pool, true, &ret);
if (mc == NULL) {
warn("Failed to create the memcached object : %d (%s)", ret, memcached_strerror(NULL, ret));
goto do_return;
}
if (!SvOK(key) || (key_ptr = SvPV(key, key_length)) == NULL) {
warn("key argument is invalid.");
goto do_return;
}
if (!SvIOK(cas) || (cas_value = SvIV(cas)) == 0) {
warn("cas argument is invalid.");
goto do_return;
}
if (!SvOK(value) || (value_ptr = SvPV(value, value_length)) == NULL) {
warn("value argument is invalid.");
goto do_return;
}
if (items > arg && (sv = safe_sv(aTHX_ ST(arg++))) != NULL) {
if (!SvIOK(sv)) {
warn("exptime argument is invalid.");
goto do_return;
}
exptime = (time_t) SvIV(sv);
}
if (items > arg && (sv = safe_sv(aTHX_ ST(arg++))) != NULL) {
if (!SvIOK(sv)) {
warn("flags argument is invalid.");
( run in 0.877 second using v1.01-cache-2.11-cpan-39bf76dae61 )