AnyEvent-MySQL

 view release on metacpan or  search on metacpan

lib/AnyEvent/MySQL.pm  view on Meta::CPAN

    }

    $EV::DIED = sub {
        print "EV::DIED: $@\n";
        print Devel::StackTrace->new->as_string;
    };

    use lib 'lib';
    use AnyEvent::MySQL;

    my $end = AE::cv;

    my $dbh = AnyEvent::MySQL->connect("DBI:mysql:database=test;host=127.0.0.1;port=3306", "ptest", "pass", { PrintError => 1 }, sub {
        my($dbh) = @_;
        if( $dbh ) {
            warn "Connect success!";
            $dbh->pre_do("set names latin1");
            $dbh->pre_do("set names utf8");
        }
        else {
            warn "Connect fail: $AnyEvent::MySQL::errstr ($AnyEvent::MySQL::err)";
            $end->send;
        }
    });

    $dbh->do("select * from t1 where a<=?", {}, 15, sub {
        my $rv = shift;
        if( defined($rv) ) {
            warn "Do success: $rv";
        }
        else {
            warn "Do fail: $AnyEvent::MySQL::errstr ($AnyEvent::MySQL::err)";
        }
        $end->send;
    });

    #$end->recv;
    my $end2 = AE::cv;

    #$dbh->prepare("update t1 set a=1 where b=1", sub {
    #$dbh->prepare("select * from t1", sub {
    my $sth = $dbh->prepare("select b, a aaa from t1 where a>?", sub {
    #$dbh->prepare("select * from type_all", sub {
        warn "prepared!";
        $end2->send;
    });

    #$end2->recv;

    my $end3 = AE::cv;

    $sth->execute(1, sub {
        warn "executed! $_[0]";
        $end3->send($_[0]);
    });

    my $fth = $end3->recv;

    my $end4 = AE::cv;

    $fth->bind_col(2, \my $a, sub {
        warn $_[0];
    });
    my $fetch; $fetch = sub {
        $fth->fetch(sub {
            if( $_[0] ) {
                warn "Get! $a";
                $fetch->();
            }
            else {
                warn "Get End!";
                undef $fetch;
                $end4->send;
            }
        });
    }; $fetch->();

    #$fth->bind_columns(\my($a, $b), sub {
    #    warn $_[0];
    #    warn $AnyEvent::MySQL::errstr;
    #});
    #my $fetch; $fetch = sub {
    #    $fth->fetch(sub {
    #        if( $_[0] ) {
    #            warn "Get! ($a, $b)";
    #            $fetch->();
    #        }
    #        else {
    #            undef $fetch;
    #            $end4->send;
    #        }
    #    });
    #}; $fetch->();

    #my $fetch; $fetch = sub {
    #    $fth->fetchrow_array(sub {
    #        if( @_ ) {
    #            warn "Get! (@_)";
    #            $fetch->();
    #        }
    #        else {
    #            undef $fetch;
    #            $end4->send;
    #        }
    #    });
    #}; $fetch->();

    #my $fetch; $fetch = sub {
    #    $fth->fetchrow_arrayref(sub {
    #        if( $_[0] ) {
    #            warn "Get! (@{$_[0]})";
    #            $fetch->();
    #        }
    #        else {
    #            undef $fetch;
    #            $end4->send;
    #        }
    #    });
    #}; $fetch->();

    #my $fetch; $fetch = sub {
    #    $fth->fetchrow_hashref(sub {
    #        if( $_[0] ) {
    #            warn "Get! (@{[%{$_[0]}]})";
    #            $fetch->();
    #        }
    #        else {
    #            undef $fetch;
    #            $end4->send;
    #        }
    #    });
    #}; $fetch->();

    $end4->recv;

    #tcp_connect 0, 3306, sub {
    #    my $fh = shift;
    #    my $hd = AnyEvent::Handle->new( fh => $fh );

lib/AnyEvent/MySQL.pm  view on Meta::CPAN

    or tried but failed.

    If failed, the $dbh in the $cb's args will be undef.

    You can do some connection initialization here, such as
     set names utf8;

    But you should NOT rely on this for workflow control,
    because a reconnection can occur at any time.

