AnyEvent-MySQL

 view release on metacpan or  search on metacpan

lib/AnyEvent/MySQL.pm  view on Meta::CPAN


# transaction state
use constant {
    NO_TXN => 1,
    EMPTY_TXN => 2,
    CLEAN_TXN => 3,
    DIRTY_TXN => 4,
    DEAD_TXN => 5,
};

# transaction control token
use constant {
    TXN_TASK => 1,
    TXN_BEGIN => 2,
    TXN_COMMIT => 3,
    TXN_ROLLBACK => 4,
};

use constant {
    AUTHi => 0,
    ATTRi => 1,
    HDi => 2,
    CONNi => 9,
    ON_CONNi => 11,

    CONN_STATEi => 3,
    TXN_STATEi => 4,

    TASKi => 5,
    STi => 6,
    FALLBACKi => 10,

    ERRi => 7,
    ERRSTRi => 8,
};

sub _push_task {
    my($dbh, $task) = @_;
    push @{$dbh->{_}[TASKi]}, $task;
    _process_task($dbh) if( $dbh->{_}[CONN_STATEi]==IDLE_CONN );
}

sub _unshift_task {
    my($dbh, $task) = @_;
    unshift @{$dbh->{_}[TASKi]}, $task;
}

sub _report_error {
    my($dbh, $method, $error_num, $error_str) = @_;

    $dbh->{_}[ERRi] = $AnyEvent::MySQL::err = $error_num;
    $dbh->{_}[ERRSTRi] = $AnyEvent::MySQL::errstr = $error_str;
    warn "$dbh $method failed: $error_str ($error_num)\n" if( $dbh->{_}[ATTRi]{PrintError} );

    $dbh->{_}[TXN_STATEi] = DEAD_TXN if( $dbh->{_}[TXN_STATEi]!=NO_TXN );
}

sub _reconnect {
    my $dbh = shift;
    $dbh->{_}[CONN_STATEi] = BUSY_CONN;
    my $retry; $retry = AE::timer .1, 0, sub {
        undef $retry;
        _connect($dbh);
    };
}

sub _connect {
    my $dbh = shift;
    my $cb = $dbh->{_}[ON_CONNi] || \&AnyEvent::MySQL::_empty_cb;
    $dbh->{_}[CONN_STATEi] = BUSY_CONN;

    my $param = $dbh->{Name};
    my $database;
    if( index($param, '=')>=0 ) {
        $param = {
            map { split /=/, $_, 2 } split /;/, $param
        };
        if( $param->{host} =~ /(.*):(.*)/ ) {
            $param->{host} = $1;
            $param->{port} = $2;
        }
    }
    else {
        $param = { database => $param };
    }

    $param->{port} ||= 3306;

    if( $param->{host} eq '' || $param->{host} eq 'localhost' ) { # unix socket
        my $sock = $param->{mysql_socket} || `mysql_config --socket`;
        if( !$sock ) {
            _report_error($dbh, 'connect', 2002, "Can't connect to local MySQL server through socket ''");
            $cb->();
            return;
        }
        $param->{host} = '/unix';
        $param->{port} = $sock;
    }

    warn "Connecting to $param->{host}:$param->{port} ...";
    weaken( my $wdbh = $dbh );
    $dbh->{_}[CONNi] = tcp_connect($param->{host}, $param->{port}, sub {
        my $fh = shift;
        if( !$fh ) {
            warn "Connect to $param->{host}:$param->{port} fail: $!  retry later.";
            undef $wdbh->{_}[CONNi];

            _reconnect($wdbh);
            return;
        }
        warn "Connected ($param->{host}:$param->{port})";

        $wdbh->{_}[HDi] = AnyEvent::Handle->new(
            fh => $fh,
            on_error => sub {
                return if !$wdbh;

                my $wwdbh = $wdbh;
                if( $_[1] ) {
                    warn "Disconnected from $param->{host}:$param->{port} by $_[2]  reconnect later.";
                    undef $wwdbh->{_}[HDi];



( run in 3.207 seconds using v1.01-cache-2.11-cpan-02777c243ea )