Amazon-SQS-ProducerConsumer
view release on metacpan or search on metacpan
lib/Amazon/SQS/Consumer.pm view on Meta::CPAN
Your secret key, WARNING! don't give this out or someone will be able to use your account and incur charges on your behalf.
=item queue (required)
The URL of the queue to receive messages from.
=item wait_seconds (optional)
The number of seconds to wait for a new message when the queue is empty.
=item debug (optional)
A flag to turn on debugging. It is turned off by default.
=back
=cut
sub new {
my $class = shift;
my %args = @_;
my $me = \%args;
lib/Amazon/SQS/Consumer.pm view on Meta::CPAN
# If we've retried for a while and gotten no messages, give up
return undef;
}
sub delete_previous {
my $me = shift;
if ( $me->{DeleteMessageHandle} ) {
say "deleting message $me->{DeleteMessageHandle}" if $me->{debug};
$me->delete_message( Queue => $me->{queue}, ReceiptHandle => $me->{DeleteMessageHandle} );
}
}
sub defer { delete $_[0]->{DeleteMessageHandle} }
=head1 AUTHOR
Nic Wolff, <nic@angel.net>
lib/Amazon/SQS/Producer.pm view on Meta::CPAN
The URL of the queue to publish messages to.
=item consumer (optional)
The name of an executable that will consume messages from the queue we're publishing to. An instance will be launched after the each message is published, up to the maximum set by...
=item start_consumers (optional)
The maximum number of consumer instance to launch.
=item debug (optional)
A flag to turn on debugging. It is turned off by default.
=back
=cut
sub new {
my $class = shift;
my %args = @_;
my $me = \%args;
lib/Amazon/SQS/Producer.pm view on Meta::CPAN
sub publish {
if ( ref $_[0] and ! $_[0]->{queue} ) { goto &fork_consumer }
if ( ref $_[1] and $_[1]->{_chain_consumers} ) { goto &fork_consumer }
my $me = shift;
my $old_data = shift if ref $_[0];
my $data = { %$old_data, @_ };
my $encoded_data = encode_json $data;
say "Queueing message: $encoded_data" if $data->{_debug};
return if $data->{_test};
my $retries;
my $message_id;
until (
$message_id = $me->send_message(
Queue => $me->{queue},
MessageBody => $encoded_data,
)
) {
lib/Amazon/SQS/ProducerConsumer/Base.pm view on Meta::CPAN
my @params;
for ( sort keys %args ) {
push @params, join '=', $_, uri_escape_utf8( $args{$_}, "^A-Za-z0-9\-_.~" );
}
$me->{resource_path} =~ s|http://$me->{host}/||;
my $string_to_sign = join( "\n",
'POST', $me->{host}, $me->{resource_path}, join( '&', @params )
);
$me->debug("QUERY TO SIGN: $string_to_sign");
my $hashed = Digest::HMAC_SHA1->new( $me->{SecretAccessKey} );
$hashed->add( $string_to_sign );
my $encoded = encode_base64( $hashed->digest, '' );
$me->debug("ENCODED SIGNATURE: $encoded");
$args{Signature} = $encoded;
my $result = LWP::UserAgent->new->post( "http://$me->{host}$me->{resource_path}", \%args );
$me->debug("REQUEST RETURNED: " . $result->content);
if ( $result->is_success ) {
my $parser = XML::Simple->new( ForceArray => [ 'item', 'QueueURL','AttributedValue', 'Attribute' ] );
return $parser->XMLin( $result->content() );
} else {
return { Errors => { Error => { Message => 'HTTP POST failed with error ' . $result->status_line } } };
}
}
sub check_error {
my ($me, $xml) = @_;
if ( defined $xml->{Errors} && defined $xml->{Errors}{Error} ) {
$me->debug("ERROR: $xml->{Errors}{Error}{Message}");
$me->{error} = $xml->{Errors}{Error}{Message};
warn $me->{error};
return 1;
}
}
sub error { $_[0]->{error} }
sub debug {
my ($me, $message) = @_;
if ((grep { defined && length } $me->{debug}) && $me->{debug} == 1) {
warn "$message\n";
}
}
=head1 AUTHOR
Nic Wolff, <nic@angel.net>
=head1 BUGS
( run in 1.136 second using v1.01-cache-2.11-cpan-49f99fa48dc )