Arcus-Client

 view release on metacpan or  search on metacpan

Base.xs  view on Meta::CPAN


#include "ppport.h"
#include "libmemcached/memcached.h"

#define MIN_THREAD 1

typedef struct arcus_st {
  memcached_st *proxy;
  memcached_st *global;
  memcached_pool_st *pool;
  int max_thread;
  int cur_thread;
  bool main_proxy;
} Arcus_API;

enum arcus_op {
  ARCUS_SET,
  ARCUS_CAS,
  ARCUS_ADD,
  ARCUS_REPLACE,
  ARCUS_APPEND,
  ARCUS_PREPEND,

Base.xs  view on Meta::CPAN

    arcus_proxy_close(arcus->proxy);
    memcached_free(arcus->proxy);
    arcus->proxy = NULL;
  }
}

static void initialize_arcus_api(pTHX_ Arcus_API *arcus, HV *conf)
{
  char *zk_address = NULL;
  char *service_code = NULL;
  int max_thread = -1;

  SV **ps = hv_fetchs(conf, "zk_address", 0);
  if (ps) {
    SvGETMAGIC(*ps);
  }
  if (ps && SvOK(*ps)) {
    if (!SvROK(*ps) || SvTYPE(SvRV(*ps)) != SVt_PVAV) {
      croak("zk_address argument is not array reference.");
    }

Base.xs  view on Meta::CPAN

  }
  if (ps && SvOK(*ps)) {
    service_code = SvPV_nolen(*ps);
  }
  if (service_code == NULL) {
    memcached_free(arcus->proxy);
    memcached_free(arcus->global);
    croak("service_code argument is invalid.");
  }

  ps = hv_fetchs(conf, "max_thread", 0);
  if (ps) {
    SvGETMAGIC(*ps);
  }
  if (ps && SvOK(*ps)) {
    if (SvIOK(*ps)) {
      max_thread = SvIV(*ps);
    }
  }
  if (max_thread < MIN_THREAD) {
    croak("max_thread argument is invalid. it should be greater than or equal to %d.", MIN_THREAD);
  }
  arcus->max_thread = max_thread;

  arcus_return_t ret = arcus_proxy_create(arcus->proxy, zk_address, service_code);
  if (ret != ARCUS_SUCCESS) {
    memcached_free(arcus->proxy);
    memcached_free(arcus->global);
    croak("failed to create the arcus proxy object: %d (%s)", ret, arcus_strerror(ret));
  }

  ps = hv_fetchs(conf, "connect_timeout", 0);
  if (ps) {

Base.xs  view on Meta::CPAN

    STRLEN name_len;
    char *namespace = SvPV(*ps, name_len);

    if (namespace && name_len > 0) {
      memcached_set_namespace(arcus->global, namespace, name_len);
    } else if (!namespace && name_len == 0) {
      warn("namespace argument is invalid. it is ignored.");
    }
  }

  arcus->pool = memcached_pool_create(arcus->global, MIN_THREAD, arcus->max_thread);
  if (arcus->pool == NULL) {
    arcus_proxy_close(arcus->proxy);
    memcached_free(arcus->proxy);
    memcached_free(arcus->global);
    croak("failed to create the memcached pool object.");
  }

  ret = arcus_proxy_connect(arcus->global, arcus->pool, arcus->proxy);
  if (ret != ARCUS_SUCCESS) {
    arcus_pool_close(arcus->pool);

Base.xs  view on Meta::CPAN

    Newx(arcus, 1, Arcus_API);
    arcus->global = memcached_create(NULL);
    if (arcus->global == NULL) {
      croak("Failed to create the global memcached object");
    }
    arcus->proxy = memcached_create(NULL);
    if (arcus->proxy == NULL) {
      croak("Failed to create the proxy memcached object");
    }
    arcus->global = memcached_create(NULL);
    arcus->cur_thread = 0;
    arcus->main_proxy = true;
    initialize_arcus_api(aTHX_ arcus, (HV *) SvRV(self));
    SV* sv = newSV(0);
    sv_setref_pv(sv, class, (void*)arcus);
    mXPUSHs(sv);
  } else {
    arcus = (Arcus_API *)SvUV(self);
  }
  arcus->cur_thread++;

void
DESTROY(arcus)
  Arcus_API *arcus
  CODE:
  arcus->cur_thread--;
  if (!arcus->cur_thread) {
    destroy_arcus_api(arcus);
  }

void
connect_proxy(arcus)
  Arcus_API *arcus
  CODE:
  arcus->cur_thread = 1;
  arcus->main_proxy = false;
  arcus->global = memcached_clone(NULL, arcus->global);
  if (arcus->global == NULL) {
    croak("Failed to create the global memcached object");
  }
  arcus->pool = memcached_pool_create(arcus->global, MIN_THREAD, arcus->max_thread);
  if (arcus->pool == NULL) {
    memcached_free(arcus->global);
    arcus->global = NULL;
    croak("Failed to create the memcached pool object");
  }
  arcus_return_t arcus_ret = arcus_proxy_connect(arcus->global, arcus->pool, arcus->proxy);
  if (arcus_ret != ARCUS_SUCCESS) {
    arcus->proxy = NULL;
    destroy_arcus_api(arcus);
    croak("Failed to connect : %d (%s)", arcus_ret, arcus_strerror(arcus_ret));

MANIFEST  view on Meta::CPAN

t/arithmetic.t
t/big-value.t
t/commands.t
t/delete.t
t/fork.t
t/get.t
t/magic.t
t/multi-op.t
t/serialize.t
t/store.t
t/thread.t

Base.xs
Changes
cpanfile
Makefile.PL
MANIFEST
ppport.h
README.md
typemap
META.yml                                 Module YAML meta-data (added by MakeMaker)

lib/Arcus/Client.pm  view on Meta::CPAN

      warn "failed to deserialize." if ($@);
    }
  }
  return ($value);
};

