Net-ZooKeeper

 view release on metacpan or  search on metacpan

ZooKeeper.xs  view on Meta::CPAN

        ZOO_PERM_READ = ZOO_PERM_READ
        ZOO_PERM_WRITE = ZOO_PERM_WRITE
        ZOO_PERM_CREATE = ZOO_PERM_CREATE
        ZOO_PERM_DELETE = ZOO_PERM_DELETE
        ZOO_PERM_ADMIN = ZOO_PERM_ADMIN
        ZOO_PERM_ALL = ZOO_PERM_ALL

        ZOO_CREATED_EVENT = ZOO_CREATED_EVENT
        ZOO_DELETED_EVENT = ZOO_DELETED_EVENT
        ZOO_CHANGED_EVENT = ZOO_CHANGED_EVENT
        ZOO_CHILD_EVENT = ZOO_CHILD_EVENT
        ZOO_SESSION_EVENT = ZOO_SESSION_EVENT
        ZOO_NOTWATCHING_EVENT = ZOO_NOTWATCHING_EVENT

        ZOO_EXPIRED_SESSION_STATE = ZOO_EXPIRED_SESSION_STATE
        ZOO_AUTH_FAILED_STATE = ZOO_AUTH_FAILED_STATE
        ZOO_CONNECTING_STATE = ZOO_CONNECTING_STATE
        ZOO_ASSOCIATING_STATE = ZOO_ASSOCIATING_STATE
        ZOO_CONNECTED_STATE = ZOO_CONNECTED_STATE

        ZOO_LOG_LEVEL_OFF = ZOO_LOG_LEVEL_OFF
        ZOO_LOG_LEVEL_ERROR = ZOO_LOG_LEVEL_ERROR
        ZOO_LOG_LEVEL_WARN = ZOO_LOG_LEVEL_WARN
        ZOO_LOG_LEVEL_INFO = ZOO_LOG_LEVEL_INFO
        ZOO_LOG_LEVEL_DEBUG = ZOO_LOG_LEVEL_DEBUG
    CODE:
         if (!ix) {
             if (!alias) {
                 alias = GvNAME(CvGV(cv));
             }

             if (strEQ(alias, "ZOK")) {
                 RETVAL = ZOK;
             }
             else if (strEQ(alias, "ZOO_LOG_LEVEL_OFF")) {
                 RETVAL = ZOO_LOG_LEVEL_OFF;
             }
             else {
                 Perl_croak(aTHX_ "unknown " PACKAGE_NAME " constant: %s",
                            alias);
             }
         }
         else {
             RETVAL = ix;
         }
    OUTPUT:
        RETVAL


AV *
zk_acl_constant(alias=Nullch)
        char *alias
    ALIAS:
        ZOO_OPEN_ACL_UNSAFE = 1
        ZOO_READ_ACL_UNSAFE = 2
        ZOO_CREATOR_ALL_ACL = 3
    PREINIT:
        struct ACL_vector acl;
        AV *acl_arr;
        int i;
    PPCODE:
        if (!ix && !alias) {
            alias = GvNAME(CvGV(cv));
        }

        if (ix == 1 || (alias != NULL && strEQ(alias, "ZOO_OPEN_ACL_UNSAFE"))) {
            acl = ZOO_OPEN_ACL_UNSAFE;
        }
        else if (ix == 2 || (alias != NULL && strEQ(alias, "ZOO_READ_ACL_UNSAFE"))) {
            acl = ZOO_READ_ACL_UNSAFE;
        }
        else if (ix == 3 || (alias != NULL && strEQ(alias, "ZOO_CREATOR_ALL_ACL"))) {
            acl = ZOO_CREATOR_ALL_ACL;
        }
        else {
             Perl_croak(aTHX_ "unknown " PACKAGE_NAME " constant: %s", alias);
        }

        acl_arr = newAV();

        av_extend(acl_arr, acl.count);

        for (i = 0; i < acl.count; ++i) {
            HV *acl_entry_hash = newHV();
            SV *val;

            _zk_fill_acl_entry_hash(aTHX_ &acl.data[i], acl_entry_hash);

            val = newRV_noinc((SV*) acl_entry_hash);

            if (!av_store(acl_arr, i, val)) {
                SvREFCNT_dec(val);
            }
        }

        ST(0) = sv_2mortal(newRV_noinc((SV*) acl_arr));

        XSRETURN(1);


void
zk_set_log_level(level)
        int level
    PPCODE:
        if (level < ZOO_LOG_LEVEL_OFF || level > ZOO_LOG_LEVEL_DEBUG) {
            Perl_croak(aTHX_ "invalid log level: %d", level);
        }

        zoo_set_debug_level(level);

        XSRETURN_EMPTY;


void
zk_set_deterministic_conn_order(flag)
        bool flag
    PPCODE:
        zoo_deterministic_conn_order(!!flag);

        XSRETURN_EMPTY;

void
zk_set_ignore_session_events(flag)
        bool flag
    PPCODE:
        zk_ignore_session_events = flag;

        XSRETURN_EMPTY;

void
zk_new(package, hosts, ...)
        char *package
        char *hosts
    PREINIT:
        int recv_timeout = DEFAULT_RECV_TIMEOUT_MSEC;
        const clientid_t *client_id = NULL;
        zk_t *zk;
        zk_handle_t *handle;
        HV *stash, *zk_hash, *attr_hash;
        SV *attr;
        int i;
    PPCODE:
        if (items > 2 && items % 2) {
            Perl_croak(aTHX_ "invalid number of arguments");
        }

        for (i = 2; i < items; i += 2) {
            char *key = SvPV_nolen(ST(i));

            if (strcaseEQ(key, "session_timeout")) {
                recv_timeout = SvIV(ST(i + 1));

                /* NOTE: would be nice if requirement in zookeeper_interest()
                 * that recv_timeout*2 be non-negative was documented
                 */
                if (recv_timeout < 0 || recv_timeout > (PERL_INT_MAX >> 1)) {
                    Perl_croak(aTHX_ "invalid session timeout: %d",
                               recv_timeout);
                }
            }
            else if (strcaseEQ(key, "session_id")) {
                STRLEN client_id_len;

                client_id = (const clientid_t*) SvPV(ST(i + 1), client_id_len);

                if (client_id_len != sizeof(clientid_t)) {
                    Perl_croak(aTHX_ "invalid session ID");
                }
            }
        }

        Newxz(zk, 1, zk_t);

        zk->handle = zookeeper_init(hosts, NULL, recv_timeout,
                                    client_id, NULL, 0);

        if (!zk->handle) {
            Safefree(zk);

            XSRETURN_UNDEF;
        }

        Newxz(zk->first_watch, 1, zk_watch_t);

        zk->data_buf_len = DEFAULT_DATA_BUF_LEN;
        zk->path_buf_len = DEFAULT_PATH_BUF_LEN;
        zk->watch_timeout = DEFAULT_WATCH_TIMEOUT;

        zk->hosts_len = strlen(hosts);
        zk->hosts = savepvn(hosts, zk->hosts_len);

        Newx(handle, 1, zk_handle_t);

        handle->signature = PACKAGE_SIGNATURE;
        handle->handle.zk = zk;

        /* We use several tricks from DBI here.  The attr_hash is our
         * empty inner hash; we attach extra magic to it in the form of
         * our zk_handle_t structure.  Then we tie attr_hash to zk_hash,
         * our outer hash.  This is what is passed around (by reference) by
         * callers.
         *
         * Most methods use _zk_get_handle_outer() which finds our inner
         * handle, then returns the zk_t structure from its extra magic
         * pointer.
         *
         * However, the tied hash methods, FETCH(), STORE(), and so forth,
         * receive an already-dereferenced inner handle hash.  This is
         * because we bless both inner and outer handles into this class,
         * so when a caller's code references a hash element in our
         * outer handle, Perl detects its tied magic, looks up the
         * tied object (our inner handle) and invokes the tied hash methods
         * in its class on it.  Since we blessed it into the same class
         * as the outer handle, these methods simply reside in our package.
         */

        stash = gv_stashpv(package, GV_ADDWARN);

        attr_hash = newHV();

        sv_magic((SV*) attr_hash, Nullsv, PERL_MAGIC_ext,
                 (const char*) handle, 0);

        attr = sv_bless(newRV_noinc((SV*) attr_hash), stash);

        zk_hash = newHV();

        sv_magic((SV*) zk_hash, attr, PERL_MAGIC_tied, Nullch, 0);
        SvREFCNT_dec(attr);

        ST(0) = sv_bless(sv_2mortal(newRV_noinc((SV*) zk_hash)), stash);

        XSRETURN(1);


