App-Basis-Queue
lib/App/Basis/Queue.pm
or die "Could not connect to DB $dsn" ;
my $queue = App::Basis::Queue->new( dbh => $dbh) ;
# for a system that wants to know when servers have started
$queue->publish( queue => '/chat/helo', data => { host => 'abc', msg => 'helo world' } ) ;
# in another process
use App::Basis::Queue;
my $dsn = "dbi:SQLite:/location/of/sqlite_db.sqlite3" ;
my $dbh = DBI->connect( $dsn, "", "",
{ RaiseError => 1, PrintError => 0, } )
or die "Could not connect to DB $dsn" ;
my $queue = App::Basis::Queue->new( dbh => $dbh) ;
=head1 DESCRIPTION
Why have another queuing system? I wanted a queuing system that did not require me
to install and maintain another server (e.g. RabbitMQ), something that could run
against existing DBs (e.g. PostgreSQL). PGQ was an option, but it throws away queued items
if there is no listener, which made it useless for me! Some of the Job/Worker systems required you to create
classes and plugins to process the queue. Queue::DBI almost made the grade but only has one queue. Minion
could perhaps have done what was needed, but I did not find it in time.
I need multiple queues plus, as a new requirement, queue wildcards!
So I created this simple/basic system. You need to expire items, clean the queue and do things like that by hand;
there is no automation. You process items in the queue in chunks, not via a nice iterator.
There is no queue polling per se: you need to process the queue and try again when all items are done.
There can only be one consumer of a record, which is a good thing. If you cannot process an item, it can be marked as
failed, to be handled by a cleanup function that you will need to create.
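The processing model described above can be sketched with a plain in-memory list. This is only an illustration of the idea (take items in chunks, hand each item to a single consumer, mark failures for a later cleanup pass). It does not use or reproduce the App::Basis::Queue API: C<take_chunk>, C<drain_queue>, C<failed_items>, C<CHUNK_SIZE> and the C<state> values are all hypothetical names invented for this sketch.

```perl
#!/usr/bin/env perl
# Illustrative only: an in-memory stand-in for the processing model.
# None of these names are part of App::Basis::Queue.
use strict ;
use warnings ;

use constant CHUNK_SIZE => 3 ;    # hypothetical chunk size

# take up to $count unprocessed items; marking them 'taken' means
# each item is only ever handed to one consumer
sub take_chunk {
    my ( $items, $count) = @_ ;
    my @chunk ;
    foreach my $item (@$items) {
        next if $item->{state} ne 'new' ;
        $item->{state} = 'taken' ;
        push @chunk, $item ;
        last if @chunk >= $count ;
    }
    return @chunk ;
}

# keep asking for chunks until none are left; a callback that dies or
# returns false marks the item as failed rather than retrying it
sub drain_queue {
    my ( $items, $callback) = @_ ;
    while ( my @chunk = take_chunk( $items, CHUNK_SIZE) ) {
        foreach my $item (@chunk) {
            my $ok = eval { $callback->( $item->{data}) } ;
            $item->{state} = $ok ? 'done' : 'failed' ;
        }
    }
    return ;
}

# the cleanup pass you have to write yourself; here it only lists failures
sub failed_items {
    my ($items) = @_ ;
    return grep { $_->{state} eq 'failed' } @$items ;
}

my @queue = map { +{ data => $_, state => 'new' } } 1 .. 7 ;

# pretend that every third item cannot be processed
drain_queue(
    \@queue,
    sub {
        my ($n) = @_ ;
        die "cannot handle $n\n" if $n % 3 == 0 ;
        return 1 ;
    }
) ;

my @failed = failed_items( \@queue) ;
my $done = grep { $_->{state} eq 'done' } @queue ;
printf "%d done, %d failed\n", $done, scalar(@failed) ;
```

In a real deployment the state would live in the database rather than in a Perl array, so that several processes can share one queue; the sketch keeps everything in memory purely to show the control flow.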
=head1 End of Life
I created this project mostly as a learning project. My requirements for what it does are changing, and they will
involve client/server operations, a shared cache and a locking system for the task clients, so I am going to leave this
project parked and start something new.
=head1 AUTHOR
kmulholland, moodfarm@cpan.org
=head1 See Also
L<Queue::DBI>, L<AnyMQ::Queue>, L<Minion>
=head1 API
=cut
package App::Basis::Queue ;
$App::Basis::Queue::VERSION = '000.600.100';
use 5.10.0 ;
use feature 'state' ;
use strict ;
use warnings ;
use Moo ;
use MooX::Types::MooseLike::Base qw/InstanceOf HashRef Str/ ;
use JSON ;
use Data::UUID ;
use Try::Tiny ;
use POSIX qw( strftime) ;
use Time::HiRes qw(gettimeofday tv_interval ) ;
use Date::Manip ;
# use Data::Printer ;
# extends "App::Basis::QueueBase" ;
# -----------------------------------------------------------------------------
use constant MSG_TASK => 'task' ;
use constant MSG_CHATTER => 'chatter' ;
use constant MSG_SIMPLE => 'simple' ;
use constant MAX_PROCESS_ITEMS => 100 ;
use constant FIVE_DAYS => 5 * 24 * 3600 ;
use constant MAX_EXPIRY_DATETIME => "3000-01-01 12:00 UTC" ;
use constant PEEK_MAX => 100 ;
# -----------------------------------------------------------------------------
## class initialisation
## instantiation variables
# -----------------------------------------------------------------------------
has 'dbh' => (
is => 'ro',
isa => InstanceOf ['DBI::db']
) ;
has 'prefix' => (
is => 'ro',
isa => Str,
default => sub { 'qsdb' ; },
) ;
has 'debug' => (
is => 'rw',
default => sub { 0 ; },
writer => 'set_debug'
) ;
has 'skip_table_check' => (
is => 'ro',
default => sub { 0 ; },
) ;
has 'subscriptions' => (
is => 'ro',
init_arg => undef,    # not settable via the constructor
default => sub { {} },
) ;
# this is the number of events listened to
has 'ev_count' => (
is => 'ro',
init_arg => undef,    # not settable via the constructor
default => sub { {} },
) ;
# when listening for chatter events we will wait for this many seconds
# before trying again