AnyEvent-MySQL

 view release on metacpan or  search on metacpan

lib/AnyEvent/MySQL.pm  view on Meta::CPAN

    #
    #my $update_st;
    #
    #my $txh3; $txh3 = $dbh->begin_work(sub {
    #    warn "txn3 begin.. @_";
    #});
    #
    #    $update_st = $dbh->prepare("insert into t1 values (?,?)", sub {
    #        warn "prepare insert @_";
    #    });
    #    $update_st->execute(60, 60, { Tx => $txh3 }, sub {
    #        warn "insert 60 @_";
    #    });
    #
    #    $dbh->selectall_arrayref("select * from t1", { Tx => $txh3 }, sub {
    #        warn "select in txn3: ".Dumper($_[0]);
    #    });
    #
    #    $txh3->rollback(sub {
    #        warn "txh3 rollback @_";
    #    });
    #
    #    $dbh->selectall_arrayref("select * from t1", sub {
    #        warn "select out txn3: ".Dumper($_[0]);
    #    });

    #$st_all = $dbh->prepare("select `date`, `time`, `datetime`, `timestamp` from all_type", sub {
    #    warn "prepare st_all @_";
    #});
    #
    #$st_all->execute

    $end5->recv;

    my $readonly_dbh = AnyEvent::MySQL->connect("DBI:mysql:database=test;host=127.0.0.1;port=3306", "ptest", "pass", { ReadOnly => 1 }, sub {
      # ... we can only use "select" and "show" and "set names" command on this handle
    });

    $end->recv;

=cut

sub _empty_cb {}

=head2 $dbh = AnyEvent::MySQL->connect($data_source, $username, [$auth, [\%attr,]] $cb->($dbh, 1))

=cut
sub connect {
    shift;
    return AnyEvent::MySQL::db->new(@_);
}

package AnyEvent::MySQL::db;

use strict;
use warnings;

use AE;
use AnyEvent::Socket;
use AnyEvent::Handle;
use Scalar::Util qw(weaken dualvar);
use Guard;

# connection state
use constant {
    BUSY_CONN => 1,
    IDLE_CONN => 2,
    ZOMBIE_CONN => 3,
};

# transaction state
use constant {
    NO_TXN => 1,
    EMPTY_TXN => 2,
    CLEAN_TXN => 3,
    DIRTY_TXN => 4,
    DEAD_TXN => 5,
};

# transaction control token
use constant {
    TXN_TASK => 1,
    TXN_BEGIN => 2,
    TXN_COMMIT => 3,
    TXN_ROLLBACK => 4,
};

use constant {
    AUTHi => 0,
    ATTRi => 1,
    HDi => 2,
    CONNi => 9,
    ON_CONNi => 11,

    CONN_STATEi => 3,
    TXN_STATEi => 4,

    TASKi => 5,
    STi => 6,
    FALLBACKi => 10,

    ERRi => 7,
    ERRSTRi => 8,
};

sub _push_task {
    my($dbh, $task) = @_;
    push @{$dbh->{_}[TASKi]}, $task;
    _process_task($dbh) if( $dbh->{_}[CONN_STATEi]==IDLE_CONN );
}

sub _unshift_task {
    my($dbh, $task) = @_;
    unshift @{$dbh->{_}[TASKi]}, $task;
}

sub _report_error {
    my($dbh, $method, $error_num, $error_str) = @_;

    $dbh->{_}[ERRi] = $AnyEvent::MySQL::err = $error_num;
    $dbh->{_}[ERRSTRi] = $AnyEvent::MySQL::errstr = $error_str;
    warn "$dbh $method failed: $error_str ($error_num)\n" if( $dbh->{_}[ATTRi]{PrintError} );

    $dbh->{_}[TXN_STATEi] = DEAD_TXN if( $dbh->{_}[TXN_STATEi]!=NO_TXN );
}

sub _reconnect {
    my $dbh = shift;
    $dbh->{_}[CONN_STATEi] = BUSY_CONN;
    my $retry; $retry = AE::timer .1, 0, sub {
        undef $retry;
        _connect($dbh);
    };
}

