EV-Kafka
view release on metacpan or search on metacpan
lib/EV/Kafka.pm view on Meta::CPAN
my $loop = delete $opts{loop}; # passed through to per-conn _new on demand
# Parse brokers
my $brokers = delete $opts{brokers} // '127.0.0.1:9092';
my @bootstrap;
for my $b (split /,/, $brokers) {
$b =~ s/^\s+|\s+$//g;
my ($h, $p) = split /:/, $b, 2;
$p //= 9092;
push @bootstrap, [$h, $p + 0];
}
# Store config
my $cfg = {
bootstrap => \@bootstrap,
client_id => delete $opts{client_id} // 'ev-kafka',
tls => delete $opts{tls} // 0,
tls_ca_file => delete $opts{tls_ca_file},
tls_skip_verify => delete $opts{tls_skip_verify} // 0,
sasl => delete $opts{sasl},
on_error => delete $opts{on_error} // sub { die "EV::Kafka: @_\n" },
on_connect => delete $opts{on_connect},
on_message => delete $opts{on_message},
acks => delete $opts{acks} // -1,
linger_ms => delete $opts{linger_ms} // 5,
batch_size => delete $opts{batch_size} // 16384,
partitioner => delete $opts{partitioner},
compression => delete $opts{compression}, # 'lz4', 'gzip', or undef
idempotent => delete $opts{idempotent} // 0,
transactional_id => delete $opts{transactional_id}, # enables transactions
fetch_max_wait_ms => delete $opts{fetch_max_wait_ms} // 500,
fetch_max_bytes => delete $opts{fetch_max_bytes} // 1048576,
fetch_min_bytes => delete $opts{fetch_min_bytes} // 1,
metadata_refresh => delete $opts{metadata_refresh} // 300,
};
# Internal state
$cfg->{conns} = {}; # node_id => EV::Kafka::Conn
$cfg->{meta} = undef; # latest metadata response
$cfg->{leaders} = {}; # "topic:partition" => node_id
$cfg->{broker_map}= {}; # node_id => {host, port}
$cfg->{connected} = 0;
$cfg->{meta_pending} = 0;
$cfg->{pending_ops} = []; # ops waiting for metadata
# Producer state
$cfg->{batches} = {}; # "topic:partition" => [{rec, cb}]
$cfg->{next_sequence} = {}; # "topic:partition" => next sequence number
$cfg->{producer_id} = -1;
$cfg->{producer_epoch} = -1;
$cfg->{rr_counter} = 0;
# Consumer state
$cfg->{assignments} = []; # [{topic, partition, offset}]
$cfg->{fetch_active} = 0;
$cfg->{group} = undef;
my $self = bless { cfg => $cfg, loop => $loop }, "${class}::Client";
# Warn on credentials over plaintext.
if ($cfg->{sasl} && !$cfg->{tls}) {
my $mech = $cfg->{sasl}{mechanism} // '';
if ($mech eq 'PLAIN' || $mech =~ /^SCRAM-/) {
warn "EV::Kafka: SASL $mech configured without TLS â "
. "credentials will be sent over plaintext\n";
}
}
return $self;
}
package EV::Kafka::Client;
use EV;
use Scalar::Util 'weaken';
# Return some broker connection that is currently connected, or undef.
#
# Entries of the per-node table ($cfg->{conns}) are tried first; the first
# one whose ->connected is true is returned (hash iteration order, so which
# node wins is arbitrary). If none qualifies, $cfg->{bootstrap_conn} is
# returned when it exists and reports itself connected.
sub _any_conn {
    my ($self) = @_;
    my $cfg = $self->{cfg};

    # Prefer an established per-node connection.
    for my $candidate (values %{$cfg->{conns}}) {
        return $candidate if $candidate->connected;
    }

    # Otherwise fall back to the bootstrap connection, if it is usable.
    my $bootstrap = $cfg->{bootstrap_conn};
    return ($bootstrap && $bootstrap->connected) ? $bootstrap : undef;
}
# Look up the connection object for broker $node_id, creating it on demand.
#
# Returns the cached entry from $cfg->{conns} when one exists. Otherwise the
# broker's address is taken from $cfg->{broker_map}; if the node is unknown
# there, undef is returned. For a known node a new EV::Kafka::Conn is built on
# $self->{loop}, configured via _configure_conn, cached under $node_id, given
# an on_connect hook that calls _drain_pending_for($node_id), and then asked
# to connect to the broker's host/port.
sub _get_or_create_conn {
    my ($self, $node_id) = @_;
    my $cfg = $self->{cfg};

    if (my $cached = $cfg->{conns}{$node_id}) {
        return $cached;
    }

    my $broker = $cfg->{broker_map}{$node_id}
        or return undef;

    my $conn = EV::Kafka::Conn::_new('EV::Kafka::Conn', $self->{loop});
    $self->_configure_conn($conn);
    $cfg->{conns}{$node_id} = $conn;

    # The hook only holds a weak reference to the client, so the callback
    # stored on the connection does not keep the client alive by itself.
    my $weak_self = $self;
    weaken($weak_self);
    $conn->on_connect(sub {
        return unless $weak_self;
        $weak_self->_drain_pending_for($node_id);
    });

    # Third argument is presumably a connect timeout in seconds -- confirm
    # against EV::Kafka::Conn.
    my ($host, $port) = @{$broker}{qw(host port)};
    $conn->connect($host, $port, 10.0);

    return $conn;
}
# Apply the client-level settings in $self->{cfg} to a connection object.
#
# Always sets the client id. Enables TLS (with the configured CA file and
# skip-verify flag) when $cfg->{tls} is true, and passes the SASL mechanism,
# username and password when $cfg->{sasl} is set. Finally installs an error
# handler on the connection that forwards its first argument to
# $cfg->{on_error}, if one is configured.
sub _configure_conn {
    my ($self, $conn) = @_;
    my $cfg = $self->{cfg};

    $conn->client_id($cfg->{client_id});

    $conn->tls(1, $cfg->{tls_ca_file}, $cfg->{tls_skip_verify})
        if $cfg->{tls};

    if (my $sasl = $cfg->{sasl}) {
        my ($mechanism, $username, $password)
            = @{$sasl}{qw(mechanism username password)};
        $conn->sasl($mechanism, $username, $password);
    }

    # The handler sees the config only through a weak reference, so the
    # callback held by the connection does not keep the config alive.
    my $weak_cfg = $cfg;
    weaken($weak_cfg);
    $conn->on_error(sub {
        return unless $weak_cfg;
        my $handler = $weak_cfg->{on_error} or return;
        $handler->($_[0]);
    });
}
sub _bootstrap_connect {
my ($self, $cb) = @_;
my $cfg = $self->{cfg};
( run in 1.077 second using v1.01-cache-2.11-cpan-cdf2f3d4e48 )