Kafka
view release on metacpan or search on metacpan
lib/Kafka/Connection.pm view on Meta::CPAN
Defines the maximum number of most recent non-fatal errors that are kept in the log. Use the
L</nonfatal_errors> method to access those errors.
=item C<dont_load_supported_api_versions =E<gt> $boolean>
Optional, default value is 0 (false).
If set to false, when communicating with a broker, the client will
automatically try to find out the best version numbers to use for each of the
API endpoints.
If set to true, the client will always use
C<$Kafka::Protocol::DEFAULT_APIVERSION> as API version.
WARNING: API version detection is supported starting from Kafka 0.10. Set this parameter to true
if you're connecting to a Kafka 0.9 broker.
=back
=cut
lib/Kafka/Connection.pm view on Meta::CPAN
my $server_api_versions = $server_metadata->{_api_versions} = {};
# use empty data if client doesn't want to detect API versions
$self->{dont_load_supported_api_versions}
and return $server_api_versions;
# call the server and try to get the supported API versions
my $api_versions = [];
my $error;
try {
# The ApiVersions API endpoint is only supported on Kafka versions >=
# 0.10.0.0 so this call may fail. We simply ignore this failure and
# carry on.
$api_versions = $self->_get_supported_api_versions( $server );
}
catch {
$error = $_;
};
if( defined $error ) {
if ( blessed( $error ) && $error->isa( 'Kafka::Exception' ) ) {
lib/Kafka/Connection.pm view on Meta::CPAN
} else {
die $error;
}
}
foreach my $element (@$api_versions) {
# we want to choose which api version to use for each API call. We
# try to use the max version that the server supports, with
# fallback to the max version the protocol implements. If it's
# lower than the min version the kafka server supports, we set it
# to -1. If this API endpoint is called, it'll die.
my $kafka_min_version = $element->{MinVersion};
my $kafka_max_version = $element->{MaxVersion};
my $api_key = $element->{ApiKey};
my $implemented_max_version = $IMPLEMENTED_APIVERSIONS->{$api_key} // -1;
my $version = $kafka_max_version;
$version > $implemented_max_version
and $version = $implemented_max_version;
$version < $kafka_min_version
and $version = -1;
$server_api_versions->{$api_key} = $version;
( run in 1.398 second using v1.01-cache-2.11-cpan-524268b4103 )