Arcus-Client
view release on metacpan or search on metacpan
#include "ppport.h"
#include "libmemcached/memcached.h"
#define MIN_THREAD 1
/*
 * Per-object state behind an Arcus client handle.
 *
 * One instance is allocated when a client is constructed and released by
 * destroy_arcus_api() once cur_thread drops to zero in DESTROY.
 */
typedef struct arcus_st {
/* Handle given to arcus_proxy_create()/arcus_proxy_connect(); torn down
 * with arcus_proxy_close() followed by memcached_free(). */
memcached_st *proxy;
/* Template handle: the namespace is set on it and the pool is created
 * from it via memcached_pool_create(). */
memcached_st *global;
/* Connection pool returned by memcached_pool_create(global, MIN_THREAD,
 * max_thread). */
memcached_pool_st *pool;
/* Value of the "max_thread" configuration entry; rejected when it is
 * below MIN_THREAD. */
int max_thread;
/* Number of live handles sharing this object: incremented when a handle
 * is produced, decremented in DESTROY. */
int cur_thread;
/* Set to true when the object is first created and to false by
 * connect_proxy(). NOTE(review): presumably marks the handle that owns
 * the proxy connection -- confirm against destroy_arcus_api(). */
bool main_proxy;
} Arcus_API;
enum arcus_op {
ARCUS_SET,
ARCUS_CAS,
ARCUS_ADD,
ARCUS_REPLACE,
ARCUS_APPEND,
ARCUS_PREPEND,
arcus_proxy_close(arcus->proxy);
memcached_free(arcus->proxy);
arcus->proxy = NULL;
}
}
static void initialize_arcus_api(pTHX_ Arcus_API *arcus, HV *conf)
{
char *zk_address = NULL;
char *service_code = NULL;
int max_thread = -1;
SV **ps = hv_fetchs(conf, "zk_address", 0);
if (ps) {
SvGETMAGIC(*ps);
}
if (ps && SvOK(*ps)) {
if (!SvROK(*ps) || SvTYPE(SvRV(*ps)) != SVt_PVAV) {
croak("zk_address argument is not array reference.");
}
}
if (ps && SvOK(*ps)) {
service_code = SvPV_nolen(*ps);
}
if (service_code == NULL) {
memcached_free(arcus->proxy);
memcached_free(arcus->global);
croak("service_code argument is invalid.");
}
ps = hv_fetchs(conf, "max_thread", 0);
if (ps) {
SvGETMAGIC(*ps);
}
if (ps && SvOK(*ps)) {
if (SvIOK(*ps)) {
max_thread = SvIV(*ps);
}
}
if (max_thread < MIN_THREAD) {
croak("max_thread argument is invalid. it should be greater than or equal to %d.", MIN_THREAD);
}
arcus->max_thread = max_thread;
arcus_return_t ret = arcus_proxy_create(arcus->proxy, zk_address, service_code);
if (ret != ARCUS_SUCCESS) {
memcached_free(arcus->proxy);
memcached_free(arcus->global);
croak("failed to create the arcus proxy object: %d (%s)", ret, arcus_strerror(ret));
}
ps = hv_fetchs(conf, "connect_timeout", 0);
if (ps) {
STRLEN name_len;
char *namespace = SvPV(*ps, name_len);
if (namespace && name_len > 0) {
memcached_set_namespace(arcus->global, namespace, name_len);
} else if (!namespace && name_len == 0) {
warn("namespace argument is invalid. it is ignored.");
}
}
arcus->pool = memcached_pool_create(arcus->global, MIN_THREAD, arcus->max_thread);
if (arcus->pool == NULL) {
arcus_proxy_close(arcus->proxy);
memcached_free(arcus->proxy);
memcached_free(arcus->global);
croak("failed to create the memcached pool object.");
}
ret = arcus_proxy_connect(arcus->global, arcus->pool, arcus->proxy);
if (ret != ARCUS_SUCCESS) {
arcus_pool_close(arcus->pool);
Newx(arcus, 1, Arcus_API);
arcus->global = memcached_create(NULL);
if (arcus->global == NULL) {
croak("Failed to create the global memcached object");
}
arcus->proxy = memcached_create(NULL);
if (arcus->proxy == NULL) {
croak("Failed to create the proxy memcached object");
}
arcus->global = memcached_create(NULL);
arcus->cur_thread = 0;
arcus->main_proxy = true;
initialize_arcus_api(aTHX_ arcus, (HV *) SvRV(self));
SV* sv = newSV(0);
sv_setref_pv(sv, class, (void*)arcus);
mXPUSHs(sv);
} else {
arcus = (Arcus_API *)SvUV(self);
}
arcus->cur_thread++;
void
DESTROY(arcus)
    Arcus_API *arcus
  CODE:
    /* Give up this handle's share of the object; whichever handle brings
     * the count down to zero releases the underlying resources. */
    if (--arcus->cur_thread == 0) {
        destroy_arcus_api(arcus);
    }
void
connect_proxy(arcus)
Arcus_API *arcus
CODE:
arcus->cur_thread = 1;
arcus->main_proxy = false;
arcus->global = memcached_clone(NULL, arcus->global);
if (arcus->global == NULL) {
croak("Failed to create the global memcached object");
}
arcus->pool = memcached_pool_create(arcus->global, MIN_THREAD, arcus->max_thread);
if (arcus->pool == NULL) {
memcached_free(arcus->global);
arcus->global = NULL;
croak("Failed to create the memcached pool object");
}
arcus_return_t arcus_ret = arcus_proxy_connect(arcus->global, arcus->pool, arcus->proxy);
if (arcus_ret != ARCUS_SUCCESS) {
arcus->proxy = NULL;
destroy_arcus_api(arcus);
croak("Failed to connect : %d (%s)", arcus_ret, arcus_strerror(arcus_ret));
t/arithmetic.t
t/big-value.t
t/commands.t
t/delete.t
t/fork.t
t/get.t
t/magic.t
t/multi-op.t
t/serialize.t
t/store.t
t/thread.t
Base.xs
Changes
cpanfile
Makefile.PL
MANIFEST
ppport.h
README.md
typemap
META.yml Module YAML meta-data (added by MakeMaker)
lib/Arcus/Client.pm view on Meta::CPAN
warn "failed to deserialize." if ($@);
}
}
return ($value);
};
sub new {
my ($class, $args) = @_;
$args->{connect_timeout} //= 1.0; # second
$args->{io_timeout} //= 0.8; # second
$args->{max_thread} //= 64;
$args->{nowait} = 0; # TODO: Fix a C Client's noreply flags (issue: #3)
$args->{hash_namespace} = 1;
$args->{namespace} //= "";
$args->{serialize_methods} //= [ \&Storable::nfreeze, \&Storable::thaw ];
$args->{compress_threshold} //= -1;
$args->{compress_ratio} //= 0.8;
$args->{compress_methods} //= [ \&Compress::Zlib::memGzip,
\&Compress::Zlib::memGunzip ] if $HAVE_ZLIB;
my $arcus = $class->SUPER::new($args);
bless($arcus, $class);
NO_TAINT_SUPPORT|5.017006||Viu
not_a_number|5.005000||Viu
NOTE3|5.027001||Viu
NOTHING|5.003007||Viu
NOTHING_t8|5.035004||Viu
NOTHING_t8_p8|5.033003||Viu
NOTHING_t8_pb|5.033003||Viu
NOTHING_tb|5.035004||Viu
NOTHING_tb_p8|5.033003||Viu
NOTHING_tb_pb|5.033003||Viu
nothreadhook|5.008000|5.008000|
notify_parser_that_changed_to_utf8|5.025010||Viu
not_incrementable|5.021002||Viu
NOT_IN_PAD|5.005000||Viu
NOT_REACHED|5.019006|5.003007|poVnu
NPOSIXA|5.017003||Viu
NPOSIXA_t8|5.035004||Viu
NPOSIXA_t8_p8|5.033003||Viu
NPOSIXA_t8_pb|5.033003||Viu
NPOSIXA_tb|5.035004||Viu
NPOSIXA_tb_p8|5.033003||Viu
PerlProc_popen_list|5.007001||Viu
PerlProc_setgid|5.005000||Viu
PerlProc_setjmp|5.005000||Viu
PerlProc_setuid|5.005000||Viu
PerlProc_signal|5.005000||Viu
PerlProc_sleep|5.005000||Viu
PerlProc_spawnvp|5.008000||Viu
PerlProc_times|5.005000||Viu
PerlProc_wait|5.005000||Viu
PerlProc_waitpid|5.005000||Viu
perl_pthread_mutex_lock|5.023006||Viu
perl_pthread_mutex_unlock|5.023006||Viu
PERL_PV_ESCAPE_ALL|5.009004|5.003007|p
PERL_PV_ESCAPE_DWIM|5.019008||Viu
PERL_PV_ESCAPE_DWIM_ALL_HEX|||Viu
PERL_PV_ESCAPE_FIRSTCHAR|5.009004|5.003007|p
PERL_PV_ESCAPE_NOBACKSLASH|5.009004|5.003007|p
PERL_PV_ESCAPE_NOCLEAR|5.009004|5.003007|p
PERL_PV_ESCAPE_NONASCII|5.013009|5.013009|
PERL_PV_ESCAPE_QUOTE|5.009004|5.003007|p
PERL_PV_ESCAPE_RE|5.009005|5.003007|p
PERL_PV_ESCAPE_UNI|5.009004|5.003007|p
PL_sv_no|5.004005|5.003007|p
PL_sv_root|5.005000||Viu
PL_sv_serial|5.010001||Viu
PL_sv_undef|5.004005|5.003007|p
PL_sv_yes|5.004005|5.003007|p
PL_sv_zero|5.027003|5.027003|
PL_sys_intern|5.005000||Viu
PL_tainted|5.005000|5.003007|poVnu
PL_tainting|5.005000|5.003007|poVnu
PL_taint_warn|5.007003||Viu
PL_threadhook|5.008000||Viu
PL_tmps_floor|5.005000||Viu
PL_tmps_ix|5.005000||Viu
PL_tmps_max|5.005000||Viu
PL_tmps_stack|5.005000||Viu
PL_tokenbuf||5.003007|ponu
PL_top_env|5.005000||Viu
PL_toptarget|5.005000||Viu
PL_TR_SPECIAL_HANDLING_UTF8|5.031006||Viu
PL_underlying_numeric_obj|5.027009||Viu
PL_unicode|5.008001||Viu
PRUNE_tb|5.035004||Viu
PRUNE_tb_p8|5.033003||Viu
PRUNE_tb_pb|5.033003||Viu
PSEUDO|5.009004||Viu
PSEUDO_t8|5.035004||Viu
PSEUDO_t8_p8|5.033003||Viu
PSEUDO_t8_pb|5.033003||Viu
PSEUDO_tb|5.035004||Viu
PSEUDO_tb_p8|5.033003||Viu
PSEUDO_tb_pb|5.033003||Viu
pthread_addr_t|5.005000||Viu
PTHREAD_ATFORK|5.007002||Viu
pthread_attr_init|5.006000||Viu
PTHREAD_ATTR_SETDETACHSTATE|5.006000||Viu
pthread_condattr_default|5.005000||Viu
PTHREAD_CREATE|5.006000||Viu
pthread_create|5.008001||Viu
PTHREAD_CREATE_JOINABLE|5.005000||Viu
PTHREAD_GETSPECIFIC|5.007002||Viu
PTHREAD_GETSPECIFIC_INT|5.006000||Viu
pthread_key_create|5.005000||Viu
pthread_keycreate|5.008001||Viu
pthread_mutexattr_default|5.005000||Viu
pthread_mutexattr_init|5.005000||Viu
pthread_mutexattr_settype|5.005000||Viu
pTHX_12|5.019010||Viu
pTHX_1|5.006000||Viu
pTHX_2|5.006000||Viu
pTHX_3|5.006000||Viu
pTHX_4|5.006000||Viu
pTHX|5.006000|5.003007|p
pTHX_5|5.009003||Viu
pTHX_6|5.009003||Viu
pTHX_7|5.009003||Viu
pTHX_8|5.009003||Viu
TARGi|5.023005||Viu
TARGn|5.023005||Viu
TARGu|5.023005||Viu
telldir|5.005000||Viu
T_FMT|5.027010||Viu
T_FMT_AMPM|5.027010||Viu
THIS|5.003007|5.003007|V
THOUSEP|5.027010||Viu
THR|5.005000||Viu
THREAD_CREATE_NEEDS_STACK|5.007002||Viu
thread_locale_init|5.027009|5.027009|xnu
thread_locale_term|5.027009|5.027009|xnu
THREAD_RET_TYPE|5.005000||Viu
tied_method|5.013009||vViu
TIED_METHOD_ARGUMENTS_ON_STACK|5.013009||Viu
TIED_METHOD_MORTALIZE_NOT_NEEDED|5.013009||Viu
TIED_METHOD_SAY|5.013009||Viu
times|5.005000||Viu
Time_t|5.003007|5.003007|Vn
Timeval|5.004000|5.004000|Vn
TM|5.011000||Viu
tmpfile|5.003007||Viu
#endif
#ifndef dTHR
# define dTHR dNOOP
#endif
#ifndef dTHX
# define dTHX dNOOP
#endif
/* Hint: dTHX
For pre-5.6.0 thread compatibility, instead use dTHXR, available only through
ppport.h */
#ifndef dTHXa
# define dTHXa(x) dNOOP
#endif
#ifndef pTHX
# define pTHX void
#endif
#ifndef pTHX_
# define pTHX_
#endif
#ifndef aTHX
# define aTHX
#endif
/* Hint: aTHX
For pre-5.6.0 thread compatibility, instead use aTHXR, available only through
ppport.h */
#ifndef aTHX_
# define aTHX_
#endif
/* Hint: aTHX_
For pre-5.6.0 thread compatibility, instead use aTHXR_, available only
through ppport.h */
#if (PERL_BCDVERSION < 0x5006000)
# ifdef USE_THREADS
# define aTHXR thr
# define aTHXR_ thr,
# else
# define aTHXR
# define aTHXR_
# endif
U32 DPPP_(my_PL_signals) = D_PPP_PERL_SIGNALS_INIT;
#else
extern U32 DPPP_(my_PL_signals);
#endif
#define PL_signals DPPP_(my_PL_signals)
#endif
/* Hint: PL_ppaddr
* Calling an op via PL_ppaddr requires passing a context argument
* for threaded builds. Since the context argument is different for
* 5.005 perls, you can use aTHXR (supplied by ppport.h), which will
* automatically be defined as the correct argument.
*/
#if (PERL_BCDVERSION <= 0x5005005)
/* Replace: 1 */
# define PL_ppaddr ppaddr
# define PL_no_modify no_modify
/* Replace: 0 */
#endif
#if defined(PERL_USE_GCC_BRACE_GROUPS)
# define newRV_noinc(sv) ({ SV *_sv = (SV *)newRV((sv)); SvREFCNT_dec((sv)); _sv; })
#else
# define newRV_noinc(sv) ((PL_Sv = (SV *)newRV((sv))), SvREFCNT_dec((sv)), PL_Sv)
#endif
#endif
/*
* Boilerplate macros for initializing and accessing interpreter-local
* data from C. All statics in extensions should be reworked to use
* this, if you want to make the extension thread-safe. See ext/re/re.xs
* for an example of the use of these macros.
*
* Code that uses these macros is responsible for the following:
* 1. #define MY_CXT_KEY to a unique string, e.g. "DynaLoader_guts"
* 2. Declare a typedef named my_cxt_t that is a structure that contains
* all the data that needs to be interpreter-local.
* 3. Use the START_MY_CXT macro after the declaration of my_cxt_t.
* 4. Use the MY_CXT_INIT macro such that it is called exactly once
* (typically put in the BOOT: section).
* 5. Use the members of the my_cxt_t structure everywhere as
* MY_CXT.member.
* 6. Use the dMY_CXT macro (a declaration) in all the functions that
* access MY_CXT.
*/
#if defined(MULTIPLICITY) || defined(PERL_OBJECT) || \
defined(PERL_CAPI) || defined(PERL_IMPLICIT_CONTEXT)
#ifndef START_MY_CXT
/* This must appear in all extensions that define a my_cxt_t structure,
* right after the definition (i.e. at file scope). The non-threads
* case below uses it to declare the data as static. */
#define START_MY_CXT
#if (PERL_BCDVERSION < 0x5004068)
/* Fetches the SV that keeps the per-interpreter data. */
#define dMY_CXT_SV \
SV *my_cxt_sv = get_sv(MY_CXT_KEY, FALSE)
#else /* >= perl5.004_68 */
#define dMY_CXT_SV \
SV *my_cxt_sv = *hv_fetch(PL_modglobal, MY_CXT_KEY, \
if (*sp + len <= send && memEQ(*sp, radix, len)) {
*sp += len;
return TRUE;
}
}
#else
/* older perls don't have PL_numeric_radix_sv so the radix
* must manually be requested from locale.h
*/
#include <locale.h>
dTHR; /* needed for older threaded perls */
struct lconv *lc = localeconv();
char *radix = lc->decimal_point;
if (radix && IN_LOCALE) {
STRLEN len = strlen(radix);
if (*sp + len <= send && memEQ(*sp, radix, len)) {
*sp += len;
return TRUE;
}
}
#endif
#!/bin/perl
use strict;
use warnings;
use Test::More;
use Arcus::Client;
use threads;
use threads::shared;
use FindBin;
use lib "$FindBin::Bin";
use ArcusTestCommon;
# Skip the whole file when ArcusTestCommon reports that the ZooKeeper port
# is not open. This must happen before any test result is emitted.
if (not ArcusTestCommon->is_zk_port_opened()) {
    plan skip_all => "zk is not running...";
}

