App-Basis-Queue

 view release on metacpan or  search on metacpan

lib/App/Basis/Queue.pm  view on Meta::CPAN


    # as the checking for tables and indexes is fraught with issues
    # over multiple databases its easier to not print the errors and
    # catch the creation failures and ignore them!
    my $p = $self->{dbh}->{PrintError} ;
    $self->{dbh}->{PrintError} = 0 ;

    # I am assuming either table does not exist then nor does the
    # other and we should create both
    if ( $self->_db_type() eq 'SQLite' ) {
        $self->_create_sqlite_table($table) ;
    } elsif ( $self->_db_type() eq 'Pg' ) {
        $self->_create_postgres_table($table) ;
    } elsif ( $self->_db_type() eq 'mysql' ) {
        $self->_create_mysql_table($table) ;
    } else {
        die "Unhandled database type " . $self->_db_type() ;
    }

    foreach my $field (
        qw/counter id added activates queue_name msg_type persist expires processed process_failure/
        ) {
        my $sql = _create_index_str( $table, $field ) ;

        $self->_debug($sql) ;
        try { $self->{dbh}->do($sql) ; } catch { } ;
    }

    # create views

    # restore the PrintError setting
    $self->{dbh}->{PrintError} = $p ;
}

# -----------------------------------------------------------------------------
# always create the datetime strings the same way
sub _std_datetime
{
    my ($secs) = @_ ;
    $secs ||= time() ;
    return strftime( "%Y-%m-%d %H:%M:%S UTC", gmtime($secs) ) ;
}

# -----------------------------------------------------------------------------
# convert something like a datetime string or an epoch value into a standardised
# datetime string and epoch value

sub _parse_datetime
{
    my ($datetime) = @_ ;
    state $date = new Date::Manip::Date ;
    my @ret ;

    if ( !$datetime ) {
        return wantarray ? ( undef, undef ) : undef ;
    } elsif ( $datetime =~ /^\d+$/ ) {
        # assume anything less than five days is a time into the future
        $datetime += time() if ( $datetime <= FIVE_DAYS ) ;
        @ret = ( _std_datetime($datetime), $datetime ) ;
    } else {
        # so parse will parse in locale time not as UTC
        $date->parse($datetime) ;
        {
            # if we get a warning about converting the date to a day, there
            # must be a problem with parsing the input date string
            local $SIG{__WARN__} = sub {
                die "Invalid date, could not parse" ;
            } ;
            my $day = $date->printf("%a") ;
        }

        my $d2 = $date->printf("%O %Z") ;
        # reparse the date to get it into UTC, best way I could think of :(
        $date->parse($d2) ;

        # secs_since_1970_GMT is epoch
        @ret = (
            _std_datetime( $date->secs_since_1970_GMT() ),
            $date->secs_since_1970_GMT()
        ) ;
    }

    return wantarray ? @ret : $ret[0] ;
}

# -----------------------------------------------------------------------------
# _add
# Add some data into a named queue. Could be a task or a chatter mesg
# * This does not handle wildcard queues *

sub _add
{
    state $uuid = Data::UUID->new() ;
    my $self = shift ;
    my $params = @_ % 2 ? shift : {@_} ;

    if ( ref($params) ne 'HASH' ) {
        warn "_add accepts a hash or a hashref of parameters" ;
        return 0 ;
    }

    $params->{queue} ||= $self->{default_queue} ;

    # to keep what was here before the change to the parameters
    my $qname    = $params->{queue} ;
    my $msg_type = $params->{type} ;
    # only TASK events can activate in the future
    delete $params->{activates} if ( $msg_type ne MSG_TASK ) ;
    my $persist = $params->{persist} ;
    my ( $expires, $when ) = _parse_datetime( $params->{expires} ) ;
    my $activates = _parse_datetime( $params->{activates} ) ;
    $activates ||= _std_datetime() ;
    my $data = $params->{data} ;

    if ( ref($data) ne 'HASH' ) {
        warn "_add data parameter must be a hashref" ;
        return 0 ;
    }

    if ( $params->{expires} && $when && $when < time() ) {
        warn "_add not storing expired data" ;



( run in 0.603 second using v1.01-cache-2.11-cpan-ceb78f64989 )