Net-Kafka

 view release on metacpan or  search on metacpan

Kafka.xs  view on Meta::CPAN

    {
        /* Create a read-only constant. Fast, optimised at perl compilation. */
        SvUPGRADE( *sv, SVt_RV );
        SvRV_set( *sv, sv_value );
        SvROK_on( *sv );
        SvREADONLY_on( sv_value );
    }
}

#define MAKE_CONSTANT_IV( name ) make_constant_iv( aTHX_ stash, #name, strlen( #name ), name )

MODULE = Net::Kafka    PACKAGE = Net::Kafka    PREFIX = krd_
PROTOTYPES: DISABLE

BOOT:
    {
        dTHX;
        HV *stash = get_hv( "Net::Kafka::", GV_ADD );

        MAKE_CONSTANT_IV( RD_KAFKA_VERSION );
        MAKE_CONSTANT_IV( RD_KAFKA_PRODUCER );
        MAKE_CONSTANT_IV( RD_KAFKA_CONSUMER );
        MAKE_CONSTANT_IV( RD_KAFKA_TIMESTAMP_NOT_AVAILABLE );
        MAKE_CONSTANT_IV( RD_KAFKA_TIMESTAMP_CREATE_TIME );
        MAKE_CONSTANT_IV( RD_KAFKA_TIMESTAMP_LOG_APPEND_TIME );
        MAKE_CONSTANT_IV( RD_KAFKA_PARTITION_UA );
        MAKE_CONSTANT_IV( RD_KAFKA_OFFSET_BEGINNING );
        MAKE_CONSTANT_IV( RD_KAFKA_OFFSET_END );
        MAKE_CONSTANT_IV( RD_KAFKA_OFFSET_STORED );
        MAKE_CONSTANT_IV( RD_KAFKA_OFFSET_INVALID );

        MAKE_CONSTANT_IV( RD_KAFKA_EVENT_NONE );
        MAKE_CONSTANT_IV( RD_KAFKA_EVENT_DR );
        MAKE_CONSTANT_IV( RD_KAFKA_EVENT_FETCH );
        MAKE_CONSTANT_IV( RD_KAFKA_EVENT_LOG );
        MAKE_CONSTANT_IV( RD_KAFKA_EVENT_ERROR );
        MAKE_CONSTANT_IV( RD_KAFKA_EVENT_REBALANCE );
        MAKE_CONSTANT_IV( RD_KAFKA_EVENT_OFFSET_COMMIT );
        MAKE_CONSTANT_IV( RD_KAFKA_EVENT_STATS );

        ++PL_sub_generation;
    }

const char *
krd_rd_kafka_version()
    CODE:
        RETVAL = rd_kafka_version_str();
    OUTPUT:
        RETVAL

void
krd_new(package, type, params = NULL)
        char *package
        int type
        HV* params
    PREINIT:
        plrd_kafka_t *krd;
        rd_kafka_conf_t* conf;
        rd_kafka_t* rk;
        char errstr[ERRSTR_SIZE];
    PPCODE:
        Newxz(krd, 1, plrd_kafka_t);
        conf = krd_parse_config(aTHX_ krd, params);
        if (type == RD_KAFKA_PRODUCER) {
            DEBUGF(krd->debug_xs, "Creating producer");
            prd_init(krd, conf);
        } else {
            DEBUGF(krd->debug_xs, "Creating consumer");
            cns_init(krd, conf);
        }
        rk = rd_kafka_new(type, conf, errstr, ERRSTR_SIZE);
        if (rk == NULL) {
            croak("%s", errstr);
        }
        krd->rk = rk;
        krd->thx = (IV) PERL_GET_THX;
        krd->type = type;
        krd->is_closed = 0;

        ST(0) = sv_newmortal();
        sv_setref_pv(ST(0), "Net::Kafka", (void *)krd);
        krd->self = newSVsv((SV*)ST(0));

        if (type == RD_KAFKA_PRODUCER) {
            prd_start(krd);
        } else {
            cns_start(krd);
        }

        XSRETURN(1);

const char *
krd_get_debug_contexts()
    CODE:
        RETVAL = rd_kafka_get_debug_contexts();
    OUTPUT:
        RETVAL