void
zk_DESTROY(zkh)
        Net::ZooKeeper zkh
    PREINIT:
        zk_handle_t *handle;
        HV *attr_hash;
        int ret = ZBADARGUMENTS;
    PPCODE:
        handle = _zk_check_handle_outer(aTHX_ zkh, &attr_hash,
                                        PACKAGE_NAME, PACKAGE_SIGNATURE);

        if (!handle) {
            handle = _zk_check_handle_inner(aTHX_ zkh, PACKAGE_SIGNATURE);

            if (handle) {
                attr_hash = zkh;
                zkh = NULL;
            }
        }

        if (handle) {
            zk_t *zk = handle->handle.zk;

            ret = zookeeper_close(zk->handle);

            /* detach all now-inactive watches still tied to handles */
            _zk_release_watches(aTHX_ zk->first_watch, 1);

            Safefree(zk->first_watch);
            Safefree(zk->hosts);
            Safefree(zk);
            Safefree(handle);

            sv_unmagic((SV*) attr_hash, PERL_MAGIC_ext);
        }

        if (zkh && attr_hash) {
            sv_unmagic((SV*) zkh, PERL_MAGIC_tied);
        }

        if (GIMME_V == G_VOID) {
            XSRETURN_EMPTY;
        }
        else if (ret == ZOK) {
            XSRETURN_YES;
        }
        else {
            XSRETURN_NO;
        }


void
zk_CLONE(package)
        char *package
    PPCODE:
        XSRETURN_EMPTY;


void
zk_CLONE_SKIP(package)
        char *package
    PPCODE:
        XSRETURN_YES;


void
zk_TIEHASH(package, ...)
        char *package
    PPCODE:
        Perl_croak(aTHX_ "tying hashes of class "
                         PACKAGE_NAME " not supported");


void
zk_UNTIE(attr_hash, ref_count)
        Net::ZooKeeper attr_hash
        IV ref_count
    PPCODE:
        Perl_croak(aTHX_ "untying hashes of class "
                         PACKAGE_NAME " not supported");


void
zk_FIRSTKEY(attr_hash)
        Net::ZooKeeper attr_hash
    PREINIT:
        zk_t *zk;
    PPCODE:
        zk = _zk_get_handle_inner(aTHX_ attr_hash);

        if (!zk) {
            Perl_croak(aTHX_ "invalid handle");
        }

        ST(0) = sv_2mortal(newSVpvn(zk_keys[0].name, zk_keys[0].name_len));

        XSRETURN(1);


void
zk_NEXTKEY(attr_hash, attr_key)
        Net::ZooKeeper attr_hash
        SV *attr_key
    PREINIT:
        zk_t *zk;
        char *key;
        int i;
    PPCODE:
        zk = _zk_get_handle_inner(aTHX_ attr_hash);

        if (!zk) {
            Perl_croak(aTHX_ "invalid handle");
        }

        key = SvPV_nolen(attr_key);

        for (i = 0; i < NUM_KEYS; ++i) {
            if (strcaseEQ(key, zk_keys[i].name)) {
                ++i;

                break;
            }
        }

        if (i < NUM_KEYS) {
            ST(0) = sv_2mortal(newSVpvn(zk_keys[i].name, zk_keys[i].name_len));

            XSRETURN(1);
        }
        else {
            XSRETURN_EMPTY;
        }


void
zk_SCALAR(attr_hash)
        Net::ZooKeeper attr_hash
    PPCODE:
        XSRETURN_YES;


void
zk_FETCH(attr_hash, attr_key)
        Net::ZooKeeper attr_hash
        SV *attr_key
    PREINIT:
        zk_t *zk;
        char *key;
        SV *val = NULL;
    PPCODE:
        zk = _zk_get_handle_inner(aTHX_ attr_hash);

        if (!zk) {
            Perl_croak(aTHX_ "invalid handle");
        }

        key = SvPV_nolen(attr_key);

        if (strcaseEQ(key, "data_read_len")) {
            val = newSViv(zk->data_buf_len);
        }
        else if (strcaseEQ(key, "path_read_len")) {
            val = newSViv(zk->path_buf_len);
        }
        else if (strcaseEQ(key, "watch_timeout")) {
            val = newSVuv(zk->watch_timeout);
        }
        else if (strcaseEQ(key, "hosts")) {
            val = newSVpvn(zk->hosts, zk->hosts_len);
        }
        else if (strcaseEQ(key, "session_timeout")) {
            val = newSViv(zoo_recv_timeout(zk->handle));
        }
        else if (strcaseEQ(key, "session_id")) {
            const clientid_t *client_id;
            clientid_t null_client_id;

            client_id = zoo_client_id(zk->handle);

            memset(&null_client_id, 0, sizeof(clientid_t));

            if (!memcmp(client_id, &null_client_id, sizeof(clientid_t))) {
                val = newSVpv("", 0);
            }
            else {
                val = newSVpvn((const char*) client_id, sizeof(clientid_t));
            }
        }
        else if (strcaseEQ(key, "pending_watches")) {
            /* cleanup any completed watches not tied to a handle */
            val = newSVuv(_zk_release_watches(aTHX_ zk->first_watch, 0));
        }
        else if (strcaseEQ(key, "state")) {
            val = newSViv(zoo_state(zk->handle));
        }

        if (val) {
            ST(0) = sv_2mortal(val);

            XSRETURN(1);
        }

        Perl_warn(aTHX_ "invalid element: %s", key);

        XSRETURN_UNDEF;