sub _connect {
    my $dbh = shift;
    my $cb = $dbh->{_}[ON_CONNi] || \&AnyEvent::MySQL::_empty_cb;
    $dbh->{_}[CONN_STATEi] = BUSY_CONN;

    my $param = $dbh->{Name};
    my $database;
    if( index($param, '=')>=0 ) {
        $param = {
            map { split /=/, $_, 2 } split /;/, $param
        };
        if( $param->{host} =~ /(.*):(.*)/ ) {
            $param->{host} = $1;
            $param->{port} = $2;
        }
    }
    else {
        $param = { database => $param };
    }

    $param->{port} ||= 3306;

    if( $param->{host} eq '' || $param->{host} eq 'localhost' ) { # unix socket
        my $sock = $param->{mysql_socket} || `mysql_config --socket`;
        if( !$sock ) {
            _report_error($dbh, 'connect', 2002, "Can't connect to local MySQL server through socket ''");
            $cb->();
            return;
        }
        $param->{host} = '/unix';
        $param->{port} = $sock;
    }

    warn "Connecting to $param->{host}:$param->{port} ...";
    weaken( my $wdbh = $dbh );
    $dbh->{_}[CONNi] = tcp_connect($param->{host}, $param->{port}, sub {
        my $fh = shift;
        if( !$fh ) {
            warn "Connect to $param->{host}:$param->{port} fail: $!  retry later.";
            undef $wdbh->{_}[CONNi];

            _reconnect($wdbh);
            return;
        }
        warn "Connected ($param->{host}:$param->{port})";

        $wdbh->{_}[HDi] = AnyEvent::Handle->new(
            fh => $fh,
            on_error => sub {
                return if !$wdbh;

                my $wwdbh = $wdbh;
                if( $_[1] ) {
                    warn "Disconnected from $param->{host}:$param->{port} by $_[2]  reconnect later.";
                    undef $wwdbh->{_}[HDi];
                    undef $wwdbh->{_}[CONNi];
                    $wwdbh->{_}[CONN_STATEi] = IDLE_CONN;
                    _report_error($wwdbh, '', 2013, 'Lost connection to MySQL server during query');
                    if( $wwdbh->{_}[FALLBACKi] ) {
                        $wwdbh->{_}[FALLBACKi]();
                    }
                }
            },
        );

        AnyEvent::MySQL::Imp::do_auth($wdbh->{_}[HDi], $wdbh->{Username}, $wdbh->{_}[AUTHi], $param->{database}, sub {
            my($success, $err_num_and_msg, $thread_id) = @_;
            return if !$wdbh;
            if( $success ) {
                $wdbh->{mysql_thread_id} = $thread_id;
                $cb->($wdbh, guard {
                    _process_task($wdbh) if $wdbh;
                });
            }
            else {
                warn "MySQL auth error: $err_num_and_msg  retry later.";
                undef $wdbh->{_}[HDi];
                undef $wdbh->{_}[CONNi];
                _reconnect($wdbh) if $wdbh;
            }
        });
    });
}

sub _process_task {
    my $dbh = shift;
    $dbh->{_}[CONN_STATEi] = IDLE_CONN;
    $dbh->{_}[ERRi] = $AnyEvent::MySQL::err = undef;
    $dbh->{_}[ERRSTRi] = $AnyEvent::MySQL::errstr = undef;
    $dbh->{_}[FALLBACKi] = undef;
    weaken( my $wdbh = $dbh );

    if( !$dbh->{_}[HDi] ) {
        _reconnect($dbh);
        return;
    }

    my $task = shift @{$dbh->{_}[TASKi]};
    return if( !$task );

    my $next = sub {
        _process_task($wdbh) if $wdbh;
    };

    $dbh->{_}[FALLBACKi] = sub {
        undef $dbh->{_}[FALLBACKi];
        if( $dbh->{_}[TXN_STATEi]==NO_TXN && $task->[3]<5 ) {
            ++$task->[3];
            warn "redo the task later.. ($task->[3])";
            unshift @{$dbh->{_}[TASKi]}, $task;
        }
        else {
            $task->[2]();
        }
        _reconnect($dbh);
    };
    if( $task->[0]==TXN_TASK ) {
        if( $dbh->{_}[TXN_STATEi]==DEAD_TXN ) {
            _report_error($dbh, 'process_task', 1402, 'Transaction branch dead');
            $task->[2]();
            _process_task($dbh);
        }
        else {
            $dbh->{_}[TXN_STATEi] = DIRTY_TXN if( $dbh->{_}[TXN_STATEi]!=NO_TXN );
            $dbh->{_}[CONN_STATEi] = BUSY_CONN;
            $task->[1](sub {
                if( $dbh->{_}[TXN_STATEi]==DEAD_TXN && $dbh->{_}[HDi] ) {
                    _rollback($dbh, $next);
                }
                else {
                    $next->();
                }
            });
        }
    }
    elsif( $task->[0]==TXN_BEGIN ) {
        if( $dbh->{_}[TXN_STATEi]==NO_TXN ) {
            $dbh->{_}[TXN_STATEi] = DEAD_TXN;
            $dbh->{_}[CONN_STATEi] = BUSY_CONN;
            $task->[1]($next);
        }
        elsif( $dbh->{_}[TXN_STATEi]==EMPTY_TXN ) {
            $task->[2]($next);
            _process_task($dbh);
        }
        else {
            warn "It's in a transaction already.. Abort the old one and begin the new one.";
            $dbh->{_}[CONN_STATEi] = BUSY_CONN;
            _rollback($dbh, sub {
                $task->[1]($next);
            });

lib/AnyEvent/MySQL.pm  view on Meta::CPAN

            };
            $next_act->();
        });
    }, $cb, 0]);
}

