App-Basis-Queue

 view release on metacpan or  search on metacpan

lib/App/Basis/Queue.pm  view on Meta::CPAN

                    # qmatch is the name of the queue matcher
                    $state = $subs->{callback}
                        ->( $self, $row->{queue_name}, $row ) ;
                    if ( $row->{added} gt $subs->{after} ) {
                        $subs->{after} = $row->{added} ;
                    }
                    if ( $row->{counter} > $subs->{counter} ) {
                        $subs->{counter} = $row->{counter} ;
                    }
                }
                catch {
                    warn "listen: error in callback $@" ;
                } ;
            }
        }
        $started = 1 ;
        last
            if ( $params->{events}
            && $self->{ev_count} > $params->{events} ) ;
        last
            if ( $params->{datetime}
            && time() > $params->{datetime} ) ;

        # wait a bit to allow the queues to fillup
        sleep($delay) ;
    }

    return $self->{ev_count} ;
}

# -----------------------------------------------------------------------------

=head2 unsubscribe

Unsubscribe from a named queue.

B<Parameters>

Hash of

=over

=item queue

Name of the queue, wildcard allowed

=back

B<Example usage>

    sub handler {
        state $counter = 0 ;
        my $q = shift ;             # we get the queue object
        # the queue trigger that matched, the actual queue name and the data
        my ($qmatch, $queue, $data) = @_ ;

        # we are only interested in 10 messages
        if( ++$counter > 10) {
            $q->unsubscribe( queue => $queue) ;
        } else {
            say Dumper( $data) ;
        }
    }

    my $queue = App::Basis::Queue->new( dbh => $dbh) ;
    $queue->subscribe( queue => '/logs/*', callback => \&handler) ;
    $queue->listen() ;

=cut

# Remove the subscription registered under a queue name/pattern.
#
# Accepts either a hash ( queue => $name ) or a single hashref holding the
# same key. When no queue is supplied, $self->{default_queue} is used. The
# caller's hashref is never modified.
#
# Returns 0 (after a warning) when the parameters are not a hash/hashref,
# 0 when there is no queue name to act on, otherwise the entry removed from
# $self->{subscriptions} (undef when nothing was stored under that name).
sub unsubscribe
{
    my $self = shift ;
    my $params = @_ % 2 ? shift : {@_} ;

    if ( ref($params) ne 'HASH' ) {
        warn "unsubscribe accepts a hash or a hashref of parameters" ;
        return 0 ;
    }

    # resolve the name into a local variable so that applying the default
    # does not write a 'queue' key back into a hashref owned by the caller
    my $queue = $params->{queue} || $self->{default_queue} ;
    return 0 if !$queue ;

    # does not matter if the queue name does not exist!
    return delete $self->{subscriptions}->{$queue} ;
}

# -----------------------------------------------------------------------------

=head2 purge_tasks

Purge removes all processed task items and failures/dead letters (process_failure >= 5).
These are completely removed from the database.

B<Parameters>

Hash of

=over

=item queue

Name of the queue, wildcard allowed

=item before (optional)

Unix epoch or parsable datetime before which items should be purged

defaults to 'now'

=back

B<Example usage>

    my $before = $queue->stats( queue => 'queue_name') ;
    $queue->purge_tasks( queue => 'queue_name', before => '2015-11-24') ;
    my $after = $queue->stats( queue  => 'queue_name') ;

    say "removed " .( $before->{total_records} - $after->{total_records}) ;



( run in 0.508 second using v1.01-cache-2.11-cpan-39bf76dae61 )