Lim

 view release on metacpan or  search on metacpan

lib/Lim/Util/DBI.pm  view on Meta::CPAN

use Carp;
use Scalar::Util qw(weaken);

use Log::Log4perl ();
use DBI ();
use JSON::XS ();

use AnyEvent ();
use AnyEvent::Util ();

use Lim ();

=encoding utf8

=head1 NAME

Lim::Util::DBI - Create a DBH that is executed in a forked process

=head1 VERSION

See L<Lim> for version.

=cut

# The module version simply mirrors the version of the Lim distribution.
our $VERSION = $Lim::VERSION;

# Whitelist of operations that may be requested from the forked DBI process.
# process() and request() both reject any method name that is not a key of
# this hash; process() dispatches accepted names to the matching child_*
# method.
our %METHOD = map { $_ => 1 } qw(
    connect
    disconnect
    execute
    begin_work
    commit
    rollback
);

=head1 SYNOPSIS

=over 4

use Lim::Util::DBI;

=back

=head1 METHODS

=over 4

=item new

=cut

sub new {
    my $this = shift;
    my $class = ref($this) || $this;
    my $dbi = shift;
    my $user = shift;
    my $password = shift;
    my %args = ( @_ );
    my $self = {
        logger => Log::Log4perl->get_logger($class),
        json => JSON::XS->new->utf8->convert_blessed,
        busy => 0
    };
    bless $self, $class;
    weaken($self->{logger});
    my $real_self = $self;
    weaken($self);

    unless (defined $dbi) {
        confess __PACKAGE__, ': Missing dbi connection string';
    }
    unless (defined $args{on_connect} and ref($args{on_connect}) eq 'CODE') {
        confess __PACKAGE__, ': Missing on_connect or it is not CODE';
    }

    my $on_connect = delete $args{on_connect};

    if (defined $args{on_error}) {
        unless (ref($args{on_error}) eq 'CODE') {
            confess __PACKAGE__, ': on_error is not CODE';
        }
        $self->{on_error} = delete $args{on_error};
    }

    my ($child, $parent) = AnyEvent::Util::portable_socketpair;
    unless (defined $child and defined $parent) {
        confess __PACKAGE__, ': Unable to create client/server socket pairs: ', $!;
    }

    AnyEvent::Util::fh_nonblocking $child, 1;
    $self->{child} = $child;

    my $pid = fork;

    if ($pid) {
        #
        # Parent process
        #

        close $parent;

        $self->{child_pid} = $pid;
        $self->{child_watcher} = AnyEvent->io(
            fh => $child,
            poll => 'r',
            cb => sub {
                unless (defined $self and exists $self->{child}) {
                    return;
                }

                my $response;
                my $len = sysread $self->{child}, my $buf, 64*1024;
                if ($len > 0) {
                    undef $@;

                    eval {
                        $response = $self->{json}->incr_parse($buf);
                    };
                    if ($@) {
                        Lim::DEBUG and $self->{logger}->debug('Response JSON parse failed: ', $@);
                        $response = [];
                    }
                    else {
                        my $errstr = shift @$response;
                        if ($errstr) {
                            $@ = $errstr;
                        }
                    }
                }
                elsif (defined $len) {
                    $@ = 'Unexpected EOF';
                    Lim::DEBUG and $self->{logger}->debug($@);

                    shutdown($self->{child}, 2);
                    close(delete $self->{child});
                    $response = [];
                }
                elsif ($! != Errno::EAGAIN) {
                    $@ = 'Unable to read from child: '.$!;
                    Lim::DEBUG and $self->{logger}->debug($@);

                    shutdown($self->{child}, 2);
                    close(delete $self->{child});
                    $response = [];
                }

                if (defined $response and exists $self->{cb}) {
                    unless (ref($response) eq 'ARRAY') {
                        $@ = 'Invalid response';
                        Lim::DEBUG and $self->{logger}->debug($@);
                        $response = [];
                    }

                    my $cb = delete $self->{cb};
                    $self->{busy} = 0;
                    $cb->($self, @$response);
                }
            });
    }
    elsif (defined $pid) {
        #
        # Child process
        #

        $SIG{HUP} => 'IGNORE';
        $SIG{INT} => 'IGNORE';
        $SIG{TERM} => 'IGNORE';
        $SIG{PIPE} => 'IGNORE';
        $SIG{QUIT} => 'IGNORE';
        $SIG{ALRM} => 'IGNORE';

        Log::Log4perl->init( \q(
log4perl.threshold                = OFF
log4perl.logger                   = DEBUG, Screen
log4perl.appender.Screen          = Log::Log4perl::Appender::Screen
log4perl.appender.Screen.stderr   = 0
log4perl.appender.Screen.layout   = Log::Log4perl::Layout::PatternLayout
log4perl.appender.Screen.layout.ConversionPattern = %d %F [%L] %p: %m%n
) );

        if (exists $self->{close_fds} and $self->{close_fds}) {
            my $parent_fno = fileno $parent;

            foreach ($^F+1 .. (POSIX::sysconf (&POSIX::_SC_OPEN_MAX) || 1024)) {
                unless ($_ == $parent_fno) {
                    POSIX::close($_);
                }
            }
        }

        while () {
            my $request;
            my $len = sysread $parent, my $buf, 64*1024;
            if ($len > 0) {
                undef $@;

                eval {
                    $request = $self->{json}->incr_parse($buf);
                };
                if ($@) {
                    last;
                }
            }
            elsif (defined $len) {
                last;
            }
            else {
                last;
            }

            if (defined $request) {
                unless (ref($request) eq 'ARRAY') {
                    last;
                }

lib/Lim/Util/DBI.pm  view on Meta::CPAN


    if (exists $self->{child}) {
        shutdown($self->{child}, 2);
        close($self->{child});
    }
}

=item process

=cut

# Dispatch a request to the matching child_* method.
#
# Arguments: the method name followed by the arguments for that method.
# Returns the array reference produced by the child_* method, or a
# one-element array reference holding an error message when the method is
# not listed in %METHOD or the call died.
sub process {
    my ($self, $method) = splice @_, 0, 2;

    # Only whitelisted operations may be dispatched.
    return ['Method '.$method.' is not allowed']
        unless exists $METHOD{$method};

    my $handler = 'child_'.$method;
    my $result;

    # Whatever remains in @_ is handed to the handler untouched; an
    # exception is turned into an error response instead of propagating.
    eval {
        $result = $self->$handler(@_);
    };
    return [$@] if $@;

    return $result;
}

=item request

=cut

# Send a request to the forked DBI process over the child socket.
#
# Arguments:
#   $cb  - CODE reference that receives the outcome.
#   @_   - the method name (must be a key of %METHOD) followed by its
#          arguments; the whole list is JSON encoded as one array.
#
# Returns 1 when the request has been written or queued for writing.
# On any failure $@ is set to an error message, $cb is called without
# arguments and nothing is returned. Confesses if $cb is not a CODE
# reference.
sub request {
    my $self = shift;
    my $cb = shift;
    my ($method) = @_;
    my $request;
    weaken($self);

    undef $@;

    unless (ref($cb) eq 'CODE') {
        confess __PACKAGE__, ': cb is not CODE';
    }

    # An undefined method name is rejected like any other unknown method,
    # without triggering uninitialized value warnings.
    unless (defined $method and exists $METHOD{$method}) {
        $@ = 'Method '.(defined $method ? $method : '').' is not allowed';
        $cb->();
        return;
    }

    unless (exists $self->{child}) {
        $@ = 'No connection to the DBI process';
        $cb->();
        return;
    }

    # Only one outstanding request is supported at a time.
    if ($self->{busy}) {
        $@ = 'DBH is busy, multiple command execution is not allowed';
        $cb->();
        return;
    }

    eval {
        $request = $self->{json}->encode(\@_);
    };
    if ($@) {
        # $@ already carries the encoder's error message.
        $cb->();
        return;
    }

    $self->{busy} = 1;
    $self->{cb} = $cb;

    Lim::DEBUG and $self->{logger}->debug('Sending DBI request ', $method);

    my $len = syswrite $self->{child}, $request;

    # The socket is non-blocking: a full send buffer is not an error, the
    # whole request is simply left for the write watcher below.
    if (!defined $len and ($!{EAGAIN} or $!{EWOULDBLOCK})) {
        $len = 0;
    }
    elsif (!(defined $len and $len > 0)) {
        $self->kill;
        # Set $@ after the cleanup so nothing run by it can clobber it.
        $@ = 'Connection broken';
        $cb->();
        return;
    }

    unless ($len >= length $request) {
        # Partial write, keep the unsent tail and finish it once the
        # socket becomes writable again.
        $request = substr $request, $len;

        $self->{request_watcher} = AnyEvent->io(
            # The watcher must poll the socket itself, not the payload.
            fh => $self->{child},
            poll => 'w',
            cb => sub {
                unless (defined $self) {
                    return;
                }

                # The connection was torn down while the write was pending.
                unless (exists $self->{child}) {
                    delete $self->{request_watcher};
                    return;
                }

                my $written = syswrite $self->{child}, $request;
                unless (defined $written and $written > 0) {
                    # Spurious wakeup, wait for the next writable event.
                    if (!defined $written and ($!{EAGAIN} or $!{EWOULDBLOCK})) {
                        return;
                    }

                    my $cb = $self->{cb};
                    $self->kill;
                    $@ = 'Connection broken';
                    $cb->() if defined $cb;
                    return;
                }

                unless ($written >= length $request) {
                    $request = substr $request, $written;
                    return;
                }

                # Everything has been sent, the watcher is no longer needed.
                delete $self->{request_watcher};
            });
    }

    return 1;
}

=item kill

=cut

# Tear down the connection to the DBI process: shut down and close the
# child socket if there is one, drop both I/O watchers, clear the busy
# flag and forget the pending callback. The callback is not invoked and
# no signal is sent from here.
sub kill {
    my $self = shift;

    if (exists $self->{child}) {
        # Shut down both directions before closing the handle.
        shutdown($self->{child}, 2);
        my $socket = delete $self->{child};
        close($socket);
    }

    delete @{$self}{qw(child_watcher request_watcher)};
    $self->{busy} = 0;

    return delete $self->{cb};
}

=item child_connect

=cut

# Open the database connection and remember the handle in $self->{dbh}.
#
# Arguments: DSN, user name, password and attribute hash, all passed on to
# DBI->connect as given.
# Returns [0, 1] on success or [$DBI::errstr] when the connect failed.
sub child_connect {
    my $self = shift;
    my ($dsn, $username, $auth, $attributes) = @_;

    my $handle = DBI->connect($dsn, $username, $auth, $attributes);

    # The result is stored even when it is undef, replacing any old handle.
    $self->{dbh} = $handle;

    return [$DBI::errstr] unless $handle;
    return [0, 1];
}

=item child_disconnect

=cut

# Close the database connection held in $self->{dbh} and forget the handle.
#
# Returns [0, 1] once the handle has been disconnected and removed, or a
# one-element error response when no handle is defined.
sub child_disconnect {
    my $self = shift;

    return ['No connect to the database exists']
        if not defined $self->{dbh};

    # Disconnect first; the handle is only dropped if that call returns.
    $self->{dbh}->disconnect;
    delete $self->{dbh};

    return [0, 1];
}

=item child_execute

=cut

# Prepare (cached) and execute a statement on the handle in $self->{dbh}.
#
# Arguments: the SQL statement followed by its bind values.
# Returns [0, $rows, $rv] on success, where $rows is an array reference of
# the fetched rows (empty for statements without a result set) and $rv is
# the value returned by execute. On failure a one-element array reference
# with the error message is returned.
sub child_execute {
    my ($self, $statement, @args) = @_;
    my ($sth, $rv);
    my $rows = [];

    unless (defined $self->{dbh}) {
        return ['No connect to the database exists'];
    }

    # The third argument lets a still active cached handle be reused.
    unless (($sth = $self->{dbh}->prepare_cached($statement, undef, 1))) {
        return [$DBI::errstr];
    }

    unless (($rv = $sth->execute(@args))) {
        return [$sth->errstr];
    }

    # Only statements that produce columns have rows to fetch; fetching
    # from e.g. an INSERT or UPDATE is an error for many drivers.
    if ($sth->{NUM_OF_FIELDS}) {
        $rows = $sth->fetchall_arrayref;

        # fetchall_arrayref returns what it got so far on error, so the
        # handle has to be checked to detect truncated results.
        if ($sth->err) {
            my $errstr = $sth->errstr || 'Unknown error while fetching rows';
            $sth->finish;
            return [$errstr];
        }
    }
    $sth->finish;

    [0, $rows, $rv];
}



( run in 2.441 seconds using v1.01-cache-2.11-cpan-75ffa21a3d4 )