=head2 $dbh->selectrow_hashref($statement, [\%attr, [@bind_values,]], $cb->($hash_ref))

=cut
sub selectrow_hashref {
    my $cb = ref($_[-1]) eq 'CODE' ? pop : \&AnyEvent::MySQL::_empty_cb;
    my($dbh, $statement, $attr, @bind_values) = @_;

    _push_task($dbh, [TXN_TASK, sub {
        my $next_act = shift;
        AnyEvent::MySQL::Imp::send_packet($dbh->{_}[HDi], 0, AnyEvent::MySQL::Imp::COM_QUERY, _text_prepare($statement, @bind_values));
        AnyEvent::MySQL::Imp::recv_response($dbh->{_}[HDi], sub {
            eval {
                if( $_[0]==AnyEvent::MySQL::Imp::RES_OK ) {
                    $dbh->{mysql_insertid} = $_[2];
                    $cb->(undef);
                }
                elsif( $_[0]==AnyEvent::MySQL::Imp::RES_ERROR ) {
                    _report_error($dbh, 'selectrow_hashref', $_[1], $_[3]);
                    $cb->(undef);
                }
                else {
                    if( $_[2][0] ) {
                        my %record;
                        for(my $j=$#{$_[2][0]}; $j>=0; --$j) {
                            $record{$_[1][$j][4]} = $_[2][0][$j];
                        }
                        $cb->(\%record);
                    }
                    else {
                        $cb->(undef);
                    }
                }
            };
            $next_act->();
        });
    }, $cb, 0]);
}

=head2 $sth = $dbh->prepare($statement, [$cb->($sth)])

    $cb will be called each time when this statement is prepared
    (or re-prepared when the db connection is reconnected)

    if the preparation is not success,
    the $sth in the $cb's arg will be undef.

    So you should NOT rely on this for work flow controlling.

=cut
sub prepare {
    my $dbh = $_[0];

    my $sth = AnyEvent::MySQL::st->new(@_);
    push @{$dbh->{_}[STi]}, $sth;
    weaken($dbh->{_}[STi][-1]);
    return $sth;
}

=head2 $dbh->begin_work([$cb->($rv)])

=cut
sub begin_work {
    my $dbh = shift;
    my $cb = shift || \&AnyEvent::MySQL::_empty_cb;

    _push_task($dbh, [TXN_BEGIN, sub {
        my $next_act = shift;
        AnyEvent::MySQL::Imp::send_packet($dbh->{_}[HDi], 0, AnyEvent::MySQL::Imp::COM_QUERY, 'begin');
        AnyEvent::MySQL::Imp::recv_response($dbh->{_}[HDi], sub {
            eval {
                if( $_[0]==AnyEvent::MySQL::Imp::RES_OK ) {
                    $dbh->{_}[TXN_STATEi] = EMPTY_TXN;
                    $cb->(1);
                }
                else {
                    if( $_[0]==AnyEvent::MySQL::Imp::RES_ERROR ) {
                        _report_error($dbh, 'begin_work', $_[1], $_[3]);
                    }
                    else {
                        _report_error($dbh, 'begin_work', 2000, "Unexpected result: $_[0]");
                    }
                    $cb->();
                }
            };
            $next_act->();
        });
    }, $cb, 0]);
}

=head2 $dbh->commit([$cb->($rv)])