=cut
# Constructor for the database handle.
#
# Arguments: ($class, $dsn, $username, $auth, \%attr, [$cb]).
# A trailing CODE ref is taken as the on-connect callback; when absent the
# no-op AnyEvent::MySQL::_empty_cb is stored instead.
#
# Dies with "invalid dsn format" unless $dsn begins with "DBI:mysql:".
# Everything after that prefix is kept verbatim in $dbh->{Name}.
#
# Returns the new handle right after _connect() has been called on it.
sub new {
    my $on_connect = ref($_[-1]) eq 'CODE' ? pop @_ : \&AnyEvent::MySQL::_empty_cb;
    my ($class, $dsn, $username, $auth, $attr) = @_;

    my $self = bless { _ => [] }, $class;

    # List assignment in scalar context yields 0 when the pattern fails.
    my ($name) = $dsn =~ /^DBI:mysql:(.*)$/
        or die "invalid dsn format";

    $self->{Name}     = $name;
    $self->{Username} = $username;

    # Internal state lives in the array behind the "_" key.
    my $state = $self->{_};
    $state->[AUTHi]       = $auth;
    $state->[ATTRi]       = { Verbose => 1, %{ $attr || {} } };   # caller's attrs override the Verbose default
    $state->[CONN_STATEi] = BUSY_CONN;
    $state->[TXN_STATEi]  = NO_TXN;
    $state->[TASKi]       = [];
    $state->[ON_CONNi]    = $on_connect;

    _connect($self);

    return $self;
}

=head2 $error_num = $dbh->err

=cut
# Accessor: returns whatever is stored in the handle's ERRi state slot.
sub err {
    my ($dbh) = @_;
    return $dbh->{_}[ERRi];
}

=head2 $error_str = $dbh->errstr

=cut
# Accessor: returns whatever is stored in the handle's ERRSTRi state slot.
sub errstr {
    my ($dbh) = @_;
    return $dbh->{_}[ERRSTRi];
}

=head2 $rv = $dbh->last_insert_id

    Non-blocking get the value immediately

=cut
# Returns the handle's cached {mysql_insertid} value without touching the
# network. The query methods in this file store it when a statement
# completes with an OK response.
sub last_insert_id {
    my ($dbh) = @_;
    return $dbh->{mysql_insertid};
}

# Shared implementation behind do() and pre_do().
#
# Arguments: ($rev_dir, $dbh, $statement, \%attr, @bind_values, [$cb]).
#   $rev_dir  - true: the task is handed to _unshift_task();
#               false: it is handed to _push_task().
#   $cb       - optional trailing CODE ref; defaults to the no-op
#               AnyEvent::MySQL::_empty_cb.
#
# The callback receives:
#   * $_[1] of the response on RES_OK,
#   * nothing on RES_ERROR (after _report_error() has been called),
#   * the element count of the response's $_[2] array for any other response.
#
# On a ReadOnly handle, any statement not starting with show / select /
# "set names" is rejected up front with error 1227 and never queued.
sub _do {
    my $done = ref($_[-1]) eq 'CODE' ? pop @_ : \&AnyEvent::MySQL::_empty_cb;
    my ($at_front, $dbh, $sql, $attr, @binds) = @_;

    my $read_only = $dbh->{_}[ATTRi]{ReadOnly};
    if ($read_only && $sql !~ /^\s*(?:show|select|set\s+names)\s+/i) {
        _report_error($dbh, 'do', 1227, 'unable to perform write queries on a ReadOnly handle');
        $done->();
        return;
    }

    my $on_response = sub {
        my ($next_act, $res_type, $first, $second, $third) = @_;

        # Exceptions thrown by the user callback are deliberately swallowed.
        eval {
            if ($res_type == AnyEvent::MySQL::Imp::RES_OK) {
                $dbh->{mysql_insertid} = $second;
                $done->($first);
            }
            elsif ($res_type == AnyEvent::MySQL::Imp::RES_ERROR) {
                _report_error($dbh, 'do', $first, $third);
                $done->();
            }
            else {
                $done->(scalar @{$second});
            }
        };
    };

    my $run = sub {
        my $next_act = shift;
        AnyEvent::MySQL::Imp::send_packet(
            $dbh->{_}[HDi], 0, AnyEvent::MySQL::Imp::COM_QUERY,
            _text_prepare($sql, @binds),
        );
        AnyEvent::MySQL::Imp::recv_response($dbh->{_}[HDi], sub {
            $on_response->($next_act, @_);
            # Always signal completion, whatever happened above.
            $next_act->();
        });
    };

    my @task = ($dbh, [TXN_TASK, $run, $done, 0]);

    return $at_front ? _unshift_task(@task) : _push_task(@task);
}

=head2 $dbh->do($statement, [\%attr, [@bind_values,]] [$cb->($rv)])

