EV-Kafka

 view release on metacpan or  search on metacpan

src/EV__Kafka.xs  view on Meta::CPAN

sasl(EV::Kafka::Conn self, const char *mechanism, const char *username = NULL, const char *password = NULL)
    CODE:
    {
        if (self->sasl_mechanism) { Safefree(self->sasl_mechanism); self->sasl_mechanism = NULL; }
        if (self->sasl_username) { Safefree(self->sasl_username); self->sasl_username = NULL; }
        if (self->sasl_password) { Safefree(self->sasl_password); self->sasl_password = NULL; }
        if (SvOK(ST(1))) {
            self->sasl_mechanism = savepv(mechanism);
            if (username) self->sasl_username = savepv(username);
            if (password) self->sasl_password = savepv(password);
        }
    }

void
auto_reconnect(EV::Kafka::Conn self, int enable, int delay_ms = 1000)
    CODE:
    {
        self->auto_reconnect = enable;
        self->reconnect_delay_ms = delay_ms;
    }

void
metadata(EV::Kafka::Conn self, SV *topics_sv, SV *cb)
    CODE:
    {
        if (self->state != CONN_READY)
            croak("not connected");

        kf_buf_t body;
        kf_buf_init(&body);

        /* Metadata v1-v4 (non-flexible) */
        int16_t ver = self->api_versions[API_METADATA];
        if (ver < 0) ver = 1;
        if (ver > 4) ver = 4;

        if (SvOK(topics_sv) && SvROK(topics_sv) && SvTYPE(SvRV(topics_sv)) == SVt_PVAV) {
            AV *topics = (AV*)SvRV(topics_sv);
            SSize_t i, count = av_len(topics) + 1;
            kf_buf_append_i32(&body, (int32_t)count);
            for (i = 0; i < count; i++) {
                SV **elem = av_fetch(topics, i, 0);
                STRLEN tlen;
                const char *tname = SvPV(*elem, tlen);
                kf_buf_append_string(&body, tname, (int16_t)tlen);
            }
        } else {
            kf_buf_append_i32(&body, -1); /* null array = all topics */
        }

        /* allow_auto_topic_creation (v4+) */
        if (ver >= 4)
            kf_buf_append_i8(&body, 1);

        conn_send_request(aTHX_ self, API_METADATA, ver, &body, cb, 0, 0);
        kf_buf_free(&body);
    }

void
api_versions(EV::Kafka::Conn self)
    PPCODE:
    {
        if (!self->api_versions_known)
            XSRETURN_UNDEF;

        HV *hv = newHV();
        int i;
        for (i = 0; i < API_VERSIONS_MAX_KEY; i++) {
            if (self->api_versions[i] >= 0) {
                char key[8];
                int klen = snprintf(key, sizeof(key), "%d", i);
                hv_store(hv, key, klen, newSViv(self->api_versions[i]), 0);
            }
        }
        EXTEND(SP, 1);
        mPUSHs(newRV_noinc((SV*)hv));
        XSRETURN(1);
    }

void
fetch(EV::Kafka::Conn self, const char *topic, int partition, SV *offset_sv, SV *arg1 = NULL, SV *arg2 = NULL)
    CODE:
    {
        if (self->state != CONN_READY)
            croak("not connected");

        /* Accept either fetch(..., $cb) or fetch(..., \%opts, $cb).
         * $opts may set max_bytes, max_wait_ms, min_bytes. */
        SV *cb = NULL;
        HV *opts = NULL;
        if (arg1 && SvROK(arg1) && SvTYPE(SvRV(arg1)) == SVt_PVHV) {
            opts = (HV*)SvRV(arg1);
            cb = arg2;
        } else {
            cb = arg1;
        }

        int32_t max_bytes = 1048576;
        int32_t max_wait_ms = 500;
        int32_t min_bytes = 1;
        if (opts) {
            SV **v;
            if ((v = hv_fetchs(opts, "max_bytes", 0)) && SvOK(*v))
                max_bytes = (int32_t)SvIV(*v);
            if ((v = hv_fetchs(opts, "max_wait_ms", 0)) && SvOK(*v))
                max_wait_ms = (int32_t)SvIV(*v);
            if ((v = hv_fetchs(opts, "min_bytes", 0)) && SvOK(*v))
                min_bytes = (int32_t)SvIV(*v);
        }

        int64_t offset = SvIV(offset_sv);
        STRLEN topic_len = strlen(topic);

        int16_t ver = self->api_versions[API_FETCH];
        if (ver < 0) ver = 4;
        if (ver > 7) ver = 7;

        kf_buf_t body;
        kf_buf_init(&body);

        kf_buf_append_i32(&body, -1);          /* replica_id = -1 (consumer) */

src/EV__Kafka.xs  view on Meta::CPAN

            }
        }

        conn_send_request(aTHX_ self, API_TXN_OFFSET_COMMIT, ver, &body, cb, 0, 0);
        kf_buf_free(&body);
    }