void
krd_subscribe(rdk, topics)
        plrd_kafka_t* rdk
        AV* topics
    PREINIT:
        STRLEN strl;
        int i, len;
        rd_kafka_topic_partition_list_t* topic_list;
        rd_kafka_resp_err_t err;
        char* topic;
        SV** topic_sv;
    CODE:
        len = av_len(topics) + 1;
        topic_list = rd_kafka_topic_partition_list_new(len);
        for (i=0; i < len; i++) {
            topic_sv = av_fetch(topics, i, 0);
            if (topic_sv != NULL) {
                topic = SvPV(*topic_sv, strl);
                rd_kafka_topic_partition_list_add(topic_list, topic, -1);
            }
        }
        err = rd_kafka_subscribe(rdk->rk, topic_list);
        rd_kafka_topic_partition_list_destroy(topic_list);

Kafka.xs  view on Meta::CPAN

rd_kafka_topic_partition_list_t *
krd_subscription(rdk)
        plrd_kafka_t* rdk
    PREINIT:
        rd_kafka_topic_partition_list_t* tp_list;
        rd_kafka_resp_err_t err;
    CODE:
        err = rd_kafka_subscription(rdk->rk, &tp_list);
        if (err != RD_KAFKA_RESP_ERR_NO_ERROR) {
            croak("Error retrieving subscriptions: %s", rd_kafka_err2str(err));
        }
        RETVAL = tp_list;
    OUTPUT:
        RETVAL

void
krd_assign(rdk, tp_list = NULL)
        plrd_kafka_t* rdk
        rd_kafka_topic_partition_list_t* tp_list
    PREINIT:
        rd_kafka_resp_err_t err;
    CODE:
        err = rd_kafka_assign(rdk->rk, tp_list);
        if (err != RD_KAFKA_RESP_ERR_NO_ERROR) {
            croak("Error assigning partitions: %s", rd_kafka_err2str(err));
        }

void
krd_position(rdk, tp_list)
        plrd_kafka_t* rdk
        rd_kafka_topic_partition_list_t* tp_list
    PREINIT:
        rd_kafka_resp_err_t err;
    CODE:
        err = rd_kafka_position(rdk->rk, tp_list);
        if (err != RD_KAFKA_RESP_ERR_NO_ERROR) {
            croak("Error getting position: %s", rd_kafka_err2str(err));
        }

rd_kafka_topic_partition_list_t *
krd_assignment(rdk)
        plrd_kafka_t *rdk
    PREINIT:
        rd_kafka_topic_partition_list_t* tp_list;
        rd_kafka_resp_err_t err;
    CODE:
        err = rd_kafka_assignment(rdk->rk, &tp_list);
        if (err != RD_KAFKA_RESP_ERR_NO_ERROR) {
            croak("Error retrieving assignments: %s", rd_kafka_err2str(err));
        }
        RETVAL = tp_list;
    OUTPUT:
        RETVAL

rd_kafka_event_t*
krd_queue_poll(rdk, timeout_ms = 0)
        plrd_kafka_t *rdk
        int timeout_ms
    PREINIT:
        rd_kafka_event_t* rke;
    PPCODE:
        rke = rd_kafka_queue_poll(rdk->queue, timeout_ms);
        if (! rke) {
            XSRETURN_EMPTY;
            return;
        }

        ST(0) = sv_newmortal();
        sv_setref_pv( ST(0), "Net::Kafka::Event", (void *)rke );
        XSRETURN(1);

long
krd_queue_length(rdk)
        plrd_kafka_t *rdk
    CODE:
        RETVAL = rd_kafka_queue_length(rdk->queue);
    OUTPUT:
        RETVAL

void
krd_commit(rdk, async = 0, tp_list = NULL)
        plrd_kafka_t* rdk
        int async
        rd_kafka_topic_partition_list_t* tp_list
    PREINIT:
        rd_kafka_resp_err_t err;
    CODE:
        err = rd_kafka_commit(rdk->rk, tp_list, async);
        if (err != RD_KAFKA_RESP_ERR_NO_ERROR && err != RD_KAFKA_RESP_ERR__NO_OFFSET) {
            croak("Error committing offsets: %s", rd_kafka_err2str(err));
        }

void
krd_commit_message(rdk, async = 0, rd_msg)
        plrd_kafka_t *rdk
        int async
        rd_kafka_message_t *rd_msg
    PREINIT:
        rd_kafka_resp_err_t err;
    CODE:
        err = rd_kafka_commit_message(rdk->rk, rd_msg, async);
        if (err != RD_KAFKA_RESP_ERR_NO_ERROR) {
            croak("Error committing message: %s", rd_kafka_err2str(err));
        }

