Cassandra-Client

 view release on metacpan or  search on metacpan

lib/Cassandra/Client/Pool.pm  view on Meta::CPAN


# Open as many new connections as the pool still has room for.
#
# Arguments:
#   $callback - optional code ref. It is invoked once, with no arguments,
#               when every connection attempt started by this call has
#               reported back, or right away when no attempt could be
#               started at all (spawn_new_connection returned false for
#               every slot).
#
# Returns early, without starting anything, when count plus the number of
# in-flight attempts already reaches max_connections, when the pool is
# shutting down, or when this method is already running further up the call
# stack.
# NOTE(review): on those early returns $callback is NOT invoked. That is the
# pre-existing behaviour and is kept unchanged here - confirm callers that
# pass a callback can cope with it never firing.
sub connect_if_needed {
    my ($self, $callback)= @_;

    my $max_connect= $self->{max_connections} - $self->{count};
    return if $max_connect <= 0;

    # Attempts that are still in flight occupy a slot as well.
    $max_connect -= keys %{$self->{connecting}};
    return if $max_connect <= 0;

    return if $self->{shutdown};

    # A failed attempt makes spawn_new_connection call this method again. If
    # that happens while we are still inside the loop below, the nested call
    # must not start another batch. `local` drops the flag again as soon as
    # this call returns.
    return if $self->{_in_connect};
    local $self->{_in_connect}= 1;

    my $done= 0;              # attempts that have reported back
    my $expect= $max_connect; # attempts actually started (final after the loop)
    for (1..$max_connect) {
        my $started= $self->spawn_new_connection(sub {
            $done++;

            if ($callback && $done == $expect) {
                my $cb= $callback;
                undef $callback;
                $cb->();
            }
        });
        $expect-- unless $started;
    }

    # $expect only has its final value now. Attempts that completed
    # synchronously compared $done against a count that was still too high if
    # a later spawn_new_connection call started nothing, so repeat the check
    # here. This also covers the "nothing was started" case ($done == 0 and
    # $expect == 0).
    if ($callback && $done == $expect) {
        my $cb= $callback;
        undef $callback;
        $cb->();
    }
}

# Start one asynchronous connection attempt to the next host the policy
# hands out.
#
# Arguments:
#   $callback - code ref, called with the connect error (false on success)
#               once the attempt has finished.
#
# Returns 1 when an attempt was started. Returns a false value, without ever
# calling $callback, when the policy has no candidate host.
sub spawn_new_connection {
    my ($self, $callback)= @_;

    my $host= $self->{policy}->get_next_candidate
        or return;

    my $conn= Cassandra::Client::Connection->new(
        client   => $self->{client},
        options  => $self->{options},
        host     => $host,
        async_io => $self->{async_io},
        metadata => $self->{metadata},
    );

    # Track the attempt until its completion handler runs.
    $self->{connecting}{$host}= $conn;
    $self->{policy}->set_connecting($host);

    my $on_finished= sub {
        my $error= shift;

        delete $self->{connecting}{$host};

        if (!$error) {
            $self->{policy}->set_connected($host);
            $self->add($conn);
            return $callback->($error);
        }

        $self->{policy}->set_disconnected($host);

        # Callers queued up waiting for a connection either get the error or
        # go back into the queue for another round.
        my $pending= delete $self->{wait_connect};
        if ($pending) {
            warn 'We have callbacks waiting for a connection while we\'re connected'
                if $self->{count} && @$pending;

            # Attempt limit: the smaller of max_connections and the number of
            # known nodes, plus one.
            my $attempt_limit= $self->{max_connections};
            my $known_nodes= $self->{policy}->known_node_count;
            $attempt_limit= $known_nodes unless $attempt_limit < $known_nodes;
            $attempt_limit += 1;

            for my $waiter (@$pending) {
                $waiter->{attempts}++;

                # Give up once the limit is hit or no other attempt is in flight.
                if ($waiter->{attempts} >= $attempt_limit || !%{$self->{connecting}}) {
                    $waiter->{callback}->("Failed to connect to server: $error");
                    next;
                }

                $self->{wait_connect} ||= [];
                push @{$self->{wait_connect}}, $waiter;
            }
        }

        # Try to replace the failed attempt.
        $self->connect_if_needed;

        return $callback->($error);
    };

    $conn->connect($on_finished);

    return 1;
}

# Events coming from the network
# Handler for the "added node" network event: passes the address on to the
# network_status object and returns whatever that call returns.
sub event_added_node {
    my $self= shift;
    my ($ipaddress)= @_;

    return $self->{network_status}->event_added_node($ipaddress);
}

# Handler for the "removed node" network event: passes the address on to the
# network_status object, then shuts down the pooled connection stored under
# that address, if there is one.
sub event_removed_node {
    my $self= shift;
    my $ipaddress= shift;

    $self->{network_status}->event_removed_node($ipaddress);

    my $conn= $self->{pool}{$ipaddress};
    $conn->shutdown("Removed from pool") if $conn;
}

# Events coming from network_status
# Handler for the "new node" notification from network_status: passes the
# node on to the policy object and returns whatever that call returns.
sub on_new_node {
    my $self= shift;
    my ($node)= @_;

    return $self->{policy}->on_new_node($node);
}

# Handler for the "removed node" notification from network_status: passes the
# node on to the policy object and returns whatever that call returns.
sub on_removed_node {
    my $self= shift;
    my ($node)= @_;

    return $self->{policy}->on_removed_node($node);
}

1;

__END__

=pod



( run in 0.596 second using v1.01-cache-2.11-cpan-39bf76dae61 )