Cache-Memcached-AnyEvent

 view release on metacpan or  search on metacpan

lib/Cache/Memcached/AnyEvent.pm  view on Meta::CPAN

sub serializer {
    $_[0]->{serializer} ||=
        $_[0]->_build_helper('Serializer', $_[0]->{serializer_class});
}

sub _build_helper {
    my ($self, $prefix, $klass) = @_;
    if ($klass !~ s/^\+//) {
        $klass = "Cache::Memcached::AnyEvent::${prefix}::$klass";
    }

    Module::Runtime::check_module_name($klass);
    Module::Runtime::require_module($klass);
    return $klass->new(memcached => $self);
}

BEGIN {
    foreach my $attr ( qw(auto_reconnect compress_threshold reconnect_delay servers namespace) ) {
        eval <<EOSUB;
            sub $attr {
                my \$self = shift;
                my \$ret  = \$self->{$attr};
                if (\@_) {
                    \$self->{$attr} = shift;
                }
                return \$ret;
            }
EOSUB
        Carp::confess if $@;
    }
}

sub _connect_one {
    my ($self, $server, $cv) = @_;

    return if $self->{_is_connecting}->{$server};

    $cv->begin if $cv;
    my ($host, $port) = split( /:/, $server );
    $port ||= 11211;

    $self->{_is_connecting}->{$server} = tcp_connect $host, $port, sub {
        $self->_on_tcp_connect($server, @_);
        $cv->end if $cv;
    };
}

sub _on_tcp_connect {
    my ($self, $server, $fh, $host, $port) = @_;

    delete $self->{_is_connecting}->{$server}; # thanks, buddy
    if (! $fh) {
        # connect failed
        warn("failed to connect to $server [$!]");

        if ($self->{auto_reconnect} > $self->{_connect_attempts}->{ $server }++) {
            # XXX this watcher holds a reference to $self, which means
            # it will make your program wait for it to fire until 
            # auto_reconnect attempts have been made. 
            # if you need to close immediately, you need to call disconnect
            $self->{_reconnect}->{$server} = AE::timer $self->{reconnect_delay}, 0, sub {
                delete $self->{_reconnect}->{$server};
                $self->_connect_one($server);
            };
        }
    } else {
        my $h; $h = AnyEvent::Handle->new(
            fh => $fh,
            on_drain => sub {
                my $h = shift;
                if (defined $h->{wbuf} && $h->{wbuf} eq "") {
                    delete $h->{wbuf}; $h->{wbuf} = "";
                }
                if (defined $h->{rbuf} && $h->{rbuf} eq "") {
                    delete $h->{rbuf}; $h->{rbuf} = "";
                }
            },
            on_eof => sub {
                my $h = delete $self->{_server_handles}->{$server};
                $h->destroy();
                undef $h;
            },
            on_error => sub {
                my $h = delete $self->{_server_handles}->{$server};
                $h->destroy();
                $self->_connect_one($server) if $self->{auto_reconnect};
                undef $h;
            },
        );

        $self->_add_active_server( $server, $h );
        delete $self->{_connect_attempts}->{ $server };
        $self->protocol->prepare_handle( $fh );
    }
}

sub _add_active_server {
    my ($self, $server, $h) = @_;
    push @{$self->{_active_servers}}, $server;
    $self->{_active_server_count}++;
    $self->{_server_handles}->{ $server } = $h;
}

sub add_server {
    my ($self, @servers) = @_;
    push @{$self->{servers}}, @servers;
    $self->selector->set_servers( $self->{servers} );
}

sub connect {
    my $self = shift;

    return if $self->{_is_connecting} || $self->{_is_connected};
    $self->disconnect();
    delete $self->{_is_disconnecting};

    $self->{_is_connecting} = {};
    $self->{_active_servers} = [];
    $self->{_active_server_count} = 0;
    my $connect_cv = AE::cv {
        delete $self->{_is_connecting};

lib/Cache/Memcached/AnyEvent.pm  view on Meta::CPAN

            my ($key, $value, $initial) = @args;
            Scalar::Util::weaken($self);
            $self->_push_queue( $self->protocol->$method( $self, $key, $value, $initial, $cb ) );
        });
    }

    foreach my $method ( qw(add append prepend replace set) ) {
        $installer->($method, sub {
            my ($self, @args) = @_;
            my $cb = pop @args if (ref $args[-1] eq 'CODE' or ref $args[-1] eq 'AnyEvent::CondVar');
            my ($key, $value, $exptime, $noreply) = @args;
            Scalar::Util::weaken($self);
            $self->_push_queue( $self->protocol->$method( $self, $key, $value, $exptime, $noreply, $cb ) );
        });
    }
}

sub stats {
    my ($self, @args) = @_;
    my $cb = pop @args if (ref $args[-1] eq 'CODE' or ref $args[-1] eq 'AnyEvent::CondVar');
    my ($name) = @args;
    $self->_push_queue( $self->protocol->stats($self, $name, $cb) );
}

sub version {
    my ($self, $cb) = @_;
    $self->_push_queue( $self->protocol->version($self, $cb) );
}

sub flush_all {
    my ($self, @args) = @_;
    my $cb = pop @args if (ref $args[-1] eq 'CODE' or ref $args[-1] eq 'AnyEvent::CondVar');
    my $noreply = !!$cb;
    my $delay = shift @args || 0;
    $self->_push_queue( $self->protocol->flush_all($self, $delay, $noreply, $cb) );
}

sub _push_queue {
    my ($self, $cb) = @_;
    push @{$self->{queue}}, $cb;
    $self->_drain_queue unless $self->{_is_draining};
}

sub _drain_queue {
    my $self = shift;
    if (! $self->{_is_connected}) {
        if ($self->{_is_connecting} or $self->{_is_disconnecting}) {
            return;
        }
        $self->connect;
        return;
    }

    if ($self->{_is_draining}) {
        return;
    }
    my $cb = shift @{$self->{queue}};
    return unless $cb;

    my $guard = AnyEvent::Util::guard {
        my $t; $t = AE::timer 0, 0, sub {
            $self->{_is_draining}--;
            undef $t;
            $self->_drain_queue;
        };
    };
    $self->{_is_draining}++;
    $cb->($guard);
}

sub disconnect {
    my $self = shift;

    my $handles = delete $self->{_server_handles};
    foreach my $handle ( values %$handles ) {
        if ($handle) {
            eval {
                $handle->stop_read;
                $handle->push_shutdown();
                $handle->destroy();
            };
        }
    }

    delete $self->{_is_connecting};
    delete $self->{_is_connected};
    delete $self->{_is_draining};

    $self->{_server_handles} = {};
    $self->{_is_disconnecting}++;
}

sub DESTROY {
    my $self = shift;
    $self->disconnect;
}
    
sub _get_handle_for {
    $_[0]->selector->get_handle($_[1]);
}

sub _prepare_key {
    my ($self, $key) = @_;
    if (my $ns = $self->{namespace}) {
        $key = $ns . $key;
    }
    return $key;
}

sub normalize_key {
    my ($self, $key_ref) = @_;

    if (my $ns = $self->{namespace}) {
        $$key_ref =~ s/^$ns//;
    }
}

sub deserialize {
    my ($self, $flags_ref, $data_ref) = @_;
    if (defined $$flags_ref && defined $$data_ref) {
        if (HAVE_ZLIB && $$flags_ref & F_COMPRESS()) {



( run in 0.620 second using v1.01-cache-2.11-cpan-39bf76dae61 )