Kafka

 view release on metacpan or  search on metacpan

lib/Kafka/Connection.pm  view on Meta::CPAN


=item *

Alarms are not used internally (namely when performing C<gethostbyname>).

=item *

Default C<$REQUEST_TIMEOUT> is used for the rest of IO operations.

=back

=over 3

=item C<SEND_MAX_ATTEMPTS =E<gt> $attempts>

Optional, int32 signed integer, default = C<$Kafka::SEND_MAX_ATTEMPTS> .

In some circumstances (leader is temporarily unavailable, outdated metadata, etc) we may fail to send a message.
This property specifies the maximum number of attempts to send a message.
The C<$attempts> should be an integer number.

=item C<RETRY_BACKOFF =E<gt> $backoff>

Optional, default = C<$Kafka::RETRY_BACKOFF> .

Since leader election takes a bit of time, this property specifies the amount of time,
in milliseconds, that the producer waits before refreshing the metadata.
The C<$backoff> should be an integer number.

=item C<AutoCreateTopicsEnable =E<gt> $mode>

Optional, default value is 0 (false).

Kafka BUG "[KAFKA-1124]" (Fixed in Kafka 0.8.2):
I<AutoCreateTopicsEnable> controls how this module handles the first access to non-existent topic
when C<auto.create.topics.enable> in server configuration is C<true>.
If I<AutoCreateTopicsEnable> is false (default),
the first access to non-existent topic produces an exception;
however, the topic is created and next attempts to access it will succeed.

If I<AutoCreateTopicsEnable> is true, this module waits
(according to the C<SEND_MAX_ATTEMPTS> and C<RETRY_BACKOFF> properties)
until the topic is created,
to avoid errors on the first access to non-existent topic.

If C<auto.create.topics.enable> in server configuration is C<false>, this setting has no effect.

=item C<MaxLoggedErrors =E<gt> $number>

Optional, default value is 100.

Defines the maximum number of most recent non-fatal errors that are kept in the log.
Use the L</nonfatal_errors> method to access those errors.

=item C<dont_load_supported_api_versions =E<gt> $boolean>

Optional, default value is 0 (false).

If set to false, when communicating with a broker, the client will
automatically try to find out the best version numbers to use for each of the
API endpoints.

If set to true, the client will always use
C<$Kafka::Protocol::DEFAULT_APIVERSION> as API version.

WARNING: API versions are supported starting from Kafka 0.10. Set this parameter to true
if you're connecting to 0.9.

=back

=cut

# Alternative constructor argument names, mapped onto the attribute names
# under which the values are stored in the object.
my %Param_mapping = ( timeout => 'Timeout' );

sub new {
    my ( $class, %params ) = @_;

    my $self = bless {
        host                    => q{},
        port                    => $KAFKA_SERVER_PORT,
        broker_list             => [],
        Timeout                 => $REQUEST_TIMEOUT,
        async                   => 0,
        ip_version              => undef,
        SEND_MAX_ATTEMPTS       => $SEND_MAX_ATTEMPTS,
        RETRY_BACKOFF           => $RETRY_BACKOFF,
        AutoCreateTopicsEnable  => 0,
        MaxLoggedErrors         => 100,
        dont_load_supported_api_versions => 0,
        sasl_username           => undef,
        sasl_password           => undef,
        sasl_mechanizm          => undef,
    }, $class;

    foreach my $p ( keys %params ) {
        my $attr = $Param_mapping{ $p } // $p;
        if( exists $self->{ $attr } ) {
            $self->{ $attr } = $params{ $p };
        }
        else {
            $self->_error( $ERROR_MISMATCH_ARGUMENT, $p );
        }
    }

    $self->_error( $ERROR_MISMATCH_ARGUMENT, 'host' )
        unless defined( $self->{host} ) && ( $self->{host} eq q{} || defined( _STRING( $self->{host} ) ) );
    $self->_error( $ERROR_NOT_BINARY_STRING, 'host' )
        if utf8::is_utf8( $self->{host} );
    $self->_error( $ERROR_MISMATCH_ARGUMENT, 'port' )
        unless _POSINT( $self->{port} );
    $self->_error( $ERROR_MISMATCH_ARGUMENT, format_message( 'Timeout (%s)', $self->{Timeout} ) )
        unless !defined( $self->{Timeout} ) || ( defined _NUMBER( $self->{Timeout} ) && int( 1000 * $self->{Timeout} ) >= 1 && int( $self->{Timeout} * 1000 ) <= $MAX_INT32 );
    $self->_error( $ERROR_MISMATCH_ARGUMENT, 'broker_list' )
        unless _ARRAY0( $self->{broker_list} );
    $self->_error( $ERROR_MISMATCH_ARGUMENT, 'SEND_MAX_ATTEMPTS' )
        unless _POSINT( $self->{SEND_MAX_ATTEMPTS} );
    $self->_error( $ERROR_MISMATCH_ARGUMENT, 'RETRY_BACKOFF' )
        unless _POSINT( $self->{RETRY_BACKOFF} );
    $self->_error( $ERROR_MISMATCH_ARGUMENT, 'MaxLoggedErrors' )

lib/Kafka/Connection.pm  view on Meta::CPAN

            unless $self->_is_like_server( $server );
        my ( $host, $port ) = _split_host_port( $server );
        my $correct_server = $self->_build_server_name( $host, $port );
        $IO_cache->{ $correct_server } = {
            NodeId  => undef,
            IO      => undef,
            host    => $host,
            port    => $port,
        };
    }

    $self->_error( $ERROR_MISMATCH_ARGUMENT, 'server is not specified' )
        unless keys( %$IO_cache );

    return $self;
}

