AnyEvent-ConnPool
view release on metacpan or search on metacpan
lib/AnyEvent/ConnPool.pm view on Meta::CPAN
}
elsif (ref $p1 eq __PACKAGE__) {
$dispatcher = $p2;
}
else {
$dispatcher = $p1;
}
}
$dispatcher ||= $p2;
if (!$dispatcher || ref $dispatcher ne 'AnyEvent::ConnPool::Dispatcher') {
croak "No dispatcher";
}
return $dispatcher->{_pool};
}

=item B<add>

Adds a connection to the pool.

=cut

# Creates new connection(s) with the pool's constructor callback, wraps each
# one in an AnyEvent::ConnPool::Unit and appends it to the pool.
#
#   $count - optional number of connections to add. Anything that is not a
#            positive integer (including a missing value) means one, so
#            existing callers that pass nothing keep adding exactly one.
#
# Returns 1.
sub add {
    my ($self, $count) = @_;

    if (!defined $count || $count !~ /\A[1-9][0-9]*\z/) {
        $count = 1;
    }

    for (1 .. $count) {
        my $conn = $self->{constructor}->();

        # $self->{count} is refreshed by _add_object_raw() after every
        # insert, so it is read anew for each unit.
        my $unit = AnyEvent::ConnPool::Unit->new($conn,
            index       => $self->{count},
            constructor => $self->{constructor},
        );
        $self->_add_object_raw($unit);
    }

    return 1;
}

=item B<get>

Returns an AnyEvent::ConnPool::Unit object from the pool.

    my $unit = $pool->get();
    my $connection = $unit->conn();

=cut

# Returns a unit from the pool.
#
#   $index - optional slot number. When given, whatever is stored in that
#            slot is returned directly: the round-robin cursor is neither
#            consulted nor moved and the lock state is not checked.
#
# Without $index the unit under the cursor ($self->{index}) is examined. If it
# is locked, get_free_connection() is asked for an unlocked one. The cursor is
# then advanced by one.
#
# Scalar context: returns the unit, or undef when none is available.
# List context:   returns (slot, unit); slot is undef when unit is undef.
sub get {
    my ($self, $index) = @_;

    if (defined $index) {
        return $self->{_payload}->[$index];
    }

    # Wrap the cursor once it has moved past the last slot.
    if ($self->{index} + 1 > $self->{count}) {
        $self->{index} = 0;
    }

    my $retval = $self->{_payload}->[$self->{index}];

    # An empty pool (or an empty slot) has nothing to call locked() on;
    # treat it like "no connection available" instead of dying.
    if (defined $retval) {
        if ($retval->locked()) {
            $self->{locked}->{$self->{index}} = 1;
            # On success this also moves the cursor to the slot it picked.
            $retval = $self->get_free_connection($self->{index});
        }
        else {
            delete $self->{locked}->{$self->{index}};
        }
    }

    # Remember the slot that was actually used before stepping the cursor;
    # the explicit $index argument is always undef on this path.
    my $used_slot = defined $retval ? $self->{index} : undef;
    $self->{index}++;

    return wantarray ? ($used_slot, $retval) : $retval;
}
# Enlarges the pool by calling add() repeatedly.
#
#   $count - how many times add() is invoked; a missing or false value
#            means once.
#
# Always returns 1.
sub grow {
    my ($self, $count) = @_;

    my $times = $count || 1;
    $self->add() for 1 .. $times;

    return 1;
}
# Removes units from the end of the pool.
#
#   $count - number of units to drop; a missing or false value means one.
#            Asking for more units than the pool holds simply empties it.
#
# Always returns 1.
sub shrink {
    my ($self, $count) = @_;
    $count ||= 1;

    my $payload = $self->{_payload} ||= [];
    for (1 .. $count) {
        last unless @{$payload};
        pop @{$payload};
    }

    # Keep the cached size in step with the payload, exactly as
    # _add_object_raw() does; get() wraps its cursor based on this value and
    # would otherwise index slots that no longer exist.
    $self->{count} = scalar @{$payload};

    return 1;
}
# utility functions
# Looks for an unlocked unit, trying slots in the order produced by
# balance_array($desired_index).
#
# When one is found the pool cursor ($self->{index}) is moved to its slot and
# the unit is returned. When every candidate is locked the cursor is left
# untouched and undef is returned.
sub get_free_connection {
    my ($self, $desired_index) = @_;

    my $free_unit = undef;

    SLOT:
    foreach my $slot ($self->balance_array($desired_index)) {
        my $candidate = $self->{_payload}->[$slot];
        next SLOT if $candidate->locked;

        $self->{index} = $slot;
        $free_unit     = $candidate;
        last SLOT;
    }

    return $free_unit;
}
# Builds the order in which slots are scanned for a free unit.
#
#   $index - slot that was just found busy; scanning starts at the slot after
#            it and wraps around, so $index itself is tried last.
#
# Returns the list of all slot numbers 0 .. count-1, rotated so that it starts
# at $index + 1. When $index + 1 is 0 or lies past the last slot, the list is
# returned unrotated. An empty pool yields an empty list.
sub balance_array {
    my ($self, $index) = @_;
    my $count = $self->{count};

    $index++;    # first slot to try
    $count--;    # last valid slot

    # Strictly greater: when the first slot to try IS the last slot it must
    # still lead the list, otherwise the last slot is never preferred.
    if ($index == 0 || $index > $count) {
        return (0 .. $count);
    }

    return (($index .. $count), 0 .. $index - 1);
}
# Stores a unit in the pool payload and refreshes the cached size.
#
#   $object   - the unit to store.
#   $position - optional slot number; when given the unit is written to that
#               slot (replacing whatever was there), otherwise it is appended.
#
# Always returns 1.
sub _add_object_raw {
    my ($self, $object, $position) = @_;

    if (defined $position) {
        # Write to the requested slot, not to the round-robin cursor.
        $self->{_payload}->[$position] = $object;
    }
    else {
        push @{$self->{_payload}}, $object;
    }

    $self->{count} = scalar @{$self->{_payload}};
    return 1;
}
=back
=cut
1;
package AnyEvent::ConnPool::Dispatcher;
use strict;
no strict qw/refs/;
use warnings;
use Carp;
use Scalar::Util qw(blessed);

