Net-Kafka

 view release on metacpan or  search on metacpan

Kafka.xs  view on Meta::CPAN

void
krd_new(package, type, params = NULL)
        char *package
        int type
        HV* params
    PREINIT:
        plrd_kafka_t *krd;
        rd_kafka_conf_t* conf;
        rd_kafka_t* rk;
        char errstr[ERRSTR_SIZE];
    PPCODE:
        Newxz(krd, 1, plrd_kafka_t);
        conf = krd_parse_config(aTHX_ krd, params);
        if (type == RD_KAFKA_PRODUCER) {
            DEBUGF(krd->debug_xs, "Creating producer");
            prd_init(krd, conf);
        } else {
            DEBUGF(krd->debug_xs, "Creating consumer");
            cns_init(krd, conf);
        }
        rk = rd_kafka_new(type, conf, errstr, ERRSTR_SIZE);

Kafka.xs  view on Meta::CPAN

        RETVAL = tp_list;
    OUTPUT:
        RETVAL

rd_kafka_event_t*
krd_queue_poll(rdk, timeout_ms = 0)
        plrd_kafka_t *rdk
        int timeout_ms
    PREINIT:
        rd_kafka_event_t* rke;
    PPCODE:
        rke = rd_kafka_queue_poll(rdk->queue, timeout_ms);
        if (! rke) {
            XSRETURN_EMPTY;
            return;
        }

        ST(0) = sv_newmortal();
        sv_setref_pv( ST(0), "Net::Kafka::Event", (void *)rke );
        XSRETURN(1);

Kafka.xs  view on Meta::CPAN

    CODE:
        err = rd_kafka_resume_partitions(rdk->rk, tp_list);
        if (err != RD_KAFKA_RESP_ERR_NO_ERROR) {
            croak("Error resuming partitions: %s", rd_kafka_err2str(err));
        }

void
krd_consumer_poll(rdk, timeout_ms = 0)
        plrd_kafka_t *rdk
        int timeout_ms
    PPCODE:
        rd_kafka_message_t *rd_msg = rd_kafka_consumer_poll( rdk->rk, timeout_ms );

        if (! rd_msg) {
            XSRETURN_EMPTY;
            return;
        }

        ST(0) = sv_newmortal();
        sv_setref_pv( ST(0), "Net::Kafka::Message", (void *)rd_msg );
        XSRETURN(1);

void
krd_topic(rdk, topic)
        plrd_kafka_t* rdk
        char *topic
    PPCODE:
        rd_kafka_topic_t* rd_topic = rd_kafka_topic_new(rdk->rk, topic, NULL);
        DEBUG2F(rdk->debug_xs, "Created Net::Kafka::Topic %s", rd_kafka_topic_name(rd_topic));
        ST(0) = sv_newmortal();
        sv_setref_pv( ST(0), "Net::Kafka::Topic", (void *)rd_topic );
        XSRETURN(1);

void
krd_close(rdk)
        plrd_kafka_t* rdk
    CODE:

Kafka.xs  view on Meta::CPAN


void
krd_query_watermark_offsets(rdk, topic, partition, timeout_ms)
        plrd_kafka_t* rdk
        char *topic
        int partition
        long timeout_ms
    PREINIT:
        rd_kafka_resp_err_t err;
        long low, high;
    PPCODE:
        err = rd_kafka_query_watermark_offsets(rdk->rk, topic, partition, &low, &high, timeout_ms);
        if (err != RD_KAFKA_RESP_ERR_NO_ERROR) {
            croak("Error querying watermark offsets: %s", rd_kafka_err2str(err));
        }
        EXTEND(SP, 2);
        PUSHs(sv_2mortal(newSViv(low)));
        PUSHs(sv_2mortal(newSViv(high)));

MODULE = Net::Kafka    PACKAGE = Net::Kafka::Event    PREFIX = krdev_
PROTOTYPES: DISABLE

Kafka.xs  view on Meta::CPAN

            RETVAL = &PL_sv_undef;
        }
    OUTPUT:
        RETVAL

void
krdm_timestamp(rd_msg)
        rd_kafka_message_t *rd_msg
    PREINIT:
        rd_kafka_timestamp_type_t tstype;
    PPCODE:
        long timestamp = rd_kafka_message_timestamp(rd_msg, &tstype);
        EXTEND(SP, 2);
        PUSHs(sv_2mortal(newSViv(timestamp)));
        PUSHs(sv_2mortal(newSViv(tstype)));

long
krdm_offset(rd_msg)
        rd_kafka_message_t *rd_msg
    CODE:
        /* that will truncate offset if perl doesn't support 64bit ints */

Kafka.xs  view on Meta::CPAN


void
krdh_get_last(hdrs, name)
    PREINIT:
        rd_kafka_resp_err_t err;
        const void *value;
        size_t value_len;
    INPUT:
        rd_kafka_headers_t* hdrs
        const char *name
    PPCODE:
        err = rd_kafka_header_get_last(hdrs, name, &value, &value_len);
        if (err == RD_KAFKA_RESP_ERR_NO_ERROR) {
            PUSHs(sv_2mortal(newSVpvn(value, value_len)));
        } else if (err == RD_KAFKA_RESP_ERR__NOENT) {
            XSRETURN_UNDEF;
        } else {
            croak("Error while getting header: %s", rd_kafka_err2str(err));
        }

HV*

Kafka.xs  view on Meta::CPAN

        rd_kafka_topic_partition_list_t* rktpl
    CODE:
        RETVAL = rd_kafka_topic_partition_list_copy(rktpl);
    OUTPUT:
        RETVAL

void
ktpl_get(rktpl, idx)
        rd_kafka_topic_partition_list_t *rktpl
        int idx
    PPCODE:
        if (!rktpl || idx < 0 || idx >= rktpl->cnt) {
            return;
        }
        char* tn = rktpl->elems[idx].topic;
        EXTEND(SP, 3);
        PUSHs(sv_2mortal(newSVpv(tn, strlen(tn))));
        PUSHs(sv_2mortal(newSViv(rktpl->elems[idx].partition)));
        PUSHs(sv_2mortal(newSViv(rktpl->elems[idx].offset)));

void

Kafka.xs  view on Meta::CPAN

            croak("Error setting offset: %s", rd_kafka_err2str(err));
        }

void
ktpl_offset(rktpl, topic, partition)
        rd_kafka_topic_partition_list_t *rktpl
        char *topic
        int partition
    PREINIT:
        rd_kafka_topic_partition_t *tp;
    PPCODE:
        tp = rd_kafka_topic_partition_list_find(rktpl, topic, partition);
        if (tp == NULL) {
            XSRETURN_EMPTY;
            return;
        }

        ST(0) = sv_2mortal(newSViv(tp->offset));
        XSRETURN(1);

int



( run in 1.224 second using v1.01-cache-2.11-cpan-5511b514fd6 )