Cache-Elasticache-Memcache

 view release on metacpan or  search on metacpan

MANIFEST  view on Meta::CPAN

META.json
META.yml
Makefile.PL
README
cpanfile
dist.ini
lib/Cache/Elasticache/Memcache.pm
maint/carton_install_with_configuredeps.pl
t/author-pod-syntax.t
t/basic.t
t/config_endpoint.t
t/methods.t

README  view on Meta::CPAN

NAME

    Cache::Elasticache::Memcache - A wrapper for Cache::Memcached::Fast
    with support for AWS's auto reconfiguration mechanism

SYNOPSIS

        use Cache::Elasticache::Memcache;
    
        my $memd = Cache::Elasticache::Memcache->new({
            config_endpoint => 'foo.bar',
            update_period => 180,
            # All other options are passed on to Cache::Memcached::Fast
            ...
        });
    
        # Will update the server list from the configuration endpoint
        $memd->updateServers();
    
        # Will update the serverlist from the configuration endpoint if the time since
        # the last time the server list was checked is greater than the update period
        # specified when the $memd object was created.
        $memd->checkServers();
    
        # Class method to retrieve a server list from a configuration endpoint.
        Cache::Elasticache::Memcache->getServersFromEndpoint('foo.bar');
    
        # All other supported methods are handled by Cache::Memcached::Fast
    
        # N.B. This library is currently under development

DESCRIPTION

    A wrapper for Cache::Memcached::Fast with support for AWS's auto
    reconfiguration mechanism. It makes use of an AWS elasticache memcached
    cluster's configuration endpoint to discover the memcache servers in
    the cluster and periodically check the current server list to adapt to
    a changing cluster.

UNDER DEVELOPMENT DISCLAIMER

    N.B. This module is still under development. It should work, but things
    may change under the hood. I plan to improve the resilience with
    better timeout handling of communication when updating the server list.
    I'm toying with the idea of making the server list lookup asynchronous,
    however that may add a level of complexity not worth the benefits. Also
    I'm investigating switching to Dist::Milla. I'm open to suggestions,
    ideas and pull requests.

CONSTRUCTOR

        Cache::Elasticache::Memcache->new({
            config_endpoint => 'foo.bar',
            update_period => 180,
            ...
        })

 Constructor parameters

    config_endpoint

      AWS elasticache memcached cluster config endpoint location

    update_period

      The minimum period (in seconds) to wait between updating the server
      list. Defaults to 180 seconds

METHODS

    Supported Cache::Memcached::Fast methods

README  view on Meta::CPAN

          $memd->touch($key, $expiration_time)
          $memd->touch_multi([$key],[$key, $expiration_time])
          $memd->flush_all($delay)
          $memd->nowait_push()
          $memd->server_versions()
          $memd->disconnect_all()

    checkServers

          my $memd = Cache::Elasticache::Memcache->new({
              config_endpoint => 'foo.bar'
          })
      
          ...
      
          $memd->checkServers();

      Trigger the server list to be updated if the time passed since
      the server list was last updated is greater than the update period
      (default 180 seconds).

    updateServers

          my $memd = Cache::Elasticache::Memcache->new({
              config_endpoint => 'foo.bar'
          })
      
          ...
      
          $memd->updateServers();

      This method will update the server list regardless of how much time
      has passed since the server list was last checked.

CLASS METHODS

    getServersFromEndpoint

          Cache::Elasticache::Memcache->getServersFromEndpoint('foo.bar');

      This class method will retrieve the server list for a given
      configuration endpoint.

BUGS

    github issues
    <https://github.com/zebardy/cache-elasticache-memcache/issues>

SEE ALSO

    Cache::Memcached::Fast - The underlying library used to communicate with
    memcached servers (apart from autodiscovery)

lib/Cache/Elasticache/Memcache.pm  view on Meta::CPAN


=head1 NAME

Cache::Elasticache::Memcache - A wrapper for L<Cache::Memcached::Fast> with support for AWS's auto reconfiguration mechanism