void
krd_committed(rdk, tp_list, timeout_ms)
        plrd_kafka_t* rdk
        rd_kafka_topic_partition_list_t* tp_list
        int timeout_ms
    PREINIT:
        rd_kafka_resp_err_t err;
    CODE:
        err = rd_kafka_committed(rdk->rk, tp_list, timeout_ms);
        if (err != RD_KAFKA_RESP_ERR_NO_ERROR) {
            croak("Error retrieving commited offsets: %s", rd_kafka_err2str(err));
        }

void
krd_offsets_for_times(rdk, tp_list, timeout_ms)
        plrd_kafka_t* rdk

Kafka.xs  view on Meta::CPAN

    CODE:
        err = rd_kafka_pause_partitions(rdk->rk, tp_list);
        if (err != RD_KAFKA_RESP_ERR_NO_ERROR) {
            croak("Error pausing partitions: %s", rd_kafka_err2str(err));
        }

void
krd_produce(rdk, topic, partition, key, payload, timestamp, msg_id, msgflags = 0, hdrs = NULL)
        plrd_kafka_t *rdk
        char *topic
        int partition
        SV *key
        SV *payload
        long timestamp
        IV msg_id
        int msgflags
        rd_kafka_headers_t *hdrs
    PREINIT:
        STRLEN plen = 0, klen = 0;
        char *plptr = NULL, *keyptr = NULL;
        rd_kafka_resp_err_t err;
    CODE:
        if (SvOK(payload))
            plptr = SvPVbyte(payload, plen);
        if (SvOK(key))
            keyptr = SvPVbyte(key, klen);

        err = rd_kafka_producev(
            rdk->rk,
            RD_KAFKA_V_TOPIC(topic),
            RD_KAFKA_V_PARTITION(partition),
            RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY | msgflags),
            RD_KAFKA_V_KEY(keyptr, klen),
            RD_KAFKA_V_TIMESTAMP(timestamp),
            RD_KAFKA_V_VALUE(plptr, plen),
            RD_KAFKA_V_OPAQUE((void *) msg_id),
            /* making a copy here avoids ownership nightmares */
            RD_KAFKA_V_HEADERS(hdrs ? rd_kafka_headers_copy(hdrs) : NULL),
            RD_KAFKA_V_END);

        if (err) {
            croak("Error producing: %s", rd_kafka_err2str(err));
        }

void
krd_resume(rdk, tp_list = NULL)
        plrd_kafka_t *rdk
        rd_kafka_topic_partition_list_t* tp_list
    PREINIT:
        rd_kafka_resp_err_t err;
    CODE:
        err = rd_kafka_resume_partitions(rdk->rk, tp_list);
        if (err != RD_KAFKA_RESP_ERR_NO_ERROR) {
            croak("Error resuming partitions: %s", rd_kafka_err2str(err));
        }

void
krd_consumer_poll(rdk, timeout_ms = 0)
        plrd_kafka_t *rdk
        int timeout_ms
    PPCODE:
        rd_kafka_message_t *rd_msg = rd_kafka_consumer_poll( rdk->rk, timeout_ms );

        if (! rd_msg) {
            XSRETURN_EMPTY;
            return;
        }

        ST(0) = sv_newmortal();
        sv_setref_pv( ST(0), "Net::Kafka::Message", (void *)rd_msg );
        XSRETURN(1);

void
krd_topic(rdk, topic)
        plrd_kafka_t* rdk
        char *topic
    PPCODE:
        rd_kafka_topic_t* rd_topic = rd_kafka_topic_new(rdk->rk, topic, NULL);
        DEBUG2F(rdk->debug_xs, "Created Net::Kafka::Topic %s", rd_kafka_topic_name(rd_topic));
        ST(0) = sv_newmortal();
        sv_setref_pv( ST(0), "Net::Kafka::Topic", (void *)rd_topic );
        XSRETURN(1);

void
krd_close(rdk)
        plrd_kafka_t* rdk
    CODE:
        krd_close_handles(rdk);

void
krd_DESTROY(rdk)
        plrd_kafka_t* rdk
    CODE:
        krd_close_handles(rdk);
        if (rdk->thx == (IV)PERL_GET_THX)
            Safefree(rdk);

void
krd_dump(rdk)
        plrd_kafka_t* rdk
    CODE:
        rd_kafka_dump(stdout, rdk->rk);

