Net-ZooKeeper
view release on metacpan or search on metacpan
ZooKeeper.xs view on Meta::CPAN
ZOO_PERM_READ = ZOO_PERM_READ
ZOO_PERM_WRITE = ZOO_PERM_WRITE
ZOO_PERM_CREATE = ZOO_PERM_CREATE
ZOO_PERM_DELETE = ZOO_PERM_DELETE
ZOO_PERM_ADMIN = ZOO_PERM_ADMIN
ZOO_PERM_ALL = ZOO_PERM_ALL
ZOO_CREATED_EVENT = ZOO_CREATED_EVENT
ZOO_DELETED_EVENT = ZOO_DELETED_EVENT
ZOO_CHANGED_EVENT = ZOO_CHANGED_EVENT
ZOO_CHILD_EVENT = ZOO_CHILD_EVENT
ZOO_SESSION_EVENT = ZOO_SESSION_EVENT
ZOO_NOTWATCHING_EVENT = ZOO_NOTWATCHING_EVENT
ZOO_EXPIRED_SESSION_STATE = ZOO_EXPIRED_SESSION_STATE
ZOO_AUTH_FAILED_STATE = ZOO_AUTH_FAILED_STATE
ZOO_CONNECTING_STATE = ZOO_CONNECTING_STATE
ZOO_ASSOCIATING_STATE = ZOO_ASSOCIATING_STATE
ZOO_CONNECTED_STATE = ZOO_CONNECTED_STATE
ZOO_LOG_LEVEL_OFF = ZOO_LOG_LEVEL_OFF
ZOO_LOG_LEVEL_ERROR = ZOO_LOG_LEVEL_ERROR
ZOO_LOG_LEVEL_WARN = ZOO_LOG_LEVEL_WARN
ZOO_LOG_LEVEL_INFO = ZOO_LOG_LEVEL_INFO
ZOO_LOG_LEVEL_DEBUG = ZOO_LOG_LEVEL_DEBUG
CODE:
if (!ix) {
if (!alias) {
alias = GvNAME(CvGV(cv));
}
if (strEQ(alias, "ZOK")) {
RETVAL = ZOK;
}
else if (strEQ(alias, "ZOO_LOG_LEVEL_OFF")) {
RETVAL = ZOO_LOG_LEVEL_OFF;
}
else {
Perl_croak(aTHX_ "unknown " PACKAGE_NAME " constant: %s",
alias);
}
}
else {
RETVAL = ix;
}
OUTPUT:
RETVAL
AV *
zk_acl_constant(alias=Nullch)
    char *alias
  ALIAS:
    ZOO_OPEN_ACL_UNSAFE = 1
    ZOO_READ_ACL_UNSAFE = 2
    ZOO_CREATOR_ALL_ACL = 3
  PREINIT:
    struct ACL_vector acl;
    AV *entries;
    int pos;
  PPCODE:
    /* Returns a reference to an array holding one hash reference per
     * entry of the selected predefined ACL.  The ACL is selected by the
     * ALIAS index (ix) or, when ix is zero, by the alias string.
     */
    if (ix == 0 && alias == NULL) {
        /* no alias index and no argument: fall back to the sub's name */
        alias = GvNAME(CvGV(cv));
    }

    if (ix == 1 || (alias && strEQ(alias, "ZOO_OPEN_ACL_UNSAFE"))) {
        acl = ZOO_OPEN_ACL_UNSAFE;
    }
    else if (ix == 2 || (alias && strEQ(alias, "ZOO_READ_ACL_UNSAFE"))) {
        acl = ZOO_READ_ACL_UNSAFE;
    }
    else if (ix == 3 || (alias && strEQ(alias, "ZOO_CREATOR_ALL_ACL"))) {
        acl = ZOO_CREATOR_ALL_ACL;
    }
    else {
        Perl_croak(aTHX_ "unknown " PACKAGE_NAME " constant: %s", alias);
    }

    entries = newAV();
    av_extend(entries, acl.count);

    for (pos = 0; pos < acl.count; pos++) {
        HV *entry = newHV();
        SV *entry_ref;

        _zk_fill_acl_entry_hash(aTHX_ &acl.data[pos], entry);

        entry_ref = newRV_noinc((SV*) entry);

        /* av_store() does not take ownership when it fails */
        if (av_store(entries, pos, entry_ref) == NULL) {
            SvREFCNT_dec(entry_ref);
        }
    }

    ST(0) = sv_2mortal(newRV_noinc((SV*) entries));

    XSRETURN(1);
void
zk_set_log_level(level)
    int level
  PPCODE:
    /* Croaks unless level lies within ZOO_LOG_LEVEL_OFF ..
     * ZOO_LOG_LEVEL_DEBUG, then passes it to zoo_set_debug_level().
     */
    if (!(level >= ZOO_LOG_LEVEL_OFF && level <= ZOO_LOG_LEVEL_DEBUG)) {
        Perl_croak(aTHX_ "invalid log level: %d", level);
    }

    zoo_set_debug_level(level);

    XSRETURN(0);
void
zk_set_deterministic_conn_order(flag)
    bool flag
  PPCODE:
    /* Forwards the flag, normalised to 0 or 1, to
     * zoo_deterministic_conn_order().
     */
    zoo_deterministic_conn_order(flag ? 1 : 0);

    XSRETURN(0);
void
zk_set_ignore_session_events(flag)
    bool flag
  PPCODE:
    /* Records the flag in the file-level zk_ignore_session_events
     * variable and returns an empty list.
     */
    zk_ignore_session_events = flag;

    XSRETURN(0);
void
zk_new(package, hosts, ...)
char *package
char *hosts
PREINIT:
int recv_timeout = DEFAULT_RECV_TIMEOUT_MSEC;
const clientid_t *client_id = NULL;
zk_t *zk;
zk_handle_t *handle;
HV *stash, *zk_hash, *attr_hash;
SV *attr;
int i;
PPCODE:
/* Constructor: opens a ZooKeeper session to `hosts` and returns a
 * blessed reference to a tied hash, or undef when zookeeper_init()
 * fails.  Arguments after `hosts` are key/value pairs; only
 * "session_timeout" and "session_id" are recognized, and any other
 * key is ignored without a warning.
 */
/* package + hosts + pairs must give an even argument count */
if (items > 2 && items % 2) {
Perl_croak(aTHX_ "invalid number of arguments");
}
for (i = 2; i < items; i += 2) {
char *key = SvPV_nolen(ST(i));
if (strcaseEQ(key, "session_timeout")) {
recv_timeout = SvIV(ST(i + 1));
/* NOTE: would be nice if requirement in zookeeper_interest()
* that recv_timeout*2 be non-negative was documented
*/
if (recv_timeout < 0 || recv_timeout > (PERL_INT_MAX >> 1)) {
Perl_croak(aTHX_ "invalid session timeout: %d",
recv_timeout);
}
}
else if (strcaseEQ(key, "session_id")) {
STRLEN client_id_len;
/* the session ID is taken as the raw bytes of a clientid_t, so
 * its length must match exactly; client_id points into the
 * argument's own string buffer
 */
client_id = (const clientid_t*) SvPV(ST(i + 1), client_id_len);
if (client_id_len != sizeof(clientid_t)) {
Perl_croak(aTHX_ "invalid session ID");
}
}
}
Newxz(zk, 1, zk_t);
zk->handle = zookeeper_init(hosts, NULL, recv_timeout,
client_id, NULL, 0);
if (!zk->handle) {
Safefree(zk);
XSRETURN_UNDEF;
}
/* zero-filled watch structure stored as zk->first_watch; it is what
 * the other methods pass to the watch helpers
 */