MODULE = EV::Kafka  PACKAGE = EV::Kafka

int
_murmur2(SV *data_sv)
    CODE:
    {
        STRLEN len;
        const unsigned char *data = (const unsigned char *)SvPV(data_sv, len);
        uint32_t h = 0x9747b28c ^ (uint32_t)len;
        const uint32_t m = 0x5bd1e995;
        size_t i = 0;
        size_t remaining = len;

        while (remaining >= 4) {
            uint32_t k;
            memcpy(&k, data + i, 4); /* little-endian on x86, matches Java */
            k *= m;
            k ^= k >> 24;
            k *= m;
            h *= m;
            h ^= k;
            i += 4;
            remaining -= 4;
        }

        switch (remaining) {
            case 3: h ^= (uint32_t)data[i + 2] << 16; /* fallthrough */
            case 2: h ^= (uint32_t)data[i + 1] << 8;  /* fallthrough */
            case 1: h ^= (uint32_t)data[i]; h *= m;
        }

        h ^= h >> 13;
        h *= m;
        h ^= h >> 15;

        RETVAL = (int)(h & 0x7FFFFFFF);
    }
    OUTPUT:
        RETVAL

unsigned int
_crc32c(SV *data_sv)
    CODE:
    {
        STRLEN len;
        const char *data = SvPV(data_sv, len);
        RETVAL = crc32c(data, len);
    }
    OUTPUT:
        RETVAL

void
_error_name(int code)
    PPCODE:
    {
        const char *name = NULL;
        switch (code) {
            case  0: name = "NONE"; break;
            case  1: name = "OFFSET_OUT_OF_RANGE"; break;
            case  2: name = "CORRUPT_MESSAGE"; break;
            case  3: name = "UNKNOWN_TOPIC_OR_PARTITION"; break;
            case  5: name = "LEADER_NOT_AVAILABLE"; break;
            case  6: name = "NOT_LEADER_OR_FOLLOWER"; break;
            case  7: name = "REQUEST_TIMED_OUT"; break;
            case 10: name = "MESSAGE_TOO_LARGE"; break;
            case 15: name = "COORDINATOR_NOT_AVAILABLE"; break;
            case 16: name = "NOT_COORDINATOR"; break;
            case 17: name = "INVALID_TOPIC_EXCEPTION"; break;
            case 19: name = "NOT_ENOUGH_REPLICAS"; break;
            case 20: name = "NOT_ENOUGH_REPLICAS_AFTER_APPEND"; break;
            case 22: name = "ILLEGAL_GENERATION"; break;
            case 25: name = "UNKNOWN_MEMBER_ID"; break;
            case 26: name = "INVALID_SESSION_TIMEOUT"; break;
            case 27: name = "REBALANCE_IN_PROGRESS"; break;
            case 35: name = "UNSUPPORTED_VERSION"; break;
            case 36: name = "TOPIC_ALREADY_EXISTS"; break;
            case 39: name = "REASSIGNMENT_IN_PROGRESS"; break;
            case 41: name = "NOT_CONTROLLER"; break;
            case 45: name = "OUT_OF_ORDER_SEQUENCE_NUMBER"; break;
            case 46: name = "DUPLICATE_SEQUENCE_NUMBER"; break;
            case 47: name = "INVALID_REPLICATION_FACTOR"; break;
            case 58: name = "SASL_AUTHENTICATION_FAILED"; break;
            case 72: name = "LISTENER_NOT_FOUND"; break;
            case 79: name = "MEMBER_ID_REQUIRED"; break;
            default: name = "UNKNOWN"; break;
        }
        EXTEND(SP, 1);
        mPUSHp(name, strlen(name));
        XSRETURN(1);
    }

# ---- internal test helpers (not part of the public API) ----

SV*
_test_zigzag_i32(int v)
    CODE:
    {
        int32_t v32 = (int32_t)v;
        uint32_t z = (uint32_t)((v32 << 1) ^ (v32 >> 31));
        RETVAL = newSVuv(z);
    }
    OUTPUT:
        RETVAL

SV*
_test_zigzag_i64(SV *v_sv)
    CODE:
    {
        int64_t v = (int64_t)SvIV(v_sv);
        uint64_t z = (uint64_t)((v << 1) ^ (v >> 63));
        RETVAL = newSVuv(z);
    }
    OUTPUT:
        RETVAL



( run in 0.442 second using v1.01-cache-2.11-cpan-71847e10f99 )