sub new {
  my ($class, $args) = @_;
  $args->{connect_timeout} //= 1.0; # second
  $args->{io_timeout}      //= 0.8; # second
  $args->{max_thread} //= 64;
  $args->{nowait} = 0; # TODO: Fix a C Client's noreply flags (issue: #3)
  $args->{hash_namespace} = 1;
  $args->{namespace} //= "";
  $args->{serialize_methods} //= [ \&Storable::nfreeze, \&Storable::thaw ];
  $args->{compress_threshold} //= -1;
  $args->{compress_ratio} //= 0.8;
  $args->{compress_methods} //= [ \&Compress::Zlib::memGzip,
                                  \&Compress::Zlib::memGunzip ] if $HAVE_ZLIB;
  my $arcus = $class->SUPER::new($args);
  bless($arcus, $class);

ppport.h  view on Meta::CPAN

NO_TAINT_SUPPORT|5.017006||Viu
not_a_number|5.005000||Viu
NOTE3|5.027001||Viu
NOTHING|5.003007||Viu
NOTHING_t8|5.035004||Viu
NOTHING_t8_p8|5.033003||Viu
NOTHING_t8_pb|5.033003||Viu
NOTHING_tb|5.035004||Viu
NOTHING_tb_p8|5.033003||Viu
NOTHING_tb_pb|5.033003||Viu
nothreadhook|5.008000|5.008000|
notify_parser_that_changed_to_utf8|5.025010||Viu
not_incrementable|5.021002||Viu
NOT_IN_PAD|5.005000||Viu
NOT_REACHED|5.019006|5.003007|poVnu
NPOSIXA|5.017003||Viu
NPOSIXA_t8|5.035004||Viu
NPOSIXA_t8_p8|5.033003||Viu
NPOSIXA_t8_pb|5.033003||Viu
NPOSIXA_tb|5.035004||Viu
NPOSIXA_tb_p8|5.033003||Viu

ppport.h  view on Meta::CPAN

