view release on metacpan or search on metacpan
bin/qpubsub view on Meta::CPAN
if ( !$theq ) {
msg_exit( "Could not connect to queue $q->{dsn}", 2 ) ;
}
# get the things out of the way that are information only
# if asking for size or peeking, then there is no message adding or sending
if ( $opt{size} || $opt{peek} ) {
my $s = $theq->queue_size() ;
if ( $opt{size} ) {
say inflect "<#n:$s> <N:items> <V:were> found in the queue" ;
} else {
if ($s) {
my $count = 1 ;
say "-" x 80 ;
foreach my $tweet ( $theq->peek( count => $opt{peek} ) ) {
say $count++ . ":\n$tweet->{data}->{tweet}" ;
say "-" x 80 ;
}
} else {
say "The queue is empty" ;
}
}
} else {
# if we have a message then this should be added to the queue asap
if ($msg) {
my $resp = $theq->add(
data => { tweet => $msg },
activates => $activates
) ;
}
if ( !$theq ) {
msg_exit( "Could not connect to queue $q->{dsn}", 2 ) ;
}
# get the things out of the way that are information only
# if asking for size or peeking, then there is no message adding or sending
if ( $opt{size} || $opt{peek} ) {
my $s = $theq->queue_size() ;
if ( $opt{size} ) {
say inflect "<#n:$s> <N:items> <V:were> found in the queue" ;
} else {
if ($s) {
my $count = 1 ;
say "-" x 80 ;
foreach my $msg ( $theq->peek( count => PEEK_DEFAULT ) ) {
say $count++ . ":\n$msg->{data}" ;
say "-" x 80 ;
}
} else {
say "The queue is empty" ;
}
}
} elsif( $opt{exec}) {
# read $opt{count} items from the queue and run them individually against the exec program
} else {
# optionally read from stdin, allowing piping to script
if( $msg eq '-') {
lib/App/Basis/Queue.pm view on Meta::CPAN
# _debug
# Write a diagnostic message to STDERR, but only when the object's
# 'debug' flag is true.  Every line of the message is prefixed with a
# single space before it is printed.
#
#   $self - hashref-based object; only $self->{debug} is consulted
#   $text - the message to emit (may span several lines)
sub _debug
{
    my ( $self, $text ) = @_ ;

    # stay silent unless debugging has been switched on
    return unless $self->{debug} ;

    # work on a copy; /m lets ^ match at the start of each line
    ( my $indented = $text ) =~ s/^/ /gsm ;
    say STDERR $indented ;
}
# -----------------------------------------------------------------------------
sub _build_sql_stmt
{
my ( $query, $p ) = @_ ;
our @params = $p ? @$p : () ;
$query =~ s/\s+$// ;
$query .= ' ;' if ( $query !~ /;$/ ) ;
lib/App/Basis/Queue.pm view on Meta::CPAN
=item queue
Name of the queue, wildcards allowed
=back
B<Example usage>
my $count = $queue->size( queue => 'queue_name') ;
say "there are $count items in the queue" ;
# size can manage wildcards
$queue->size( queue => '/celestial/*') ;
=cut
sub size
{
my $self = shift ;
my $params = @_ % 2 ? shift : {@_} ;
lib/App/Basis/Queue.pm view on Meta::CPAN
=item queue
Name of the queue, wildcards allowed
=back
B<Example usage>
my $count = $queue->queue_size( queue => 'queue_name') ;
say "there are $count unprocessed items in the queue" ;
# queue size can manage wildcards
$queue->queue_size( queue => '/celestial/*') ;
=cut
sub queue_size
{
my $self = shift ;
my $params = @_ % 2 ? shift : {@_} ;
lib/App/Basis/Queue.pm view on Meta::CPAN
# -----------------------------------------------------------------------------
=head2 list_queues
Obtains a list of all the queues used by this database
B<Example usage>
my $qlist = $queue->list_queues() ;
foreach my $q (@$qlist) {
say $q ;
}
=cut
sub list_queues
{
my $self = shift ;
my %ques ;
my $result = $self->_query_db(
lib/App/Basis/Queue.pm view on Meta::CPAN
=back
provides counts of unprocessed, processed, failures
max process_failure, avg process_failure, earliest_added, latest_added,
min_data_size, max_data_size, avg_data_size, total_records
avg_elapsed, max_elapsed, min_elapsed
B<Example usage>
my $stats = $queue->stats( queue => 'queue_name') ;
say "processed $stats->{processed}, failures $stats->{failure}, unprocessed $stats->{unprocessed}" ;
# for all matching wildcard queues
my $all_stats = $queue->stats( queue => '/celestial/*') ;
=cut
sub stats
{
my $self = shift ;
my $params = @_ % 2 ? shift : {@_} ;
lib/App/Basis/Queue.pm view on Meta::CPAN
sub handler {
state $counter = 0 ;
my $q = shift ; # we get the queue object
# the queue trigger that matched, the actual queue name and the data
my ($qmatch, $queue, $data) = @_ ;
# we are only interested in 10 messages
if( ++$counter > 10) {
$q->unsubscribe( queue => $queue) ;
} else {
say Data::Dumper::Dumper( $data) ;
}
}
my $queue = App::Basis::Queue->new( dbh => $dbh) ;
$queue->subscribe( queue => '/logs/*', callback => \&handler) ;
$queue->listen() ;
=cut
sub unsubscribe
lib/App/Basis/Queue.pm view on Meta::CPAN
defaults to 'now'
=back
B<Example usage>
my $before = $queue->stats( queue => 'queue_name') ;
$queue->purge_tasks( queue => 'queue_name', before => '2015-11-24') ;
my $after = $queue->stats( queue => 'queue_name') ;
say "removed " .( $before->{total_records} - $after->{total_records}) ;
=cut
sub purge_tasks
{
my $self = shift ;
my $params = @_ % 2 ? shift : {@_} ;
if ( ref($params) ne 'HASH' ) {
warn "purge_tasks accepts a hash or a hashref of parameters" ;
lib/App/Basis/Queue.pm view on Meta::CPAN
Unix epoch or parsable datetime before which items should be purged
defaults to 'now'
=back
B<Example usage>
my $del = $queue->purge_chatter( queue => 'queue_name', before => '2015-11-24') ;
say "removed $del messages" ;
=cut
sub purge_chatter
{
my $self = shift ;
my $params = @_ % 2 ? shift : {@_} ;
if ( ref($params) ne 'HASH' ) {
warn "purge_chatter accepts a hash or a hashref of parameters" ;
lib/App/Basis/Queue.pm view on Meta::CPAN
Name of the queue, wildcards allowed
=back
B<Example usage>
$queue->remove_queue( queue => 'queue_name') ;
my $after = $queue->list_queues() ;
# convert list into a hash for easier checking
my %a = map { $_ => 1} @$after ;
say "queue removed" if( !$a{queue_name}) ;
=cut
sub remove_queue
{
my $self = shift ;
my $params = @_ % 2 ? shift : {@_} ;
if ( ref($params) ne 'HASH' ) {
warn "remove_queue accepts a hash or a hashref of parameters" ;
lib/App/Basis/Queue.pm view on Meta::CPAN
Name of the queue, wildcard allowed
=back
B<Example usage>
my $before = $queue->stats( queue => 'queue_name') ;
$queue->reset_failures( queue => 'queue_name') ;
my $after = $queue->stats( queue => 'queue_name') ;
say "reset " .( $after->{unprocessed} - $before->{unprocessed}) ;
=cut
sub reset_failures
{
my $self = shift ;
my $params = @_ % 2 ? shift : {@_} ;
if ( ref($params) ne 'HASH' ) {
warn "reset_failures accepts a hash or a hashref of parameters" ;
lib/App/Basis/Queue.pm view on Meta::CPAN
=item queue
Name of the queue, wildcard allowed
=back
B<Example usage>
$queue->remove_failures( queue => 'queue_name') ;
my $stats = $queue->stats( queue => 'queue_name') ;
say "failures left " .( $stats->{failures}) ;
=cut
sub remove_failures
{
my $self = shift ;
my $params = @_ % 2 ? shift : {@_} ;
if ( ref($params) ne 'HASH' ) {
warn "remove_failures accepts a hash or a hashref of parameters" ;
t/01_tasks.t view on Meta::CPAN
# foreach my $i ( 1 .. $loop ) {
# my $resp = $queue->add(
# queue => $qname,
# data => { number => $i, desc => "test data" }
# ) ;
# $count++ if ($resp) ;
# }
# # $dbh->commit ;
# # } ;
# # if( $@) {
# # say STDERR "Error: during adding many items" ;
# # }
# my $elapsed = tv_interval($start) ;
# ok( $count == $loop, "added $loop items" ) ;
# my $rate = $count / $elapsed ;
# ok( $rate > 5, "Insert rate > 5 per second" ) ;
# note sprintf( "thats %.2f per second (%d in %ds)",
# $rate, $count, $elapsed ) ;
# } ;
# -----------------------------------------------------------------------------