Newxz(zk->first_watch, 1, zk_watch_t);
zk->data_buf_len = DEFAULT_DATA_BUF_LEN;
zk->path_buf_len = DEFAULT_PATH_BUF_LEN;
zk->watch_timeout = DEFAULT_WATCH_TIMEOUT;
/* keep a private copy of the hosts string for the "hosts" attribute */
zk->hosts_len = strlen(hosts);
zk->hosts = savepvn(hosts, zk->hosts_len);
Newx(handle, 1, zk_handle_t);
handle->signature = PACKAGE_SIGNATURE;
handle->handle.zk = zk;
/* We use several tricks from DBI here. The attr_hash is our
* empty inner hash; we attach extra magic to it in the form of
* our zk_handle_t structure. Then we tie attr_hash to zk_hash,
* our outer hash. This is what is passed around (by reference) by
* callers.
*
* Most methods use _zk_get_handle_outer() which finds our inner
* handle, then returns the zk_t structure from its extra magic
* pointer.
*
* However, the tied hash methods, FETCH(), STORE(), and so forth,
* receive an already-dereferenced inner handle hash. This is
* because we bless both inner and outer handles into this class,
* so when a caller's code references a hash element in our
* outer handle, Perl detects its tied magic, looks up the
* tied object (our inner handle) and invokes the tied hash methods
* in its class on it. Since we blessed it into the same class
* as the outer handle, these methods simply reside in our package.
*/
stash = gv_stashpv(package, GV_ADDWARN);
attr_hash = newHV();
sv_magic((SV*) attr_hash, Nullsv, PERL_MAGIC_ext,
(const char*) handle, 0);
attr = sv_bless(newRV_noinc((SV*) attr_hash), stash);
zk_hash = newHV();
sv_magic((SV*) zk_hash, attr, PERL_MAGIC_tied, Nullch, 0);
/* drop our own reference to attr now that the tie magic refers to it */
SvREFCNT_dec(attr);
ST(0) = sv_bless(sv_2mortal(newRV_noinc((SV*) zk_hash)), stash);
XSRETURN(1);
void
zk_DESTROY(zkh)
Net::ZooKeeper zkh
PREINIT:
zk_handle_t *handle;
HV *attr_hash;
int ret = ZBADARGUMENTS;
PPCODE:
/* Destructor, callable on either the outer (tied) hash or the inner
 * attribute hash.  Closes the ZooKeeper session, frees the zk_t and
 * handle structures and removes the magic.  In non-void context it
 * returns true when zookeeper_close() returned ZOK, false otherwise
 * (including when no valid handle was found).
 */
/* NOTE(review): attr_hash is only assigned by
 * _zk_check_handle_outer() or the inner branch below; presumably the
 * helper sets it to NULL on failure -- confirm, since it is read
 * further down even when no handle was found.
 */
handle = _zk_check_handle_outer(aTHX_ zkh, &attr_hash,
PACKAGE_NAME, PACKAGE_SIGNATURE);
if (!handle) {
/* not an outer hash: see whether we were given the inner hash itself */
handle = _zk_check_handle_inner(aTHX_ zkh, PACKAGE_SIGNATURE);
if (handle) {
attr_hash = zkh;
/* there is no outer hash to strip tie magic from in this case */
zkh = NULL;
}
}
if (handle) {
zk_t *zk = handle->handle.zk;
ret = zookeeper_close(zk->handle);
/* detach all now-inactive watches still tied to handles */
_zk_release_watches(aTHX_ zk->first_watch, 1);
Safefree(zk->first_watch);
Safefree(zk->hosts);
Safefree(zk);
Safefree(handle);
/* remove the ext magic whose pointer (handle) was just freed */
sv_unmagic((SV*) attr_hash, PERL_MAGIC_ext);
}
if (zkh && attr_hash) {
sv_unmagic((SV*) zkh, PERL_MAGIC_tied);
}
if (GIMME_V == G_VOID) {
XSRETURN_EMPTY;
}
else if (ret == ZOK) {
XSRETURN_YES;
}
else {
XSRETURN_NO;
}
void
zk_CLONE(package)
    char *package
  PPCODE:
    /* Intentionally does nothing; returns an empty list. */
    XSRETURN(0);
void
zk_CLONE_SKIP(package)
    char *package
  PPCODE:
    /* Always returns a true value. */
    XST_mYES(0);
    XSRETURN(1);
void
zk_TIEHASH(package, ...)
char *package
PPCODE:
/* Explicit tying is refused: always croaks. */
Perl_croak(aTHX_ "tying hashes of class "
PACKAGE_NAME " not supported");
void
zk_UNTIE(attr_hash, ref_count)
Net::ZooKeeper attr_hash
IV ref_count
PPCODE:
/* Untying is refused: always croaks, ignoring both arguments. */
Perl_croak(aTHX_ "untying hashes of class "
PACKAGE_NAME " not supported");
void
zk_FIRSTKEY(attr_hash)
    Net::ZooKeeper attr_hash
  PREINIT:
    zk_t *conn;
    SV *first_key;
  PPCODE:
    /* Tied-hash FIRSTKEY: after validating the handle, returns the name
     * of the first entry in the zk_keys table.
     */
    conn = _zk_get_handle_inner(aTHX_ attr_hash);
    if (conn == NULL) {
        Perl_croak(aTHX_ "invalid handle");
    }

    first_key = newSVpvn(zk_keys[0].name, zk_keys[0].name_len);
    ST(0) = sv_2mortal(first_key);

    XSRETURN(1);
void
zk_NEXTKEY(attr_hash, attr_key)
    Net::ZooKeeper attr_hash
    SV *attr_key
  PREINIT:
    zk_t *conn;
    char *prev_name;
    int pos;
    int next = NUM_KEYS;
  PPCODE:
    /* Tied-hash NEXTKEY: returns the zk_keys entry that follows the one
     * named by attr_key (case-insensitive match), or an empty list when
     * attr_key is the last entry or is not in the table.
     */
    conn = _zk_get_handle_inner(aTHX_ attr_hash);
    if (conn == NULL) {
        Perl_croak(aTHX_ "invalid handle");
    }

    prev_name = SvPV_nolen(attr_key);

    for (pos = 0; pos < NUM_KEYS; pos++) {
        if (strcaseEQ(prev_name, zk_keys[pos].name)) {
            next = pos + 1;
            break;
        }
    }

    if (!(next < NUM_KEYS)) {
        XSRETURN(0);
    }

    ST(0) = sv_2mortal(newSVpvn(zk_keys[next].name,
                                zk_keys[next].name_len));

    XSRETURN(1);
void
zk_SCALAR(attr_hash)
    Net::ZooKeeper attr_hash
  PPCODE:
    /* Tied-hash SCALAR: always returns a true value. */
    XST_mYES(0);
    XSRETURN(1);
void
zk_FETCH(attr_hash, attr_key)
    Net::ZooKeeper attr_hash
    SV *attr_key
  PREINIT:
    zk_t *conn;
    char *name;
    SV *result = NULL;
  PPCODE:
    /* Tied-hash FETCH: returns the value of the attribute named by
     * attr_key (matched case-insensitively).  An unknown name produces
     * a warning and undef.
     */
    conn = _zk_get_handle_inner(aTHX_ attr_hash);
    if (conn == NULL) {
        Perl_croak(aTHX_ "invalid handle");
    }

    name = SvPV_nolen(attr_key);

    if (strcaseEQ(name, "data_read_len")) {
        result = newSViv(conn->data_buf_len);
    }
    else if (strcaseEQ(name, "path_read_len")) {
        result = newSViv(conn->path_buf_len);
    }
    else if (strcaseEQ(name, "watch_timeout")) {
        result = newSVuv(conn->watch_timeout);
    }
    else if (strcaseEQ(name, "hosts")) {
        result = newSVpvn(conn->hosts, conn->hosts_len);
    }
    else if (strcaseEQ(name, "session_timeout")) {
        result = newSViv(zoo_recv_timeout(conn->handle));
    }
    else if (strcaseEQ(name, "session_id")) {
        const clientid_t *id = zoo_client_id(conn->handle);
        clientid_t zero_id;

        memset(&zero_id, 0, sizeof(clientid_t));

        /* an all-zero client ID is reported as the empty string */
        if (memcmp(id, &zero_id, sizeof(clientid_t)) == 0) {
            result = newSVpv("", 0);
        }
        else {
            result = newSVpvn((const char*) id, sizeof(clientid_t));
        }
    }
    else if (strcaseEQ(name, "pending_watches")) {
        /* cleanup any completed watches not tied to a handle */
        result = newSVuv(_zk_release_watches(aTHX_ conn->first_watch, 0));
    }
    else if (strcaseEQ(name, "state")) {
        result = newSViv(zoo_state(conn->handle));
    }

    if (result == NULL) {
        Perl_warn(aTHX_ "invalid element: %s", name);
        XSRETURN_UNDEF;
    }

    ST(0) = sv_2mortal(result);

    XSRETURN(1);
void
zk_STORE(attr_hash, attr_key, attr_val)
    Net::ZooKeeper attr_hash
    SV *attr_key
    SV *attr_val
  PREINIT:
    zk_t *conn;
    char *name;
    int pos;
  PPCODE:
    /* Tied-hash STORE: only data_read_len, path_read_len and
     * watch_timeout are writable.  Any other name in zk_keys warns
     * "read-only element"; anything else warns "invalid element".
     */
    conn = _zk_get_handle_inner(aTHX_ attr_hash);
    if (conn == NULL) {
        Perl_croak(aTHX_ "invalid handle");
    }

    name = SvPV_nolen(attr_key);

    if (strcaseEQ(name, "data_read_len")) {
        int len = SvIV(attr_val);

        if (len < 0) {
            Perl_croak(aTHX_ "invalid data read length: %d", len);
        }

        conn->data_buf_len = len;
        XSRETURN(0);
    }

    if (strcaseEQ(name, "path_read_len")) {
        int len = SvIV(attr_val);

        if (len < 0) {
            Perl_croak(aTHX_ "invalid path read length: %d", len);
        }

        conn->path_buf_len = len;
        XSRETURN(0);
    }

    if (strcaseEQ(name, "watch_timeout")) {
        conn->watch_timeout = SvUV(attr_val);
        XSRETURN(0);
    }

    /* not writable: tell a known read-only name from an unknown one */
    for (pos = 0; pos < NUM_KEYS; pos++) {
        if (strcaseEQ(name, zk_keys[pos].name)) {
            Perl_warn(aTHX_ "read-only element: %s", name);
            XSRETURN(0);
        }
    }

    Perl_warn(aTHX_ "invalid element: %s", name);

    XSRETURN(0);
