App-Basis-Queue
view release on metacpan or search on metacpan
bin/qpubsub view on Meta::CPAN
{
my ($datetime) = @_ ;
state $date = Date::Manip::Date->new() ;
my @ret ;
if ( !$datetime ) {
return wantarray ? ( undef, undef ) : undef ;
} elsif ( $datetime =~ /^\d+$/ ) {
# assume anything less than five days is a time into the future
$datetime += time() if ( $datetime <= FIVE_DAYS ) ;
@ret = ( std_datetime($datetime), $datetime ) ;
} else {
# so parse will parse in locale time not as UTC
$date->parse($datetime) ;
{
# if we get a warning about converting the date to a day, there
# must be a problem with parsing the input date string
local $SIG{__WARN__} = sub {
die "Invalid date, could not parse" ;
} ;
my $day = $date->printf("%a") ;
}
my $d2 = $date->printf("%O %Z") ;
# reparse the date to get it into UTC, best way I could think of :(
$date->parse($d2) ;
# secs_since_1970_GMT is epoch
@ret = (
std_datetime( $date->secs_since_1970_GMT() ),
$date->secs_since_1970_GMT()
) ;
}
return wantarray ? @ret : $ret[0] ;
}
# -----------------------------------------------------------------------------
# build suitable config
# Write the given config object out by calling its store() method.
#   $config - object providing store()
# Returns whatever store() returns.
sub create_default_config
{
    my $config = shift ;
    return $config->store() ;
}
# -----------------------------------------------------------------------------
# connect to the queue DB
# Open the database that backs the queue and wrap the handle in an
# App::Basis::Queue object.
#
# Positional parameters:
#   $dsn    - DBI data source name
#   $user   - database user name
#   $passwd - database password
#   $qname  - handed to the queue object as its default_queue
#
# Returns the new App::Basis::Queue object; dies if the connection fails.
sub connect_queue
{
    my ( $dsn, $user, $passwd, $qname ) = @_ ;

    my $attr = { RaiseError => 1, PrintError => 0, AutoCommit => 1 } ;
    my $handle = DBI->connect( $dsn, $user, $passwd, $attr ) ;
    die "Could not connect to DB $dsn" if !$handle ;

    # SQLite only: switch to WAL journalling and NORMAL synchronous mode
    if ( $dsn =~ /SQLite/i ) {
        for my $pragma (
            "PRAGMA journal_mode = WAL",
            "PRAGMA synchronous = NORMAL"
            )
        {
            $handle->do($pragma) ;
        }
    }

    my $q = App::Basis::Queue->new(
        dbh           => $handle,
        default_queue => $qname,
        debug         => 0,
    ) ;

    return $q ;
}
# -----------------------------------------------------------------------------
# main
my $action ;

# Parse the command line: init_app() is given the help text and the option
# specification, and its result is kept in %opt.
my %opt = init_app(
    help_text => "Simple script to queue messages for later action
use perldoc $program to get the setup for the ~/.$program config file",
    help_cmdline => "message to send",
    options      => {
        'verbose|v' => 'Output useful information',
        'queue|q=s' => { desc => 'queue to add things to', required => 1 },
        'size|s' =>
            'Display the number of unprocessed items in a task queue',
        # the '.' is required here, a string directly followed by the
        # constant does not compile
        'peek|p' =>
            'Display the next few items in a task queue, use count to limit, default '
            . PEEK_DEFAULT,
        'type|t=s' => {
            desc => 'Type of the queue, one of '
                . join( ", ", @queue_types ),
            default  => 'task',
            required => 1,
            validate => sub {
                my $t = shift ;

                # compare exactly rather than using the argument as a regex,
                # otherwise things like '.' or a single letter are accepted
                # and regex metacharacters in the argument can blow up
                return grep { $_ eq $t } @queue_types ;
            },
        },
        'listen|l' =>
            'Listen for pubsub messages on the queue, use count to limit, default no limit',
        'exec|e=s' => {
            desc =>
                "command to run with the message, use count to limit, default "
                . EXEC_DEFAULT,
        },
        'activates|a=s' => {
            desc =>
                'Parsable UTC datetime after which the message should be valid',
        },
        'count|c=i' => 'Number of messages to read',
    }
) ;

# whatever is left on the command line is the message text
my $msg = join( ' ', @ARGV ) ;

# NOTE(review): 'test' is not declared in the options above, presumably it
# is provided by init_app itself - confirm
if ( $opt{test} ) {
    set_verbose(1) ;
    set_testing(1) ;
}

# lets have the config named after this program
my $cfg = App::Basis::Config->new(
    filename     => "$ENV{HOME}/.$program",
    die_on_error => 1
) ;
( run in 1.614 second using v1.01-cache-2.11-cpan-df04353d9ac )