void
zk_STORE(attr_hash, attr_key, attr_val)
        Net::ZooKeeper attr_hash
        SV *attr_key
        SV *attr_val
    PREINIT:
        zk_t *zk;
        char *key;
    PPCODE:
        zk = _zk_get_handle_inner(aTHX_ attr_hash);

        if (!zk) {
            Perl_croak(aTHX_ "invalid handle");
        }

        key = SvPV_nolen(attr_key);

        if (strcaseEQ(key, "data_read_len")) {
            int val = SvIV(attr_val);

            if (val < 0) {
                Perl_croak(aTHX_ "invalid data read length: %d", val);
            }

            zk->data_buf_len = val;
        }
        else if (strcaseEQ(key, "path_read_len")) {
            int val = SvIV(attr_val);

            if (val < 0) {
                Perl_croak(aTHX_ "invalid path read length: %d", val);
            }

            zk->path_buf_len = val;
        }
        else if (strcaseEQ(key, "watch_timeout")) {
            zk->watch_timeout = SvUV(attr_val);
        }
        else {
            int i;

            for (i = 0; i < NUM_KEYS; ++i) {
                if (strcaseEQ(key, zk_keys[i].name)) {
                    Perl_warn(aTHX_ "read-only element: %s", key);

                    XSRETURN_EMPTY;
                }
            }

            Perl_warn(aTHX_ "invalid element: %s", key);
        }

        XSRETURN_EMPTY;


void
zk_EXISTS(attr_hash, attr_key)
        Net::ZooKeeper attr_hash
        SV *attr_key
    PREINIT:
        zk_t *zk;
        char *key;
        int i;
    PPCODE:
        zk = _zk_get_handle_inner(aTHX_ attr_hash);

        if (!zk) {
            Perl_croak(aTHX_ "invalid handle");
        }

        key = SvPV_nolen(attr_key);

        for (i = 0; i < NUM_KEYS; ++i) {
            if (strcaseEQ(key, zk_keys[i].name)) {
                XSRETURN_YES;
            }
        }

        XSRETURN_NO;


void
zk_DELETE(attr_hash, attr_key)
        Net::ZooKeeper attr_hash
        SV *attr_key
    PPCODE:
        Perl_warn(aTHX_ "deleting elements from hashes of class "
                        PACKAGE_NAME " not supported");

        XSRETURN_EMPTY;


void
zk_CLEAR(attr_hash)
        Net::ZooKeeper attr_hash
    PPCODE:
        Perl_warn(aTHX_ "clearing hashes of class "
                        PACKAGE_NAME " not supported");

        XSRETURN_EMPTY;


SV *
zk_get_error(zkh)
        Net::ZooKeeper zkh
    PREINIT:
        zk_t *zk;
    CODE:
        zk = _zk_get_handle_outer(aTHX_ zkh);

        if (!zk) {
            Perl_croak(aTHX_ "invalid handle");
        }

        RETVAL = newSViv(zk->last_ret);
        errno = zk->last_errno;
    OUTPUT:
        RETVAL


void
zk_add_auth(zkh, scheme, cert)
        Net::ZooKeeper zkh
        char *scheme
        char *cert; cert = (char *) SvPV($arg, cert_len);
    PREINIT:
        zk_t *zk;
        STRLEN cert_len;
        zk_watch_t *watch;
        int ret;
    PPCODE:
        zk = _zk_get_handle_outer(aTHX_ zkh);

        if (!zk) {
            Perl_croak(aTHX_ "invalid handle");
        }

        zk->last_ret = ZOK;
        zk->last_errno = 0;

        if (cert_len > PERL_INT_MAX) {
            Perl_croak(aTHX_ "invalid certificate length: %u", cert_len);
        }

        watch = _zk_create_watch(aTHX);

        if (!watch) {
            /* errno will be set */
            zk->last_ret = ZSYSTEMERROR;
            zk->last_errno = errno;

            XSRETURN_NO;
        }

        errno = 0;
        ret = zoo_add_auth(zk->handle, scheme, cert, cert_len,
                           _zk_auth_completion, watch);

        zk->last_ret = ret;
        zk->last_errno = errno;

        if (ret == ZOK) {
            pthread_mutex_lock(&watch->mutex);

            while (!watch->done) {
                pthread_cond_wait(&watch->cond, &watch->mutex);
            }

            pthread_mutex_unlock(&watch->mutex);

            if (watch->done) {
                ret = watch->ret;
            }
            else {
                ret = ZINVALIDSTATE;
            }

            /* errno may be set while we waited */
            zk->last_ret = ret;
            zk->last_errno = errno;
        }

        _zk_destroy_watch(aTHX_ watch);

        if (ret == ZOK) {
            XSRETURN_YES;
        }
        else {
            XSRETURN_NO;
        }


void
zk_create(zkh, path, buf, ...)
        Net::ZooKeeper zkh
        char *path
        char *buf; buf = (char *) SvPV($arg, buf_len);
    PREINIT:
        zk_t *zk;
        STRLEN buf_len;
        int flags = 0;
        char *path_buf;
        int path_buf_len;
        AV *acl_arr = NULL;
        struct ACL_vector acl;
        int i, ret;
    PPCODE:
        zk = _zk_get_handle_outer(aTHX_ zkh);

        if (!zk) {
            Perl_croak(aTHX_ "invalid handle");
        }

        zk->last_ret = ZOK;
        zk->last_errno = 0;

        if (items > 3 && !(items % 2)) {
            Perl_croak(aTHX_ "invalid number of arguments");
        }

        if (buf_len > PERL_INT_MAX) {
            Perl_croak(aTHX_ "invalid data length: %u", buf_len);
        }

        path_buf_len = zk->path_buf_len;

        for (i = 3; i < items; i += 2) {
            char *key = SvPV_nolen(ST(i));

            if (strcaseEQ(key, "path_read_len")) {
                path_buf_len = SvIV(ST(i + 1));

                if (path_buf_len < 2) {
                    Perl_croak(aTHX_ "invalid path read length: %d",
                               path_buf_len);
                }
            }
            else if (strcaseEQ(key, "flags")) {
                flags = SvIV(ST(i + 1));

                if (flags & ~(ZOO_SEQUENCE | ZOO_EPHEMERAL)) {
                    Perl_croak(aTHX_ "invalid create flags: %d", flags);
                }
            }
            else if (strcaseEQ(key, "acl")) {
                const char *err;

                if (!SvROK(ST(i + 1)) || SvTYPE(SvRV(ST(i + 1))) != SVt_PVAV) {
                    Perl_croak(aTHX_ "invalid ACL array reference");
                }

                acl_arr = (AV*) SvRV(ST(i + 1));

                err = _zk_fill_acl(aTHX_ acl_arr, &acl);

                if (err) {
                    Perl_croak(aTHX_ err);
                }
            }
        }

        /* NOTE: would be nice to be able to rely on null-terminated string */
        ++path_buf_len;
        Newxz(path_buf, path_buf_len, char);

        errno = 0;
        ret = zoo_create(zk->handle, path, buf, buf_len,
                         (acl_arr ? &acl : NULL), flags,
                         path_buf, path_buf_len);

        zk->last_ret = ret;
        zk->last_errno = errno;

        if (acl_arr) {
            _zk_free_acl(aTHX_ &acl);
        }

        if (ret == ZOK) {
            ST(0) = sv_newmortal();
#ifdef SV_HAS_TRAILING_NUL
            sv_usepvn_flags(ST(0), path_buf, strlen(path_buf),
                            SV_HAS_TRAILING_NUL);
#else
            sv_usepvn(ST(0), path_buf, strlen(path_buf));
#endif
            SvCUR_set(ST(0), strlen(path_buf));

            XSRETURN(1);
        }

        Safefree(path_buf);

        XSRETURN_UNDEF;


