App-Basis-Queue
view release on metacpan or search on metacpan
lib/App/Basis/Queue.pm view on Meta::CPAN
# when do we want events from
after => $params->{after},
persist => $params->{persist},
ev_count => 0,
counter => 0
} ;
}
# -----------------------------------------------------------------------------
=head2 listen
Listen to all subcribed channels. Loops forever unless told to stop.
If there is a persistent message in a queue, this will be passed to the callback before the other records.
B<Parameters>
Hash of
=over
=item events (optional)
Minimum number of events to listen for, stop after this many, may stop after more - this is across ALL the subscriptions
=item datetime (optional)
Unix epoch time or parsable datetime when to stop listening
=item persist (optional)
Include the most recent persistent item, if subscribed using using a wild card,
this will match all the queues and could find multiple persistent items
=item listen_delay (optional)
Override the class delay, obtain events at this rate
=back
B<returns>
Number of chatter events actually passed to ALL the handlers
B<Example usage>
my $queue = App::Basis::Queue->new( dbh => $dbh) ;
$queue->subscribe( '/logs/*', \&handler) ;
$queue->listen() ; # listening forever
# or listen until christmas, checking every 30s
$queue->subscribe( '/presents/*', \&handler) ;
$queue->listen( datetime => '2015-12-25', listen_delay => 30) ;
=cut
sub listen
{
my $self = shift ;
my $params = @_ % 2 ? shift : {@_} ;
# decide where the delay comes from
my $delay = $params->{listen_delay} || $self->{listen_delay} ;
if ( ref($params) ne 'HASH' ) {
warn "listen accepts a hash or a hashref of parameters" ;
return 0 ;
}
if ( $params->{datetime} ) {
my (@dt) = _parse_datetime( $params->{datetime} ) ;
if ( $dt[1] ) {
# used to check against time() later
$params->{datetime} = $dt[1] ;
}
}
if ( !keys %{ $self->{subscriptions} } ) {
warn "you have not subscribed to any queues" ;
return 0 ;
}
$self->{ev_count} = 0 ;
# clean things up before we listen
foreach my $qmatch ( sort keys %{ $self->{subscriptions} } ) {
my $subs = $self->{subscriptions}->{$qmatch} ;
$subs->{counter} = 0 ;
$subs->{ev_count} = 0 ;
}
# loop forever unless there is a reason to stop
my $started = 0 ;
while (1) {
foreach my $qmatch ( sort keys %{ $self->{subscriptions} } ) {
my $subs = $self->{subscriptions}->{$qmatch} ;
# we may not want the most recent persistent record
next if ( !$started && !$subs->{persist} ) ;
my $items ;
if ( !$started ) {
$items = $self->_recent_persist( queue => $qmatch ) ;
} else {
$items = $self->_recent_chatter(
queue => $qmatch,
after => $subs->{after},
counter => $subs->{counter},
) ;
}
my $state ;
foreach my $row ( @{$items} ) {
$subs->{ev_count}++ ; # count matches for this queue
$self->{ev_count}++ ; # and overall
try {
# qmatch is the name of the queue matcher
$state = $subs->{callback}
->( $self, $row->{queue_name}, $row ) ;
if ( $row->{added} gt $subs->{after} ) {
( run in 1.932 second using v1.01-cache-2.11-cpan-39bf76dae61 )