App-Basis-Queue

 view release on metacpan or  search on metacpan

lib/App/Basis/Queue.pm  view on Meta::CPAN

    # cascading deletes on FOREIGN keys
    if ( $self->_db_type() eq 'SQLite' ) {
        $self->{dbh}->do("PRAGMA foreign_keys = ON") ;
    }

    # ensure we have the tables created (if wanted)
    $self->_create_tables() if ( !$self->skip_table_check ) ;

    # get the first list of queues we have
    $self->list_queues() ;
}

# -----------------------------------------------------------------------------
# TODO: add a DEMOLISH method to clean up unprocessed items when the object
# handle goes out of scope

# -----------------------------------------------------------------------------
## class private variables
# -----------------------------------------------------------------------------

# Private cache of the queue names known to this object, stored as a hashref.
has _queue_list => (
    is       => 'rwp',              # read-only accessor plus a private writer
    init_arg => undef,              # cannot be supplied to the constructor
    lazy     => 1,
    writer   => '_set_queue_list',  # explicit name for the private writer
    default  => sub { return {} },
) ;

# Private record of the database driver type (compared against 'SQLite'
# elsewhere in this file); starts out as an empty string.
has _db_type => (
    is       => 'rwp',              # read-only accessor plus a private writer
    init_arg => undef,              # cannot be supplied to the constructor
    lazy     => 1,
    writer   => '_set_db_type',     # explicit name for the private writer
    default  => sub { return '' },
) ;

# Identifier for this worker process, built lazily as "host::user::pid".
# It is written into the 'processor' column when an item is claimed.
has _processor => (
    is      => 'ro',
    lazy    => 1,
    default => sub {

        # backticks return undef when the command cannot be run
        my $hostname = `hostname` // '' ;
        $hostname =~ s/\s//g ;

        # USER is often unset under cron/daemons; fall back rather than
        # interpolating undef (which warns and gives an empty field anyway)
        my $user = $ENV{USER} // $ENV{USERNAME} // '' ;

        return join( '::', $hostname, $user, $$ ) ;
    },
    init_arg => undef               # dont allow setting in constructor ;
) ;

# -----------------------------------------------------------------------------
## class private methods
# -----------------------------------------------------------------------------

# Write a debug message to STDERR, indenting every line by four spaces.
# Does nothing unless the object's 'debug' flag is set.
sub _debug
{
    my ( $self, $text ) = @_ ;

    # debugging switched off, stay quiet
    return unless ( $self->{debug} ) ;

    # indent the start of each line of the message
    $text =~ s/^/    /gsm ;

    say STDERR $text ;
}

# -----------------------------------------------------------------------------
# Build a human readable version of a parameterised SQL statement, this is
# only used for debug output and must never be executed.
#
# Parameters
#   $query - SQL text using '?' placeholders
#   $p     - optional arrayref of bind values, in placeholder order
#
# Returns the query with trailing whitespace removed, a terminating ' ;'
# added if missing and, when bind values were given, every '?' replaced in
# order: undef (or a missing value) becomes NULL, unsigned integers are left
# bare, everything else is single quoted with embedded quotes doubled.
sub _build_sql_stmt
{
    my ( $query, $p ) = @_ ;
    my @params = $p ? @$p : () ;

    $query =~ s/\s+$// ;
    $query .= ' ;' if ( $query !~ /;$/ ) ;

    # nothing to substitute, leave any placeholders as they are
    return $query if ( !@params ) ;

    # lexical state and an anonymous closure, so nothing leaks into the
    # package namespace and calls cannot interfere with each other
    my $index  = 0 ;
    my $render = sub {
        my $value = $params[ $index++ ] ;

        # make sure we represent NULL properly
        return 'NULL' if ( !defined $value ) ;

        # leave unsigned integers untouched, not doing floats
        return $value if ( $value =~ /\A\d+\z/ ) ;

        # basic quoting, double up embedded single quotes
        ( my $escaped = $value ) =~ s/'/''/g ;
        return "'$escaped'" ;
    } ;

    $query =~ s/\?/$render->()/ge ;

    return $query ;
}