void
zk_EXISTS(attr_hash, attr_key)
    Net::ZooKeeper attr_hash
    SV *attr_key
  PREINIT:
    zk_t *conn;
    char *name;
    int pos;
    int found = 0;
  PPCODE:
    /* Tied-hash EXISTS: true when attr_key names an entry of zk_keys
     * (case-insensitive), false otherwise.
     */
    conn = _zk_get_handle_inner(aTHX_ attr_hash);
    if (conn == NULL) {
        Perl_croak(aTHX_ "invalid handle");
    }

    name = SvPV_nolen(attr_key);

    for (pos = 0; pos < NUM_KEYS && !found; pos++) {
        if (strcaseEQ(name, zk_keys[pos].name)) {
            found = 1;
        }
    }

    if (found) {
        XSRETURN_YES;
    }

    XSRETURN_NO;
void
zk_DELETE(attr_hash, attr_key)
    Net::ZooKeeper attr_hash
    SV *attr_key
  PPCODE:
    /* Tied-hash DELETE: not supported; warns and returns an empty list. */
    Perl_warn(aTHX_ "deleting elements from hashes of class "
                    PACKAGE_NAME " not supported");

    XSRETURN(0);
void
zk_CLEAR(attr_hash)
    Net::ZooKeeper attr_hash
  PPCODE:
    /* Tied-hash CLEAR: not supported; warns and returns an empty list. */
    Perl_warn(aTHX_ "clearing hashes of class "
                    PACKAGE_NAME " not supported");

    XSRETURN(0);
SV *
zk_get_error(zkh)
    Net::ZooKeeper zkh
  PREINIT:
    zk_t *conn;
  CODE:
    /* Returns the return code saved by the most recent operation on this
     * handle (last_ret) and restores the errno value saved alongside it.
     */
    conn = _zk_get_handle_outer(aTHX_ zkh);
    if (conn == NULL) {
        Perl_croak(aTHX_ "invalid handle");
    }

    RETVAL = newSViv(conn->last_ret);

    /* assigned last so the saved value is what errno holds on return */
    errno = conn->last_errno;
  OUTPUT:
    RETVAL
void
zk_add_auth(zkh, scheme, cert)
    Net::ZooKeeper zkh
    char *scheme
    char *cert; cert = (char *) SvPV($arg, cert_len);
  PREINIT:
    zk_t *zk;
    STRLEN cert_len;
    zk_watch_t *watch;
    int ret;
  PPCODE:
    /* Submits authentication credentials with zoo_add_auth() and blocks
     * until the _zk_auth_completion callback marks the temporary watch
     * as done.  Returns true when the final result is ZOK and false
     * otherwise; the result and errno are saved in zk->last_ret and
     * zk->last_errno.  Croaks on an invalid handle or when the
     * certificate is longer than PERL_INT_MAX bytes.
     */
    zk = _zk_get_handle_outer(aTHX_ zkh);

    if (!zk) {
        Perl_croak(aTHX_ "invalid handle");
    }

    zk->last_ret = ZOK;
    zk->last_errno = 0;

    if (cert_len > PERL_INT_MAX) {
        /* STRLEN need not be the size of unsigned int, so format it
         * as a UV rather than with %u
         */
        Perl_croak(aTHX_ "invalid certificate length: %" UVuf,
                   (UV) cert_len);
    }

    watch = _zk_create_watch(aTHX);

    if (!watch) {
        /* errno will be set */
        zk->last_ret = ZSYSTEMERROR;
        zk->last_errno = errno;

        XSRETURN_NO;
    }

    errno = 0;
    ret = zoo_add_auth(zk->handle, scheme, cert, cert_len,
                       _zk_auth_completion, watch);

    zk->last_ret = ret;
    zk->last_errno = errno;

    if (ret == ZOK) {
        pthread_mutex_lock(&watch->mutex);

        while (!watch->done) {
            pthread_cond_wait(&watch->cond, &watch->mutex);
        }

        /* the loop only exits once done is set, so the completion
         * result is available; read it while still holding the mutex
         */
        ret = watch->ret;

        pthread_mutex_unlock(&watch->mutex);

        /* errno may be set while we waited */
        zk->last_ret = ret;
        zk->last_errno = errno;
    }

    _zk_destroy_watch(aTHX_ watch);

    if (ret == ZOK) {
        XSRETURN_YES;
    }
    else {
        XSRETURN_NO;
    }
void
zk_create(zkh, path, buf, ...)
    Net::ZooKeeper zkh
    char *path
    char *buf; buf = (char *) SvPV($arg, buf_len);
  PREINIT:
    zk_t *zk;
    STRLEN buf_len;
    int flags = 0;
    char *path_buf;
    int path_buf_len;
    STRLEN created_len;
    AV *acl_arr = NULL;
    struct ACL_vector acl;
    int i, ret;
  PPCODE:
    /* Creates a node with zoo_create() and returns the created path as
     * reported by the server, or undef on failure (the result and errno
     * are saved in zk->last_ret and zk->last_errno).  Options follow
     * `buf` as key/value pairs: "path_read_len" (at least 2), "flags"
     * (any combination of ZOO_SEQUENCE and ZOO_EPHEMERAL) and "acl"
     * (array reference); other keys are ignored.
     */
    zk = _zk_get_handle_outer(aTHX_ zkh);

    if (!zk) {
        Perl_croak(aTHX_ "invalid handle");
    }

    zk->last_ret = ZOK;
    zk->last_errno = 0;

    if (items > 3 && !(items % 2)) {
        Perl_croak(aTHX_ "invalid number of arguments");
    }

    if (buf_len > PERL_INT_MAX) {
        /* STRLEN need not be the size of unsigned int: format as a UV */
        Perl_croak(aTHX_ "invalid data length: %" UVuf, (UV) buf_len);
    }

    path_buf_len = zk->path_buf_len;

    for (i = 3; i < items; i += 2) {
        char *key = SvPV_nolen(ST(i));

        if (strcaseEQ(key, "path_read_len")) {
            path_buf_len = SvIV(ST(i + 1));

            if (path_buf_len < 2) {
                Perl_croak(aTHX_ "invalid path read length: %d",
                           path_buf_len);
            }
        }
        else if (strcaseEQ(key, "flags")) {
            flags = SvIV(ST(i + 1));

            if (flags & ~(ZOO_SEQUENCE | ZOO_EPHEMERAL)) {
                Perl_croak(aTHX_ "invalid create flags: %d", flags);
            }
        }
        else if (strcaseEQ(key, "acl")) {
            if (!SvROK(ST(i + 1)) || SvTYPE(SvRV(ST(i + 1))) != SVt_PVAV) {
                Perl_croak(aTHX_ "invalid ACL array reference");
            }

            /* only remember the array here; it is converted after all
             * options have been validated
             */
            acl_arr = (AV*) SvRV(ST(i + 1));
        }
    }

    /* Build the ACL vector last so that a croak from another option, or
     * a repeated "acl" key, cannot leak an already-filled vector.
     */
    if (acl_arr) {
        const char *err = _zk_fill_acl(aTHX_ acl_arr, &acl);

        if (err) {
            /* never pass a message as the format string itself */
            Perl_croak(aTHX_ "%s", err);
        }
    }

    /* NOTE: would be nice to be able to rely on null-terminated string */
    ++path_buf_len;
    Newxz(path_buf, path_buf_len, char);

    errno = 0;
    ret = zoo_create(zk->handle, path, buf, buf_len,
                     (acl_arr ? &acl : NULL), flags,
                     path_buf, path_buf_len);

    zk->last_ret = ret;
    zk->last_errno = errno;

    if (acl_arr) {
        _zk_free_acl(aTHX_ &acl);
    }

    if (ret == ZOK) {
        created_len = strlen(path_buf);

        /* hand the buffer over to the returned SV rather than copying */
        ST(0) = sv_newmortal();
#ifdef SV_HAS_TRAILING_NUL
        sv_usepvn_flags(ST(0), path_buf, created_len,
                        SV_HAS_TRAILING_NUL);
#else
        sv_usepvn(ST(0), path_buf, created_len);
#endif
        SvCUR_set(ST(0), created_len);

        XSRETURN(1);
    }

    Safefree(path_buf);

    XSRETURN_UNDEF;
