Cache-Memcached-Queue
view release on metacpan or search on metacpan
lib/Cache/Memcached/Queue.pm view on Meta::CPAN
#This queue already exists?
my $real_first = $self->memcached->get($first);
confess "Fatal error! Can't load or create queue! Check memcached server!" if $flag and !defined($real_first);
if ( defined($real_first) ) {
$self->first( $self->memcached->get($first) );
$self->last( $self->memcached->get($last) );
$self->size( $self->memcached->get($size) );
$self->name( $self->memcached->get($name) ) if !defined $self->name;
$self->qid($qid);
$ok = 1;
}
else {
say q[Queue '] . $self->qid . q[' doesn't exists! Creating...];
$self->memcached->set($qid . 'LOCKED',$$,0);
$self->memcached->set($name,$self->name,0);
$self->memcached->set($first,$self->qid . '1',0,0);
$self->memcached->set($last,$self->qid . '1',0,0);
$self->memcached->set($size,0,0);
$self->memcached->set($qid . 'LOCKED',0,0);
say q[Queue '] . $self->qid . q[' was created!];
$self->load(1);
}
}
return $ok;
}
# enq - append one value to the tail of the queue.
#
# Parameters:
#   $parameters - what to enqueue. It may be:
#                   * a plain scalar, stored as-is;
#                   * a hash reference with a defined 'value' key, in which
#                     case that value is stored;
#                   * any other reference, which is itself the value.
#                 Values that are references are passed through the
#                 serializer before being stored.
#
# Returns 1 when the value was written to memcached, 0 otherwise (no value
# given, queue full, write refused, or the queue lock could not be taken over).
sub enq {
    my ( $self, $parameters ) = @_;
    my $ok = 0;

    if ( !defined $parameters ) {
        say 'No value was defined to enqueue!';
        return $ok;
    }

    # Only hash-based references can carry a 'value' key; asking any other
    # reference type for ->{value} would die under strict refs.
    require Scalar::Util;
    my $is_hash = ( Scalar::Util::reftype($parameters) // '' ) eq 'HASH';

    my $value;
    if ( $is_hash && defined $parameters->{value} ) {
        # Keep defined-but-false payloads (0, '') exactly as given.
        $value = $parameters->{value};
    }
    else {
        $value = $parameters;
    }

    # Check whether serialization is needed: references are serialized.
    if ( ref $value ) {
        $value = $self->serializer->serialize($value);
    }

    $self->load;
    if ( !$self->_is_locked || $self->_unlock ) {
        $self->_lock;
        my $size = $self->size // 0;

        # Check whether the queue is full (max_enq <= 0 means "no limit").
        if ( ( $self->max_enq // 0 ) > 0 && $size >= $self->max_enq ) {
            say "Queue is full!";
        }
        else {
            # Position of the current last slot; falls back to 1 when the
            # stored key does not end in "_<digits>".
            my ($last) = ( $self->last // '' ) =~ /_(\d+)$/;
            $last //= 1;

            # When the first slot holds nothing, the value goes into the
            # current last slot; otherwise a new slot is opened. The test is
            # on definedness so a stored 0 or '' still counts as occupied.
            my $first_value = $self->memcached->get( $self->first );
            $last++ if defined $first_value;

            $size++;
            my $new_last = $self->qid . $last;
            $self->last($new_last);
            $ok = 1 if $self->memcached->set( $new_last, $value, 0 );
        }
        $self->size($size);
        $self->_save( [ 'last', 'size' ] );
        $self->_unlock if $self->_is_locked;
    }
    return $ok;
}
# deq - remove and return the value at the head of the queue.
#
# Returns the dequeued value (deserialized when it carries the serializer
# marker), or an empty string when the queue is empty or the queue lock
# could not be taken over.
sub deq {
    my ($self) = @_;
    my $value;

    $self->load;
    if ( !$self->_is_locked || $self->_unlock ) {
        $self->_lock;
        my $size = $self->size;
        if ( !$size ) {
            say 'Queue is empty!';
        }
        else {
            # Position of the current head slot; falls back to 1 when the
            # stored key does not end in "_<digits>".
            my ($first) = ( $self->first // '' ) =~ /_(\d+)$/;
            $first //= 1;

            $value = $self->memcached->get( $self->first ) // '';

            # A payload starting with '^' followed by "Storable" is handed to
            # the serializer to be turned back into a data structure
            # (presumably this is the header the serializer writes).
            if ( $value =~ /^\^.*?Storable/i ) {
                $value = $self->serializer->deserialize($value);
            }
            $self->memcached->delete( $self->first );

            if ( $self->last ne $self->first ) {
                # More elements remain: advance the head pointer.
                $first++;
                $self->first( $self->qid . $first );
                $size-- if $size > 0;
            }
            else {
                # That was the only element: reset both pointers to slot 1.
                $size = 0;
                $self->first( $self->qid . '1' );
                $self->last( $self->qid . '1' );
                $self->_save( ['last'] );
            }
        }
        $self->size($size);
        $self->_save( [ 'first', 'size' ] );
        $self->_unlock if $self->_is_locked;
    }
    return $value // '';
}
# show - print every element of the queue as "<position> - <value>".
#
# Blocks until the queue lock is obtained, prints the slots from the first
# to the last position, then releases the lock. Returns whatever _unlock
# returns.
sub show {
    my ($self) = @_;

    # Poll for the lock. The builtin sleep() only handles whole seconds
    # ("sleep .3" is "sleep 0", i.e. a busy loop), so the four-argument
    # select() is used for the 0.3 second pause.
    while ( !$self->_lock ) {
        $self->load;
        select( undef, undef, undef, 0.3 );
    }

    # Positions of the head and tail slots; each falls back to 1 when the
    # stored key does not end in "_<digits>".
    my ($first) = ( $self->first // '' ) =~ /_(\d+)$/;
    my ($last)  = ( $self->last  // '' ) =~ /_(\d+)$/;
    $first //= 1;
    $last  //= 1;

    foreach my $i ( $first .. $last ) {
        # A missing slot is printed as an empty value.
        my $value = $self->memcached->get( $self->qid . $i ) // '';
        say "$i - $value";
    }
    $self->_unlock;
}
# cleanup - delete every element key of the queue from memcached.
#
# Reloads the queue state and walks the queue with iterate(), deleting the
# memcached key handed to the callback for each element.
sub cleanup {
    my $self = shift;
    $self->load;

    # iterate() passes the memcached key as the first callback argument.
    my $remove_entry = sub {
        my ($key) = @_;
        $self->memcached->delete($key);
    };
    $self->iterate($remove_entry);
}
# _save - write selected queue attributes to memcached.
#
# Parameters:
#   $parameters - array reference of attribute names to persist. Allowed
#                 names: name, first, last, size, max_enq, qid.
#
# Each attribute is stored under the key "<qid><attribute name>".
#
# Returns 1 when at least one attribute was written, 0 for an empty list.
# Dies (confess) when $parameters is not an array reference, when a name is
# not one of the allowed attributes, or when memcached refuses the write.
sub _save {
    my ( $self, $parameters ) = @_;
    my $ok = 0;

    if ( ref($parameters) ne 'ARRAY' ) {
        confess "The parameters to save data MUST BE AN ARRAYREF";
    }

    foreach my $k ( @{$parameters} ) {

        # The alternation is grouped so the anchors apply to every name;
        # without the group, any string merely containing e.g. "first"
        # would be accepted.
        if ( !defined $k || $k !~ /\A(?:name|first|last|size|max_enq|qid)\z/ ) {
            confess "The parameter '" . ( $k // 'undef' ) . "' is invalid!";
        }

        my $index = $self->qid . $k;

        # NOTE(review): the value is read straight from the object hash
        # rather than through the accessor - confirm this is intended.
        if ( !$self->memcached->set( $index, $self->{$k}, 0 ) ) {
            confess "Memcached can't set a value!";
        }
        $ok = 1;
    }
    return $ok;
}
# iterate - call a callback for every element of the queue, head to tail.
#
# Parameters:
#   $action        - code reference, called once per element as
#                    $action->($memcached_key, $value, $action_params).
#   $action_params - optional array reference handed to every call.
#
# Prints a notice on STDERR and does nothing else when the queue is empty.
# Dies (confess) when $action is not a code reference, when $action_params
# is defined but not an array reference, when the first/last keys carry no
# numeric position, or when an element is missing from memcached.
sub iterate {
    my ( $self, $action, $action_params ) = @_;
    $self->load;

    if ( ref($action) ne 'CODE' ) {
        confess "'action' MUST be a CODE reference!";
    }
    elsif ( defined($action_params) && ref($action_params) ne 'ARRAY' ) {
        confess "'action_parameters' MUST be Array";
    }
    elsif ( ( $self->size // 0 ) == 0 ) {
        say STDERR "Queue '" . $self->qid . "' is empty!";
    }
    else {
        # List assignment leaves the variable undef on a failed match,
        # instead of picking up a stale $1 from an earlier regex.
        my ($first_index) = ( $self->first // '' ) =~ /(\d+)$/;
        my ($last_index)  = ( $self->last  // '' ) =~ /(\d+)$/;
        if ( !defined $first_index || !defined $last_index ) {
            confess "Can't find a numeric position in the 'first'/'last' keys of queue '"
              . $self->qid . "'";
        }

        say "The queue is " . $self->name;

        # Element keys are "<qid>_<position>"; add the separator only when
        # the qid does not already end with one.
        my $key_prefix = $self->qid;
        $key_prefix .= '_' if $key_prefix !~ /_$/;

        foreach my $i ( $first_index .. $last_index ) {
            my $mc_index = $key_prefix . $i;
            my $value    = $self->memcached->get($mc_index);
            if ( !defined $value ) {
                confess "An error occured trying make a 'get' operation. No value found for '$mc_index' index";
            }
            $action->( $mc_index, $value, $action_params );
        }
    }
}
# _lock - try to mark the queue as locked by the current process.
#
# Stores the current PID under the key "<qid>LOCKED" when no lock value is
# present there. Returns the PID on success, or 0 when a lock value is
# already present (including one written by this same process).
# Dies (confess) when there is no qid or when the memcached write returns
# undef.
sub _lock {
    my $self = shift;
    $self->load;

    my $qid = $self->qid;
    confess "Panic! No 'qid'!" unless defined($qid) && $qid;

    my $lock_key = $qid . 'LOCKED';
    my $acquired = 0;

    if ( $self->_is_locked($lock_key) ) {
        say "is already locked!";
    }
    else {
        my $stored = $self->memcached->set( $lock_key, $$, 0 );
        confess "Memcached server can't write!" unless defined $stored;
        $acquired = $$;
    }

    # Refresh the in-memory queue state before handing control back.
    $self->load;
    return $acquired || 0;
}
# _unlock - release the queue lock when it is held by the current process.
#
# Writes 0 under the key "<qid>LOCKED" when the stored lock value equals the
# current PID. Returns 1 when the lock was released, 0 otherwise (no lock
# value present, or the lock belongs to another PID).
# Dies (confess) when there is no qid or when the memcached write returns
# undef.
sub _unlock {
    my $self = shift;
    $self->load;

    my $qid = $self->qid;
    confess "Panic! No 'qid'!" unless defined($qid) && $qid;

    my $lock_key = $qid . 'LOCKED';
    my $holder   = $self->_is_locked($lock_key);
    my $released = 0;

    if ($holder) {
        if ( $holder == $$ ) {
            my $stored = $self->memcached->set( $lock_key, 0, 0 );
            confess "Memcached can't write!" unless defined $stored;
            $released = 1;
        }
        elsif ( $holder != $$ ) {
            say "Is locked by another process! $holder";
        }
    }

    # Refresh the in-memory queue state before handing control back.
    $self->load;
    return $released;
}
# _is_locked - read the lock value stored for the queue.
#
# Parameters:
#   $lock_idx - optional memcached key of the lock; when missing or false,
#               "<qid>LOCKED" is used.
#
# Returns whatever memcached holds under that key (the PID written by _lock,
# 0 after an unlock, or undef when nothing is stored).
# NOTE: no check is made that the recorded PID belongs to a live process.
sub _is_locked {
    my ( $self, $lock_idx ) = @_;

    my $key = $lock_idx || ( $self->qid . 'LOCKED' );

    # Fetch in scalar context and return the scalar itself.
    my $holder = $self->memcached->get($key);
    return $holder;
}
# Finalize the class: ask the class metaobject to make the definition
# immutable (presumably the Moose metaclass - confirm against the imports).
__PACKAGE__->meta->make_immutable;
=head1 NAME
Cache::Memcached::Queue - Simple and elegant way to persist queues on Memcached
=head1 VERSION
Version 0.1.8
unstable version
=cut
=head1 DESCRIPTION
The idea is to take advantage of the Cache::Memcached::Fast module, using it as a back-end for
queue structures without sockets, extra protocols or extra databases to maintain queue metadata.
Everything is stored on Memcached, including the metadata.
This is done by adding some metadata to the Memcached hash structure that controls the data of
a queue structure (strict FIFO). This metadata identifies each queue and
tracks its first element, last element, size (number of elements) and lock information,
following patterns in the key names. To establish these patterns, it is necessary to define
some elements:
=over
=item * prefix - WARNING! This attribute is deprecated!!! DON'T USE IT!
=item * index - WARNING! This attribute is deprecated! DON'T USE IT!
=item * name - This is a 'string' that defines a name for your queue;
=item * id - A unique identifier for your queue, defined by the 'id' attribute.
You can have queues with the same name as long as they have different ids;
=back
=head1 SYNOPSIS
( run in 0.383 second using v1.01-cache-2.11-cpan-e1769b4cff6 )