EV-Kafka

 view release on metacpan or  search on metacpan

eg/tls_sasl.pl  view on Meta::CPAN

#!/usr/bin/env perl
# TLS + SASL/SCRAM-SHA-256 example.
#
# Set KAFKA_BROKER to a TLS listener (e.g. host:9093). Provide the CA cert
# in KAFKA_TLS_CA, and credentials via KAFKA_USER and KAFKA_PASS. The
# client warns at construction if SASL is configured without TLS.
#
# Quick local test with Redpanda:
#   docker run -e RP_BOOTSTRAP_USER=admin:secret123 \
#     redpandadata/redpanda:latest start --kafka-addr=...

use strict;
use warnings;
use EV;
use EV::Kafka;

my $kafka = EV::Kafka->new(
    brokers          => $ENV{KAFKA_BROKER}  // '127.0.0.1:9093',
    tls              => 1,
    tls_ca_file      => $ENV{KAFKA_TLS_CA},      # may be undef for system CA
    tls_skip_verify  => $ENV{KAFKA_TLS_INSECURE} ? 1 : 0,
    sasl             => {
        mechanism => $ENV{KAFKA_SASL_MECH} // 'SCRAM-SHA-256',
        username  => $ENV{KAFKA_USER}      // 'admin',
        password  => $ENV{KAFKA_PASS}      // 'secret123',
    },
    on_error => sub { warn "kafka: @_\n"; EV::break },
);

$kafka->connect(sub {
    my $meta = shift;
    printf "connected; cluster has %d broker(s)\n",
        scalar @{$meta->{brokers}};

    $kafka->produce('tls-test', 'k', 'authenticated payload', sub {
        my ($r, $err) = @_;
        die "produce: $err" if $err;
        print "produced over TLS+SASL at offset ",
            $r->{topics}[0]{partitions}[0]{base_offset}, "\n";
        $kafka->close(sub { EV::break });
    });
});

EV::run;



( run in 1.434 second using v1.01-cache-2.11-cpan-13bb782fe5a )