void
zk_delete(zkh, path, ...)
    Net::ZooKeeper zkh
    char *path
  PREINIT:
    zk_t *conn;
    int required_version = -1;
    int arg, rc;
  PPCODE:
    /* Deletes the node at `path` with zoo_delete().  The optional
     * "version" key/value pair must be non-negative; without it -1 is
     * passed.  Returns true when the call returns ZOK, false otherwise,
     * saving the result and errno on the handle.
     */
    conn = _zk_get_handle_outer(aTHX_ zkh);
    if (conn == NULL) {
        Perl_croak(aTHX_ "invalid handle");
    }

    conn->last_ret = ZOK;
    conn->last_errno = 0;

    if (items > 2 && (items % 2) != 0) {
        Perl_croak(aTHX_ "invalid number of arguments");
    }

    for (arg = 2; arg < items; arg += 2) {
        char *opt = SvPV_nolen(ST(arg));

        if (!strcaseEQ(opt, "version")) {
            continue;
        }

        required_version = SvIV(ST(arg + 1));

        if (required_version < 0) {
            Perl_croak(aTHX_ "invalid version requirement: %d",
                       required_version);
        }
    }

    errno = 0;
    rc = zoo_delete(conn->handle, path, required_version);

    conn->last_ret = rc;
    conn->last_errno = errno;

    if (rc != ZOK) {
        XSRETURN_NO;
    }

    XSRETURN_YES;
void
zk_exists(zkh, path, ...)
    Net::ZooKeeper zkh
    char *path
  PREINIT:
    zk_t *conn;
    zk_stat_t *stat_out = NULL;
    zk_watch_t *prev_watch = NULL;
    zk_handle_t *watch_handle = NULL;
    watcher_fn watcher_cb = NULL;
    zk_watch_t *fresh_watch = NULL;
    int arg, rc;
  PPCODE:
    /* Calls zoo_wexists() for `path`.  Optional key/value pairs: "stat"
     * (a Net::ZooKeeper::Stat handle that receives the node stat) and
     * "watch" (a Net::ZooKeeper::Watch handle to arm).  Returns true
     * when the call returns ZOK, false otherwise.
     */
    conn = _zk_get_handle_outer(aTHX_ zkh);
    if (conn == NULL) {
        Perl_croak(aTHX_ "invalid handle");
    }

    conn->last_ret = ZOK;
    conn->last_errno = 0;

    if (items > 2 && (items % 2) != 0) {
        Perl_croak(aTHX_ "invalid number of arguments");
    }

    for (arg = 2; arg < items; arg += 2) {
        char *opt = SvPV_nolen(ST(arg));
        SV *opt_val = ST(arg + 1);

        if (strcaseEQ(opt, "stat")) {
            if (!SvROK(opt_val) || SvTYPE(SvRV(opt_val)) != SVt_PVHV ||
                !sv_derived_from(opt_val, STAT_PACKAGE_NAME)) {
                Perl_croak(aTHX_ "stat is not a hash reference of "
                                 "type " STAT_PACKAGE_NAME);
            }

            stat_out = _zks_get_handle_outer(aTHX_ (HV*) SvRV(opt_val));
            if (stat_out == NULL) {
                Perl_croak(aTHX_ "invalid stat handle");
            }
        }
        else if (strcaseEQ(opt, "watch")) {
            if (!SvROK(opt_val) || SvTYPE(SvRV(opt_val)) != SVt_PVHV ||
                !sv_derived_from(opt_val, WATCH_PACKAGE_NAME)) {
                Perl_croak(aTHX_ "watch is not a hash reference of "
                                 "type " WATCH_PACKAGE_NAME);
            }

            prev_watch = _zkw_get_handle_outer(aTHX_ (HV*) SvRV(opt_val),
                                               &watch_handle);
            if (prev_watch == NULL) {
                Perl_croak(aTHX_ "invalid watch handle");
            }
        }
    }

    if (watch_handle != NULL) {
        fresh_watch = _zk_acquire_watch(aTHX);

        if (fresh_watch == NULL) {
            /* errno will be set */
            conn->last_ret = ZSYSTEMERROR;
            conn->last_errno = errno;

            XSRETURN_NO;
        }

        watcher_cb = _zk_watcher;
    }

    errno = 0;
    rc = zoo_wexists(conn->handle, path, watcher_cb, fresh_watch,
                     stat_out);

    conn->last_ret = rc;
    conn->last_errno = errno;

    /* performed whether or not the call succeeded */
    if (watch_handle != NULL) {
        _zk_replace_watch(aTHX_ watch_handle, conn->first_watch,
                          prev_watch, fresh_watch);
    }

    if (rc != ZOK) {
        XSRETURN_NO;
    }

    XSRETURN_YES;
void
zk_get_children(zkh, path, ...)
    Net::ZooKeeper zkh
    char *path
  PREINIT:
    zk_t *conn;
    zk_watch_t *prev_watch = NULL;
    zk_handle_t *watch_handle = NULL;
    watcher_fn watcher_cb = NULL;
    zk_watch_t *fresh_watch = NULL;
    struct String_vector names;
    int want_list;
    int child_count;
    int arg, rc;
  PPCODE:
    /* Calls zoo_wget_children() for `path`.  In list context returns the
     * child names (an empty list on failure); otherwise returns the
     * number of children (undef on failure).  The optional "watch"
     * key/value pair supplies a Net::ZooKeeper::Watch handle to arm.
     */
    conn = _zk_get_handle_outer(aTHX_ zkh);
    if (conn == NULL) {
        Perl_croak(aTHX_ "invalid handle");
    }

    conn->last_ret = ZOK;
    conn->last_errno = 0;

    if (items > 2 && (items % 2) != 0) {
        Perl_croak(aTHX_ "invalid number of arguments");
    }

    for (arg = 2; arg < items; arg += 2) {
        char *opt = SvPV_nolen(ST(arg));
        SV *opt_val = ST(arg + 1);

        if (!strcaseEQ(opt, "watch")) {
            continue;
        }

        if (!SvROK(opt_val) || SvTYPE(SvRV(opt_val)) != SVt_PVHV ||
            !sv_derived_from(opt_val, WATCH_PACKAGE_NAME)) {
            Perl_croak(aTHX_ "watch is not a hash reference of "
                             "type " WATCH_PACKAGE_NAME);
        }

        prev_watch = _zkw_get_handle_outer(aTHX_ (HV*) SvRV(opt_val),
                                           &watch_handle);
        if (prev_watch == NULL) {
            Perl_croak(aTHX_ "invalid watch handle");
        }
    }

    want_list = (GIMME_V == G_ARRAY);

    if (watch_handle != NULL) {
        fresh_watch = _zk_acquire_watch(aTHX);

        if (fresh_watch == NULL) {
            /* errno will be set */
            conn->last_ret = ZSYSTEMERROR;
            conn->last_errno = errno;

            if (want_list) {
                XSRETURN(0);
            }
            XSRETURN_UNDEF;
        }

        watcher_cb = _zk_watcher;
    }

    Zero(&names, 1, struct String_vector);

    errno = 0;
    rc = zoo_wget_children(conn->handle, path, watcher_cb, fresh_watch,
                           &names);

    conn->last_ret = rc;
    conn->last_errno = errno;

    if (watch_handle != NULL) {
        _zk_replace_watch(aTHX_ watch_handle, conn->first_watch,
                          prev_watch, fresh_watch);
    }

    if (rc != ZOK) {
        if (want_list) {
            XSRETURN(0);
        }
        XSRETURN_UNDEF;
    }

    child_count = names.count;
    if (names.count > PERL_INT_MAX) {
        child_count = PERL_INT_MAX;
    }

    if (want_list && child_count > 0) {
        EXTEND(SP, child_count);

        /* copy each name into a fresh mortal before the vector is freed */
        for (arg = 0; arg < child_count; arg++) {
            ST(arg) = sv_2mortal(newSVpv(names.data[arg], 0));
        }
    }

    /* NOTE: would be nice if this were documented as required */
    deallocate_String_vector(&names);

    if (want_list) {
        /* a zero count yields an empty list */
        XSRETURN(child_count);
    }

    ST(0) = sv_2mortal(newSViv(child_count));

    XSRETURN(1);
