EV-Kafka
view release on metacpan or search on metacpan
EV::run;
COOKBOOK
Produce JSON with headers
# Cookbook example: encode a Perl hash as JSON and produce it with a
# 'content-type' record header.
use JSON::PP;
# ->utf8 makes encode() return UTF-8 encoded bytes rather than a character string.
my $json = JSON::PP->new->utf8;
# The positional arguments appear to be topic, key and value, followed by an
# options hashref and a completion callback -- confirm against the produce() docs.
$kafka->produce('events', 'user-42',
$json->encode({ action => 'click', page => '/home' }),
{ headers => { 'content-type' => 'application/json' } },
# '...' is Perl's yada-yada placeholder: it dies with "Unimplemented" if this
# callback is ever run, so replace it with real handling.
sub { ... }
);
Consume from latest offset only
# Cookbook example: subscribe to the 'live-feed' topic with group_id 'realtime'.
$kafka->subscribe('live-feed',
group_id => 'realtime',
# Presumably starts reading at the end of the log when the group has no
# committed offset (the usual Kafka meaning of 'latest') -- confirm in the docs.
auto_offset_reset => 'latest',
# Callback that prints "ready"; presumably invoked once partitions have been
# assigned -- confirm when it fires.
on_assign => sub { print "ready\n" },
);
eg/produce_headers.pl view on Meta::CPAN
# Construct the client. The broker address is taken from the KAFKA_BROKER
# environment variable when it is defined, otherwise 127.0.0.1:9092 is used.
# Errors reported through on_error are emitted as warnings.
my $kafka = do {
    my $broker_addr  = $ENV{KAFKA_BROKER} // '127.0.0.1:9092';
    my $report_error = sub { warn "kafka error: @_\n" };

    EV::Kafka->new(
        brokers  => $broker_addr,
        acks     => 1,
        on_error => $report_error,
    );
};
$kafka->connect(sub {
$kafka->produce('test-topic', 'event-key', '{"action":"click","page":"/home"}',
{
headers => {
'content-type' => 'application/json',
'trace-id' => 'abc-123-def',
'source' => 'web-frontend',
},
},
sub {
my ($result, $err) = @_;
die "produce failed: $err" if $err;
print "produced with headers at offset "
. $result->{topics}[0]{partitions}[0]{base_offset} . "\n";
$kafka->close(sub { EV::break });
lib/EV/Kafka.pm view on Meta::CPAN
=head1 COOKBOOK
=head2 Produce JSON with headers
# Encode a Perl hash as JSON and produce it with a 'content-type' record header.
use JSON::PP;
# ->utf8 makes encode() return UTF-8 encoded bytes rather than a character string.
my $json = JSON::PP->new->utf8;
# The positional arguments appear to be topic, key and value, followed by an
# options hashref and a completion callback -- confirm against the produce() docs.
$kafka->produce('events', 'user-42',
$json->encode({ action => 'click', page => '/home' }),
{ headers => { 'content-type' => 'application/json' } },
# '...' is Perl's yada-yada placeholder: it dies with "Unimplemented" if this
# callback is ever run, so replace it with real handling.
sub { ... }
);
=head2 Consume from latest offset only
# Subscribe to the 'live-feed' topic with group_id 'realtime'.
$kafka->subscribe('live-feed',
group_id => 'realtime',
# Presumably starts reading at the end of the log when the group has no
# committed offset (the usual Kafka meaning of 'latest') -- confirm in the docs.
auto_offset_reset => 'latest',
# Callback that prints "ready"; presumably invoked once partitions have been
# assigned -- confirm when it fires.
on_assign => sub { print "ready\n" },
);
( run in 1.497 second using v1.01-cache-2.11-cpan-524268b4103 )