App-MultiModule-Tasks-DocGateway
view release on metacpan or search on metacpan
lib/App/MultiModule/Tasks/DocGateway.pm view on Meta::CPAN
package App::MultiModule::Tasks::DocGateway;
$App::MultiModule::Tasks::DocGateway::VERSION = '1.161330';
use 5.006;
use strict;
use warnings FATAL => 'all';
use Postgres::Mongo;
use Storable;
use parent 'App::MultiModule::Task';
=head1 NAME
App::MultiModule::Tasks::DocGateway - Interface with a persistent document store
=cut
=head2 message
=cut
# Entry point for an incoming message: dispatches it to the private handler
# named by $message->{document_method} ('find', 'insert', 'remove' or
# 'upsert').
#
# Parameters:
#   $message - hash reference describing the request; this sub reads only its
#              document_method key, the handlers read the rest.
#   any further arguments are accepted and ignored.
#
# A missing, non-string or unrecognised document_method is reported through
# $self->error and the message is dropped.  This file enables
# "use warnings FATAL => 'all'", so comparing an undefined document_method
# with 'eq' would otherwise die instead of producing a useful diagnostic.
sub message {
    my $self = shift;
    my $message = shift;

    # "|| 0" keeps an unset debug level from tripping the fatal
    # uninitialized-value warning.
    $self->debug('message', message => $message)
        if ($self->{debug} || 0) > 5;

    #TODO: validate document_database, document_collection here
    my $method = $message->{document_method};
    unless (defined $method && !ref $method) {
        $self->error("App::MultiModule::Tasks::DocGateway::message failed: missing or invalid document_method");
        return;
    }

    my %handler_for = (
        find   => '_find',
        insert => '_insert',
        remove => '_remove',
        upsert => '_upsert',
    );
    my $handler = $handler_for{$method};
    unless ($handler) {
        $self->error("App::MultiModule::Tasks::DocGateway::message failed: unknown document_method '$method'");
        return;
    }

    $self->$handler($message);
    return;
}
# Run an 'upsert' against the document store.
#
# Reads from $message: document_database, document_collection,
# document_filter and, optionally, document_update.  When document_update is
# absent the message itself (minus document_filter and '.ipc_transit_meta')
# is used as the update document.  The caller's message is never modified:
# the update is built from a deep copy.
#
# The store call is bounded by config key pg_upsert_timeout (seconds,
# default 2).  Any failure, including a timeout or an unavailable
# connection, is reported through $self->error.  Returns nothing.
sub _upsert {
    my $self = shift;
    my $message = shift;
    #TODO: validate $message->{document_filter} here

    # Installed for the whole sub, not just the eval, so a pending alarm can
    # never be delivered to the default (process-killing) handler between
    # the end of the eval and the final "alarm 0".
    local $SIG{ALRM} = sub { die "timed out\n"; };
    eval {
        my $timeout = $self->{config}->{pg_upsert_timeout} || 2;

        # Connect BEFORE arming our alarm: _get_connection manages the
        # process's single alarm timer itself, so an alarm set earlier
        # could be cancelled by it, leaving the store call unbounded.
        my $c = $self->_get_connection()
            or die "no database connection\n";

        my $update;
        if($message->{document_update}) {
            $update = Storable::dclone($message->{document_update});
        } else {
            $update = Storable::dclone($message);
        }
        delete $update->{document_filter};
        delete $update->{'.ipc_transit_meta'};

        alarm $timeout;
        # NOTE(review): 'update => $update' is passed as two extra trailing
        # arguments rather than inside the hash reference; kept exactly as
        # before -- confirm against the Postgres::Mongo mongo_do interface.
        $c->mongo_do(
            $message->{document_database},
            $message->{document_collection},
            'upsert',
            { filter => $message->{document_filter}},
            update => $update,
        );
        alarm 0;
    };
    my $error = $@;
    alarm 0;    # also disarm on the failure path
    if($error) {
        $self->error("App::MultiModule::Tasks::DocGateway::_upsert failed: $error");
        return;
    }
    return;
}
# Run a 'remove' against the document store.
#
# Reads from $message: document_database, document_collection and
# document_filter (forwarded as the 'filter' of the request).
#
# The store call is bounded by config key pg_remove_timeout (seconds,
# default 2).  Any failure, including a timeout or an unavailable
# connection, is reported through $self->error.  Returns nothing.
sub _remove {
    my $self = shift;
    my $message = shift;
    #TODO: validate $message->{document_filter} here

    # Installed for the whole sub, not just the eval, so a pending alarm can
    # never be delivered to the default (process-killing) handler between
    # the end of the eval and the final "alarm 0".
    local $SIG{ALRM} = sub { die "timed out\n"; };
    eval {
        my $timeout = $self->{config}->{pg_remove_timeout} || 2;

        # Connect BEFORE arming our alarm: _get_connection manages the
        # process's single alarm timer itself, so an alarm set earlier
        # could be cancelled by it, leaving the store call unbounded.
        my $c = $self->_get_connection()
            or die "no database connection\n";

        alarm $timeout;
        $c->mongo_do(
            $message->{document_database},
            $message->{document_collection},
            'remove',
            { filter => $message->{document_filter}});
        alarm 0;
    };
    my $error = $@;
    alarm 0;    # also disarm on the failure path
    if($error) {
        $self->error("App::MultiModule::Tasks::DocGateway::_remove failed: $error");
        return;
    }
    return;
}
# Run an 'insert' against the document store.
#
# The inserted document is a deep copy of $message with the
# '.ipc_transit_meta' key removed; the caller's message is left untouched.
# document_database and document_collection select the target.
#
# The store call is bounded by config key pg_insert_timeout (seconds,
# default 2).  Any failure, including a timeout or an unavailable
# connection, is reported through $self->error.  Returns nothing.
sub _insert {
    my $self = shift;
    my $message = shift;

    # Installed for the whole sub, not just the eval, so a pending alarm can
    # never be delivered to the default (process-killing) handler between
    # the end of the eval and the final "alarm 0".
    local $SIG{ALRM} = sub { die "timed out\n"; };
    eval {
        my $timeout = $self->{config}->{pg_insert_timeout} || 2;

        # Connect BEFORE arming our alarm: _get_connection manages the
        # process's single alarm timer itself, so an alarm set earlier
        # could be cancelled by it, leaving the store call unbounded.
        my $c = $self->_get_connection()
            or die "no database connection\n";

        my $insert = Storable::dclone($message);
        delete $insert->{'.ipc_transit_meta'};

        alarm $timeout;
        $c->mongo_do(
            $message->{document_database},
            $message->{document_collection},
            'insert',
            $insert);
        alarm 0;
    };
    my $error = $@;
    alarm 0;    # also disarm on the failure path
    if($error) {
        $self->error("App::MultiModule::Tasks::DocGateway::_insert failed: $error");
        return;
    }
    return;
}
# Run a 'find' against the document store and emit the result.
#
# Reads from $message: document_database, document_collection and
# document_filter.  On success the value returned by mongo_find is stored in
# $message->{document_returns} (the caller's message IS modified) and the
# message is passed to $self->emit.
#
# The store call is bounded by config key pg_find_timeout (seconds,
# default 7).  Any failure, including a timeout or an unavailable
# connection, is reported through $self->error and nothing is emitted.
# Returns nothing.
sub _find {
    my $self = shift;
    my $message = shift;
    #TODO: validate $message->{document_filter} here
    #TODO: variant that emits once per return document

    # Installed for the whole sub, not just the eval, so a pending alarm can
    # never be delivered to the default (process-killing) handler between
    # the end of the eval and the final "alarm 0".
    local $SIG{ALRM} = sub { die "timed out\n"; };
    eval {
        my $timeout = $self->{config}->{pg_find_timeout} || 7;

        # Connect BEFORE arming our alarm: _get_connection manages the
        # process's single alarm timer itself, so an alarm set earlier
        # could be cancelled by it, leaving the store call unbounded.
        my $c = $self->_get_connection()
            or die "no database connection\n";

        alarm $timeout;
        my $documents = $c->mongo_find(
            $message->{document_database},
            $message->{document_collection},
            $message->{document_filter},
        );
        alarm 0;
        $message->{document_returns} = $documents;
    };
    my $error = $@;
    alarm 0;    # also disarm on the failure path
    if($error) {
        $self->error("App::MultiModule::Tasks::DocGateway::_find failed: $error");
        return;
    }
    $self->emit($message);
    return;
}
# Create a Postgres::Mongo handle from the credentials stored in
# $self->{pg_userid} and $self->{pg_password}.
#
# The constructor call is bounded by config key pg_connect_timeout (seconds,
# default 5).  If it dies or times out, the problem is reported through
# $self->error and the returned value is undefined, so callers must check
# the result before using it.
#
# A process has a single alarm timer.  Any alarm the caller has pending is
# suspended while the connect timeout is in force and re-armed afterwards
# with the time it had left, instead of being silently cancelled.
sub _get_connection {
    my $self = shift;

    # Take over the timer, remembering what the caller had left on it.
    my $outer_remaining = alarm 0;
    my $started_at = time;

    # Installed for the whole sub, not just the eval, so our own alarm can
    # never be delivered to a handler that has already been restored.
    local $SIG{ALRM} = sub { die "timed out\n"; };
    my $handle = eval {
        my $timeout = $self->{config}->{pg_connect_timeout} || 5;
        alarm $timeout;
        my $connection = Postgres::Mongo->new(
            userid => $self->{pg_userid},
            password => $self->{pg_password},
        );
        alarm 0;
        return $connection;
    };
    my $error = $@;
    alarm 0;    # also disarm on the failure path
    $self->error("App::MultiModule::Tasks::DocGateway::_get_connection failed: $error") if $error;

    if ($outer_remaining) {
        # Charge the time spent connecting against the caller's budget; if
        # it is already used up, fire as soon as alarm() allows (1 second)
        # rather than dropping the caller's timeout altogether.
        my $left = $outer_remaining - (time - $started_at);
        alarm($left > 0 ? $left : 1);
    }
    return $handle;
}
=head2 set_config
=cut
# Store the supplied configuration hash on the task and derive the database
# credentials from it.
#
# Parameters:
#   $config - hash reference; the keys read here are pg_userid and
#             pg_password.  A false or missing value for either one falls
#             back to 'testuser1'.
#
# The whole hash is kept in $self->{config}.  The value returned is the
# effective password.
sub set_config {
    my ($self, $config) = @_;
    my $fallback = 'testuser1';

    $self->{config} = $config;
    $self->{pg_userid} = $config->{pg_userid} ? $config->{pg_userid} : $fallback;
    $self->{pg_password} = $config->{pg_password} ? $config->{pg_password} : $fallback;

    return $self->{pg_password};
}
=head2 is_stateful
=cut
# Returns a constant, true string.
# NOTE(review): presumably the framework only tests this for truth to decide
# whether the task keeps state -- confirm against App::MultiModule::Task.
sub is_stateful {
    my $answer = 'absolultely';
    return $answer;
}
=head1 AUTHOR
( run in 1.409 second using v1.01-cache-2.11-cpan-39bf76dae61 )