# -----------------------------------------------------------------------------
sub _query_db
{
    state $sth_map = {} ;
    my $self = shift ;
    my ( $query, $p, $no_results ) = @_ ;
    my @params = $p ? @$p : () ;
    my %result ;

    $query =~ s/\s+$// ;
    $query .= ' ;' if ( $query !~ /;$/ ) ;

    if ( $self->{debug} ) {

        $self->_debug(
            "ACTUAL QUERY: $query\nQUERY PARAMS: " . to_json( \@params ) ) ;
        my $sql = _build_sql_stmt( $query, $p ) ;
        $self->_debug( 'BUILT QUERY : ' . $sql . "\n" ) ;
    }

    try {
        my $sth ;

        # key based on query and fields we are using
        my $key = "$query." . join( '.', @params ) ;
        if ( $sth_map->{$key} ) {
            $sth = $sth_map->{$key} ;

lib/App/Basis/Queue.pm  view on Meta::CPAN

    # switch to SQL wildcard
    $qname =~ s/\*/%/g ;

    # get list of IDs we can process, as SQLite has an issue
    # with ORDER BY and LIMIT in an UPDATE call so we have to do things
    # in 2 stages, which means it is not easy to mark lots of records
    # to be processed but that its possibly a good thing
    my $sql = sprintf(
        "SELECT * FROM %s_queue
            WHERE queue_name LIKE ?
            AND processed = 0
            AND msg_type = ?
            ORDER BY counter ASC
            LIMIT 1;", $self->{prefix}
    ) ;
    my $expires = _parse_datetime( time() ) ;
    my $info = $self->_query_db( $sql, [ $qname, MSG_SIMPLE ] ) ;

    # if there are no items to update, return
    # return if ( !scalar( $info->{rows} ) ) ;
    return if ( !$info->{row_count} ) ;

    my $row = $info->{rows}->[0] ;
    my $id  = $row->{id} ;

    # mark item that I have popped
    my $update
        = "SET processed=1, processor=?, process_start=?, processing_time=?, process_failure=0
        WHERE id = ? ;" ;
    my $resp = $self->_update_db( $self->{prefix} . "_queue",
        $update, [ $self->_processor(), _std_datetime(), 0.0, $id ] ) ;
    if ( !$resp->{row_count} ) {
        return ;
    }
    # return unpacked data
    return decode_json( $row->{data} ) ;
}

# -----------------------------------------------------------------------------

=head2 size


Get size of a SIMPLE queue

B<Parameters>

Hash of

=over

=item queue

Name of the queue, wildcards allowed

=back

B<Example usage>

    my $count = $queue->size( queue => 'queue_name') ;
    say "there are $count items in the queue" ;

    # size can manage wildcards
    $queue->size( queue => '/celestial/*') ;

=cut

sub size
{
    my ( $self, @args ) = @_ ;

    # an odd number of arguments means the first one should be a hashref,
    # otherwise the arguments are taken as key/value pairs
    my $params = ( @args % 2 ) ? shift @args : {@args} ;

    unless ( ref($params) eq 'HASH' ) {
        warn "size accepts a hash or a hashref of parameters" ;
        return 0 ;
    }

    # count SIMPLE messages only, the shared helper does the actual work
    $params->{_qtype} = MSG_SIMPLE ;

    return $self->_queue_size($params) ;
}


# -----------------------------------------------------------------------------
# try and find a match for the qname, replace SQL wildcard with perl ones

# Check if a queue name, or a wildcard pattern, matches any known queue.
#
# Parameters
#   $qname - queue name, '*' (or the SQL '%') matches any run of characters
#
# Returns 1 if at least one known queue matches, 0 otherwise.
#
# The wildcard is turned into an anchored regex with all other characters
# escaped. Using the name directly as a regex treats '*' as a quantifier,
# so a leading '*' is a fatal regex error and the match is unanchored.
sub _valid_qname
{
    my $self = shift ;
    my ($qname) = @_ ;

    # update queue list
    $self->list_queues() ;

    return 0 if ( !defined $qname ) ;

    # normalise SQL wildcards to the '*' form
    $qname =~ s/%/*/g ;

    my $queues = $self->{_queue_list} || {} ;

    # an exact name match needs no pattern matching at all
    return 1 if ( $queues->{$qname} ) ;

    # without a wildcard there is nothing else that could match
    return 0 if ( $qname !~ /\*/ ) ;

    # escape the literal parts and let each '*' match anything
    my $pattern
        = join( '.*', map { quotemeta($_) } split( /\*/, $qname, -1 ) ) ;
    my $matcher = qr/\A$pattern\z/s ;

    foreach my $q ( keys %{$queues} ) {
        return 1 if ( $q =~ $matcher ) ;
    }

    return 0 ;
}

# -----------------------------------------------------------------------------

=head2 process

Process up to 100 tasks from the named queue(s)

B<Parameters>

Hash of

=over

=item queue

Name of the queue, wildcards allowed

lib/App/Basis/Queue.pm  view on Meta::CPAN

            AND processed = 0
            AND processor = ?
            AND process_failure = 1
            AND msg_type = ?
            AND expires > ?
            AND activates <= ?
            ORDER BY added ASC
            LIMIT ?;", $self->{prefix},
    ) ;
    my $info = $self->_query_db( $sql,
        [ $qname, $self->_processor(), MSG_TASK, $expires, $now, $params->{count} ] ) ;
    $self->{debug} = 0 ;

    foreach my $row ( @{ $info->{rows} } ) {

        # unpack the data
        $row->{data} = decode_json( $row->{data} ) ;

        my $state = 0 ;
        try {
            $state = $params->{callback}
                ->( $self, $qname, $row, $params->{callback_params} ) ;
        }
        catch {
            warn "process_failures: error in callback $@" ;
        } ;

      # we don't do anything else with the record, we assume that the callback
      # function will have done something like delete it or re-write it
    }

    return $processed_count ;
}

