Cache-Memcached-AnyEvent

 view release on metacpan or  search on metacpan

lib/Cache/Memcached/AnyEvent.pm  view on Meta::CPAN

    $self->{_active_server_count}++;
    $self->{_server_handles}->{ $server } = $h;
}

sub add_server {
    my ($self, @servers) = @_;
    push @{$self->{servers}}, @servers;
    $self->selector->set_servers( $self->{servers} );
}

sub connect {
    my $self = shift;

    return if $self->{_is_connecting} || $self->{_is_connected};
    $self->disconnect();
    delete $self->{_is_disconnecting};

    $self->{_is_connecting} = {};
    $self->{_active_servers} = [];
    $self->{_active_server_count} = 0;
    my $connect_cv = AE::cv {
        delete $self->{_is_connecting};
        if (! $self->{_active_server_count}) {
            die "Failed to connect to any memcached servers";
        }

        $self->{_is_connected} = 1;

        if (my $cb = $self->{ on_connect }) {
            $cb->($self);
        }
        $self->_drain_queue;
    };

    foreach my $server ( @{ $self->{ servers } }) {
        $self->_connect_one($server, $connect_cv);
    }
}

sub delete {
    my ($self, @args) = @_;
    my $cb       = pop @args if (ref $args[-1] eq 'CODE' or ref $args[-1] eq 'AnyEvent::CondVar');
    my $noreply  = !defined $cb;
    $self->_push_queue( $self->protocol->delete($self, @args, $noreply, $cb ) );
}

sub get_handle { shift->{_server_handles}->{ $_[0] } }

{
    my $installer = sub {
        my ($name, $code) = @_;
        {
            no strict 'refs';
            *{$name} = $code;
        }
    };

    foreach my $method ( qw( get get_multi ) ) {
        $installer->( $method, sub {
            my ($self, $keys, $cb) = @_;
            Scalar::Util::weaken($self);

            $self->_push_queue( $self->protocol->$method($self, $keys, $cb) );
        } );
    }

    foreach my $method ( qw( decr incr ) ) {
        $installer->($method, sub {
            my ($self, @args) = @_;
            my $cb = pop @args if (ref $args[-1] eq 'CODE' or ref $args[-1] eq 'AnyEvent::CondVar');
            my ($key, $value, $initial) = @args;
            Scalar::Util::weaken($self);
            $self->_push_queue( $self->protocol->$method( $self, $key, $value, $initial, $cb ) );
        });
    }

    foreach my $method ( qw(add append prepend replace set) ) {
        $installer->($method, sub {
            my ($self, @args) = @_;
            my $cb = pop @args if (ref $args[-1] eq 'CODE' or ref $args[-1] eq 'AnyEvent::CondVar');
            my ($key, $value, $exptime, $noreply) = @args;
            Scalar::Util::weaken($self);
            $self->_push_queue( $self->protocol->$method( $self, $key, $value, $exptime, $noreply, $cb ) );
        });
    }
}

sub stats {
    my ($self, @args) = @_;
    my $cb = pop @args if (ref $args[-1] eq 'CODE' or ref $args[-1] eq 'AnyEvent::CondVar');
    my ($name) = @args;
    $self->_push_queue( $self->protocol->stats($self, $name, $cb) );
}

sub version {
    my ($self, $cb) = @_;
    $self->_push_queue( $self->protocol->version($self, $cb) );
}

sub flush_all {
    my ($self, @args) = @_;
    my $cb = pop @args if (ref $args[-1] eq 'CODE' or ref $args[-1] eq 'AnyEvent::CondVar');
    my $noreply = !!$cb;
    my $delay = shift @args || 0;
    $self->_push_queue( $self->protocol->flush_all($self, $delay, $noreply, $cb) );
}

sub _push_queue {
    my ($self, $cb) = @_;
    push @{$self->{queue}}, $cb;
    $self->_drain_queue unless $self->{_is_draining};
}

sub _drain_queue {
    my $self = shift;
    if (! $self->{_is_connected}) {
        if ($self->{_is_connecting} or $self->{_is_disconnecting}) {
            return;
        }
        $self->connect;
        return;
    }

    if ($self->{_is_draining}) {
        return;
    }
    my $cb = shift @{$self->{queue}};
    return unless $cb;

    my $guard = AnyEvent::Util::guard {
        my $t; $t = AE::timer 0, 0, sub {
            $self->{_is_draining}--;
            undef $t;
            $self->_drain_queue;
        };
    };
    $self->{_is_draining}++;
    $cb->($guard);
}

sub disconnect {
    my $self = shift;



( run in 2.106 seconds using v1.01-cache-2.11-cpan-39bf76dae61 )