AnyEvent-MySQL
view release on metacpan or search on metacpan
lib/AnyEvent/MySQL.pm view on Meta::CPAN
my $database;
if( index($param, '=')>=0 ) {
$param = {
map { split /=/, $_, 2 } split /;/, $param
};
if( $param->{host} =~ /(.*):(.*)/ ) {
$param->{host} = $1;
$param->{port} = $2;
}
}
else {
$param = { database => $param };
}
$param->{port} ||= 3306;
if( $param->{host} eq '' || $param->{host} eq 'localhost' ) { # unix socket
my $sock = $param->{mysql_socket} || `mysql_config --socket`;
if( !$sock ) {
_report_error($dbh, 'connect', 2002, "Can't connect to local MySQL server through socket ''");
$cb->();
return;
}
$param->{host} = '/unix';
$param->{port} = $sock;
}
warn "Connecting to $param->{host}:$param->{port} ...";
weaken( my $wdbh = $dbh );
$dbh->{_}[CONNi] = tcp_connect($param->{host}, $param->{port}, sub {
my $fh = shift;
if( !$fh ) {
warn "Connect to $param->{host}:$param->{port} fail: $! retry later.";
undef $wdbh->{_}[CONNi];
_reconnect($wdbh);
return;
}
warn "Connected ($param->{host}:$param->{port})";
$wdbh->{_}[HDi] = AnyEvent::Handle->new(
fh => $fh,
on_error => sub {
return if !$wdbh;
my $wwdbh = $wdbh;
if( $_[1] ) {
warn "Disconnected from $param->{host}:$param->{port} by $_[2] reconnect later.";
undef $wwdbh->{_}[HDi];
undef $wwdbh->{_}[CONNi];
$wwdbh->{_}[CONN_STATEi] = IDLE_CONN;
_report_error($wwdbh, '', 2013, 'Lost connection to MySQL server during query');
if( $wwdbh->{_}[FALLBACKi] ) {
$wwdbh->{_}[FALLBACKi]();
}
}
},
);
AnyEvent::MySQL::Imp::do_auth($wdbh->{_}[HDi], $wdbh->{Username}, $wdbh->{_}[AUTHi], $param->{database}, sub {
my($success, $err_num_and_msg, $thread_id) = @_;
return if !$wdbh;
if( $success ) {
$wdbh->{mysql_thread_id} = $thread_id;
$cb->($wdbh, guard {
_process_task($wdbh) if $wdbh;
});
}
else {
warn "MySQL auth error: $err_num_and_msg retry later.";
undef $wdbh->{_}[HDi];
undef $wdbh->{_}[CONNi];
_reconnect($wdbh) if $wdbh;
}
});
});
}
sub _process_task {
my $dbh = shift;
$dbh->{_}[CONN_STATEi] = IDLE_CONN;
$dbh->{_}[ERRi] = $AnyEvent::MySQL::err = undef;
$dbh->{_}[ERRSTRi] = $AnyEvent::MySQL::errstr = undef;
$dbh->{_}[FALLBACKi] = undef;
weaken( my $wdbh = $dbh );
if( !$dbh->{_}[HDi] ) {
_reconnect($dbh);
return;
}
my $task = shift @{$dbh->{_}[TASKi]};
return if( !$task );
my $next = sub {
_process_task($wdbh) if $wdbh;
};
$dbh->{_}[FALLBACKi] = sub {
undef $dbh->{_}[FALLBACKi];
if( $dbh->{_}[TXN_STATEi]==NO_TXN && $task->[3]<5 ) {
++$task->[3];
warn "redo the task later.. ($task->[3])";
unshift @{$dbh->{_}[TASKi]}, $task;
}
else {
$task->[2]();
}
_reconnect($dbh);
};
if( $task->[0]==TXN_TASK ) {
if( $dbh->{_}[TXN_STATEi]==DEAD_TXN ) {
_report_error($dbh, 'process_task', 1402, 'Transaction branch dead');
$task->[2]();
_process_task($dbh);
}
else {
$dbh->{_}[TXN_STATEi] = DIRTY_TXN if( $dbh->{_}[TXN_STATEi]!=NO_TXN );
$dbh->{_}[CONN_STATEi] = BUSY_CONN;
$task->[1](sub {
if( $dbh->{_}[TXN_STATEi]==DEAD_TXN && $dbh->{_}[HDi] ) {
_rollback($dbh, $next);
}
else {
( run in 0.690 second using v1.01-cache-2.11-cpan-f56aa216473 )