PerlProc_popen_list|5.007001||Viu
PerlProc_setgid|5.005000||Viu
PerlProc_setjmp|5.005000||Viu
PerlProc_setuid|5.005000||Viu
PerlProc_signal|5.005000||Viu
PerlProc_sleep|5.005000||Viu
PerlProc_spawnvp|5.008000||Viu
PerlProc_times|5.005000||Viu
PerlProc_wait|5.005000||Viu
PerlProc_waitpid|5.005000||Viu
perl_pthread_mutex_lock|5.023006||Viu
perl_pthread_mutex_unlock|5.023006||Viu
PERL_PV_ESCAPE_ALL|5.009004|5.003007|p
PERL_PV_ESCAPE_DWIM|5.019008||Viu
PERL_PV_ESCAPE_DWIM_ALL_HEX|||Viu
PERL_PV_ESCAPE_FIRSTCHAR|5.009004|5.003007|p
PERL_PV_ESCAPE_NOBACKSLASH|5.009004|5.003007|p
PERL_PV_ESCAPE_NOCLEAR|5.009004|5.003007|p
PERL_PV_ESCAPE_NONASCII|5.013009|5.013009|
PERL_PV_ESCAPE_QUOTE|5.009004|5.003007|p
PERL_PV_ESCAPE_RE|5.009005|5.003007|p
PERL_PV_ESCAPE_UNI|5.009004|5.003007|p

ppport.h  view on Meta::CPAN

PL_sv_no|5.004005|5.003007|p
PL_sv_root|5.005000||Viu
PL_sv_serial|5.010001||Viu
PL_sv_undef|5.004005|5.003007|p
PL_sv_yes|5.004005|5.003007|p
PL_sv_zero|5.027003|5.027003|
PL_sys_intern|5.005000||Viu
PL_tainted|5.005000|5.003007|poVnu
PL_tainting|5.005000|5.003007|poVnu
PL_taint_warn|5.007003||Viu
PL_threadhook|5.008000||Viu
PL_tmps_floor|5.005000||Viu
PL_tmps_ix|5.005000||Viu
PL_tmps_max|5.005000||Viu
PL_tmps_stack|5.005000||Viu
PL_tokenbuf||5.003007|ponu
PL_top_env|5.005000||Viu
PL_toptarget|5.005000||Viu
PL_TR_SPECIAL_HANDLING_UTF8|5.031006||Viu
PL_underlying_numeric_obj|5.027009||Viu
PL_unicode|5.008001||Viu

ppport.h  view on Meta::CPAN

PRUNE_tb|5.035004||Viu
PRUNE_tb_p8|5.033003||Viu
PRUNE_tb_pb|5.033003||Viu
PSEUDO|5.009004||Viu
PSEUDO_t8|5.035004||Viu
PSEUDO_t8_p8|5.033003||Viu
PSEUDO_t8_pb|5.033003||Viu
PSEUDO_tb|5.035004||Viu
PSEUDO_tb_p8|5.033003||Viu
PSEUDO_tb_pb|5.033003||Viu
pthread_addr_t|5.005000||Viu
PTHREAD_ATFORK|5.007002||Viu
pthread_attr_init|5.006000||Viu
PTHREAD_ATTR_SETDETACHSTATE|5.006000||Viu
pthread_condattr_default|5.005000||Viu
PTHREAD_CREATE|5.006000||Viu
pthread_create|5.008001||Viu
PTHREAD_CREATE_JOINABLE|5.005000||Viu
PTHREAD_GETSPECIFIC|5.007002||Viu
PTHREAD_GETSPECIFIC_INT|5.006000||Viu
pthread_key_create|5.005000||Viu
pthread_keycreate|5.008001||Viu
pthread_mutexattr_default|5.005000||Viu
pthread_mutexattr_init|5.005000||Viu
pthread_mutexattr_settype|5.005000||Viu
pTHX_12|5.019010||Viu
pTHX_1|5.006000||Viu
pTHX_2|5.006000||Viu
pTHX_3|5.006000||Viu
pTHX_4|5.006000||Viu
pTHX|5.006000|5.003007|p
pTHX_5|5.009003||Viu
pTHX_6|5.009003||Viu
pTHX_7|5.009003||Viu
pTHX_8|5.009003||Viu