=cut
# Public entry point: runs the statement through _do() with the
# "reverse direction" flag off, so the task goes through _push_task().
sub do {
    return _do(0, @_);
}

=head2 $dbh->pre_do($statement, [\%attr, [@bind_values,]] [$cb->($rv)])

    This method is like $dbh->do, except that $dbh->pre_do unshifts the
    job onto the front of the queue instead of pushing it onto the end.

    This method is intended for initialization actions performed in the AnyEvent::MySQL->connect callback.

=cut
# Public entry point: runs the statement through _do() with the
# "reverse direction" flag on, so the task goes through _unshift_task().
sub pre_do {
    return _do(1, @_);
}

=head2 $dbh->selectall_arrayref($statement, [\%attr, [@bind_values,]] $cb->($ary_ref))

=cut
# Queues a text query and hands the whole result set to the callback.
#
# Arguments: ($dbh, $statement, \%attr, @bind_values, [$cb]).
# The callback receives:
#   * an empty array ref when the server answers with RES_OK,
#   * nothing on RES_ERROR (after _report_error() has been called),
#   * the response's $_[2] value for any other response.
sub selectall_arrayref {
    my $done = ref($_[-1]) eq 'CODE' ? pop @_ : \&AnyEvent::MySQL::_empty_cb;
    my ($dbh, $sql, $attr, @binds) = @_;

    my $run = sub {
        my $next_act = shift;

        AnyEvent::MySQL::Imp::send_packet(
            $dbh->{_}[HDi], 0, AnyEvent::MySQL::Imp::COM_QUERY,
            _text_prepare($sql, @binds),
        );

        AnyEvent::MySQL::Imp::recv_response($dbh->{_}[HDi], sub {
            my ($res_type, $first, $second, $third) = @_;

            # Exceptions thrown by the user callback are deliberately swallowed.
            eval {
                if ($res_type == AnyEvent::MySQL::Imp::RES_OK) {
                    $dbh->{mysql_insertid} = $second;
                    $done->([]);
                }
                elsif ($res_type == AnyEvent::MySQL::Imp::RES_ERROR) {
                    _report_error($dbh, 'selectall_arrayref', $first, $third);
                    $done->();
                }
                else {
                    $done->($second);
                }
            };
            $next_act->();
        });
    };

    return _push_task($dbh, [TXN_TASK, $run, $done, 0]);
}


=head2 $dbh->selectall_hashref($statement, [$key_field|\@key_field], [\%attr, [@bind_values,]] $cb->($hash_ref))

=cut

