Amazon-SQS-ProducerConsumer
view release on metacpan or search on metacpan
lib/Amazon/SQS/Consumer.pm view on Meta::CPAN
DEFAULT_WAIT_SECONDS => 30,
SECONDS_BETWEEN_TRIES => 10
};
=head1 NAME
Amazon::SQS::Consumer - Receive messages from an Amazon Simple Queue Service (SQS) queue
=cut
# Emit a timestamped diagnostic line through warn and hand the arguments
# back to the caller.
#
# The line is the space-joined sequence: day-of-month, month name, year,
# hh:mm:ss, "[<pid>]", the last path component of $0, the arguments, and a
# final "\n" (so a single space precedes the newline).
#
# Returns @_ unchanged (its element count in scalar context).
sub say (@) {
    # scalar localtime reads like "Thu Sep  9 12:34:56 2023"; pick the
    # fields in day, month, year, time order.
    my @stamp = (split ' ', scalar localtime)[2, 1, 4, 3];

    # Keep only the final path component of the running script's name.
    my @script = (split '/', $0)[-1];

    my $line = join ' ', @stamp, "[$$]", @script, @_, "\n";
    warn $line;

    return @_;
}
# Process-wide signal setup, performed when this module is loaded.

# Ignore CHLD; on most Unix platforms this keeps exited child processes
# from lingering as zombies (see perlipc).
$SIG{CHLD} = 'IGNORE';

# On INT (for example Ctrl-C), log the event and exit with status 0.
$SIG{INT} = sub {
    say('caught signal INT');
    exit 0;
};
=head1 SYNOPSIS
use Amazon::SQS::Consumer;
my $in_queue = new Amazon::SQS::Consumer
AWSAccessKeyId => 'PUBLIC_KEY_HERE',
SecretAccessKey => 'SECRET_KEY_HERE',
queue => 'YourInputQueue';
lib/Amazon/SQS/Consumer.pm view on Meta::CPAN
# If there's a message in the cache, return it
if ( my $message = shift @{$me->{messages}} ) {
$me->{DeleteMessageHandle} = $message->{ReceiptHandle};
my $object;
eval {
my $body = $message->{Body};
$body = encode_utf8( $body ) if is_utf8( $body );
$object = decode_json $body;
};
if ( $@ ) {
say "left bad message in queue; could not decode JSON from $message->{Body}: $@";
} else {
return $object;
}
} elsif ( $me->{no_loop} ) {
$seconds_to_wait = 0;
} else {
# Otherwise, wait a few seconds and try again
say "waiting $seconds_to_wait seconds for new messages"
if $seconds_to_wait == $me->{wait_seconds};
sleep SECONDS_BETWEEN_TRIES;
$seconds_to_wait -= SECONDS_BETWEEN_TRIES;
}
} while ( $me->{forever} or $seconds_to_wait > 0 );
# If we've retried for a while and gotten no messages, give up
return undef;
}
# Delete the message whose receipt handle is currently recorded in
# $me->{DeleteMessageHandle}, if there is one.
#
# When $me->{debug} is true the handle is logged first.  The handle itself
# is left in place on the object after the call.
sub delete_previous {
    my ($me) = @_;

    my $receipt_handle = $me->{DeleteMessageHandle};
    if ($receipt_handle) {
        say("deleting message $receipt_handle") if $me->{debug};
        $me->delete_message(
            Queue         => $me->{queue},
            ReceiptHandle => $receipt_handle,
        );
    }
}
# Forget the recorded receipt handle so that a later delete_previous call
# has nothing to delete.  Returns the handle that was removed (undef when
# none was recorded).
sub defer {
    my ($me) = @_;
    return delete $me->{DeleteMessageHandle};
}
=head1 AUTHOR
Nic Wolff, <nic@angel.net>
lib/Amazon/SQS/Producer.pm view on Meta::CPAN
use JSON::XS;
use constant MAX_RETRIES => 3;
=head1 NAME
Amazon::SQS::Producer - Publish messages to an Amazon Simple Queue Service (SQS) queue
=cut
# Emit a timestamped diagnostic line through warn and hand the arguments
# back to the caller.
#
# The line is the space-joined sequence: day-of-month, month name, year,
# hh:mm:ss, "[<pid>]", the last path component of $0, the arguments, and a
# final "\n" (so a single space precedes the newline).
#
# Returns @_ unchanged (its element count in scalar context).
sub say (@) {
    # scalar localtime reads like "Thu Sep  9 12:34:56 2023"; pick the
    # fields in day, month, year, time order.
    my @stamp = (split ' ', scalar localtime)[2, 1, 4, 3];

    # Keep only the final path component of the running script's name.
    my @script = (split '/', $0)[-1];

    my $line = join ' ', @stamp, "[$$]", @script, @_, "\n";
    warn $line;

    return @_;
}
# Process-wide signal setup, performed when this module is loaded.

