Amazon-SQS-ProducerConsumer

 view release on metacpan or  search on metacpan

lib/Amazon/SQS/Consumer.pm  view on Meta::CPAN

Your secret key. WARNING: don't give this out, or someone will be able to use your account and incur charges on your behalf.

=item queue (required)

The URL of the queue to receive messages from.

=item wait_seconds (optional)

The number of seconds to wait for a new message when the queue is empty.

=item debug (optional)

A flag to turn on debugging. It is turned off by default.

=back

=cut

sub new {
	my $class = shift;
	my %args = @_;

	my $me = \%args;

lib/Amazon/SQS/Consumer.pm  view on Meta::CPAN


	# If we've retried for a while and gotten no messages, give up
	return undef;

}

# Delete the message whose receipt handle is stored in
# $self->{DeleteMessageHandle}, if there is one.
#
# Does nothing (and returns the false handle value) when no handle is stored.
# Otherwise calls delete_message on the invocant with the object's queue and
# the stored handle, and returns whatever that call returns. The stored
# handle itself is left untouched.
sub delete_previous {
	my ($self) = @_;

	my $handle = $self->{DeleteMessageHandle};

	# Nothing pending: hand back the false value, as the bare "if" would.
	return $handle unless $handle;

	if ( $self->{debug} ) {
		say "deleting message $self->{DeleteMessageHandle}";
	}

	return $self->delete_message(
		Queue         => $self->{queue},
		ReceiptHandle => $handle,
	);
}

# Remove the stored DeleteMessageHandle from the object, so a later
# delete_previous call finds nothing to delete. Returns the removed value
# (undef if none was stored).
sub defer {
	my ($self) = @_;
	return delete $self->{DeleteMessageHandle};
}


=head1 AUTHOR

Nic Wolff, <nic@angel.net>

lib/Amazon/SQS/Producer.pm  view on Meta::CPAN

The URL of the queue to publish messages to.

=item consumer (optional)

The name of an executable that will consume messages from the queue we're publishing to. An instance will be launched after each message is published, up to the maximum set by...

=item start_consumers (optional)

The maximum number of consumer instances to launch.

=item debug (optional)

A flag to turn on debugging. It is turned off by default.

=back

=cut

sub new {
	my $class = shift;
	my %args = @_;

	my $me = \%args;

lib/Amazon/SQS/Producer.pm  view on Meta::CPAN


sub publish {
	if ( ref $_[0] and ! $_[0]->{queue} ) { goto &fork_consumer }
	if ( ref $_[1] and $_[1]->{_chain_consumers} ) { goto &fork_consumer }

	my $me = shift;
	my $old_data = shift if ref $_[0];
	my $data = { %$old_data, @_ };
	my $encoded_data = encode_json $data;

	say "Queueing message: $encoded_data" if $data->{_debug};
	return if $data->{_test};

	my $retries;
	my $message_id;
	until (
		$message_id = $me->send_message(
			Queue => $me->{queue},
			MessageBody => $encoded_data,
		)
	) {

lib/Amazon/SQS/ProducerConsumer/Base.pm  view on Meta::CPAN

	my @params;
	for ( sort keys %args ) {
		push @params, join '=', $_, uri_escape_utf8( $args{$_}, "^A-Za-z0-9\-_.~" );
	}

	$me->{resource_path} =~ s|http://$me->{host}/||;
	my $string_to_sign = join( "\n",
                'POST', $me->{host}, $me->{resource_path}, join( '&', @params )
        );

	$me->debug("QUERY TO SIGN: $string_to_sign");

	my $hashed = Digest::HMAC_SHA1->new( $me->{SecretAccessKey} );
	$hashed->add( $string_to_sign );
	my $encoded = encode_base64( $hashed->digest, '' );
	$me->debug("ENCODED SIGNATURE: $encoded");
	$args{Signature} = $encoded;

	my $result = LWP::UserAgent->new->post( "http://$me->{host}$me->{resource_path}", \%args );

	$me->debug("REQUEST RETURNED: " . $result->content);

	if ( $result->is_success ) {
		my $parser = XML::Simple->new( ForceArray => [ 'item', 'QueueURL','AttributedValue', 'Attribute' ] );
		return $parser->XMLin( $result->content() );
	} else {
		return { Errors => { Error => { Message => 'HTTP POST failed with error ' . $result->status_line } } };
	}

}

# Inspect a parsed response structure for an error entry.
#
# Arguments:
#   $response - hash reference; an error is recognised when both
#               $response->{Errors} and $response->{Errors}{Error} are defined.
#
# When an error is present, its Message is passed to the debug method,
# stored in $self->{error}, emitted with warn, and 1 is returned.
# Otherwise a false value is returned and the object is not modified.
sub check_error {
	my ($self, $response) = @_;

	my $errors    = $response->{Errors};
	my $has_error = defined $errors && defined $errors->{Error};

	# No error: return the same false value the condition evaluated to.
	return $has_error unless $has_error;

	my $message = $errors->{Error}{Message};
	$self->debug("ERROR: $message");
	$self->{error} = $message;
	warn $self->{error};

	return 1;
}

# Accessor: return the value stored under $self->{error}
# (set by check_error when a response contains an error).
sub error {
	my ($self) = @_;
	return $self->{error};
}

# Emit $text (followed by a newline) via warn when debugging is enabled.
#
# Debugging counts as enabled only when $self->{debug} is defined, is not
# the empty string, and compares numerically equal to 1. The definedness
# and length checks come first so that an unset or empty flag never reaches
# the numeric comparison.
sub debug {
	my ($self, $text) = @_;

	my $flag   = $self->{debug};
	my $is_set = ( defined $flag && length $flag ) ? 1 : 0;

	warn "$text\n" if $is_set && $flag == 1;
}


=head1 AUTHOR

Nic Wolff, <nic@angel.net>

=head1 BUGS



( run in 1.082 second using v1.01-cache-2.11-cpan-49f99fa48dc )