Amazon-SQS-ProducerConsumer
view release on metacpan or search on metacpan
lib/Amazon/SQS/Consumer.pm view on Meta::CPAN
=head1 NAME
Amazon::SQS::Consumer - Receive messages from an Amazon Simple Queue Service (SQS) queue
=cut
sub say (@) { warn join ' ', (split ' ', scalar localtime)[2,1,4,3], "[$$]", (split '/', $0)[-1], @_, "\n"; return @_; }
$SIG{INT} = sub { say 'caught signal INT'; exit 0; };
$SIG{CHLD} = 'IGNORE';
=head1 SYNOPSIS
use Amazon::SQS::Consumer;
my $in_queue = new Amazon::SQS::Consumer
AWSAccessKeyId => 'PUBLIC_KEY_HERE',
SecretAccessKey => 'SECRET_KEY_HERE',
queue => 'YourInputQueue';
while ( my $item = $in_queue->next ) {
# Do stuff with the item
}
=head1 METHODS
=head2 new(%params)
This is the constructor, it will return you an Amazon::SQS::Consumer object to work with. It takes these parameters:
=over
=item AWSAccessKeyId (required)
Your AWS access key.
=item SecretAccessKey (required)
Your secret key, WARNING! don't give this out or someone will be able to use your account and incur charges on your behalf.
=item queue (required)
The URL of the queue to receive messages from.
=item wait_seconds (optional)
The number of seconds to wait for a new message when the queue is empty.
=item debug (optional)
A flag to turn on debugging. It is turned off by default.
=back
=cut
sub new {
my $class = shift;
my %args = @_;
my $me = \%args;
bless $me, $class;
$me->initialize;
return $me;
}
sub initialize {
my $me = shift;
$me->{n_messages} ||= DEFAULT_N_MESSAGES;
$me->{wait_seconds} ||= DEFAULT_WAIT_SECONDS;
$me->SUPER::initialize;
}
=head2 next()
This will receive a message from this Publisher's queue. When the queue is empty it will wait a new message for wait_seconds seconds.
=cut
sub next {
my $me = shift;
# If we're done with the previous message, delete it
$me->delete_previous();
if ( @ARGV ) {
$me->{messages} = [ map { MessageId => undef, Body => $_ }, @ARGV ];
undef @ARGV;
$me->{no_loop} = 't';
}
my $seconds_to_wait = $me->{wait_seconds};
do {
# If there no messages in the cache, get some from the queue
$me->{messages} = $me->receive_messages(
Queue => $me->{queue},
MaxNumberOfMessages => $me->{n_messages},
defined $me->{timeout} ? ( VisibilityTimeout => $me->{timeout} ) : ()
) unless defined $me->{messages} && @{$me->{messages}} or $me->{no_loop};
# If there's a message in the cache, return it
if ( my $message = shift @{$me->{messages}} ) {
$me->{DeleteMessageHandle} = $message->{ReceiptHandle};
my $object;
eval {
my $body = $message->{Body};
$body = encode_utf8( $body ) if is_utf8( $body );
$object = decode_json $body;
};
if ( $@ ) {
say "left bad message in queue; could not decode JSON from $message->{Body}: $@";
} else {
return $object;
}
} elsif ( $me->{no_loop} ) {
$seconds_to_wait = 0;
} else {
# Otherwise, wait a few seconds and try again
say "waiting $seconds_to_wait seconds for new messages"
if $seconds_to_wait == $me->{wait_seconds};
( run in 2.610 seconds using v1.01-cache-2.11-cpan-39bf76dae61 )