AnyEvent-MySQL

 view release on metacpan or  search on metacpan

lib/AnyEvent/MySQL.pm  view on Meta::CPAN

    my $database;
    if( index($param, '=')>=0 ) {
        $param = {
            map { split /=/, $_, 2 } split /;/, $param
        };
        if( $param->{host} =~ /(.*):(.*)/ ) {
            $param->{host} = $1;
            $param->{port} = $2;
        }
    }
    else {
        $param = { database => $param };
    }

    $param->{port} ||= 3306;

    if( $param->{host} eq '' || $param->{host} eq 'localhost' ) { # unix socket
        my $sock = $param->{mysql_socket} || `mysql_config --socket`;
        if( !$sock ) {
            _report_error($dbh, 'connect', 2002, "Can't connect to local MySQL server through socket ''");
            $cb->();
            return;
        }
        $param->{host} = '/unix';
        $param->{port} = $sock;
    }

    warn "Connecting to $param->{host}:$param->{port} ...";
    weaken( my $wdbh = $dbh );
    $dbh->{_}[CONNi] = tcp_connect($param->{host}, $param->{port}, sub {
        my $fh = shift;
        if( !$fh ) {
            warn "Connect to $param->{host}:$param->{port} fail: $!  retry later.";
            undef $wdbh->{_}[CONNi];

            _reconnect($wdbh);
            return;
        }
        warn "Connected ($param->{host}:$param->{port})";

        $wdbh->{_}[HDi] = AnyEvent::Handle->new(
            fh => $fh,
            on_error => sub {
                return if !$wdbh;

                my $wwdbh = $wdbh;
                if( $_[1] ) {
                    warn "Disconnected from $param->{host}:$param->{port} by $_[2]  reconnect later.";
                    undef $wwdbh->{_}[HDi];
                    undef $wwdbh->{_}[CONNi];
                    $wwdbh->{_}[CONN_STATEi] = IDLE_CONN;
                    _report_error($wwdbh, '', 2013, 'Lost connection to MySQL server during query');
                    if( $wwdbh->{_}[FALLBACKi] ) {
                        $wwdbh->{_}[FALLBACKi]();
                    }
                }
            },
        );

        AnyEvent::MySQL::Imp::do_auth($wdbh->{_}[HDi], $wdbh->{Username}, $wdbh->{_}[AUTHi], $param->{database}, sub {
            my($success, $err_num_and_msg, $thread_id) = @_;
            return if !$wdbh;
            if( $success ) {
                $wdbh->{mysql_thread_id} = $thread_id;
                $cb->($wdbh, guard {
                    _process_task($wdbh) if $wdbh;
                });
            }
            else {
                warn "MySQL auth error: $err_num_and_msg  retry later.";
                undef $wdbh->{_}[HDi];
                undef $wdbh->{_}[CONNi];
                _reconnect($wdbh) if $wdbh;
            }
        });
    });
}

sub _process_task {
    my $dbh = shift;
    $dbh->{_}[CONN_STATEi] = IDLE_CONN;
    $dbh->{_}[ERRi] = $AnyEvent::MySQL::err = undef;
    $dbh->{_}[ERRSTRi] = $AnyEvent::MySQL::errstr = undef;
    $dbh->{_}[FALLBACKi] = undef;
    weaken( my $wdbh = $dbh );

    if( !$dbh->{_}[HDi] ) {
        _reconnect($dbh);
        return;
    }

    my $task = shift @{$dbh->{_}[TASKi]};
    return if( !$task );

    my $next = sub {
        _process_task($wdbh) if $wdbh;
    };

    $dbh->{_}[FALLBACKi] = sub {
        undef $dbh->{_}[FALLBACKi];
        if( $dbh->{_}[TXN_STATEi]==NO_TXN && $task->[3]<5 ) {
            ++$task->[3];
            warn "redo the task later.. ($task->[3])";
            unshift @{$dbh->{_}[TASKi]}, $task;
        }
        else {
            $task->[2]();
        }
        _reconnect($dbh);
    };
    if( $task->[0]==TXN_TASK ) {
        if( $dbh->{_}[TXN_STATEi]==DEAD_TXN ) {
            _report_error($dbh, 'process_task', 1402, 'Transaction branch dead');
            $task->[2]();
            _process_task($dbh);
        }
        else {
            $dbh->{_}[TXN_STATEi] = DIRTY_TXN if( $dbh->{_}[TXN_STATEi]!=NO_TXN );
            $dbh->{_}[CONN_STATEi] = BUSY_CONN;
            $task->[1](sub {
                if( $dbh->{_}[TXN_STATEi]==DEAD_TXN && $dbh->{_}[HDi] ) {
                    _rollback($dbh, $next);
                }
                else {



( run in 0.690 second using v1.01-cache-2.11-cpan-f56aa216473 )