void
zk_delete(zkh, path, ...)
        Net::ZooKeeper zkh
        char *path
    PREINIT:
        zk_t *zk;
        int version = -1;
        int i, ret;
    PPCODE:
        zk = _zk_get_handle_outer(aTHX_ zkh);

        if (!zk) {
            Perl_croak(aTHX_ "invalid handle");
        }

        zk->last_ret = ZOK;
        zk->last_errno = 0;

        if (items > 2 && items % 2) {
            Perl_croak(aTHX_ "invalid number of arguments");
        }

        for (i = 2; i < items; i += 2) {
            char *key = SvPV_nolen(ST(i));

            if (strcaseEQ(key, "version")) {
                version = SvIV(ST(i + 1));

                if (version < 0) {
                    Perl_croak(aTHX_ "invalid version requirement: %d",
                               version);
                }
            }
        }

        errno = 0;
        ret = zoo_delete(zk->handle, path, version);

        zk->last_ret = ret;
        zk->last_errno = errno;

        if (ret == ZOK) {
            XSRETURN_YES;
        }
        else {
            XSRETURN_NO;
        }


void
zk_exists(zkh, path, ...)
        Net::ZooKeeper zkh
        char *path
    PREINIT:
        zk_t *zk;
        zk_stat_t *stat = NULL;
        zk_watch_t *old_watch = NULL;
        zk_handle_t *watch_handle = NULL;
        watcher_fn watcher = NULL;
        zk_watch_t *new_watch = NULL;
        int i, ret;
    PPCODE:
        zk = _zk_get_handle_outer(aTHX_ zkh);

        if (!zk) {
            Perl_croak(aTHX_ "invalid handle");
        }

        zk->last_ret = ZOK;
        zk->last_errno = 0;

        if (items > 2 && items % 2) {
            Perl_croak(aTHX_ "invalid number of arguments");
        }

        for (i = 2; i < items; i += 2) {
            char *key = SvPV_nolen(ST(i));

            if (strcaseEQ(key, "stat")) {
                if (!SvROK(ST(i + 1)) || SvTYPE(SvRV(ST(i + 1))) != SVt_PVHV ||
                    !sv_derived_from(ST(i + 1), STAT_PACKAGE_NAME)) {
                    Perl_croak(aTHX_ "stat is not a hash reference of "
                                     "type " STAT_PACKAGE_NAME);
                }

                stat = _zks_get_handle_outer(aTHX_ (HV*) SvRV(ST(i + 1)));

                if (!stat) {
                    Perl_croak(aTHX_ "invalid stat handle");
                }
            }
            else if (strcaseEQ(key, "watch")) {
                if (!SvROK(ST(i + 1)) || SvTYPE(SvRV(ST(i + 1))) != SVt_PVHV ||
                    !sv_derived_from(ST(i + 1), WATCH_PACKAGE_NAME)) {
                    Perl_croak(aTHX_ "watch is not a hash reference of "
                                     "type " WATCH_PACKAGE_NAME);
                }

                old_watch = _zkw_get_handle_outer(aTHX_ (HV*) SvRV(ST(i + 1)),
                                                  &watch_handle);

                if (!old_watch) {
                    Perl_croak(aTHX_ "invalid watch handle");
                }
            }
        }

        if (watch_handle) {
            new_watch = _zk_acquire_watch(aTHX);

            if (!new_watch) {
                /* errno will be set */
                zk->last_ret = ZSYSTEMERROR;
                zk->last_errno = errno;

                XSRETURN_NO;
            }

            watcher = _zk_watcher;
        }

        errno = 0;
        ret = zoo_wexists(zk->handle, path, watcher, new_watch, stat);

        zk->last_ret = ret;
        zk->last_errno = errno;

        if (watch_handle) {
            _zk_replace_watch(aTHX_ watch_handle, zk->first_watch,
                              old_watch, new_watch);
        }

        if (ret == ZOK) {
            XSRETURN_YES;
        }
        else {
            XSRETURN_NO;
        }


void
zk_get_children(zkh, path, ...)
        Net::ZooKeeper zkh
        char *path
    PREINIT:
        zk_t *zk;
        zk_watch_t *old_watch = NULL;
        zk_handle_t *watch_handle = NULL;
        watcher_fn watcher = NULL;
        zk_watch_t *new_watch = NULL;
        struct String_vector strings;
        int i, ret;
    PPCODE:
        zk = _zk_get_handle_outer(aTHX_ zkh);

        if (!zk) {
            Perl_croak(aTHX_ "invalid handle");
        }

        zk->last_ret = ZOK;
        zk->last_errno = 0;

        if (items > 2 && items % 2) {
            Perl_croak(aTHX_ "invalid number of arguments");
        }

        for (i = 2; i < items; i += 2) {
            char *key = SvPV_nolen(ST(i));

            if (strcaseEQ(key, "watch")) {
                if (!SvROK(ST(i + 1)) || SvTYPE(SvRV(ST(i + 1))) != SVt_PVHV ||
                    !sv_derived_from(ST(i + 1), WATCH_PACKAGE_NAME)) {
                    Perl_croak(aTHX_ "watch is not a hash reference of "
                                     "type " WATCH_PACKAGE_NAME);
                }

                old_watch = _zkw_get_handle_outer(aTHX_ (HV*) SvRV(ST(i + 1)),
                                                  &watch_handle);

                if (!old_watch) {
                    Perl_croak(aTHX_ "invalid watch handle");
                }
            }
        }

        if (watch_handle) {
            new_watch = _zk_acquire_watch(aTHX);

            if (!new_watch) {
                /* errno will be set */
                zk->last_ret = ZSYSTEMERROR;
                zk->last_errno = errno;

                if (GIMME_V == G_ARRAY) {
                    XSRETURN_EMPTY;
                }
                else {
                    XSRETURN_UNDEF;
                }
            }

            watcher = _zk_watcher;
        }

        Zero(&strings, 1, struct String_vector);

        errno = 0;
        ret = zoo_wget_children(zk->handle, path, watcher, new_watch,
                                &strings);

        zk->last_ret = ret;
        zk->last_errno = errno;

        if (watch_handle) {
            _zk_replace_watch(aTHX_ watch_handle, zk->first_watch,
                              old_watch, new_watch);
        }

        if (ret == ZOK) {
            int num_children;

            num_children =
                (strings.count > PERL_INT_MAX) ? PERL_INT_MAX : strings.count;

            if (GIMME_V == G_ARRAY && num_children > 0) {
                EXTEND(SP, num_children);

                for (i = 0; i < num_children; ++i) {
                    ST(i) = sv_2mortal(newSVpv(strings.data[i], 0));
                }
            }

            /* NOTE: would be nice if this were documented as required */
            deallocate_String_vector(&strings);

            if (GIMME_V == G_ARRAY) {
                if (num_children == 0) {
                    XSRETURN_EMPTY;
                }

                XSRETURN(num_children);
            }
            else {
                ST(0) = sv_2mortal(newSViv(num_children));

                XSRETURN(1);
            }
        }
        else {
            if (GIMME_V == G_ARRAY) {
                XSRETURN_EMPTY;
            }
            else {
                XSRETURN_UNDEF;
            }
        }


