App-MultiModule-Tasks-DocGateway

 view release on metacpan or  search on metacpan

lib/App/MultiModule/Tasks/DocGateway.pm  view on Meta::CPAN

package App::MultiModule::Tasks::DocGateway;
$App::MultiModule::Tasks::DocGateway::VERSION = '1.161330';
use 5.006;
use strict;
use warnings FATAL => 'all';
use Postgres::Mongo;
use Storable;

use parent 'App::MultiModule::Task';

=head1 NAME

App::MultiModule::Tasks::DocGateway - Interface with a persistent document store

=cut

=head2 message

=cut

# Entry point for an incoming message: logs it when the debug level is
# above 5, then routes it to the private handler selected by
# $message->{document_method} ('find', 'insert', 'remove' or 'upsert').
#
# Parameters:
#   $message - hash reference; this sub reads only document_method, the
#              handlers read the remaining document_* fields.
#   Any further arguments are accepted and ignored.
#
# Returns nothing. A missing or unsupported document_method is reported
# through $self->error and the message is dropped.
sub message {
    my $self = shift;
    my $message = shift;
    $self->debug('message', message => $message)
        if $self->{debug} > 5;
    #TODO: validate document_database, document_collection here

    # Handlers are stored by name and invoked as methods so that a
    # subclass overriding one of them is still honoured.
    my %handler_for = (
        find   => '_find',
        insert => '_insert',
        remove => '_remove',
        upsert => '_upsert',
    );

    # Check for undef explicitly: warnings are FATAL in this package, so
    # comparing an undefined value with 'eq' would kill the task.
    my $method = $message->{document_method};
    if(not defined $method) {
        $self->error("App::MultiModule::Tasks::DocGateway::message: required field 'document_method' is missing");
        return;
    }
    my $handler = $handler_for{$method};
    if(not $handler) {
        $self->error("App::MultiModule::Tasks::DocGateway::message: unsupported document_method '$method'");
        return;
    }
    $self->$handler($message);
    return;
}

# Performs an 'upsert' against the document store.
#
# Reads from $message:
#   document_database, document_collection - passed through to mongo_do
#   document_filter - passed as the 'filter' entry of the hashref argument
#   document_update - optional; when true, a deep copy of it is the update
#                     payload, otherwise a deep copy of the whole message
#
# The document_filter and '.ipc_transit_meta' keys are always removed from
# the payload copy; the caller's $message is not modified.
#
# The call to mongo_do is limited to config->{pg_upsert_timeout} seconds
# (default 2). Any failure, including a timeout or a missing connection,
# is reported through $self->error. Returns nothing.
sub _upsert {
    my $self = shift;
    my $message = shift;
    #TODO: validate $message->{document_filter} here
    eval {
        my $update;
        if($message->{document_update}) {
            $update = Storable::dclone($message->{document_update});
        } else {
            $update = Storable::dclone($message);
        }
        delete $update->{document_filter};
        delete $update->{'.ipc_transit_meta'};

        # Connect BEFORE arming the operation timer. _get_connection()
        # runs its own alarm()/alarm 0 cycle, and since a process has only
        # one alarm timer, a timer armed earlier would be cancelled by it.
        my $c = $self->_get_connection()
            or die "no database connection\n";

        local $SIG{ALRM} = sub { die "timed out\n"; };
        my $timeout = $self->{config}->{pg_upsert_timeout} || 2;
        alarm $timeout;
        # NOTE(review): 'update => $update' is passed as trailing arguments
        # after the filter hashref, not inside it. Left unchanged; confirm
        # against the Postgres::Mongo mongo_do signature.
        $c->mongo_do(
            $message->{document_database},
            $message->{document_collection},
            'upsert',
            { filter => $message->{document_filter} },
            update => $update,
        );
        # Disarm while our handler is still installed, so a late SIGALRM
        # cannot arrive after the handler has been restored.
        alarm 0;
    };
    my $failure = $@;
    alarm 0;    # also covers the path where the eval died early
    if($failure) {
        $self->error("App::MultiModule::Tasks::DocGateway::_upsert failed: $failure");
        return;
    }
    return;
}

# Performs a 'remove' against the document store.
#
# Reads from $message:
#   document_database, document_collection - passed through to mongo_do
#   document_filter - passed as the 'filter' entry of the hashref argument
#
# The call to mongo_do is limited to config->{pg_remove_timeout} seconds
# (default 2). Any failure, including a timeout or a missing connection,
# is reported through $self->error. Returns nothing.
sub _remove {
    my $self = shift;
    my $message = shift;
    #TODO: validate $message->{document_filter} here
    eval {
        # Connect BEFORE arming the operation timer. _get_connection()
        # runs its own alarm()/alarm 0 cycle, and since a process has only
        # one alarm timer, a timer armed earlier would be cancelled by it.
        my $c = $self->_get_connection()
            or die "no database connection\n";

        local $SIG{ALRM} = sub { die "timed out\n"; };
        my $timeout = $self->{config}->{pg_remove_timeout} || 2;
        alarm $timeout;
        $c->mongo_do(
            $message->{document_database},
            $message->{document_collection},
            'remove',
            { filter => $message->{document_filter} },
        );
        # Disarm while our handler is still installed, so a late SIGALRM
        # cannot arrive after the handler has been restored.
        alarm 0;
    };
    my $failure = $@;
    alarm 0;    # also covers the path where the eval died early
    if($failure) {
        $self->error("App::MultiModule::Tasks::DocGateway::_remove failed: $failure");
        return;
    }
    return;
}

