Cache-Memcached-Queue

 view release on metacpan or  search on metacpan

lib/Cache/Memcached/Queue.pm  view on Meta::CPAN


    #This queue already exists?
    my $real_first = $self->memcached->get($first);
    confess "Fatal error! Can't load or create queue! Check memcached server!" if $flag and !defined($real_first);
    if ( defined($real_first) ) {
      $self->first( $self->memcached->get($first) );
      $self->last( $self->memcached->get($last) );
      $self->size( $self->memcached->get($size) );
      $self->name( $self->memcached->get($name) ) if !defined $self->name;
      $self->qid($qid);
      $ok = 1;
    }
    else {
      say q[Queue '] . $self->qid . q[' doesn't exists! Creating...];
      $self->memcached->set($qid . 'LOCKED',$$,0);
      $self->memcached->set($name,$self->name,0);
      $self->memcached->set($first,$self->qid . '1',0,0);
      $self->memcached->set($last,$self->qid . '1',0,0);
      $self->memcached->set($size,0,0);
      $self->memcached->set($qid . 'LOCKED',0,0);
      say q[Queue '] . $self->qid . q[' was created!];
      $self->load(1);
    }
  }
  return $ok;
}








sub enq {
  my ( $self, $parameters ) = @_;
  my ( $ok, $expire, ) = ( 0, undef, undef );
  if(!defined($parameters)){
    say 'No value was defined to enqueue!';
  }
  else {
    my $value = undef;
    if(ref($parameters) eq ''){
      $value = $parameters // '';
    }
    elsif(!defined($parameters->{value})){
      $value = $parameters || '';
    }
    else {
      $value = $parameters->{value} || '';
    }

    #checar se é necessário a serialização
    if(ref($value)){
      #serializar
      my $serialized = $self->serializer->serialize($value);
      $value = $serialized;
      undef $serialized;
    }
    $self->load;
    if(!$self->_is_locked || $self->_unlock){
      $self->_lock;
      my $size = $self->size // 0;
      #checando se a fila esta cheia
      if($self->max_enq > 0 && $self->size >= $self->max_enq){
        say "Queue is full!";
      }
      else {
        my $last = $1 if $self->last =~ /_(\d+)$/ // 1;
        #checando se last == first e se existe algum valor
        my $first_value = $self->memcached->get($self->first);
        if( $first_value) {
          $last++;
        }
        $size++;
        my $new_last = $self->qid . $last;
        $self->last($new_last);
  
        $self->memcached->set($new_last,$value,0);
      }
      $self->size($size);
      $self->_save(['last','size']);
      $self->_unlock if($self->_is_locked);
    }
  }
  return $ok;
}







sub deq {
  my ( $self, ) = @_;
  my ( $last_item,$value ) = ( undef,undef );
  $self->load;
  if(!$self->_is_locked || $self->_unlock ){
    $self->_lock;
    my $size = $self->size;
    if(!$size){
      say 'Queue is empty!';
    }
    else { 
      my $first = $1 if $self->first =~ /_(\d+)$/ // 1;
      $value = $self->memcached->get($self->first) // '';
      if($value =~ /^\^.*?Storable/i){
        my $unserialized = $self->serializer->deserialize($value);
        $value = $unserialized;
        undef $unserialized;
      }
      $self->memcached->delete($self->first);
      if($self->last ne $self->first){
        $first++;
        $self->first($self->qid . $first);
        $size-- if($size > 0);
      }
      else {
        $size = 0;
        $self->first($self->qid . '1',0);
        $self->last($self->qid . '1',0);
        $self->_save(['last']);
      }
    }
    $self->size($size);
    $self->_save(['first','size']);
    $self->_unlock if($self->_is_locked);
  }
  return $value // '';
}






sub show {
  my ( $self, ) = @_;
  while(!$self->_lock){
    $self->load;
    sleep .3;
  }
  my $first = $1 if $self->first =~ /_(\d+)$/ // 1;
  my $last = $1 if $self->last =~ /_(\d+)$/ // 1;
  foreach my $i($first..$last){
    my $value = $self->memcached->get($self->qid . $i);
    say "$i - $value";
  } 
  $self->_unlock;
}





sub cleanup {
  my ( $self, ) = @_;
  $self->load;
  $self->iterate(sub {
                  my $index = shift;
                  $self->memcached->delete($index);
                });
}





sub _save {
  my ( $self, $parameters ) = @_;
  my $last = $self->last;
  my $ok   = 0;

  if ( ref($parameters) !~ /ARRAY/ ) {
    confess "The parameters to save data MUST BE AN ARRAYREF";
  }
  foreach my $k ( @{$parameters} ) {
    if ( $k !~ /^name|first|last|size|max_enq|qid$/ ) {
        confess "The parameter '$k' is invalid!";
    }
    else {
      my $index = $self->qid . $k;
      if ( !$self->memcached->set( $index, $self->{$k},0 ) ) {
        confess "Memcached can't set a value!";
      }
      else {
        $ok = 1;
      }
    }
  }
  return $ok;
}





