App-Basis-Queue
view release on metacpan or search on metacpan
t/02_pubsub.t view on Meta::CPAN
$result = $sth->fetchall_arrayref( {} ) ;
}
catch {} ;
return $result, $err ;
}
# -----------------------------------------------------------------------------
sub _rand_pub
{
my $c = int( rand(10) + 1 ) ;
$dbh->begin_work ;
for ( my $i = 0; $i < $c; $i++ ) {
my $q = "/pub/" . int( rand(3) + 1 ) ;
$queue->publish( queue => $q, data => { text => "item $i of $c" } ) ;
}
$dbh->commit ;
return $c ;
}
# -----------------------------------------------------------------------------
my $persist_counter = 0 ;
sub _dump_data
{
my $obj = shift ;
my ( $queue, $data ) = @_ ;
$persist_counter++ ;
# note "Dump data for $queue $data->{data}->{text}" ;
}
# -----------------------------------------------------------------------------
my $dumped_counter = 0 ;
sub _dump_data2
{
my $obj = shift ;
my ( $queue, $data ) = @_ ;
$dumped_counter++ ;
# note "Dump data2 ($dumped_counter) for $queue $data->{data}->{text}" ;
}
# ----------------------------------------------------------------------------
# Testing starts here
# ----------------------------------------------------------------------------
# ----------------------------------------------------------------------------
# author testing can use sql, automated CPAN testing may not be able to
if ( $ENV{AUTHOR_TESTING} ) {
# $add_items = ( $add_items / 10 ) if ( $dsn =~ /SQLite/i );
# set PrintError off otherwise it will tell us that tables do not exist, we know that!
$dbh
= DBI->connect( $dsn, $user, $passwd,
{ RaiseError => 1, PrintError => 0, AutoCommit => 1 } )
or die "Could not connect to DB $dsn" ;
# diag "Testing against $dsn" ;
if ( $dsn =~ /SQLite/i ) {
$dbh->do("PRAGMA journal_mode = WAL") ;
$dbh->do("PRAGMA synchronous = NORMAL") ;
}
# -----------------------------------------------------------------------------
subtest "check clean start\n" => sub {
# remove all entries from the tables to make sure we are starting clean
my $table_name = $test_q . "_queue" ;
my ( $ret, $err ) = query_db( $dbh, "DROP TABLE $table_name;" ) ;
# check the table does not exist before we start
( $ret, $err )
= query_db( $dbh, "SELECT * from $table_name LIMIT 1;" ) ;
ok( !$ret && !$err, "Table $table_name does not exist" ) ;
} ;
# -----------------------------------------------------------------------------
subtest "create queue object\n" => sub {
$queue = App::Basis::Queue->new(
dbh => $dbh,
prefix => $test_q,
debug => $ENV{DEBUG}
) ;
# isa_ok( $queue, 'App::Basis::Queue' ) ;
# new should have created the various database tables, lets check if this is the case
# we know that the tables will start with $test_q and the be 'queue_names' and 'queue_info'
my $table_name = $test_q . "_queue" ;
my ( $ret, $err )
= query_db( $dbh, "SELECT * from $table_name LIMIT 1;" ) ;
ok( $ret, "Table $table_name exists" ) ;
my $queue_list = $queue->list_queues() ;
ok( !scalar(@$queue_list), 'No queues listed' ) ;
} ;
# -----------------------------------------------------------------------------
# lets publish some persistent items
subtest "publishing - with pauses" => sub {
my $time = time() ;
my $totals = 0 ;
sleep(1) ;
$totals += _rand_pub() ;
if ($queue->publish(
queue => '/pub/1',
data => { text => 'first persist on one' },
persist => 1
)
) {
$totals++ ;
}
sleep(1) ;
$totals += _rand_pub() ;
if ($queue->publish(
queue => '/pub/1',
data => { text => 'second persist on one' },
persist => 1
)
( run in 0.439 second using v1.01-cache-2.11-cpan-39bf76dae61 )