# Performs an 'insert' against the document store.
#
# The inserted document is a deep copy of the whole $message with the
# '.ipc_transit_meta' key removed; the caller's $message is not modified.
# document_database and document_collection are read from $message and
# passed through to mongo_do.
#
# The call to mongo_do is limited to config->{pg_insert_timeout} seconds
# (default 2). Any failure, including a timeout or a missing connection,
# is reported through $self->error. Returns nothing.
sub _insert {
    my $self = shift;
    my $message = shift;
    eval {
        my $insert = Storable::dclone($message);
        delete $insert->{'.ipc_transit_meta'};

        # Connect BEFORE arming the operation timer. _get_connection()
        # runs its own alarm()/alarm 0 cycle, and since a process has only
        # one alarm timer, a timer armed earlier would be cancelled by it.
        my $c = $self->_get_connection()
            or die "no database connection\n";

        local $SIG{ALRM} = sub { die "timed out\n"; };
        my $timeout = $self->{config}->{pg_insert_timeout} || 2;
        alarm $timeout;
        $c->mongo_do(
            $message->{document_database},
            $message->{document_collection},
            'insert',
            $insert,
        );
        # Disarm while our handler is still installed, so a late SIGALRM
        # cannot arrive after the handler has been restored.
        alarm 0;
    };
    my $failure = $@;
    alarm 0;    # also covers the path where the eval died early
    if($failure) {
        $self->error("App::MultiModule::Tasks::DocGateway::_insert failed: $failure");
        return;
    }
    return;
}

# Runs a 'find' against the document store and emits the result.
#
# Reads document_database, document_collection and document_filter from
# $message and passes them to mongo_find. On success the value returned by
# mongo_find is stored in $message->{document_returns} (the caller's
# message is modified in place) and the message is passed to $self->emit.
#
# The call to mongo_find is limited to config->{pg_find_timeout} seconds
# (default 7). Any failure, including a timeout or a missing connection,
# is reported through $self->error and nothing is emitted.
# Returns nothing.
sub _find {
    my $self = shift;
    my $message = shift;
    #TODO: validate $message->{document_filter} here
    #TODO: variant that emits once per return document
    my @emits = eval {
        # Connect BEFORE arming the operation timer. _get_connection()
        # runs its own alarm()/alarm 0 cycle, and since a process has only
        # one alarm timer, a timer armed earlier would be cancelled by it.
        my $c = $self->_get_connection()
            or die "no database connection\n";

        local $SIG{ALRM} = sub { die "timed out\n"; };
        my $timeout = $self->{config}->{pg_find_timeout} || 7;
        alarm $timeout;
        my $documents = $c->mongo_find(
            $message->{document_database},
            $message->{document_collection},
            $message->{document_filter},
        );
        # Disarm while our handler is still installed, so a late SIGALRM
        # cannot arrive after the handler has been restored.
        alarm 0;
        $message->{document_returns} = $documents;
        return ($message);
    };
    my $failure = $@;
    alarm 0;    # also covers the path where the eval died early
    if($failure) {
        $self->error("App::MultiModule::Tasks::DocGateway::_find failed: $failure");
        return;
    }
    $self->emit($_) for @emits;
    return;
}

# Creates a Postgres::Mongo handle using the credentials stored in
# $self->{pg_userid} and $self->{pg_password}.
#
# The constructor call is limited to config->{pg_connect_timeout} seconds
# (default 5). On failure or timeout the error is reported through
# $self->error and undef is returned; otherwise the new handle is returned.
#
# A process has a single alarm timer. If the caller had one pending when
# this sub was entered, it is suspended for the duration of the connect and
# re-armed afterwards with the time it had left, so the caller's own
# timeout is not silently cancelled.
sub _get_connection {
    my $self = shift;

    # alarm() returns the seconds left on the previous timer (0 if none).
    my $outer_remaining = alarm 0;
    my $started = time;

    my $handle = eval {
        local $SIG{ALRM} = sub { die "timed out\n"; };
        my $timeout = $self->{config}->{pg_connect_timeout} || 5;
        alarm $timeout;
        my $connection = Postgres::Mongo->new(
            userid => $self->{pg_userid},
            password => $self->{pg_password},
        );
        # Disarm while our handler is still installed.
        alarm 0;
        return $connection;
    };
    my $failure = $@;
    alarm 0;    # also covers the path where the eval died early
    $self->error("App::MultiModule::Tasks::DocGateway::_get_connection failed: $failure") if $failure;

    if($outer_remaining) {
        # Re-arm the caller's timer minus the time spent here. If it has
        # already run out, fire it as soon as alarm() allows (1 second);
        # alarm 0 would mean "no timer" and lose the timeout.
        my $left = $outer_remaining - (time - $started);
        alarm($left > 0 ? $left : 1);
    }
    return $handle;
}

=head2 set_config

=cut
# Stores the supplied configuration hash on the task and caches the
# database credentials taken from it.
#
# Parameters:
#   $config - hash reference; kept as $self->{config}. Its pg_userid and
#             pg_password entries are copied to $self, each falling back
#             to 'testuser1' when absent or false.
#
# Returns the resulting $self->{pg_password} value.
sub set_config {
    my ($self, $config) = @_;
    $self->{config} = $config;
    for my $credential (qw(pg_userid pg_password)) {
        $self->{$credential} = $config->{$credential} || 'testuser1';
    }
    return $self->{pg_password};
}

=head2 is_stateful

=cut
# Reports that this task is stateful by returning a constant non-empty
# (true) string.
# NOTE(review): presumably the framework only tests the value for truth;
# the spelling of the literal is kept exactly as-is in case anything
# compares against it - confirm before changing.
sub is_stateful {
    my $answer = 'absolultely';
    return $answer;
}


=head1 AUTHOR



( run in 1.409 second using v1.01-cache-2.11-cpan-39bf76dae61 )