Kafka


lib/Kafka/Connection.pm


Defines the maximum number of most recent non-fatal errors kept in the log. Use the
L</nonfatal_errors> method to access those errors.

=item C<dont_load_supported_api_versions =E<gt> $boolean>

Optional, default value is 0 (false).

If set to false, when communicating with a broker, the client will
automatically try to find out the best version numbers to use for each of the
API endpoints.

If set to true, the client will always use
C<$Kafka::Protocol::DEFAULT_APIVERSION> as API version.

WARNING: API version discovery is supported starting from Kafka 0.10. Set this parameter
to true if you're connecting to a 0.9 broker.

=back

=cut
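
The option above could be derived from the broker version, as in the minimal sketch below. The helper C<connection_args> and the version-string parsing are hypothetical and not part of the distribution; only the C<host> and C<dont_load_supported_api_versions> argument names come from C<Kafka::Connection>.

```perl
use strict;
use warnings;

# Hypothetical helper (not part of the Kafka distribution): builds
# constructor arguments, disabling API version discovery for brokers
# older than 0.10, which is what the WARNING above recommends.
sub connection_args {
    my ( $host, $broker_version ) = @_;

    # Assumes a dotted version string such as '0.9.0.1' or '0.10.2'.
    my ( $major, $minor ) = split /\./, $broker_version;
    my $pre_0_10 = ( $major == 0 && $minor < 10 ) ? 1 : 0;

    return (
        host                             => $host,
        dont_load_supported_api_versions => $pre_0_10,
    );
}

my %args = connection_args( 'localhost', '0.9.0.1' );

# With the Kafka distribution installed, the arguments would be passed as:
#   my $connection = Kafka::Connection->new( %args );
```

Keeping the decision in one small function means the flag stays consistent if the same broker is addressed from several places.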

lib/Kafka/Connection.pm

    my $server_api_versions = $server_metadata->{_api_versions} = {};

    # use empty data if client doesn't want to detect API versions
    $self->{dont_load_supported_api_versions}
      and return $server_api_versions;

    # call the server and try to get the supported API versions
    my $api_versions = [];
    my $error;
    try {
        # The ApiVersions API endpoint is only supported on Kafka versions >=
        # 0.10.0.0, so this call may fail against older brokers. We capture
        # the failure here and handle it below.
        $api_versions = $self->_get_supported_api_versions( $server );
    }
    catch {
        $error = $_;
    };

    if( defined $error ) {
        if ( blessed( $error ) && $error->isa( 'Kafka::Exception' ) ) {

lib/Kafka/Connection.pm

        } else {
            die $error;
        }
    }

    foreach my $element (@$api_versions) {
        # Choose which API version to use for each API call. We take the
        # lower of the max version the server supports and the max version
        # this client implements. If the result is lower than the min
        # version the Kafka server supports, we set it to -1. If this API
        # endpoint is then called, it'll die.
        my $kafka_min_version = $element->{MinVersion};
        my $kafka_max_version = $element->{MaxVersion};
        my $api_key = $element->{ApiKey};
        my $implemented_max_version = $IMPLEMENTED_APIVERSIONS->{$api_key} // -1;
        my $version = $kafka_max_version;
        $version > $implemented_max_version
          and $version = $implemented_max_version;
        $version < $kafka_min_version
          and $version = -1;
        $server_api_versions->{$api_key} = $version;
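
The selection rule in the loop above can be tried in isolation. The sketch below is a standalone approximation, not the module's code: C<%implemented_max> is a hypothetical stand-in for C<$IMPLEMENTED_APIVERSIONS>, the function name C<choose_api_version> is invented, and the advertised versions are made-up sample data.

```perl
use strict;
use warnings;

# Hypothetical stand-in for $IMPLEMENTED_APIVERSIONS:
# API key => highest version this client implements.
my %implemented_max = ( 0 => 2, 1 => 3 );

# Returns the version to use for one advertised API, or -1 if unusable.
sub choose_api_version {
    my ( $element ) = @_;

    my $implemented = $implemented_max{ $element->{ApiKey} } // -1;
    my $version     = $element->{MaxVersion};

    # Never exceed what the client implements.
    $version = $implemented if $version > $implemented;

    # Below the broker's minimum there is no common version.
    $version = -1 if $version < $element->{MinVersion};

    return $version;
}

# Made-up sample of what a broker might advertise.
my @advertised = (
    { ApiKey => 0, MinVersion => 0, MaxVersion => 5 },    # capped at client max 2
    { ApiKey => 1, MinVersion => 4, MaxVersion => 7 },    # client max 3 < broker min 4
    { ApiKey => 9, MinVersion => 0, MaxVersion => 1 },    # unknown to the client
);

my %chosen;
$chosen{ $_->{ApiKey} } = choose_api_version( $_ ) for @advertised;

printf "%d => %d\n", $_, $chosen{$_} for sort { $a <=> $b } keys %chosen;
# prints:
# 0 => 2
# 1 => -1
# 9 => -1
```

An API key the client does not implement falls out as -1 through the same two comparisons, so no separate branch is needed for it.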


