AnyEvent-MySQL

 view release on metacpan or  search on metacpan

lib/AnyEvent/MySQL/Imp.pm  view on Meta::CPAN

            my $len = unpack("V", $_[1]);
            my $num = $len >> 24;
            $len &= 0xFFFFFF;
            print "pack_len=$len, pack_num=$num\n" if DEV;
            $_[0]->unshift_read( chunk => $len, sub {
                $cb->($_[1]);
            } );
        } );
    }
}

=head2 skip_until_eof($hd, $cb->())
=cut
sub skip_until_eof {
    my($hd, $cb) = @_;
    recv_packet($hd, sub {
        if( substr($_[0], 0, 1) eq "\xFE" ) {
            $cb->();
        }
        else {
            skip_until_eof($hd, $cb);
        }
    });
}

=head2 send_packet($hd, $packet_num, $packet_frag1, $pack_frag2, ...)
=cut
sub send_packet {
    return if !$_[0];
    local $_[0] = $_[0];
    my $len = reduce { $a + length($b) } 0, @_[2..$#_];
    $_[0]->push_write(substr(pack('V', $len), 0, 3) . chr($_[1]) . join('', @_[2..$#_]));
}

=head2 _recv_field($hd, \@field)
=cut
sub _recv_field {
    warn "get field." if DEV;
    my $field = $_[1];
    recv_packet($_[0], sub {
        warn "got field!" if DEV;
        push @$field, [
            take_lcs($_[0]), take_lcs($_[0]), take_lcs($_[0]),
            take_lcs($_[0]), take_lcs($_[0]), take_lcs($_[0]),
            take_filler($_[0], 1),
            take_num($_[0], 2),
            take_num($_[0], 4),
            take_num($_[0], 1),
            take_num($_[0], 2),
            take_num($_[0], 1),
            take_filler($_[0], 2),
            take_lcb($_[0]),
        ];
    });
}

=head2 recv_response($hd, %opt, $cb->(TYPE, data...))
  RES_OK, $affected_rows, $insert_id, $server_status, $warning_count, $message
  RES_ERROR, $errno, $sqlstate, $message
  RES_RESULT, \@field, \@row
   $field[$i] = [$catalog, $db, $table, $org_table, $name, $org_name, $charsetnr, $length, $type, $flags, $decimals, $default]
   $row[$i] = [$field, $field, $field, ...]
  RES_PREPARE, $stmt_id, \@param, \@column, $warning_count
   $param[$i] = [$catalog, $db, $table, $org_table, $name, $org_name, $charsetnr, $length, $type, $flags, $decimals, $default]
   $column[$i] = [$catalog, $db, $table, $org_table, $name, $org_name, $charsetnr, $length, $type, $flags, $decimals, $default]
 opt:
  prepare (set to truthy to recv prepare_ok)
=cut
sub recv_response {
    my $cb = ref($_[-1]) eq 'CODE' ? pop : sub {};
    my($hd, %opt) = @_;

    if( DEV ) {
        my $cb0 = $cb;
        $cb = sub {
            use Data::Dumper;
            warn "recv_response: ".Dumper(\@_);
            &$cb0;
        };
    }

    recv_packet($hd, sub {
        my $head = substr($_[0], 0, 1);
        if( $head eq "\x00" ) { # OK
            substr($_[0], 0, 1, '');
            if( $opt{prepare} ) {
                my $stmt_id = take_num($_[0], 4);
                my $column_count = take_num($_[0], 2);
                my $param_count = take_num($_[0], 2);
                take_filler($_[0], 1);
                my $warning_count = take_num($_[0], 2);
                warn "stmt_id=$stmt_id, column_count=$column_count, param_count=$param_count, warning_count=$warning_count" if DEV;

                my(@param, @column);

                my $end_cv = AE::cv {
                    $cb->(RES_PREPARE, $stmt_id, \@param, \@column, $warning_count);
                };

                $end_cv->begin;

                if( $param_count ) {
                    $end_cv->begin;
                    for(my $i=0; $i<$param_count; ++$i) {
                        _recv_field($hd, \@param);
                    }
                    recv_packet($hd, sub { $end_cv->end }); # EOF
                }

                if( $column_count ) {
                    $end_cv->begin;
                    for(my $i=0; $i<$column_count; ++$i) {
                        _recv_field($hd, \@column);
                    }
                    recv_packet($hd, sub { $end_cv->end }); # EOF
                }

                $end_cv->end;
            }
            else {
                $cb->(
                    RES_OK,
                    take_lcb($_[0]),
                    take_lcb($_[0]),
                    take_num($_[0], 2),

lib/AnyEvent/MySQL/Imp.pm  view on Meta::CPAN

            $hex =~ s/(.)/sprintf"%02X ",ord$1/ges;
            my $ascii = $_[0];
            $ascii =~ s/([^\x20-\x7E])/./g;
            warn $hex, $ascii;
        }
        my $proto_ver = take_num($_[0], 1); warn "proto_ver:$proto_ver" if DEV;
        my $server_ver = take_zstr($_[0]); warn "server_ver:$server_ver" if DEV;
        my $thread_id = take_num($_[0], 4); warn "thread_id:$thread_id" if DEV;
        my $scramble_buff = take_str($_[0], 8).substr($_[0], 19, 12); warn "scramble_buff:$scramble_buff" if DEV;
        my $filler = take_num($_[0], 1); warn "filler:$filler" if DEV;
        my $server_cap = take_num($_[0], 2);
        my $server_lang = take_num($_[0], 1); warn "server_lang:$server_lang" if DEV;
        my $server_status = take_num($_[0], 2); warn "server_status:$server_status" if DEV;
        $server_cap += take_num($_[0], 2) << 16;
        if( DEV ) {
            warn "server_cap:";
            warn "  CLIENT_LONG_PASSWORD" if( $server_cap & CLIENT_LONG_PASSWORD );
            warn "  CLIENT_FOUND_ROWS" if( $server_cap & CLIENT_FOUND_ROWS );
            warn "  CLIENT_LONG_FLAG" if( $server_cap & CLIENT_LONG_FLAG );
            warn "  CLIENT_CONNECT_WITH_DB" if( $server_cap & CLIENT_CONNECT_WITH_DB );
            warn "  CLIENT_NO_SCHEMA" if( $server_cap & CLIENT_NO_SCHEMA );
            warn "  CLIENT_COMPRESS" if( $server_cap & CLIENT_COMPRESS );
            warn "  CLIENT_ODBC" if( $server_cap & CLIENT_ODBC );
            warn "  CLIENT_LOCAL_FILES" if( $server_cap & CLIENT_LOCAL_FILES );
            warn "  CLIENT_IGNORE_SPACE" if( $server_cap & CLIENT_IGNORE_SPACE );
            warn "  CLIENT_PROTOCOL_41" if( $server_cap & CLIENT_PROTOCOL_41 );
            warn "  CLIENT_INTERACTIVE" if( $server_cap & CLIENT_INTERACTIVE );
            warn "  CLIENT_SSL" if( $server_cap & CLIENT_SSL );
            warn "  CLIENT_IGNORE_SIGPIPE" if( $server_cap & CLIENT_IGNORE_SIGPIPE );
            warn "  CLIENT_TRANSACTIONS" if( $server_cap & CLIENT_TRANSACTIONS );
            warn "  CLIENT_RESERVED" if( $server_cap & CLIENT_RESERVED );
            warn "  CLIENT_SECURE_CONNECTION" if( $server_cap & CLIENT_SECURE_CONNECTION );
            warn "  CLIENT_MULTI_STATEMENTS" if( $server_cap & CLIENT_MULTI_STATEMENTS );
            warn "  CLIENT_MULTI_RESULTS" if( $server_cap & CLIENT_MULTI_RESULTS );
        }
        my $scramble_len = take_num($_[0], 1); warn "scramble_len:$scramble_len" if DEV;

        my $packet = '';
        put_num($packet, $server_cap & (
            CLIENT_LONG_PASSWORD     | # new more secure passwords
            CLIENT_FOUND_ROWS        | # Found instead of affected rows
            CLIENT_LONG_FLAG         | # Get all column flags
            CLIENT_CONNECT_WITH_DB   | # One can specify db on connect
            # CLIENT_NO_SCHEMA         | # Don't allow database.table.column
            # CLIENT_COMPRESS          | # Can use compression protocol
            # CLIENT_ODBC              | # Odbc client
            # CLIENT_LOCAL_FILES       | # Can use LOAD DATA LOCAL
            # CLIENT_IGNORE_SPACE      | # Ignore spaces before '('
            CLIENT_PROTOCOL_41       | # New 4.1 protocol
            # CLIENT_INTERACTIVE       | # This is an interactive client
            # CLIENT_SSL               | # Switch to SSL after handshake
            # CLIENT_IGNORE_SIGPIPE    | # IGNORE sigpipes
            CLIENT_TRANSACTIONS      | # Client knows about transactions
            # CLIENT_RESERVED          | # Old flag for 4.1 protocol 
            CLIENT_SECURE_CONNECTION | # New 4.1 authentication
            CLIENT_MULTI_STATEMENTS  | # Enable/disable multi-stmt support
            CLIENT_MULTI_RESULTS     | # Enable/disable multi-results
            0
        ), 4); # client_flags
        put_num($packet, 0x1000000, 4); # max_packet_size
        put_num($packet, $server_lang, 1); # charset_number
        $packet .= "\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0"; # filler
        put_zstr($packet, $username); # username
        if( $password eq '' ) {
            put_lcs($packet, '');
        }
        else {
            my $stage1_hash = sha1($password);
            put_lcs($packet, sha1($scramble_buff.sha1($stage1_hash)) ^ $stage1_hash); # scramble buff
        }
        put_zstr($packet, $database); # database name

        send_packet($hd, 1, $packet);
        recv_packet($hd, sub {
            if( parse_ok($_[0]) ) {
                $cb->(1, undef, $thread_id);
            }
            else {
                my($errno, $sqlstate, $message) = parse_error($_[0]);
                warn "$errno [$sqlstate] $message" if DEV;
                $cb->(0, dualvar($errno, $message));
            }
        });
    });
}

=head2 do_reset_stmt($hd, $stmt_id)
=cut
sub do_reset_stmt {
    my $packet = '';
    put_num($packet, $_[1], 4);
    send_packet($_[0], 0, COM_STMT_RESET, $packet);
}

=head2 do_long_data_packet($hd, $stmt_id, $param_num, $type, $data, $len, $flag, $packet_num)
=cut
sub do_long_data_packet {
    my $packet = '';
    put_num($packet, $_[1], 4);
    put_num($packet, $_[2], 2);
    put_num($packet, $_[3], 2);
    put_type($packet, $_[4], $_[3], $_[5], $_[6]);
    send_packet($_[0], $_[7], COM_STMT_SEND_LONG_DATA, $packet);
}

=head2 do_execute($hd, $stmt_id, $null_bit_map, $packet_num)
=cut
sub do_execute {
    my $packet = '';
    put_num($packet, $_[1], 4);
    $packet .= "\0\1\0\0\0";
    $packet .= $_[2];
    $packet .= "\0";
    send_packet($_[0], $_[3], COM_STMT_EXECUTE, $packet);
}

=head2 do_execute_param($hd, $stmt_id, \@param, \@param_config)
=cut
sub do_execute_param {
    my $null_bit_map = pack('b*', join '', map { defined($_) ? '0' : '1' } @{$_[2]});
    my $packet = '';



( run in 1.648 second using v1.01-cache-2.11-cpan-ceb78f64989 )