=head1 SYNOPSIS

    use Cache::Elasticache::Memcache;

    my $memd = Cache::Elasticache::Memcache->new({
        config_endpoint => 'foo.bar',
        update_period => 180,
        # All other options are passed on to Cache::Memcached::Fast
        ...
    });

    # Will update the server list from the configuration endpoint
    $memd->updateServers();

    # Will update the serverlist from the configuration endpoint if the time since
    # the last time the server list was checked is greater than the update period
    # specified when the $memd object was created.
    $memd->checkServers();

    # Class method to retrieve a server list from a configuration endpoint.
    Cache::Elasticache::Memcache->getServersFromEndpoint('foo.bar');

    # All other supported methods are handled by Cache::Memcached::Fast

    # N.B. This library is currently under development

=head1 DESCRIPTION

A wrapper for L<Cache::Memacached::Fast> with support for AWS's auto reconfiguration mechanism. It makes use of an AWS elasticache memcached cluster's configuration endpoint to discover the memcache servers in the cluster and periodically check the c...

=head1 UNDER DEVELOPMENT DISCLAIMER

N.B. This module is still under development. It should work, but things may change under the hood. I plan to imporove the resiliance with better timeout handling of communication when updating the server list. I'm toying with the idea of making the s...

=cut

use Carp;
use IO::Socket::IP;
use IO::Socket::Timeout;

lib/Cache/Elasticache/Memcache.pm  view on Meta::CPAN

use Try::Tiny;
use Scalar::Util qw(blessed);

our $VERSION = '0.0.5';

=pod

=head1 CONSTRUCTOR

    Cache::Elasticache::Memcache->new({
        config_endpoint => 'foo.bar',
        update_period => 180,
        ...
    })

=head2 Constructor parameters

=over

=item config_endpoint

AWS elasticache memcached cluster config endpoint location

=item update_period

The minimum period (in seconds) to wait between updating the server list. Defaults to 180 seconds

=back

=cut

# Build a new wrapper object.
#
# Arguments: either a single hashref or a flat key/value list.
#   config_endpoint - required; passed to getServersFromEndpoint to obtain
#                     the initial server list.
#   update_period   - optional; stored on the object, defaults to 180.
#   servers         - rejected; the list always comes from the endpoint.
# Everything left in the argument hash after config_endpoint is removed is
# handed to Cache::Memcached::Fast->new and kept in $self->{_args}.
#
# Croaks when config_endpoint is missing or servers is supplied.
# Returns the blessed object.
sub new {
    my $class = shift;
    my $self = bless {}, $class;

    # hashref-ify args. A shallow copy is taken so that the delete and the
    # servers assignment below never modify a hashref owned by the caller
    # (which would make that hashref unusable for a second constructor call).
    my $args = (@_ == 1) ? { %{ $_[0] || {} } } : { @_ };

    croak "config_endpoint must be speccified" if (!defined $args->{'config_endpoint'});
    croak "servers is not a valid constructors parameter" if (defined $args->{'servers'});

    $self->{'config_endpoint'} = delete $args->{'config_endpoint'};

    # config_endpoint is guaranteed to be defined here by the croak above.
    $args->{servers} = $self->getServersFromEndpoint($self->{'config_endpoint'});
    $self->{_last_update} = time;

    $self->{update_period} = exists $args->{update_period} ? $args->{update_period} : 180;

    # Keep the arguments so a replacement Cache::Memcached::Fast object can
    # be built later with the same options.
    $self->{'_args'} = $args;
    $self->{_memd} = Cache::Memcached::Fast->new($args);
    $self->{servers} = $args->{servers};

    return $self;
}

lib/Cache/Elasticache/Memcache.pm  view on Meta::CPAN

        $self->checkServers;
        return $self->{'_memd'}->$method(@_);
    };
}

=pod

=item checkServers

    my $memd = Cache::Elasticache::Memcache->new({
        config_endpoint => 'foo.bar'
    })

    ...

    $memd->checkServers();

Trigger the server list to be updated if the time passed since the server list was last updated is greater than the update period (default 180 seconds).

=cut

