Cassandra-Client
view release on metacpan or search on metacpan
lib/Cassandra/Client/Pool.pm view on Meta::CPAN
# Starts new connection attempts until the pool is back at its configured
# size.
#
# The number of attempts is `max_connections` minus the established
# connections (`count`) minus the attempts that are already in flight
# (`connecting`).
#
# $callback is optional. It is invoked once, without arguments, when every
# attempt started by this call has reported back (successfully or not), or
# right away when no attempt could be started at all.
#
# NOTE(review): when the sub returns early (pool already full, enough
# attempts in flight, pool shutting down, or re-entrant call) $callback is
# not invoked at all. Confirm that callers passing a callback expect this.
sub connect_if_needed {
    my ($self, $callback)= @_;

    my $max_connect= $self->{max_connections} - $self->{count};
    return if $max_connect <= 0;

    # Attempts that are still in flight count against the budget as well.
    $max_connect -= keys %{$self->{connecting}};
    return if $max_connect <= 0;

    return if $self->{shutdown};

    # Re-entrancy guard: spawn_new_connection calls connect_if_needed again
    # from its error path. If that happens while the loop below is still
    # running, the nested call must not start a second batch. `local`
    # restores the flag on every exit path, including exceptions.
    return if $self->{_in_connect};
    local $self->{_in_connect}= 1;

    my $done= 0;               # attempts that have reported back
    my $expect= $max_connect;  # attempts we still expect a report from
    for (1..$max_connect) {
        # A false return means no attempt was started, so no report will
        # ever arrive for this slot.
        $expect-- unless $self->spawn_new_connection(sub {
            $done++;
            if ($done == $expect) {
                $callback->() if $callback;
                undef $callback;
            }
        });
    }

    # $expect can drop after the last report was already counted (an attempt
    # finished synchronously and a later spawn did not start). The check
    # inside the closure cannot see that, so repeat it here. This also
    # covers the case where no attempt could be started ($expect == 0).
    if ($callback && $done >= $expect) {
        $callback->();
        undef $callback;
    }
}
# Starts one connection attempt to the next host offered by the policy.
#
# Returns 1 once an attempt has been handed to the connection object, and
# nothing when the policy has no candidate host. $callback is invoked with
# the error value reported by the connection (false on success) after the
# pool's own bookkeeping for that attempt has been done.
sub spawn_new_connection {
    my ($self, $callback)= @_;

    my $candidate= $self->{policy}->get_next_candidate
        or return;

    my $attempt= Cassandra::Client::Connection->new(
        client => $self->{client},
        options => $self->{options},
        host => $candidate,
        async_io => $self->{async_io},
        metadata => $self->{metadata},
    );

    # Register the attempt before connecting so that it counts as in flight.
    $self->{connecting}{$candidate}= $attempt;
    $self->{policy}->set_connecting($candidate);

    my $on_finished= sub {
        my ($failure)= @_;

        delete $self->{connecting}{$candidate};

        if (!$failure) {
            $self->{policy}->set_connected($candidate);
            $self->add($attempt);
        } else {
            $self->{policy}->set_disconnected($candidate);

            # Detach the waiter list first; waiters that get another chance
            # are put back onto a fresh list below.
            if (my $waiters= delete $self->{wait_connect}) {
                warn 'We have callbacks waiting for a connection while we\'re connected'
                    if $self->{count} && @$waiters;

                # One more attempt than the smaller of the connection limit
                # and the number of nodes the policy knows about.
                my $attempt_cap= $self->{max_connections};
                my $known_nodes= $self->{policy}->known_node_count;
                $attempt_cap= $known_nodes unless $attempt_cap < $known_nodes;
                my $max_attempts= $attempt_cap + 1;

                for my $waiter (@$waiters) {
                    my $exhausted= (++$waiter->{attempts}) >= $max_attempts;

                    # Keep waiting only while the waiter has attempts left
                    # and some other attempt is still in flight.
                    if (!$exhausted && %{$self->{connecting}}) {
                        push @{$self->{wait_connect} ||= []}, $waiter;
                        next;
                    }

                    $waiter->{callback}->("Failed to connect to server: $failure");
                }
            }

            # Try to replace the failed attempt.
            $self->connect_if_needed;
        }

        $callback->($failure);
    };

    $attempt->connect($on_finished);

    return 1;
}
# Events coming from the network
# Network event: a node was added. Passes the node's address on to the
# network_status object's event_added_node and returns whatever it returns.
sub event_added_node {
    my $self= shift;
    my $ipaddress= shift;

    my $status= $self->{network_status};
    return $status->event_added_node($ipaddress);
}
# Network event: a node was removed. Passes the address on to the
# network_status object first, then shuts down the pooled connection stored
# under that address, if there is one.
sub event_removed_node {
    my $self= shift;
    my ($ipaddress)= @_;

    my $status= $self->{network_status};
    $status->event_removed_node($ipaddress);

    my $conn= $self->{pool}{$ipaddress};
    $conn->shutdown("Removed from pool") if $conn;
}
# Events coming from network_status
# network_status event: hands the new node to the policy object's
# on_new_node and returns whatever it returns.
sub on_new_node {
    my $self= shift;
    my $node= shift;

    my $policy= $self->{policy};
    return $policy->on_new_node($node);
}
# network_status event: hands the removed node to the policy object's
# on_removed_node and returns whatever it returns.
sub on_removed_node {
    my $self= shift;
    my $node= shift;

    my $policy= $self->{policy};
    return $policy->on_removed_node($node);
}
1;
__END__
=pod
( run in 0.596 second using v1.01-cache-2.11-cpan-39bf76dae61 )