EV-Kafka

 view release on metacpan or  search on metacpan

lib/EV/Kafka.pm  view on Meta::CPAN


    my $loop = delete $opts{loop};  # passed through to per-conn _new on demand

    # Parse brokers
    my $brokers = delete $opts{brokers} // '127.0.0.1:9092';
    my @bootstrap;
    for my $b (split /,/, $brokers) {
        $b =~ s/^\s+|\s+$//g;
        my ($h, $p) = split /:/, $b, 2;
        $p //= 9092;
        push @bootstrap, [$h, $p + 0];
    }

    # Store config
    my $cfg = {
        bootstrap    => \@bootstrap,
        client_id    => delete $opts{client_id} // 'ev-kafka',
        tls          => delete $opts{tls} // 0,
        tls_ca_file  => delete $opts{tls_ca_file},
        tls_skip_verify => delete $opts{tls_skip_verify} // 0,
        sasl         => delete $opts{sasl},
        on_error     => delete $opts{on_error} // sub { die "EV::Kafka: @_\n" },
        on_connect   => delete $opts{on_connect},
        on_message   => delete $opts{on_message},
        acks         => delete $opts{acks} // -1,
        linger_ms    => delete $opts{linger_ms} // 5,
        batch_size   => delete $opts{batch_size} // 16384,
        partitioner  => delete $opts{partitioner},
        compression      => delete $opts{compression},     # 'lz4', 'gzip', or undef
        idempotent       => delete $opts{idempotent} // 0,
        transactional_id => delete $opts{transactional_id}, # enables transactions
        fetch_max_wait_ms => delete $opts{fetch_max_wait_ms} // 500,
        fetch_max_bytes   => delete $opts{fetch_max_bytes} // 1048576,
        fetch_min_bytes   => delete $opts{fetch_min_bytes} // 1,
        metadata_refresh  => delete $opts{metadata_refresh} // 300,
    };

    # Internal state
    $cfg->{conns}     = {};    # node_id => EV::Kafka::Conn
    $cfg->{meta}      = undef; # latest metadata response
    $cfg->{leaders}   = {};    # "topic:partition" => node_id
    $cfg->{broker_map}= {};    # node_id => {host, port}
    $cfg->{connected} = 0;
    $cfg->{meta_pending} = 0;
    $cfg->{pending_ops} = [];  # ops waiting for metadata

    # Producer state
    $cfg->{batches}  = {};     # "topic:partition" => [{rec, cb}]
    $cfg->{next_sequence} = {}; # "topic:partition" => next sequence number
    $cfg->{producer_id}    = -1;
    $cfg->{producer_epoch} = -1;
    $cfg->{rr_counter} = 0;

    # Consumer state
    $cfg->{assignments} = [];  # [{topic, partition, offset}]
    $cfg->{fetch_active} = 0;
    $cfg->{group} = undef;

    my $self = bless { cfg => $cfg, loop => $loop }, "${class}::Client";

    # Warn on credentials over plaintext.
    if ($cfg->{sasl} && !$cfg->{tls}) {
        my $mech = $cfg->{sasl}{mechanism} // '';
        if ($mech eq 'PLAIN' || $mech =~ /^SCRAM-/) {
            warn "EV::Kafka: SASL $mech configured without TLS — "
                . "credentials will be sent over plaintext\n";
        }
    }

    return $self;
}

package EV::Kafka::Client;
use EV;
use Scalar::Util 'weaken';

# Pick any connection that is currently usable.
#
# A connected entry from $cfg->{conns} is preferred; because hash iteration
# order is unspecified, which one is chosen when several are connected is
# arbitrary. If none of them is connected, $cfg->{bootstrap_conn} is used
# provided it exists and reports itself as connected.
#
# Returns the chosen connection object, or undef when nothing is connected.
sub _any_conn {
    my ($self) = @_;
    my $state = $self->{cfg};

    # Fallback candidate; replaced below by the first live per-node conn.
    my $candidate = $state->{bootstrap_conn};

    NODE_CONN:
    for my $node_conn (values %{ $state->{conns} }) {
        next NODE_CONN unless $node_conn->connected;
        $candidate = $node_conn;
        last NODE_CONN;
    }

    # No bootstrap connection and no live per-node connection.
    return undef unless $candidate;

    # Re-check liveness: the candidate may still be the (possibly dead)
    # bootstrap connection.
    return $candidate->connected ? $candidate : undef;
}

# Return the connection object for broker $node_id, creating it on demand.
#
# An entry already present in $cfg->{conns} is returned as-is, without
# checking whether it is connected. Otherwise the broker's host/port entry
# is looked up in $cfg->{broker_map}; if the node is unknown, undef is
# returned. A new EV::Kafka::Conn is then built on $self->{loop}, configured
# via _configure_conn, cached under $node_id and asked to connect.
#
# Note that the new connection is returned right after connect() has been
# called on it; the on_connect handler registered here calls
# _drain_pending_for($node_id) when it fires.
sub _get_or_create_conn {
    my ($self, $node_id) = @_;
    my $state = $self->{cfg};

    if (my $existing = $state->{conns}{$node_id}) {
        return $existing;
    }

    my $broker = $state->{broker_map}{$node_id}
        or return undef;

    my $fresh = EV::Kafka::Conn::_new('EV::Kafka::Conn', $self->{loop});
    $self->_configure_conn($fresh);
    $state->{conns}{$node_id} = $fresh;

    # The conn is stored under $self->{cfg} and holds this callback, so the
    # closure keeps only a weak reference to the client to avoid a cycle.
    my $client = $self;
    weaken($client);
    $fresh->on_connect(sub {
        $client and $client->_drain_pending_for($node_id);
    });

    # Third argument is presumably a connect timeout in seconds -- confirm
    # against EV::Kafka::Conn.
    my $connect_arg = 10.0;
    $fresh->connect($broker->{host}, $broker->{port}, $connect_arg);

    return $fresh;
}

# Apply the client-wide settings in $self->{cfg} to a connection object.
#
# Always sets the client id and installs an error handler. TLS and SASL are
# configured only when the corresponding cfg entries are true. The error
# handler forwards its first argument to $cfg->{on_error}, if one is set.
sub _configure_conn {
    my ($self, $conn) = @_;
    my $settings = $self->{cfg};

    $conn->client_id($settings->{client_id});

    $conn->tls(1, $settings->{tls_ca_file}, $settings->{tls_skip_verify})
        if $settings->{tls};

    if (my $auth = $settings->{sasl}) {
        $conn->sasl($auth->{mechanism}, $auth->{username}, $auth->{password});
    }

    # The handler holds the config weakly, so the connection does not keep
    # it alive; once the config is gone the handler becomes a no-op.
    my $cfg_ref = $settings;
    weaken($cfg_ref);
    $conn->on_error(sub {
        $cfg_ref && $cfg_ref->{on_error}
            and $cfg_ref->{on_error}->($_[0]);
    });
}

sub _bootstrap_connect {
    my ($self, $cb) = @_;
    my $cfg = $self->{cfg};



( run in 1.077 second using v1.01-cache-2.11-cpan-cdf2f3d4e48 )