# Alias for process_failures, takes exactly the same parameters.
# The object must not be passed again as an argument, otherwise it is
# mistaken for the parameter hashref and the call is rejected.
sub process_deadletters{
    my $self = shift ;
    return $self->process_failures(@_) ;
}

# -----------------------------------------------------------------------------

=head2 queue_size

Get the count of unprocessed TASK items in the queue

B<Parameters>

Hash of

=over

=item queue

Name of the queue, wildcards allowed

=back

B<Example usage>

    my $count = $queue->queue_size( queue => 'queue_name') ;
    say "there are $count unprocessed items in the queue" ;

    # queue size can manage wildcards
    $queue->queue_size( queue => '/celestial/*') ;

=cut

sub queue_size
{
    my ( $self, @args ) = @_ ;

    # an odd number of arguments means the first one should be a hashref,
    # otherwise the arguments are taken as key/value pairs
    my $params = ( @args % 2 ) ? shift @args : {@args} ;

    unless ( ref($params) eq 'HASH' ) {
        warn "queue_size accepts a hash or a hashref of parameters" ;
        return 0 ;
    }

    # count TASK messages only, the shared helper does the actual work
    $params->{_qtype} = MSG_TASK ;

    return $self->_queue_size($params) ;
}

# -----------------------------------------------------------------------------

=head2 list_queues

Obtains a list of all the queues used by this database

B<Example usage>

    my $qlist = $queue->list_queues() ;
    foreach my $q (@$qlist) {
        say $q ;
    }

=cut

sub list_queues
{
    my ($self) = @_ ;

    # every distinct queue name held in this object's queue table
    my $sql = sprintf( 'SELECT DISTINCT queue_name FROM %s_queue;',
        $self->{prefix} ) ;
    my $result = $self->_query_db($sql) ;

    # a failed query leaves us with an empty set of names
    my %known = $result->{error}
        ? ()
        : map { ( $_->{queue_name} => 1 ) } @{ $result->{rows} } ;

    # remember the names for later lookups
    $self->_set_queue_list( \%known ) ;

    return [ keys %known ] ;
}

# -----------------------------------------------------------------------------

=head2 peek

Have a look at an unprocessed item in a TASK queue

B<Parameters>

Hash of

=over

=item queue

Name of the queue, wildcards allowed

=item position

position in the queue you want to peek at (head/start) or (tail/end) - defaults to head

=item count

number of items to peek, defaults to 1, max is 100 (PEEK_MAX)

=back

B<Returns>

Hashref with the following fields queue_name added activates expires data

B<Example usage>

    my $data = $queue->peek( queue => 'queue_name', position => 'head') ;

=cut

sub peek

