App-Basis-Queue
view release on metacpan or search on metacpan
lib/App/Basis/Queue.pm view on Meta::CPAN
# cascading deletes on FOREIGN keys
if ( $self->_db_type() eq 'SQLite' ) {
$self->{dbh}->do("PRAGMA foreign_keys = ON") ;
}
# ensure we have the tables created (if wanted)
$self->_create_tables() if ( !$self->skip_table_check ) ;
# get the first list of queues we have
$self->list_queues() ;
}
# -----------------------------------------------------------------------------
# TODO: add a DEMOLISH method to clean up unprocessed items when the object
# handle goes out of scope
# -----------------------------------------------------------------------------
## class private variables
# -----------------------------------------------------------------------------
# cache of the queue names found in the database, refreshed by list_queues()
# which stores it as a hashref of { queue_name => 1 }
has _queue_list => (
    is       => 'rwp',                # read-only accessor plus a private writer
    writer   => '_set_queue_list',    # explicit name for the private writer
    init_arg => undef,                # dont allow setting in constructor
    lazy     => 1,
    default  => sub { {} },
) ;
# name of the database type in use, compared against 'SQLite' when the object
# is set up; starts out as an empty string
has _db_type => (
    is       => 'rwp',             # read-only accessor plus a private writer
    writer   => '_set_db_type',    # explicit name for the private writer
    init_arg => undef,             # dont allow setting in constructor
    lazy     => 1,
    default  => sub {''},
) ;
# identifier for this worker, built once on first use as
# "<hostname>::<user>::<pid>"; it is stored against items this object claims
has _processor => (
    is       => 'ro',
    lazy     => 1,
    init_arg => undef,    # dont allow setting in constructor
    default  => sub {
        # backticks give undef when the command could not be started,
        # fall back to an empty name rather than warn on the substitution
        my $hostname = `hostname` // '' ;
        $hostname =~ s/\s//g ;

        # USER may not be set in every environment, avoid an
        # uninitialized-value warning when it is missing
        my $user = $ENV{USER} // '' ;

        return join( '::', $hostname, $user, $$ ) ;
    },
) ;
# -----------------------------------------------------------------------------
## class private methods
# -----------------------------------------------------------------------------
# Write a debug message to STDERR when the object's debug flag is set.
# Every line of the message is prefixed with a single space.
# Parameters: the message string. Returns nothing when debugging is off.
sub _debug
{
    my ( $self, $text ) = @_ ;

    # debugging is switched off, stay silent
    return unless ( $self->{debug} ) ;

    # indent the start of each line in the message
    $text =~ s/^/ /gsm ;
    say STDERR $text ;
}
# -----------------------------------------------------------------------------
# Build a human readable version of a parameterised query, for debug output
# only. Each '?' placeholder is replaced, in order, by the matching parameter.
#
# Parameters
#   $query  SQL text containing '?' placeholders
#   $p      arrayref of parameter values, optional
#
# Returns the query with trailing whitespace removed, terminated with a ';'
# and with the placeholders filled in. When there are no parameters the
# placeholders are left untouched.
sub _build_sql_stmt
{
    my ( $query, $p ) = @_ ;
    my @params = $p ? @$p : () ;

    $query =~ s/\s+$// ;
    $query .= ' ;' if ( $query !~ /;$/ ) ;

    # nothing to substitute, leave any placeholders as they are
    return $query if ( !@params ) ;

    # make sure we represent NULL properly, do quoting - only basic as this is
    # only for debug. State is kept in lexicals captured by the closure, so
    # nothing leaks into the package between calls.
    my $index       = 0 ;
    my $next_value = sub {
        my $value = $params[ $index++ ] ;

        # undefined values, or more placeholders than parameters
        return 'NULL' if ( !defined $value ) ;

        # quote strings, leave numbers untouched, not doing floats
        return $value =~ /^\d+$/ ? $value : "'$value'" ;
    } ;
    $query =~ s/\?/$next_value->()/ge ;

    return $query ;
}
# -----------------------------------------------------------------------------
sub _query_db
{
state $sth_map = {} ;
my $self = shift ;
my ( $query, $p, $no_results ) = @_ ;
my @params = $p ? @$p : () ;
my %result ;
$query =~ s/\s+$// ;
$query .= ' ;' if ( $query !~ /;$/ ) ;
if ( $self->{debug} ) {
$self->_debug(
"ACTUAL QUERY: $query\nQUERY PARAMS: " . to_json( \@params ) ) ;
my $sql = _build_sql_stmt( $query, $p ) ;
$self->_debug( 'BUILT QUERY : ' . $sql . "\n" ) ;
}
try {
my $sth ;
# key based on query and fields we are using
my $key = "$query." . join( '.', @params ) ;
if ( $sth_map->{$key} ) {
$sth = $sth_map->{$key} ;
lib/App/Basis/Queue.pm view on Meta::CPAN
# switch to SQL wildcard
$qname =~ s/\*/%/g ;
# get list of IDs we can process, as SQLite has an issue
# with ORDER BY and LIMIT in an UPDATE call so we have to do things
# in 2 stages, which means it is not easy to mark lots of records
# to be processed but that its possibly a good thing
my $sql = sprintf(
"SELECT * FROM %s_queue
WHERE queue_name LIKE ?
AND processed = 0
AND msg_type = ?
ORDER BY counter ASC
LIMIT 1;", $self->{prefix}
) ;
my $expires = _parse_datetime( time() ) ;
my $info = $self->_query_db( $sql, [ $qname, MSG_SIMPLE ] ) ;
# if there are no items to update, return
# return if ( !scalar( $info->{rows} ) ) ;
return if ( !$info->{row_count} ) ;
my $row = $info->{rows}->[0] ;
my $id = $row->{id} ;
# mark item that I have popped
my $update
= "SET processed=1, processor=?, process_start=?, processing_time=?, process_failure=0
WHERE id = ? ;" ;
my $resp = $self->_update_db( $self->{prefix} . "_queue",
$update, [ $self->_processor(), _std_datetime(), 0.0, $id ] ) ;
if ( !$resp->{row_count} ) {
return ;
}
# return unpacked data
return decode_json( $row->{data} ) ;
}
# -----------------------------------------------------------------------------
=head2 size
Get size of a SIMPLE queue
B<Parameters>
Hash of
=over
=item queue
Name of the queue, wildcards allowed
=back
B<Example usage>
my $count = $queue->size( queue => 'queue_name') ;
say "there are $count items in the queue" ;
# size can manage wildcards
$queue->size( queue => '/celestial/*') ;
=cut
# Count the items in a SIMPLE queue. Accepts either a hash or a hashref of
# parameters and hands the counting on to _queue_size with the message type
# forced to MSG_SIMPLE. Returns 0, with a warning, for bad parameters.
sub size
{
    my ( $self, @args ) = @_ ;

    # an odd number of arguments means a single hashref was passed,
    # otherwise the arguments are key/value pairs
    my $params = ( @args % 2 ) ? $args[0] : {@args} ;

    unless ( ref($params) eq 'HASH' ) {
        warn "size accepts a hash or a hashref of parameters" ;
        return 0 ;
    }

    # restrict the count to simple messages
    $params->{_qtype} = MSG_SIMPLE ;

    return $self->_queue_size($params) ;
}
# -----------------------------------------------------------------------------
# try and find a match for the qname, replace SQL wildcard with perl ones
# Check whether a queue name, or wildcard pattern, matches a known queue.
#
# Parameters
#   $qname  queue name, may contain '*' (or the SQL '%') as a wildcard
#
# Returns 1 when the name is a known queue or the pattern matches at least
# one known queue, otherwise 0.
sub _valid_qname
{
    my $self = shift ;
    my ($qname) = @_ ;

    # update queue list
    $self->list_queues() ;
    my $known = $self->{_queue_list} || {} ;

    # accept the SQL wildcard as well as our own
    $qname =~ s/%/*/g ;

    # an exact name needs no pattern matching
    return 1 if ( $known->{$qname} ) ;
    return 0 if ( $qname !~ /\*/ ) ;

    # turn the wildcard into an anchored regexp, escaping everything else so
    # that characters in the queue name are not treated as regexp syntax;
    # using the raw name as a pattern fails outright for a leading '*'
    my $pattern = join( '.*', map { quotemeta($_) } split( /\*/, $qname, -1 ) ) ;
    my $matcher = qr/\A$pattern\z/s ;

    foreach my $q ( keys %{$known} ) {
        return 1 if ( $q =~ $matcher ) ;
    }

    return 0 ;
}
# -----------------------------------------------------------------------------
=head2 process
Process up to 100 tasks from the named queue(s)
B<Parameters>
Hash of
=over
=item queue
Name of the queue, wildcards allowed
lib/App/Basis/Queue.pm view on Meta::CPAN
AND processed = 0
AND processor = ?
AND process_failure = 1
AND msg_type = ?
AND expires > ?
AND activates <= ?
ORDER BY added ASC
LIMIT ?;", $self->{prefix},
) ;
my $info = $self->_query_db( $sql,
[ $qname, $self->_processor(), MSG_TASK, $expires, $now, $params->{count} ] ) ;
$self->{debug} = 0 ;
foreach my $row ( @{ $info->{rows} } ) {
# unpack the data
$row->{data} = decode_json( $row->{data} ) ;
my $state = 0 ;
try {
$state = $params->{callback}
->( $self, $qname, $row, $params->{callback_params} ) ;
}
catch {
warn "process_failures: error in callback $@" ;
} ;
# we don't do anything else with the record, we assume that the callback
# function will have done something like delete it or re-write it
}
return $processed_count ;
}
# Alias for process_failures, takes the same parameters and returns whatever
# process_failures returns.
sub process_deadletters
{
    my $self = shift ;

    # the method call already supplies the object, passing it again would be
    # taken as the first parameter by process_failures
    return $self->process_failures(@_) ;
}
# -----------------------------------------------------------------------------
=head2 queue_size
Get the count of unprocessed TASK items in the queue
B<Parameters>
Hash of
=over
=item queue
Name of the queue, wildcards allowed
=back
B<Example usage>
my $count = $queue->queue_size( queue => 'queue_name') ;
say "there are $count unprocessed items in the queue" ;
# queue size can manage wildcards
$queue->queue_size( queue => '/celestial/*') ;
=cut
# Count the unprocessed TASK items in a queue. Accepts either a hash or a
# hashref of parameters and hands the counting on to _queue_size with the
# message type forced to MSG_TASK. Returns 0, with a warning, for bad
# parameters.
sub queue_size
{
    my ( $self, @args ) = @_ ;

    # an odd number of arguments means a single hashref was passed,
    # otherwise the arguments are key/value pairs
    my $params = ( @args % 2 ) ? $args[0] : {@args} ;

    unless ( ref($params) eq 'HASH' ) {
        warn "queue_size accepts a hash or a hashref of parameters" ;
        return 0 ;
    }

    # restrict the count to task messages
    $params->{_qtype} = MSG_TASK ;

    return $self->_queue_size($params) ;
}
# -----------------------------------------------------------------------------
=head2 list_queues
Obtains a list of all the queues used by this database
B<Example usage>
my $qlist = $queue->list_queues() ;
foreach my $q (@$qlist) {
say $q ;
}
=cut
# Fetch the distinct queue names from the queue table, refresh the cached
# queue list with them and return the names as an arrayref. When the query
# reports an error the cache is replaced with an empty list.
sub list_queues
{
    my ($self) = @_ ;

    my $sql = sprintf( 'SELECT DISTINCT queue_name FROM %s_queue;',
        $self->{prefix} ) ;
    my $result = $self->_query_db($sql) ;

    my %ques ;
    if ( !$result->{error} ) {
        # every queue name becomes a key in the cache
        my @names = map { $_->{queue_name} } @{ $result->{rows} } ;
        @ques{@names} = (1) x scalar(@names) ;
    }

    # remember what we found for later lookups
    $self->_set_queue_list( \%ques ) ;

    return [ keys %ques ] ;
}
# -----------------------------------------------------------------------------
=head2 peek
Have a look at an unprocessed item in a TASK queue
B<Parameters>
Hash of
=over
=item queue
Name of the queue, wildcards allowed
=item position
position in the queue you want to peek at (head/start) or (tail/end) - defaults to head
=item count
number of items to peek, defaults to 1, max is 100 (PEEK_MAX)
=back
B<Returns>
Hashref with the following fields queue_name added activates expires data
B<Example usage>
my $data = $queue->peek( queue => 'queue_name', position => 'head') ;
=cut
sub peek
lib/App/Basis/Queue.pm view on Meta::CPAN
WHERE queue_name LIKE ?
AND processed = 0
AND process_failure = 0
AND msg_type = ?
AND expires > ?
ORDER BY counter $direction
LIMIT ?;", $self->{prefix}
) ;
my $expires = _parse_datetime( time() ) ;
my $info
= $self->_query_db( $sql, [ $qname, MSG_TASK, $expires, $params->{count} ] ) ;
# if there are no items found return
# if ( !scalar( $info->{rows} ) ) {
if ( !$info->{row_count} ) {
return wantarray ? () : undef ;
}
my @data ;
foreach my $row ( @{ $info->{rows} } ) {
$row->{data} = decode_json( $row->{data} ) ;
# remove things we do not want to share
foreach my $f (
qw(counter id msg_type persist processed processor process_start processing_time process_failure)
) {
delete $row->{$f} ;
}
CORE::push @data, $row ;
}
return wantarray ? @data : $data[0] ;
}
# -----------------------------------------------------------------------------
=head2 stats
Obtains stats about the task data in the queue, this may be time/processor intensive
so use with care!
B<Parameters>
Hash of
=over
=item queue
Name of the queue, wildcards allowed
=back
provides counts of unprocessed, processed, failures
max process_failure, avg process_failure, earliest_added, latest_added,
min_data_size, max_data_size, avg_data_size, total_records
avg_elapsed, max_elapsed, min_elapsed
B<Example usage>
my $stats = $queue->stats( queue => 'queue_name') ;
say "processed $stats->{processed}, failures $stats->{failure}, unprocessed $stats->{unprocessed}" ;
# for all matching wildcard queues
my $all_stats = $queue->stats( queue => '/celestial/*') ;
=cut
sub stats
{
my $self = shift ;
my $params = @_ % 2 ? shift : {@_} ;
my $expires = _parse_datetime( time() ) ;
if ( ref($params) ne 'HASH' ) {
warn "stats accepts a hash or a hashref of parameters" ;
return {} ;
}
$params->{queue} ||= $self->{default_queue} ;
my $qname = $params->{queue} ;
my %all_stats = () ;
# update queue list
$self->list_queues() ;
# switch to SQL wildcard
$qname =~ s/%/*/g ;
# work through all the queues and only count that match our qname
foreach my $q ( keys %{ $self->{_queue_list} } ) {
next if ( !$self->_valid_qname($q) ) ;
next if ( ( $qname =~ /\*/ && $qname !~ $q ) || $qname ne $q ) ;
# queue_size also calls list_queues, so we don't need to do it!
$all_stats{unprocessed} += $self->queue_size( queue => $q ) ;
my $sql = sprintf(
"SELECT count(*) as count
FROM %s_queue
WHERE queue_name = ?
AND msg_type = ?
AND expires > ?
AND processed = 1 ;", $self->{prefix}
) ;
my $resp = $self->_query_db( $sql, [ $q, MSG_TASK, $expires ] ) ;
$all_stats{processed} += $resp->{rows}->[0]->{count} || 0 ;
$sql = sprintf(
"SELECT count(*) as count FROM %s_queue
WHERE queue_name = ?
AND processed = 0
AND msg_type = ?
AND expires > ?
AND process_failure = 1 ;", $self->{prefix}
) ;
$resp = $self->_query_db( $sql, [ $q, MSG_TASK, $expires ] ) ;
$all_stats{failures} += $resp->{rows}->[0]->{count} || 0 ;
}
# get all the stats for all matching queues
my $sql = sprintf(
"SELECT
lib/App/Basis/Queue.pm view on Meta::CPAN
# qmatch is the name of the queue matcher
$state = $subs->{callback}
->( $self, $row->{queue_name}, $row ) ;
if ( $row->{added} gt $subs->{after} ) {
$subs->{after} = $row->{added} ;
}
if ( $row->{counter} > $subs->{counter} ) {
$subs->{counter} = $row->{counter} ;
}
}
catch {
warn "listen: error in callback $@" ;
} ;
}
}
$started = 1 ;
last
if ( $params->{events}
&& $self->{ev_count} > $params->{events} ) ;
last
if ( $params->{datetime}
&& time() > $params->{datetime} ) ;
# wait a bit to allow the queues to fillup
sleep($delay) ;
}
return $self->{ev_count} ;
}
# -----------------------------------------------------------------------------
=head2 unsubscribe
Unsubscribe from a named queue.
B<Parameters>
Hash of
=over
=item queue
Name of the queue, wildcard allowed
=back
B<Example usage>
sub handler {
state $counter = 0 ;
my $q = shift ; # we get the queue object
# the queue trigger that matched, the actual queue name and the data
my ($qmatch, $queue, $data) = @_ ;
# we are only interested in 10 messages
if( ++$counter > 10) {
$q->unsubscribe( queue => $queue) ;
} else {
say Dumper( $data) ;
}
}
my $queue = App::Basis::Queue->new( dbh => $dbh) ;
$queue->subscribe( queue => '/logs/*', callback => \&handler) ;
$queue->listen() ;
=cut
# Drop the subscription held for a queue name. Accepts either a hash or a
# hashref of parameters; the queue defaults to the object's default_queue.
# Returns 0, with a warning, for bad parameters.
sub unsubscribe
{
    my ( $self, @args ) = @_ ;

    # an odd number of arguments means a single hashref was passed,
    # otherwise the arguments are key/value pairs
    my $params = ( @args % 2 ) ? $args[0] : {@args} ;

    unless ( ref($params) eq 'HASH' ) {
        warn "unsubscribe accepts a hash or a hashref of parameters" ;
        return 0 ;
    }

    $params->{queue} ||= $self->{default_queue} ;
    my $qname = $params->{queue} ;

    # no queue to work with, nothing to remove
    return $qname if ( !$qname ) ;

    # does not matter if the queue name does not exist!
    return delete $self->{subscriptions}->{$qname} ;
}
# -----------------------------------------------------------------------------
=head2 purge_tasks
Purge will remove all processed task items and failures/deadletters (process_failure >= 5).
These are completely removed from the database
B<Parameters>
Hash of
=over
=item queue
Name of the queue, wildcard allowed
=item before (optional)
Unix epoch or parsable datetime before which items should be purged
defaults to 'now'
=back
B<Example usage>
my $before = $queue->stats( queue => 'queue_name') ;
$queue->purge_tasks( queue => 'queue_name', before => '2015-11-24') ;
my $after = $queue->stats( queue => 'queue_name') ;
say "removed " .( $before->{total_records} - $after->{total_records}) ;
=cut
# Delete processed and failed TASK items from the matching queue(s).
#
# Parameters (hash or hashref)
#   queue   queue name, '*' wildcard allowed, defaults to default_queue
#   before  only items added at or before this datetime are removed,
#           defaults to now; a value that cannot be parsed warns and
#           falls back to now
#
# Returns the number of rows deleted, or 0 with a warning for bad parameters.
sub purge_tasks
{
    my $self = shift ;
    my $params = @_ % 2 ? shift : {@_} ;

    if ( ref($params) ne 'HASH' ) {
        warn "purge_tasks accepts a hash or a hashref of parameters" ;
        return 0 ;
    }
    $params->{queue} ||= $self->{default_queue} ;
    my $qname = $params->{queue} ;

    # SQL wildcard replace
    $qname =~ s/\*/%/g ;

    try {
        if ( !defined $params->{before} ) {
            $params->{before} = _parse_datetime( time() ) ;
        } else {
            $params->{before} = _parse_datetime( $params->{before} ) ;
        }
    }
    catch {
        warn(
            "this does not look like a datetime value I can use: '$params->{before}'"
        ) ;
        $params->{before} = _parse_datetime( time() ) ;
    } ;

    # TODO: add in expired items too
    # the OR has to be bracketed: AND binds tighter than OR, so without the
    # brackets the queue filter only applied to processed rows and the
    # type/date filters only applied to failures
    my $sql = "WHERE queue_name LIKE ?
        AND ( processed = 1 OR process_failure = 1 )
        AND msg_type = ?
        AND added <= ?" ;
    my $resp = $self->_delete_db_record( $self->{prefix} . "_queue",
        $sql, [ $qname, MSG_TASK, $params->{before} ] ) ;

    # return the number of items deleted
    return $resp->{row_count} ;
}
# -----------------------------------------------------------------------------
=head2 purge_chatter
purge will remove all chatter messages.
These are completely removed from the database
B<Parameters>
Hash of
=over
=item queue
Name of the queue, wildcard allowed
=item before (optional)
Unix epoch or parsable datetime before which items should be purged
defaults to 'now'
=back
B<Example usage>
my $del = $queue->purge_chatter( queue => 'queue_name', before => '2015-11-24') ;
say "removed $del messages" ;
=cut
# Delete CHATTER items from the matching queue(s), limited either by the
# message counter or by the time the message was added.
#
# Parameters (hash or hashref)
#   queue    queue name, '*' wildcard allowed, defaults to default_queue
#   counter  when given, remove items with a counter at or below this value
#   before   otherwise remove items added at or before this datetime,
#            defaults to now; a value that cannot be parsed warns and
#            falls back to now
#
# Returns the number of rows deleted, or 0 with a warning for bad parameters.
sub purge_chatter
{
    my $self = shift ;
    my $params = @_ % 2 ? shift : {@_} ;

    if ( ref($params) ne 'HASH' ) {
        warn "purge_chatter accepts a hash or a hashref of parameters" ;
        return 0 ;
    }
    $params->{queue} ||= $self->{default_queue} ;
    my $qname = $params->{queue} ;

    # SQL wildcard replace
    $qname =~ s/\*/%/g ;

    # decide which column limits the purge; the limit value is only read
    # once 'before' has been defaulted/parsed, so the query never sees the
    # raw or undefined value
    my ( $limit_field, $limit_value ) ;
    if ( defined $params->{counter} ) {
        $limit_field = 'counter' ;
        $limit_value = $params->{counter} ;
    } else {
        try {
            if ( !defined $params->{before} ) {
                $params->{before} = _parse_datetime( time() ) ;
            } else {
                $params->{before} = _parse_datetime( $params->{before} ) ;
            }
        }
        catch {
            warn(
                "this does not look like a datetime value I can use: '$params->{before}'"
            ) ;
            $params->{before} = _parse_datetime( time() ) ;
        } ;
        $limit_field = 'added' ;
        $limit_value = $params->{before} ;
    }

    # the OR has to be bracketed: AND binds tighter than OR, so without the
    # brackets processed rows of any message type were matched
    # NOTE(review): the POD says all chatter messages are removed, so the
    # processed/process_failure filter may not be wanted here - confirm
    my $sql = "WHERE queue_name LIKE ?
        AND ( processed = 1 OR process_failure = 1 )
        AND msg_type = ?
        AND $limit_field <= ?" ;
    my $sql_args = [ $qname, MSG_CHATTER, $limit_value ] ;

    my $resp = $self->_delete_db_record( $self->{prefix} . "_queue",
        $sql, $sql_args ) ;

    # return the number of items deleted
    return $resp->{row_count} ;
}
# -----------------------------------------------------------------------------
=head2 remove_queue
Remove a queue and all of its records (task and chatter)
B<Parameters>
Takes a hash of
=over
=item queue
Name of the queue, wildcards allowed
=back
B<Example usage>
$queue->remove_queue( queue => 'queue_name') ;
my $after = $queue->list_queues() ;
# convert list into a hash for easier checking
my %a = map { $_ => 1} @$after ;
say "queue removed" if( !$a{queue_name}) ;
=cut
# Delete every record belonging to the matching queue(s). Accepts either a
# hash or a hashref of parameters; the queue defaults to default_queue and
# may contain the '*' wildcard. Returns the success field of the delete
# response, or 0 with a warning for bad parameters.
sub remove_queue
{
    my ( $self, @args ) = @_ ;

    # an odd number of arguments means a single hashref was passed,
    # otherwise the arguments are key/value pairs
    my $params = ( @args % 2 ) ? $args[0] : {@args} ;

    unless ( ref($params) eq 'HASH' ) {
        warn "remove_queue accepts a hash or a hashref of parameters" ;
        return 0 ;
    }

    $params->{queue} ||= $self->{default_queue} ;

    # swap our wildcard for the SQL one
    my $qname = $params->{queue} ;
    $qname =~ s/\*/%/g ;

    my $table = $self->{prefix} . "_queue" ;
    my $resp
        = $self->_delete_db_record( $table, "WHERE queue_name LIKE ?",
        [$qname] ) ;

    return $resp->{success} ;
}
# -----------------------------------------------------------------------------
=head2 reset_failures, reset_deadletters
Clear any process_failure values from all unprocessed task items
B<Parameters>
Hash of
=over
=item queue
Name of the queue, wildcard allowed
=back
B<Example usage>
my $before = $queue->stats( queue => 'queue_name') ;
$queue->reset_failures( queue => 'queue_name') ;
my $after = $queue->stats( queue => 'queue_name') ;
say "reset " .( $after->{unprocessed} - $before->{unprocessed}) ;
=cut
# Clear the process_failure flag on failed TASK items in the matching
# queue(s). Accepts either a hash or a hashref of parameters; the queue
# defaults to default_queue and may contain the '*' wildcard. Returns the
# number of rows updated, or 0 with a warning for bad parameters.
sub reset_failures
{
    my ( $self, @args ) = @_ ;

    # an odd number of arguments means a single hashref was passed,
    # otherwise the arguments are key/value pairs
    my $params = ( @args % 2 ) ? $args[0] : {@args} ;

    unless ( ref($params) eq 'HASH' ) {
        warn "reset_failures accepts a hash or a hashref of parameters" ;
        return 0 ;
    }

    $params->{queue} ||= $self->{default_queue} ;

    # swap our wildcard for the SQL one
    my $qname = $params->{queue} ;
    $qname =~ s/\*/%/g ;

    # the update clause, one condition per line
    my $sql = join( "\n",
        "SET process_failure=0 WHERE queue_name LIKE ?",
        "AND process_failure = 1",
        "AND msg_type = ?" ) ;

    my $resp = $self->_update_db( $self->{prefix} . "_queue",
        $sql, [ $qname, MSG_TASK ] ) ;

    # always hand back a number
    return $resp->{row_count} || 0 ;
}
# Alias for reset_failures, takes the same parameters and returns whatever
# reset_failures returns.
sub reset_deadletters
{
    my $self = shift ;

    # the method call already supplies the object, passing it again would be
    # taken as the first parameter by reset_failures
    return $self->reset_failures(@_) ;
}
# -----------------------------------------------------------------------------
=head2 remove_failures, remove_deadletters
Permanently delete task failures from the database
B<Parameters>
Hash of
=over
=item queue
Name of the queue, wildcard allowed
=back
B<Example usage>
$queue->remove_failures( queue => 'queue_name') ;
my $stats = $queue->stats( queue => 'queue_name') ;
say "failures left " .( $stats->{failures}) ;
=cut
# Permanently delete failed TASK items from the matching queue(s).
#
# Parameters (hash or hashref)
#   queue   queue name, '*' wildcard allowed, defaults to default_queue
#
# Returns the number of rows deleted, or 0 with a warning for bad parameters.
sub remove_failures
{
    my $self = shift ;
    my $params = @_ % 2 ? shift : {@_} ;

    if ( ref($params) ne 'HASH' ) {
        warn "remove_failures accepts a hash or a hashref of parameters" ;
        return 0 ;
    }
    $params->{queue} ||= $self->{default_queue} ;
    my $qname = $params->{queue} ;

    # SQL wildcard replace
    $qname =~ s/\*/%/g ;

    # limit the delete to the requested queue(s); without the queue_name
    # condition the failures of every queue would be removed
    my $sql = "WHERE queue_name LIKE ?
        AND process_failure = 1
        AND msg_type = ?" ;
    my $resp = $self->_delete_db_record( $self->{prefix} . "_queue",
        $sql, [ $qname, MSG_TASK ] ) ;

    return $resp->{row_count} ;
}
# Alias for remove_failures, takes the same parameters and returns whatever
# remove_failures returns.
sub remove_deadletters
{
    my $self = shift ;

    # delegate to remove_failures (not reset_failures, which only clears the
    # failure flag); the method call already supplies the object
    return $self->remove_failures(@_) ;
}
# -----------------------------------------------------------------------------
=head2 remove_tables
If you never need to use the database again, it can be completely removed
B<Example usage>
$queue->remove_tables() ;
=cut
# Drop the queue table belonging to this object's prefix. The statement is
# passed to _debug before it is run against the database handle.
sub remove_tables
{
    my ($self) = @_ ;

    my $table = $self->{prefix} . '_queue' ;
    my $sql   = "DROP TABLE $table;" ;

    $self->_debug($sql) ;
    $self->{dbh}->do($sql) ;
}
# -----------------------------------------------------------------------------
1 ;
( run in 0.693 second using v1.01-cache-2.11-cpan-d7a12ab2c7f )