App-Basis-Queue
view release on metacpan or search on metacpan
bin/qpubsub view on Meta::CPAN
#!/usr/bin/env perl
# PODNAME: abq 'app basis queue' script to add things to queues
# ABSTRACT: add things to queues, using App::Basis::Queue
=head1 SYNOPSIS
add a message to queue fred, will be a simple queue, --type=simple is default
> abq -q fred "message to tweet"
# add a message to the work task queue
> abq -q work --type=task "process /some/file/path"
# add to a pubsub queue
> abq --queue=notes --type=pubsub "started a program"
# listen to all pubsub queue messages forever, messages are written to stdout
> abq --queue=notes --type=pubsub --listen
# take an item from a simple queue
> abq -q fred --pop
# process an item in a task queue, exit status will determine if it is processed
# the queue message is passed to the exec command in quotes
# obviously there are security concerns around doing this, clean your inputs!
> abq -q work --type=task --exec "/command/to/run"
# peek at work items in a task queue, --type=task is default for a peek
> abq --peek --count=10 -q work
to get full help use
> abq --help
=head1 DESCRIPTION
Add an item to a queue or process an item from a queue
config file is in ~/.abq
abq:
queue:
dsn: dbi:SQLite:/tmp/abq.sqlite
user:
password:
The queue entry holds information about the queue database that you want to connect to;
the dsn is a perl DBI style connection string
=cut
#
# (c) Kevin Mulholland, moodfarm@cpan.org
# this code is released under the Perl Artistic License
# -----------------------------------------------------------------------------
use 5.10.0 ;
use strict ;
use warnings ;
use POSIX qw(strftime) ;
use App::Basis ;
use App::Basis::Config ;
use DBI ;
use App::Basis::Queue ;
use Date::Manip::Date ;
use feature 'say' ;
use Lingua::EN::Inflexion ;
# what about Data::Dumper::GUI or YAML::Tiny::Color
use Data::Printer ;
use constant FIVE_DAYS => 5 * 24 * 3600 ;
use constant PEEK_DEFAULT => 10 ;
use constant EXEC_DEFAULT => 1 ;
# -----------------------------------------------------------------------------
use constant QUEUE_CONFIG => "$ENV{HOME}/.abq" ;
my @queue_types = qw(task simple pubsub) ;
# -----------------------------------------------------------------------------
# lets do the testing stuff with private variables
{
    # the flag lives inside this bare block, so only the two subs below
    # can reach it
    my $test_mode = 0 ;

    # turn test mode on, any arguments passed are ignored
    sub set_testing { return $test_mode = 1 }

    # report the current state of the test mode flag
    sub is_testing { return $test_mode }
}
# -----------------------------------------------------------------------------
# whatever get_program() reports, used further down to name the config file
my $program = get_program() ;

# URL parsing from https://metacpan.org/pod/URI
# captures are: 1 scheme, 2 authority, 3 path, 4 query, 5 fragment
# this has to stay in single quotes, inside a double-quoted string "\?"
# collapses to a bare "?" leaving "(?:?" in the pattern, which will not
# compile when the string is used as a regex
my $valid_url
    = '(?:([^:/?#]+):)?(?://([^/?#]*))?([^?#]*)(?:\?([^#]*))?(?:#(.*))?' ;
bin/qpubsub view on Meta::CPAN
use perldoc $program to get the setup for the ~/.$program config file",
help_cmdline => "message to send",
options => {
'verbose|v' => 'Output useful information',
'queue|q=s' => { desc => 'queue to add things to', required => 1 },
'size|s' => 'Disply the number of unprocessed items in a task queue',
'peek|p' => 'Display the next few items in a task queue, use count to limit, default '
PEEK_DEFAULT,
'type|t=s' => {
desc => 'Type of the queue, one of ' . join( ", ", @queue_types ),
default => 'task',
required => 1,
validate => sub {
my $t = shift ;
grep {/$t/} @queue_types ;
},
},
'listen|l' =>
'Listen for pubsub messages on the queue, use count to limit, default no limit',
'exec|e=s' => {
desc =>
"command to run with the message, use count to limit, default "
. EXEC_DEFAULT,
},
'activates|a=s' => {
desc =>
'Parsable UTC datetime after which the message should be valid',
},
'count|c=i' => 'Number of messages to read',
}
) ;
# whatever is left on the command line after option parsing is the message
my $msg = join( ' ', @ARGV ) ;

# test mode also switches verbose output on
if ( $opt{test} ) {
    set_verbose(1) ;
    set_testing(1) ;
}

# lets have the config named after this program
my $cfg = App::Basis::Config->new(
    filename     => "$ENV{HOME}/.$program",
    die_on_error => 1
) ;
create_default_config($cfg) ;

my ( $activates, $epoch ) = parse_datetime( $opt{activates} ) ;

# the queue name comes from the --queue option, it must be assigned here as
# nothing else in this section sets it and the check below exits when it is
# empty; the empty string fallback avoids an undef warning in the file name
my $queue = $opt{queue} // '' ;
my $settings ;
$settings->{file} ||= get_program() . "_$queue.tweets" ;

msg_exit( "Could not find valid config in $ENV{HOME}/.$program", 2 )
    if ( !$queue || !$settings ) ;

# update the config if it needs it
$cfg->store() ;

# connection details for the queue database, the queue name is prefixed
# with the configured prefix, "/" when none is configured
my $q = $cfg->get("/abq/queue") ;
$q->{prefix} ||= "/" ;

my $theq = connect_queue( $q->{dsn}, $q->{user}, $q->{password},
    $q->{prefix} . $queue ) ;
if ( !$theq ) {
    msg_exit( "Could not connect to queue $q->{dsn}", 2 ) ;
}

# get the things out of the way that are information only
# if asking for size or peeking, then there is no message adding or sending
if ( $opt{size} || $opt{peek} ) {
    my $s = $theq->queue_size() ;
    if ( $opt{size} ) {
        say inflect "<#n:$s> <N:items> <V:were> found in the queue" ;
    } else {
        if ($s) {
            # --peek is only a flag, the number of items to show comes from
            # --count, falling back to PEEK_DEFAULT
            my $limit = $opt{count} || PEEK_DEFAULT ;
            my $count = 1 ;
            say "-" x 80 ;
            foreach my $tweet ( $theq->peek( count => $limit ) ) {
                say $count++ . ":\n$tweet->{data}->{tweet}" ;
                say "-" x 80 ;
            }
        } else {
            say "The queue is empty" ;
        }
    }
} else {
    # if we have a message then this should be added to the queue asap
    if ($msg) {
        $theq->add(
            data      => { tweet => $msg },
            activates => $activates
        ) ;
    }

    # NOTE(review): $action is not declared anywhere in this section,
    # confirm it is set earlier in the file
    if ( !$action && !$msg ) {
        verbose("Parameters are required, or message queue is empty") ;
        exit 0 ;
    }
}
( run in 1.088 second using v1.01-cache-2.11-cpan-e1769b4cff6 )