Amazon-SQS-ProducerConsumer

 view release on metacpan or  search on metacpan

lib/Amazon/SQS/Consumer.pm  view on Meta::CPAN

	DEFAULT_WAIT_SECONDS => 30,
	SECONDS_BETWEEN_TRIES => 10
};

=head1 NAME

Amazon::SQS::Consumer - Receive messages from an Amazon Simple Queue Service (SQS) queue

=cut

# Log a timestamped line to STDERR via warn and hand the arguments back.
#
# The line is space-joined from: day-of-month, month name, year, clock time
# (all taken from scalar localtime), the current PID in square brackets, the
# last path component of $0, and the caller's arguments. The trailing "\n"
# is itself one of the joined items, so the line ends with " \n" and warn
# does not append file/line information.
#
# Returns @_ unchanged.
sub say (@) {
	# scalar localtime looks like "Thu Sep  9 12:00:00 2023"; index 0 (weekday) is dropped.
	my ( $month, $day, $clock, $year ) = ( split ' ', scalar localtime )[ 1 .. 4 ];

	# Kept as a list: the slice yields no element at all when $0 splits to nothing.
	my @script_name = ( split '/', $0 )[-1];

	warn join( ' ', $day, $month, $year, $clock, "[$$]", @script_name, @_, "\n" );
	return @_;
}
# Process-wide signal setup, installed when this module is loaded.

# Child exits are ignored, so forked children need no explicit wait().
$SIG{CHLD} = 'IGNORE';

# On SIGINT, log the event and exit with status 0.
$SIG{INT} = sub {
	say 'caught signal INT';
	exit 0;
};

=head1 SYNOPSIS

  use Amazon::SQS::Consumer;

  my $in_queue = new Amazon::SQS::Consumer
    AWSAccessKeyId => 'PUBLIC_KEY_HERE',
    SecretAccessKey => 'SECRET_KEY_HERE',
    queue => 'YourInputQueue';

lib/Amazon/SQS/Consumer.pm  view on Meta::CPAN

		# If there's a message in the cache, return it
		if ( my $message = shift @{$me->{messages}} ) {
			$me->{DeleteMessageHandle} = $message->{ReceiptHandle};
			my $object;
			eval {
				my $body = $message->{Body};
				$body = encode_utf8( $body ) if is_utf8( $body );
				$object = decode_json $body;
			};
			if ( $@ ) {
				say "left bad message in queue; could not decode JSON from $message->{Body}: $@";
			} else {
				return $object;
			}
		} elsif ( $me->{no_loop} ) {
			$seconds_to_wait = 0;
		} else {
			# Otherwise, wait a few seconds and try again
			say "waiting $seconds_to_wait seconds for new messages"
				if $seconds_to_wait == $me->{wait_seconds};
			sleep SECONDS_BETWEEN_TRIES;
			$seconds_to_wait -= SECONDS_BETWEEN_TRIES;
		}

	} while ( $me->{forever} or $seconds_to_wait > 0 );

	# If we've retried for a while and gotten no messages, give up
	return undef;

}

# Delete from the queue the message whose receipt handle is currently stored
# in $me->{DeleteMessageHandle}. Does nothing when no handle is stored.
# With $me->{debug} set, the handle is logged before the delete request.
sub delete_previous {
	my ($me) = @_;

	if ( $me->{DeleteMessageHandle} ) {
		my $handle = $me->{DeleteMessageHandle};
		$me->{debug} and say "deleting message $handle";
		$me->delete_message(
			Queue         => $me->{queue},
			ReceiptHandle => $handle,
		);
	}
}

# Forget the stored receipt handle so that delete_previous has nothing to
# delete for the current message. Returns the handle that was removed
# (undef when none was stored).
sub defer {
	my ($me) = @_;
	return delete $me->{DeleteMessageHandle};
}


=head1 AUTHOR

Nic Wolff, <nic@angel.net>

lib/Amazon/SQS/Producer.pm  view on Meta::CPAN

use JSON::XS;

use constant MAX_RETRIES => 3;

=head1 NAME

Amazon::SQS::Producer - Publish messages to an Amazon Simple Queue Service (SQS) queue

=cut

# Log a timestamped line to STDERR via warn and hand the arguments back.
#
# The line is space-joined from: day-of-month, month name, year, clock time
# (all taken from scalar localtime), the current PID in square brackets, the
# last path component of $0, and the caller's arguments. The trailing "\n"
# is itself one of the joined items, so the line ends with " \n" and warn
# does not append file/line information.
#
# Returns @_ unchanged.
sub say (@) {
	# scalar localtime looks like "Thu Sep  9 12:00:00 2023"; index 0 (weekday) is dropped.
	my ( $month, $day, $clock, $year ) = ( split ' ', scalar localtime )[ 1 .. 4 ];

	# Kept as a list: the slice yields no element at all when $0 splits to nothing.
	my @script_name = ( split '/', $0 )[-1];

	warn join( ' ', $day, $month, $year, $clock, "[$$]", @script_name, @_, "\n" );
	return @_;
}
# Process-wide signal setup, installed when this module is loaded.

# Child exits are ignored, so forked children need no explicit wait().
$SIG{CHLD} = 'IGNORE';

# On SIGINT, log the event and exit with status 0.
$SIG{INT} = sub {
	say 'caught signal INT';
	exit 0;
};

=head1 SYNOPSIS

  use Amazon::SQS::Producer;

  my $out_queue = new Amazon::SQS::Producer
    AWSAccessKeyId => 'PUBLIC_KEY_HERE',
    SecretAccessKey => 'SECRET_KEY_HERE',
    queue => 'YourOutputQueue',

lib/Amazon/SQS/Producer.pm  view on Meta::CPAN