ppport.h  view on Meta::CPAN

TARGi|5.023005||Viu
TARGn|5.023005||Viu
TARGu|5.023005||Viu
telldir|5.005000||Viu
T_FMT|5.027010||Viu
T_FMT_AMPM|5.027010||Viu
THIS|5.003007|5.003007|V
THOUSEP|5.027010||Viu
THR|5.005000||Viu
THREAD_CREATE_NEEDS_STACK|5.007002||Viu
thread_locale_init|5.027009|5.027009|xnu
thread_locale_term|5.027009|5.027009|xnu
THREAD_RET_TYPE|5.005000||Viu
tied_method|5.013009||vViu
TIED_METHOD_ARGUMENTS_ON_STACK|5.013009||Viu
TIED_METHOD_MORTALIZE_NOT_NEEDED|5.013009||Viu
TIED_METHOD_SAY|5.013009||Viu
times|5.005000||Viu
Time_t|5.003007|5.003007|Vn
Timeval|5.004000|5.004000|Vn
TM|5.011000||Viu
tmpfile|5.003007||Viu

ppport.h  view on Meta::CPAN

#endif
#ifndef dTHR
#  define dTHR                           dNOOP
#endif
#ifndef dTHX
#  define dTHX                           dNOOP
#endif

/* Hint: dTHX

  For pre-5.6.0 thread compatibility, instead use dTHXR, available only through
  ppport.h */
#ifndef dTHXa
#  define dTHXa(x)                       dNOOP
#endif
#ifndef pTHX
#  define pTHX                           void
#endif

#ifndef pTHX_
#  define pTHX_
#endif

#ifndef aTHX
#  define aTHX
#endif

/* Hint: aTHX

  For pre-5.6.0 thread compatibility, instead use aTHXR, available only through
  ppport.h */
#ifndef aTHX_
#  define aTHX_
#endif

/* Hint: aTHX_

  For pre-5.6.0 thread compatibility, instead use aTHXR_, available only
  through ppport.h */

#if (PERL_BCDVERSION < 0x5006000)
#  ifdef USE_THREADS
#    define aTHXR  thr
#    define aTHXR_ thr,
#  else
#    define aTHXR
#    define aTHXR_
#  endif

ppport.h  view on Meta::CPAN

U32 DPPP_(my_PL_signals) = D_PPP_PERL_SIGNALS_INIT;
#else
extern U32 DPPP_(my_PL_signals);
#endif
#define PL_signals DPPP_(my_PL_signals)

#endif

/* Hint: PL_ppaddr
 * Calling an op via PL_ppaddr requires passing a context argument
 * for threaded builds. Since the context argument is different for
 * 5.005 perls, you can use aTHXR (supplied by ppport.h), which will
 * automatically be defined as the correct argument.
 */

#if (PERL_BCDVERSION <= 0x5005005)
/* Replace: 1 */
#  define PL_ppaddr                 ppaddr
#  define PL_no_modify              no_modify
/* Replace: 0 */
#endif

ppport.h  view on Meta::CPAN

#if defined(PERL_USE_GCC_BRACE_GROUPS)
#  define newRV_noinc(sv) ({ SV *_sv = (SV *)newRV((sv)); SvREFCNT_dec((sv)); _sv; })
#else
#  define newRV_noinc(sv) ((PL_Sv = (SV *)newRV((sv))), SvREFCNT_dec((sv)), PL_Sv)
#endif
#endif

