App-Basis-Queue

 view release on metacpan or  search on metacpan

bin/qpubsub  view on Meta::CPAN


if ( !$theq ) {
    msg_exit( "Could not connect to queue $q->{dsn}", 2 ) ;
}

# get the things out of the way that are information only
# if asking for size or peeking, then there is no message adding or sending
if ( $opt{size} || $opt{peek} ) {
    my $s = $theq->queue_size() ;
    if ( $opt{size} ) {
        say inflect "<#n:$s> <N:items> <V:were> found in the queue" ;
    } else {
        if ($s) {
            my $count = 1 ;
            say "-" x 80 ;
            foreach my $tweet ( $theq->peek( count => $opt{peek} ) ) {
                say $count++ . ":\n$tweet->{data}->{tweet}" ;
                say "-" x 80 ;
            }
        } else {
            say "The queue is empty" ;
        }
    }
} else {
    # if we have a message then this should be added to the queue asap
    if ($msg) {
        my $resp = $theq->add(
            data      => { tweet => $msg },
            activates => $activates
        ) ;
    }

bin/qtask  view on Meta::CPAN


if ( !$theq ) {
    msg_exit( "Could not connect to queue $q->{dsn}", 2 ) ;
}

# get the things out of the way that are information only
# if asking for size or peeking, then there is no message adding or sending
if ( $opt{size} || $opt{peek} ) {
    my $s = $theq->queue_size() ;
    if ( $opt{size} ) {
        say inflect "<#n:$s> <N:items> <V:were> found in the queue" ;
    } else {
        if ($s) {
            my $count = 1 ;
            say "-" x 80 ;
            foreach my $msg ( $theq->peek( count => PEEK_DEFAULT ) ) {
                say $count++ . ":\n$msg->{data}" ;
                say "-" x 80 ;
            }
        } else {
            say "The queue is empty" ;
        }
    }
} elsif( $opt{exec}) {
    # read $opt{count} items from the queue and run them individually against the exec program


} else {

    # optionally ready from stdin, allowing piping to script
    if( $msg eq '-') {

lib/App/Basis/Queue.pm  view on Meta::CPAN


sub _debug
{
    my $self = shift ;

    return if ( !$self->{debug} ) ;

    my $msg = shift ;
    $msg =~ s/^/    /gsm ;

    say STDERR $msg ;
}

# -----------------------------------------------------------------------------
sub _build_sql_stmt
{
    my ( $query, $p ) = @_ ;
    our @params = $p ? @$p : () ;
    $query =~ s/\s+$// ;
    $query .= ' ;' if ( $query !~ /;$/ ) ;

lib/App/Basis/Queue.pm  view on Meta::CPAN


=item queue

Name of the queue, wildcards allowed

=back

B<Example usage>

    my $count = $queue->size( queue => 'queue_name') ;
    say "there are $count items in the queue" ;

    # size can manage wildcards
    $queue->size( queue => '/celestial/*') ;

=cut

sub size
{
    my $self = shift ;
    my $params = @_ % 2 ? shift : {@_} ;

lib/App/Basis/Queue.pm  view on Meta::CPAN


=item queue

Name of the queue, wildcards allowed

=back

B<Example usage>

    my $count = $queue->queue_size( queue => 'queue_name') ;
    say "there are $count unprocessed items in the queue" ;

    # queue size can manage wildcards
    $queue->queue_size( queue => '/celestial/*') ;

=cut

sub queue_size
{
    my $self = shift ;
    my $params = @_ % 2 ? shift : {@_} ;

lib/App/Basis/Queue.pm  view on Meta::CPAN

# -----------------------------------------------------------------------------

=head2 list_queues

Qbtains a list of all the queues used by this database

B<Example usage>

    my $qlist = $queue->list_queues() ;
    foreach my $q (@$qlist) {
        say $q ;
    }

=cut

sub list_queues
{
    my $self = shift ;
    my %ques ;

    my $result = $self->_query_db(

lib/App/Basis/Queue.pm  view on Meta::CPAN

=back

provides counts of unprocessed, processed, failures
max process_failure, avg process_failure, earliest_added, latest_added,
min_data_size, max_data_size, avg_data_size, total_records
avg_elapsed, max_elapsed, min_elapsed

B<Example usage>

    my $stats = $queue->stats( queue => 'queue_name') ;
    say "processed $stats->{processed}, failures $stats->{failure}, unprocessed $stats->{unprocessed}" ;

    # for all matching wildcard queues
    my $all_stats = $queue->stats( queue => '/celestial/*') ;

=cut

sub stats
{
    my $self    = shift ;
    my $params  = @_ % 2 ? shift : {@_} ;

lib/App/Basis/Queue.pm  view on Meta::CPAN

    sub handler {
        state $counter = 0 ;
        my $q = shift ;             # we get the queue object
        # the queue trigger that matched, the actual queue name and the data
        my ($qmatch, $queue, $data) = @_ ;

        # we are only interested in 10 messages
        if( ++$counter > 10) {
            $q->unsubscribe( queue => $queue) ;
        } else {
            say Data::Dumper( $data) ;
        }
    }

    my $queue = App::Basis::Queue->new( dbh => $dbh) ;
    $queue->subscribe( queue => '/logs/*', callback => \&handler) ;
    $queue->listen() ;

=cut

sub unsubscribe

lib/App/Basis/Queue.pm  view on Meta::CPAN

defaults to 'now'

=back

B<Example usage>

    my $before = $queue->stats( queue => 'queue_name', before => '2015-11-24') ;
    $queue->purge_tasks( queue => 'queue_name') ;
    my $after = $queue->stats( queue  => 'queue_name') ;

    say "removed " .( $before->{total_records} - $after->{total_records}) ;

=cut

sub purge_tasks
{
    my $self = shift ;
    my $params = @_ % 2 ? shift : {@_} ;

    if ( ref($params) ne 'HASH' ) {
        warn "purge_tasks accepts a hash or a hashref of parameters" ;

lib/App/Basis/Queue.pm  view on Meta::CPAN

Unix epoch or parsable datetime before which items should be purged

defaults to 'now'

=back

B<Example usage>

    my $del = $queue->purge_chatter( queue => 'queue_name', before => '2015-11-24') ;

    say "removed $del messages" ;

=cut

sub purge_chatter
{
    my $self = shift ;
    my $params = @_ % 2 ? shift : {@_} ;

    if ( ref($params) ne 'HASH' ) {
        warn "purge_chatter accepts a hash or a hashref of parameters" ;

lib/App/Basis/Queue.pm  view on Meta::CPAN

Name of the queue, wildcards allowed

=back

B<Example usage>

    $queue->remove_queue( queue => 'queue_name') ;
    my $after = $queue->list_queues() ;
    # convert list into a hash for easier checking
    my %a = map { $_ => 1} @after ;
    say "queue removed" if( !$q->{queue_name}) ;

=cut

sub remove_queue
{
    my $self = shift ;
    my $params = @_ % 2 ? shift : {@_} ;

    if ( ref($params) ne 'HASH' ) {
        warn "remove_queue accepts a hash or a hashref of parameters" ;

lib/App/Basis/Queue.pm  view on Meta::CPAN

Name of the queue, wildcard allowed

=back

B<Example usage>

    my $before = $queue->stats( queue => 'queue_name') ;
    $queue->reset_failures( queue => 'queue_name') ;
    my $after = $queue->stats( queue => 'queue_name') ;

    say "reset " .( $after->{unprocessed} - $before->{unprocessed}) ;

=cut

sub reset_failures
{
    my $self = shift ;
    my $params = @_ % 2 ? shift : {@_} ;

    if ( ref($params) ne 'HASH' ) {
        warn "reset_failures accepts a hash or a hashref of parameters" ;

lib/App/Basis/Queue.pm  view on Meta::CPAN

=item queue

Name of the queue, wildcard allowed

=back

B<Example usage>

    $queue->remove_failues( queue => 'queue_name') ;
    my $stats = $queue->stats( queue => 'queue_name') ;
    say "failues left " .( $stats->{failures}) ;

=cut

sub remove_failures
{
    my $self = shift ;
    my $params = @_ % 2 ? shift : {@_} ;

    if ( ref($params) ne 'HASH' ) {
        warn "remove_failures accepts a hash or a hashref of parameters" ;

t/01_tasks.t  view on Meta::CPAN

#     foreach my $i ( 1 .. $loop ) {
#         my $resp = $queue->add(
#             queue => $qname,
#             data  => { number => $i, desc => "test data" }
#         ) ;
#         $count++ if ($resp) ;
#     }
#     # $dbh->commit ;
#     # } ;
#     # if( $@) {
#     #     say STDERR "Error: during adding many items" ;
#     # }
#     my $elapsed = tv_interval($start) ;
#     ok( $count == $loop, "added $loop items" ) ;
#     my $rate = $count / $elapsed ;
#     ok( $rate > 5, "Insert rate > 5 per second" ) ;
#     note sprintf( "thats %.2f per second (%d in %ds)",
#         $rate, $count, $elapsed ) ;
# } ;

# -----------------------------------------------------------------------------



( run in 0.426 second using v1.01-cache-2.11-cpan-d7a12ab2c7f )