MogileFS-Server
view release on metacpan or search on metacpan
lib/MogileFS/ConnectionPool.pm view on Meta::CPAN
my ($self, $ip, $port) = @_;
# calls MogileFS::Connection::{HTTP,Mogstored}->new:
my $conn = $self->{class}->new($ip, $port);
if ($conn) {
# register the connection
$self->{fdmap}->{$conn->fd} = $conn;
$conn->set_pool($self);
return $conn;
} else {
# EMFILE/ENFILE should never happen as the capacity for this
# pool is far under the system defaults. Just give up on
# EMFILE/ENFILE like any other error.
return "failed to create socket to $ip:$port ($!)";
}
}
# Fetches a connection for $ip:$port, preferring an idle pooled one.
# When nothing idle is available the result of _conn_new_maybe() is
# returned instead, which may be undef if the pool is at capacity.
sub _conn_get {
    my ($self, $ip, $port) = @_;

    # idle connections always win over opening a new one
    my $idle = $self->_conn_idle_get($ip, $port);
    return $idle if $idle;

    return $self->_conn_new_maybe($ip, $port);
}
# Pulls a connection out of the pool for synchronous use.
# This may create a new connection (independent of pool limits).
# The connection returned by this is _blocking_.  This is currently
# only used by replicate.
#
# Arguments: ($ip, $port) of the destination.
# Returns the connection object on success.  Returns nothing (undef in
# scalar context) if a new connection could not be created or did not
# become writable within the "node_timeout" config value.  On a creation
# failure $! is left as the failed attempt set it.
sub conn_get {
    my ($self, $ip, $port) = @_;

    my $conn = $self->_conn_idle_get($ip, $port);
    if ($conn) {
        # in case the connection never comes back, let refcounting close() it:
        delete $self->{fdmap}->{$conn->fd};
        return $conn;
    }

    $conn = $self->_conn_new($ip, $port);

    # On failure _conn_new hands back an error message string instead of
    # an object.  Do NOT assign that string to $!: a non-numeric string
    # numifies to 0, which would wipe the errno from the failed attempt
    # (and trigger an "isn't numeric" warning).  Leave $! untouched.
    return unless ref $conn;

    # this connection is now owned by the caller, not tracked by the pool
    delete $self->{fdmap}->{$conn->fd};

    my $timeout = MogileFS->config("node_timeout");
    MogileFS::Util::wait_for_writeability($conn->fd, $timeout) or return;

    return $conn;
}
# Obtains a connection to $ip:$port from the pool and runs $inflight_cb
# on it via _conn_run().  If no connection can be obtained right now the
# task is handed to enqueue() instead.
# This relies on Danga::Socket->EventLoop
sub start {
    my ($self, $ip, $port, $inflight_cb) = @_;

    my $conn = $self->_conn_get($ip, $port);

    # we're too busy right now, queue up
    return $self->enqueue($ip, $port, $inflight_cb) unless $conn;

    return $self->_conn_run($conn, $inflight_cb);
}
# Counts the connections currently tracked by the pool, i.e. the number
# of entries in the {fdmap} hash.
sub _total_connections {
    my ($self) = @_;

    my $count = keys %{ $self->{fdmap} };
    return $count;
}
# Removes the inflight callback registered for $conn (looked up by the
# connection's key and fd) and decrements {total_inflight} if one was
# registered.  Returns the removed callback, or undef if the connection
# was not inflight.
sub inflight_cb_expire {
    my ($self, $conn) = @_;

    my $cb = delete $self->{inflight}->{$conn->key}->{$conn->fd};

    # only connections that actually had a callback count as inflight
    if ($cb) {
        $self->{total_inflight}--;
    }

    return $cb;
}
# Arranges for queued tasks to be dequeued and run from a zero-delay
# Danga::Socket timer.  Call this
# 1) whenever a task is enqueued
# 2) whenever a task is complete
# Only one timer is kept pending: {scheduled} holds it until it fires.
sub schedule_queued {
    my ($self) = @_;

    # AddTimer(0) to avoid potential stack overflow
    $self->{scheduled} ||= Danga::Socket->AddTimer(0, sub {
        # allow a new timer to be scheduled from here on
        $self->{scheduled} = undef;

        my $pending = $self->{queue};
        my $limit   = $self->{total_capacity};
        my $idx     = 0;

        # walk the queue while there is inflight capacity left
        while ($self->{total_inflight} < $limit && $idx < @$pending) {
            my ($ip, $port, $cb) = @{ $pending->[$idx] };
            my $conn = $self->_conn_get($ip, $port);

            if (!$conn) {
                # this queue object cannot be dequeued, skip it for now
                $idx++;
                next;
            }

            # remove from queue; $idx now points at the following entry
            splice(@$pending, $idx, 1);
            $self->_conn_run($conn, $cb);
        }
    });
}
# Call this when done using an (inflight) connection
# This possibly places a connection in the connection pool.
# This will close the connection if the pool is already at capacity.
# This will also start the next queued callback, or retry if needed
sub conn_persist {
my ($self, $conn) = @_;
# schedule the next request if we're done with any connection
$self->schedule_queued;
( run in 0.543 second using v1.01-cache-2.11-cpan-39bf76dae61 )