App-Basis-Queue
view release on metacpan or search on metacpan
lib/App/Basis/Queue.pm view on Meta::CPAN
# as the checking for tables and indexes is fraught with issues
# over multiple databases, it's easier to not print the errors,
# catch the creation failures and ignore them!
my $p = $self->{dbh}->{PrintError} ;
$self->{dbh}->{PrintError} = 0 ;
# I am assuming that if either table does not exist then neither does
# the other, and we should create both
if ( $self->_db_type() eq 'SQLite' ) {
$self->_create_sqlite_table($table) ;
} elsif ( $self->_db_type() eq 'Pg' ) {
$self->_create_postgres_table($table) ;
} elsif ( $self->_db_type() eq 'mysql' ) {
$self->_create_mysql_table($table) ;
} else {
die "Unhandled database type " . $self->_db_type() ;
}
foreach my $field (
qw/counter id added activates queue_name msg_type persist expires processed process_failure/
) {
my $sql = _create_index_str( $table, $field ) ;
$self->_debug($sql) ;
try { $self->{dbh}->do($sql) ; } catch { } ;
}
# create views
# restore the PrintError setting
$self->{dbh}->{PrintError} = $p ;
}
# -----------------------------------------------------------------------------
# always create the datetime strings the same way
sub _std_datetime
{
    # optional epoch seconds; any false value (undef, 0, '') means "now"
    my $epoch = shift ;
    $epoch = time() unless ($epoch) ;

    # always format from the UTC broken-down time so that every stored
    # datetime string has the same layout and zone
    my @utc = gmtime($epoch) ;
    return strftime( "%Y-%m-%d %H:%M:%S UTC", @utc ) ;
}
# -----------------------------------------------------------------------------
# convert something like a datetime string or an epoch value into a standardised
# datetime string and epoch value
sub _parse_datetime
{
    # $datetime: either an integer (epoch seconds, or a small offset from
    #            now) or a date string that Date::Manip can parse
    # returns:   list context   - ( standardised datetime string, epoch )
    #            scalar context - the standardised datetime string only
    #            a false $datetime gives ( undef, undef ) / undef
    # dies:      "Invalid date, could not parse" when a date string cannot
    #            be understood
    my ($datetime) = @_ ;

    # a single parser object is kept and reused across calls
    state $date = Date::Manip::Date->new() ;
    my @ret ;

    if ( !$datetime ) {
        return wantarray ? ( undef, undef ) : undef ;
    } elsif ( $datetime =~ /^\d+$/ ) {

        # a number no bigger than FIVE_DAYS is taken to be an offset from
        # now rather than an absolute epoch value
        $datetime += time() if ( $datetime <= FIVE_DAYS ) ;
        @ret = ( _std_datetime($datetime), $datetime ) ;
    } else {

        # parse() works in locale time not UTC; it returns a true value
        # when the string could not be parsed, so fail straight away
        die "Invalid date, could not parse" if ( $date->parse($datetime) ) ;
        {
            # secondary guard: if formatting the date raises a warning
            # there must be a problem with the parsed input date string
            local $SIG{__WARN__} = sub {
                die "Invalid date, could not parse" ;
            } ;
            $date->printf("%a") ;
        }
        my $d2 = $date->printf("%O %Z") ;

        # reparse the date to get it into UTC, best way I could think of :(
        # a failure here would otherwise silently produce "now"
        die "Invalid date, could not parse" if ( $date->parse($d2) ) ;

        # secs_since_1970_GMT is epoch; fetch it once and use it for both
        # the string and numeric results so they always agree
        my $epoch = $date->secs_since_1970_GMT() ;
        @ret = ( _std_datetime($epoch), $epoch ) ;
    }

    return wantarray ? @ret : $ret[0] ;
}
# -----------------------------------------------------------------------------
# _add
# Add some data into a named queue. Could be a task or a chatter message
# * This does not handle wildcard queues *
sub _add
{
state $uuid = Data::UUID->new() ;
my $self = shift ;
my $params = @_ % 2 ? shift : {@_} ;
if ( ref($params) ne 'HASH' ) {
warn "_add accepts a hash or a hashref of parameters" ;
return 0 ;
}
$params->{queue} ||= $self->{default_queue} ;
# to keep what was here before the change to the parameters
my $qname = $params->{queue} ;
my $msg_type = $params->{type} ;
# only TASK events can activate in the future
delete $params->{activates} if ( $msg_type ne MSG_TASK ) ;
my $persist = $params->{persist} ;
my ( $expires, $when ) = _parse_datetime( $params->{expires} ) ;
my $activates = _parse_datetime( $params->{activates} ) ;
$activates ||= _std_datetime() ;
my $data = $params->{data} ;
if ( ref($data) ne 'HASH' ) {
warn "_add data parameter must be a hashref" ;
return 0 ;
}
if ( $params->{expires} && $when && $when < time() ) {
warn "_add not storing expired data" ;
( run in 0.603 second using v1.01-cache-2.11-cpan-ceb78f64989 )