AnyEvent-MySQL
view release on metacpan or search on metacpan
lib/AnyEvent/MySQL/Imp.pm view on Meta::CPAN
my $len = unpack("V", $_[1]);
my $num = $len >> 24;
$len &= 0xFFFFFF;
print "pack_len=$len, pack_num=$num\n" if DEV;
$_[0]->unshift_read( chunk => $len, sub {
$cb->($_[1]);
} );
} );
}
}
=head2 skip_until_eof($hd, $cb->())
=cut
# Read and discard packets from $hd until an EOF packet is seen, then invoke
# $cb with no arguments.
#
#   $hd  handle passed straight through to recv_packet()
#   $cb  code ref called (without arguments) once the EOF packet was consumed
#
# Each packet is fetched with recv_packet(); the packet payload arrives as
# $_[0] of the inner callback.
sub skip_until_eof {
    my($hd, $cb) = @_;
    recv_packet($hd, sub {
        # An EOF packet has a 0xFE marker byte AND a payload shorter than 9
        # bytes. A longer payload starting with 0xFE is ordinary data (0xFE is
        # also the prefix of an 8-byte length-coded value), so the marker byte
        # alone is not enough to recognise EOF.
        my $is_eof = length($_[0]) < 9 && substr($_[0], 0, 1) eq "\xFE";
        if( $is_eof ) {
            $cb->();
        }
        else {
            # Not EOF yet: drop this packet and wait for the next one.
            skip_until_eof($hd, $cb);
        }
    });
}
=head2 send_packet($hd, $packet_num, $packet_frag1, $packet_frag2, ...)
=cut
# Frame a payload as MySQL wire packet(s) and queue it on the handle.
#
#   $_[0]     handle object providing push_write(); nothing is sent when false
#   $_[1]     sequence number of the (first) packet
#   $_[2..]   payload fragments, concatenated in the order given
#
# A packet is a 3-byte little-endian payload length, a 1-byte sequence number
# and the payload itself. The length field cannot express more than 0xFFFFFF,
# so a payload of 0xFFFFFF bytes or more is split: full-size packets are sent
# first, followed by a final packet holding the remainder (which is empty when
# the payload is an exact multiple of 0xFFFFFF). The sequence number grows by
# one per packet and wraps around at 256.
#
# Returns whatever the final push_write() returns.
sub send_packet {
    # Keep our own copy of the handle for the duration of the call.
    my $hd = $_[0];
    return if !$hd;

    my $max_chunk = 0xFFFFFF;
    my $seq       = $_[1];
    my $payload   = join('', @_[2..$#_]);
    my $total     = length $payload;
    my $offset    = 0;

    # Emit full-size packets while at least $max_chunk bytes remain.
    while( $total - $offset >= $max_chunk ) {
        $hd->push_write(
            "\xFF\xFF\xFF" . chr($seq++ & 0xFF) . substr($payload, $offset, $max_chunk)
        );
        $offset += $max_chunk;
    }

    # Final (and, for ordinary payloads, only) packet: always shorter than
    # $max_chunk, possibly empty.
    substr($payload, 0, $offset, '') if $offset;
    return $hd->push_write(
        substr(pack('V', length $payload), 0, 3) . chr($seq & 0xFF) . $payload
    );
}
=head2 _recv_field($hd, \@field)
=cut
# Schedule the read of one field packet from the handle in $_[0] and append the
# decoded record, as a new array ref, to the array referenced by $_[1].
#
# The packet is consumed strictly front to back: six length-coded strings, a
# 1-byte filler, numbers of width 2, 4, 1, 2 and 1, a 2-byte filler and a final
# length-coded value. Every take_*() call is made in list context and its
# result goes into the record as-is.
sub _recv_field {
    my($hd, $fields) = @_;
    warn "get field." if DEV;
    recv_packet($hd, sub {
        warn "got field!" if DEV;

        my @record;
        # The six leading length-coded strings.
        push @record, take_lcs($_[0]) for 1..6;
        # Fixed-width part; order matters because each call eats from $_[0].
        push @record, take_filler($_[0], 1);
        push @record, take_num($_[0], 2);
        push @record, take_num($_[0], 4);
        push @record, take_num($_[0], 1);
        push @record, take_num($_[0], 2);
        push @record, take_num($_[0], 1);
        push @record, take_filler($_[0], 2);
        # Trailing length-coded value.
        push @record, take_lcb($_[0]);

        push @$fields, \@record;
    });
}
=head2 recv_response($hd, %opt, $cb->(TYPE, data...))
RES_OK, $affected_rows, $insert_id, $server_status, $warning_count, $message
RES_ERROR, $errno, $sqlstate, $message
RES_RESULT, \@field, \@row
$field[$i] = [$catalog, $db, $table, $org_table, $name, $org_name, $charsetnr, $length, $type, $flags, $decimals, $default]
$row[$i] = [$field, $field, $field, ...]
RES_PREPARE, $stmt_id, \@param, \@column, $warning_count
$param[$i] = [$catalog, $db, $table, $org_table, $name, $org_name, $charsetnr, $length, $type, $flags, $decimals, $default]
$column[$i] = [$catalog, $db, $table, $org_table, $name, $org_name, $charsetnr, $length, $type, $flags, $decimals, $default]
opt:
prepare (set to a true value to receive a PREPARE_OK response, reported as RES_PREPARE)
=cut
sub recv_response {
my $cb = ref($_[-1]) eq 'CODE' ? pop : sub {};
my($hd, %opt) = @_;
if( DEV ) {
my $cb0 = $cb;
$cb = sub {
use Data::Dumper;
warn "recv_response: ".Dumper(\@_);
&$cb0;
};
}
recv_packet($hd, sub {
my $head = substr($_[0], 0, 1);
if( $head eq "\x00" ) { # OK
substr($_[0], 0, 1, '');
if( $opt{prepare} ) {
my $stmt_id = take_num($_[0], 4);
my $column_count = take_num($_[0], 2);
my $param_count = take_num($_[0], 2);
take_filler($_[0], 1);
my $warning_count = take_num($_[0], 2);
warn "stmt_id=$stmt_id, column_count=$column_count, param_count=$param_count, warning_count=$warning_count" if DEV;
my(@param, @column);
my $end_cv = AE::cv {
$cb->(RES_PREPARE, $stmt_id, \@param, \@column, $warning_count);
};
$end_cv->begin;
if( $param_count ) {
$end_cv->begin;
for(my $i=0; $i<$param_count; ++$i) {
_recv_field($hd, \@param);
}
recv_packet($hd, sub { $end_cv->end }); # EOF
}
if( $column_count ) {
$end_cv->begin;
for(my $i=0; $i<$column_count; ++$i) {
_recv_field($hd, \@column);
}
recv_packet($hd, sub { $end_cv->end }); # EOF
}
$end_cv->end;
}
else {
$cb->(
RES_OK,
take_lcb($_[0]),
take_lcb($_[0]),
take_num($_[0], 2),
lib/AnyEvent/MySQL/Imp.pm view on Meta::CPAN
$hex =~ s/(.)/sprintf"%02X ",ord$1/ges;
my $ascii = $_[0];
$ascii =~ s/([^\x20-\x7E])/./g;
warn $hex, $ascii;
}
my $proto_ver = take_num($_[0], 1); warn "proto_ver:$proto_ver" if DEV;
my $server_ver = take_zstr($_[0]); warn "server_ver:$server_ver" if DEV;
my $thread_id = take_num($_[0], 4); warn "thread_id:$thread_id" if DEV;
my $scramble_buff = take_str($_[0], 8).substr($_[0], 19, 12); warn "scramble_buff:$scramble_buff" if DEV;
my $filler = take_num($_[0], 1); warn "filler:$filler" if DEV;
my $server_cap = take_num($_[0], 2);
my $server_lang = take_num($_[0], 1); warn "server_lang:$server_lang" if DEV;
my $server_status = take_num($_[0], 2); warn "server_status:$server_status" if DEV;
$server_cap += take_num($_[0], 2) << 16;
if( DEV ) {
warn "server_cap:";
warn " CLIENT_LONG_PASSWORD" if( $server_cap & CLIENT_LONG_PASSWORD );
warn " CLIENT_FOUND_ROWS" if( $server_cap & CLIENT_FOUND_ROWS );
warn " CLIENT_LONG_FLAG" if( $server_cap & CLIENT_LONG_FLAG );
warn " CLIENT_CONNECT_WITH_DB" if( $server_cap & CLIENT_CONNECT_WITH_DB );
warn " CLIENT_NO_SCHEMA" if( $server_cap & CLIENT_NO_SCHEMA );
warn " CLIENT_COMPRESS" if( $server_cap & CLIENT_COMPRESS );
warn " CLIENT_ODBC" if( $server_cap & CLIENT_ODBC );
warn " CLIENT_LOCAL_FILES" if( $server_cap & CLIENT_LOCAL_FILES );
warn " CLIENT_IGNORE_SPACE" if( $server_cap & CLIENT_IGNORE_SPACE );
warn " CLIENT_PROTOCOL_41" if( $server_cap & CLIENT_PROTOCOL_41 );
warn " CLIENT_INTERACTIVE" if( $server_cap & CLIENT_INTERACTIVE );
warn " CLIENT_SSL" if( $server_cap & CLIENT_SSL );
warn " CLIENT_IGNORE_SIGPIPE" if( $server_cap & CLIENT_IGNORE_SIGPIPE );
warn " CLIENT_TRANSACTIONS" if( $server_cap & CLIENT_TRANSACTIONS );
warn " CLIENT_RESERVED" if( $server_cap & CLIENT_RESERVED );
warn " CLIENT_SECURE_CONNECTION" if( $server_cap & CLIENT_SECURE_CONNECTION );
warn " CLIENT_MULTI_STATEMENTS" if( $server_cap & CLIENT_MULTI_STATEMENTS );
warn " CLIENT_MULTI_RESULTS" if( $server_cap & CLIENT_MULTI_RESULTS );
}
my $scramble_len = take_num($_[0], 1); warn "scramble_len:$scramble_len" if DEV;
my $packet = '';
put_num($packet, $server_cap & (
CLIENT_LONG_PASSWORD | # new more secure passwords
CLIENT_FOUND_ROWS | # Found instead of affected rows
CLIENT_LONG_FLAG | # Get all column flags
CLIENT_CONNECT_WITH_DB | # One can specify db on connect
# CLIENT_NO_SCHEMA | # Don't allow database.table.column
# CLIENT_COMPRESS | # Can use compression protocol
# CLIENT_ODBC | # Odbc client
# CLIENT_LOCAL_FILES | # Can use LOAD DATA LOCAL
# CLIENT_IGNORE_SPACE | # Ignore spaces before '('
CLIENT_PROTOCOL_41 | # New 4.1 protocol
# CLIENT_INTERACTIVE | # This is an interactive client
# CLIENT_SSL | # Switch to SSL after handshake
# CLIENT_IGNORE_SIGPIPE | # IGNORE sigpipes
CLIENT_TRANSACTIONS | # Client knows about transactions
# CLIENT_RESERVED | # Old flag for 4.1 protocol
CLIENT_SECURE_CONNECTION | # New 4.1 authentication
CLIENT_MULTI_STATEMENTS | # Enable/disable multi-stmt support
CLIENT_MULTI_RESULTS | # Enable/disable multi-results
0
), 4); # client_flags
put_num($packet, 0x1000000, 4); # max_packet_size
put_num($packet, $server_lang, 1); # charset_number
$packet .= "\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0"; # filler
put_zstr($packet, $username); # username
if( $password eq '' ) {
put_lcs($packet, '');
}
else {
my $stage1_hash = sha1($password);
put_lcs($packet, sha1($scramble_buff.sha1($stage1_hash)) ^ $stage1_hash); # scramble buff
}
put_zstr($packet, $database); # database name
send_packet($hd, 1, $packet);
recv_packet($hd, sub {
if( parse_ok($_[0]) ) {
$cb->(1, undef, $thread_id);
}
else {
my($errno, $sqlstate, $message) = parse_error($_[0]);
warn "$errno [$sqlstate] $message" if DEV;
$cb->(0, dualvar($errno, $message));
}
});
});
}
=head2 do_reset_stmt($hd, $stmt_id)
=cut
# Send a COM_STMT_RESET command for the given statement.
#
#   $hd       handle handed to send_packet()
#   $stmt_id  statement id, encoded with put_num() as a 4-byte number
#
# The command is sent with packet number 0. Returns send_packet()'s result.
sub do_reset_stmt {
    my($hd, $stmt_id) = @_;

    my $body = '';
    put_num($body, $stmt_id, 4);

    return send_packet($hd, 0, COM_STMT_RESET, $body);
}
=head2 do_long_data_packet($hd, $stmt_id, $param_num, $type, $data, $len, $flag, $packet_num)
=cut
# Send one COM_STMT_SEND_LONG_DATA command.
#
# Positional arguments (see the POD signature above this sub):
#   $_[0] handle, $_[1] statement id, $_[2] parameter number, $_[3] type,
#   $_[4] data, $_[5] length, $_[6] flag, $_[7] packet number
#
# The body is built as: statement id (4 bytes), parameter number (2 bytes),
# type (2 bytes), then whatever put_type() appends for the data.
# Returns send_packet()'s result.
#
# NOTE(review): current MySQL protocol documentation lists only statement id,
# parameter id and data for this command -- confirm the 2-byte type field is
# really expected by the server.
sub do_long_data_packet {
    my $stmt_id   = $_[1];
    my $param_num = $_[2];
    my $type      = $_[3];

    my $body = '';
    put_num($body, $stmt_id,   4);
    put_num($body, $param_num, 2);
    put_num($body, $type,      2);
    # $_[4] is handed over directly (as in the call's own argument list) so the
    # data is not copied into a local first.
    put_type($body, $_[4], $type, $_[5], $_[6]);

    return send_packet($_[0], $_[7], COM_STMT_SEND_LONG_DATA, $body);
}
=head2 do_execute($hd, $stmt_id, $null_bit_map, $packet_num)
=cut
# Send a COM_STMT_EXECUTE command.
#
#   $_[0]  handle handed to send_packet()
#   $_[1]  statement id, encoded with put_num() as a 4-byte number
#   $_[2]  NULL bitmap, appended verbatim
#   $_[3]  packet number
#
# Returns send_packet()'s result.
sub do_execute {
    my $body = '';
    put_num($body, $_[1], 4);

    # Fixed 5-byte block, the caller's NULL bitmap, then one zero byte.
    # Presumably: flags byte 0, iteration count 1 (little-endian) and a
    # "no new parameter types bound" marker -- confirm against protocol docs.
    $body .= join '', "\0\1\0\0\0", $_[2], "\0";

    return send_packet($_[0], $_[3], COM_STMT_EXECUTE, $body);
}
=head2 do_execute_param($hd, $stmt_id, \@param, \@param_config)
=cut
sub do_execute_param {
my $null_bit_map = pack('b*', join '', map { defined($_) ? '0' : '1' } @{$_[2]});
my $packet = '';
( run in 1.648 second using v1.01-cache-2.11-cpan-ceb78f64989 )