EV-Kafka
view release on metacpan or search on metacpan
src/EV__Kafka.xs view on Meta::CPAN
sasl(EV::Kafka::Conn self, const char *mechanism, const char *username = NULL, const char *password = NULL)
CODE:
{
/* Replace the SASL settings stored on the connection object. */
/* Step 1: release any previously stored strings and reset the fields, */
/* so a later call never leaves a stale pointer behind. */
if (self->sasl_mechanism) { Safefree(self->sasl_mechanism); self->sasl_mechanism = NULL; }
if (self->sasl_username) { Safefree(self->sasl_username); self->sasl_username = NULL; }
if (self->sasl_password) { Safefree(self->sasl_password); self->sasl_password = NULL; }
/* Step 2: ST(1) is the raw mechanism argument. When it is undef the */
/* fields stay NULL, i.e. the stored SASL settings are simply cleared. */
if (SvOK(ST(1))) {
/* Store private copies; username and password are optional and are */
/* only copied when the caller supplied them (non-NULL). */
self->sasl_mechanism = savepv(mechanism);
if (username) self->sasl_username = savepv(username);
if (password) self->sasl_password = savepv(password);
}
}
# Store the reconnect settings on the connection object.
#   enable   - copied into self->auto_reconnect
#   delay_ms - copied into self->reconnect_delay_ms (defaults to 1000)
void
auto_reconnect(EV::Kafka::Conn self, int enable, int delay_ms = 1000)
CODE:
{
    /* The two fields are independent; both are always overwritten. */
    self->reconnect_delay_ms = delay_ms;
    self->auto_reconnect     = enable;
}
# Send a Metadata request on this connection.
#   topics_sv - array reference of topic names; any other value (including
#               undef) encodes a null array, which requests all topics
#   cb        - callback SV handed on to conn_send_request
# Croaks with "not connected" unless the connection state is CONN_READY.
# Also croaks when the topic array has a hole (no element at an index) or a
# topic name is too long for the int16 length prefix used by the encoder.
void
metadata(EV::Kafka::Conn self, SV *topics_sv, SV *cb)
CODE:
{
    kf_buf_t body;
    int16_t ver;
    if (self->state != CONN_READY)
        croak("not connected");
    kf_buf_init(&body);
    /* Metadata v1-v4 (non-flexible): clamp the negotiated version into
     * that range, using v1 when no version is recorded (negative). */
    ver = self->api_versions[API_METADATA];
    if (ver < 0) ver = 1;
    if (ver > 4) ver = 4;
    if (SvOK(topics_sv) && SvROK(topics_sv) && SvTYPE(SvRV(topics_sv)) == SVt_PVAV) {
        AV *topics = (AV*)SvRV(topics_sv);
        SSize_t i, count = av_len(topics) + 1;
        kf_buf_append_i32(&body, (int32_t)count);
        for (i = 0; i < count; i++) {
            SV **elem = av_fetch(topics, i, 0);
            STRLEN tlen;
            const char *tname;
            /* av_fetch returns NULL for a hole in the array; dereferencing
             * it would crash, so report it instead. The buffer is released
             * first because croak does not return. */
            if (!elem) {
                kf_buf_free(&body);
                croak("metadata: topic list has no element at index %ld", (long)i);
            }
            tname = SvPV(*elem, tlen);
            /* The length is written as an int16; a longer name would be
             * truncated by the cast and corrupt the request. */
            if (tlen > 0x7FFF) {
                kf_buf_free(&body);
                croak("metadata: topic name at index %ld is too long", (long)i);
            }
            kf_buf_append_string(&body, tname, (int16_t)tlen);
        }
    } else {
        kf_buf_append_i32(&body, -1); /* null array = all topics */
    }
    /* allow_auto_topic_creation (v4+) */
    if (ver >= 4)
        kf_buf_append_i8(&body, 1);
    conn_send_request(aTHX_ self, API_METADATA, ver, &body, cb, 0, 0);
    kf_buf_free(&body);
}
# Return the recorded API versions as a hash reference.
# Yields undef while self->api_versions_known is false. Otherwise the hash
# maps each API key (as a decimal string) to the version stored in
# self->api_versions; keys whose stored version is negative are left out.
void
api_versions(EV::Kafka::Conn self)
PPCODE:
{
    HV *versions;
    int api_key;
    if (!self->api_versions_known)
        XSRETURN_UNDEF;
    versions = newHV();
    for (api_key = 0; api_key < API_VERSIONS_MAX_KEY; api_key++) {
        char name[8];
        int name_len;
        /* Negative entries are skipped rather than exported. */
        if (self->api_versions[api_key] < 0)
            continue;
        name_len = snprintf(name, sizeof(name), "%d", api_key);
        hv_store(versions, name, name_len, newSViv(self->api_versions[api_key]), 0);
    }
    /* Hand the hash to Perl as a mortal reference that owns it. */
    EXTEND(SP, 1);
    mPUSHs(newRV_noinc((SV*)versions));
    XSRETURN(1);
}
void
fetch(EV::Kafka::Conn self, const char *topic, int partition, SV *offset_sv, SV *arg1 = NULL, SV *arg2 = NULL)
CODE:
{
if (self->state != CONN_READY)
croak("not connected");
/* Accept either fetch(..., $cb) or fetch(..., \%opts, $cb).
* $opts may set max_bytes, max_wait_ms, min_bytes. */
SV *cb = NULL;
HV *opts = NULL;
if (arg1 && SvROK(arg1) && SvTYPE(SvRV(arg1)) == SVt_PVHV) {
opts = (HV*)SvRV(arg1);
cb = arg2;
} else {
cb = arg1;
}
int32_t max_bytes = 1048576;
int32_t max_wait_ms = 500;
int32_t min_bytes = 1;
if (opts) {
SV **v;
if ((v = hv_fetchs(opts, "max_bytes", 0)) && SvOK(*v))
max_bytes = (int32_t)SvIV(*v);
if ((v = hv_fetchs(opts, "max_wait_ms", 0)) && SvOK(*v))
max_wait_ms = (int32_t)SvIV(*v);
if ((v = hv_fetchs(opts, "min_bytes", 0)) && SvOK(*v))
min_bytes = (int32_t)SvIV(*v);
}
int64_t offset = SvIV(offset_sv);
STRLEN topic_len = strlen(topic);
int16_t ver = self->api_versions[API_FETCH];
if (ver < 0) ver = 4;
if (ver > 7) ver = 7;
kf_buf_t body;
kf_buf_init(&body);
kf_buf_append_i32(&body, -1); /* replica_id = -1 (consumer) */
src/EV__Kafka.xs view on Meta::CPAN
}
}
conn_send_request(aTHX_ self, API_TXN_OFFSET_COMMIT, ver, &body, cb, 0, 0);
kf_buf_free(&body);
}
MODULE = EV::Kafka PACKAGE = EV::Kafka
# Compute the 32-bit murmur2 hash (seed 0x9747b28c) over the string bytes of
# data_sv and return it with the sign bit cleared, so the result is always
# non-negative. Input words are read as little-endian regardless of the host
# byte order, which the original implementation notes is what Java produces.
int
_murmur2(SV *data_sv)
CODE:
{
    STRLEN len;
    const unsigned char *data = (const unsigned char *)SvPV(data_sv, len);
    const uint32_t m = 0x5bd1e995;
    uint32_t h = 0x9747b28c ^ (uint32_t)len;
    size_t i = 0;
    size_t remaining = len;
    /* Mix the input four bytes at a time. */
    while (remaining >= 4) {
        /* Build the word byte by byte as little-endian. A raw memcpy would
         * pick up the host byte order and give a different hash on
         * big-endian machines. */
        uint32_t k = (uint32_t)data[i]
                   | ((uint32_t)data[i + 1] << 8)
                   | ((uint32_t)data[i + 2] << 16)
                   | ((uint32_t)data[i + 3] << 24);
        k *= m;
        k ^= k >> 24;
        k *= m;
        h *= m;
        h ^= k;
        i += 4;
        remaining -= 4;
    }
    /* Fold in the 1-3 trailing bytes; each case deliberately falls into
     * the next one. */
    switch (remaining) {
    case 3: h ^= (uint32_t)data[i + 2] << 16; /* fallthrough */
    case 2: h ^= (uint32_t)data[i + 1] << 8; /* fallthrough */
    case 1: h ^= (uint32_t)data[i]; h *= m;
    }
    /* Final avalanche. */
    h ^= h >> 13;
    h *= m;
    h ^= h >> 15;
    /* Clear the sign bit so the value fits a non-negative int. */
    RETVAL = (int)(h & 0x7FFFFFFF);
}
OUTPUT:
    RETVAL
