AnyEvent-MySQL
view release on metacpan or search on metacpan
lib/AnyEvent/MySQL.pm view on Meta::CPAN
#
#my $update_st;
#
#my $txh3; $txh3 = $dbh->begin_work(sub {
# warn "txn3 begin.. @_";
#});
#
# $update_st = $dbh->prepare("insert into t1 values (?,?)", sub {
# warn "prepare insert @_";
# });
# $update_st->execute(60, 60, { Tx => $txh3 }, sub {
# warn "insert 60 @_";
# });
#
# $dbh->selectall_arrayref("select * from t1", { Tx => $txh3 }, sub {
# warn "select in txn3: ".Dumper($_[0]);
# });
#
# $txh3->rollback(sub {
# warn "txh3 rollback @_";
# });
#
# $dbh->selectall_arrayref("select * from t1", sub {
# warn "select out txn3: ".Dumper($_[0]);
# });
#$st_all = $dbh->prepare("select `date`, `time`, `datetime`, `timestamp` from all_type", sub {
# warn "prepare st_all @_";
#});
#
#$st_all->execute
$end5->recv;
my $readonly_dbh = AnyEvent::MySQL->connect("DBI:mysql:database=test;host=127.0.0.1;port=3306", "ptest", "pass", { ReadOnly => 1 }, sub {
# ... only "select", "show" and "set names" commands can be used on this handle
});
$end->recv;
=cut
# Shared do-nothing callback, used wherever the caller did not supply one.
sub _empty_cb { return }
=head2 $dbh = AnyEvent::MySQL->connect($data_source, $username, [$auth, [\%attr,]] $cb->($dbh, 1))
=cut
# Class-method entry point: drops the invocant and hands every remaining
# argument, untouched, to AnyEvent::MySQL::db->new, returning its result.
sub connect {
    return AnyEvent::MySQL::db->new( @_[ 1 .. $#_ ] );
}
package AnyEvent::MySQL::db;
use strict;
use warnings;
use AE;
use AnyEvent::Socket;
use AnyEvent::Handle;
use Scalar::Util qw(weaken dualvar);
use Guard;
# Connection state, stored in the CONN_STATEi slot of a handle.
use constant {
    BUSY_CONN   => 1,    # a task or a (re)connect attempt is in flight
    IDLE_CONN   => 2,    # free: _push_task starts the queue immediately
    ZOMBIE_CONN => 3,    # not referenced in the code visible here
};

# Transaction state, stored in the TXN_STATEi slot of a handle.
use constant {
    NO_TXN    => 1,      # no transaction is open
    EMPTY_TXN => 2,      # set by begin_work once 'begin' has succeeded
    CLEAN_TXN => 3,      # not referenced in the code visible here
    DIRTY_TXN => 4,      # a task has been started inside the transaction
    DEAD_TXN  => 5,      # an error was reported while a transaction was open
};

# Transaction control token: the first element of every queued task.
use constant {
    TXN_TASK     => 1,   # ordinary query / ping / execute
    TXN_BEGIN    => 2,
    TXN_COMMIT   => 3,
    TXN_ROLLBACK => 4,
};
# Slot indexes into the per-handle state array kept at $dbh->{_},
# listed in index order.
use constant {
    AUTHi       => 0,     # credential passed to do_auth after the username
    ATTRi       => 1,     # attribute hash (PrintError is read from it)
    HDi         => 2,     # AnyEvent::Handle of the live connection
    CONN_STATEi => 3,     # one of the *_CONN constants
    TXN_STATEi  => 4,     # one of the *_TXN state constants
    TASKi       => 5,     # array of queued tasks
    STi         => 6,     # weak references to statements made by prepare()
    ERRi        => 7,     # last error number
    ERRSTRi     => 8,     # last error message
    CONNi       => 9,     # value returned by tcp_connect for a pending connect
    FALLBACKi   => 10,    # code ref run when the connection is lost mid-task
    ON_CONNi    => 11,    # callback invoked from _connect
};
# Append $task to the end of the handle's task queue.  If the connection is
# currently idle, start working through the queue straight away.
sub _push_task {
    my ($dbh, $task) = @_;

    push @{ $dbh->{_}[TASKi] }, $task;

    if ( IDLE_CONN == $dbh->{_}[CONN_STATEi] ) {
        _process_task($dbh);
    }
}
# Put $task at the front of the handle's task queue so it is the next one
# taken.  Unlike _push_task this never starts queue processing itself.
sub _unshift_task {
    my $dbh  = shift;
    my $task = shift;
    unshift @{ $dbh->{_}[TASKi] }, $task;
}
# Record an error for $dbh.
#   $method    - name of the failing operation (only used in the warning text)
#   $error_num - numeric error code
#   $error_str - human-readable error message
# The error is stored both on the handle and in $AnyEvent::MySQL::err /
# $AnyEvent::MySQL::errstr, and is warned about when PrintError is set.
sub _report_error {
    my ($dbh, $method, $error_num, $error_str) = @_;

    $AnyEvent::MySQL::err    = $error_num;
    $AnyEvent::MySQL::errstr = $error_str;
    $dbh->{_}[ERRi]          = $error_num;
    $dbh->{_}[ERRSTRi]       = $error_str;

    if ( $dbh->{_}[ATTRi]{PrintError} ) {
        warn "$dbh $method failed: $error_str ($error_num)\n";
    }

    # Any error while a transaction is open marks that transaction as dead.
    if ( $dbh->{_}[TXN_STATEi] != NO_TXN ) {
        $dbh->{_}[TXN_STATEi] = DEAD_TXN;
    }
}
# Mark the connection busy and schedule a fresh _connect attempt
# 0.1 seconds from now.
sub _reconnect {
    my ($dbh) = @_;

    $dbh->{_}[CONN_STATEi] = BUSY_CONN;

    # The closure keeps the watcher alive until it fires; clearing the
    # variable inside the callback then releases the one-shot timer.
    my $timer;
    $timer = AE::timer 0.1, 0, sub {
        undef $timer;
        _connect($dbh);
    };
}
# Open the connection described by $dbh->{Name} and authenticate on it.
#
# $dbh->{Name} is either a bare database name or a ';'-separated list of
# key=value pairs (database, host, port, mysql_socket; "host" may also be
# written as "host:port").  An empty host or "localhost" selects a unix
# domain socket, whose path comes from mysql_socket or, failing that, from
# `mysql_config --socket`.
#
# On a successful login the ON_CONNi callback is called with the handle and
# a guard object; task processing starts when that guard is destroyed.
# A failed TCP connect or a failed login schedules a retry via _reconnect.
# If no socket path can be determined, error 2002 is reported and the
# callback is called without arguments.
sub _connect {
    my $dbh = shift;
    my $cb  = $dbh->{_}[ON_CONNi] || \&AnyEvent::MySQL::_empty_cb;
    $dbh->{_}[CONN_STATEi] = BUSY_CONN;

    my $param = $dbh->{Name};
    if( index($param, '=')>=0 ) {
        $param = {
            map { split /=/, $_, 2 } split /;/, $param
        };
        if( defined $param->{host} && $param->{host} =~ /(.*):(.*)/ ) {
            $param->{host} = $1;
            $param->{port} = $2;
        }
    }
    else {
        $param = { database => $param };
    }

    # A DSN without a host part means "local"; normalise it so the string
    # comparisons below do not operate on undef.
    $param->{host} = '' if !defined $param->{host};
    $param->{port} ||= 3306;

    if( $param->{host} eq '' || $param->{host} eq 'localhost' ) { # unix socket
        my $sock = $param->{mysql_socket};
        if( !$sock ) {
            $sock = `mysql_config --socket`;
            $sock = '' if !defined $sock;
            # Backticks keep the command's trailing newline; it must not
            # become part of the socket path.
            chomp $sock;
        }
        if( !$sock ) {
            _report_error($dbh, 'connect', 2002, "Can't connect to local MySQL server through socket ''");
            $cb->();
            return;
        }
        # AnyEvent::Socket treats the host "unix/" specially: the service
        # argument is then the path of a unix domain socket.
        $param->{host} = 'unix/';
        $param->{port} = $sock;
    }

    warn "Connecting to $param->{host}:$param->{port} ...";
    # The callbacks below are stored on the handle itself, so they only hold
    # a weak reference to it.
    weaken( my $wdbh = $dbh );
    $dbh->{_}[CONNi] = tcp_connect($param->{host}, $param->{port}, sub {
        my $fh = shift;
        if( !$fh ) {
            warn "Connect to $param->{host}:$param->{port} fail: $!  retry later.";
            undef $wdbh->{_}[CONNi];

            _reconnect($wdbh);
            return;
        }
        warn "Connected ($param->{host}:$param->{port})";

        $wdbh->{_}[HDi] = AnyEvent::Handle->new(
            fh => $fh,
            on_error => sub {
                return if !$wdbh;
                # Strong reference for the duration of this callback.
                my $wwdbh = $wdbh;
                if( $_[1] ) {    # only fatal errors tear the connection down
                    warn "Disconnected from $param->{host}:$param->{port} by $_[2]  reconnect later.";
                    undef $wwdbh->{_}[HDi];
                    undef $wwdbh->{_}[CONNi];
                    $wwdbh->{_}[CONN_STATEi] = IDLE_CONN;
                    _report_error($wwdbh, '', 2013, 'Lost connection to MySQL server during query');
                    if( $wwdbh->{_}[FALLBACKi] ) {
                        $wwdbh->{_}[FALLBACKi]();
                    }
                }
            },
        );

        AnyEvent::MySQL::Imp::do_auth($wdbh->{_}[HDi], $wdbh->{Username}, $wdbh->{_}[AUTHi], $param->{database}, sub {
            my($success, $err_num_and_msg, $thread_id) = @_;
            return if !$wdbh;
            if( $success ) {
                $wdbh->{mysql_thread_id} = $thread_id;
                # Queue processing starts when the callback lets go of the guard.
                $cb->($wdbh, guard {
                    _process_task($wdbh) if $wdbh;
                });
            }
            else {
                warn "MySQL auth error: $err_num_and_msg  retry later.";
                undef $wdbh->{_}[HDi];
                undef $wdbh->{_}[CONNi];
                _reconnect($wdbh) if $wdbh;
            }
        });
    });
}
sub _process_task {
my $dbh = shift;
$dbh->{_}[CONN_STATEi] = IDLE_CONN;
$dbh->{_}[ERRi] = $AnyEvent::MySQL::err = undef;
$dbh->{_}[ERRSTRi] = $AnyEvent::MySQL::errstr = undef;
$dbh->{_}[FALLBACKi] = undef;
weaken( my $wdbh = $dbh );
if( !$dbh->{_}[HDi] ) {
_reconnect($dbh);
return;
}
my $task = shift @{$dbh->{_}[TASKi]};
return if( !$task );
my $next = sub {
_process_task($wdbh) if $wdbh;
};
$dbh->{_}[FALLBACKi] = sub {
undef $dbh->{_}[FALLBACKi];
if( $dbh->{_}[TXN_STATEi]==NO_TXN && $task->[3]<5 ) {
++$task->[3];
warn "redo the task later.. ($task->[3])";
unshift @{$dbh->{_}[TASKi]}, $task;
}
else {
$task->[2]();
}
_reconnect($dbh);
};
if( $task->[0]==TXN_TASK ) {
if( $dbh->{_}[TXN_STATEi]==DEAD_TXN ) {
_report_error($dbh, 'process_task', 1402, 'Transaction branch dead');
$task->[2]();
_process_task($dbh);
}
else {
$dbh->{_}[TXN_STATEi] = DIRTY_TXN if( $dbh->{_}[TXN_STATEi]!=NO_TXN );
$dbh->{_}[CONN_STATEi] = BUSY_CONN;
$task->[1](sub {
if( $dbh->{_}[TXN_STATEi]==DEAD_TXN && $dbh->{_}[HDi] ) {
_rollback($dbh, $next);
}
else {
$next->();
}
});
}
}
elsif( $task->[0]==TXN_BEGIN ) {
if( $dbh->{_}[TXN_STATEi]==NO_TXN ) {
$dbh->{_}[TXN_STATEi] = DEAD_TXN;
$dbh->{_}[CONN_STATEi] = BUSY_CONN;
$task->[1]($next);
}
elsif( $dbh->{_}[TXN_STATEi]==EMPTY_TXN ) {
$task->[2]($next);
_process_task($dbh);
}
else {
warn "It's in a transaction already.. Abort the old one and begin the new one.";
$dbh->{_}[CONN_STATEi] = BUSY_CONN;
_rollback($dbh, sub {
$task->[1]($next);
});
lib/AnyEvent/MySQL.pm view on Meta::CPAN
};
$next_act->();
});
}, $cb, 0]);
}
=head2 $dbh->selectrow_hashref($statement, [\%attr, [@bind_values,]], $cb->($hash_ref))
=cut
# Queue $statement (with @bind_values interpolated by _text_prepare) and pass
# the first row of the result to $cb as a hash ref keyed by column name.
# $cb receives undef when the statement returns no result set, when it
# fails, or when the result set is empty.  The \%attr argument is accepted
# positionally but not consulted here.  Exceptions raised while handling the
# response, including those thrown by $cb, are trapped by the eval.
sub selectrow_hashref {
    my $cb = ref($_[-1]) eq 'CODE' ? pop : \&AnyEvent::MySQL::_empty_cb;
    my ($dbh, $statement, $attr, @bind_values) = @_;

    my $run_query = sub {
        my ($next_act) = @_;

        AnyEvent::MySQL::Imp::send_packet(
            $dbh->{_}[HDi], 0, AnyEvent::MySQL::Imp::COM_QUERY,
            _text_prepare($statement, @bind_values),
        );

        AnyEvent::MySQL::Imp::recv_response($dbh->{_}[HDi], sub {
            my ($kind, $info, $payload, $message) = @_;

            eval {
                if ( $kind == AnyEvent::MySQL::Imp::RES_OK ) {
                    # No result set came back: remember the insert id only.
                    $dbh->{mysql_insertid} = $payload;
                    $cb->(undef);
                }
                elsif ( $kind == AnyEvent::MySQL::Imp::RES_ERROR ) {
                    _report_error($dbh, 'selectrow_hashref', $info, $message);
                    $cb->(undef);
                }
                elsif ( my $row = $payload->[0] ) {
                    # Walk the columns from last to first so that, for
                    # duplicate column names, the leftmost column wins.
                    my %record;
                    for my $col ( reverse 0 .. $#{$row} ) {
                        $record{ $info->[$col][4] } = $row->[$col];
                    }
                    $cb->(\%record);
                }
                else {
                    $cb->(undef);
                }
            };

            $next_act->();
        });
    };

    _push_task($dbh, [TXN_TASK, $run_query, $cb, 0]);
}
=head2 $sth = $dbh->prepare($statement, [$cb->($sth)])
$cb will be called each time this statement is prepared
(or re-prepared after the db connection is re-established).
If the preparation does not succeed,
the $sth passed to $cb will be undef.
So you should NOT rely on this callback for workflow control.
=cut
# Build a statement object for this handle.  All arguments (the handle
# included) are forwarded to AnyEvent::MySQL::st->new.  The handle keeps only
# a weak reference to the statement, so the returned $sth is the one that
# keeps it alive.
sub prepare {
    my ($dbh) = @_;
    my $sth = AnyEvent::MySQL::st->new(@_);

    my $tracked = ( $dbh->{_}[STi] ||= [] );
    push @{$tracked}, $sth;
    weaken( $tracked->[-1] );

    return $sth;
}
=head2 $dbh->begin_work([$cb->($rv)])
=cut
# Queue a 'begin' statement.  $cb is called with 1 once the server has
# acknowledged it, or with no arguments after the failure has been recorded
# through _report_error.
sub begin_work {
    my ($dbh, $cb) = @_;
    $cb ||= \&AnyEvent::MySQL::_empty_cb;

    my $send_begin = sub {
        my ($next_act) = @_;

        AnyEvent::MySQL::Imp::send_packet(
            $dbh->{_}[HDi], 0, AnyEvent::MySQL::Imp::COM_QUERY, 'begin',
        );

        AnyEvent::MySQL::Imp::recv_response($dbh->{_}[HDi], sub {
            my ($kind, $errno, undef, $errstr) = @_;

            # Exceptions raised in here (e.g. by $cb) are trapped by the eval.
            eval {
                if ( $kind == AnyEvent::MySQL::Imp::RES_OK ) {
                    $dbh->{_}[TXN_STATEi] = EMPTY_TXN;
                    $cb->(1);
                }
                elsif ( $kind == AnyEvent::MySQL::Imp::RES_ERROR ) {
                    _report_error($dbh, 'begin_work', $errno, $errstr);
                    $cb->();
                }
                else {
                    _report_error($dbh, 'begin_work', 2000, "Unexpected result: $kind");
                    $cb->();
                }
            };

            $next_act->();
        });
    };

    _push_task($dbh, [TXN_BEGIN, $send_begin, $cb, 0]);
}
=head2 $dbh->commit([$cb->($rv)])
=cut
sub commit {
my $dbh = shift;
my $cb = shift || \&AnyEvent::MySQL::_empty_cb;
_push_task($dbh, [TXN_COMMIT, sub {
my $next_act = shift;
AnyEvent::MySQL::Imp::send_packet($dbh->{_}[HDi], 0, AnyEvent::MySQL::Imp::COM_QUERY, 'commit');
AnyEvent::MySQL::Imp::recv_response($dbh->{_}[HDi], sub {
eval {
if( $_[0]==AnyEvent::MySQL::Imp::RES_OK ) {
$dbh->{_}[TXN_STATEi] = NO_TXN;
$cb->(1);
}
else {
if( $_[0]==AnyEvent::MySQL::Imp::RES_ERROR ) {
_report_error($dbh, 'commit', $_[1], $_[3]);
}
else {
_report_error($dbh, 'commit', 2000, "Unexpected result: $_[0]");
}
$cb->();
}
lib/AnyEvent/MySQL.pm view on Meta::CPAN
_rollback($dbh, $next_act, sub {
$dbh->{_}[TXN_STATEi] = NO_TXN if( $_[0] );
&$cb;
});
}, $cb, 0]);
}
# Send 'rollback' on the handle's connection right away (no queueing).
#   $next_act - always called once the response has been handled
#   $cb       - optional; called with 1 on success, with no arguments after
#               a failure has been recorded through _report_error
sub _rollback {
    my ($dbh, $next_act, $cb) = @_;

    AnyEvent::MySQL::Imp::send_packet(
        $dbh->{_}[HDi], 0, AnyEvent::MySQL::Imp::COM_QUERY, 'rollback',
    );

    AnyEvent::MySQL::Imp::recv_response($dbh->{_}[HDi], sub {
        my ($kind, $errno, undef, $errstr) = @_;

        # Exceptions raised in here (e.g. by $cb) are trapped by the eval.
        eval {
            my @outcome;
            if ( $kind == AnyEvent::MySQL::Imp::RES_OK ) {
                @outcome = (1);
            }
            elsif ( $kind == AnyEvent::MySQL::Imp::RES_ERROR ) {
                _report_error($dbh, 'rollback', $errno, $errstr);
            }
            else {
                _report_error($dbh, 'rollback', 2000, "Unexpected result: $kind");
            }
            $cb->(@outcome) if $cb;
        };

        $next_act->();
    });
}
=head2 $dbh->ping(sub {my $alive = shift;});
=cut
# Queue a COM_PING.  $cb receives 1 when the server answers with an OK
# response and 0 for any other response.
sub ping {
    my $cb = ref($_[-1]) eq 'CODE' ? pop : \&AnyEvent::MySQL::_empty_cb;
    my ($dbh) = @_;

    my $send_ping = sub {
        my ($next_act) = @_;

        AnyEvent::MySQL::Imp::send_packet(
            $dbh->{_}[HDi], 0, AnyEvent::MySQL::Imp::COM_PING,
        );

        AnyEvent::MySQL::Imp::recv_response($dbh->{_}[HDi], sub {
            my ($kind) = @_;

            # Exceptions thrown by $cb are trapped by the eval.
            eval {
                $cb->( $kind == AnyEvent::MySQL::Imp::RES_OK ? 1 : 0 );
            };

            $next_act->();
        });
    };

    _push_task($dbh, [TXN_TASK, $send_ping, $cb, 0]);
}
package AnyEvent::MySQL::st;
use strict;
use warnings;
use Scalar::Util qw(weaken);
# Slot indexes into the array that backs a statement object.
use constant {
    DBHi       => 0,    # database handle the statement belongs to
    IDi        => 1,    # handed to do_execute_param; tested for truth before executing
    PARAMi     => 2,    # handed to do_execute_param after the bind values
    FIELDi     => 3,    # handed to AnyEvent::MySQL::ft->new with the response data
    STATEMENTi => 4,    # statement text given to new()
};
=head2 $sth = AnyEvent::MySQL::st->new($dbh, $statement, [$cb->($sth)])
=cut
# Construct a statement object that remembers its database handle and the
# statement text.  A trailing code ref is removed from the argument list;
# this constructor does not store or call it.
sub new {
    pop if ref($_[-1]) eq 'CODE';
    my ($class, $dbh, $statement) = @_;

    my @slots;
    @slots[ DBHi, STATEMENTi ] = ( $dbh, $statement );

    return bless \@slots, $class;
}
=head2 $sth->execute(@bind_values, [\%attr,] [$cb->($fth/$rv)])
=cut
sub execute {
my $cb = ref($_[-1]) eq 'CODE' ? pop : \&AnyEvent::MySQL::_empty_cb;
my $attr = ref($_[-1]) eq 'HASH' ? pop : {};
my($sth, @bind_values) = @_;
my $dbh = $sth->[DBHi];
AnyEvent::MySQL::db::_push_task($dbh, [AnyEvent::MySQL::db::TXN_TASK, sub {
my $next_act = shift;
my $execute = sub {
AnyEvent::MySQL::Imp::do_execute_param($dbh->{_}[AnyEvent::MySQL::db::HDi], $sth->[IDi], \@bind_values, $sth->[PARAMi]);
AnyEvent::MySQL::Imp::recv_response($dbh->{_}[AnyEvent::MySQL::db::HDi], execute => 1, sub {
eval {
if( $_[0]==AnyEvent::MySQL::Imp::RES_OK ) {
$cb->($_[1]);
}
elsif( $_[0]==AnyEvent::MySQL::Imp::RES_RESULT ) {
$cb->(AnyEvent::MySQL::ft->new($sth->[FIELDi], $_[2]));
}
elsif( $_[0]==AnyEvent::MySQL::Imp::RES_ERROR ) {
AnyEvent::MySQL::db::_report_error($dbh, 'execute', $_[1], $_[3]);
$cb->();
}
else {
AnyEvent::MySQL::db::_report_error($dbh, 'execute', 2000, "Unknown response: $_[0]");
$cb->();
}
};
$next_act->();
});
};
if( $sth->[IDi] ) {
$execute->();
( run in 0.511 second using v1.01-cache-2.11-cpan-39bf76dae61 )