# Silence everything written to STDERR from here on. Failing to do so is
# only cosmetic, so report it as a TAP note instead of aborting.
open(STDERR, '>', '/dev/null')
    or note("could not redirect STDERR to /dev/null: $!");

# Client shared by the main thread and every worker thread below.
my $cache = ArcusTestCommon->create_client("perl-test:");
unless (ok($cache, "Check Arcus Client Is Created Appropriately")) {
    # A result has already been emitted by ok(), so a "1..0 # SKIP" plan
    # would contradict it. Close the plan normally and stop instead; the
    # failed assertion above keeps the file marked as failing.
    note("arcus client is not created appropriately...");
    done_testing();
    exit;
}
# Thread body: store a value under a per-thread key through the shared
# client and read it back.
#
# Takes the thread id used to build the key and the value.
# Returns 1 when the set succeeded and the value read back is identical,
# 0 otherwise.
sub cache_worker {
    my ($thread_id) = @_;
    my ($key, $value) = ("key_$thread_id", "value_from_thread_$thread_id");

    # A failed write short-circuits: the read is not attempted.
    return 0 unless $cache->set($key, $value);

    return 0 unless $cache->get($key) eq $value;
    return 1;
}
# Start five workers, each writing and re-reading its own key. The threads
# are created in list context, exactly as a push() argument would be.
my @threads = map { threads->create(\&cache_worker, $_) } 1 .. 5;

