App-Basis-Queue
view release on metacpan or search on metacpan
lib/App/Basis/Queue.pm view on Meta::CPAN
may be used in processing callback functions
B<Example usage>
sub processing_callback {
my ( $queue, $qname, $record, $params ) = @_;
# allow partially failed (and failed) records to be processed
if( $record->{process_failure) {
$queue->reset_record( $record) ;
}
return 1 ;
}
=cut
sub reset_record
{
my $self = shift ;
my ($data) = @_ ;
my $sql = "SET process_failure=0
WHERE id = ?
AND queue_name = ?
AND processed=0
AND process_failure > 0
AND msg_type = ?" ;
my $resp = $self->_update_db( $self->{prefix} . "_queue",
$sql, [ $data->{id}, $data->{queue_name}, MSG_TASK ] ) ;
return $resp->{row_count} ;
}
# -----------------------------------------------------------------------------
=head2 publish
Publish some chatter data into a named queue.
B<Parameters>
Hash of
=over
=item queue
Name of the queue to publish to, wildcards B<NOT allowed>
=item data
Hashref of the data to be published
=item persist (optional)
Flag to show that this data data should be persisited (0 or 1).
This will become the only persistent record available until either it is replaced or expires.
=item expires (optional)
Time after which this data should be ignored. Accepts unix epoch time or parsable datetime string
=back
B<Example usage>
my $queue = App::Basis::Queue->new( dbh => $dbh) ;
# keep track of a bit of info
$queue->publish( queue => 'app_log',
data => {
ip => 12.12.12.12, session_id => 12324324345, client_id => 248296432984,
appid => 2, app_name => 'twitter'
}
) ;
=cut
sub publish
{
my $self = shift ;
my $params = @_ % 2 ? shift : {@_} ;
if ( ref($params) ne 'HASH' ) {
warn "publish accepts a hash or a hashref of parameters" ;
return 0 ;
}
$params->{type} = MSG_CHATTER ;
# make sure this is a zero or one value
$params->{persist} = defined $params->{persist} ;
return $self->_add($params) ;
}
# -----------------------------------------------------------------------------
# find the most recent persistent item
# queue is the only parameter, returns arrayref of items
sub _recent_persist
{
my $self = shift ;
my $params = @_ % 2 ? shift : {@_} ;
if ( ref($params) ne 'HASH' ) {
warn "_recent_persist accepts a hash or a hashref of parameters" ;
return [] ;
}
my $qname = $params->{queue} ;
my @data ;
# if the queue does not exist
return [] if ( !$self->_valid_qname($qname) ) ;
# switch to SQL wildcard
$qname =~ s/\*/%/g ;
# find the most recent persistent items for each matching queue
my $sql = sprintf(
"SELECT * FROM %s_queue a
WHERE a.queue_name LIKE ?
AND a.msg_type = ?
AND a.persist = ?
AND a.counter NOT IN ( SELECT counter from %s_queue b
WHERE b.queue_name = a.queue_name
AND b.msg_type = a.msg_type
AND b.persist = a.persist
AND b.added > a.added
)
AND a.expires > ?
GROUP BY queue_name
ORDER BY queue_name;", $self->{prefix}, $self->{prefix}
) ;
# there should only be one persist item
my $expires = _parse_datetime( time() ) ;
my $result
= $self->_query_db( $sql, [ $qname, MSG_CHATTER, 1, $expires ] ) ;
foreach my $row ( @{ $result->{rows} } ) {
$row->{data} = decode_json( $row->{data} ) ; # unpack the data
CORE::push @data, $row ;
}
return \@data ;
}
# -----------------------------------------------------------------------------
# get chatter data (ordered by datetime added) after a unix time
# queue is the only parameter,
# returns arrayref of items
sub _recent_chatter
{
my $self = shift ;
my $params = @_ % 2 ? shift : {@_} ;
if ( ref($params) ne 'HASH' ) {
warn "_recent_chatter accepts a hash or a hashref of parameters" ;
return [] ;
}
my $qname = $params->{queue} ;
my @data ;
# if the queue does not exist
return [] if ( !$self->_valid_qname($qname) ) ;
# switch to SQL wildcard
$qname =~ s/\*/%/g ;
my $result ;
my $sql ;
if ( $params->{counter} ) {
$sql = sprintf(
"SELECT * FROM %s_queue
WHERE queue_name LIKE ?
AND msg_type = ?
AND counter > ?
AND expires > ?
GROUP BY queue_name
ORDER BY counter;", $self->{prefix}
) ;
my $expires = _parse_datetime( time() ) ;
$expires |= "" ;
$result = $self->_query_db( $sql,
[ $qname, MSG_CHATTER, $params->{counter}, $expires ] ) ;
} else {
# check by date
$sql = sprintf(
"SELECT * FROM %s_queue
WHERE queue_name LIKE ?
AND msg_type = ?
AND added >= ?
AND expires > ?
ORDER BY counter;", $self->{prefix}
) ;
my $expires = _parse_datetime( time() ) ;
$result = $self->_query_db( $sql,
[ $qname, MSG_CHATTER, $params->{after} // 0, $expires ] ) ;
}
foreach my $row ( @{ $result->{rows} } ) {
$row->{data} = decode_json( $row->{data} ) ; # unpack the data
CORE::push @data, $row ;
}
return \@data ;
}
# -----------------------------------------------------------------------------
=head2 subscribe
Subscribe to a named queue with a callback.
B<Parameters>
Hash of
=over
=item queue
Name of the queue, wildcard allowed
=item callback
Coderef to handle any matched events
=item after (optional)
Unix time after which to listen for events, defaults to now, if set will skip persistent item checks
=item persist (optional)
Include the most recent persistent item, if using a wild card, this will match all the queues and could find multiple persistent items
=back
B<Example usage>
my $queue = App::Basis::Queue->new( dbh => $dbh) ;
# keep track of a bit of info
$queue->subscribe( queue => 'app_logs/*', callback => \&handler) ;
$queue->listen() ;
=cut
sub subscribe
{
my $self = shift ;
my $params = @_ % 2 ? shift : {@_} ;
if ( ref($params) ne 'HASH' ) {
warn "subscribe accepts a hash or a hashref of parameters" ;
return 0 ;
}
$params->{queue} ||= $self->{default_queue} ;
if ( !$params->{queue} ) {
warn "subscribe needs a queue name to listen to" ;
return 0 ;
}
if ( ref( $params->{callback} ) ne 'CODE' ) {
warn "subscribe needs a callback handler to send events to" ;
return 0 ;
}
# add to our current subscriptions
if ( $params->{after} ) {
# we cannot get recent persist item if they want to check after a date
$params->{persist} = 0 ;
}
if ( !defined $params->{after} ) {
$params->{after} = _std_datetime() ;
} elsif ( $params->{after} =~ /^\d+$/ ) {
$params->{after} = _std_datetime( gmtime( $params->{after} ) ) ;
} elsif ( $params->{after} !~ /^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}$/ ) {
warn(
"this does not look like a datetime value I can use: '$params->{after}'"
) ;
$params->{after} = _std_datetime() ;
}
$self->{subscriptions}->{ $params->{queue} } = {
callback => $params->{callback},
# when do we want events from
after => $params->{after},
persist => $params->{persist},
ev_count => 0,
counter => 0
} ;
}
# -----------------------------------------------------------------------------
=head2 listen
Listen to all subcribed channels. Loops forever unless told to stop.
If there is a persistent message in a queue, this will be passed to the callback before the other records.
B<Parameters>
Hash of
=over
=item events (optional)
Minimum number of events to listen for, stop after this many, may stop after more - this is across ALL the subscriptions
=item datetime (optional)
Unix epoch time or parsable datetime when to stop listening
=item persist (optional)
Include the most recent persistent item, if subscribed using using a wild card,
this will match all the queues and could find multiple persistent items
=item listen_delay (optional)
Override the class delay, obtain events at this rate
=back
B<returns>
Number of chatter events actually passed to ALL the handlers
B<Example usage>
my $queue = App::Basis::Queue->new( dbh => $dbh) ;
$queue->subscribe( '/logs/*', \&handler) ;
$queue->listen() ; # listening forever
# or listen until christmas, checking every 30s
$queue->subscribe( '/presents/*', \&handler) ;
$queue->listen( datetime => '2015-12-25', listen_delay => 30) ;
=cut
sub listen
{
my $self = shift ;
my $params = @_ % 2 ? shift : {@_} ;
# decide where the delay comes from
my $delay = $params->{listen_delay} || $self->{listen_delay} ;
if ( ref($params) ne 'HASH' ) {
warn "listen accepts a hash or a hashref of parameters" ;
return 0 ;
}
if ( $params->{datetime} ) {
my (@dt) = _parse_datetime( $params->{datetime} ) ;
if ( $dt[1] ) {
# used to check against time() later
$params->{datetime} = $dt[1] ;
}
}
if ( !keys %{ $self->{subscriptions} } ) {
warn "you have not subscribed to any queues" ;
return 0 ;
}
$self->{ev_count} = 0 ;
# clean things up before we listen
foreach my $qmatch ( sort keys %{ $self->{subscriptions} } ) {
my $subs = $self->{subscriptions}->{$qmatch} ;
$subs->{counter} = 0 ;
$subs->{ev_count} = 0 ;
lib/App/Basis/Queue.pm view on Meta::CPAN
B<Example usage>
sub handler {
state $counter = 0 ;
my $q = shift ; # we get the queue object
# the queue trigger that matched, the actual queue name and the data
my ($qmatch, $queue, $data) = @_ ;
# we are only interested in 10 messages
if( ++$counter > 10) {
$q->unsubscribe( queue => $queue) ;
} else {
say Data::Dumper( $data) ;
}
}
my $queue = App::Basis::Queue->new( dbh => $dbh) ;
$queue->subscribe( queue => '/logs/*', callback => \&handler) ;
$queue->listen() ;
=cut
sub unsubscribe
{
my $self = shift ;
my $params = @_ % 2 ? shift : {@_} ;
if ( ref($params) ne 'HASH' ) {
warn "unsubscribe accepts a hash or a hashref of parameters" ;
return 0 ;
}
$params->{queue} ||= $self->{default_queue} ;
if ( $params->{queue} ) {
# does not matter if the queue name does not exist!
delete $self->{subscriptions}->{ $params->{queue} } ;
}
}
# -----------------------------------------------------------------------------
=head2 purge_tasks
Purge will remove all processed task items and failures/deadletters (process_failure >= 5).
These are completely removed from the database
B<Parameters>
Hash of
=over
=item queue
Name of the queue, wildcard allowed
=item before (optional)
Unix epoch or parsable datetime before which items should be purged
defaults to 'now'
=back
B<Example usage>
my $before = $queue->stats( queue => 'queue_name', before => '2015-11-24') ;
$queue->purge_tasks( queue => 'queue_name') ;
my $after = $queue->stats( queue => 'queue_name') ;
say "removed " .( $before->{total_records} - $after->{total_records}) ;
=cut
sub purge_tasks
{
my $self = shift ;
my $params = @_ % 2 ? shift : {@_} ;
if ( ref($params) ne 'HASH' ) {
warn "purge_tasks accepts a hash or a hashref of parameters" ;
return 0 ;
}
$params->{queue} ||= $self->{default_queue} ;
my $qname = $params->{queue} ;
# SQL wildcard replace
$qname =~ s/\*/%/g ;
try {
if ( !defined $params->{before} ) {
$params->{before} = _parse_datetime( time() ) ;
} else {
$params->{before} = _parse_datetime( $params->{before} ) ;
}
}
catch {
warn(
"this does not look like a datetime value I can use: '$params->{before}'"
) ;
$params->{before} = _parse_datetime( time() ) ;
} ;
# TODO: add in expired items too, plus the and processed=1 or process_failure =1 looks a bit wrong
my $sql = "WHERE queue_name LIKE ?
AND processed = 1
OR process_failure = 1
AND msg_type = ?
AND added <= ?" ;
my $resp = $self->_delete_db_record( $self->{prefix} . "_queue",
$sql, [ $qname, MSG_TASK, $params->{before} ] ) ;
# return the number of items deleted
return $resp->{row_count} ;
}
# -----------------------------------------------------------------------------
=head2 purge_chatter
purge will remove all chatter messages.
These are completely removed from the database
B<Parameters>
Hash of
=over
=item queue
Name of the queue, wildcard allowed
=item before (optional)
Unix epoch or parsable datetime before which items should be purged
defaults to 'now'
=back
B<Example usage>
my $del = $queue->purge_chatter( queue => 'queue_name', before => '2015-11-24') ;
say "removed $del messages" ;
=cut
sub purge_chatter
{
my $self = shift ;
my $params = @_ % 2 ? shift : {@_} ;
if ( ref($params) ne 'HASH' ) {
warn "purge_chatter accepts a hash or a hashref of parameters" ;
return 0 ;
}
$params->{queue} ||= $self->{default_queue} ;
my $qname = $params->{queue} ;
# SQL wildcard replace
$qname =~ s/\*/%/g ;
my $sql = "WHERE queue_name LIKE ?
AND processed = 1
OR process_failure = 1
AND msg_type = ?
AND added <= ?" ;
my $sql_args = [ $qname, MSG_CHATTER, $params->{before} ] ;
if ( defined $params->{counter} ) {
my $sql = "WHERE queue_name LIKE ?
AND processed = 1
OR process_failure = 1
AND msg_type = ?
AND counter <= ?" ;
my $sql_args = [ $qname, MSG_CHATTER, $params->{counter} ] ;
} else {
try {
if ( !defined $params->{before} ) {
$params->{before} = _parse_datetime( time() ) ;
} else {
$params->{before} = _parse_datetime( $params->{before} ) ;
}
}
catch {
warn(
"this does not look like a datetime value I can use: '$params->{before}'"
) ;
$params->{before} = _parse_datetime( time() ) ;
} ;
}
( run in 1.537 second using v1.01-cache-2.11-cpan-39bf76dae61 )