Net-Kafka
view release on metacpan or search on metacpan
void
krd_new(package, type, params = NULL)
char *package
int type
HV* params
PREINIT:
plrd_kafka_t *krd;
rd_kafka_conf_t* conf;
rd_kafka_t* rk;
char errstr[ERRSTR_SIZE];
PPCODE:
Newxz(krd, 1, plrd_kafka_t);
conf = krd_parse_config(aTHX_ krd, params);
if (type == RD_KAFKA_PRODUCER) {
DEBUGF(krd->debug_xs, "Creating producer");
prd_init(krd, conf);
} else {
DEBUGF(krd->debug_xs, "Creating consumer");
cns_init(krd, conf);
}
rk = rd_kafka_new(type, conf, errstr, ERRSTR_SIZE);
RETVAL = tp_list;
OUTPUT:
RETVAL
rd_kafka_event_t*
krd_queue_poll(rdk, timeout_ms = 0)
plrd_kafka_t *rdk
int timeout_ms
PREINIT:
rd_kafka_event_t* rke;
PPCODE:
rke = rd_kafka_queue_poll(rdk->queue, timeout_ms);
if (! rke) {
XSRETURN_EMPTY;
return;
}
ST(0) = sv_newmortal();
sv_setref_pv( ST(0), "Net::Kafka::Event", (void *)rke );
XSRETURN(1);
CODE:
err = rd_kafka_resume_partitions(rdk->rk, tp_list);
if (err != RD_KAFKA_RESP_ERR_NO_ERROR) {
croak("Error resuming partitions: %s", rd_kafka_err2str(err));
}
void
krd_consumer_poll(rdk, timeout_ms = 0)
plrd_kafka_t *rdk
int timeout_ms
PPCODE:
rd_kafka_message_t *rd_msg = rd_kafka_consumer_poll( rdk->rk, timeout_ms );
if (! rd_msg) {
XSRETURN_EMPTY;
return;
}
ST(0) = sv_newmortal();
sv_setref_pv( ST(0), "Net::Kafka::Message", (void *)rd_msg );
XSRETURN(1);
void
krd_topic(rdk, topic)
plrd_kafka_t* rdk
char *topic
PPCODE:
rd_kafka_topic_t* rd_topic = rd_kafka_topic_new(rdk->rk, topic, NULL);
DEBUG2F(rdk->debug_xs, "Created Net::Kafka::Topic %s", rd_kafka_topic_name(rd_topic));
ST(0) = sv_newmortal();
sv_setref_pv( ST(0), "Net::Kafka::Topic", (void *)rd_topic );
XSRETURN(1);
void
krd_close(rdk)
plrd_kafka_t* rdk
CODE:
void
krd_query_watermark_offsets(rdk, topic, partition, timeout_ms)
plrd_kafka_t* rdk
char *topic
int partition
long timeout_ms
PREINIT:
rd_kafka_resp_err_t err;
long low, high;
PPCODE:
err = rd_kafka_query_watermark_offsets(rdk->rk, topic, partition, &low, &high, timeout_ms);
if (err != RD_KAFKA_RESP_ERR_NO_ERROR) {
croak("Error querying watermark offsets: %s", rd_kafka_err2str(err));
}
EXTEND(SP, 2);
PUSHs(sv_2mortal(newSViv(low)));
PUSHs(sv_2mortal(newSViv(high)));
MODULE = Net::Kafka PACKAGE = Net::Kafka::Event PREFIX = krdev_
PROTOTYPES: DISABLE
RETVAL = &PL_sv_undef;
}
OUTPUT:
RETVAL
void
krdm_timestamp(rd_msg)
rd_kafka_message_t *rd_msg
PREINIT:
rd_kafka_timestamp_type_t tstype;
PPCODE:
long timestamp = rd_kafka_message_timestamp(rd_msg, &tstype);
EXTEND(SP, 2);
PUSHs(sv_2mortal(newSViv(timestamp)));
PUSHs(sv_2mortal(newSViv(tstype)));
long
krdm_offset(rd_msg)
rd_kafka_message_t *rd_msg
CODE:
/* that will truncate offset if perl doesn't support 64bit ints */
void
krdh_get_last(hdrs, name)
PREINIT:
rd_kafka_resp_err_t err;
const void *value;
size_t value_len;
INPUT:
rd_kafka_headers_t* hdrs
const char *name
PPCODE:
err = rd_kafka_header_get_last(hdrs, name, &value, &value_len);
if (err == RD_KAFKA_RESP_ERR_NO_ERROR) {
PUSHs(sv_2mortal(newSVpvn(value, value_len)));
} else if (err == RD_KAFKA_RESP_ERR__NOENT) {
XSRETURN_UNDEF;
} else {
croak("Error while getting header: %s", rd_kafka_err2str(err));
}
HV*
rd_kafka_topic_partition_list_t* rktpl
CODE:
RETVAL = rd_kafka_topic_partition_list_copy(rktpl);
OUTPUT:
RETVAL
void
ktpl_get(rktpl, idx)
rd_kafka_topic_partition_list_t *rktpl
int idx
PPCODE:
if (!rktpl || idx < 0 || idx >= rktpl->cnt) {
return;
}
char* tn = rktpl->elems[idx].topic;
EXTEND(SP, 3);
PUSHs(sv_2mortal(newSVpv(tn, strlen(tn))));
PUSHs(sv_2mortal(newSViv(rktpl->elems[idx].partition)));
PUSHs(sv_2mortal(newSViv(rktpl->elems[idx].offset)));
void
croak("Error setting offset: %s", rd_kafka_err2str(err));
}
void
ktpl_offset(rktpl, topic, partition)
rd_kafka_topic_partition_list_t *rktpl
char *topic
int partition
PREINIT:
rd_kafka_topic_partition_t *tp;
PPCODE:
tp = rd_kafka_topic_partition_list_find(rktpl, topic, partition);
if (tp == NULL) {
XSRETURN_EMPTY;
return;
}
ST(0) = sv_2mortal(newSViv(tp->offset));
XSRETURN(1);
int
( run in 0.826 second using v1.01-cache-2.11-cpan-5511b514fd6 )