void
zk_get(zkh, path, ...)
    Net::ZooKeeper zkh
    char *path
  PREINIT:
    zk_t *zk;
    int buf_len;
    zk_stat_t *stat = NULL;
    zk_watch_t *old_watch = NULL;
    zk_handle_t *watch_handle = NULL;
    char *buf;
    watcher_fn watcher = NULL;
    zk_watch_t *new_watch = NULL;
    int i, ret;
  PPCODE:
    /* Reads the data of the node at `path` with zoo_wget() and returns
     * it as a string.  Returns undef on failure, and also when the call
     * succeeds but reports a negative data length (node without data);
     * in that second case zk->last_ret is ZOK.  Optional key/value
     * pairs: "data_read_len" (non-negative read buffer size), "stat"
     * (Net::ZooKeeper::Stat handle) and "watch" (Net::ZooKeeper::Watch
     * handle to arm).
     */
    zk = _zk_get_handle_outer(aTHX_ zkh);

    if (!zk) {
        Perl_croak(aTHX_ "invalid handle");
    }

    zk->last_ret = ZOK;
    zk->last_errno = 0;

    if (items > 2 && items % 2) {
        Perl_croak(aTHX_ "invalid number of arguments");
    }

    buf_len = zk->data_buf_len;

    for (i = 2; i < items; i += 2) {
        char *key = SvPV_nolen(ST(i));

        if (strcaseEQ(key, "data_read_len")) {
            buf_len = SvIV(ST(i + 1));

            if (buf_len < 0) {
                Perl_croak(aTHX_ "invalid data read length: %d",
                           buf_len);
            }
        }
        else if (strcaseEQ(key, "stat")) {
            if (!SvROK(ST(i + 1)) || SvTYPE(SvRV(ST(i + 1))) != SVt_PVHV ||
                !sv_derived_from(ST(i + 1), STAT_PACKAGE_NAME)) {
                Perl_croak(aTHX_ "stat is not a hash reference of "
                                 "type " STAT_PACKAGE_NAME);
            }

            stat = _zks_get_handle_outer(aTHX_ (HV*) SvRV(ST(i + 1)));

            if (!stat) {
                Perl_croak(aTHX_ "invalid stat handle");
            }
        }
        else if (strcaseEQ(key, "watch")) {
            if (!SvROK(ST(i + 1)) || SvTYPE(SvRV(ST(i + 1))) != SVt_PVHV ||
                !sv_derived_from(ST(i + 1), WATCH_PACKAGE_NAME)) {
                Perl_croak(aTHX_ "watch is not a hash reference of "
                                 "type " WATCH_PACKAGE_NAME);
            }

            old_watch = _zkw_get_handle_outer(aTHX_ (HV*) SvRV(ST(i + 1)),
                                              &watch_handle);

            if (!old_watch) {
                Perl_croak(aTHX_ "invalid watch handle");
            }
        }
    }

    if (watch_handle) {
        new_watch = _zk_acquire_watch(aTHX);

        if (!new_watch) {
            /* errno will be set */
            zk->last_ret = ZSYSTEMERROR;
            zk->last_errno = errno;

            XSRETURN_UNDEF;
        }

        watcher = _zk_watcher;
    }

    /* one extra byte for the trailing NUL; the size is computed in
     * size_t so that buf_len + 1 cannot overflow a signed int
     */
    Newx(buf, (size_t) buf_len + 1, char);

    errno = 0;
    ret = zoo_wget(zk->handle, path, watcher, new_watch,
                   buf, &buf_len, stat);

    zk->last_ret = ret;
    zk->last_errno = errno;

    if (watch_handle) {
        _zk_replace_watch(aTHX_ watch_handle, zk->first_watch,
                          old_watch, new_watch);
    }

    /* zoo_wget() overwrites buf_len with the length it read; the
     * ZooKeeper C API documents -1 for a node whose data is NULL.  Using
     * a negative length as an index or SV length would write before the
     * buffer, so treat it like "no value".
     */
    if (ret == ZOK && buf_len >= 0) {
        ST(0) = sv_newmortal();
#ifdef SV_HAS_TRAILING_NUL
        buf[buf_len] = '\0';
        sv_usepvn_flags(ST(0), buf, buf_len, SV_HAS_TRAILING_NUL);
#else
        sv_usepvn(ST(0), buf, buf_len);
#endif

        XSRETURN(1);
    }
    else {
        Safefree(buf);

        XSRETURN_UNDEF;
    }
void
zk_set(zkh, path, buf, ...)
    Net::ZooKeeper zkh
    char *path
    char *buf; buf = (char *) SvPV($arg, buf_len);
  PREINIT:
    zk_t *zk;
    int version = -1;
    zk_stat_t *stat = NULL;
    STRLEN buf_len;
    int i, ret;
  PPCODE:
    /* Writes `buf` to the node at `path` with zoo_set2().  Returns true
     * when the call returns ZOK, false otherwise; the result and errno
     * are saved on the handle.  Optional key/value pairs: "version"
     * (non-negative; -1 is passed when absent) and "stat"
     * (Net::ZooKeeper::Stat handle).
     */
    zk = _zk_get_handle_outer(aTHX_ zkh);

    if (!zk) {
        Perl_croak(aTHX_ "invalid handle");
    }

    zk->last_ret = ZOK;
    zk->last_errno = 0;

    if (items > 3 && !(items % 2)) {
        Perl_croak(aTHX_ "invalid number of arguments");
    }

    if (buf_len > PERL_INT_MAX) {
        /* STRLEN need not be the size of unsigned int: format as a UV */
        Perl_croak(aTHX_ "invalid data length: %" UVuf, (UV) buf_len);
    }

    for (i = 3; i < items; i += 2) {
        char *key = SvPV_nolen(ST(i));

        if (strcaseEQ(key, "version")) {
            version = SvIV(ST(i + 1));

            if (version < 0) {
                Perl_croak(aTHX_ "invalid version requirement: %d",
                           version);
            }
        }
        else if (strcaseEQ(key, "stat")) {
            if (!SvROK(ST(i + 1)) || SvTYPE(SvRV(ST(i + 1))) != SVt_PVHV ||
                !sv_derived_from(ST(i + 1), STAT_PACKAGE_NAME)) {
                Perl_croak(aTHX_ "stat is not a hash reference of "
                                 "type " STAT_PACKAGE_NAME);
            }

            stat = _zks_get_handle_outer(aTHX_ (HV*) SvRV(ST(i + 1)));

            if (!stat) {
                Perl_croak(aTHX_ "invalid stat handle");
            }
        }
    }

    errno = 0;
    ret = zoo_set2(zk->handle, path, buf, buf_len, version, stat);

    zk->last_ret = ret;
    zk->last_errno = errno;

    if (ret == ZOK) {
        XSRETURN_YES;
    }
    else {
        XSRETURN_NO;
    }
void
zk_get_acl(zkh, path, ...)
    Net::ZooKeeper zkh
    char *path
  PREINIT:
    zk_t *conn;
    zk_stat_t *stat_out = NULL;
    struct ACL_vector acl;
    int want_list;
    int entry_count;
    int arg, rc;
  PPCODE:
    /* Calls zoo_get_acl() for `path`.  In list context returns one hash
     * reference per ACL entry (an empty list on failure); otherwise
     * returns the number of entries (undef on failure).  The optional
     * "stat" key/value pair supplies a Net::ZooKeeper::Stat handle.
     */
    conn = _zk_get_handle_outer(aTHX_ zkh);
    if (conn == NULL) {
        Perl_croak(aTHX_ "invalid handle");
    }

    conn->last_ret = ZOK;
    conn->last_errno = 0;

    if (items > 2 && (items % 2) != 0) {
        Perl_croak(aTHX_ "invalid number of arguments");
    }

    for (arg = 2; arg < items; arg += 2) {
        char *opt = SvPV_nolen(ST(arg));
        SV *opt_val = ST(arg + 1);

        if (!strcaseEQ(opt, "stat")) {
            continue;
        }

        if (!SvROK(opt_val) || SvTYPE(SvRV(opt_val)) != SVt_PVHV ||
            !sv_derived_from(opt_val, STAT_PACKAGE_NAME)) {
            Perl_croak(aTHX_ "stat is not a hash reference of "
                             "type " STAT_PACKAGE_NAME);
        }

        stat_out = _zks_get_handle_outer(aTHX_ (HV*) SvRV(opt_val));
        if (stat_out == NULL) {
            Perl_croak(aTHX_ "invalid stat handle");
        }
    }

    errno = 0;
    rc = zoo_get_acl(conn->handle, path, &acl, stat_out);

    conn->last_ret = rc;
    conn->last_errno = errno;

    want_list = (GIMME_V == G_ARRAY);

    if (rc != ZOK) {
        if (want_list) {
            XSRETURN(0);
        }
        XSRETURN_UNDEF;
    }

    entry_count = acl.count;
    if (acl.count > PERL_INT_MAX) {
        entry_count = PERL_INT_MAX;
    }

    if (want_list && entry_count > 0) {
        EXTEND(SP, entry_count);

        for (arg = 0; arg < entry_count; arg++) {
            HV *entry = newHV();

            _zk_fill_acl_entry_hash(aTHX_ &acl.data[arg], entry);

            ST(arg) = sv_2mortal(newRV_noinc((SV*) entry));
        }
    }

    /* NOTE: would be nice if this were documented as required */
    deallocate_ACL_vector(&acl);

    if (want_list) {
        /* a zero count yields an empty list */
        XSRETURN(entry_count);
    }

    ST(0) = sv_2mortal(newSViv(entry_count));

    XSRETURN(1);
