Amazon-SQS-Client
view release on metacpan or search on metacpan
bin/QueueDaemon.pl view on Meta::CPAN
#!/usr/bin/perl
use strict;
use warnings;
########################################################################
package main;
########################################################################
use Class::Inspector;
use Class::Unload;
use Cwd;
use Data::Dumper;
use English qw(-no_match_vars);
use File::Basename qw(fileparse);
use Getopt::Long qw(:config no_ignore_case);
use List::Util qw(max min);
use Log::Log4perl;
use Log::Log4perl::Level;
use Pod::Usage;
use Proc::Daemon;
use Proc::PID::File;
use Module::Load qw(autoload);
use Amazon::SQS::Config;
use Amazon::SQS::Client;
use Readonly;
# Boolean constants used for flags and option defaults throughout the script.
Readonly::Scalar our $TRUE => 1;
Readonly::Scalar our $FALSE => 0;
# Polling defaults, in seconds: main() falls back to $DEFAULT_SLEEP_TIME for
# 'queue-interval' and to $MAX_SLEEP_TIME for 'max-sleep-time'.
Readonly::Scalar our $DEFAULT_SLEEP_TIME => 5;
Readonly::Scalar our $MAX_SLEEP_TIME => 60;
# Name of the file appender declared in $LOGFILE_CONFIG; main() looks the
# appender up by this name before daemonizing.
Readonly::Scalar our $APPENDER_NAME => 'LOGFILE';
# sprintf template for a Log::Log4perl file-appender configuration.  The single
# %s is the placeholder for the log file name; every literal '%' of the
# conversion pattern is doubled so that it survives sprintf.
Readonly::Scalar our $LOGFILE_CONFIG => <<'END_OF_LOGGER';
log4perl.rootLogger=INFO, LOGFILE
log4perl.appender.LOGFILE=Log::Log4perl::Appender::File
log4perl.appender.LOGFILE.filename=%s
log4perl.appender.LOGFILE.mode=append
log4perl.appender.LOGFILE.layout=PatternLayout
log4perl.appender.LOGFILE.layout.ConversionPattern=%%d (%%r,%%R) (%%p/%%c) [%%P] [%%M:%%L] - %%m%%n
END_OF_LOGGER
# sprintf template for a Log::Log4perl screen-appender configuration.  The
# single %s is the appender's 'stderr' setting; init_logger() fills it with
# 0 or 1.
Readonly::Scalar our $SCREEN_CONFIG => <<'END_OF_LOGGER';
log4perl.rootLogger=INFO, SCREEN
log4perl.appender.SCREEN=Log::Log4perl::Appender::Screen
log4perl.appender.SCREEN.stderr=%s
log4perl.appender.SCREEN.layout=PatternLayout
log4perl.appender.SCREEN.layout.ConversionPattern=%%d (%%r,%%R) (%%p/%%c) [%%P] [%%M:%%L] - %%m%%n
END_OF_LOGGER
# Main-loop control flag: main() polls while this is true; the INT, QUIT and
# TERM signal handlers clear it and the HUP handler sets it again.
our $KEEP_GOING = $TRUE;
# Set by the HUP handler; main() resets its sleep interval and clears the flag
# on the next loop iteration.
our $RELOAD = $FALSE;
########################################################################
sub get_options {
########################################################################
  # Parse the command line into an option hash.
  #
  # Starts from the built-in defaults, lets Getopt::Long overlay whatever
  # was given in @ARGV, and returns the result as a flat key/value list.
  #
  # Prints usage via pod2usage(1) when parsing fails or --help is given.
  # Dies when both 'wait-time' and 'queue-interval' are set to true values.

  # built-in defaults
  my %opt = (
    daemonize     => $TRUE,
    'exit-when'   => 'never',
    'delete-when' => 'true',                        # delete message if handled successfully
    handler       => 'Amazon::SQS::QueueHandler',
  );

  my @specs = (
    'config|c=s',
    'create-queue|C',
    'daemonize|d!',
    'delete-when|D=s',
    'exit-when|E=s',
    'endpoint_url|e=s',
    'help|h',
    'logfile|L=s',
    'loglevel|l=s',
    'max-children|m=i',
    'max-sleep-time=i',
    'max-messages=i',
    'pidfile|p=s',
    'queue|q=s',
    'queue-interval|I=i',
    'handler|H=s',
    'message-type|M=s',
    'visibility-timeout|v=i',
    'wait-time|w=i',
  );

  my $parsed_ok = GetOptions( \%opt, @specs );

  pod2usage(1)
    if $opt{help} || !$parsed_ok;

  # long polling (wait-time) and interval polling are mutually exclusive
  my $both_set = $opt{'wait-time'} && $opt{'queue-interval'};

  if ($both_set) {
    die "set 'wait-time' or 'queue-interval' but not both\n";
  }

  return %opt;
}
########################################################################
sub main {
########################################################################
# Script entry point: parses options, loads the optional config file,
# initializes logging, instantiates the queue handler, installs signal
# handlers, optionally daemonizes, and then polls the queue while
# $KEEP_GOING is true.  Returns 0.
#
# NOTE(review): this listing of the sub is incomplete -- see the note in
# the polling loop below.
my %options = get_options();
# A config file that was named explicitly must exist and be readable.
die sprintf "no such file %s\n", $options{config}
if $options{config} && ( !-e $options{config} || !-r $options{config} );
# load_config() also back-fills undefined entries of %options.
my $config = load_config( \%options );
# With neither long polling (wait-time) nor an interval configured, fall
# back to interval polling with the default sleep time.
if ( !defined $options{'wait-time'} && !defined $options{'queue-interval'} ) {
$options{'queue-interval'} = $DEFAULT_SLEEP_TIME;
}
# Interval polling needs an upper bound for the sleep time.
if ( $options{'queue-interval'} && !defined $options{'max-sleep-time'} ) {
$options{'max-sleep-time'} = $MAX_SLEEP_TIME;
}
my $logger = init_logger( \%options );
$logger->trace(
Dumper(
[ config => $config,
options => \%options
]
)
);
# instantiate Amazon::SQS::QueueHandler class
my $handler = eval { return load_handler( config => $config, options => \%options, logger => $logger ); };
if ( !$handler || $EVAL_ERROR ) {
my $err = $EVAL_ERROR;
# A reference is presumably an exception object from the SQS client that
# offers getMessage/getErrorCode/getHTTPError -- TODO confirm the class.
if ( ref $err ) {
die sprintf
"\rERROR: could not instantiate handler:\nMessage:\t[%s]\nCode:\t\t[%s]\nHTTP Response:\t[%s]\n",
$err->getMessage,
$err->getErrorCode,
$err->getHTTPError;
}
elsif ($err) {
die "ERROR: could not instantiate handler:\n$err";
}
else {
die "ERROR: could not instantiate handler\n";
}
}
# set up signal handlers
# The handler is passed by reference so the HUP handler can replace it.
setup_signal_handlers( \%options, \$handler );
my $service = $handler->get_service;
if ( $options{daemonize} ) {
# File handles handed to Proc::Daemon as 'dont_close_fh'.
my @dont_close_fh;
# When logging to a file, include the LOGFILE appender's handle.
if ( $options{logfile} && $options{logfile} !~ /(?:stderr|stdout)/ixsm ) {
my $appender = Log::Log4perl->appender_by_name($APPENDER_NAME);
push @dont_close_fh, $appender->{fh};
}
push @dont_close_fh, 'STDERR';
push @dont_close_fh, 'STDOUT';
my %daemon_config = (
work_dir => cwd,
@dont_close_fh ? ( dont_close_fh => \@dont_close_fh ) : (),
);
Proc::Daemon->new(%daemon_config)->Init();
# Arguments for Proc::PID::File; left empty unless the config names a
# pidfile.
my %pidfile;
if ( $config && $config->get_handler_pidfile ) {
my ( $name, $path, $ext ) = fileparse( $config->get_handler_pidfile, qr/[.][^.]+$/xsm );
$pidfile{dir} = $path;
# NOTE(review): the suffix pattern above includes the leading dot, so a
# matched $ext would put a second dot into the '%s.%s' name below; and the
# //= default only helps if fileparse() can return undef for the suffix --
# confirm both against the File::Basename documentation.
$ext //= 'pid';
$pidfile{name} = sprintf '%s.%s', $name, $ext;
}
# If already running, then exit
if ( Proc::PID::File->running(%pidfile) ) {
$logger->error('already running...');
exit 0;
}
}
# Current sleep interval in seconds; (re)initialized from 'queue-interval'
# on the first pass and after a reload requested via $RELOAD.
my $sleep;
while ($KEEP_GOING) {
if ( !$sleep || $RELOAD ) {
$sleep = $options{'queue-interval'};
$RELOAD = $FALSE;
}
$logger->info( sprintf 'reading queue: %s', $handler->get_url );
my $message = eval {
my $message = $handler->get_next_message();
return $message
if $message;
# No message: re-throw if $EVAL_ERROR is set at this point.
die $EVAL_ERROR
if $EVAL_ERROR;
$logger->info('no messages...');
# Without a handler wait time, sleep here and then compute the next
# interval, capped at 'max-sleep-time'.  sleep_time() is not fully visible
# in this listing; presumably it lengthens the interval -- TODO confirm.
if ( !$handler->get_wait_time ) {
$logger->info( sprintf '...sleeping for %d seconds', $sleep );
sleep $sleep;
$sleep = min( $options{'max-sleep-time'}, sleep_time( $sleep, \%options ) );
}
return;
# NOTE(review): the next line is residue from the source listing, not Perl.
# The code between the 'return;' above and the 'if' below (the end of the
# eval block, the message handling call, the declaration of $err and the
# opening of the enclosing 'if') is not visible here.
bin/QueueDaemon.pl view on Meta::CPAN
if !$handler->get_message();
# handle message disposition
if ($err) {
# Error case: 'error' or 'always' triggers deletion and/or exit.
if ( $options{'delete-when'} =~ /(?:error|always)/xsm ) {
$logger->info('deleting message');
$handler->delete_message();
}
if ( $options{'exit-when'} =~ /(?:error|always)/xsm ) {
$KEEP_GOING = $FALSE;
}
}
else {
# No error recorded: 'false' or 'always' triggers deletion and/or exit.
# Presumably this is the "handler returned false" case -- the governing
# condition is in the elided part; confirm.
if ( $options{'delete-when'} =~ /(?:false|always)/xsm ) {
$logger->info('deleting message');
$handler->delete_message();
}
if ( $options{'exit-when'} =~ /(?:always|false)/xsm ) {
$KEEP_GOING = $FALSE;
}
}
}
else {
# Success case: delete when 'delete-when' is 'true' or 'always'.
$logger->info( sprintf 'message (%s) handled successfully', $handler->get_message_id );
if ( $options{'delete-when'} =~ /(?:always|true)/xsm ) {
$logger->info('deleting message');
$handler->delete_message();
}
}
# 'exit-when always' ends the loop after this iteration.
if ( $options{'exit-when'} eq 'always' ) {
$KEEP_GOING = $FALSE;
}
}
return 0;
}
########################################################################
sub init_logger {
########################################################################
  # Configure Log::Log4perl (first call only) and return the root logger
  # with its level set from the options.
  #
  # Parameters:
  #   $options - hash ref; reads 'logfile' and 'loglevel'.  When logging
  #              to the screen, 'logfile' is rewritten to its lower-case
  #              form.
  #
  # Returns: the Log::Log4perl logger object.
  #
  # 'loglevel' is one of error, warn, info, debug, trace (any case);
  # missing or unrecognized values fall back to info.  'logfile' defaults
  # to 'stderr'; a value containing 'stderr' or 'stdout' (any case)
  # selects the screen appender, anything else is used as a file name.
  #
  # If Log::Log4perl is already initialized, only the level is changed.
  my ($options) = @_;

  my ( $logfile, $loglevel ) = @{$options}{qw(logfile loglevel)};

  my %level_for = (
    error => $ERROR,
    debug => $DEBUG,
    trace => $TRACE,
    info  => $INFO,
    warn  => $WARN,
  );

  $loglevel = $level_for{ lc( $loglevel // 'info' ) } // $INFO;

  $logfile //= 'stderr';

  my $log4perl_config;

  if ( !$logfile || $logfile =~ /(?:stderr|stdout)/xsmi ) {
    # Normalize case before the comparison: the match above is
    # case-insensitive, so 'STDOUT' must select stdout as well.
    $logfile = lc $logfile;

    $log4perl_config = sprintf $SCREEN_CONFIG, $logfile eq 'stdout' ? 0 : 1;

    $options->{logfile} = $logfile;
  }
  else {
    # Fill the file-appender template with the log file name.
    $log4perl_config = sprintf $LOGFILE_CONFIG, $logfile;
  }

  if ( !Log::Log4perl->initialized() ) {
    Log::Log4perl->init( \$log4perl_config );
  }

  my $logger = Log::Log4perl->get_logger;

  $logger->level($loglevel);

  return $logger;
}
########################################################################
sub setup_signal_handlers {
########################################################################
  # Install the process signal handlers.
  #
  # Parameters:
  #   $options - hash ref of options, handed on to load_config(),
  #              load_handler() and init_logger() on SIGHUP
  #   $handler - reference to the scalar holding the current queue handler;
  #              SIGHUP replaces the referenced handler in place
  #
  # HUP reloads the config and handler and raises $RELOAD; INT, QUIT and
  # TERM clear $KEEP_GOING.  Returns nothing.
  my ( $options, $handler ) = @_;

  $SIG{HUP} = sub {
    print {*STDERR} "Caught SIGHUP: re-reading config file.\n";

    $KEEP_GOING = $TRUE;

    my $reloaded_config = load_config($options);

    # carry the logger and credentials of the current handler over
    my $current     = ${$handler};
    my $logger      = $current->get_logger;
    my $credentials = $current->get_credentials;

    ${$handler} = load_handler(
      options     => $options,
      logger      => $logger,
      credentials => $credentials,
      config      => $reloaded_config,
    );

    init_logger($options);  # just reset loglevel (potentially)

    $RELOAD = $TRUE;
  };

  # the three "stop" signals differ only in the notice they print
  my @graceful_exits = (
    [ INT  => "Caught SIGINT: exiting gracefully\n" ],
    [ QUIT => "Caught SIGQUIT: exiting gracefully\n" ],
    [ TERM => "Caught SIGTERM: exiting gracefully\n" ],
  );

  for my $entry (@graceful_exits) {
    my ( $signal, $notice ) = @{$entry};

    $SIG{$signal} = sub {
      print {*STDERR} ($notice);
      $KEEP_GOING = $FALSE;
    };
  }

  return;
}
########################################################################
sub load_config {
########################################################################
  # Read the config file named by $options->{config} and use its values
  # to fill in every option that is still undefined.
  #
  # Parameters:
  #   $options - hash ref of options; modified in place
  #
  # Returns: the Amazon::SQS::Config object, or nothing when no config
  # file was given.
  my ($options) = @_;

  my $config_file = $options->{config};

  if ( !$config_file ) {
    return;
  }

  my $config = Amazon::SQS::Config->new( file => $config_file );

  # option key, config accessor, and (optionally) a last-resort default
  my @fallbacks = (
    [ 'loglevel',           'get_log_level' ],
    [ 'logfile',            'get_log_file', 'stderr' ],
    [ 'delete-when',        'get_error_delete' ],
    [ 'exit-when',          'get_error_exit' ],
    [ 'handler',            'get_handler_class' ],
    [ 'max-sleep-time',     'get_queue_max_wait' ],
    [ 'max-messages',       'get_queue_max_messages', 1 ],
    [ 'queue',              'get_queue_name' ],
    [ 'queue-url',          'get_queue_url' ],
    [ 'queue-interval',     'get_queue_interval' ],
    [ 'create-queue',       'get_queue_create_queue', $FALSE ],
    [ 'visibility-timeout', 'get_queue_visibility_timeout' ],
    [ 'wait-time',          'get_queue_wait_time' ],
  );

  for my $rule (@fallbacks) {
    my ( $key, $getter, $default ) = @{$rule};

    # the accessor is consulted only when the option is still undefined
    $options->{$key} //= $config->$getter() // $default;
  }

  return $config;
}
########################################################################
sub load_handler {
########################################################################
  # Load (or reload) the handler class named in the options and return a
  # new instance of it.
  #
  # Named arguments:
  #   config      - config object passed through to the constructor
  #   options     - hash ref of options; 'handler' names the class
  #   logger      - logger passed through to the constructor
  #   credentials - credentials passed through to the constructor
  #
  # Dies with "not an Amazon::SQS::QueueHandler\n" when the new object is
  # not an Amazon::SQS::QueueHandler.
  my %args = @_;

  my $config      = $args{config};
  my $options     = $args{options};
  my $logger      = $args{logger};
  my $credentials = $args{credentials};

  my $class = $options->{handler};

  # unload a previously loaded copy before loading the class again
  Class::Unload->unload($class)
    if Class::Inspector->loaded($class);

  autoload($class);

  my @ctor_args = (
    config             => $config,
    logger             => $logger,
    endpoint_url       => $options->{endpoint_url},
    name               => $options->{queue},
    url                => $options->{'queue-url'},
    message_type       => $options->{'message-type'},
    create_queue       => $options->{'create-queue'},
    wait_time          => $options->{'wait-time'},
    visibility_timeout => $options->{'visibility-timeout'},
    credentials        => $credentials,
  );

  my $handler = $class->new(@ctor_args);

  if ( !$handler->isa('Amazon::SQS::QueueHandler') ) {
    die "not an Amazon::SQS::QueueHandler\n";
  }

  return $handler;
}
########################################################################
sub sleep_time {
( run in 0.707 second using v1.01-cache-2.11-cpan-39bf76dae61 )