void
zk_get(zkh, path, ...)
        Net::ZooKeeper zkh
        char *path
    PREINIT:
        zk_t *zk;
        int buf_len;
        zk_stat_t *stat = NULL;
        zk_watch_t *old_watch = NULL;
        zk_handle_t *watch_handle = NULL;
        char *buf;
        watcher_fn watcher = NULL;
        zk_watch_t *new_watch = NULL;
        int i, ret;
    PPCODE:
        zk = _zk_get_handle_outer(aTHX_ zkh);

        if (!zk) {
            Perl_croak(aTHX_ "invalid handle");
        }

        zk->last_ret = ZOK;
        zk->last_errno = 0;

        if (items > 2 && items % 2) {
            Perl_croak(aTHX_ "invalid number of arguments");
        }

        buf_len = zk->data_buf_len;

        for (i = 2; i < items; i += 2) {
            char *key = SvPV_nolen(ST(i));

            if (strcaseEQ(key, "data_read_len")) {
                buf_len = SvIV(ST(i + 1));

                if (buf_len < 0) {
                    Perl_croak(aTHX_ "invalid data read length: %d",
                               buf_len);
                }
            }
            else if (strcaseEQ(key, "stat")) {
                if (!SvROK(ST(i + 1)) || SvTYPE(SvRV(ST(i + 1))) != SVt_PVHV ||
                    !sv_derived_from(ST(i + 1), STAT_PACKAGE_NAME)) {
                    Perl_croak(aTHX_ "stat is not a hash reference of "
                                     "type " STAT_PACKAGE_NAME);
                }

                stat = _zks_get_handle_outer(aTHX_ (HV*) SvRV(ST(i + 1)));

                if (!stat) {
                    Perl_croak(aTHX_ "invalid stat handle");
                }
            }
            else if (strcaseEQ(key, "watch")) {
                if (!SvROK(ST(i + 1)) || SvTYPE(SvRV(ST(i + 1))) != SVt_PVHV ||
                    !sv_derived_from(ST(i + 1), WATCH_PACKAGE_NAME)) {
                    Perl_croak(aTHX_ "watch is not a hash reference of "
                                     "type " WATCH_PACKAGE_NAME);
                }

                old_watch = _zkw_get_handle_outer(aTHX_ (HV*) SvRV(ST(i + 1)),
                                                  &watch_handle);

                if (!old_watch) {
                    Perl_croak(aTHX_ "invalid watch handle");
                }
            }
        }

        if (watch_handle) {
            new_watch = _zk_acquire_watch(aTHX);

            if (!new_watch) {
                /* errno will be set */
                zk->last_ret = ZSYSTEMERROR;
                zk->last_errno = errno;

                XSRETURN_UNDEF;
            }

            watcher = _zk_watcher;
        }

        Newx(buf, buf_len + 1, char);

        errno = 0;
        ret = zoo_wget(zk->handle, path, watcher, new_watch,
                       buf, &buf_len, stat);

        zk->last_ret = ret;
        zk->last_errno = errno;

        if (watch_handle) {
            _zk_replace_watch(aTHX_ watch_handle, zk->first_watch,
                              old_watch, new_watch);
        }

        if (ret == ZOK) {
            ST(0) = sv_newmortal();
#ifdef SV_HAS_TRAILING_NUL
            buf[buf_len] = '\0';
            sv_usepvn_flags(ST(0), buf, buf_len, SV_HAS_TRAILING_NUL);
#else
            sv_usepvn(ST(0), buf, buf_len);
#endif

            XSRETURN(1);
        }
        else {
            Safefree(buf);

            XSRETURN_UNDEF;
        }


void
zk_set(zkh, path, buf, ...)
        Net::ZooKeeper zkh
        char *path
        char *buf; buf = (char *) SvPV($arg, buf_len);
    PREINIT:
        zk_t *zk;
        int version = -1;
        zk_stat_t *stat = NULL;
        STRLEN buf_len;
        int i, ret;
    PPCODE:
        zk = _zk_get_handle_outer(aTHX_ zkh);

        if (!zk) {
            Perl_croak(aTHX_ "invalid handle");
        }

        zk->last_ret = ZOK;
        zk->last_errno = 0;

        if (items > 3 && !(items % 2)) {
            Perl_croak(aTHX_ "invalid number of arguments");
        }

        if (buf_len > PERL_INT_MAX) {
            Perl_croak(aTHX_ "invalid data length: %u", buf_len);
        }

        for (i = 3; i < items; i += 2) {
            char *key = SvPV_nolen(ST(i));

            if (strcaseEQ(key, "version")) {
                version = SvIV(ST(i + 1));

                if (version < 0) {
                    Perl_croak(aTHX_ "invalid version requirement: %d",
                               version);
                }
            }
            else if (strcaseEQ(key, "stat")) {
                if (!SvROK(ST(i + 1)) || SvTYPE(SvRV(ST(i + 1))) != SVt_PVHV ||
                    !sv_derived_from(ST(i + 1), STAT_PACKAGE_NAME)) {
                    Perl_croak(aTHX_ "stat is not a hash reference of "
                                     "type " STAT_PACKAGE_NAME);
                }

                stat = _zks_get_handle_outer(aTHX_ (HV*) SvRV(ST(i + 1)));

                if (!stat) {
                    Perl_croak(aTHX_ "invalid stat handle");
                }
            }
        }

        errno = 0;
        ret = zoo_set2(zk->handle, path, buf, buf_len, version, stat);

        zk->last_ret = ret;
        zk->last_errno = errno;

        if (ret == ZOK) {
            XSRETURN_YES;
        }
        else {
            XSRETURN_NO;
        }


void
zk_get_acl(zkh, path, ...)
        Net::ZooKeeper zkh
        char *path
    PREINIT:
        zk_t *zk;
        zk_stat_t *stat = NULL;
        struct ACL_vector acl;
        int i, ret;
    PPCODE:
        zk = _zk_get_handle_outer(aTHX_ zkh);

        if (!zk) {
            Perl_croak(aTHX_ "invalid handle");
        }

        zk->last_ret = ZOK;
        zk->last_errno = 0;

        if (items > 2 && items % 2) {
            Perl_croak(aTHX_ "invalid number of arguments");
        }

        for (i = 2; i < items; i += 2) {
            char *key = SvPV_nolen(ST(i));

            if (strcaseEQ(key, "stat")) {
                if (!SvROK(ST(i + 1)) || SvTYPE(SvRV(ST(i + 1))) != SVt_PVHV ||
                    !sv_derived_from(ST(i + 1), STAT_PACKAGE_NAME)) {
                    Perl_croak(aTHX_ "stat is not a hash reference of "
                                     "type " STAT_PACKAGE_NAME);
                }

                stat = _zks_get_handle_outer(aTHX_ (HV*) SvRV(ST(i + 1)));

                if (!stat) {
                    Perl_croak(aTHX_ "invalid stat handle");
                }
            }
        }

        errno = 0;
        ret = zoo_get_acl(zk->handle, path, &acl, stat);

        zk->last_ret = ret;
        zk->last_errno = errno;

        if (ret == ZOK) {
            int num_acl_entries;

            num_acl_entries =
                (acl.count > PERL_INT_MAX) ? PERL_INT_MAX : acl.count;

            if (GIMME_V == G_ARRAY && num_acl_entries > 0) {
                EXTEND(SP, num_acl_entries);

                for (i = 0; i < num_acl_entries; ++i) {
                    HV *acl_entry_hash = newHV();

                    _zk_fill_acl_entry_hash(aTHX_ &acl.data[i],
                                            acl_entry_hash);

                    ST(i) = sv_2mortal(newRV_noinc((SV*) acl_entry_hash));
                }
            }

            /* NOTE: would be nice if this were documented as required */
            deallocate_ACL_vector(&acl);

            if (GIMME_V == G_ARRAY) {
                if (num_acl_entries == 0) {
                    XSRETURN_EMPTY;
                }

                XSRETURN(num_acl_entries);
            }
            else {
                ST(0) = sv_2mortal(newSViv(num_acl_entries));

                XSRETURN(1);
            }
        }
        else {
            if (GIMME_V == G_ARRAY) {
                XSRETURN_EMPTY;
            }
            else {
                XSRETURN_UNDEF;
            }
        }