sub checkServers {
    my ($self) = @_;

    # Pick the effective period once per cycle: the configured update_period
    # shortened by a random amount below 10. It is kept until the next
    # refresh so repeated calls compare against the same threshold.
    unless (defined $self->{_current_update_period}) {
        $self->{_current_update_period} = $self->{update_period} - rand(10);
    }

    # Refresh only when an endpoint is configured and more time than the
    # effective period has elapsed since the last update.
    if ( defined $self->{'config_endpoint'}
        && (time - $self->{_last_update}) > $self->{_current_update_period} ) {
        $self->updateServers();
        # Force a new randomised period to be chosen on the next call.
        $self->{_current_update_period} = undef;
    }
}

=pod

=item updateServers

    my $memd = Cache::Elasticache::Memcache->new({
        config_endpoint => 'foo.bar'
    })

    ...

    $memd->updateServers();

This method will update the server list regardless of how much time has passed since the server list was last checked.

=cut

sub updateServers {
    my $self = shift;

    my $servers = $self->getServersFromEndpoint($self->{'config_endpoint'});

    ## Cache::Memcached::Fast does not support updating the server list after creation
    ## Therefore we must create a new object.

    if ( $self->_hasServerListChanged($servers) ) {
        $self->{_args}->{servers} = $servers;
        $self->{_memd} = Cache::Memcached::Fast->new($self->{'_args'});
    }

    $self->{servers} = $servers;

lib/Cache/Elasticache/Memcache.pm  view on Meta::CPAN

=back

=head1 CLASS METHODS

=over

=item getServersFromEndpoint

    Cache::Elasticache::Memcache->getServersFromEndpoint('foo.bar');

This class method will retrieve the server list for a given configuration endpoint.

=cut

sub getServersFromEndpoint {
    my $invoker = shift;
    my $config_endpoint = shift;
    my $data = "";
    # TODO: make use of "connect_timeout" (default 0.5s) and "io_timeout" (default 0.2s) constructor parameters
    # my $args = shift;
    # $connect_timeout = exists $args->{connect_timeout} ? $args->{connect_timeout} : $class::default_connect_timeout;
    # $io_timeout = exists $args->{io_timeout} ? $args->{io_timeout} : $class::default_io_timeout;
    my $socket = (blessed($invoker)) ? $invoker->{_sockets}->{$config_endpoint} : undef;

    for my $i (0..2) {
        unless (defined $socket && $socket->connected()) {
            $socket = IO::Socket::IP->new(PeerAddr => $config_endpoint, Timeout => 10, Proto => 'tcp');
            croak "Unable to connect to server: ".$config_endpoint." - $!" unless $socket;
            $socket->sockopt(SO_KEEPALIVE,1);
            $socket->autoflush(1);
            IO::Socket::Timeout->enable_timeouts_on($socket);
            $socket->read_timeout(0.5);
# This is currently commented out as it was breaking under perl 5.24 for me. Need to investigate!
#            $socket->write_Timeout(0.5);
        }

        try {
            $socket->send("config get cluster\r\n");

lib/Cache/Elasticache/Memcache.pm  view on Meta::CPAN

            no warnings 'exiting';
            next;
        };
        if ($data ne "") {
            last;
        } else {
            $socket = undef;
        }
    }
    if (blessed $invoker) {
        $invoker->{_sockets}->{$config_endpoint} = $socket;
    } else {
        $socket->close() if (blessed $socket);
    }
    return $invoker->_parseConfigResponse($data);
}

sub _parseConfigResponse {
    my $class = shift;
    my $data = shift;
    return [] unless (defined $data && $data ne '');

lib/Cache/Elasticache/Memcache.pm  view on Meta::CPAN

                my ($host, $ip, $port) = split('\|',$node);
                push(@servers,$ip.':'.$port);
            }
        }
    }
    return \@servers;
}

# Destructor: close every configuration-endpoint socket cached on the object
# in $self->{_sockets} (keyed by endpoint).
sub DESTROY {
    my $self = shift;

    # Cleanup is best effort; do not let it clobber error state that the
    # surrounding code may still be inspecting.
    local ($@, $!, $?);

    my $sockets = $self->{_sockets};
    return unless defined $sockets;

    foreach my $config_endpoint (keys %{$sockets}) {
        my $socket = $sockets->{$config_endpoint};

        # An entry may be undef when the last lookup for that endpoint failed.
        next unless defined $socket;

        # Close the socket taken from the cache. The previous code tested
        # $self->{_socket}, a key that is never set, so nothing was closed.
        eval {
            $socket->close() if $socket->connected();
            1;
        };
    }
    return;
}