void
zk_set_acl(zkh, path, acl_arr, ...)
    Net::ZooKeeper zkh
    char *path
    AV *acl_arr
  PREINIT:
    zk_t *zk;
    const char *err;
    int version = -1;
    struct ACL_vector acl;
    int i, ret;
  PPCODE:
    /* Replaces the ACL of the node at `path` with zoo_set_acl().
     * Returns true when the call returns ZOK, false otherwise; the
     * result and errno are saved on the handle.  The optional "version"
     * key/value pair must be non-negative; -1 is passed when absent.
     */
    zk = _zk_get_handle_outer(aTHX_ zkh);

    if (!zk) {
        Perl_croak(aTHX_ "invalid handle");
    }

    zk->last_ret = ZOK;
    zk->last_errno = 0;

    if (items > 3 && !(items % 2)) {
        Perl_croak(aTHX_ "invalid number of arguments");
    }

    err = _zk_fill_acl(aTHX_ acl_arr, &acl);

    if (err) {
        /* never pass a message as the format string itself */
        Perl_croak(aTHX_ "%s", err);
    }

    for (i = 3; i < items; i += 2) {
        char *key = SvPV_nolen(ST(i));

        if (strcaseEQ(key, "version")) {
            version = SvIV(ST(i + 1));

            if (version < 0) {
                /* croak does not return: release the filled ACL first */
                _zk_free_acl(aTHX_ &acl);

                Perl_croak(aTHX_ "invalid version requirement: %d",
                           version);
            }
        }
    }

    errno = 0;
    ret = zoo_set_acl(zk->handle, path, version, &acl);

    zk->last_ret = ret;
    zk->last_errno = errno;

    _zk_free_acl(aTHX_ &acl);

    if (ret == ZOK) {
        XSRETURN_YES;
    }
    else {
        XSRETURN_NO;
    }
void
zk_stat(zkh)
Net::ZooKeeper zkh
PREINIT:
zk_t *zk;
zk_handle_t *handle;
HV *stash, *stat_hash, *attr_hash;
SV *attr;
PPCODE:
/* Creates a new stat object: a zero-filled zk_stat_t wrapped in a
 * tied hash blessed into STAT_PACKAGE_NAME, returned by reference.
 * Also resets the saved error state on the connection handle.
 */
zk = _zk_get_handle_outer(aTHX_ zkh);
if (!zk) {
Perl_croak(aTHX_ "invalid handle");
}
zk->last_ret = ZOK;
zk->last_errno = 0;
Newx(handle, 1, zk_handle_t);
handle->signature = STAT_PACKAGE_SIGNATURE;
Newxz(handle->handle.stat, 1, zk_stat_t);
/* As in zk_new(), we use two levels of magic here. */
stash = gv_stashpv(STAT_PACKAGE_NAME, GV_ADDWARN);
attr_hash = newHV();
/* inner hash carries the handle structure as ext magic */
sv_magic((SV*) attr_hash, Nullsv, PERL_MAGIC_ext,
(const char*) handle, 0);
attr = sv_bless(newRV_noinc((SV*) attr_hash), stash);
stat_hash = newHV();
/* outer hash is tied to the blessed inner reference */
sv_magic((SV*) stat_hash, attr, PERL_MAGIC_tied, Nullch, 0);
/* drop our own reference to attr now that the tie magic refers to it */
SvREFCNT_dec(attr);
ST(0) = sv_bless(sv_2mortal(newRV_noinc((SV*) stat_hash)), stash);
XSRETURN(1);
void
zk_watch(zkh, ...)
Net::ZooKeeper zkh
PREINIT:
zk_t *zk;
unsigned int timeout;
zk_watch_t *watch;
zk_handle_t *handle;
HV *stash, *watch_hash, *attr_hash;
SV *attr;
int i;
PPCODE:
/* Creates a new watch object: a watch obtained from
 * _zk_acquire_watch() wrapped in a tied hash blessed into
 * WATCH_PACKAGE_NAME, returned by reference.  Returns undef when no
 * watch could be acquired.  The optional "timeout" key/value pair
 * overrides the connection's default watch_timeout; other keys are
 * ignored.
 */
zk = _zk_get_handle_outer(aTHX_ zkh);
if (!zk) {
Perl_croak(aTHX_ "invalid handle");
}
zk->last_ret = ZOK;
zk->last_errno = 0;
/* handle + pairs must give an odd argument count */
if (items > 1 && !(items % 2)) {
Perl_croak(aTHX_ "invalid number of arguments");
}
timeout = zk->watch_timeout;
for (i = 1; i < items; i += 2) {
char *key = SvPV_nolen(ST(i));
if (strcaseEQ(key, "timeout")) {
timeout = SvUV(ST(i + 1));
}
}
watch = _zk_acquire_watch(aTHX);
if (!watch) {
/* errno will be set */
zk->last_ret = ZSYSTEMERROR;
zk->last_errno = errno;
XSRETURN_UNDEF;
}
Newx(handle, 1, zk_handle_t);
handle->signature = WATCH_PACKAGE_SIGNATURE;
handle->handle.watch = watch;
/* As in zk_new(), we use two levels of magic here. */
stash = gv_stashpv(WATCH_PACKAGE_NAME, GV_ADDWARN);
attr_hash = newHV();
watch->timeout = timeout;
/* inner hash carries the handle structure as ext magic */
sv_magic((SV*) attr_hash, Nullsv, PERL_MAGIC_ext,
(const char*) handle, 0);
attr = sv_bless(newRV_noinc((SV*) attr_hash), stash);
watch_hash = newHV();
/* outer hash is tied to the blessed inner reference */
sv_magic((SV*) watch_hash, attr, PERL_MAGIC_tied, Nullch, 0);
/* drop our own reference to attr now that the tie magic refers to it */
SvREFCNT_dec(attr);
ST(0) = sv_bless(sv_2mortal(newRV_noinc((SV*) watch_hash)), stash);
XSRETURN(1);
MODULE = Net::ZooKeeper PACKAGE = Net::ZooKeeper::Stat PREFIX = zks_
void
zks_DESTROY(zksh)
Net::ZooKeeper::Stat zksh
PREINIT:
zk_handle_t *handle;
HV *attr_hash;
int ret = ZBADARGUMENTS;
PPCODE:
/* Destructor for stat objects, callable on either the outer (tied)
 * hash or the inner attribute hash.  Frees the zk_stat_t and handle
 * structures and removes the magic.  In non-void context it returns
 * true when a valid handle was found and released, false otherwise.
 */
/* NOTE(review): attr_hash is only assigned by
 * _zk_check_handle_outer() or the inner branch below; presumably the
 * helper sets it to NULL on failure -- confirm, since it is read
 * further down even when no handle was found.
 */
handle = _zk_check_handle_outer(aTHX_ zksh, &attr_hash,
STAT_PACKAGE_NAME,
STAT_PACKAGE_SIGNATURE);
if (!handle) {
/* not an outer hash: see whether we were given the inner hash itself */
handle = _zk_check_handle_inner(aTHX_ zksh,
STAT_PACKAGE_SIGNATURE);
if (handle) {
attr_hash = zksh;
/* there is no outer hash to strip tie magic from in this case */
zksh = NULL;
}
}
if (handle) {
ret = ZOK;
Safefree(handle->handle.stat);
Safefree(handle);
/* remove the ext magic whose pointer (handle) was just freed */
sv_unmagic((SV*) attr_hash, PERL_MAGIC_ext);
}
if (zksh && attr_hash) {
sv_unmagic((SV*) zksh, PERL_MAGIC_tied);
}
if (GIMME_V == G_VOID) {
XSRETURN_EMPTY;
}
else if (ret == ZOK) {
XSRETURN_YES;
}
else {
XSRETURN_NO;
}
void
zks_CLONE(package)
    char *package
  PPCODE:
    /* Intentionally does nothing; returns an empty list. */
    XSRETURN(0);
void
zks_CLONE_SKIP(package)
    char *package
  PPCODE:
    /* Always returns a true value. */
    XST_mYES(0);
    XSRETURN(1);
void
zks_TIEHASH(package, ...)
char *package
PPCODE:
/* Explicit tying is refused: always croaks. */
Perl_croak(aTHX_ "tying hashes of class "
STAT_PACKAGE_NAME " not supported");
void
zks_UNTIE(attr_hash, ref_count)
Net::ZooKeeper::Stat attr_hash
IV ref_count
PPCODE:
/* Untying is refused: always croaks, ignoring both arguments. */
Perl_croak(aTHX_ "untying hashes of class "
STAT_PACKAGE_NAME " not supported");
void
zks_FIRSTKEY(attr_hash)
    Net::ZooKeeper::Stat attr_hash
  PREINIT:
    zk_stat_t *node_stat;
    SV *first_key;
  PPCODE:
    /* Tied-hash FIRSTKEY: after validating the handle, returns the name
     * of the first entry in the zk_stat_keys table.
     */
    node_stat = _zks_get_handle_inner(aTHX_ attr_hash);
    if (node_stat == NULL) {
        Perl_croak(aTHX_ "invalid handle");
    }

    first_key = newSVpvn(zk_stat_keys[0].name, zk_stat_keys[0].name_len);
    ST(0) = sv_2mortal(first_key);

    XSRETURN(1);