# Run crc32c() over the string bytes of data_sv and return the result as an
# unsigned integer.
unsigned int
_crc32c(SV *data_sv)
CODE:
{
    STRLEN nbytes;
    /* SvPV yields the buffer and fills in its length; it must complete
     * before the length is read, so it stays a separate statement. */
    const char *bytes = SvPV(data_sv, nbytes);
    RETVAL = crc32c(bytes, nbytes);
}
OUTPUT:
    RETVAL
# Translate a numeric Kafka protocol result code into its symbolic name.
# Returns one string; codes that are not in the table below yield "UNKNOWN".
# The table is a subset of the codes published in the Kafka protocol guide.
void
_error_name(int code)
PPCODE:
{
    const char *name = NULL;
    switch (code) {
    case 0: name = "NONE"; break;
    case 1: name = "OFFSET_OUT_OF_RANGE"; break;
    case 2: name = "CORRUPT_MESSAGE"; break;
    case 3: name = "UNKNOWN_TOPIC_OR_PARTITION"; break;
    case 5: name = "LEADER_NOT_AVAILABLE"; break;
    case 6: name = "NOT_LEADER_OR_FOLLOWER"; break;
    case 7: name = "REQUEST_TIMED_OUT"; break;
    case 10: name = "MESSAGE_TOO_LARGE"; break;
    case 15: name = "COORDINATOR_NOT_AVAILABLE"; break;
    case 16: name = "NOT_COORDINATOR"; break;
    case 17: name = "INVALID_TOPIC_EXCEPTION"; break;
    case 19: name = "NOT_ENOUGH_REPLICAS"; break;
    case 20: name = "NOT_ENOUGH_REPLICAS_AFTER_APPEND"; break;
    case 22: name = "ILLEGAL_GENERATION"; break;
    case 25: name = "UNKNOWN_MEMBER_ID"; break;
    case 26: name = "INVALID_SESSION_TIMEOUT"; break;
    case 27: name = "REBALANCE_IN_PROGRESS"; break;
    case 35: name = "UNSUPPORTED_VERSION"; break;
    case 36: name = "TOPIC_ALREADY_EXISTS"; break;
    /* 38/39 and 47/60 follow the protocol table: the replication-factor
     * and reassignment names belong to 38 and 60, not to 47 and 39. */
    case 38: name = "INVALID_REPLICATION_FACTOR"; break;
    case 39: name = "INVALID_REPLICA_ASSIGNMENT"; break;
    case 41: name = "NOT_CONTROLLER"; break;
    case 45: name = "OUT_OF_ORDER_SEQUENCE_NUMBER"; break;
    case 46: name = "DUPLICATE_SEQUENCE_NUMBER"; break;
    case 47: name = "INVALID_PRODUCER_EPOCH"; break;
    case 58: name = "SASL_AUTHENTICATION_FAILED"; break;
    case 60: name = "REASSIGNMENT_IN_PROGRESS"; break;
    case 72: name = "LISTENER_NOT_FOUND"; break;
    case 79: name = "MEMBER_ID_REQUIRED"; break;
    default: name = "UNKNOWN"; break;
    }
    /* Push a fresh mortal copy of the name as the single return value. */
    EXTEND(SP, 1);
    mPUSHp(name, strlen(name));
    XSRETURN(1);
}
# ---- internal test helpers (not part of the public API) ----
# Test helper: zigzag-encode a 32-bit signed integer and return the encoded
# value as an unsigned integer SV (0 => 0, -1 => 1, 1 => 2, -2 => 3, ...).
SV*
_test_zigzag_i32(int v)
CODE:
{
    int32_t v32 = (int32_t)v;
    /* Work on the unsigned bit pattern: shifting a negative signed value
     * left is undefined in C and shifting it right is implementation
     * defined. The mask below is what an arithmetic (v32 >> 31) yields. */
    uint32_t bits = (uint32_t)v32;
    uint32_t sign_mask = (v32 < 0) ? 0xFFFFFFFFu : 0u;
    uint32_t z = (bits << 1) ^ sign_mask;
    RETVAL = newSVuv(z);
}
OUTPUT:
    RETVAL
# Test helper: zigzag-encode the integer value of v_sv as a 64-bit signed
# integer and return the encoded value as an unsigned integer SV.
SV*
_test_zigzag_i64(SV *v_sv)
CODE:
{
    int64_t v = (int64_t)SvIV(v_sv);
    /* Work on the unsigned bit pattern: shifting a negative signed value
     * left is undefined in C and shifting it right is implementation
     * defined. The mask below is what an arithmetic (v >> 63) yields. */
    uint64_t bits = (uint64_t)v;
    uint64_t sign_mask = (v < 0) ? ~(uint64_t)0 : (uint64_t)0;
    uint64_t z = (bits << 1) ^ sign_mask;
    RETVAL = newSVuv(z);
}
OUTPUT:
    RETVAL
( run in 0.442 second using v1.01-cache-2.11-cpan-71847e10f99 )