void
zk_set_acl(zkh, path, acl_arr, ...)
        Net::ZooKeeper zkh
        char *path
        AV *acl_arr
    PREINIT:
        zk_t *zk;
        const char *err;
        int version = -1;
        struct ACL_vector acl;
        int i, ret;
    PPCODE:
        zk = _zk_get_handle_outer(aTHX_ zkh);

        if (!zk) {
            Perl_croak(aTHX_ "invalid handle");
        }

        zk->last_ret = ZOK;
        zk->last_errno = 0;

        if (items > 3 && !(items % 2)) {
            Perl_croak(aTHX_ "invalid number of arguments");
        }

        err = _zk_fill_acl(aTHX_ acl_arr, &acl);

        if (err) {
            Perl_croak(aTHX_ err);
        }

        for (i = 3; i < items; i += 2) {
            char *key = SvPV_nolen(ST(i));

            if (strcaseEQ(key, "version")) {
                version = SvIV(ST(i + 1));

                if (version < 0) {
                    Perl_croak(aTHX_ "invalid version requirement: %d",
                               version);
                }
            }
        }

        errno = 0;
        ret = zoo_set_acl(zk->handle, path, version, &acl);

        zk->last_ret = ret;
        zk->last_errno = errno;

        _zk_free_acl(aTHX_ &acl);

        if (ret == ZOK) {
            XSRETURN_YES;
        }
        else {
            XSRETURN_NO;
        }


void
zk_stat(zkh)
        Net::ZooKeeper zkh
    PREINIT:
        zk_t *zk;
        zk_handle_t *handle;
        HV *stash, *stat_hash, *attr_hash;
        SV *attr;
    PPCODE:
        zk = _zk_get_handle_outer(aTHX_ zkh);

        if (!zk) {
            Perl_croak(aTHX_ "invalid handle");
        }

        zk->last_ret = ZOK;
        zk->last_errno = 0;

        Newx(handle, 1, zk_handle_t);

        handle->signature = STAT_PACKAGE_SIGNATURE;

        Newxz(handle->handle.stat, 1, zk_stat_t);

        /* As in zk_new(), we use two levels of magic here. */

        stash = gv_stashpv(STAT_PACKAGE_NAME, GV_ADDWARN);

        attr_hash = newHV();

        sv_magic((SV*) attr_hash, Nullsv, PERL_MAGIC_ext,
                 (const char*) handle, 0);

        attr = sv_bless(newRV_noinc((SV*) attr_hash), stash);

        stat_hash = newHV();

        sv_magic((SV*) stat_hash, attr, PERL_MAGIC_tied, Nullch, 0);
        SvREFCNT_dec(attr);

        ST(0) = sv_bless(sv_2mortal(newRV_noinc((SV*) stat_hash)), stash);

        XSRETURN(1);


void
zk_watch(zkh, ...)
        Net::ZooKeeper zkh
    PREINIT:
        zk_t *zk;
        unsigned int timeout;
        zk_watch_t *watch;
        zk_handle_t *handle;
        HV *stash, *watch_hash, *attr_hash;
        SV *attr;
        int i;
    PPCODE:
        zk = _zk_get_handle_outer(aTHX_ zkh);

        if (!zk) {
            Perl_croak(aTHX_ "invalid handle");
        }

        zk->last_ret = ZOK;
        zk->last_errno = 0;

        if (items > 1 && !(items % 2)) {
            Perl_croak(aTHX_ "invalid number of arguments");
        }

        timeout = zk->watch_timeout;

        for (i = 1; i < items; i += 2) {
            char *key = SvPV_nolen(ST(i));

            if (strcaseEQ(key, "timeout")) {
                timeout = SvUV(ST(i + 1));
            }
        }

        watch = _zk_acquire_watch(aTHX);

        if (!watch) {
            /* errno will be set */
            zk->last_ret = ZSYSTEMERROR;
            zk->last_errno = errno;

            XSRETURN_UNDEF;
        }

        Newx(handle, 1, zk_handle_t);

        handle->signature = WATCH_PACKAGE_SIGNATURE;
        handle->handle.watch = watch;

        /* As in zk_new(), we use two levels of magic here. */

        stash = gv_stashpv(WATCH_PACKAGE_NAME, GV_ADDWARN);

        attr_hash = newHV();

        watch->timeout = timeout;

        sv_magic((SV*) attr_hash, Nullsv, PERL_MAGIC_ext,
                 (const char*) handle, 0);

        attr = sv_bless(newRV_noinc((SV*) attr_hash), stash);

        watch_hash = newHV();

        sv_magic((SV*) watch_hash, attr, PERL_MAGIC_tied, Nullch, 0);
        SvREFCNT_dec(attr);

        ST(0) = sv_bless(sv_2mortal(newRV_noinc((SV*) watch_hash)), stash);

        XSRETURN(1);


MODULE = Net::ZooKeeper  PACKAGE = Net::ZooKeeper::Stat  PREFIX = zks_

void
zks_DESTROY(zksh)
        Net::ZooKeeper::Stat zksh
    PREINIT:
        zk_handle_t *handle;
        HV *attr_hash;
        int ret = ZBADARGUMENTS;
    PPCODE:
        handle = _zk_check_handle_outer(aTHX_ zksh, &attr_hash,
                                        STAT_PACKAGE_NAME,
                                        STAT_PACKAGE_SIGNATURE);

        if (!handle) {
            handle = _zk_check_handle_inner(aTHX_ zksh,
                                            STAT_PACKAGE_SIGNATURE);

            if (handle) {
                attr_hash = zksh;
                zksh = NULL;
            }
        }

        if (handle) {
            ret = ZOK;

            Safefree(handle->handle.stat);
            Safefree(handle);

            sv_unmagic((SV*) attr_hash, PERL_MAGIC_ext);
        }

        if (zksh && attr_hash) {
            sv_unmagic((SV*) zksh, PERL_MAGIC_tied);
        }

        if (GIMME_V == G_VOID) {
            XSRETURN_EMPTY;
        }
        else if (ret == ZOK) {
            XSRETURN_YES;
        }
        else {
            XSRETURN_NO;
        }


