Data-Model
view release on metacpan or search on metacpan
lib/Data/Model/Driver/Queue/Q4M.pm view on Meta::CPAN
package Data::Model::Driver::Queue::Q4M;
use strict;
use warnings;
use base 'Data::Model::Driver::DBI';
use Carp ();
# Mark this package as internal to Carp so that croak() messages raised here
# are attributed to the code calling the driver, not to a line in this module.
$Carp::Internal{(__PACKAGE__)}++;
# Accessor: returns the value stored under the object's 'timeout' key
# (undef when it was never set). queue_running() uses it as the fallback
# timeout when the caller does not pass one.
sub timeout {
    my($self) = @_;
    return $self->{timeout};
}
# Split the key/value arguments given to queue_running() into their parts.
#
# The argument list is read as consecutive (name, value) pairs:
#   * a pair whose value is a CODE reference registers that callback for the
#     table called "name";
#   * a pair named 'timeout' whose value consists only of digits sets the
#     timeout;
#   * every other pair is ignored.
#
# Returns (\@queue_tables, \%callbacks, $timeout). The tables keep the order
# in which they were passed; $timeout is undef when no valid timeout pair
# was supplied.
sub _create_arguments {
    my @pairs = @_;

    my(@queue_tables, %callbacks, $timeout);
    while (@pairs) {
        my($table, $value) = splice @pairs, 0, 2;

        # register callback
        if (ref($value) eq 'CODE') {
            push @queue_tables, $table;
            $callbacks{$table} = $value;
            next;
        }

        # timeout
        next unless $table eq 'timeout';
        $timeout = $value if $value =~ /\A[0-9]+\z/;
    }

    return (\@queue_tables, \%callbacks, $timeout);
}
# Run "SELECT queue_wait(?, ..., ?)" on the read handle and report the result.
#
# Arguments:
#   $timeout - bound to the last placeholder
#   @tables  - table specifiers, bound to the leading placeholders in order
#
# Returns the value selected by queue_wait() when a row was fetched and the
# value is true; returns 0 otherwise.
# Croaks with "no created queue table" when the selected value is undefined
# (a NULL result, or no row fetched at all).
sub queue_wait {
    my $self    = shift;
    my $timeout = shift;
    my @tables  = @_;

    # one placeholder per table plus a trailing one for the timeout
    my $placeholders = join(', ', ('?') x (@tables + 1));
    my $sth = $self->r_handle->prepare_cached(
        sprintf 'SELECT queue_wait(%s)', $placeholders
    );

    # bind params: tables first, the timeout last (positions are 1-based)
    my @bind_values = (@tables, $timeout);
    for my $idx (0 .. $#bind_values) {
        $sth->bind_param($idx + 1, $bind_values[$idx], undef);
    }
    $sth->execute;

    my $retcode;
    $sth->bind_columns(undef, \$retcode);
    my $fetched = $sth->fetch;
    $sth->finish;
    undef $sth;

    # a NULL return code from queue_wait means an illegal table name
    Carp::croak("no created queue table") unless defined $retcode;

    return ($fetched && $retcode) ? $retcode : 0;
}
# Execute "SELECT queue_abort()" on the read handle and flag the driver as
# aborted, so queue_running() neither aborts a second time nor calls
# queue_end(). Returns 1 (the value assigned to the flag).
sub queue_abort {
    my($self) = @_;

    my $sth = $self->r_handle->prepare('SELECT queue_abort()');
    $sth->execute;

    $self->{is_aborted} = 1;
}
# Execute "SELECT queue_end()" on the read handle.
# Returns whatever the statement handle's execute() returned.
sub queue_end {
    my($self) = @_;

    my $sth = $self->r_handle->prepare('SELECT queue_end()');
    return $sth->execute;
}
# Wait for one queued row and hand it to the callback registered for its table.
#
# Usage:
#   $driver->queue_running($c, $table => \&callback, ..., timeout => $n);
#
# Arguments:
#   $c    - object providing schema_names() and get($model_name)
#   pairs - "table => CODE ref" registers a callback; "timeout => digits"
#           overrides the timeout. A table specifier may carry a ':' suffix;
#           only the part before the first ':' is used as the model name,
#           while the full specifier is what gets passed to queue_wait().
#
# Returns the model name whose row was processed. Returns nothing when
# queue_wait() reported no ready table, when no row could be fetched (the
# queue is aborted in that case), or when the callback aborted the queue.
#
# Croaks on an odd number of pair arguments, when no callback was given, or
# when a table's model name is not among $c->schema_names. An exception
# thrown by the callback is rethrown after the queue has been aborted.
sub queue_running {
    my($self, $c) = (shift, shift);
    $self->{is_aborted} = 0;

    my $arg_length = scalar(@_);
    Carp::croak('illegal parameter') if $arg_length % 2;

    # create table attributes
    my($queue_tables, $callbacks, $timeout) = _create_arguments(@_);
    Carp::croak('required is callback handler') unless @{ $queue_tables };

    my %schema = map { $_ => 1 } $c->schema_names;
    for my $table (@{ $queue_tables }) {
        my($name) = split /:/, $table;
        Carp::croak("'$name' is missing model name") unless $schema{$name};
    }

    # explicit argument first, then the driver default, then 60
    $timeout ||= $self->timeout || 60;

    # queue_wait
    my $table_id = $self->queue_wait($timeout, @{ $queue_tables });
    return unless $table_id;

    # get record; queue_wait() answers with a 1-based index into the table list
    my $running_table = $queue_tables->[$table_id - 1];
    my($real_table) = split /:/, $running_table;
    my($row) = $c->get( $real_table );
    unless ($row) {
        $self->queue_abort;
        return;
    }

    # running callback
    # The trailing "1" lets us detect failure from eval's return value rather
    # than from the truthiness of $@, which can be false or already cleared
    # even though the callback died.
    my $succeeded = eval {
        $callbacks->{$running_table}->($row);
        1;
    };
    unless ($succeeded) {
        # Save the exception before queue_abort() runs: the database calls it
        # makes may overwrite $@ and we must rethrow the original error.
        my $error = $@;
        $self->queue_abort unless $self->{is_aborted};
        $error = "queue callback died without an error message\n"
            unless ref $error || (defined $error && length $error);
        die $error; # throwing exception
    }

    # the callback called queue_abort() itself: nothing to commit
    return if $self->{is_aborted};

    $self->queue_end;
    return $real_table;
}
# for schema
# Schema hook: force tables created through this driver to use the QUEUE
# storage engine.
#
# Every hook call is delegated to $self->dbd->_as_sql_hook with the same
# arguments. For the 'get_table_attributes' hook (second argument) the
# returned attribute string is then adjusted:
#   * an existing "ENGINE=<name>" clause (any letter case) becomes
#     "ENGINE=QUEUE";
#   * if there is no ENGINE clause, "ENGINE=QUEUE" is appended, or returned
#     on its own when the attribute string is empty or undefined.
# All other hooks return the delegate's result unchanged.
#
# "ENGINE=" is used instead of the legacy "TYPE=" spelling because MySQL
# dropped the TYPE table option in 5.5.
sub _as_sql_hook {
    my $self = shift;

    my $hook_name = $_[1];
    unless (defined $hook_name && $hook_name eq 'get_table_attributes') {
        return $self->dbd->_as_sql_hook(@_);
    }

    my $attributes = $self->dbd->_as_sql_hook(@_);
    $attributes = '' unless defined $attributes;

    my $replaced = $attributes =~ s/(\A|\W)\s*ENGINE\s*=\s*\w+\s*(\z|\W)/${1}ENGINE=QUEUE${2}/i;
    unless ($replaced) {
        # keep the other table options and add the engine clause to them
        $attributes = length $attributes ? "$attributes ENGINE=QUEUE" : 'ENGINE=QUEUE';
    }
    return $attributes;
}
1;
__END__
=head1 NAME
Data::Model::Driver::Queue::Q4M - Q4M manager for Data::Model
=head1 SYNOPSIS
use Data::Model::Driver::Queue::Q4M;
my $driver = Data::Model::Driver::Queue::Q4M->new(
dsn => 'dbi:mysql:database=test',
username => '',
password => '',
timeout => 60, # queue_wait timeout
);
{
package MyQueue;
use base 'Data::Model';
use Data::Model::Mixin modules => ['Queue::Q4M'];
use Data::Model::Schema;
base_driver $driver;
install_model smtp => schema {
column id
=> char => {};
column data
=> int => {};
};
( run in 0.617 second using v1.01-cache-2.11-cpan-39bf76dae61 )