our $AUTOLOAD;

# Forwards any method called on the dispatcher to a connection taken from the
# pool stored in $d->{_pool}: the dispatcher is replaced by the connection as
# the invocant and control is transferred with goto, so the remaining
# arguments are passed through unchanged.
#
# Croaks when the pool has no unit to hand out.
sub AUTOLOAD {
    my ($d) = @_;
    my $program = $AUTOLOAD;
    $program =~ s/.*:://;

    # Object destruction also lands here; it must not consume a pooled
    # connection or be forwarded to it.
    return if $program eq 'DESTROY';

    my $unit = $d->{_pool}->get();
    croak "No free connection available for '$program'" unless $unit;
    my $conn = $unit->conn();

    # Resolve through the connection's class hierarchy so inherited methods
    # work. Fall back to the fully qualified name (a symbolic reference,
    # hence "no strict refs") for anything can() does not report.
    my $target = blessed($conn) ? $conn->can($program) : undef;
    $target ||= sprintf("%s::%s", ref($conn), $program);

    shift;
    unshift @_, $conn;
    goto &{$target};
}
1;
package AnyEvent::ConnPool::Unit;

=head1 NAME

AnyEvent::ConnPool::Unit

=head1 DESCRIPTION

Connection unit. Just a wrapper around a user-specified connection.
Required for transactions support.

=head1 METHODS

=over

=cut

use strict;
use warnings;

# Constructor.
#
#   $object - the connection to wrap.
#   %opts   - index:       value later reported by index();
#             constructor: callback used by reconnect() to build a
#                          replacement connection.
#
# The new unit starts unlocked.
sub new {
    my ($class, $object, %opts) = @_;
    my ($index, $constructor) = ($opts{index}, $opts{constructor});

    my $unit = {
        _conn           =>  $object,
        _locked         =>  0,
        _index          =>  $index,
        _constructor    =>  $constructor,
    };

    bless $unit, $class;
    return $unit;
}

=item B<conn>

Returns the connection wrapped by the unit object.

=cut

sub conn {
    my $self = shift;
    return $self->{_conn};
}

=item B<lock>

Locks the current connection. After that the connection shouldn't be used in
the balancing mechanism and will never be returned from the pool. To unlock the
connection use the unlock method.

    $connection->lock();

=cut

sub lock {
    my ($self) = @_;
    $self->{_locked} = 1;
    return 1;
}

=item B<unlock>

Unlocks the connection and returns it to the balancing scheme.

    $connection->unlock();

=cut

sub unlock {
    my ($self) = @_;
    # Reset rather than delete, so locked() keeps returning a defined value
    # (0), the same state new() establishes.
    $self->{_locked} = 0;
    return 1;
}

=item B<locked>

Returns true if the connection is locked.

    if ($connection->locked()) {
        ...
    }

=cut

sub locked {
    my ($self) = @_;
    return $self->{_locked};
}

=item B<index>

Returns the C<index> value the unit was constructed with.

=cut

sub index {
    my ($self) = @_;
    return $self->{_index};
}

=item B<reconnect>

Replaces the wrapped connection with a new one produced by the C<constructor>
callback the unit was created with. Does nothing when no callback was given.
Always returns 1.

=cut

sub reconnect {
    my ($self) = @_;
    if ($self->{_constructor}) {
        $self->{_conn} = $self->{_constructor}->();
    }
    return 1;
}

=back

=cut

1;
( run in 0.824 second using v1.01-cache-2.11-cpan-df04353d9ac )