/*
 * Boilerplate macros for initializing and accessing interpreter-local
 * data from C.  All statics in extensions should be reworked to use
 * this, if you want to make the extension thread-safe.  See ext/re/re.xs
 * for an example of the use of these macros.
 *
 * Code that uses these macros is responsible for the following:
 * 1. #define MY_CXT_KEY to a unique string, e.g. "DynaLoader_guts"
 * 2. Declare a typedef named my_cxt_t that is a structure that contains
 *    all the data that needs to be interpreter-local.
 * 3. Use the START_MY_CXT macro after the declaration of my_cxt_t.
 * 4. Use the MY_CXT_INIT macro such that it is called exactly once
 *    (typically put in the BOOT: section).
 * 5. Use the members of the my_cxt_t structure everywhere as

ppport.h  view on Meta::CPAN

 * 6. Use the dMY_CXT macro (a declaration) in all the functions that
 *    access MY_CXT.
 */

#if defined(MULTIPLICITY) || defined(PERL_OBJECT) || \
    defined(PERL_CAPI)    || defined(PERL_IMPLICIT_CONTEXT)

#ifndef START_MY_CXT

/* This must appear in all extensions that define a my_cxt_t structure,
 * right after the definition (i.e. at file scope).  The non-threads
 * case below uses it to declare the data as static. */
#define START_MY_CXT

#if (PERL_BCDVERSION < 0x5004068)
/* Fetches the SV that keeps the per-interpreter data. */
#define dMY_CXT_SV \
        SV *my_cxt_sv = get_sv(MY_CXT_KEY, FALSE)
#else /* >= perl5.004_68 */
#define dMY_CXT_SV \
        SV *my_cxt_sv = *hv_fetch(PL_modglobal, MY_CXT_KEY,             \

ppport.h  view on Meta::CPAN

        if (*sp + len <= send && memEQ(*sp, radix, len)) {
            *sp += len;
            return TRUE;
        }
    }
#else
    /* older perls don't have PL_numeric_radix_sv so the radix
     * must manually be requested from locale.h
     */
#include <locale.h>
    dTHR;  /* needed for older threaded perls */
    struct lconv *lc = localeconv();
    char *radix = lc->decimal_point;
    if (radix && IN_LOCALE) {
        STRLEN len = strlen(radix);
        if (*sp + len <= send && memEQ(*sp, radix, len)) {
            *sp += len;
            return TRUE;
        }
    }
#endif

t/thread.t  view on Meta::CPAN

#!/bin/perl

use strict;
use warnings;
use Test::More;

use Arcus::Client;
use threads;
use threads::shared;

use FindBin;
use lib "$FindBin::Bin";
use ArcusTestCommon;

if (not ArcusTestCommon->is_zk_port_opened()) {
  plan skip_all => "zk is not running...";
}

open(STDERR, '>', '/dev/null');

my $cache = ArcusTestCommon->create_client("perl-test:");
unless (ok($cache, "Check Arcus Client Is Created Appropriately")) {
  plan skip_all => "arcus client is not created appropriately...";
};

sub cache_worker {
  my ($thread_id) = @_;
  my $key = "key_$thread_id";
  my $value = "value_from_thread_$thread_id";
  if ($cache->set($key, $value) and ($cache->get($key) eq $value)) {
    return 1;
  }
  return 0;
}

my @threads;
for my $i (1..5) {
  push @threads, threads->create(\&cache_worker, $i);
}

for my $i (0 .. $#threads) {
  my $ret = $threads[$i]->join();
  ok($ret, "Thread $i Test");
}

for my $i (1..5) {
  my $key = "key_$i";
  my $value = "value_from_thread_$i";
  is($cache->get($key), $value, "Main: get $key");
}

sub update_worker {
  my ($thread_id) = @_;
  $cache->set("key", $thread_id);
  return 1;
}

my @update_threads;
my $num_threads = 5;
for my $i (1..$num_threads) {
  push @update_threads, threads->create(\&update_worker, $i);
}

for my $i (0 .. $#update_threads) {
  my $ret = $update_threads[$i]->join();
  ok($ret, "Thread $i Test");
}

ok(grep { $_ eq $cache->get("key") } (1..$num_threads), "value is one of the expected values");

ok($cache->flush_all, "Flush All");
done_testing();



( run in 0.590 second using v1.01-cache-2.11-cpan-3cd7ad12f66 )