MogileFS-Server

 view release on metacpan or  search on metacpan

lib/MogileFS/ConnectionPool.pm  view on Meta::CPAN

    my ($self, $ip, $port) = @_;

    # calls MogileFS::Connection::{HTTP,Mogstored}->new:
    my $conn = $self->{class}->new($ip, $port);
    if ($conn) {
        # register the connection
        $self->{fdmap}->{$conn->fd} = $conn;
        $conn->set_pool($self);

        return $conn;
    } else {
        # EMFILE/ENFILE should never happen as the capacity for this
        # pool is far under the system defaults.  Just give up on
        # EMFILE/ENFILE like any other error.
        return "failed to create socket to $ip:$port ($!)";
    }
}

# Fetches a connection for $ip:$port, preferring an idle one.
# When no idle connection is available this falls back to
# _conn_new_maybe(); the result may be undef if we are at capacity.
sub _conn_get {
    my ($self, $ip, $port) = @_;

    # if we have idle connections, always use them first
    my $idle = $self->_conn_idle_get($ip, $port);
    return $idle if $idle;

    return $self->_conn_new_maybe($ip, $port);
}

# Pulls a connection out of the pool for synchronous use.
# This may create a new connection (independent of pool limits).
# The connection returned by this is _blocking_.  This is currently
# only used by replicate.
#
# Returns the connection on success.  Returns nothing (bare return) if a
# new connection could not be created or did not become writable within
# the "node_timeout" config value.
sub conn_get {
    my ($self, $ip, $port) = @_;

    my $idle = $self->_conn_idle_get($ip, $port);
    if ($idle) {
        # in case the connection never comes back, let refcounting close() it:
        delete $self->{fdmap}->{$idle->fd};
        return $idle;
    }

    # no idle connection available, make a fresh one
    my $fresh = $self->_conn_new($ip, $port);
    if (!ref $fresh) {
        # $fresh is an error message :<
        # NOTE(review): the message is stored into $!, which is an errno
        # variable; confirm whether any caller actually inspects $! here.
        $! = $fresh;
        return;
    }

    # same as the idle case: the pool must not keep tracking this connection
    delete $self->{fdmap}->{$fresh->fd};

    my $timeout = MogileFS->config("node_timeout");
    return unless MogileFS::Util::wait_for_writeability($fresh->fd, $timeout);

    return $fresh;
}

# Obtains a connection for $ip:$port from the pool and runs $inflight_cb
# on it via _conn_run().  If no connection can be obtained right now
# (the pool is at capacity), the task is queued with enqueue() instead.
# This relies on Danga::Socket->EventLoop
sub start {
    my ($self, $ip, $port, $inflight_cb) = @_;

    my $conn = $self->_conn_get($ip, $port);
    return $self->_conn_run($conn, $inflight_cb) if $conn;

    # we're too busy right now, queue up
    return $self->enqueue($ip, $port, $inflight_cb);
}

# Returns the total number of connections we have, i.e. the number of
# entries currently registered in $self->{fdmap}.
sub _total_connections {
    my ($self) = @_;

    my $count = keys %{ $self->{fdmap} };
    return $count;
}

# Marks a connection as no longer inflight.
# Removes the callback stored under $self->{inflight}{<conn key>}{<conn fd>}
# and decrements $self->{total_inflight} when one was present.
# Returns the inflight callback if the connection was active, undef if not.
sub inflight_cb_expire {
    my ($self, $conn) = @_;

    my $key = $conn->key;
    my $fd  = $conn->fd;

    my $expired_cb = delete $self->{inflight}->{$key}->{$fd};
    if ($expired_cb) {
        $self->{total_inflight}--;
    }

    return $expired_cb;
}

# Arranges for queued tasks to be dequeued and run on the next tick of
# the Danga::Socket event loop.  Call this
# 1) whenever a task is enqueued
# 2) whenever a task is complete
# Only one timer is outstanding at a time: $self->{scheduled} holds it
# until the timer callback fires and clears it again.
sub schedule_queued {
    my ($self) = @_;

    # AddTimer(0) to avoid potential stack overflow
    $self->{scheduled} ||= Danga::Socket->AddTimer(0, sub {
        # from here on a new timer may be scheduled
        $self->{scheduled} = undef;

        my $pending = $self->{queue};
        my $limit   = $self->{total_capacity};
        my $pos     = 0;

        # walk the queue until we run out of capacity or entries
        while ($self->{total_inflight} < $limit && $pos < @$pending) {
            my ($ip, $port, $cb) = @{ $pending->[$pos] };

            my $conn = $self->_conn_get($ip, $port);
            unless ($conn) {
                # this queue object cannot be dequeued, skip it for now
                $pos++;
                next;
            }

            # remove from queue; $pos now refers to the following entry
            splice(@$pending, $pos, 1);
            $self->_conn_run($conn, $cb);
        }
    });
}

# Call this when done using an (inflight) connection
# This possibly places a connection in the connection pool.
# This will close the connection if the pool is already at capacity.
# This will also start the next queued callback, or retry if needed
sub conn_persist {
    my ($self, $conn) = @_;

    # schedule the next request if we're done with any connection
    $self->schedule_queued;



( run in 0.543 second using v1.01-cache-2.11-cpan-39bf76dae61 )