1;
__END__

=pod

t/basic.t  view on Meta::CPAN

        $mock->mock('sockopt', sub { return 1 });
        $mock->mock('setsockopt', sub { return 1 });
        $mock->mock('write_Timeout', sub { return 1 });
        $mock->mock('send', sub { return 1 });
        my @lines = @{$self->config_lines};
        $mock->mock('getline', sub { return shift @lines });
        $mock->mock('close', sub { return 1 });
        my $overrides = Sub::Override->new()
                                     ->replace('Cache::Memcached::Fast::new' , sub { my $object = shift; my @args = @_; $self->last_parent_object($object); $self->last_parent_args(\@args) })
                                     ->replace('Cache::Memcached::Fast::DESTROY' , sub { })
                                     ->replace('IO::Socket::IP::new', sub{ my $object = shift; my @args = @_; croak "config_endpoint:-".{@args}->{'PeerAddr'} unless {@args}->{'PeerAddr'} eq 'good:11211'; return $mock });
        return $overrides;
    }
);

# Read/write slot that records the invocant received by the overridden
# Cache::Memcached::Fast::new, so tests can inspect the last construction.
has last_parent_object => (
    is => 'rw',
    default => undef
);

has last_parent_args => (

t/basic.t  view on Meta::CPAN

    default => undef,
);

# Smoke test: the class under test reports a defined version.
test "hello world" => sub {
    my ($self) = @_;
    my $version = $self->test_class->VERSION;
    ok defined $version;
};

# The constructor returns an instance of the class under test.
test "instantiation" => sub {
    my ($self) = @_;
    my $client = $self->test_class->new(config_endpoint => 'good:11211');
    isa_ok $client, $self->test_class;
};

# Without an explicit update_period the object stores the default of 180.
test "update_period defaults to 180 seconds" => sub {
    my ($self) = @_;
    my $class  = $self->test_class;
    my $client = $class->new( config_endpoint => 'good:11211' );
    is $client->{update_period}, 180;
};

# Constructing without config_endpoint must die with the expected message.
test "requires config_endpoint" => sub {
    my ($self) = @_;
    dies_ok( sub { $self->test_class->new() } );
    like $@, '/^config_endpoint must be speccified/';
};

# An explicit servers list is rejected by the constructor.
test "constructor does not accept servers argument" => sub {
    my ($self) = @_;
    my %rejected_args = (
        config_endpoint => 'good:11211',
        servers         => ['good:11211'],
    );
    dies_ok( sub { $self->test_class->new(%rejected_args) } );
    like $@, '/^servers is not a valid constructors parameter/';
};

# Run the tests declared in this file (run_me is presumably provided by the
# Test::Routine tooling loaded above this excerpt) and close the test plan.
run_me;
done_testing;
1;

t/config_endpoint.t  view on Meta::CPAN

use Symbol;

use Cache::Elasticache::Memcache;

# Name of the class under test; the tests call the constructor and class
# methods through this attribute.
has test_class => (
    is => 'ro',
    lazy => 1,
    default => 'Cache::Elasticache::Memcache'
);

# Configuration endpoint address used throughout the tests. The overridden
# IO::Socket::IP::new hands back the 'good' mock socket when PeerAddr
# equals this value.
has endpoint_location => (
    is => 'ro',
    lazy => 1,
    default => 'test.lwgyhw.cfg.usw2.cache.amazonaws.com:11211',
);

# Read/write slot that the overridden Cache::Memcached::Fast::new fills with
# the invocant it was called on.
has last_parent_object => (
    is => 'rw',
    default => undef
);