void
zks_NEXTKEY(attr_hash, attr_key)
    Net::ZooKeeper::Stat attr_hash
    SV *attr_key
  PREINIT:
    zk_stat_t *node_stat;
    char *prev_name;
    int pos;
    int next = NUM_STAT_KEYS;
  PPCODE:
    /* Tied-hash NEXTKEY: returns the zk_stat_keys entry that follows the
     * one named by attr_key (case-insensitive match), or an empty list
     * when attr_key is the last entry or is not in the table.
     */
    node_stat = _zks_get_handle_inner(aTHX_ attr_hash);
    if (node_stat == NULL) {
        Perl_croak(aTHX_ "invalid handle");
    }

    prev_name = SvPV_nolen(attr_key);

    for (pos = 0; pos < NUM_STAT_KEYS; pos++) {
        if (strcaseEQ(prev_name, zk_stat_keys[pos].name)) {
            next = pos + 1;
            break;
        }
    }

    if (!(next < NUM_STAT_KEYS)) {
        XSRETURN(0);
    }

    ST(0) = sv_2mortal(newSVpvn(zk_stat_keys[next].name,
                                zk_stat_keys[next].name_len));

    XSRETURN(1);
void
zks_SCALAR(attr_hash)
    Net::ZooKeeper::Stat attr_hash
  PPCODE:
    /* Tied-hash SCALAR: always returns a true value. */
    XST_mYES(0);
    XSRETURN(1);
void
zks_FETCH(attr_hash, attr_key)
Net::ZooKeeper::Stat attr_hash
SV *attr_key
PREINIT:
zk_stat_t *stat;
char *key;
SV *val = NULL;
int i;
PPCODE:
/* Tied-hash FETCH for stat objects: looks attr_key up in the
 * zk_stat_keys table (case-insensitive) and returns the field found
 * at that entry's byte offset inside the zk_stat_t.  An unknown name
 * produces a warning and undef.
 */
stat = _zks_get_handle_inner(aTHX_ attr_hash);
if (!stat) {
Perl_croak(aTHX_ "invalid handle");
}
key = SvPV_nolen(attr_key);
for (i = 0; i < NUM_STAT_KEYS; ++i) {
if (strcaseEQ(key, zk_stat_keys[i].name)) {
/* the table's size field selects the read width: 32-bit fields are
 * returned as integers, everything else is read as int64_t
 */
if (zk_stat_keys[i].size * CHAR_BIT == 32) {
val = newSViv(*((int32_t*) (((char*) stat) +
zk_stat_keys[i].offset)));
}
else {
/* NOTE: %lld is inconsistent, so cast to a double */
/* NOTE(review): the value is returned as a decimal string; going
 * through double presumably loses precision for magnitudes above
 * 2^53 -- confirm whether such values can occur here.
 */
val = newSVpvf("%.0f", (double)
*((int64_t*) (((char*) stat) +
zk_stat_keys[i].offset)));
}
break;
}
}
if (val) {
ST(0) = sv_2mortal(val);
XSRETURN(1);
}
Perl_warn(aTHX_ "invalid element: %s", key);
XSRETURN_UNDEF;
void
zks_STORE(attr_hash, attr_key, attr_val)
    Net::ZooKeeper::Stat attr_hash
    SV *attr_key
    SV *attr_val
  PREINIT:
    zk_stat_t *node_stat;
    char *name;
    int pos;
    int known = 0;
  PPCODE:
    /* Tied-hash STORE for stat objects: nothing is writable.  A name
     * found in zk_stat_keys warns "read-only element"; any other name
     * warns "invalid element".  attr_val is never used.
     */
    node_stat = _zks_get_handle_inner(aTHX_ attr_hash);
    if (node_stat == NULL) {
        Perl_croak(aTHX_ "invalid handle");
    }

    name = SvPV_nolen(attr_key);

    for (pos = 0; pos < NUM_STAT_KEYS && !known; pos++) {
        if (strcaseEQ(name, zk_stat_keys[pos].name)) {
            known = 1;
        }
    }

    if (known) {
        Perl_warn(aTHX_ "read-only element: %s", name);
    }
    else {
        Perl_warn(aTHX_ "invalid element: %s", name);
    }

    XSRETURN(0);
void
zks_EXISTS(attr_hash, attr_key)
    Net::ZooKeeper::Stat attr_hash
    SV *attr_key
  PREINIT:
    zk_stat_t *node_stat;
    char *name;
    int pos;
    int found = 0;
  PPCODE:
    /* Tied-hash EXISTS: true when attr_key names an entry of
     * zk_stat_keys (case-insensitive), false otherwise.
     */
    node_stat = _zks_get_handle_inner(aTHX_ attr_hash);
    if (node_stat == NULL) {
        Perl_croak(aTHX_ "invalid handle");
    }

    name = SvPV_nolen(attr_key);

    for (pos = 0; pos < NUM_STAT_KEYS && !found; pos++) {
        if (strcaseEQ(name, zk_stat_keys[pos].name)) {
            found = 1;
        }
    }

    if (found) {
        XSRETURN_YES;
    }

    XSRETURN_NO;
void
zks_DELETE(attr_hash, attr_key)
    Net::ZooKeeper::Stat attr_hash
    SV *attr_key
  PPCODE:
    /* Tied-hash DELETE: not supported; warns and returns an empty list. */
    Perl_warn(aTHX_ "deleting elements from hashes of class "
                    STAT_PACKAGE_NAME " not supported");

    XSRETURN(0);
void
zks_CLEAR(attr_hash)
    Net::ZooKeeper::Stat attr_hash
  PPCODE:
    /* Tied-hash CLEAR: not supported; warns and returns an empty list. */
    Perl_warn(aTHX_ "clearing hashes of class "
                    STAT_PACKAGE_NAME " not supported");

    XSRETURN(0);
MODULE = Net::ZooKeeper PACKAGE = Net::ZooKeeper::Watch PREFIX = zkw_
void
zkw_DESTROY(zkwh)
Net::ZooKeeper::Watch zkwh
PREINIT:
zk_handle_t *handle;
HV *attr_hash;
int ret = ZBADARGUMENTS;
PPCODE:
/* Destructor for watch objects, callable on either the outer (tied)
 * hash or the inner attribute hash.  Releases the watch through
 * _zk_release_watch(), frees the handle structure and removes the
 * magic.  In non-void context it returns true when a valid handle
 * was found and released, false otherwise.
 */
/* NOTE(review): attr_hash is only assigned by
 * _zk_check_handle_outer() or the inner branch below; presumably the
 * helper sets it to NULL on failure -- confirm, since it is read
 * further down even when no handle was found.
 */
handle = _zk_check_handle_outer(aTHX_ zkwh, &attr_hash,
WATCH_PACKAGE_NAME,
WATCH_PACKAGE_SIGNATURE);
if (!handle) {
/* not an outer hash: see whether we were given the inner hash itself */
handle = _zk_check_handle_inner(aTHX_ zkwh,
WATCH_PACKAGE_SIGNATURE);
if (handle) {
attr_hash = zkwh;
/* there is no outer hash to strip tie magic from in this case */
zkwh = NULL;
}
}
if (handle) {
ret = ZOK;
_zk_release_watch(aTHX_ handle->handle.watch, 0);
Safefree(handle);
/* remove the ext magic whose pointer (handle) was just freed */
sv_unmagic((SV*) attr_hash, PERL_MAGIC_ext);
}
if (zkwh && attr_hash) {
sv_unmagic((SV*) zkwh, PERL_MAGIC_tied);
}
if (GIMME_V == G_VOID) {
XSRETURN_EMPTY;
}
else if (ret == ZOK) {
XSRETURN_YES;
}
else {
XSRETURN_NO;
}
void
zkw_CLONE(package)
    char *package
  PPCODE:
    /* Intentionally does nothing; returns an empty list. */
    XSRETURN(0);
void
zkw_CLONE_SKIP(package)
    char *package
  PPCODE:
    /* Always returns a true value. */
    XST_mYES(0);
    XSRETURN(1);