void
krd_query_watermark_offsets(rdk, topic, partition, timeout_ms)
        plrd_kafka_t* rdk
        char *topic
        int partition
        long timeout_ms
    PREINIT:
        rd_kafka_resp_err_t err;
        long low, high;
    PPCODE:
        err = rd_kafka_query_watermark_offsets(rdk->rk, topic, partition, &low, &high, timeout_ms);
        if (err != RD_KAFKA_RESP_ERR_NO_ERROR) {
            croak("Error querying watermark offsets: %s", rd_kafka_err2str(err));
        }
        EXTEND(SP, 2);
        PUSHs(sv_2mortal(newSViv(low)));
        PUSHs(sv_2mortal(newSViv(high)));

MODULE = Net::Kafka    PACKAGE = Net::Kafka::Event    PREFIX = krdev_
PROTOTYPES: DISABLE

const char *
krdev_event_name(rkev)
        rd_kafka_event_t* rkev
    CODE:
        RETVAL = rd_kafka_event_name(rkev);
    OUTPUT:
        RETVAL

int
krdev_event_error(rkev)
        rd_kafka_event_t* rkev
    CODE:
        RETVAL = rd_kafka_event_error(rkev);
    OUTPUT:
        RETVAL

const char *
krdev_event_error_string(rkev)
        rd_kafka_event_t* rkev
    CODE:
        RETVAL = rd_kafka_event_error_string(rkev);
    OUTPUT:
        RETVAL

const char*
krdev_event_stats(rkev)
        rd_kafka_event_t* rkev
    CODE:
        RETVAL = rd_kafka_event_stats(rkev);
    OUTPUT:
        RETVAL

int
krdev_event_message_count(rkev)
        rd_kafka_event_t* rkev
    CODE:
        RETVAL = rd_kafka_event_message_count(rkev);
    OUTPUT:
        RETVAL