# Queues a text query and hands the result set to the callback as hashes.
#
# Arguments: ($dbh, $statement, [$key_field | \@key_field], \%attr,
#             @bind_values, [$cb]).
#
# The third argument selects the result shape:
#   * scalar or array ref of column names: the callback gets a (nested)
#     hash ref keyed by those columns, one level per key field;
#   * undef, an empty array ref, or a hash ref: no key fields are used and
#     the callback gets an array ref of row hashes. A hash ref in this
#     position is treated as \%attr.
#
# Rows are processed in the order the server returned them, so the list
# form preserves query order and, in the keyed form, a later row with the
# same key replaces an earlier one (as DBI's selectall_hashref does).
#
# On RES_OK the callback gets an empty container of the matching shape; on
# RES_ERROR it is called with no arguments after _report_error().
sub selectall_hashref {
    my $cb = ref($_[-1]) eq 'CODE' ? pop : \&AnyEvent::MySQL::_empty_cb;
    my($dbh, $statement, $key_field) = splice @_, 0, 3;

    my @key_field;
    if( ref($key_field) eq 'ARRAY' ) {
        @key_field = @$key_field;
    }
    elsif( ref($key_field) eq 'HASH' ) {
        # No key field was given; what we took is really \%attr, put it back.
        unshift @_, $key_field;
    }
    elsif( defined($key_field) ) {
        @key_field = ($key_field);
    }

    my($attr, @bind_values) = @_;

    _push_task($dbh, [TXN_TASK, sub {
        my $next_act = shift;
        AnyEvent::MySQL::Imp::send_packet($dbh->{_}[HDi], 0, AnyEvent::MySQL::Imp::COM_QUERY, _text_prepare($statement, @bind_values));
        AnyEvent::MySQL::Imp::recv_response($dbh->{_}[HDi], sub {
            # Exceptions thrown by the user callback are deliberately swallowed.
            eval {
                if( $_[0]==AnyEvent::MySQL::Imp::RES_OK ) {
                    $dbh->{mysql_insertid} = $_[2];
                    $cb->(@key_field ? {} : []);
                }
                elsif( $_[0]==AnyEvent::MySQL::Imp::RES_ERROR ) {
                    _report_error($dbh, 'selectall_hashref', $_[1], $_[3]);
                    $cb->();
                }
                else {
                    my($fields, $rows) = @_[1, 2];
                    my $res = @key_field ? {} : [];

                    # Walk the rows first-to-last so that list results keep
                    # the server's ordering.
                    for my $row (@$rows) {
                        my %record;
                        # Element 4 of each field entry is used as the hash key.
                        for(my $j=$#{$row}; $j>=0; --$j) {
                            $record{$fields->[$j][4]} = $row->[$j];
                        }

                        if( @key_field ) {
                            # Descend (creating levels as needed) through all
                            # but the last key field, then store the record.
                            my $h = $res;
                            for my $key (@key_field[0..$#key_field-1]) {
                                $h = $h->{$record{$key}} ||= {};
                            }
                            $h->{$record{$key_field[-1]}} = \%record;
                        }
                        else {
                            push @$res, \%record;
                        }
                    }
                    $cb->($res);
                }
            };
            $next_act->();
        });
    }, $cb, 0]);
}

=head2 $dbh->selectcol_arrayref($statement, [\%attr, [@bind_values,]] $cb->($ary_ref))

=cut
# Queues a text query and hands selected column values to the callback as
# one flat array ref.
#
# Arguments: ($dbh, $statement, \%attr, @bind_values, [$cb]).
# $attr->{Columns} lists the 1-based column numbers to extract; it
# defaults to [1]. With several columns the values of each row are
# appended one row after another.
#
# The callback receives an empty array ref on RES_OK, nothing on RES_ERROR
# (after _report_error()), and the flattened values otherwise.
sub selectcol_arrayref {
    my $done = ref($_[-1]) eq 'CODE' ? pop @_ : \&AnyEvent::MySQL::_empty_cb;
    my ($dbh, $sql, $attr, @binds) = @_;

    # Convert the 1-based column numbers into 0-based array indices.
    my $wanted  = ($attr || {})->{Columns} || [1];
    my @indices = map { $_ - 1 } @$wanted;

    my $run = sub {
        my $next_act = shift;

        AnyEvent::MySQL::Imp::send_packet(
            $dbh->{_}[HDi], 0, AnyEvent::MySQL::Imp::COM_QUERY,
            _text_prepare($sql, @binds),
        );

        AnyEvent::MySQL::Imp::recv_response($dbh->{_}[HDi], sub {
            my ($res_type, $first, $second, $third) = @_;

            # Exceptions thrown by the user callback are deliberately swallowed.
            eval {
                if ($res_type == AnyEvent::MySQL::Imp::RES_OK) {
                    $dbh->{mysql_insertid} = $second;
                    $done->([]);
                }
                elsif ($res_type == AnyEvent::MySQL::Imp::RES_ERROR) {
                    _report_error($dbh, 'selectcol_arrayref', $first, $third);
                    $done->();
                }
                else {
                    my @values;
                    for my $row (@{$second}) {
                        push @values, map { $row->[$_] } @indices;
                    }
                    $done->(\@values);
                }
            };
            $next_act->();
        });
    };

    return _push_task($dbh, [TXN_TASK, $run, $done, 0]);
}

=head2 $dbh->selectrow_array($statement, [\%attr, [@bind_values,]], $cb->(@row_ary))

=cut
# Queues a text query and hands the first row to the callback as a list.
#
# Arguments: ($dbh, $statement, \%attr, @bind_values, [$cb]).
# The callback is called with no arguments on RES_OK, on RES_ERROR (after
# _report_error()), and when the result set has no first row; otherwise it
# receives the elements of the first row.
sub selectrow_array {
    my $done = ref($_[-1]) eq 'CODE' ? pop @_ : \&AnyEvent::MySQL::_empty_cb;
    my ($dbh, $sql, $attr, @binds) = @_;

    my $run = sub {
        my $next_act = shift;

        AnyEvent::MySQL::Imp::send_packet(
            $dbh->{_}[HDi], 0, AnyEvent::MySQL::Imp::COM_QUERY,
            _text_prepare($sql, @binds),
        );

        AnyEvent::MySQL::Imp::recv_response($dbh->{_}[HDi], sub {
            my ($res_type, $first, $second, $third) = @_;

            # Exceptions thrown by the user callback are deliberately swallowed.
            eval {
                if ($res_type == AnyEvent::MySQL::Imp::RES_OK) {
                    $dbh->{mysql_insertid} = $second;
                    $done->();
                }
                elsif ($res_type == AnyEvent::MySQL::Imp::RES_ERROR) {
                    _report_error($dbh, 'selectrow_array', $first, $third);
                    $done->();
                }
                else {
                    my $head = $second->[0];
                    $done->($head ? @{$head} : ());
                }
            };
            $next_act->();
        });
    };

    return _push_task($dbh, [TXN_TASK, $run, $done, 0]);
}

=head2 $dbh->selectrow_arrayref($statement, [\%attr, [@bind_values,]], $cb->($ary_ref))

=cut
# Queues a text query and hands the first row to the callback as an
# array ref.
#
# Arguments: ($dbh, $statement, \%attr, @bind_values, [$cb]).
# The callback receives undef on RES_OK and on RES_ERROR (after
# _report_error()); otherwise it receives element 0 of the response's row
# list, which is undef when there are no rows.
sub selectrow_arrayref {
    my $done = ref($_[-1]) eq 'CODE' ? pop @_ : \&AnyEvent::MySQL::_empty_cb;
    my ($dbh, $sql, $attr, @binds) = @_;

    my $run = sub {
        my $next_act = shift;

        AnyEvent::MySQL::Imp::send_packet(
            $dbh->{_}[HDi], 0, AnyEvent::MySQL::Imp::COM_QUERY,
            _text_prepare($sql, @binds),
        );

        AnyEvent::MySQL::Imp::recv_response($dbh->{_}[HDi], sub {
            my ($res_type, $first, $second, $third) = @_;

            # Exceptions thrown by the user callback are deliberately swallowed.
            eval {
                if ($res_type == AnyEvent::MySQL::Imp::RES_OK) {
                    $dbh->{mysql_insertid} = $second;
                    $done->(undef);
                }
                elsif ($res_type == AnyEvent::MySQL::Imp::RES_ERROR) {
                    _report_error($dbh, 'selectrow_arrayref', $first, $third);
                    $done->(undef);
                }
                else {
                    $done->($second->[0]);
                }
            };
            $next_act->();
        });
    };

    return _push_task($dbh, [TXN_TASK, $run, $done, 0]);
}

=head2 $dbh->selectrow_hashref($statement, [\%attr, [@bind_values,]], $cb->($hash_ref))

=cut
# Queues a text query and hands the first row to the callback as a hash
# ref keyed by element 4 of each field entry in the response.
#
# Arguments: ($dbh, $statement, \%attr, @bind_values, [$cb]).
# The callback receives undef on RES_OK, on RES_ERROR (after
# _report_error()), and when the result set has no first row.
sub selectrow_hashref {
    my $done = ref($_[-1]) eq 'CODE' ? pop @_ : \&AnyEvent::MySQL::_empty_cb;
    my ($dbh, $sql, $attr, @binds) = @_;

    my $run = sub {
        my $next_act = shift;

        AnyEvent::MySQL::Imp::send_packet(
            $dbh->{_}[HDi], 0, AnyEvent::MySQL::Imp::COM_QUERY,
            _text_prepare($sql, @binds),
        );

        AnyEvent::MySQL::Imp::recv_response($dbh->{_}[HDi], sub {
            my ($res_type, $first, $second, $third) = @_;

            # Exceptions thrown by the user callback are deliberately swallowed.
            eval {
                if ($res_type == AnyEvent::MySQL::Imp::RES_OK) {
                    $dbh->{mysql_insertid} = $second;
                    $done->(undef);
                }
                elsif ($res_type == AnyEvent::MySQL::Imp::RES_ERROR) {
                    _report_error($dbh, 'selectrow_hashref', $first, $third);
                    $done->(undef);
                }
                elsif (my $row = $second->[0]) {
                    my %record;
                    # Fill from the last column backwards, so for duplicate
                    # column names the leftmost column's value is kept.
                    for my $col (reverse 0 .. $#{$row}) {
                        $record{ $first->[$col][4] } = $row->[$col];
                    }
                    $done->(\%record);
                }
                else {
                    $done->(undef);
                }
            };
            $next_act->();
        });
    };

    return _push_task($dbh, [TXN_TASK, $run, $done, 0]);
}

=head2 $sth = $dbh->prepare($statement, [$cb->($sth)])

    $cb will be called each time this statement is prepared
    (or re-prepared after the db connection is re-established).

    If the preparation does not succeed,
    the $sth in the $cb's args will be undef.

    So you should NOT rely on this for workflow control.

=cut
# Creates a statement handle for this connection and returns it.
#
# All arguments (handle, statement, optional callback) are forwarded
# unchanged to AnyEvent::MySQL::st->new. The new statement handle is also
# appended to the connection's STi list; that list entry is weakened so
# the list itself does not keep the statement handle alive.
sub prepare {
    my ($dbh) = @_;

    my $sth   = AnyEvent::MySQL::st->new(@_);
    my $state = $dbh->{_};

    push @{ $state->[STi] }, $sth;
    weaken( $state->[STi][-1] );

    return $sth;
}

=head2 $dbh->begin_work([$cb->($rv)])

=cut
sub begin_work {
    my $dbh = shift;
    my $cb = shift || \&AnyEvent::MySQL::_empty_cb;

    _push_task($dbh, [TXN_BEGIN, sub {
        my $next_act = shift;
        AnyEvent::MySQL::Imp::send_packet($dbh->{_}[HDi], 0, AnyEvent::MySQL::Imp::COM_QUERY, 'begin');
        AnyEvent::MySQL::Imp::recv_response($dbh->{_}[HDi], sub {
            eval {

lib/AnyEvent/MySQL.pm  view on Meta::CPAN

                $cb->() if $cb;
            }
        };
        $next_act->();
    });
}

=head2 $dbh->ping(sub {my $alive = shift;});

=cut

# Queues a COM_PING and reports the outcome to the callback:
# 1 when the server answers with RES_OK, 0 for any other response.
#
# Arguments: ($dbh, [$cb]).
sub ping {
    my $done = ref($_[-1]) eq 'CODE' ? pop @_ : \&AnyEvent::MySQL::_empty_cb;
    my ($dbh) = @_;

    my $run = sub {
        my $next_act = shift;

        AnyEvent::MySQL::Imp::send_packet($dbh->{_}[HDi], 0, AnyEvent::MySQL::Imp::COM_PING);

        AnyEvent::MySQL::Imp::recv_response($dbh->{_}[HDi], sub {
            my ($res_type) = @_;

            # Exceptions thrown by the user callback are deliberately swallowed.
            eval {
                $done->( $res_type == AnyEvent::MySQL::Imp::RES_OK ? 1 : 0 );
            };
            $next_act->();
        });
    };

    return _push_task($dbh, [TXN_TASK, $run, $done, 0]);
}

package AnyEvent::MySQL::st;

use strict;
use warnings;

use Scalar::Util qw(weaken);

use constant {
    DBHi => 0,
    IDi => 1,
    PARAMi => 2,
    FIELDi => 3,
    STATEMENTi => 4,
};

=head2 $sth = AnyEvent::MySQL::st->new($dbh, $statement, [$cb->($sth)])

=cut
# Constructor for a statement handle.
#
# Arguments: ($class, $dbh, $statement, [$cb]).
# Only the owning handle and the statement text are recorded here. A
# trailing CODE ref is stripped from the argument list but is not invoked
# by this constructor.
sub new {
    pop @_ if ref($_[-1]) eq 'CODE';
    my ($class, $dbh, $statement) = @_;

    my @slots;
    $slots[DBHi]       = $dbh;
    $slots[STATEMENTi] = $statement;

    return bless \@slots, $class;
}

=head2 $sth->execute(@bind_values, [\%attr,] [$cb->($fth/$rv)])

=cut
# Queues execution of this statement with the given bind values.
#
# Arguments: ($sth, @bind_values, [\%attr], [$cb]).
#   \%attr - optional trailing hash ref; accepted but not used here.
#   $cb    - optional trailing CODE ref; defaults to the no-op
#            AnyEvent::MySQL::_empty_cb.
#
# If the statement has no server-side id yet, a COM_STMT_PREPARE is sent
# first and the id / parameter / field information from the response is
# stored on the statement handle before executing.
#
# The callback receives:
#   * $_[1] of the response on RES_OK,
#   * a new AnyEvent::MySQL::ft object on RES_RESULT,
#   * nothing on any error (after _report_error() has been called),
#     including a failed prepare.
#
# Every path finishes by calling $next_act, including the failed-prepare
# path, and exceptions thrown by the user callback are swallowed on every
# path, so the completion signal is always sent.
sub execute {
    my $cb = ref($_[-1]) eq 'CODE' ? pop : \&AnyEvent::MySQL::_empty_cb;
    my $attr = ref($_[-1]) eq 'HASH' ? pop : {};   # removed from @_ so it is not treated as a bind value
    my($sth, @bind_values) = @_;
    my $dbh = $sth->[DBHi];


    AnyEvent::MySQL::db::_push_task($dbh, [AnyEvent::MySQL::db::TXN_TASK, sub {
        my $next_act = shift;

        my $execute = sub {
            AnyEvent::MySQL::Imp::do_execute_param($dbh->{_}[AnyEvent::MySQL::db::HDi], $sth->[IDi], \@bind_values, $sth->[PARAMi]);
            AnyEvent::MySQL::Imp::recv_response($dbh->{_}[AnyEvent::MySQL::db::HDi], execute => 1, sub {
                eval {
                    if( $_[0]==AnyEvent::MySQL::Imp::RES_OK ) {
                        $cb->($_[1]);
                    }
                    elsif( $_[0]==AnyEvent::MySQL::Imp::RES_RESULT ) {
                        $cb->(AnyEvent::MySQL::ft->new($sth->[FIELDi], $_[2]));
                    }
                    elsif( $_[0]==AnyEvent::MySQL::Imp::RES_ERROR ) {
                        AnyEvent::MySQL::db::_report_error($dbh, 'execute', $_[1], $_[3]);
                        $cb->();
                    }
                    else {
                        AnyEvent::MySQL::db::_report_error($dbh, 'execute', 2000, "Unknown response: $_[0]");
                        $cb->();
                    }
                };
                $next_act->();
            });
        };

        if( $sth->[IDi] ) {
            # Already prepared on the server: execute straight away.
            $execute->();
        }
        else {
            AnyEvent::MySQL::Imp::send_packet($dbh->{_}[AnyEvent::MySQL::db::HDi], 0, AnyEvent::MySQL::Imp::COM_STMT_PREPARE, $sth->[STATEMENTi]);
            AnyEvent::MySQL::Imp::recv_response($dbh->{_}[AnyEvent::MySQL::db::HDi], prepare => 1, sub {
                if( $_[0]==AnyEvent::MySQL::Imp::RES_PREPARE ) {
                    $sth->[IDi] = $_[1];
                    $sth->[PARAMi] = $_[2];
                    $sth->[FIELDi] = $_[3];

                    # $execute calls $next_act itself once its response arrives.
                    $execute->();
                }
                else {
                    # Prepare failed: report, notify the caller, and signal
                    # completion just like the execute paths above do.
                    eval {
                        if( $_[0]==AnyEvent::MySQL::Imp::RES_ERROR ) {
                            AnyEvent::MySQL::db::_report_error($dbh, 'execute', $_[1], $_[3]);
                            $cb->();
                        }
                        else {
                            AnyEvent::MySQL::db::_report_error($dbh, 'execute', 2000, "Unexpected response: $_[0]");
                            $cb->();
                        }
                    };
                    $next_act->();
                }
            });
        }
    }, $cb, 0]);
}

package AnyEvent::MySQL::ft;

use strict;
use warnings;

use constant {
    DATAi => 0,
    BINDi => 1,
    FIELDi => 2,
};

=head2 $fth = AnyEvent::MySQL::ft->new(\@field_set, \@data_set)

=cut
# Constructor for a fetch handle.
#
# Arguments: ($class, $field_set, $data_set).
# Both values are stored as given (no copy is made); the fetch methods
# later shift rows off the stored data set.
sub new {
    my ($class, $field_set, $data_set) = @_;

    my @slots;
    $slots[FIELDi] = $field_set;
    $slots[DATAi]  = $data_set;

    return bless \@slots, $class;
}

=head2 $rc = $fth->bind_columns(@list_of_refs_to_vars_to_bind, [$cb->($rc)])

=cut
# Binds one scalar reference per column for later use by fetch().
#
# Arguments: ($fth, @refs_to_bind, [$cb]).
# Succeeds (returns 1, calls $cb->(1)) when there are no rows left, in
# which case nothing is bound, or when the number of references equals the
# number of columns in the next row. Otherwise it fails: returns
# empty/undef and calls $cb->() with no arguments.
sub bind_columns {
    my $cb = ref($_[-1]) eq 'CODE' ? pop @_ : undef;
    my ($fth, @targets) = @_;

    my $rows = $fth->[DATAi];

    # With rows pending, the count must match the width of the next row.
    if (@$rows && @{ $rows->[0] } != @targets) {
        $cb->() if $cb;
        return;
    }

    $fth->[BINDi] = \@targets if @$rows;

    $cb->(1) if $cb;
    return 1;
}

=head2 $rc = $fth->bind_col($col_num, \$col_variable, [$cb->($rc)])

=cut
# Binds a single scalar reference to one column for later use by fetch().
#
# Arguments: ($fth, $col_num, $col_ref, [$cb]).
# Succeeds (returns 1, calls $cb->(1)) when there are no rows left, in
# which case nothing is bound, or when $col_num is a valid index into the
# next row. Otherwise it fails: returns empty/undef and calls $cb->().
#
# NOTE(review): $col_num is used as a 0-based index here, whereas DBI's
# bind_col numbers columns from 1 -- confirm which is intended.
sub bind_col {
    my $cb = ref($_[-1]) eq 'CODE' ? pop @_ : undef;
    my ($fth, $col_num, $col_ref) = @_;

    my $rows = $fth->[DATAi];

    if (@$rows) {
        my $last_index = $#{ $rows->[0] };
        unless (0 <= $col_num && $col_num <= $last_index) {
            $cb->() if $cb;
            return;
        }
        $fth->[BINDi][$col_num] = $col_ref;
    }

    $cb->(1) if $cb;
    return 1;
}

=head2 $rv = $fth->fetch([$cb->($rv)])

=cut
# Takes the next row off the data set and copies its values into the
# variables bound with bind_col() / bind_columns().
#
# Arguments: ($fth, [$cb]).
# Returns 1 (and calls $cb->(1)) when a row was consumed. Returns
# empty/undef (and calls $cb->()) when nothing has been bound or no rows
# remain. Columns without a bound reference are skipped.
sub fetch {
    my $cb = ref($_[-1]) eq 'CODE' ? pop @_ : undef;
    my ($fth) = @_;

    my ($bound, $rows) = @{$fth}[BINDi, DATAi];

    unless ($bound && $rows && @$rows) {
        $cb->() if $cb;
        return;
    }

    my $row = shift @$rows;
    for my $col (0 .. $#{$row}) {
        my $target = $bound->[$col] or next;
        ${$target} = $row->[$col];
    }

    $cb->(1) if $cb;
    return 1;
}

=head2 @row_ary = $fth->fetchrow_array([$cb->(@row_ary)])

=cut
# Takes the next row off the data set and delivers it as a list.
#
# Arguments: ($fth, [$cb]).
# With a row available, $cb (if given) is called with the row's values and
# the same values are returned unless the method was called in void
# context. With no rows left, $cb is called with no arguments and an empty
# list is returned.
sub fetchrow_array {
    my $cb = ref($_[-1]) eq 'CODE' ? pop @_ : undef;
    my ($fth) = @_;

    my $rows = $fth->[DATAi];

    unless ($rows && @$rows) {
        $cb->() if $cb;
        return ();
    }

    my $row = shift @$rows;
    $cb->(@{$row}) if $cb;
    return @{$row} if defined wantarray;
}

=head2 $ary_ref = $fth->fetchrow_arrayref([$cb->($ary_ref)])

=cut
# Takes the next row off the data set and delivers it as an array ref.
#
# Arguments: ($fth, [$cb]).
# With a row available, the row reference is passed to $cb (if given) and
# returned. With no rows left, $cb is called with no arguments and
# empty/undef is returned.
sub fetchrow_arrayref {
    my $cb = ref($_[-1]) eq 'CODE' ? pop @_ : undef;
    my ($fth) = @_;

    my $rows = $fth->[DATAi];

    unless ($rows && @$rows) {
        $cb->() if $cb;
        return;
    }

    my $row = shift @$rows;
    $cb->($row) if $cb;
    return $row;
}

=head2 $hash_ref = $fth->fetchrow_hashref([$cb->($hash_ref)])

=cut
sub fetchrow_hashref {
    my $cb = ref($_[-1]) eq 'CODE' ? pop : undef;
    my $fth = shift;

    if( $fth->[DATAi] && @{$fth->[DATAi]} ) {
        my $field = $fth->[FIELDi];
        my $hash = {};
        my $row = shift @{$fth->[DATAi]};
        for(my $i=0; $i<@$row; ++$i) {
            $hash->{$field->[$i][4]} = $row->[$i];
        }



( run in 1.778 second using v1.01-cache-2.11-cpan-5a3173703d6 )