void
zkw_TIEHASH(package, ...)
char *package
PPCODE:
/* Explicit tying is refused: always croaks. */
Perl_croak(aTHX_ "tying hashes of class "
WATCH_PACKAGE_NAME " not supported");
void
zkw_UNTIE(attr_hash, ref_count)
Net::ZooKeeper::Watch attr_hash
IV ref_count
PPCODE:
/* Untying is refused: always croaks, ignoring both arguments. */
Perl_croak(aTHX_ "untying hashes of class "
WATCH_PACKAGE_NAME " not supported");
void
zkw_FIRSTKEY(attr_hash)
    Net::ZooKeeper::Watch attr_hash
  PREINIT:
    zk_watch_t *w;
    SV *first_key;
  PPCODE:
    /* Tied-hash FIRSTKEY: after validating the handle, returns the name
     * of the first entry in the zk_watch_keys table.
     */
    w = _zkw_get_handle_inner(aTHX_ attr_hash);
    if (w == NULL) {
        Perl_croak(aTHX_ "invalid handle");
    }

    first_key = newSVpvn(zk_watch_keys[0].name,
                         zk_watch_keys[0].name_len);
    ST(0) = sv_2mortal(first_key);

    XSRETURN(1);
void
zkw_NEXTKEY(attr_hash, attr_key)
    Net::ZooKeeper::Watch attr_hash
    SV *attr_key
  PREINIT:
    zk_watch_t *w;
    char *prev_name;
    int pos;
    int next = NUM_WATCH_KEYS;
  PPCODE:
    /* Tied-hash NEXTKEY: returns the zk_watch_keys entry that follows
     * the one named by attr_key (case-insensitive match), or an empty
     * list when attr_key is the last entry or is not in the table.
     */
    w = _zkw_get_handle_inner(aTHX_ attr_hash);
    if (w == NULL) {
        Perl_croak(aTHX_ "invalid handle");
    }

    prev_name = SvPV_nolen(attr_key);

    for (pos = 0; pos < NUM_WATCH_KEYS; pos++) {
        if (strcaseEQ(prev_name, zk_watch_keys[pos].name)) {
            next = pos + 1;
            break;
        }
    }

    if (!(next < NUM_WATCH_KEYS)) {
        XSRETURN(0);
    }

    ST(0) = sv_2mortal(newSVpvn(zk_watch_keys[next].name,
                                zk_watch_keys[next].name_len));

    XSRETURN(1);
void
zkw_SCALAR(attr_hash)
    Net::ZooKeeper::Watch attr_hash
  PPCODE:
    /* Tied-hash SCALAR: always returns a true value. */
    XST_mYES(0);
    XSRETURN(1);
void
zkw_FETCH(attr_hash, attr_key)
    Net::ZooKeeper::Watch attr_hash
    SV *attr_key
  PREINIT:
    zk_watch_t *w;
    char *name;
    SV *result = NULL;
  PPCODE:
    /* Tied-hash FETCH for watch objects: returns the watch's timeout,
     * event (event_type) or state (event_state), matched
     * case-insensitively.  An unknown name produces a warning and undef.
     */
    w = _zkw_get_handle_inner(aTHX_ attr_hash);
    if (w == NULL) {
        Perl_croak(aTHX_ "invalid handle");
    }

    name = SvPV_nolen(attr_key);

    if (strcaseEQ(name, "timeout")) {
        result = newSVuv(w->timeout);
    }
    else if (strcaseEQ(name, "event")) {
        result = newSViv(w->event_type);
    }
    else if (strcaseEQ(name, "state")) {
        result = newSViv(w->event_state);
    }

    if (result == NULL) {
        Perl_warn(aTHX_ "invalid element: %s", name);
        XSRETURN_UNDEF;
    }

    ST(0) = sv_2mortal(result);

    XSRETURN(1);
void
zkw_STORE(attr_hash, attr_key, attr_val)
    Net::ZooKeeper::Watch attr_hash
    SV *attr_key
    SV *attr_val
  PREINIT:
    zk_watch_t *w;
    char *name;
    int pos;
  PPCODE:
    /* Tied-hash STORE for watch objects: only "timeout" is writable.
     * Any other name in zk_watch_keys warns "read-only element";
     * anything else warns "invalid element".
     */
    w = _zkw_get_handle_inner(aTHX_ attr_hash);
    if (w == NULL) {
        Perl_croak(aTHX_ "invalid handle");
    }

    name = SvPV_nolen(attr_key);

    if (strcaseEQ(name, "timeout")) {
        w->timeout = SvUV(attr_val);
        XSRETURN(0);
    }

    /* not writable: tell a known read-only name from an unknown one */
    for (pos = 0; pos < NUM_WATCH_KEYS; pos++) {
        if (strcaseEQ(name, zk_watch_keys[pos].name)) {
            Perl_warn(aTHX_ "read-only element: %s", name);
            XSRETURN(0);
        }
    }

    Perl_warn(aTHX_ "invalid element: %s", name);

    XSRETURN(0);
void
zkw_EXISTS(attr_hash, attr_key)
    Net::ZooKeeper::Watch attr_hash
    SV *attr_key
  PREINIT:
    zk_watch_t *w;
    char *name;
    int pos;
    int found = 0;
  PPCODE:
    /* Tied-hash EXISTS: true when attr_key names an entry of
     * zk_watch_keys (case-insensitive), false otherwise.
     */
    w = _zkw_get_handle_inner(aTHX_ attr_hash);
    if (w == NULL) {
        Perl_croak(aTHX_ "invalid handle");
    }

    name = SvPV_nolen(attr_key);

    for (pos = 0; pos < NUM_WATCH_KEYS && !found; pos++) {
        if (strcaseEQ(name, zk_watch_keys[pos].name)) {
            found = 1;
        }
    }

    if (found) {
        XSRETURN_YES;
    }

    XSRETURN_NO;
void
zkw_DELETE(attr_hash, attr_key)
    Net::ZooKeeper::Watch attr_hash
    SV *attr_key
  PPCODE:
    /* Tied-hash DELETE: not supported; warns and returns an empty list. */
    Perl_warn(aTHX_ "deleting elements from hashes of class "
                    WATCH_PACKAGE_NAME " not supported");

    XSRETURN(0);
void
zkw_CLEAR(attr_hash)
    Net::ZooKeeper::Watch attr_hash
  PPCODE:
    /* Tied-hash CLEAR: not supported; warns and returns an empty list. */
    Perl_warn(aTHX_ "clearing hashes of class "
                    WATCH_PACKAGE_NAME " not supported");

    XSRETURN(0);
void
zkw_wait(zkwh, ...)
Net::ZooKeeper::Watch zkwh
PREINIT:
zk_watch_t *watch;
unsigned int timeout;
struct timeval end_timeval;
int i, done;
struct timespec wait_timespec;
PPCODE:
/* Blocks until the watch is marked done or the timeout elapses.
 * Returns true when the watch was done, false on timeout.  The
 * timeout is in milliseconds and defaults to the watch's own
 * timeout; a "timeout" key/value pair overrides it for this call.
 */
watch = _zkw_get_handle_outer(aTHX_ zkwh, NULL);
if (!watch) {
Perl_croak(aTHX_ "invalid handle");
}
/* handle + pairs must give an odd argument count */
if (items > 1 && !(items % 2)) {
Perl_croak(aTHX_ "invalid number of arguments");
}
timeout = watch->timeout;
for (i = 1; i < items; i += 2) {
char *key = SvPV_nolen(ST(i));
if (strcaseEQ(key, "timeout")) {
timeout = SvUV(ST(i + 1));
}
}
/* compute the absolute deadline: now + timeout milliseconds */
gettimeofday(&end_timeval, NULL);
end_timeval.tv_sec += timeout / 1000;
end_timeval.tv_usec += (timeout % 1000) * 1000;
/* carry microsecond overflow into the seconds field */
if (end_timeval.tv_usec >= 1000000) {
end_timeval.tv_usec -= 1000000;
end_timeval.tv_sec++;
}
/* same deadline expressed as the timespec that
 * pthread_cond_timedwait() expects
 */
wait_timespec.tv_sec = end_timeval.tv_sec;
wait_timespec.tv_nsec = end_timeval.tv_usec * 1000;
pthread_mutex_lock(&watch->mutex);
while (!watch->done) {
struct timeval curr_timeval;
/* re-check the clock on every wakeup; the return value of the timed
 * wait is not inspected, so this comparison is what ends the loop
 * on timeout
 */
gettimeofday(&curr_timeval, NULL);
if (end_timeval.tv_sec < curr_timeval.tv_sec ||
(end_timeval.tv_sec == curr_timeval.tv_sec &&
end_timeval.tv_usec <= curr_timeval.tv_usec)) {
break;
}
pthread_cond_timedwait(&watch->cond, &watch->mutex,
&wait_timespec);
}
/* snapshot the flag while still holding the mutex */
done = watch->done;
pthread_mutex_unlock(&watch->mutex);
if (done) {
XSRETURN_YES;
}
else {
XSRETURN_NO;
}
( run in 1.707 second using v1.01-cache-2.11-cpan-71847e10f99 )