Arcus-Client
view release on metacpan or search on metacpan
567891011121314151617181920212223242526#include "ppport.h"
#include "libmemcached/memcached.h"
#define MIN_THREAD 1
typedef struct arcus_st {
memcached_st
*proxy
;
memcached_st
*global
;
memcached_pool_st
*pool
;
int
max_thread;
int
cur_thread;
bool main_proxy;
} Arcus_API;
enum arcus_op {
ARCUS_SET,
ARCUS_CAS,
ARCUS_ADD,
ARCUS_REPLACE,
ARCUS_APPEND,
ARCUS_PREPEND,
7677787980818283848586878889909192939495
arcus_proxy_close(arcus->proxy);
memcached_free(arcus->proxy);
arcus->proxy = NULL;
}
}
static void initialize_arcus_api(pTHX_ Arcus_API
*arcus
, HV
*conf
)
{
char
*zk_address
= NULL;
char
*service_code
= NULL;
int
max_thread = -1;
SV *
*ps
= hv_fetchs(conf,
"zk_address"
, 0);
if
(ps) {
SvGETMAGIC(
*ps
);
}
if
(ps && SvOK(
*ps
)) {
if
(!SvROK(
*ps
) || SvTYPE(SvRV(
*ps
)) != SVt_PVAV) {
croak(
"zk_address argument is not array reference."
);
}
122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154}
if
(ps && SvOK(
*ps
)) {
service_code = SvPV_nolen(
*ps
);
}
if
(service_code == NULL) {
memcached_free(arcus->proxy);
memcached_free(arcus->global);
croak(
"service_code argument is invalid."
);
}
ps = hv_fetchs(conf,
"max_thread"
, 0);
if
(ps) {
SvGETMAGIC(
*ps
);
}
if
(ps && SvOK(
*ps
)) {
if
(SvIOK(
*ps
)) {
max_thread = SvIV(
*ps
);
}
}
if
(max_thread < MIN_THREAD) {
croak(
"max_thread argument is invalid. it should be greater than or equal to %d."
, MIN_THREAD);
}
arcus->max_thread = max_thread;
arcus_return_t ret = arcus_proxy_create(arcus->proxy, zk_address, service_code);
if
(ret != ARCUS_SUCCESS) {
memcached_free(arcus->proxy);
memcached_free(arcus->global);
croak(
"failed to create the arcus proxy object: %d (%s)"
, ret, arcus_strerror(ret));
}
ps = hv_fetchs(conf,
"connect_timeout"
, 0);
if
(ps) {
198199200201202203204205206207208209210211212213214215216217218
STRLEN name_len;
char
*namespace
= SvPV(
*ps
, name_len);
if
(namespace && name_len > 0) {
memcached_set_namespace(arcus->global, namespace, name_len);
}
else
if
(!namespace && name_len == 0) {
warn
(
"namespace argument is invalid. it is ignored."
);
}
}
arcus->pool = memcached_pool_create(arcus->global, MIN_THREAD, arcus->max_thread);
if
(arcus->pool == NULL) {
arcus_proxy_close(arcus->proxy);
memcached_free(arcus->proxy);
memcached_free(arcus->global);
croak(
"failed to create the memcached pool object."
);
}
ret = arcus_proxy_connect(arcus->global, arcus->pool, arcus->proxy);
if
(ret != ARCUS_SUCCESS) {
arcus_pool_close(arcus->pool);
237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287
Newx(arcus, 1, Arcus_API);
arcus->global = memcached_create(NULL);
if
(arcus->global == NULL) {
croak(
"Failed to create the global memcached object"
);
}
arcus->proxy = memcached_create(NULL);
if
(arcus->proxy == NULL) {
croak(
"Failed to create the proxy memcached object"
);
}
arcus->global = memcached_create(NULL);
arcus->cur_thread = 0;
arcus->main_proxy = true;
initialize_arcus_api(aTHX_ arcus, (HV *) SvRV(self));
SV* sv = newSV(0);
sv_setref_pv(sv, class, (void*)arcus);
mXPUSHs(sv);
}
else
{
arcus = (Arcus_API *)SvUV(self);
}
arcus->cur_thread++;
void
DESTROY(arcus)
Arcus_API
*arcus
CODE:
arcus->cur_thread--;
if
(!arcus->cur_thread) {
destroy_arcus_api(arcus);
}
void
connect_proxy(arcus)
Arcus_API
*arcus
CODE:
arcus->cur_thread = 1;
arcus->main_proxy = false;
arcus->global = memcached_clone(NULL, arcus->global);
if
(arcus->global == NULL) {
croak(
"Failed to create the global memcached object"
);
}
arcus->pool = memcached_pool_create(arcus->global, MIN_THREAD, arcus->max_thread);
if
(arcus->pool == NULL) {
memcached_free(arcus->global);
arcus->global = NULL;
croak(
"Failed to create the memcached pool object"
);
}
arcus_return_t arcus_ret = arcus_proxy_connect(arcus->global, arcus->pool, arcus->proxy);
if
(arcus_ret != ARCUS_SUCCESS) {
arcus->proxy = NULL;
destroy_arcus_api(arcus);
croak(
"Failed to connect : %d (%s)"
, arcus_ret, arcus_strerror(arcus_ret));
101112131415161718192021222324252627282930t/arithmetic.t
t/big-value.t
t/commands.t
t/
delete
.t
t/
fork
.t
t/get.t
t/magic.t
t/multi-op.t
t/serialize.t
t/store.t
t/thread.t
Base.xs
Changes
cpanfile
Makefile.PL
MANIFEST
ppport.h
README.md
typemap
META.yml Module YAML meta-data (added by MakeMaker)
lib/Arcus/Client.pm view on Meta::CPAN
737475767778798081828384858687888990919293
warn
"failed to deserialize."
if
($@);
}
}
return
(
$value
);
};
sub
new {
my
(
$class
,
$args
) =
@_
;
$args
->{connect_timeout} //= 1.0;
# second
$args
->{io_timeout} //= 0.8;
# second
$args
->{max_thread} //= 64;
$args
->{nowait} = 0;
# TODO: Fix a C Client's noreply flags (issue: #3)
$args
->{hash_namespace} = 1;
$args
->{namespace} //=
""
;
$args
->{serialize_methods} //= [ \
&Storable::nfreeze
, \
&Storable::thaw
];
$args
->{compress_threshold} //= -1;
$args
->{compress_ratio} //= 0.8;
$args
->{compress_methods} //= [ \
&Compress::Zlib::memGzip
,
\
&Compress::Zlib::memGunzip
]
if
$HAVE_ZLIB
;
my
$arcus
=
$class
->SUPER::new(
$args
);
bless
(
$arcus
,
$class
);
571257135714571557165717571857195720572157225723572457255726572757285729573057315732NO_TAINT_SUPPORT|5.017006||Viu
not_a_number|5.005000||Viu
NOTE3|5.027001||Viu
NOTHING|5.003007||Viu
NOTHING_t8|5.035004||Viu
NOTHING_t8_p8|5.033003||Viu
NOTHING_t8_pb|5.033003||Viu
NOTHING_tb|5.035004||Viu
NOTHING_tb_p8|5.033003||Viu
NOTHING_tb_pb|5.033003||Viu
nothreadhook|5.008000|5.008000|
notify_parser_that_changed_to_utf8|5.025010||Viu
not_incrementable|5.021002||Viu
NOT_IN_PAD|5.005000||Viu
NOT_REACHED|5.019006|5.003007|poVnu
NPOSIXA|5.017003||Viu
NPOSIXA_t8|5.035004||Viu
NPOSIXA_t8_p8|5.033003||Viu
NPOSIXA_t8_pb|5.033003||Viu
NPOSIXA_tb|5.035004||Viu
NPOSIXA_tb_p8|5.033003||Viu
6868686968706871687268736874687568766877687868796880688168826883688468856886688768886889PerlProc_popen_list|5.007001||Viu
PerlProc_setgid|5.005000||Viu
PerlProc_setjmp|5.005000||Viu
PerlProc_setuid|5.005000||Viu
PerlProc_signal|5.005000||Viu
PerlProc_sleep|5.005000||Viu
PerlProc_spawnvp|5.008000||Viu
PerlProc_times|5.005000||Viu
PerlProc_wait|5.005000||Viu
PerlProc_waitpid|5.005000||Viu
perl_pthread_mutex_lock|5.023006||Viu
perl_pthread_mutex_unlock|5.023006||Viu
PERL_PV_ESCAPE_ALL|5.009004|5.003007|p
PERL_PV_ESCAPE_DWIM|5.019008||Viu
PERL_PV_ESCAPE_DWIM_ALL_HEX|||Viu
PERL_PV_ESCAPE_FIRSTCHAR|5.009004|5.003007|p
PERL_PV_ESCAPE_NOBACKSLASH|5.009004|5.003007|p
PERL_PV_ESCAPE_NOCLEAR|5.009004|5.003007|p
PERL_PV_ESCAPE_NONASCII|5.013009|5.013009|
PERL_PV_ESCAPE_QUOTE|5.009004|5.003007|p
PERL_PV_ESCAPE_RE|5.009005|5.003007|p
PERL_PV_ESCAPE_UNI|5.009004|5.003007|p
752075217522752375247525752675277528752975307531753275337534753575367537753875397540PL_sv_no|5.004005|5.003007|p
PL_sv_root|5.005000||Viu
PL_sv_serial|5.010001||Viu
PL_sv_undef|5.004005|5.003007|p
PL_sv_yes|5.004005|5.003007|p
PL_sv_zero|5.027003|5.027003|
PL_sys_intern|5.005000||Viu
PL_tainted|5.005000|5.003007|poVnu
PL_tainting|5.005000|5.003007|poVnu
PL_taint_warn|5.007003||Viu
PL_threadhook|5.008000||Viu
PL_tmps_floor|5.005000||Viu
PL_tmps_ix|5.005000||Viu
PL_tmps_max|5.005000||Viu
PL_tmps_stack|5.005000||Viu
PL_tokenbuf||5.003007|ponu
PL_top_env|5.005000||Viu
PL_toptarget|5.005000||Viu
PL_TR_SPECIAL_HANDLING_UTF8|5.031006||Viu
PL_underlying_numeric_obj|5.027009||Viu
PL_unicode|5.008001||Viu
77517752775377547755775677577758775977607761776277637764776577667767776877697770777177727773777477757776777777787779778077817782778377847785PRUNE_tb|5.035004||Viu
PRUNE_tb_p8|5.033003||Viu
PRUNE_tb_pb|5.033003||Viu
PSEUDO|5.009004||Viu
PSEUDO_t8|5.035004||Viu
PSEUDO_t8_p8|5.033003||Viu
PSEUDO_t8_pb|5.033003||Viu
PSEUDO_tb|5.035004||Viu
PSEUDO_tb_p8|5.033003||Viu
PSEUDO_tb_pb|5.033003||Viu
pthread_addr_t|5.005000||Viu
PTHREAD_ATFORK|5.007002||Viu
pthread_attr_init|5.006000||Viu
PTHREAD_ATTR_SETDETACHSTATE|5.006000||Viu
pthread_condattr_default|5.005000||Viu
PTHREAD_CREATE|5.006000||Viu
pthread_create|5.008001||Viu
PTHREAD_CREATE_JOINABLE|5.005000||Viu
PTHREAD_GETSPECIFIC|5.007002||Viu
PTHREAD_GETSPECIFIC_INT|5.006000||Viu
pthread_key_create|5.005000||Viu
pthread_keycreate|5.008001||Viu
pthread_mutexattr_default|5.005000||Viu
pthread_mutexattr_init|5.005000||Viu
pthread_mutexattr_settype|5.005000||Viu
pTHX_12|5.019010||Viu
pTHX_1|5.006000||Viu
pTHX_2|5.006000||Viu
pTHX_3|5.006000||Viu
pTHX_4|5.006000||Viu
pTHX|5.006000|5.003007|p
pTHX_5|5.009003||Viu
pTHX_6|5.009003||Viu
pTHX_7|5.009003||Viu
pTHX_8|5.009003||Viu
9675967696779678967996809681968296839684968596869687968896899690969196929693969496959696TARGi|5.023005||Viu
TARGn|5.023005||Viu
TARGu|5.023005||Viu
telldir
|5.005000||Viu
T_FMT|5.027010||Viu
T_FMT_AMPM|5.027010||Viu
THIS|5.003007|5.003007|V
THOUSEP|5.027010||Viu
THR|5.005000||Viu
THREAD_CREATE_NEEDS_STACK|5.007002||Viu
thread_locale_init|5.027009|5.027009|xnu
thread_locale_term|5.027009|5.027009|xnu
THREAD_RET_TYPE|5.005000||Viu
tied_method|5.013009||vViu
TIED_METHOD_ARGUMENTS_ON_STACK|5.013009||Viu
TIED_METHOD_MORTALIZE_NOT_NEEDED|5.013009||Viu
TIED_METHOD_SAY|5.013009||Viu
times
|5.005000||Viu
Time_t|5.003007|5.003007|Vn
Timeval|5.004000|5.004000|Vn
TM|5.011000||Viu
tmpfile|5.003007||Viu
117091171011711117121171311714117151171611717117181171911720117211172211723117241172511726117271172811729117301173111732117331173411735117361173711738117391174011741117421174311744117451174611747117481174911750117511175211753117541175511756#endif
#ifndef dTHR
# define dTHR dNOOP
#endif
#ifndef dTHX
# define dTHX dNOOP
#endif
/* Hint: dTHX
ppport.h */
#ifndef dTHXa
# define dTHXa(x) dNOOP
#endif
#ifndef pTHX
# define pTHX void
#endif
#ifndef pTHX_
# define pTHX_
#endif
#ifndef aTHX
# define aTHX
#endif
/* Hint: aTHX
ppport.h */
#ifndef aTHX_
# define aTHX_
#endif
/* Hint: aTHX_
through ppport.h */
#if (PERL_BCDVERSION < 0x5006000)
# ifdef USE_THREADS
# define aTHXR thr
# define aTHXR_ thr,
# else
# define aTHXR
# define aTHXR_
# endif
121261212712128121291213012131121321213312134121351213612137121381213912140121411214212143121441214512146U32 DPPP_(my_PL_signals) = D_PPP_PERL_SIGNALS_INIT;
#else
extern U32 DPPP_(my_PL_signals);
#endif
#define PL_signals DPPP_(my_PL_signals)
#endif
/* Hint: PL_ppaddr
* Calling an op via PL_ppaddr requires passing a context argument
*
for
threaded builds. Since the context argument is different
for
* automatically be
defined
as the correct argument.
*/
#if (PERL_BCDVERSION <= 0x5005005)
/* Replace: 1 */
# define PL_ppaddr ppaddr
# define PL_no_modify no_modify
/* Replace: 0 */
#endif
156711567215673156741567515676156771567815679156801568115682156831568415685156861568715688156891569015691#if defined(PERL_USE_GCC_BRACE_GROUPS)
# define newRV_noinc(sv) ({ SV *_sv = (SV *)newRV((sv)); SvREFCNT_dec((sv)); _sv; })
#else
# define newRV_noinc(sv) ((PL_Sv = (SV *)newRV((sv))), SvREFCNT_dec((sv)), PL_Sv)
#endif
#endif
/*
* Boilerplate macros
for
initializing and accessing interpreter-
local
* data from C. All statics in extensions should be reworked to
use
* this,
if
you want to make the extension thread-safe. See ext/re/re.xs
*
* Code that uses these macros is responsible
for
the following:
* 1.
#define MY_CXT_KEY to a unique string, e.g. "DynaLoader_guts"
* 2. Declare a typedef named my_cxt_t that is a structure that contains
* all the data that needs to be interpreter-
local
.
* 3. Use the START_MY_CXT macro
after
the declaration of my_cxt_t.
* 4. Use the MY_CXT_INIT macro such that it is called exactly once
* (typically put in the BOOT: section).
* 5. Use the members of the my_cxt_t structure everywhere as
156931569415695156961569715698156991570015701157021570315704157051570615707157081570915710157111571215713* 6. Use the dMY_CXT macro (a declaration) in all the functions that
* access MY_CXT.
*/
#if defined(MULTIPLICITY) || defined(PERL_OBJECT) || \
defined
(PERL_CAPI) ||
defined
(PERL_IMPLICIT_CONTEXT)
#ifndef START_MY_CXT
/* This must appear in all extensions that define a my_cxt_t structure,
* right
after
the definition (i.e. at file scope). The non-threads
* case below uses it to declare the data as static. */
#define START_MY_CXT
#if (PERL_BCDVERSION < 0x5004068)
/* Fetches the SV that keeps the per-interpreter data. */
#define dMY_CXT_SV \
SV
*my_cxt_sv
= get_sv(MY_CXT_KEY, FALSE)
#else /* >= perl5.004_68 */
#define dMY_CXT_SV \
SV
*my_cxt_sv
=
*hv_fetch
(PL_modglobal, MY_CXT_KEY, \
167571675816759167601676116762167631676416765167661676716768167691677016771167721677316774167751677616777
if
(
*sp
+ len <=
send
&& memEQ(
*sp
, radix, len)) {
*sp
+= len;
return
TRUE;
}
}
#else
/* older perls don't have PL_numeric_radix_sv so the radix
* must manually be requested from locale.h
*/
#include <locale.h>
dTHR; /* needed
for
older threaded perls */
struct lconv
*lc
= localeconv();
char
*radix
=
lc
->decimal_point;
if
(radix && IN_LOCALE) {
STRLEN len = strlen(radix);
if
(
*sp
+ len <=
send
&& memEQ(
*sp
, radix, len)) {
*sp
+= len;
return
TRUE;
}
}
#endif
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172#!/bin/perl
use
strict;
use
warnings;
use
Test::More;
use
Arcus::Client;
use
threads;
use
threads::shared;
use
FindBin;
use
ArcusTestCommon;
if
(not ArcusTestCommon->is_zk_port_opened()) {
plan
skip_all
=>
"zk is not running..."
;
}
open
(STDERR,
'>'
,
'/dev/null'
);
my
$cache
= ArcusTestCommon->create_client(
"perl-test:"
);
unless
(ok(
$cache
,
"Check Arcus Client Is Created Appropriately"
)) {
plan
skip_all
=>
"arcus client is not created appropriately..."
;
};
sub
cache_worker {
my
(
$thread_id
) =
@_
;
my
$key
=
"key_$thread_id"
;
my
$value
=
"value_from_thread_$thread_id"
;
if
(
$cache
->set(
$key
,
$value
) and (
$cache
->get(
$key
) eq
$value
)) {
return
1;
}
return
0;
}
my
@threads
;
for
my
$i
(1..5) {
push
@threads
, threads->create(\
&cache_worker
,
$i
);
}
for
my
$i
(0 ..
$#threads
) {
my
$ret
=
$threads
[
$i
]->
join
();
ok(
$ret
,
"Thread $i Test"
);
}
for
my
$i
(1..5) {
my
$key
=
"key_$i"
;
my
$value
=
"value_from_thread_$i"
;
is(
$cache
->get(
$key
),
$value
,
"Main: get $key"
);
}
sub
update_worker {
my
(
$thread_id
) =
@_
;
$cache
->set(
"key"
,
$thread_id
);
return
1;
}
my
@update_threads
;
my
$num_threads
= 5;
for
my
$i
(1..
$num_threads
) {
push
@update_threads
, threads->create(\
&update_worker
,
$i
);
}
for
my
$i
(0 ..
$#update_threads
) {
my
$ret
=
$update_threads
[
$i
]->
join
();
ok(
$ret
,
"Thread $i Test"
);
}
ok(
grep
{
$_
eq
$cache
->get(
"key"
) } (1..
$num_threads
),
"value is one of the expected values"
);
ok(
$cache
->flush_all,
"Flush All"
);
done_testing();
( run in 0.308 second using v1.01-cache-2.11-cpan-05444aca049 )