Kafka
view release on metacpan or search on metacpan
lib/Kafka/Connection.pm view on Meta::CPAN
=item *
Alarms are not used internally (namely when performing C<gethostbyname>).
=item *
Default C<$REQUEST_TIMEOUT> is used for the rest of IO operations.
=back
=over 3
=item C<SEND_MAX_ATTEMPTS =E<gt> $attempts>
Optional, int32 signed integer, default = C<$Kafka::SEND_MAX_ATTEMPTS> .
In some circumstances (leader is temporarily unavailable, outdated metadata, etc) we may fail to send a message.
This property specifies the maximum number of attempts to send a message.
The C<$attempts> should be an integer number.
=item C<RETRY_BACKOFF =E<gt> $backoff>
Optional, default = C<$Kafka::RETRY_BACKOFF> .
Since leader election takes a bit of time, this property specifies the amount of time,
in milliseconds, that the producer waits before refreshing the metadata.
The C<$backoff> should be an integer number.
=item C<AutoCreateTopicsEnable =E<gt> $mode>
Optional, default value is 0 (false).
Kafka BUG "[KAFKA-1124]" (Fixed in Kafka 0.8.2):
I<AutoCreateTopicsEnable> controls how this module handles the first access to non-existent topic
when C<auto.create.topics.enable> in server configuration is C<true>.
If I<AutoCreateTopicsEnable> is false (default),
the first access to non-existent topic produces an exception;
however, the topic is created and next attempts to access it will succeed.
If I<AutoCreateTopicsEnable> is true, this module waits
(according to the C<SEND_MAX_ATTEMPTS> and C<RETRY_BACKOFF> properties)
until the topic is created,
to avoid errors on the first access to non-existent topic.
If C<auto.create.topics.enable> in server configuration is C<false>, this setting has no effect.
=item C<MaxLoggedErrors =E<gt> $number>
Optional, default value is 100.
Defines maximum number of last non-fatal errors that we keep in log. Use method L</nonfatal_errors> to
access those errors.
=item C<dont_load_supported_api_versions =E<gt> $boolean>
Optional, default value is 0 (false).
If set to false, when communicating with a broker, the client will
automatically try to find out the best version numbers to use for each of the
API endpoints.
If set to true, the client will always use
C<$Kafka::Protocol::DEFAULT_APIVERSION> as API version.
WARNING: API versions are supported starting from Kafka 0.10. Set this parameter to true
if you're connecting to 0.9.
=back
=cut
my %Param_mapping = (
'timeout' => 'Timeout',
);
sub new {
my ( $class, %params ) = @_;
my $self = bless {
host => q{},
port => $KAFKA_SERVER_PORT,
broker_list => [],
Timeout => $REQUEST_TIMEOUT,
async => 0,
ip_version => undef,
SEND_MAX_ATTEMPTS => $SEND_MAX_ATTEMPTS,
RETRY_BACKOFF => $RETRY_BACKOFF,
AutoCreateTopicsEnable => 0,
MaxLoggedErrors => 100,
dont_load_supported_api_versions => 0,
sasl_username => undef,
sasl_password => undef,
sasl_mechanizm => undef,
}, $class;
foreach my $p ( keys %params ) {
my $attr = $Param_mapping{ $p } // $p;
if( exists $self->{ $attr } ) {
$self->{ $attr } = $params{ $p };
}
else {
$self->_error( $ERROR_MISMATCH_ARGUMENT, $p );
}
}
$self->_error( $ERROR_MISMATCH_ARGUMENT, 'host' )
unless defined( $self->{host} ) && ( $self->{host} eq q{} || defined( _STRING( $self->{host} ) ) );
$self->_error( $ERROR_NOT_BINARY_STRING, 'host' )
if utf8::is_utf8( $self->{host} );
$self->_error( $ERROR_MISMATCH_ARGUMENT, 'port' )
unless _POSINT( $self->{port} );
$self->_error( $ERROR_MISMATCH_ARGUMENT, format_message( 'Timeout (%s)', $self->{Timeout} ) )
unless !defined( $self->{Timeout} ) || ( defined _NUMBER( $self->{Timeout} ) && int( 1000 * $self->{Timeout} ) >= 1 && int( $self->{Timeout} * 1000 ) <= $MAX_INT32 );
$self->_error( $ERROR_MISMATCH_ARGUMENT, 'broker_list' )
unless _ARRAY0( $self->{broker_list} );
$self->_error( $ERROR_MISMATCH_ARGUMENT, 'SEND_MAX_ATTEMPTS' )
unless _POSINT( $self->{SEND_MAX_ATTEMPTS} );
$self->_error( $ERROR_MISMATCH_ARGUMENT, 'RETRY_BACKOFF' )
unless _POSINT( $self->{RETRY_BACKOFF} );
$self->_error( $ERROR_MISMATCH_ARGUMENT, 'MaxLoggedErrors' )
lib/Kafka/Connection.pm view on Meta::CPAN
unless $self->_is_like_server( $server );
my ( $host, $port ) = _split_host_port( $server );
my $correct_server = $self->_build_server_name( $host, $port );
$IO_cache->{ $correct_server } = {
NodeId => undef,
IO => undef,
host => $host,
port => $port,
};
}
$self->_error( $ERROR_MISMATCH_ARGUMENT, 'server is not specified' )
unless keys( %$IO_cache );
return $self;
}
#-- public attributes ----------------------------------------------------------
=head2 METHODS
The following methods are defined for the C<Kafka::Producer> class:
=cut
#-- public methods -------------------------------------------------------------
=head3 C<get_known_servers>
Returns the list of known Kafka servers (in host:port or [IPv6_host]:port format).
=cut
sub get_known_servers {
my ( $self ) = @_;
return keys %{ $self->{_IO_cache} };
}
sub _get_api_versions {
my ( $self, $server ) = @_;
my $server_metadata = $self->{_IO_cache}->{$server};
defined $server_metadata
or die "Fatal error: server '$server' is unknown in IO cache, which should not happen";
# if we have cached data, just use it
defined $server_metadata->{_api_versions}
and return $server_metadata->{_api_versions};
# no cached data. Initialize empty one
my $server_api_versions = $server_metadata->{_api_versions} = {};
# use empty data if client doesn't want to detect API versions
$self->{dont_load_supported_api_versions}
and return $server_api_versions;
# call the server and try to get the supported API versions
my $api_versions = [];
my $error;
try {
# The ApiVersions API endpoint is only supported on Kafka versions >
# 0.10.0.0 so this call may fail. We simply ignore this failure and
# carry on.
$api_versions = $self->_get_supported_api_versions( $server );
}
catch {
$error = $_;
};
if( defined $error ) {
if ( blessed( $error ) && $error->isa( 'Kafka::Exception' ) ) {
if( $error->code == $ERROR_MISMATCH_ARGUMENT ) {
# rethrow known fatal errors
die $error;
}
$self->_remember_nonfatal_error( $error->code, $error, $server );
} else {
die $error;
}
}
foreach my $element (@$api_versions) {
# we want to choose which api version to use for each API call. We
# try to use the max version that the server supports, with
# fallback to the max version the protocol implements. If it's
# lower than the min version the kafka server supports, we set it
# to -1. If thie API endpoint is called, it'll die.
my $kafka_min_version = $element->{MinVersion};
my $kafka_max_version = $element->{MaxVersion};
my $api_key = $element->{ApiKey};
my $implemented_max_version = $IMPLEMENTED_APIVERSIONS->{$api_key} // -1;
my $version = $kafka_max_version;
$version > $implemented_max_version
and $version = $implemented_max_version;
$version < $kafka_min_version
and $version = -1;
$server_api_versions->{$api_key} = $version;
}
return $server_api_versions;
}
# Returns the list of supported API versions. This is not really. *Warning*,
# this call works only against Kafka 1.10.0.0
sub _get_supported_api_versions {
my ( $self, $broker ) = @_;
my $CorrelationId = _get_CorrelationId();
my $decoded_request = {
CorrelationId => $CorrelationId,
ClientId => q{},
ApiVersion => 0,
};
say STDERR format_message( '[%s] apiversions request: %s',
scalar( localtime ),
$decoded_request,
) if $self->debug_level;
my $encoded_request = $protocol{ $APIKEY_APIVERSIONS }->{encode}->( $decoded_request );
my $encoded_response_ref;
# receive apiversions. We use a code block because it's actually a loop where
# you can do last.
{
$self->_connectIO( $broker )
or last;
my $sent = $self->_sendIO( $broker, $encoded_request )
or last;
$encoded_response_ref = $self->_receiveIO( $broker );
}
unless ( $encoded_response_ref ) {
# NOTE: it is possible to repeat the operation here
$self->_error( $ERROR_CANNOT_RECV );
}
my $decoded_response = $protocol{ $APIKEY_APIVERSIONS }->{decode}->( $encoded_response_ref );
say STDERR format_message( '[%s] apiversions response: %s',
scalar( localtime ),
$decoded_response,
) if $self->debug_level;
( defined( $decoded_response->{CorrelationId} ) && $decoded_response->{CorrelationId} == $CorrelationId )
# FATAL error
or $self->_error( $ERROR_MISMATCH_CORRELATIONID );
my $ErrorCode = $decoded_response->{ErrorCode};
( run in 0.837 second using v1.01-cache-2.11-cpan-524268b4103 )