lib/App/Basis/Queue.pm  view on Meta::CPAN

            WHERE queue_name LIKE ?
            AND processed = 0
            AND process_failure = 0
            AND msg_type = ?
            AND expires > ?
            ORDER BY counter $direction
            LIMIT ?;", $self->{prefix}
    ) ;
    my $expires = _parse_datetime( time() ) ;
    my $info
        = $self->_query_db( $sql, [ $qname, MSG_TASK, $expires, $params->{count} ] ) ;

    # if there are no items found return
    # if ( !scalar( $info->{rows} ) ) {
    if ( !$info->{row_count} ) {
        return wantarray ? () : undef ;
    }

    my @data ;
    foreach my $row ( @{ $info->{rows} } ) {
        $row->{data} = decode_json( $row->{data} ) ;
        # remove things we do not want to share
        foreach my $f (
            qw(counter id msg_type persist processed processor process_start processing_time process_failure)
            ) {
            delete $row->{$f} ;
        }
        CORE::push @data, $row ;
    }

    return wantarray ? @data : $data[0] ;
}

# -----------------------------------------------------------------------------

=head2 stats

Obtains stats about the task data in the queue, this may be time/processor intensive
so use with care!

B<Parameters>

Hash of

=over

=item queue

Name of the queue, wildcards allowed

=back

provides counts of unprocessed, processed, failures
max process_failure, avg process_failure, earliest_added, latest_added,
min_data_size, max_data_size, avg_data_size, total_records
avg_elapsed, max_elapsed, min_elapsed

B<Example usage>

    my $stats = $queue->stats( queue => 'queue_name') ;
    say "processed $stats->{processed}, failures $stats->{failure}, unprocessed $stats->{unprocessed}" ;

    # for all matching wildcard queues
    my $all_stats = $queue->stats( queue => '/celestial/*') ;

=cut

sub stats
{
    my $self    = shift ;
    my $params  = @_ % 2 ? shift : {@_} ;
    my $expires = _parse_datetime( time() ) ;

    if ( ref($params) ne 'HASH' ) {
        warn "stats accepts a hash or a hashref of parameters" ;
        return {} ;
    }
    $params->{queue} ||= $self->{default_queue} ;
    my $qname     = $params->{queue} ;
    my %all_stats = () ;

    # update queue list
    $self->list_queues() ;

    # switch to SQL wildcard
    $qname =~ s/%/*/g ;

    # work through all the queues and only count that match our qname
    foreach my $q ( keys %{ $self->{_queue_list} } ) {
        next if ( !$self->_valid_qname($q) ) ;
        next if ( ( $qname =~ /\*/ && $qname !~ $q ) || $qname ne $q ) ;

        # queue_size also calls list_queues, so we don't need to do it!
        $all_stats{unprocessed} += $self->queue_size( queue => $q ) ;

        my $sql = sprintf(
            "SELECT count(*) as count
            FROM %s_queue
            WHERE queue_name = ?
            AND msg_type = ?
            AND expires > ?
            AND processed = 1 ;", $self->{prefix}
        ) ;
        my $resp = $self->_query_db( $sql, [ $q, MSG_TASK, $expires ] ) ;
        $all_stats{processed} += $resp->{rows}->[0]->{count} || 0 ;

        $sql = sprintf(
            "SELECT count(*) as count FROM %s_queue
            WHERE queue_name = ?
            AND processed = 0
            AND msg_type = ?
            AND expires > ?
            AND process_failure = 1 ;", $self->{prefix}
        ) ;
        $resp = $self->_query_db( $sql, [ $q, MSG_TASK, $expires ] ) ;
        $all_stats{failures} += $resp->{rows}->[0]->{count} || 0 ;
    }

    # get all the stats for all matching queues
    my $sql = sprintf(
        "SELECT

lib/App/Basis/Queue.pm  view on Meta::CPAN

                    # qmatch is the name of the queue matcher
                    $state = $subs->{callback}
                        ->( $self, $row->{queue_name}, $row ) ;
                    if ( $row->{added} gt $subs->{after} ) {
                        $subs->{after} = $row->{added} ;
                    }
                    if ( $row->{counter} > $subs->{counter} ) {
                        $subs->{counter} = $row->{counter} ;
                    }
                }
                catch {
                    warn "listen: error in callback $@" ;
                } ;
            }
        }
        $started = 1 ;
        last
            if ( $params->{events}
            && $self->{ev_count} > $params->{events} ) ;
        last
            if ( $params->{datetime}
            && time() > $params->{datetime} ) ;

        # wait a bit to allow the queues to fillup
        sleep($delay) ;
    }

    return $self->{ev_count} ;
}

