App-Basis-Queue

 view release on metacpan or  search on metacpan

lib/App/Basis/Queue.pm  view on Meta::CPAN

may be used in processing callback functions

B<Example usage>

    sub processing_callback {
        my ( $queue, $qname, $record, $params ) = @_;

        # allow partially failed (and failed) records to be processed
        if( $record->{process_failure) {
            $queue->reset_record( $record) ;
        }
        return 1 ;
    }

=cut

sub reset_record
{
    my $self = shift ;
    my ($data) = @_ ;

    my $sql = "SET process_failure=0
        WHERE id = ?
        AND queue_name = ?
        AND processed=0
        AND process_failure > 0
        AND msg_type = ?" ;
    my $resp = $self->_update_db( $self->{prefix} . "_queue",
        $sql, [ $data->{id}, $data->{queue_name}, MSG_TASK ] ) ;

    return $resp->{row_count} ;
}

# -----------------------------------------------------------------------------

=head2 publish

Publish some chatter data into a named queue.

B<Parameters>

Hash of

=over

=item queue

Name of the queue to publish to, wildcards B<NOT allowed>

=item data

Hashref of the data to be published

=item persist (optional)

Flag to show that this data data should be persisited (0 or 1).
This will become the only persistent record available until either it is replaced or expires.

=item expires (optional)

Time after which this data should be ignored. Accepts unix epoch time or parsable datetime string

=back

B<Example usage>

    my $queue = App::Basis::Queue->new( dbh => $dbh) ;

    # keep track of a bit of info
    $queue->publish( queue => 'app_log',
        data => {
            ip => 12.12.12.12, session_id => 12324324345, client_id => 248296432984,
            appid => 2, app_name => 'twitter'
        }
    ) ;

=cut

sub publish
{
    my $self = shift ;
    my $params = @_ % 2 ? shift : {@_} ;

    if ( ref($params) ne 'HASH' ) {
        warn "publish accepts a hash or a hashref of parameters" ;
        return 0 ;
    }
    $params->{type} = MSG_CHATTER ;

    # make sure this is a zero or one value
    $params->{persist} = defined $params->{persist} ;

    return $self->_add($params) ;
}

# -----------------------------------------------------------------------------
# find the most recent persistent item
# queue is the only parameter, returns arrayref of items

sub _recent_persist
{
    my $self = shift ;
    my $params = @_ % 2 ? shift : {@_} ;

    if ( ref($params) ne 'HASH' ) {
        warn "_recent_persist accepts a hash or a hashref of parameters" ;
        return [] ;
    }
    my $qname = $params->{queue} ;
    my @data ;

    # if the queue does not exist
    return [] if ( !$self->_valid_qname($qname) ) ;

    # switch to SQL wildcard
    $qname =~ s/\*/%/g ;

    # find the most recent persistent items for each matching queue
    my $sql = sprintf(
        "SELECT * FROM %s_queue a
            WHERE a.queue_name LIKE ?
            AND a.msg_type = ?
            AND a.persist = ?
            AND a.counter NOT IN ( SELECT counter from %s_queue b
            WHERE b.queue_name = a.queue_name
                AND b.msg_type = a.msg_type
                AND b.persist = a.persist
                AND b.added > a.added
            )
            AND a.expires > ?
            GROUP BY queue_name
            ORDER BY queue_name;", $self->{prefix}, $self->{prefix}
    ) ;

    # there should only be one persist item
    my $expires = _parse_datetime( time() ) ;

    my $result
        = $self->_query_db( $sql, [ $qname, MSG_CHATTER, 1, $expires ] ) ;

    foreach my $row ( @{ $result->{rows} } ) {
        $row->{data} = decode_json( $row->{data} ) ;    # unpack the data
        CORE::push @data, $row ;
    }

    return \@data ;
}

# -----------------------------------------------------------------------------
# get chatter data (ordered by datetime added) after a unix time
# queue is the only parameter,
# returns arrayref of items

sub _recent_chatter
{
    my $self = shift ;
    my $params = @_ % 2 ? shift : {@_} ;

    if ( ref($params) ne 'HASH' ) {
        warn "_recent_chatter accepts a hash or a hashref of parameters" ;
        return [] ;
    }

    my $qname = $params->{queue} ;
    my @data ;

    # if the queue does not exist
    return [] if ( !$self->_valid_qname($qname) ) ;

    # switch to SQL wildcard
    $qname =~ s/\*/%/g ;

    my $result ;
    my $sql ;

    if ( $params->{counter} ) {
        $sql = sprintf(
            "SELECT * FROM %s_queue
                    WHERE queue_name LIKE ?
                    AND msg_type = ?
                    AND counter > ?
                    AND expires > ?
                    GROUP BY queue_name
                    ORDER BY counter;", $self->{prefix}
        ) ;

        my $expires = _parse_datetime( time() ) ;
        $expires |= "" ;

        $result = $self->_query_db( $sql,
            [ $qname, MSG_CHATTER, $params->{counter}, $expires ] ) ;

    } else {
        # check by date
        $sql = sprintf(
            "SELECT * FROM %s_queue
                    WHERE queue_name LIKE ?
                    AND msg_type = ?
                    AND added >= ?
                    AND expires > ?
                    ORDER BY counter;", $self->{prefix}
        ) ;

        my $expires = _parse_datetime( time() ) ;
        $result = $self->_query_db( $sql,
            [ $qname, MSG_CHATTER, $params->{after} // 0, $expires ] ) ;
    }

    foreach my $row ( @{ $result->{rows} } ) {
        $row->{data} = decode_json( $row->{data} ) ;    # unpack the data
        CORE::push @data, $row ;
    }

    return \@data ;
}

# -----------------------------------------------------------------------------

=head2 subscribe

Subscribe to a named queue with a callback.

B<Parameters>

Hash of

=over

=item queue

Name of the queue, wildcard allowed

=item callback

Coderef to handle any matched events

=item after (optional)

Unix time after which to listen for events, defaults to now,  if set will skip persistent item checks

=item persist (optional)

Include the most recent persistent item, if using a wild card, this will match all the queues and could find multiple persistent items

=back

B<Example usage>

    my $queue = App::Basis::Queue->new( dbh => $dbh) ;

    # keep track of a bit of info
    $queue->subscribe( queue => 'app_logs/*', callback => \&handler) ;
    $queue->listen() ;

=cut

sub subscribe
{
    my $self = shift ;
    my $params = @_ % 2 ? shift : {@_} ;

    if ( ref($params) ne 'HASH' ) {
        warn "subscribe accepts a hash or a hashref of parameters" ;
        return 0 ;
    }

    $params->{queue} ||= $self->{default_queue} ;
    if ( !$params->{queue} ) {
        warn "subscribe needs a queue name to listen to" ;
        return 0 ;
    }

    if ( ref( $params->{callback} ) ne 'CODE' ) {
        warn "subscribe needs a callback handler to send events to" ;
        return 0 ;
    }

    # add to our current subscriptions

    if ( $params->{after} ) {

        # we cannot get recent persist item if they want to check after a date
        $params->{persist} = 0 ;
    }

    if ( !defined $params->{after} ) {
        $params->{after} = _std_datetime() ;
    } elsif ( $params->{after} =~ /^\d+$/ ) {
        $params->{after} = _std_datetime( gmtime( $params->{after} ) ) ;
    } elsif ( $params->{after} !~ /^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}$/ ) {
        warn(
            "this does not look like a datetime value I can use: '$params->{after}'"
        ) ;
        $params->{after} = _std_datetime() ;
    }

    $self->{subscriptions}->{ $params->{queue} } = {
        callback => $params->{callback},

        # when do we want events from
        after    => $params->{after},
        persist  => $params->{persist},
        ev_count => 0,
        counter  => 0
    } ;
}

# -----------------------------------------------------------------------------

=head2 listen

Listen to all subcribed channels. Loops forever unless told to stop.
If there is a persistent message in a queue, this will be passed to the callback before the other records.

B<Parameters>

Hash of

=over

=item events (optional)

Minimum number of events to listen for, stop after this many,  may stop after more - this is across ALL the subscriptions

=item datetime (optional)

Unix epoch time or parsable datetime when to stop listening

=item persist (optional)

Include the most recent persistent item, if subscribed using using a wild card,
this will match all the queues and could find multiple persistent items

=item listen_delay (optional)

Override the class delay, obtain events at this rate

=back

B<returns>

Number of chatter events actually passed to ALL the handlers

B<Example usage>

    my $queue = App::Basis::Queue->new( dbh => $dbh) ;
    $queue->subscribe( '/logs/*', \&handler) ;
    $queue->listen() ;    # listening forever

    # or listen until christmas, checking every 30s
    $queue->subscribe( '/presents/*', \&handler) ;
    $queue->listen( datetime => '2015-12-25', listen_delay => 30) ;

=cut

sub listen
{
    my $self = shift ;
    my $params = @_ % 2 ? shift : {@_} ;
    # decide where the delay comes from
    my $delay = $params->{listen_delay} || $self->{listen_delay} ;

    if ( ref($params) ne 'HASH' ) {
        warn "listen accepts a hash or a hashref of parameters" ;
        return 0 ;
    }

    if ( $params->{datetime} ) {
        my (@dt) = _parse_datetime( $params->{datetime} ) ;
        if ( $dt[1] ) {
            # used to check against time() later
            $params->{datetime} = $dt[1] ;
        }
    }

    if ( !keys %{ $self->{subscriptions} } ) {
        warn "you have not subscribed to any queues" ;
        return 0 ;
    }

    $self->{ev_count} = 0 ;

    # clean things up before we listen
    foreach my $qmatch ( sort keys %{ $self->{subscriptions} } ) {
        my $subs = $self->{subscriptions}->{$qmatch} ;
        $subs->{counter}  = 0 ;
        $subs->{ev_count} = 0 ;

lib/App/Basis/Queue.pm  view on Meta::CPAN


B<Example usage>

    sub handler {
        state $counter = 0 ;
        my $q = shift ;             # we get the queue object
        # the queue trigger that matched, the actual queue name and the data
        my ($qmatch, $queue, $data) = @_ ;

        # we are only interested in 10 messages
        if( ++$counter > 10) {
            $q->unsubscribe( queue => $queue) ;
        } else {
            say Data::Dumper( $data) ;
        }
    }

    my $queue = App::Basis::Queue->new( dbh => $dbh) ;
    $queue->subscribe( queue => '/logs/*', callback => \&handler) ;
    $queue->listen() ;

=cut

sub unsubscribe
{
    my $self = shift ;
    my $params = @_ % 2 ? shift : {@_} ;

    if ( ref($params) ne 'HASH' ) {
        warn "unsubscribe accepts a hash or a hashref of parameters" ;
        return 0 ;
    }

    $params->{queue} ||= $self->{default_queue} ;
    if ( $params->{queue} ) {

        # does not matter if the queue name does not exist!
        delete $self->{subscriptions}->{ $params->{queue} } ;
    }
}

# -----------------------------------------------------------------------------

=head2 purge_tasks

Purge will remove all processed task items and failures/deadletters (process_failure >= 5).
These are completely removed from the database

B<Parameters>

Hash of

=over

=item queue

Name of the queue, wildcard allowed

=item before (optional)

Unix epoch or parsable datetime before which items should be purged

defaults to 'now'

=back

B<Example usage>

    my $before = $queue->stats( queue => 'queue_name', before => '2015-11-24') ;
    $queue->purge_tasks( queue => 'queue_name') ;
    my $after = $queue->stats( queue  => 'queue_name') ;

    say "removed " .( $before->{total_records} - $after->{total_records}) ;

=cut

sub purge_tasks
{
    my $self = shift ;
    my $params = @_ % 2 ? shift : {@_} ;

    if ( ref($params) ne 'HASH' ) {
        warn "purge_tasks accepts a hash or a hashref of parameters" ;
        return 0 ;
    }

    $params->{queue} ||= $self->{default_queue} ;
    my $qname = $params->{queue} ;

    # SQL wildcard replace
    $qname =~ s/\*/%/g ;

    try {
        if ( !defined $params->{before} ) {
            $params->{before} = _parse_datetime( time() ) ;
        } else {
            $params->{before} = _parse_datetime( $params->{before} ) ;
        }
    }
    catch {
        warn(
            "this does not look like a datetime value I can use: '$params->{before}'"
        ) ;
        $params->{before} = _parse_datetime( time() ) ;
    } ;

# TODO: add in expired items too, plus the and processed=1 or process_failure =1 looks a bit wrong
    my $sql = "WHERE queue_name LIKE ?
        AND processed = 1
        OR process_failure = 1
        AND msg_type = ?
        AND added <= ?" ;

    my $resp = $self->_delete_db_record( $self->{prefix} . "_queue",
        $sql, [ $qname, MSG_TASK, $params->{before} ] ) ;

    # return the number of items deleted
    return $resp->{row_count} ;
}

# -----------------------------------------------------------------------------

=head2 purge_chatter

purge will remove all chatter messages.
These are completely removed from the database

B<Parameters>

Hash of

=over

=item queue

Name of the queue, wildcard allowed

=item before (optional)

Unix epoch or parsable datetime before which items should be purged

defaults to 'now'

=back

B<Example usage>

    my $del = $queue->purge_chatter( queue => 'queue_name', before => '2015-11-24') ;

    say "removed $del messages" ;

=cut

sub purge_chatter
{
    my $self = shift ;
    my $params = @_ % 2 ? shift : {@_} ;

    if ( ref($params) ne 'HASH' ) {
        warn "purge_chatter accepts a hash or a hashref of parameters" ;
        return 0 ;
    }

    $params->{queue} ||= $self->{default_queue} ;
    my $qname = $params->{queue} ;

    # SQL wildcard replace
    $qname =~ s/\*/%/g ;

    my $sql = "WHERE queue_name LIKE ?
        AND processed = 1
        OR process_failure = 1
        AND msg_type = ?
        AND added <= ?" ;
    my $sql_args = [ $qname, MSG_CHATTER, $params->{before} ] ;

    if ( defined $params->{counter} ) {
        my $sql = "WHERE queue_name LIKE ?
        AND processed = 1
        OR process_failure = 1
        AND msg_type = ?
        AND counter <= ?" ;
        my $sql_args = [ $qname, MSG_CHATTER, $params->{counter} ] ;

    } else {
        try {
            if ( !defined $params->{before} ) {
                $params->{before} = _parse_datetime( time() ) ;
            } else {
                $params->{before} = _parse_datetime( $params->{before} ) ;
            }
        }
        catch {
            warn(
                "this does not look like a datetime value I can use: '$params->{before}'"
            ) ;
            $params->{before} = _parse_datetime( time() ) ;
        } ;
    }



( run in 1.537 second using v1.01-cache-2.11-cpan-39bf76dae61 )