t/config_endpoint.t  view on Meta::CPAN

    is => 'ro',
    lazy => 1,
    clearer => '_clear_parent_overrides',
    default => sub {
        my $self = shift;
        my $overrides = Sub::Override->new()
                                     ->replace('IO::Socket::IP::new',
            sub{
                my $object = shift;
                my @args = @_;
                return $self->mock_sockets->{'good'} if ({@args}->{'PeerAddr'} eq $self->endpoint_location);
                return $self->mock_sockets->{'bad_send'} if ({@args}->{'PeerAddr'} eq 'bad_send:11211');
                croak "GAAAAAAAA";
            })
                                     ->replace('Cache::Memcached::Fast::new' ,
            sub {
                my $object = shift;
                my @args = @_;


                $self->last_parent_object($object);

t/config_endpoint.t  view on Meta::CPAN


   return $mock;
}

# Returns a stub implementation for a mocked method: a code reference that
# always yields 1. The method name is accepted but does not alter the stub.
sub default_mock_method {
    my ($self, $method_name) = @_;
    my $always_true = sub { return 1 };
    return $always_true;
}

# The class method turns the mocked endpoint response into "ip:port" entries.
test "get_servers_from_endpoint" => sub {
    my ($self) = @_;
    my $servers = $self->test_class->getServersFromEndpoint($self->endpoint_location);
    my @expected = (
        '10.112.21.1:11211',
        '10.112.21.2:11211',
        '10.112.21.3:11211',
    );
    cmp_deeply( $servers, \@expected );
};

# The response is still parsed when the END marker arrives split over two reads.
test "get_servers_from_endpoint_split_END" => sub {
    my ($self) = @_;
    my @response_chunks = (
        "\nmycluster.0001.cache.amazonaws.com|10.112.21.4|11211\n\r\n",
        "E",
        "ND\r\n",
    );
    $self->config_lines(\@response_chunks);
    $self->reset_overrides;
    my $servers = $self->test_class->getServersFromEndpoint($self->endpoint_location);
    cmp_deeply( $servers, ['10.112.21.4:11211'] );
};

# A response that never delivers the END marker still yields the node it listed.
test "get_servers_from_endpoint_timeout" => sub {
    my ($self) = @_;
    my @response_chunks = (
        "\nmycluster.0001.cache.amazonaws.com|10.112.21.4|11211\n\r\n",
    );
    $self->config_lines(\@response_chunks);
    $self->reset_overrides;
    my $servers = $self->test_class->getServersFromEndpoint($self->endpoint_location);
    cmp_deeply( $servers, ['10.112.21.4:11211'] );
};

# updateServers with an unchanged server list: the timestamp advances but the
# server list and the underlying memcached object stay the same.
test "update_servers_no_change" => sub {
    my ($self) = @_;
    my $endpoint = $self->endpoint_location;

    my $client = $self->test_class->new(config_endpoint => $endpoint);

    # Snapshot the state left behind by the constructor.
    my ($stamp_before, $servers_before, $backend_before)
        = @{$client}{qw(_last_update servers _memd)};
    sleep 1;

    # Drop the cached endpoint socket so the refresh has to reconnect.
    $self->reset_overrides;
    delete $client->{_sockets}->{$endpoint};
    $client->updateServers;

    ok $stamp_before < $client->{_last_update};
    cmp_deeply($servers_before, $client->{servers});
    cmp_ok($backend_before, '==', $client->{_memd});
};

# updateServers with a changed server list: the list is restored from the
# endpoint and a new underlying memcached object is created.
test "update_servers" => sub {
    my ($self) = @_;
    my $endpoint = $self->endpoint_location;

    my $client = $self->test_class->new(config_endpoint => $endpoint);

    # Snapshot the state left behind by the constructor.
    my ($stamp_before, $servers_before, $backend_before)
        = @{$client}{qw(_last_update servers _memd)};
    sleep 1;

    # Make the stored list differ from what the endpoint will report.
    $client->{servers} = [ '10.112.21.1:11211' ];
    ok !eq_deeply($servers_before, $client->{servers});

    # Drop the cached endpoint socket so the refresh has to reconnect.
    $self->reset_overrides;
    delete $client->{_sockets}->{$endpoint};
    $client->updateServers;

    ok $stamp_before < $client->{_last_update};
    cmp_deeply($servers_before, $client->{servers});
    cmp_ok($backend_before, '!=', $client->{_memd});
};

# checkServers inside the update period must be a no-op: no refresh happens,
# so the timestamp, the server list and the underlying memcached object are
# all left untouched.
#
# The test previously called updateServers (which refreshes unconditionally),
# so it never exercised the period check it is named after.
test "check_servers_within_update_period" => sub {
    my $self = shift;

    my $memd = $self->test_class->new(
        config_endpoint => $self->endpoint_location,
        update_period => 9999999,
    );

    my $original_update = $memd->{_last_update};
    my $original_servers = $memd->{servers};
    my $original_memd_obj = $memd->{_memd};
    sleep 2;

    $self->reset_overrides;
    delete $memd->{_sockets}->{$self->endpoint_location};
    $memd->checkServers;

    # Far less than update_period has elapsed, so nothing may have changed.
    is $memd->{_last_update}, $original_update;
    cmp_deeply($memd->{servers}, $original_servers);
    cmp_ok($original_memd_obj, '==', $memd->{_memd});
};

# checkServers after the update period has elapsed must refresh: the
# timestamp advances, the server list is restored from the endpoint and a new
# underlying memcached object is created.
#
# The test previously called updateServers directly, so the period check in
# checkServers was never exercised.
test "check_servers_outside_update_period" => sub {
    my $self = shift;

    my $memd = $self->test_class->new(
        config_endpoint => $self->endpoint_location,
        update_period => 1,
    );

    my $original_update = $memd->{_last_update};
    my $original_servers = $memd->{servers};
    my $original_memd_obj = $memd->{_memd};
    # Wait longer than update_period so the next check is due.
    sleep 2;

    # Make the stored list differ from what the endpoint will report.
    $memd->{servers} = [ '10.112.21.1:11211' ];
    ok !eq_deeply($original_servers, $memd->{servers});

    $self->reset_overrides;
    delete $memd->{_sockets}->{$self->endpoint_location};
    $memd->checkServers;

    ok $original_update < $memd->{_last_update};
    cmp_deeply($memd->{servers}, $original_servers);
    cmp_ok($original_memd_obj, '!=', $memd->{_memd});
};

test "retry up to 3 times due to faiure" => sub {
    my $self = shift;

t/config_endpoint.t  view on Meta::CPAN

        $autoflush_count++ if $called->[0] eq 'autoflush';
    }
    is $send_count, 3;
    is $autoflush_count, 3;
};

# A lookup made through an existing object reuses its cached socket: no new
# socket is set up, so autoflush is not called again on the mock.
test "Socket is reused if possible" => sub {
    my ($self) = @_;
    my $endpoint = $self->endpoint_location;

    my %ctor_args = (
        config_endpoint => $endpoint,
        update_period   => 1,
    );
    my $client = $self->test_class->new(%ctor_args);

    # Forget the calls recorded while the constructor ran.
    $self->mock_sockets->{'good'}->clear;

    $client->getServersFromEndpoint($endpoint);
    ok !$self->mock_sockets->{'good'}->called('autoflush');
};

# Run the tests declared in this file (run_me is presumably provided by the
# Test::Routine tooling loaded above this excerpt) and close the test plan.
run_me;
done_testing;
1;

t/methods.t  view on Meta::CPAN

);

# Reset the call log of the mocked base memcached object before each test runs.
before run_test => sub {
    my ($self) = @_;
    my $base_mock = $self->mock_base_memd;
    $base_mock->clear();
};

test "methods" => sub {
    my $self = shift;
    my $memd = $self->test_class->new(
        config_endpoint => 'dave',
    );
    foreach my $method (@{$self->methods}) {
        subtest "Method: $method" => sub {
                $memd->{servers} = 0;
                ok !$self->mock_base_memd->called($method);
                is $memd->$method('test'), 'deadbeef';
                ok $self->mock_base_memd->called($method);
                ok $memd->{servers};
        }
    }



( run in 0.698 second using v1.01-cache-2.11-cpan-b61123c0432 )