sub iterate {
  my ( $self, $action, $action_params ) = @_;
  $self->load;
  if( (!defined($action) || !$action ) ||  
    (defined($action) && ref($action) !~ /CODE/)
  ){
    confess "'action' MUST be a CODE reference!";
  }
  elsif(defined($action_params) && ref($action_params) !~ /ARRAY/){
    confess "'action_parameters' MUST be Array"; 
  }
  elsif($self->size == 0){
    say STDERR "Queue '" . $self->qid . "' is empty!";
  }
  else {
    my $first_index = $1 if $self->first =~ /(\d+)$/;
	my $last_index = $1 if $self->last =~ /(\d+)$/;
    say "The queue is " . $self->name;
	foreach my $i($first_index .. $last_index){
      #mounting index for memcached
      my $mc_index = $self->qid;
      $mc_index .= '_' if $mc_index !~ /_$/;
      $mc_index .= $i;
      my $value = $self->memcached->get($mc_index);
      if(!defined($value)){
        confess "An error occured trying make a 'get' operation. No value found for '$mc_index' index";
      }
      $action->($mc_index,$value,$action_params);
    }
  }
}







sub _lock {
  my ($self,$pid,$lock_pid) = (shift,$$,0);
  $self->load;
  my $qid = $self->qid;
  confess "Panic! No 'qid'!" if (!defined($qid) || !$qid);
  my $lock_idx = $qid . 'LOCKED';
  $lock_pid = $self->_is_locked($lock_idx);
  if(!$lock_pid){
    my $rs = $self->memcached->set($lock_idx,$pid,0);
    confess "Memcached server can't write!" if !defined($rs);
    $lock_pid = $pid;
  }
  else {
    say "is already locked!";
    $lock_pid = 0;
  }
  $self->load;
  return $lock_pid || 0;
}






sub _unlock {
  my ($self,$pid,$ok) = (shift,$$,0);
  $self->load;
  my $qid = $self->qid;
  confess "Panic! No 'qid'!" if (!defined($qid) || !$qid);
  my $lock_idx = $qid . 'LOCKED';
  my $lock_pid = $self->_is_locked($lock_idx);
  if($lock_pid && $lock_pid == $pid){
    my $rs = $self->memcached->set($lock_idx,0,0);
    confess "Memcached can't write!" if !defined($rs);
    $ok = 1;
  }
  elsif($lock_pid && $lock_pid != $pid){
    say "Is locked by another process! $lock_pid";

  }
  $self->load;
  return $ok;
}






sub _is_locked {
  my ($self,$lock_idx) = @_;
  $lock_idx = 0 if !defined $lock_idx;
  my $found = 0;

#  confess "Parameter 'lock_idx' is mandatory!" if (!defined($lock_idx) || !$lock_idx);
  if(!defined($lock_idx) || !$lock_idx){
    $lock_idx = $self->qid . 'LOCKED';
  }
  my $lock_pid = $self->memcached->get($lock_idx); #this pid locked the queue!
#  $lock_pid = 0 if $$ == $lock_pid;
#  foreach my $p(@{$t->table}){
#    if($p->pid == $lock_pid){
#      $found = $p->pid;
#      last;
#    }
#  }
#  $lock_pid = 0 if !$found;
  return $lock_pid ;
}





__PACKAGE__->meta->make_immutable;

=head1 NAME

 Cache::Memcached::Queue - Simple and elegant way to persist queues on Memcached 

=head1 VERSION

 Version 0.1.8

 unstable version

=cut

=head1 DESCRIPTION

The idea is take advantage from Cache::Memcached::Fast module using it as a back-end for
queue structures without sockets, extra protocols or extra databases to maintain queues-metadata.
All stuff is stored on Memcached! Including metadata.


This can be done adding some metadata on Memcached hash structure that controls data on 
a queue structure(strict FIFO). This metadata defines identification for queues and 
controls first element, last element, size(number of elements) and lock information 
following patterns in their names. For stabilish this patterns, it's necessary to define 
some elements:

=over

=item * prefix - WARNING! This attribute is deprecated!!! DON'T USE IT! 

=item * index - WARNING! This attribute is deprecated! DON'T USE IT!

=item * name - This is a 'string' that defines a name for your queue;

=item * id - It's a unique identifier for your queue and is defined on the 'id' attribute.
        You can have queues with the same name since you have different ids;



=back



=head1 SYNOPSIS



( run in 0.383 second using v1.01-cache-2.11-cpan-e1769b4cff6 )