# Encode a message as JSON and send it to this producer's SQS queue.
#
# Called as $producer->publish( [ \%previous, ] key => value, ... ). When a
# leading reference is given, its pairs are merged with (and overridden by)
# the trailing key/value pairs before encoding.
#
# Dispatches to fork_consumer instead when the object has no {queue}, or when
# the leading hash reference carries a true _chain_consumers entry.
#
# Special keys in the merged data:
#   _debug - log the encoded message
#   _test  - return right after encoding (and optional logging); nothing is sent
#
# A failed send is retried up to MAX_RETRIES times, sleeping 1, 2, 3, ...
# seconds between attempts. Returns whatever send_message returned on
# success, or nothing once the retries are exhausted.
#
# After a successful send, if {consumer} is set and fewer than
# {start_consumers} consumers have been started through this object, one more
# consumer process is forked and exec'd.
sub publish {
	if ( ref $_[0] and ! $_[0]->{queue} ) { goto &fork_consumer }
	if ( ref $_[1] and $_[1]->{_chain_consumers} ) { goto &fork_consumer }

	my $me = shift;

	# Deliberately not "my $old_data = shift if ...": a my under a statement
	# modifier has undefined behaviour, and an undef $old_data would make
	# %$old_data die under strict refs.
	my $old_data = ref $_[0] ? shift : {};
	my $data = { %$old_data, @_ };
	my $encoded_data = encode_json $data;

	say "Queueing message: $encoded_data" if $data->{_debug};
	return if $data->{_test};

	my $retries = 0;
	my $message_id;
	until (
		$message_id = $me->send_message(
			Queue => $me->{queue},
			MessageBody => $encoded_data,
		)
	) {
		say "couldn't queue message: ", $me->error;
		if ( $retries++ < MAX_RETRIES ) {
			say "trying again in $retries seconds";
			sleep $retries;
		} else {
			# This statement must end in a semicolon: with a comma, "return"
			# becomes an argument of say and runs before anything is logged.
			say "giving up trying to publish to queue $me->{queue} with message body: $encoded_data";
			return;
		}
	}

	if ( $me->{consumer} and $me->{started_consumers}++ < $me->{start_consumers} ) {
		my $pid = fork;
		if ( not defined $pid ) {
			say "couldn't fork";
		} elsif ( not $pid ) {
			# Child: detach the standard handles, then become the consumer.
			# STDOUT is opened for writing so the consumer's output is
			# discarded rather than failing on a read-only handle.
			close STDIN; open STDIN, '<', '/dev/null';
			close STDOUT; open STDOUT, '>', '/dev/null';
			close STDERR; open STDERR, '>>', '/tmp/getfeeds.log';
			sleep( $me->{sleep_after_starting_consumer} || 0 );
			exec $me->{consumer} or do {
				# exec only returns on failure. Exit so the child does not
				# fall through and keep running the caller's code.
				say "couldn't exec consumer $me->{consumer}: $!";
				exit 1;
			};
		} else {
			say "started consumer $me->{consumer} with PID $pid for queue $me->{queue}";
		}
	}

	return $message_id;

}

# Publish a message from a forked child process.
#
# Arguments after the invocant are passed to publish unchanged. The parent
# logs the child's PID and returns immediately; the child publishes and then
# exits. If the fork fails, the failure is logged and nothing is published.
sub fork_and_publish {
	my $me = shift;

	my $pid = fork;
	if ( not defined $pid ) {
		say "couldn't fork";
	} elsif ( not $pid ) {
		$me->publish( @_ );
		# The child must stop here. Without the exit it returns into the
		# caller and runs the rest of the program a second time.
		exit 0;
	} else {
		say "forked to publish to queue $me->{queue} with PID $pid";
	}
}

# Fork a child that execs the configured consumer command, handing it the
# message data as a single JSON-encoded command-line argument.
#
# Called as $producer->fork_consumer( [ \%previous, ] key => value, ... ).
# A leading reference, when present, is merged with (and overridden by) the
# trailing key/value pairs. Does nothing when {consumer} is not set.
#
# The parent logs the child's PID. The child redirects its standard handles,
# appends the current directory to PATH, sleeps for
# {sleep_after_starting_consumer} seconds and then execs the consumer.
sub fork_consumer {
	my $me = shift;

	# Deliberately not "my $old_data = shift if ...": a my under a statement
	# modifier has undefined behaviour, and an undef $old_data would make
	# %$old_data die under strict refs.
	my $old_data = ref $_[0] ? shift : {};
	my %data = @_;

	if ( $me->{consumer} ) {
		my $pid = fork;
		if ( not defined $pid ) {
			say "couldn't fork";
		} elsif ( not $pid ) {
			# Child: detach the standard handles, then become the consumer.
			# STDOUT is opened for writing so the consumer's output is
			# discarded rather than failing on a read-only handle.
			close STDIN; open STDIN, '<', '/dev/null';
			close STDOUT; open STDOUT, '>', '/dev/null';
			close STDERR; open STDERR, '>>', '/tmp/getfeeds.log';
			$ENV{PATH} .= ':.';
			sleep( $me->{sleep_after_starting_consumer} || 0 );
			exec $me->{consumer}, encode_json( { %$old_data, %data } ) or do {
				# exec only returns on failure. Exit so the child does not
				# fall through and keep running the caller's code.
				say "couldn't exec consumer $me->{consumer}: $!";
				exit 1;
			};
		} else {
			say "forked consumer $me->{consumer} with PID $pid";
		}
	}
}

=head1 AUTHOR

Nic Wolff, <nic@angel.net>

=head1 BUGS



( run in 0.921 second using v1.01-cache-2.11-cpan-71847e10f99 )