#-- public attributes ----------------------------------------------------------

=head2 METHODS

The following methods are defined for the C<Kafka::Connection> class:

=cut

#-- public methods -------------------------------------------------------------

=head3 C<get_known_servers>

Returns the list of known Kafka servers (in host:port or [IPv6_host]:port format).

=cut
sub get_known_servers {
    my $self = shift;

    # The IO cache is keyed by server name, so its keys are exactly the
    # servers this connection knows about.
    my @servers = keys %{ $self->{_IO_cache} };

    # In scalar context this yields the number of known servers.
    return @servers;
}

# Returns a hash reference mapping each API key to the API version that will
# be used when talking to $server. The result is stored in the server's IO
# cache entry and reused on subsequent calls.
#
# Dies if $server has no entry in the IO cache. Returns an empty mapping when
# 'dont_load_supported_api_versions' is set, or when the broker's supported
# versions could not be fetched because of a non-fatal Kafka::Exception
# (which is recorded via _remember_nonfatal_error).
sub _get_api_versions {
    my ( $self, $server ) = @_;

    my $cache_entry = $self->{_IO_cache}->{$server};
    die "Fatal error: server '$server' is unknown in IO cache, which should not happen"
        unless defined $cache_entry;

    # Fast path: versions were already negotiated for this server.
    my $cached = $cache_entry->{_api_versions};
    return $cached
        if defined $cached;

    # Nothing cached yet: start with an empty mapping and remember it right
    # away, so it is reused even if detection is skipped or fails below.
    my $negotiated = {};
    $cache_entry->{_api_versions} = $negotiated;

    # The client asked not to detect API versions: keep the mapping empty.
    return $negotiated
        if $self->{dont_load_supported_api_versions};

    # Ask the broker which API versions it supports.
    my $supported = [];
    my $failure;
    try {
        # The ApiVersions API endpoint is only supported starting from Kafka
        # 0.10.0.0, so this call may fail. Such a failure is captured here
        # and classified below.
        $supported = $self->_get_supported_api_versions( $server );
    }
    catch {
        $failure = $_;
    };

    if ( defined $failure ) {
        # Anything that is not a Kafka::Exception is unexpected: rethrow it.
        die $failure
            unless blessed( $failure ) && $failure->isa( 'Kafka::Exception' );

        # Argument mismatches are known fatal errors: rethrow them as well.
        die $failure
            if $failure->code == $ERROR_MISMATCH_ARGUMENT;

        # Every other Kafka::Exception is recorded and otherwise ignored.
        $self->_remember_nonfatal_error( $failure->code, $failure, $server );
    }

    for my $entry ( @$supported ) {
        # For each API key pick the highest version the server supports,
        # capped by the highest version this client implements (-1 when the
        # key is not implemented at all). If the capped value is lower than
        # the minimum version the server supports, store -1 instead.
        # NOTE(review): -1 presumably marks the endpoint as unusable for the
        # code that sends requests - confirm against the callers.
        my $api_key         = $entry->{ApiKey};
        my $min_supported   = $entry->{MinVersion};
        my $max_supported   = $entry->{MaxVersion};
        my $max_implemented = $IMPLEMENTED_APIVERSIONS->{$api_key} // -1;

        my $chosen = $max_supported > $max_implemented
            ? $max_implemented
            : $max_supported;
        $chosen = -1
            if $chosen < $min_supported;

        $negotiated->{$api_key} = $chosen;
    }

    return $negotiated;
}

# Returns the list of API versions supported by the given broker. *Warning*:
# this call works only against Kafka 0.10.0.0 and later.

sub _get_supported_api_versions {
    my ( $self, $broker ) = @_;

    my $CorrelationId = _get_CorrelationId();
    my $decoded_request = {
        CorrelationId   => $CorrelationId,
        ClientId        => q{},
        ApiVersion => 0,
    };
    say STDERR format_message( '[%s] apiversions request: %s',
            scalar( localtime ),
            $decoded_request,
        ) if $self->debug_level;
    my $encoded_request = $protocol{ $APIKEY_APIVERSIONS }->{encode}->( $decoded_request );

    my $encoded_response_ref;

    # receive apiversions. We use a code block because it's actually a loop where
    # you can do last.
    {
        $self->_connectIO( $broker )
          or last;
        my $sent = $self->_sendIO( $broker, $encoded_request )
          or last;
        $encoded_response_ref = $self->_receiveIO( $broker );
    }

    unless ( $encoded_response_ref ) {
        # NOTE: it is possible to repeat the operation here
        $self->_error( $ERROR_CANNOT_RECV );
    }

    my $decoded_response = $protocol{ $APIKEY_APIVERSIONS }->{decode}->( $encoded_response_ref );
    say STDERR format_message( '[%s] apiversions response: %s',
            scalar( localtime ),
            $decoded_response,
        ) if $self->debug_level;
    ( defined( $decoded_response->{CorrelationId} ) && $decoded_response->{CorrelationId} == $CorrelationId )
        # FATAL error
        or $self->_error( $ERROR_MISMATCH_CORRELATIONID );
    my $ErrorCode = $decoded_response->{ErrorCode};



( run in 0.837 second using v1.01-cache-2.11-cpan-524268b4103 )