# -----------------------------------------------------------------------------

=head2 unsubscribe

Unsubscribe from a named queue.

B<Parameters>

Hash of

=over

=item queue

Name of the queue, wildcard allowed

=back

B<Example usage>

    sub handler {
        state $counter = 0 ;
        my $q = shift ;             # we get the queue object
        # the queue trigger that matched, the actual queue name and the data
        my ($qmatch, $queue, $data) = @_ ;

        # we are only interested in 10 messages
        if( ++$counter > 10) {
            $q->unsubscribe( queue => $queue) ;
        } else {
            say Data::Dumper( $data) ;
        }
    }

    my $queue = App::Basis::Queue->new( dbh => $dbh) ;
    $queue->subscribe( queue => '/logs/*', callback => \&handler) ;
    $queue->listen() ;

=cut

sub unsubscribe
{
    my ( $self, @args ) = @_ ;

    # an odd number of arguments means the first one should be a hashref,
    # otherwise the arguments are taken as key/value pairs
    my $params = ( @args % 2 ) ? shift @args : {@args} ;

    unless ( ref($params) eq 'HASH' ) {
        warn "unsubscribe accepts a hash or a hashref of parameters" ;
        return 0 ;
    }

    # no queue named, use the default one for this object
    $params->{queue} ||= $self->{default_queue} ;

    # dropping a subscription that was never made is harmless
    if ( $params->{queue} ) {
        delete $self->{subscriptions}->{ $params->{queue} } ;
    }
}

# -----------------------------------------------------------------------------

=head2 purge_tasks

Purge will remove all processed task items and failures/deadletters (process_failure >= 5).
These are completely removed from the database

B<Parameters>

Hash of

=over

=item queue

Name of the queue, wildcard allowed

=item before (optional)

Unix epoch or parsable datetime before which items should be purged

defaults to 'now'

=back

B<Example usage>

    my $before = $queue->stats( queue => 'queue_name', before => '2015-11-24') ;
    $queue->purge_tasks( queue => 'queue_name') ;
    my $after = $queue->stats( queue  => 'queue_name') ;

    say "removed " .( $before->{total_records} - $after->{total_records}) ;

=cut

# Delete processed and failed TASK items from the named queue(s).
#
# Parameters (hash or hashref)
#   queue  - queue name, '*' wildcards allowed, defaults to the default queue
#   before - epoch or parsable datetime, only items added at or before this
#            are removed, defaults to now
#
# Returns the number of rows deleted, or 0 if the parameters are not usable.
sub purge_tasks
{
    my $self = shift ;
    my $params = @_ % 2 ? shift : {@_} ;

    if ( ref($params) ne 'HASH' ) {
        warn "purge_tasks accepts a hash or a hashref of parameters" ;
        return 0 ;
    }

    $params->{queue} ||= $self->{default_queue} ;
    my $qname = $params->{queue} ;

    # SQL wildcard replace
    $qname =~ s/\*/%/g ;

    # normalise the cut-off time, falling back to now if it cannot be parsed
    try {
        if ( !defined $params->{before} ) {
            $params->{before} = _parse_datetime( time() ) ;
        } else {
            $params->{before} = _parse_datetime( $params->{before} ) ;
        }
    }
    catch {
        warn(
            "this does not look like a datetime value I can use: '$params->{before}'"
        ) ;
        $params->{before} = _parse_datetime( time() ) ;
    } ;

    # TODO: add in expired items too
    # AND binds tighter than OR in SQL, so the processed/failed alternatives
    # have to be bracketed; without the brackets the queue, type and date
    # limits only apply to one side of the OR and unrelated rows are deleted
    my $sql = "WHERE queue_name LIKE ?
        AND msg_type = ?
        AND added <= ?
        AND ( processed = 1 OR process_failure = 1 )" ;

    my $resp = $self->_delete_db_record( $self->{prefix} . "_queue",
        $sql, [ $qname, MSG_TASK, $params->{before} ] ) ;

    # return the number of items deleted
    return $resp->{row_count} ;
}

# -----------------------------------------------------------------------------

=head2 purge_chatter

purge will remove all chatter messages.
These are completely removed from the database