# Collect the workers in creation order; each one reports 1 on success.
for my $i (0 .. $#threads) {
    ok(scalar($threads[$i]->join()), "Thread $i Test");
}

# The values written by the workers must be visible from the main thread.
for my $i (1 .. 5) {
    my ($key, $value) = ("key_$i", "value_from_thread_$i");
    is($cache->get($key), $value, "Main: get $key");
}
# Thread body: overwrite the shared key "key" with this thread's id.
#
# Takes the thread id to store. Always returns 1; the result of the set
# itself is not inspected.
sub update_worker {
    my $thread_id = shift;
    $cache->set("key", $thread_id);
    return 1;
}
# Several threads overwrite the same key at once; whichever write lands
# last wins, so any of the thread ids is an acceptable final value.
my @update_threads;
my $num_threads = 5;
for my $i (1..$num_threads) {
    push @update_threads, threads->create(\&update_worker, $i);
}
for my $i (0 .. $#update_threads) {
    my $ret = $update_threads[$i]->join();
    ok($ret, "Thread $i Test");
}

# Fetch the surviving value once, then test membership. The grep is wrapped
# in scalar(...) so that its list ends at the closing parenthesis; without
# that, grep also consumes the description string and the test runs unnamed.
my $final_value = $cache->get("key");
ok(
    defined($final_value)
        && scalar(grep { $_ eq $final_value } 1 .. $num_threads),
    "value is one of the expected values"
);

ok($cache->flush_all, "Flush All");
done_testing();
( run in 0.590 second using v1.01-cache-2.11-cpan-3cd7ad12f66 )