void
zks_CLONE(package)
        char *package
    PPCODE:
        XSRETURN_EMPTY;


void
zks_CLONE_SKIP(package)
        char *package
    PPCODE:
        XSRETURN_YES;


void
zks_TIEHASH(package, ...)
        char *package
    PPCODE:
        Perl_croak(aTHX_ "tying hashes of class "
                         STAT_PACKAGE_NAME " not supported");


void
zks_UNTIE(attr_hash, ref_count)
        Net::ZooKeeper::Stat attr_hash
        IV ref_count
    PPCODE:
        Perl_croak(aTHX_ "untying hashes of class "
                         STAT_PACKAGE_NAME " not supported");


void
zks_FIRSTKEY(attr_hash)
        Net::ZooKeeper::Stat attr_hash
    PREINIT:
        zk_stat_t *stat;
    PPCODE:
        stat = _zks_get_handle_inner(aTHX_ attr_hash);

        if (!stat) {
            Perl_croak(aTHX_ "invalid handle");
        }

        ST(0) = sv_2mortal(newSVpvn(zk_stat_keys[0].name,
                                    zk_stat_keys[0].name_len));

        XSRETURN(1);


void
zks_NEXTKEY(attr_hash, attr_key)
        Net::ZooKeeper::Stat attr_hash
        SV *attr_key
    PREINIT:
        zk_stat_t *stat;
        char *key;
        int i;
    PPCODE:
        stat = _zks_get_handle_inner(aTHX_ attr_hash);

        if (!stat) {
            Perl_croak(aTHX_ "invalid handle");
        }

        key = SvPV_nolen(attr_key);

        for (i = 0; i < NUM_STAT_KEYS; ++i) {
            if (strcaseEQ(key, zk_stat_keys[i].name)) {
                ++i;

                break;
            }
        }

        if (i < NUM_STAT_KEYS) {
            ST(0) = sv_2mortal(newSVpvn(zk_stat_keys[i].name,
                                        zk_stat_keys[i].name_len));

            XSRETURN(1);
        }
        else {
            XSRETURN_EMPTY;
        }


void
zks_SCALAR(attr_hash)
        Net::ZooKeeper::Stat attr_hash
    PPCODE:
        XSRETURN_YES;


void
zks_FETCH(attr_hash, attr_key)
        Net::ZooKeeper::Stat attr_hash
        SV *attr_key
    PREINIT:
        zk_stat_t *stat;
        char *key;
        SV *val = NULL;
        int i;
    PPCODE:
        stat = _zks_get_handle_inner(aTHX_ attr_hash);

        if (!stat) {
            Perl_croak(aTHX_ "invalid handle");
        }

        key = SvPV_nolen(attr_key);

        for (i = 0; i < NUM_STAT_KEYS; ++i) {
            if (strcaseEQ(key, zk_stat_keys[i].name)) {
                if (zk_stat_keys[i].size * CHAR_BIT == 32) {
                    val = newSViv(*((int32_t*) (((char*) stat) +
                                                zk_stat_keys[i].offset)));
                }
                else {
                    /* NOTE: %lld is inconsistent, so cast to a double */
                    val = newSVpvf("%.0f", (double)
                                   *((int64_t*) (((char*) stat) +
                                                 zk_stat_keys[i].offset)));
                }

                break;
            }
        }

        if (val) {
            ST(0) = sv_2mortal(val);

            XSRETURN(1);
        }

        Perl_warn(aTHX_ "invalid element: %s", key);

        XSRETURN_UNDEF;


void
zks_STORE(attr_hash, attr_key, attr_val)
        Net::ZooKeeper::Stat attr_hash
        SV *attr_key
        SV *attr_val
    PREINIT:
        zk_stat_t *stat;
        char *key;
        int i;
    PPCODE:
        stat = _zks_get_handle_inner(aTHX_ attr_hash);

        if (!stat) {
            Perl_croak(aTHX_ "invalid handle");
        }

        key = SvPV_nolen(attr_key);

        for (i = 0; i < NUM_STAT_KEYS; ++i) {
            if (strcaseEQ(key, zk_stat_keys[i].name)) {
                Perl_warn(aTHX_ "read-only element: %s", key);

                XSRETURN_EMPTY;
            }
        }

        Perl_warn(aTHX_ "invalid element: %s", key);

        XSRETURN_EMPTY;


void
zks_EXISTS(attr_hash, attr_key)
        Net::ZooKeeper::Stat attr_hash
        SV *attr_key
    PREINIT:
        zk_stat_t *stat;
        char *key;
        int i;
    PPCODE:
        stat = _zks_get_handle_inner(aTHX_ attr_hash);

        if (!stat) {
            Perl_croak(aTHX_ "invalid handle");
        }

        key = SvPV_nolen(attr_key);

        for (i = 0; i < NUM_STAT_KEYS; ++i) {
            if (strcaseEQ(key, zk_stat_keys[i].name)) {
                XSRETURN_YES;
            }
        }

        XSRETURN_NO;


void
zks_DELETE(attr_hash, attr_key)
        Net::ZooKeeper::Stat attr_hash
        SV *attr_key
    PPCODE:
        Perl_warn(aTHX_ "deleting elements from hashes of class "
                        STAT_PACKAGE_NAME " not supported");

        XSRETURN_EMPTY;


void
zks_CLEAR(attr_hash)
        Net::ZooKeeper::Stat attr_hash
    PPCODE:
        Perl_warn(aTHX_ "clearing hashes of class "
                        STAT_PACKAGE_NAME " not supported");

        XSRETURN_EMPTY;


MODULE = Net::ZooKeeper  PACKAGE = Net::ZooKeeper::Watch  PREFIX = zkw_

void
zkw_DESTROY(zkwh)
        Net::ZooKeeper::Watch zkwh
    PREINIT:
        zk_handle_t *handle;
        HV *attr_hash;
        int ret = ZBADARGUMENTS;
    PPCODE:
        handle = _zk_check_handle_outer(aTHX_ zkwh, &attr_hash,
                                        WATCH_PACKAGE_NAME,
                                        WATCH_PACKAGE_SIGNATURE);

        if (!handle) {
            handle = _zk_check_handle_inner(aTHX_ zkwh,
                                            WATCH_PACKAGE_SIGNATURE);

            if (handle) {
                attr_hash = zkwh;
                zkwh = NULL;
            }
        }

        if (handle) {
            ret = ZOK;

            _zk_release_watch(aTHX_ handle->handle.watch, 0);
            Safefree(handle);

            sv_unmagic((SV*) attr_hash, PERL_MAGIC_ext);
        }

        if (zkwh && attr_hash) {
            sv_unmagic((SV*) zkwh, PERL_MAGIC_tied);
        }

        if (GIMME_V == G_VOID) {
            XSRETURN_EMPTY;
        }
        else if (ret == ZOK) {
            XSRETURN_YES;
        }
        else {
            XSRETURN_NO;
        }


void
zkw_CLONE(package)
        char *package
    PPCODE:
        XSRETURN_EMPTY;


void
zkw_CLONE_SKIP(package)
        char *package
    PPCODE:
        XSRETURN_YES;


