App-Basis-Queue
view release on metacpan or search on metacpan
bin/qpubsub view on Meta::CPAN
$date->secs_since_1970_GMT()
) ;
}
return wantarray ? @ret : $ret[0] ;
}
# -----------------------------------------------------------------------------
# build suitable config
sub create_default_config
{
my ($cfg) = @_ ;
$cfg->store() ;
}
# -----------------------------------------------------------------------------
# connect to the queue DB
sub connect_queue
{
my ( $dsn, $user, $passwd, $qname ) = @_ ;
my $dbh
= DBI->connect( $dsn, $user, $passwd,
{ RaiseError => 1, PrintError => 0, AutoCommit => 1 } )
or die "Could not connect to DB $dsn" ;
if ( $dsn =~ /SQLite/i ) {
$dbh->do("PRAGMA journal_mode = WAL") ;
$dbh->do("PRAGMA synchronous = NORMAL") ;
}
my $queue = App::Basis::Queue->new(
dbh => $dbh,
default_queue => $qname,
debug => 0,
) ;
return $queue ;
}
# -----------------------------------------------------------------------------
# main
my $action ;
my %opt = init_app(
help_text => "Simple script to queue messages for later action
use perldoc $program to get the setup for the ~/.$program config file",
help_cmdline => "message to send",
options => {
'verbose|v' => 'Output useful information',
'queue|q=s' => { desc => 'queue to add things to', required => 1 },
'size|s' => 'Disply the number of unprocessed items in a task queue',
'peek|p' => 'Display the next few items in a task queue, use count to limit, default '
PEEK_DEFAULT,
'type|t=s' => {
desc => 'Type of the queue, one of ' . join( ", ", @queue_types ),
default => 'task',
required => 1,
validate => sub {
my $t = shift ;
grep {/$t/} @queue_types ;
},
},
'listen|l' =>
'Listen for pubsub messages on the queue, use count to limit, default no limit',
'exec|e=s' => {
desc =>
"command to run with the message, use count to limit, default "
. EXEC_DEFAULT,
},
'activates|a=s' => {
desc =>
'Parsable UTC datetime after which the message should be valid',
},
'count|c=i' => 'Number of messages to read',
}
) ;
my $msg = join( ' ', @ARGV ) ;
if ( $opt{test} ) {
set_verbose(1) ;
set_testing(1) ;
}
# lets have the config named after this program
my $cfg = App::Basis::Config->new(
filename => "$ENV{HOME}/.$program",
die_on_error => 1
) ;
create_default_config($cfg) ;
my ( $activates, $epoch ) = parse_datetime( $opt{activates} ) ;
my ( $queue, $settings ) ;
$settings->{file} ||= get_program . "_$queue.tweets" ;
msg_exit( "Could not find valid config in $ENV{HOME}/.$program", 2 )
if ( !$queue || !$settings ) ;
# update the config if it needs it
$cfg->store() ;
my $q = $cfg->get("/abq/queue") ;
$q->{prefix} ||= "/" ;
my $theq = connect_queue( $q->{dsn}, $q->{user}, $q->{password},
$q->{prefix} . $queue ) ;
if ( !$theq ) {
msg_exit( "Could not connect to queue $q->{dsn}", 2 ) ;
}
# get the things out of the way that are information only
# if asking for size or peeking, then there is no message adding or sending
if ( $opt{size} || $opt{peek} ) {
my $s = $theq->queue_size() ;
if ( $opt{size} ) {
say inflect "<#n:$s> <N:items> <V:were> found in the queue" ;
( run in 0.666 second using v1.01-cache-2.11-cpan-140bd7fdf52 )