=cut
sub commit {
    my $dbh = shift;
    my $cb = shift || \&AnyEvent::MySQL::_empty_cb;

    _push_task($dbh, [TXN_COMMIT, sub {
        my $next_act = shift;

        AnyEvent::MySQL::Imp::send_packet($dbh->{_}[HDi], 0, AnyEvent::MySQL::Imp::COM_QUERY, 'commit');
        AnyEvent::MySQL::Imp::recv_response($dbh->{_}[HDi], sub {
            eval {
                if( $_[0]==AnyEvent::MySQL::Imp::RES_OK ) {
                    $dbh->{_}[TXN_STATEi] = NO_TXN;
                    $cb->(1);
                }
                else {
                    if( $_[0]==AnyEvent::MySQL::Imp::RES_ERROR ) {
                        _report_error($dbh, 'commit', $_[1], $_[3]);
                    }
                    else {
                        _report_error($dbh, 'commit', 2000, "Unexpected result: $_[0]");
                    }
                    $cb->();
                }

lib/AnyEvent/MySQL.pm  view on Meta::CPAN


        _rollback($dbh, $next_act, sub {
            $dbh->{_}[TXN_STATEi] = NO_TXN if( $_[0] );
            &$cb;
        });
    }, $cb, 0]);
}
sub _rollback {
    my($dbh, $next_act, $cb) = @_;

    AnyEvent::MySQL::Imp::send_packet($dbh->{_}[HDi], 0, AnyEvent::MySQL::Imp::COM_QUERY, 'rollback');
    AnyEvent::MySQL::Imp::recv_response($dbh->{_}[HDi], sub {
        eval {
            if( $_[0]==AnyEvent::MySQL::Imp::RES_OK ) {
                $cb->(1) if $cb;
            }
            else {
                if( $_[0]==AnyEvent::MySQL::Imp::RES_ERROR ) {
                    _report_error($dbh, 'rollback', $_[1], $_[3]);
                }
                else {
                    _report_error($dbh, 'rollback', 2000, "Unexpected result: $_[0]");
                }
                $cb->() if $cb;
            }
        };
        $next_act->();
    });
}

=head2 $dbh->ping(sub {my $alive = shift;});

=cut

sub ping {
    my $cb = ref($_[-1]) eq 'CODE' ? pop : \&AnyEvent::MySQL::_empty_cb;
    my ($dbh) = @_;

    _push_task($dbh, [TXN_TASK, sub {
        my $next_act = shift;
        AnyEvent::MySQL::Imp::send_packet($dbh->{_}[HDi], 0, AnyEvent::MySQL::Imp::COM_PING);
        AnyEvent::MySQL::Imp::recv_response($dbh->{_}[HDi], sub {
            eval {
                if ($_[0]==AnyEvent::MySQL::Imp::RES_OK) {
                    $cb->(1);
                }
                else {
                    $cb->(0);
                }
            };
            $next_act->();
        });
    }, $cb, 0]);
}

package AnyEvent::MySQL::st;

use strict;
use warnings;

use Scalar::Util qw(weaken);

use constant {
    DBHi => 0,
    IDi => 1,
    PARAMi => 2,
    FIELDi => 3,
    STATEMENTi => 4,
};

=head2 $sth = AnyEvent::MySQL::st->new($dbh, $statement, [$cb->($sth)])

=cut
sub new {
    my $cb = ref($_[-1]) eq 'CODE' ? pop : \&AnyEvent::MySQL::_empty_cb;
    my($class, $dbh, $statement) = @_;
    my $sth = bless [], $class;
    $sth->[DBHi] = $dbh;
    $sth->[STATEMENTi] = $statement;

    return $sth;
}

=head2 $sth->execute(@bind_values, [\%attr,] [$cb->($fth/$rv)])

=cut
sub execute {
    my $cb = ref($_[-1]) eq 'CODE' ? pop : \&AnyEvent::MySQL::_empty_cb;
    my $attr = ref($_[-1]) eq 'HASH' ? pop : {};
    my($sth, @bind_values) = @_;
    my $dbh = $sth->[DBHi];


    AnyEvent::MySQL::db::_push_task($dbh, [AnyEvent::MySQL::db::TXN_TASK, sub {
        my $next_act = shift;

        my $execute = sub {
            AnyEvent::MySQL::Imp::do_execute_param($dbh->{_}[AnyEvent::MySQL::db::HDi], $sth->[IDi], \@bind_values, $sth->[PARAMi]);
            AnyEvent::MySQL::Imp::recv_response($dbh->{_}[AnyEvent::MySQL::db::HDi], execute => 1, sub {
                eval {
                    if( $_[0]==AnyEvent::MySQL::Imp::RES_OK ) {
                        $cb->($_[1]);
                    }
                    elsif( $_[0]==AnyEvent::MySQL::Imp::RES_RESULT ) {
                        $cb->(AnyEvent::MySQL::ft->new($sth->[FIELDi], $_[2]));
                    }
                    elsif( $_[0]==AnyEvent::MySQL::Imp::RES_ERROR ) {
                        AnyEvent::MySQL::db::_report_error($dbh, 'execute', $_[1], $_[3]);
                        $cb->();
                    }
                    else {
                        AnyEvent::MySQL::db::_report_error($dbh, 'execute', 2000, "Unknown response: $_[0]");
                        $cb->();
                    }
                };
                $next_act->();
            });
        };

        if( $sth->[IDi] ) {
            $execute->();



( run in 0.511 second using v1.01-cache-2.11-cpan-39bf76dae61 )