App-Basis-Queue
view release on metacpan or search on metacpan
lib/App/Basis/Queue.pm view on Meta::CPAN
# qmatch is the name of the queue matcher
$state = $subs->{callback}
->( $self, $row->{queue_name}, $row ) ;
if ( $row->{added} gt $subs->{after} ) {
$subs->{after} = $row->{added} ;
}
if ( $row->{counter} > $subs->{counter} ) {
$subs->{counter} = $row->{counter} ;
}
}
catch {
warn "listen: error in callback $@" ;
} ;
}
}
$started = 1 ;
last
if ( $params->{events}
&& $self->{ev_count} > $params->{events} ) ;
last
if ( $params->{datetime}
&& time() > $params->{datetime} ) ;
# wait a bit to allow the queues to fillup
sleep($delay) ;
}
return $self->{ev_count} ;
}
# -----------------------------------------------------------------------------
=head2 unsubscribe
Unsubscribe from a named queue.
B<Parameters>
Hash of
=over
=item queue
Name of the queue, wildcard allowed
=back
B<Example usage>
sub handler {
state $counter = 0 ;
my $q = shift ; # we get the queue object
# the queue trigger that matched, the actual queue name and the data
my ($qmatch, $queue, $data) = @_ ;
# we are only interested in 10 messages
if( ++$counter > 10) {
$q->unsubscribe( queue => $queue) ;
} else {
say Data::Dumper( $data) ;
}
}
my $queue = App::Basis::Queue->new( dbh => $dbh) ;
$queue->subscribe( queue => '/logs/*', callback => \&handler) ;
$queue->listen() ;
=cut
sub unsubscribe
{
my $self = shift ;
my $params = @_ % 2 ? shift : {@_} ;
if ( ref($params) ne 'HASH' ) {
warn "unsubscribe accepts a hash or a hashref of parameters" ;
return 0 ;
}
$params->{queue} ||= $self->{default_queue} ;
if ( $params->{queue} ) {
# does not matter if the queue name does not exist!
delete $self->{subscriptions}->{ $params->{queue} } ;
}
}
# -----------------------------------------------------------------------------
=head2 purge_tasks
Purge will remove all processed task items and failures/deadletters (process_failure >= 5).
These are completely removed from the database
B<Parameters>
Hash of
=over
=item queue
Name of the queue, wildcard allowed
=item before (optional)
Unix epoch or parsable datetime before which items should be purged
defaults to 'now'
=back
B<Example usage>
my $before = $queue->stats( queue => 'queue_name', before => '2015-11-24') ;
$queue->purge_tasks( queue => 'queue_name') ;
my $after = $queue->stats( queue => 'queue_name') ;
say "removed " .( $before->{total_records} - $after->{total_records}) ;
( run in 0.508 second using v1.01-cache-2.11-cpan-39bf76dae61 )