void
zkw_TIEHASH(package, ...)
        char *package
    PPCODE:
        Perl_croak(aTHX_ "tying hashes of class "
                         WATCH_PACKAGE_NAME " not supported");


void
zkw_UNTIE(attr_hash, ref_count)
        Net::ZooKeeper::Watch attr_hash
        IV ref_count
    PPCODE:
        Perl_croak(aTHX_ "untying hashes of class "
                         WATCH_PACKAGE_NAME " not supported");


void
zkw_FIRSTKEY(attr_hash)
        Net::ZooKeeper::Watch attr_hash
    PREINIT:
        zk_watch_t *watch;
    PPCODE:
        watch = _zkw_get_handle_inner(aTHX_ attr_hash);

        if (!watch) {
            Perl_croak(aTHX_ "invalid handle");
        }

        ST(0) = sv_2mortal(newSVpvn(zk_watch_keys[0].name,
                                    zk_watch_keys[0].name_len));

        XSRETURN(1);


void
zkw_NEXTKEY(attr_hash, attr_key)
        Net::ZooKeeper::Watch attr_hash
        SV *attr_key
    PREINIT:
        zk_watch_t *watch;
        char *key;
        int i;
    PPCODE:
        watch = _zkw_get_handle_inner(aTHX_ attr_hash);

        if (!watch) {
            Perl_croak(aTHX_ "invalid handle");
        }

        key = SvPV_nolen(attr_key);

        for (i = 0; i < NUM_WATCH_KEYS; ++i) {
            if (strcaseEQ(key, zk_watch_keys[i].name)) {
                ++i;

                break;
            }
        }

        if (i < NUM_WATCH_KEYS) {
            ST(0) = sv_2mortal(newSVpvn(zk_watch_keys[i].name,
                                        zk_watch_keys[i].name_len));

            XSRETURN(1);
        }
        else {
            XSRETURN_EMPTY;
        }


void
zkw_SCALAR(attr_hash)
        Net::ZooKeeper::Watch attr_hash
    PPCODE:
        XSRETURN_YES;


void
zkw_FETCH(attr_hash, attr_key)
        Net::ZooKeeper::Watch attr_hash
        SV *attr_key
    PREINIT:
        zk_watch_t *watch;
        char *key;
        SV *val = NULL;
    PPCODE:
        watch = _zkw_get_handle_inner(aTHX_ attr_hash);

        if (!watch) {
            Perl_croak(aTHX_ "invalid handle");
        }

        key = SvPV_nolen(attr_key);

        if (strcaseEQ(key, "timeout")) {
            val = newSVuv(watch->timeout);
        }
        else if (strcaseEQ(key, "event")) {
            val = newSViv(watch->event_type);
        }
        else if (strcaseEQ(key, "state")) {
            val = newSViv(watch->event_state);
        }

        if (val) {
            ST(0) = sv_2mortal(val);

            XSRETURN(1);
        }

        Perl_warn(aTHX_ "invalid element: %s", key);

        XSRETURN_UNDEF;


void
zkw_STORE(attr_hash, attr_key, attr_val)
        Net::ZooKeeper::Watch attr_hash
        SV *attr_key
        SV *attr_val
    PREINIT:
        zk_watch_t *watch;
        char *key;
    PPCODE:
        watch = _zkw_get_handle_inner(aTHX_ attr_hash);

        if (!watch) {
            Perl_croak(aTHX_ "invalid handle");
        }

        key = SvPV_nolen(attr_key);

        if (strcaseEQ(key, "timeout")) {
            watch->timeout = SvUV(attr_val);
        }
        else {
            int i;

            for (i = 0; i < NUM_WATCH_KEYS; ++i) {
                if (strcaseEQ(key, zk_watch_keys[i].name)) {
                    Perl_warn(aTHX_ "read-only element: %s", key);

                    XSRETURN_EMPTY;
                }
            }

            Perl_warn(aTHX_ "invalid element: %s", key);
        }

        XSRETURN_EMPTY;


void
zkw_EXISTS(attr_hash, attr_key)
        Net::ZooKeeper::Watch attr_hash
        SV *attr_key
    PREINIT:
        zk_watch_t *watch;
        char *key;
        int i;
    PPCODE:
        watch = _zkw_get_handle_inner(aTHX_ attr_hash);

        if (!watch) {
            Perl_croak(aTHX_ "invalid handle");
        }

        key = SvPV_nolen(attr_key);

        for (i = 0; i < NUM_WATCH_KEYS; ++i) {
            if (strcaseEQ(key, zk_watch_keys[i].name)) {
                XSRETURN_YES;
            }
        }

        XSRETURN_NO;


void
zkw_DELETE(attr_hash, attr_key)
        Net::ZooKeeper::Watch attr_hash
        SV *attr_key
    PPCODE:
        Perl_warn(aTHX_ "deleting elements from hashes of class "
                        WATCH_PACKAGE_NAME " not supported");

        XSRETURN_EMPTY;


void
zkw_CLEAR(attr_hash)
        Net::ZooKeeper::Watch attr_hash
    PPCODE:
        Perl_warn(aTHX_ "clearing hashes of class "
                        WATCH_PACKAGE_NAME " not supported");

        XSRETURN_EMPTY;


void
zkw_wait(zkwh, ...)
        Net::ZooKeeper::Watch zkwh
    PREINIT:
        zk_watch_t *watch;
        unsigned int timeout;
        struct timeval end_timeval;
        int i, done;
        struct timespec wait_timespec;
    PPCODE:
        watch = _zkw_get_handle_outer(aTHX_ zkwh, NULL);

        if (!watch) {
            Perl_croak(aTHX_ "invalid handle");
        }

        if (items > 1 && !(items % 2)) {
            Perl_croak(aTHX_ "invalid number of arguments");
        }

        timeout = watch->timeout;

        for (i = 1; i < items; i += 2) {
            char *key = SvPV_nolen(ST(i));

            if (strcaseEQ(key, "timeout")) {
                timeout = SvUV(ST(i + 1));
            }
        }

        gettimeofday(&end_timeval, NULL);

        end_timeval.tv_sec += timeout / 1000;
        end_timeval.tv_usec += (timeout % 1000) * 1000;
        if (end_timeval.tv_usec >= 1000000) {
            end_timeval.tv_usec -= 1000000;
            end_timeval.tv_sec++;
        }

        wait_timespec.tv_sec = end_timeval.tv_sec;
        wait_timespec.tv_nsec = end_timeval.tv_usec * 1000;

        pthread_mutex_lock(&watch->mutex);

        while (!watch->done) {
            struct timeval curr_timeval;

            gettimeofday(&curr_timeval, NULL);

            if (end_timeval.tv_sec < curr_timeval.tv_sec ||
                (end_timeval.tv_sec == curr_timeval.tv_sec &&
                 end_timeval.tv_usec <= curr_timeval.tv_usec)) {
                break;
            }

            pthread_cond_timedwait(&watch->cond, &watch->mutex,
                                   &wait_timespec);
        }

        done = watch->done;

        pthread_mutex_unlock(&watch->mutex);

        if (done) {
            XSRETURN_YES;
        }
        else {
            XSRETURN_NO;
        }



( run in 1.707 second using v1.01-cache-2.11-cpan-71847e10f99 )