B<Parameters>

Hash of

=over

=item queue

Name of the queue, wildcard allowed

=item before (optional)

Unix epoch or parsable datetime before which items should be purged

defaults to 'now'

=back

B<Example usage>

    my $del = $queue->purge_chatter( queue => 'queue_name', before => '2015-11-24') ;

    say "removed $del messages" ;

=cut

# Delete CHATTER items from the named queue(s).
#
# Parameters (hash or hashref)
#   queue   - queue name, '*' wildcards allowed, defaults to the default queue
#   counter - if given, remove items with a counter at or below this value
#   before  - otherwise, epoch or parsable datetime, only items added at or
#             before this are removed, defaults to now
#
# Returns the number of rows deleted, or 0 if the parameters are not usable.
sub purge_chatter
{
    my $self = shift ;
    my $params = @_ % 2 ? shift : {@_} ;

    if ( ref($params) ne 'HASH' ) {
        warn "purge_chatter accepts a hash or a hashref of parameters" ;
        return 0 ;
    }

    $params->{queue} ||= $self->{default_queue} ;
    my $qname = $params->{queue} ;

    # SQL wildcard replace
    $qname =~ s/\*/%/g ;

    # decide on the limiting column and value BEFORE building the statement.
    # These must be plain assignments to the variables used below, a nested
    # 'my' would only create copies that vanish at the end of the branch
    my ( $limit_sql, $limit_value ) ;
    if ( defined $params->{counter} ) {
        $limit_sql   = 'counter <= ?' ;
        $limit_value = $params->{counter} ;
    } else {

        # normalise the cut-off time, falling back to now if unparsable
        try {
            if ( !defined $params->{before} ) {
                $params->{before} = _parse_datetime( time() ) ;
            } else {
                $params->{before} = _parse_datetime( $params->{before} ) ;
            }
        }
        catch {
            warn(
                "this does not look like a datetime value I can use: '$params->{before}'"
            ) ;
            $params->{before} = _parse_datetime( time() ) ;
        } ;
        $limit_sql   = 'added <= ?' ;
        $limit_value = $params->{before} ;
    }

    # AND binds tighter than OR in SQL, so the processed/failed alternatives
    # have to be bracketed to keep the queue, type and limit applied to both
    # NOTE(review): this still only removes chatter flagged as processed or
    # failed, as the original conditions did - confirm chatter rows are ever
    # flagged that way, the POD says all chatter messages are removed
    my $sql = "WHERE queue_name LIKE ?
        AND msg_type = ?
        AND ( processed = 1 OR process_failure = 1 )
        AND $limit_sql" ;
    my $sql_args = [ $qname, MSG_CHATTER, $limit_value ] ;

    my $resp = $self->_delete_db_record( $self->{prefix} . "_queue",
        $sql, $sql_args ) ;

    # return the number of items deleted
    return $resp->{row_count} ;
}

# -----------------------------------------------------------------------------

=head2 remove_queue

Remove a queue and all of its records (task and chatter)

B<Parameters>

Takes a hash of

=over

=item queue

Name of the queue, wildcards allowed

=back

B<Example usage>

    $queue->remove_queue( queue => 'queue_name') ;
    my $after = $queue->list_queues() ;
    # convert list into a hash for easier checking
    my %a = map { $_ => 1} @$after ;
    say "queue removed" if( !$a{queue_name}) ;

=cut

sub remove_queue
{
    my ( $self, @args ) = @_ ;

    # an odd number of arguments means the first one should be a hashref,
    # otherwise the arguments are taken as key/value pairs
    my $params = ( @args % 2 ) ? shift @args : {@args} ;

    unless ( ref($params) eq 'HASH' ) {
        warn "remove_queue accepts a hash or a hashref of parameters" ;
        return 0 ;
    }

    # no queue named, use the default one for this object
    $params->{queue} ||= $self->{default_queue} ;

    # turn our '*' wildcards into SQL ones
    my $pattern = $params->{queue} ;
    $pattern =~ s/\*/%/g ;

    # every record in the matching queue(s) goes, whatever its type
    my $table = $self->{prefix} . "_queue" ;
    my $resp  = $self->_delete_db_record( $table, "WHERE queue_name LIKE ?",
        [$pattern] ) ;

    return $resp->{success} ;
}