HV*
krdev_event_delivery_report_next(rkev)
        rd_kafka_event_t* rkev
    PREINIT:
        const rd_kafka_message_t* rkm;
    CODE:
        rkm = rd_kafka_event_message_next(rkev);
        if (! rkm) {
            XSRETURN_UNDEF;

Kafka.xs  view on Meta::CPAN

MODULE = Net::Kafka    PACKAGE = Net::Kafka::Message    PREFIX = krdm_
PROTOTYPES: DISABLE

int
krdm_err(rd_msg)
        rd_kafka_message_t *rd_msg
    CODE:
        RETVAL = rd_msg->err;
    OUTPUT:
        RETVAL

const char *
krdm_err_name(rd_msg)
        rd_kafka_message_t *rd_msg
    CODE:
        RETVAL = rd_kafka_err2name(rd_msg->err);
    OUTPUT:
        RETVAL

int
krdm_partition(rd_msg)
        rd_kafka_message_t *rd_msg
    CODE:
        RETVAL = rd_msg->partition;
    OUTPUT:
        RETVAL

const char*
krdm_topic(rd_msg)
        rd_kafka_message_t *rd_msg
    CODE:
        RETVAL = rd_kafka_topic_name(rd_msg->rkt);
    OUTPUT:
        RETVAL

SV*
krdm_payload(rd_msg)
        rd_kafka_message_t *rd_msg
    CODE:
        RETVAL = newSVpvn(rd_msg->payload, rd_msg->len);
    OUTPUT:
        RETVAL

SV*
krdm_key(rd_msg)
        rd_kafka_message_t *rd_msg
    CODE:
        if (rd_msg->err == 0) {
            RETVAL = newSVpvn(rd_msg->key, rd_msg->key_len);
        } else {
            RETVAL = &PL_sv_undef;
        }
    OUTPUT:
        RETVAL

void
krdm_timestamp(rd_msg)
        rd_kafka_message_t *rd_msg
    PREINIT:
        rd_kafka_timestamp_type_t tstype;
    PPCODE:
        long timestamp = rd_kafka_message_timestamp(rd_msg, &tstype);
        EXTEND(SP, 2);
        PUSHs(sv_2mortal(newSViv(timestamp)));
        PUSHs(sv_2mortal(newSViv(tstype)));

long
krdm_offset(rd_msg)
        rd_kafka_message_t *rd_msg
    CODE:
        /* that will truncate offset if perl doesn't support 64bit ints */
        RETVAL = rd_msg->offset;
    OUTPUT:
        RETVAL

rd_kafka_headers_t*
krdm_headers(rd_msg)
        rd_kafka_message_t* rd_msg
    PREINIT:
        rd_kafka_headers_t *hdrs;
        rd_kafka_resp_err_t err;
    CODE:
        err = rd_kafka_message_headers(rd_msg, &hdrs);
        if (err == RD_KAFKA_RESP_ERR_NO_ERROR) {
            /* making a copy here avoids ownership nightmares */
            RETVAL = rd_kafka_headers_copy(hdrs);
        } else if (err == RD_KAFKA_RESP_ERR__NOENT) {
            XSRETURN_UNDEF;
        } else {
            croak("Error while getting headers: %s", rd_kafka_err2str(err));
        }
    OUTPUT:
        RETVAL

rd_kafka_headers_t*
krdm_detach_headers(rd_msg)
        rd_kafka_message_t* rd_msg
    PREINIT:
        rd_kafka_headers_t *hdrs;
        rd_kafka_resp_err_t err;
    CODE:
        err = rd_kafka_message_detach_headers(rd_msg, &hdrs);
        if (err == RD_KAFKA_RESP_ERR_NO_ERROR) {
            RETVAL = hdrs;
        } else if (err == RD_KAFKA_RESP_ERR__NOENT) {
            XSRETURN_UNDEF;
        } else {
            croak("Error while getting headers: %s", rd_kafka_err2str(err));
        }
    OUTPUT:
        RETVAL

void
krdm_DESTROY(rd_msg)
        rd_kafka_message_t *rd_msg
    CODE:
        rd_kafka_message_destroy( rd_msg );

MODULE = Net::Kafka    PACKAGE = Net::Kafka::Headers    PREFIX = krdh_
PROTOTYPES: DISABLE

rd_kafka_headers_t*
krdh_new(klass)
        SV *klass
    CODE:
        RETVAL = rd_kafka_headers_new(0);
    OUTPUT:
        RETVAL

void
krdh_add(hdrs, name, value)
    PREINIT:
        STRLEN name_len, value_len;
        rd_kafka_resp_err_t err;
    INPUT:
        rd_kafka_headers_t* hdrs
        const char *name = SvPV($arg, name_len);
        const char *value = SvPV($arg, value_len);
    CODE:
        err = rd_kafka_header_add(hdrs, name, name_len, value, value_len);
        if (err != RD_KAFKA_RESP_ERR_NO_ERROR) {
            croak("Error while adding header: %s", rd_kafka_err2str(err));
        }

void
krdh_remove(hdrs, name)
    PREINIT:
        rd_kafka_resp_err_t err;
    INPUT:
        rd_kafka_headers_t* hdrs
        const char *name
    CODE:
        err = rd_kafka_header_remove(hdrs, name);
        if (err != RD_KAFKA_RESP_ERR_NO_ERROR) {
            croak("Error while removing header: %s", rd_kafka_err2str(err));
        }

void
krdh_get_last(hdrs, name)
    PREINIT:
        rd_kafka_resp_err_t err;
        const void *value;
        size_t value_len;
    INPUT:
        rd_kafka_headers_t* hdrs
        const char *name
    PPCODE:
        err = rd_kafka_header_get_last(hdrs, name, &value, &value_len);
        if (err == RD_KAFKA_RESP_ERR_NO_ERROR) {
            PUSHs(sv_2mortal(newSVpvn(value, value_len)));
        } else if (err == RD_KAFKA_RESP_ERR__NOENT) {
            XSRETURN_UNDEF;
        } else {
            croak("Error while getting header: %s", rd_kafka_err2str(err));
        }

HV*
krdh_to_hash(hdrs)
        rd_kafka_headers_t* hdrs
    PREINIT:
        rd_kafka_resp_err_t err;
        int i;
        const char *name;
        const void *value;
        size_t value_len;
        SV **slot;
        AV *value_list;
    CODE:
        RETVAL = newHV();
        for (i = 0; ; ++i) {
            err = rd_kafka_header_get_all(hdrs, i, &name, &value, &value_len);
            if (err != RD_KAFKA_RESP_ERR_NO_ERROR) {
               break;
            }
            slot = hv_fetch(RETVAL, name, strlen(name), 1);
            if (slot == NULL) { /* should never happen */
                croak("Error while building hash, lvalue fetch returned a NULL value");
            }
            if (!SvOK(*slot)) {
                value_list = newAV();

                SvUPGRADE(*slot, SVt_RV);
                SvROK_on(*slot);
                SvRV_set(*slot, (SV *) value_list);
            } else {
                value_list = (AV *) SvRV(*slot);
            }
            av_push(value_list, newSVpvn(value, value_len));
        }
    OUTPUT:
        RETVAL

void
krdh_DESTROY(hdrs)
        rd_kafka_headers_t* hdrs
    CODE:
        rd_kafka_headers_destroy(hdrs);

MODULE = Net::Kafka    PACKAGE = Net::Kafka::Error    PREFIX = krde_
PROTOTYPES: DISABLE

HV *
krde_rd_kafka_get_err_descs()
    PREINIT:
        const struct rd_kafka_err_desc* descs;
        size_t cnt;
        int i;

Kafka.xs  view on Meta::CPAN

const char*
krde_to_string(code)
        int code
    CODE:
        RETVAL = rd_kafka_err2str(code);
    OUTPUT:
        RETVAL

const char*
krde_to_name(code)
        int code
    CODE:
        RETVAL = rd_kafka_err2name(code);
    OUTPUT:
        RETVAL

int
krde_last_error()
    CODE:
        RETVAL = rd_kafka_last_error();
    OUTPUT:
        RETVAL

MODULE = Net::Kafka    PACKAGE = Net::Kafka::TopicPartitionList    PREFIX = ktpl_
PROTOTYPES: DISABLE

rd_kafka_topic_partition_list_t *
ktpl_new(class, initial_size = 10)
    char *class
    int initial_size
    CODE:
        RETVAL = rd_kafka_topic_partition_list_new(initial_size);
    OUTPUT:
        RETVAL

void
ktpl_add(rktpl, topic, partition)
        rd_kafka_topic_partition_list_t *rktpl
        char *topic
        int partition
    PREINIT:
        rd_kafka_topic_partition_t *tp;
    CODE:
        tp = rd_kafka_topic_partition_list_find(rktpl, topic, partition);
        if (tp == NULL) {
            rd_kafka_topic_partition_list_add(rktpl, topic, partition);
        }

rd_kafka_topic_partition_list_t*
ktpl_copy(rktpl)
        rd_kafka_topic_partition_list_t* rktpl
    CODE:
        RETVAL = rd_kafka_topic_partition_list_copy(rktpl);
    OUTPUT:
        RETVAL

void
ktpl_get(rktpl, idx)
        rd_kafka_topic_partition_list_t *rktpl
        int idx
    PPCODE:
        if (!rktpl || idx < 0 || idx >= rktpl->cnt) {
            return;
        }
        char* tn = rktpl->elems[idx].topic;
        EXTEND(SP, 3);
        PUSHs(sv_2mortal(newSVpv(tn, strlen(tn))));
        PUSHs(sv_2mortal(newSViv(rktpl->elems[idx].partition)));
        PUSHs(sv_2mortal(newSViv(rktpl->elems[idx].offset)));

void
ktpl_set_offset(rktpl, topic, partition, offset)
        rd_kafka_topic_partition_list_t *rktpl
        char *topic
        int partition
        long offset
    PREINIT:
        rd_kafka_resp_err_t err;
    CODE:
        err = rd_kafka_topic_partition_list_set_offset(rktpl, topic, partition, offset);
        if (err != RD_KAFKA_RESP_ERR_NO_ERROR) {
            croak("Error setting offset: %s", rd_kafka_err2str(err));
        }

void
ktpl_offset(rktpl, topic, partition)
        rd_kafka_topic_partition_list_t *rktpl
        char *topic
        int partition
    PREINIT:
        rd_kafka_topic_partition_t *tp;
    PPCODE:
        tp = rd_kafka_topic_partition_list_find(rktpl, topic, partition);
        if (tp == NULL) {
            XSRETURN_EMPTY;
            return;
        }

        ST(0) = sv_2mortal(newSViv(tp->offset));
        XSRETURN(1);

int
ktpl_del(rktpl, topic, partition)
        rd_kafka_topic_partition_list_t *rktpl
        char *topic
        int partition
    CODE:
        RETVAL = rd_kafka_topic_partition_list_del(rktpl, topic, partition);
    OUTPUT:
        RETVAL

int
ktpl_size(rktpl)
        rd_kafka_topic_partition_list_t *rktpl
    CODE:
        RETVAL = (rktpl == NULL ? 0 : rktpl->cnt);
    OUTPUT:
        RETVAL

void
ktpl_DESTROY(rktpl)
        rd_kafka_topic_partition_list_t *rktpl
    CODE:
        rd_kafka_topic_partition_list_destroy(rktpl);



( run in 2.648 seconds using v1.01-cache-2.11-cpan-5511b514fd6 )