AnyEvent-DBI-MySQL
view release on metacpan or search on metacpan
lib/AnyEvent/DBI/MySQL.pm view on Meta::CPAN
package AnyEvent::DBI::MySQL;
use 5.010001;
use warnings;
use strict;
use utf8;
use Carp;
our $VERSION = 'v2.1.0';
## no critic(ProhibitMultiplePackages Capitalization ProhibitNoWarnings)
use base qw( DBI );
use AnyEvent;
use Scalar::Util qw( weaken );
# Per-connection state, indexed by connection id. Each slot is a hash that
# may hold: cb (callback of the pending query), h (handle that query was
# started on), call_again (extra argument forwarded to the callback) and
# io (the AnyEvent I/O watcher for the connection).
my @DATA;
# Connection ids available for reuse; connect() pops one from here before
# falling back to allocating a fresh id.
# NOTE(review): the code that pushes released ids back is not visible here.
my @NEXT_ID = ();
# Next never-used connection id.
my $NEXT_ID = 0;
# Attribute name under which the connection id is stored on handles.
my $PRIVATE = 'private_' . __PACKAGE__;
# Attribute name checked on statement handles to decide whether execute()
# runs asynchronously.
my $PRIVATE_async = "$PRIVATE/async";
# Force connect_cached() but with a unique key in $attr - this guarantees
# a cached $dbh will be reused only after it is no longer in use by the user.
# Use {RootClass} instead of $class->connect_cached() because
# DBI->connect_cached() will call $class->connect() in turn.
sub connect { ## no critic(ProhibitBuiltinHomonyms)
my ($class, $dsn, $user, $pass, $attr) = @_;
local $SIG{__WARN__} = sub { (my $msg=shift)=~s/ at .*//ms; carp $msg };
my $id = @NEXT_ID ? pop @NEXT_ID : $NEXT_ID++;
$attr //= {};
$attr->{RootClass} = $class;
$attr->{$PRIVATE} = $id;
my $dbh = DBI->connect_cached($dsn, $user, $pass, $attr);
return if !$dbh;
# weaken cached $dbh to have DESTROY called when user stop using it
my $cache = $dbh->{Driver}{CachedKids};
for (grep {$cache->{$_} && $cache->{$_} == $dbh} keys %{$cache}) {
weaken($cache->{$_});
}
weaken(my $weakdbh = $dbh);
my $io_cb; $io_cb = sub {
local $SIG{__WARN__} = sub { (my $msg=shift)=~s/ at .*//ms; warn "$msg\n" };
my $data = $DATA[$id];
my $cb = delete $data->{cb};
my $h = delete $data->{h};
my $args=delete $data->{call_again};
if ($cb && $h) {
$cb->( $h->mysql_async_result, $h, $args // ());
}
else {
$DATA[$id] = {};
if ($weakdbh && $weakdbh->{mysql_auto_reconnect}) {
$weakdbh->ping; # initiate reconnect
if ($weakdbh->ping) { # check is reconnect was successful
$DATA[ $id ] = {
io => AnyEvent->io(
fh => $weakdbh->mysql_fd,
poll => 'r',
cb => $io_cb,
),
};
}
lib/AnyEvent/DBI/MySQL.pm view on Meta::CPAN
}
else {
if ($args[$attr_idx]->{async}) {
croak q{callback required};
} else {
$args[$attr_idx]->{async} = 0;
}
}
return $dbh->$super(@args);
};
}
package AnyEvent::DBI::MySQL::st;
use base qw( DBI::st );
use Carp;
use Scalar::Util qw( weaken );

# execute(@bind_values)
# execute(@bind_values, $cb)
#
# Statement-handle execute() with optional completion callback.
#
# $cb is recognised when the last argument is a CODE ref or an object whose
# ref() is exactly 'AnyEvent::CondVar'; it is removed from the bind values.
#
# With a callback:
#   * statement not flagged async: the query runs synchronously and $cb is
#     invoked immediately as $cb->($rv, $sth);
#   * statement flagged async: $cb and $sth are stored in the connection's
#     @DATA slot and the query is started. If starting it fails ($sth->err is
#     set) $cb is invoked right away as $cb->(undef, $sth [, $call_again]).
#   Nothing is returned in either case.
#
# Without a callback the call is passed straight to DBI and its result is
# returned.
#
# Croaks if another callback is still pending on the same connection, or if
# the statement is flagged async and no callback was given. An exception
# thrown by the underlying execute() is propagated unchanged.
sub execute {
    my ($sth, @args) = @_;
    local $SIG{__WARN__} = sub { (my $msg=shift)=~s/ at .*//ms; carp $msg };
    my $data = $DATA[ $sth->{$PRIVATE} ];
    my $ref = ref $args[-1];

    if ($ref eq 'CODE' || $ref eq 'AnyEvent::CondVar') {
        if ($data->{cb}) {
            croak q{can't make more than one asynchronous query simultaneously};
        }
        my $cb = pop @args;

        if (!$sth->{$PRIVATE_async}) {
            # Synchronous query: nothing needs to be parked in @DATA.
            # scalar() guarantees exactly one value for $rv, so $sth can
            # never slide into the first callback argument.
            $cb->( scalar $sth->SUPER::execute(@args), $sth );
            return;
        }

        # Park the callback before starting the query.
        $data->{cb} = $cb;
        $data->{h}  = $sth;

        # If execute() throws (e.g. RaiseError is on) the callback will never
        # be consumed; without cleanup the connection would refuse every
        # later callback query and the slot would keep $sth alive.
        my $err;
        {
            local $@;
            eval { $sth->SUPER::execute(@args); 1 }
                or $err = $@ || q{execute failed};
        }
        if (defined $err) {
            delete @{$data}{qw( cb h call_again )};
            die $err; ## no critic (RequireCarping) - rethrow unchanged
        }

        if ($sth->err) { # execute failed, I/O won't happen
            my $pending_cb = delete $data->{cb};
            my $h          = delete $data->{h};
            my $call_again = delete $data->{call_again};
            $pending_cb->( undef, $h, $call_again // () );
        }
        return;
    }
    elsif ($sth->{$PRIVATE_async}) {
        croak q{callback required};
    }

    return $sth->SUPER::execute(@args);
}
package AnyEvent::DBI::MySQL::st::ready;
use base qw( DBI::st );

# execute() stub: never calls into the parent class and always reports
# success with '0E0' (the string DBI uses for "zero but true").
# NOTE(review): presumably for statement handles that have already been
# executed; the code that blesses handles into this class is not visible here.
sub execute {
    return '0E0';
}
1; # Magic true value required at end of module
__END__
=encoding utf8
=head1 NAME
AnyEvent::DBI::MySQL - Asynchronous MySQL queries
=head1 VERSION
This document describes AnyEvent::DBI::MySQL version v2.1.0
=head1 SYNOPSIS
use AnyEvent::DBI::MySQL;
# get cached but not in use $dbh
$dbh = AnyEvent::DBI::MySQL->connect(…);
# async
$dbh->do(…, sub { my ($rv, $dbh) = @_; … });
$sth = $dbh->prepare(…);
$sth->execute(…, sub { my ($rv, $sth) = @_; … });
$dbh->selectall_arrayref(…, sub { my ($ary_ref) = @_; … });
$dbh->selectall_hashref(…, sub { my ($hash_ref) = @_; … });
$dbh->selectcol_arrayref(…, sub { my ($ary_ref) = @_; … });
$dbh->selectrow_array(…, sub { my (@row_ary) = @_; … });
$dbh->selectrow_arrayref(…, sub { my ($ary_ref) = @_; … });
$dbh->selectrow_hashref(…, sub { my ($hash_ref) = @_; … });
# sync
$rv = $dbh->do('…');
$dbh->do('…', {async=>0}, sub { my ($rv, $dbh) = @_; … });
=head1 DESCRIPTION
This module is an L<AnyEvent> user, you need to make sure that you use and
run a supported event loop.
This module implements asynchronous MySQL queries using
L<DBD::mysql/"ASYNCHRONOUS QUERIES"> feature. Unlike L<AnyEvent::DBI> it
doesn't spawn any processes.
You shouldn't use C<< {RaiseError=>1} >> with this module; instead, check the
values passed to your callback to detect errors. This is because with
C<< {RaiseError=>1} >> an exception will be thrown B<instead> of calling your
callback function, which isn't what you want in most cases.
=head1 INTERFACE
The API is trivial: use it just like the usual DBI, but instead of expecting
a return value from functions which may block, add one extra parameter: a callback.
That callback will be executed with the usual return value of the called method as
its parameters (the only exception is the extra $dbh/$sth param in do() and execute(),
added for convenience).
=head2 SYNCHRONOUS QUERIES
In most cases to make usual synchronous query it's enough to don't provide
( run in 1.228 second using v1.01-cache-2.11-cpan-39bf76dae61 )