App-Basis-Queue

 view release on metacpan or  search on metacpan

lib/App/Basis/Queue.pm  view on Meta::CPAN

        # when do we want events from
        after    => $params->{after},
        persist  => $params->{persist},
        ev_count => 0,
        counter  => 0
    } ;
}

# -----------------------------------------------------------------------------

=head2 listen

Listen to all subcribed channels. Loops forever unless told to stop.
If there is a persistent message in a queue, this will be passed to the callback before the other records.

B<Parameters>

Hash of

=over

=item events (optional)

Minimum number of events to listen for, stop after this many,  may stop after more - this is across ALL the subscriptions

=item datetime (optional)

Unix epoch time or parsable datetime when to stop listening

=item persist (optional)

Include the most recent persistent item, if subscribed using using a wild card,
this will match all the queues and could find multiple persistent items

=item listen_delay (optional)

Override the class delay, obtain events at this rate

=back

B<returns>

Number of chatter events actually passed to ALL the handlers

B<Example usage>

    my $queue = App::Basis::Queue->new( dbh => $dbh) ;
    $queue->subscribe( '/logs/*', \&handler) ;
    $queue->listen() ;    # listening forever

    # or listen until christmas, checking every 30s
    $queue->subscribe( '/presents/*', \&handler) ;
    $queue->listen( datetime => '2015-12-25', listen_delay => 30) ;

=cut

sub listen
{
    my $self = shift ;
    my $params = @_ % 2 ? shift : {@_} ;
    # decide where the delay comes from
    my $delay = $params->{listen_delay} || $self->{listen_delay} ;

    if ( ref($params) ne 'HASH' ) {
        warn "listen accepts a hash or a hashref of parameters" ;
        return 0 ;
    }

    if ( $params->{datetime} ) {
        my (@dt) = _parse_datetime( $params->{datetime} ) ;
        if ( $dt[1] ) {
            # used to check against time() later
            $params->{datetime} = $dt[1] ;
        }
    }

    if ( !keys %{ $self->{subscriptions} } ) {
        warn "you have not subscribed to any queues" ;
        return 0 ;
    }

    $self->{ev_count} = 0 ;

    # clean things up before we listen
    foreach my $qmatch ( sort keys %{ $self->{subscriptions} } ) {
        my $subs = $self->{subscriptions}->{$qmatch} ;
        $subs->{counter}  = 0 ;
        $subs->{ev_count} = 0 ;
    }

    # loop forever unless there is a reason to stop
    my $started = 0 ;
    while (1) {

        foreach my $qmatch ( sort keys %{ $self->{subscriptions} } ) {
            my $subs = $self->{subscriptions}->{$qmatch} ;

            # we may not want the most recent persistent record
            next if ( !$started && !$subs->{persist} ) ;

            my $items ;
            if ( !$started ) {
                $items = $self->_recent_persist( queue => $qmatch ) ;
            } else {
                $items = $self->_recent_chatter(
                    queue   => $qmatch,
                    after   => $subs->{after},
                    counter => $subs->{counter},
                ) ;
            }

            my $state ;
            foreach my $row ( @{$items} ) {

                $subs->{ev_count}++ ;    # count matches for this queue
                $self->{ev_count}++ ;    # and overall
                try {
                    # qmatch is the name of the queue matcher
                    $state = $subs->{callback}
                        ->( $self, $row->{queue_name}, $row ) ;
                    if ( $row->{added} gt $subs->{after} ) {



( run in 1.932 second using v1.01-cache-2.11-cpan-39bf76dae61 )