# -----------------------------------------------------------------------------

=head2 reset_failures, reset_deadletters

Clear any process_failure values from all unprocessed task items

B<Parameters>

Hash of

=over

=item queue

Name of the queue, wildcard allowed

=back

B<Example usage>

    my $before = $queue->stats( queue => 'queue_name') ;
    $queue->reset_failures( queue => 'queue_name') ;
    my $after = $queue->stats( queue => 'queue_name') ;

    say "reset " .( $after->{unprocessed} - $before->{unprocessed}) ;

=cut

sub reset_failures
{
    my ( $self, @args ) = @_ ;

    # an odd number of arguments means the first one should be a hashref,
    # otherwise the arguments are taken as key/value pairs
    my $params = ( @args % 2 ) ? shift @args : {@args} ;

    unless ( ref($params) eq 'HASH' ) {
        warn "reset_failures accepts a hash or a hashref of parameters" ;
        return 0 ;
    }

    # no queue named, use the default one for this object
    $params->{queue} ||= $self->{default_queue} ;

    # turn our '*' wildcards into SQL ones
    my $pattern = $params->{queue} ;
    $pattern =~ s/\*/%/g ;

    # clear the failure flag on failed TASK items in the matching queue(s)
    my $update = "SET process_failure=0" . " WHERE queue_name LIKE ?
        AND process_failure = 1
        AND msg_type = ?" ;
    my $resp = $self->_update_db( $self->{prefix} . "_queue",
        $update, [ $pattern, MSG_TASK ] ) ;

    # number of rows changed, always a number
    return $resp->{row_count} || 0 ;
}

# Alias for reset_failures, takes exactly the same parameters.
# The object must not be passed again as an argument, otherwise it is
# mistaken for the parameter hashref and the call is rejected.
sub reset_deadletters {
    my $self = shift ;
    return $self->reset_failures(@_) ;
}

# -----------------------------------------------------------------------------

=head2 remove_failures, remove_deadletters

Permanently delete task failures from the database

B<Parameters>

Hash of

=over

=item queue

Name of the queue, wildcard allowed

=back

B<Example usage>

    $queue->remove_failures( queue => 'queue_name') ;
    my $stats = $queue->stats( queue => 'queue_name') ;
    say "failures left " .( $stats->{failures}) ;

=cut

# Permanently delete failed TASK items from the named queue(s).
#
# Parameters (hash or hashref)
#   queue - queue name, '*' wildcards allowed, defaults to the default queue
#
# Returns the number of rows deleted, or 0 if the parameters are not usable.
sub remove_failures
{
    my $self = shift ;
    my $params = @_ % 2 ? shift : {@_} ;

    if ( ref($params) ne 'HASH' ) {
        warn "remove_failures accepts a hash or a hashref of parameters" ;
        return 0 ;
    }

    $params->{queue} ||= $self->{default_queue} ;
    my $qname = $params->{queue} ;

    # SQL wildcard replace
    $qname =~ s/\*/%/g ;

    # limit the delete to the requested queue(s); without the queue_name
    # condition the failures of every queue in the table would be removed
    my $sql = "WHERE queue_name LIKE ?
        AND process_failure = 1
        AND msg_type = ?" ;
    my $resp = $self->_delete_db_record( $self->{prefix} . "_queue",
        $sql, [ $qname, MSG_TASK ] ) ;

    return $resp->{row_count} ;
}

# Alias for remove_failures, takes exactly the same parameters.
# This has to call remove_failures (not reset_failures, which only clears
# the failure flag) and must not pass the object again as an argument.
sub remove_deadletters {
    my $self = shift ;
    return $self->remove_failures(@_) ;
}

# -----------------------------------------------------------------------------

=head2 remove_tables

If you never need to use the database again, it can be completely removed

B<Example usage>

    $queue->remove_tables() ;

=cut

sub remove_tables
{
    my ($self) = @_ ;

    # the queue table name is built from this object's prefix
    my $drop = sprintf( 'DROP TABLE %s_queue;', $self->{prefix} ) ;

    # show the statement when debugging, then run it
    $self->_debug($drop) ;
    $self->{dbh}->do($drop) ;
}

# -----------------------------------------------------------------------------

1 ;



( run in 0.693 second using v1.01-cache-2.11-cpan-d7a12ab2c7f )