# Ignore CHLD; on most Unix platforms this keeps exited child processes
# (the forked consumers and publishers below) from lingering as zombies
# (see perlipc).
$SIG{CHLD} = 'IGNORE';

# On INT (for example Ctrl-C), log the event and exit with status 0.
$SIG{INT} = sub {
    say('caught signal INT');
    exit 0;
};
=head1 SYNOPSIS
use Amazon::SQS::Producer;
my $out_queue = new Amazon::SQS::Producer
AWSAccessKeyId => 'PUBLIC_KEY_HERE',
SecretAccessKey => 'SECRET_KEY_HERE',
queue => 'YourOutputQueue',
lib/Amazon/SQS/Producer.pm view on Meta::CPAN
# Publish one message to this producer's queue.
#
# Call as $producer->publish( [ \%old_data, ] key => value, ... ).  The
# optional leading hash reference and the trailing key/value pairs are
# merged (later pairs win), JSON-encoded and sent with send_message.
#
# Special cases:
#   * the object has no queue, or the leading hash reference carries
#     _chain_consumers: control transfers to fork_consumer with the same
#     arguments;
#   * _debug in the merged data: the encoded message is logged;
#   * _test in the merged data: nothing is sent and the sub returns early.
#
# A failed send is retried up to MAX_RETRIES times, sleeping 1, 2, 3...
# seconds between attempts.  After a successful send a consumer process is
# started if $me->{consumer} is set and fewer than $me->{start_consumers}
# have been started through this object so far.
#
# Returns the value send_message returned on success; returns nothing when
# _test is set or when every attempt failed.
sub publish {
    if ( ref $_[0] and ! $_[0]->{queue} ) { goto &fork_consumer }
    if ( ref $_[1] and $_[1]->{_chain_consumers} ) { goto &fork_consumer }

    my $me = shift;

    # Declared unconditionally: "my $x = ... if COND" has undefined
    # behaviour (perlsyn), and an undef $old_data would break the %$old_data
    # dereference below under strict refs.
    my $old_data = ref( $_[0] ) ? shift(@_) : {};
    my $data = { %$old_data, @_ };

    my $encoded_data = encode_json($data);
    say("Queueing message: $encoded_data") if $data->{_debug};
    return if $data->{_test};

    my $retries = 0;
    my $message_id;
    until (
        $message_id = $me->send_message(
            Queue       => $me->{queue},
            MessageBody => $encoded_data,
        )
    ) {
        say( "couldn't queue message: ", $me->error );
        if ( $retries++ < MAX_RETRIES ) {
            say("trying again in $retries seconds");
            sleep $retries;
        } else {
            # These must be two statements.  With a comma between them,
            # "return" is evaluated as part of say's argument list and the
            # sub returns before the message is ever logged.
            say("giving up trying to publish to queue $me->{queue} with message body: $encoded_data");
            return;
        }
    }

    if ( $me->{consumer} and $me->{started_consumers}++ < $me->{start_consumers} ) {
        my $pid = fork;
        if ( not defined $pid ) {
            say("couldn't fork: $!");
        } elsif ( not $pid ) {
            # Child: detach the standard handles before becoming the
            # consumer.  STDOUT must be opened for writing; a two-argument
            # open of '/dev/null' opens it read-only.
            close STDIN;
            open( STDIN, '<', '/dev/null' )
                or say("couldn't reopen STDIN: $!");
            close STDOUT;
            open( STDOUT, '>', '/dev/null' )
                or say("couldn't reopen STDOUT: $!");
            # If this one fails there is nowhere left to report it.
            close STDERR;
            open( STDERR, '>>', '/tmp/getfeeds.log' );

            sleep( $me->{sleep_after_starting_consumer} || 0 );

            # exec only returns on failure.  Exit then, so the child does
            # not carry on running the caller's code as a second copy.
            exec( $me->{consumer} )
                or do {
                    say("couldn't exec $me->{consumer}: $!");
                    exit 1;
                };
        } else {
            say("started consumer $me->{consumer} with PID $pid for queue $me->{queue}");
        }
    }

    return $message_id;
}
# Publish a message from a forked child process so the caller does not
# wait for the send (or its retries) to finish.
#
# Takes the same arguments as publish.  In the parent the fork is logged
# and control returns to the caller; if the fork fails, the failure is
# logged and nothing is published.  The return value is not meaningful.
sub fork_and_publish {
    my $me = shift;

    my $pid = fork;
    if ( not defined $pid ) {
        say("couldn't fork: $!");
    } elsif ( not $pid ) {
        # Child: publish, then exit.  Without the exit the child would
        # return into the caller's code and run the rest of the program as
        # a duplicate process.
        $me->publish( @_ );
        exit 0;
    } else {
        say("forked to publish to queue $me->{queue} with PID $pid");
    }
}
# Start the configured consumer program in a forked child, passing it the
# message data as a single JSON-encoded command-line argument.
#
# Call as $producer->fork_consumer( [ \%old_data, ] key => value, ... );
# publish also transfers control here via goto.  The optional leading hash
# reference and the trailing pairs are merged (later pairs win).  Nothing
# happens unless $me->{consumer} is set.
sub fork_consumer {
    my $me = shift;

    # Declared unconditionally: "my $x = ... if COND" has undefined
    # behaviour (perlsyn), and an undef $old_data would break the %$old_data
    # dereference below under strict refs.
    my $old_data = ref( $_[0] ) ? shift(@_) : {};
    my %data = @_;

    if ( $me->{consumer} ) {
        my $pid = fork;
        if ( not defined $pid ) {
            say("couldn't fork: $!");
        } elsif ( not $pid ) {
            # Child: detach the standard handles before becoming the
            # consumer.  STDOUT must be opened for writing; a two-argument
            # open of '/dev/null' opens it read-only.
            close STDIN;
            open( STDIN, '<', '/dev/null' )
                or say("couldn't reopen STDIN: $!");
            close STDOUT;
            open( STDOUT, '>', '/dev/null' )
                or say("couldn't reopen STDOUT: $!");
            # If this one fails there is nowhere left to report it.
            close STDERR;
            open( STDERR, '>>', '/tmp/getfeeds.log' );

            # NOTE(review): appending the current directory to PATH lets a
            # program in the working directory shadow the intended
            # consumer; kept because existing setups may rely on it.
            $ENV{PATH} .= ':.';

            sleep( $me->{sleep_after_starting_consumer} || 0 );

            # exec only returns on failure.  Exit then, so the child does
            # not carry on running the caller's code as a second copy.
            exec( $me->{consumer}, encode_json( { %$old_data, %data } ) )
                or do {
                    say("couldn't exec $me->{consumer}: $!");
                    exit 1;
                };
        } else {
            say("forked consumer $me->{consumer} with PID $pid");
        }
    }
}
=head1 AUTHOR
Nic Wolff, <nic@angel.net>
=head1 BUGS
( run in 0.921 second using v1.01-cache-2.11-cpan-71847e10f99 )