AnyEvent-DBI-MySQL

 view release on metacpan or  search on metacpan

lib/AnyEvent/DBI/MySQL.pm  view on Meta::CPAN

package AnyEvent::DBI::MySQL;
use 5.010001;
use warnings;
use strict;
use utf8;
use Carp;

our $VERSION = 'v2.1.0';

## no critic(ProhibitMultiplePackages Capitalization ProhibitNoWarnings)

use base qw( DBI );
use AnyEvent;
use Scalar::Util qw( weaken );

# Per-connection state, indexed by the id that connect() assigns. Each entry
# is a hash that may hold the pending callback (cb), the handle it belongs
# to (h), optional call_again data and the AnyEvent I/O watcher (io).
my @DATA;
# Ids available for reuse; connect() pops one from here before minting a new
# one. (Where ids get pushed back is not visible in this part of the file.)
my @NEXT_ID = ();
# Next never-used id, handed out by connect() when @NEXT_ID is empty.
my $NEXT_ID = 0;
# Attribute key under which a handle carries its connection id.
my $PRIVATE = 'private_' . __PACKAGE__;
# Attribute key checked on statement handles to tell whether they run async.
my $PRIVATE_async = "$PRIVATE/async";

# Force connect_cached() but with a unique key in $attr - this guarantees
# that a cached $dbh will be reused only after it is no longer in use by
# the user. Use {RootClass} instead of $class->connect_cached() because
# DBI->connect_cached() will call $class->connect() in turn.
sub connect { ## no critic(ProhibitBuiltinHomonyms)
    my ($class, $dsn, $user, $pass, $attr) = @_;
    local $SIG{__WARN__} = sub { (my $msg=shift)=~s/ at .*//ms; carp $msg };
    my $id = @NEXT_ID ? pop @NEXT_ID : $NEXT_ID++;

    $attr //= {};
    $attr->{RootClass} = $class;
    $attr->{$PRIVATE} = $id;
    my $dbh = DBI->connect_cached($dsn, $user, $pass, $attr);
    return if !$dbh;

    # weaken cached $dbh to have DESTROY called when user stop using it
    my $cache = $dbh->{Driver}{CachedKids};
    for (grep {$cache->{$_} && $cache->{$_} == $dbh} keys %{$cache}) {
        weaken($cache->{$_});
    }

    weaken(my $weakdbh = $dbh);
    my $io_cb; $io_cb = sub {
        local $SIG{__WARN__} = sub { (my $msg=shift)=~s/ at .*//ms; warn "$msg\n" };
        my $data = $DATA[$id];
        my $cb = delete $data->{cb};
        my $h  = delete $data->{h};
        my $args=delete $data->{call_again};
        if ($cb && $h) {
            $cb->( $h->mysql_async_result, $h, $args // ());
        }
        else {
            $DATA[$id] = {};
            if ($weakdbh && $weakdbh->{mysql_auto_reconnect}) {
                $weakdbh->ping;         # initiate reconnect
                if ($weakdbh->ping) {   # check is reconnect was successful
                    $DATA[ $id ] = {
                        io => AnyEvent->io(
                            fh      => $weakdbh->mysql_fd,
                            poll    => 'r',
                            cb      => $io_cb,
                        ),
                    };
                }

lib/AnyEvent/DBI/MySQL.pm  view on Meta::CPAN

        }
        else {
            if ($args[$attr_idx]->{async}) {
                croak q{callback required};
            } else {
                $args[$attr_idx]->{async} = 0;
            }
        }

        return $dbh->$super(@args);
    };
}


package AnyEvent::DBI::MySQL::st;
use base qw( DBI::st );
use Carp;
use Scalar::Util qw( weaken );

# Run the prepared statement, optionally delivering the result to a callback.
#
# If the last argument is a CODE ref or an AnyEvent::CondVar, it is removed
# from the argument list and used as the callback; nothing is returned in
# that case and the callback is invoked as $cb->($rv, $sth):
#   - statement not flagged async: the query runs synchronously and the
#     callback is invoked right away with the result of execute();
#   - statement flagged async: the callback and handle are parked in the
#     @DATA entry selected by $sth->{$PRIVATE} and the query is started;
#     the callback is invoked from here (with an undef result) only when
#     execute() reports an error immediately.
# Without a callback this is a plain execute() whose result is returned to
# the caller; it croaks if the statement is flagged async.
# Croaks as well when a callback is already pending in the same @DATA entry.
sub execute {
    my ($sth, @args) = @_;
    local $SIG{__WARN__} = sub { (my $msg=shift)=~s/ at .*//ms; carp $msg };
    my $slot      = $DATA[ $sth->{$PRIVATE} ];
    my $last_type = ref $args[-1];

    # No trailing callback: ordinary blocking call.
    if ($last_type ne 'CODE' && $last_type ne 'AnyEvent::CondVar') {
        croak q{callback required} if $sth->{$PRIVATE_async};
        return $sth->SUPER::execute(@args);
    }

    # Only one callback may be pending per @DATA entry.
    croak q{can't make more than one asynchronous query simultaneously}
        if $slot->{cb};
    @{$slot}{qw( cb h )} = (pop @args, $sth);

    if (!$sth->{$PRIVATE_async}) {
        # Synchronous statement: unpark at once and hand over the result.
        my ($cb, $h) = delete @{$slot}{qw( cb h )};
        $cb->( $sth->SUPER::execute(@args), $h );
        return;
    }

    $sth->SUPER::execute(@args);
    if ($sth->err) {
        # execute() failed, so no I/O event will follow - report it now.
        my ($cb, $h, $again) = delete @{$slot}{qw( cb h call_again )};
        $cb->( undef, $h, defined $again ? $again : () );
    }
    return;
}


package AnyEvent::DBI::MySQL::st::ready;
use base qw( DBI::st );
# Stub execute(): performs no work, ignores its arguments and always
# returns the true-but-numerically-zero string '0E0'.
sub execute {
    return '0E0';
}


1; # Magic true value required at end of module
__END__

=encoding utf8

=head1 NAME

AnyEvent::DBI::MySQL - Asynchronous MySQL queries


=head1 VERSION

This document describes AnyEvent::DBI::MySQL version v2.1.0


=head1 SYNOPSIS

    use AnyEvent::DBI::MySQL;

    # get cached but not in use $dbh
    $dbh = AnyEvent::DBI::MySQL->connect(…);

    # async
    $dbh->do(…,                 sub { my ($rv, $dbh) = @_; … });
    $sth = $dbh->prepare(…);
    $sth->execute(…,            sub { my ($rv, $sth) = @_; … });
    $dbh->selectall_arrayref(…, sub { my ($ary_ref)  = @_; … });
    $dbh->selectall_hashref(…,  sub { my ($hash_ref) = @_; … });
    $dbh->selectcol_arrayref(…, sub { my ($ary_ref)  = @_; … });
    $dbh->selectrow_array(…,    sub { my (@row_ary)  = @_; … });
    $dbh->selectrow_arrayref(…, sub { my ($ary_ref)  = @_; … });
    $dbh->selectrow_hashref(…,  sub { my ($hash_ref) = @_; … });

    # sync
    $rv = $dbh->do('…');
    $dbh->do('…', {async=>0}, sub { my ($rv, $dbh) = @_; … });


=head1 DESCRIPTION

This module is an L<AnyEvent> user, you need to make sure that you use and
run a supported event loop.

This module implements asynchronous MySQL queries using
L<DBD::mysql/"ASYNCHRONOUS QUERIES"> feature. Unlike L<AnyEvent::DBI> it
doesn't spawn any processes.

You shouldn't use C<< {RaiseError=>1} >> with this module; instead, check
the returned values in your callback to detect errors. This is because with
C<< {RaiseError=>1} >> an exception will be thrown B<instead> of calling your
callback function, which isn't what you want in most cases.


=head1 INTERFACE 

The API is trivial: use it just like the usual DBI, but instead of expecting
a return value from functions which may block, add one extra parameter: a
callback. That callback will be called with the usual return value of the
method as its parameters (the only exception is the extra $dbh/$sth param
passed to the callbacks of do() and execute() for convenience).

=head2 SYNCHRONOUS QUERIES

In most cases to make usual synchronous query it's enough to don't provide



( run in 1.228 second using v1.01-cache-2.11-cpan-39bf76dae61 )