EV-Kafka

 view release on metacpan or  search on metacpan

README  view on Meta::CPAN


        EV::run;

COOKBOOK
  Produce JSON with headers
        use JSON::PP;
        my $json = JSON::PP->new->utf8;

        $kafka->produce('events', 'user-42',
            $json->encode({ action => 'click', page => '/home' }),
            { headers => { 'content-type' => 'application/json' } },
            sub { ... }
        );

  Consume from latest offset only
        $kafka->subscribe('live-feed',
            group_id          => 'realtime',
            auto_offset_reset => 'latest',
            on_assign         => sub { print "ready\n" },
        );

eg/produce_headers.pl  view on Meta::CPAN

# Create the EV::Kafka client object used by the rest of this example.
my $kafka = EV::Kafka->new(
    # Broker address is read from the KAFKA_BROKER environment variable;
    # when that variable is undefined, fall back to 127.0.0.1:9092.
    brokers  => $ENV{KAFKA_BROKER} // '127.0.0.1:9092',
    # NOTE(review): presumably the Kafka producer "acks" setting (1 = wait
    # for the partition leader only) -- confirm against the EV::Kafka docs.
    acks     => 1,
    # Print whatever arguments the error callback receives as a warning.
    on_error => sub { warn "kafka error: @_\n" },
);

$kafka->connect(sub {
    $kafka->produce('test-topic', 'event-key', '{"action":"click","page":"/home"}',
        {
            headers => {
                'content-type' => 'application/json',
                'trace-id'     => 'abc-123-def',
                'source'       => 'web-frontend',
            },
        },
        sub {
            my ($result, $err) = @_;
            die "produce failed: $err" if $err;
            print "produced with headers at offset "
                . $result->{topics}[0]{partitions}[0]{base_offset} . "\n";
            $kafka->close(sub { EV::break });

lib/EV/Kafka.pm  view on Meta::CPAN


=head1 COOKBOOK

=head2 Produce JSON with headers

    use JSON::PP;
    my $json = JSON::PP->new->utf8;

    $kafka->produce('events', 'user-42',
        $json->encode({ action => 'click', page => '/home' }),
        { headers => { 'content-type' => 'application/json' } },
        sub { ... }
    );

=head2 Consume from latest offset only

    $kafka->subscribe('live-feed',
        group_id          => 'realtime',
        auto_offset_reset => 'latest',
        on_assign         => sub { print "ready\n" },
    );



( run in 1.497 seconds using v1.01-cache-2.11-cpan-524268b4103 )