Amazon-SQS-Client
view release on metacpan or search on metacpan
lib/Amazon/SQS/QueueHandler.pm view on Meta::CPAN
use strict;
use warnings;
package Amazon::SQS::QueueHandler;
use Data::Dumper;
use English qw(-no_match_vars);
use Amazon::Credentials;
use Amazon::SQS::Model::DeleteMessageRequest;
use Amazon::SQS::Model::ReceiveMessageRequest;
use Amazon::SQS::Client;
use CGI::Simple;
use JSON;
use List::Util qw(none max);
use parent qw(Class::Accessor::Fast);

# Ask Class::Accessor for get_*/set_* method pairs; the rest of this
# module calls them that way (get_name, set_url, get_create_queue, ...).
__PACKAGE__->follow_best_practice;

# One accessor pair per handler attribute, listed alphabetically.
__PACKAGE__->mk_accessors(
  qw(
    config
    create_queue
    credentials
    endpoint_url
    logger
    max_error_retry
    max_messages
    message
    message_body
    message_id
    message_type
    name
    queue_list
    raw_message
    receipt_handle
    region
    request
    service
    signature_version
    url
    visibility_timeout
    wait_time
  )
);

# Message content types this module declares as valid.
our @VALID_MESSAGE_TYPES = (
  'text/plain',
  'application/json',
  'application/x-www-form-urlencoded',
);

# Package-level defaults and boolean constants.
our $DEFAULT_ENDPOINT_URL = 'https://queue.amazonaws.com';
our $MAX_MESSAGES         = 1;

our $TRUE  = 1;
our $FALSE = 0;
########################################################################
sub new {
########################################################################
  # Constructor. Accepts either a hash reference of options or a flat
  # key/value list. When no 'credentials' option is supplied a default
  # Amazon::Credentials object is created.
  #
  # After the defaults and the SQS service object are set up, a queue
  # given by name (with no url) is resolved to a url: it is created when
  # the create_queue option is true, otherwise it is looked up in the
  # list of existing queues.
  #
  # Dies with "no such queue [NAME]" when the lookup fails and with
  # "no queue url set" when no url is available at the end.
  # Returns the new object.
  my ( $class, @args ) = @_;

  my $options = ref $args[0] ? $args[0] : {@args};
  $options->{credentials} //= Amazon::Credentials->new;

  my $self = $class->SUPER::new($options);
  $self->init_defaults();
  $self->create_service();

  my $queue_name = $self->get_name;

  if ( $queue_name && !$self->get_url ) {
    # list_queues() returns url => name pairs; reversing the list yields
    # a name => url lookup table.
    my %url_for = reverse $self->list_queues();

    if ( $self->get_create_queue ) {
      $self->create_queue($queue_name);
    }
    else {
      my $known_url = $url_for{$queue_name}
        or die sprintf "no such queue [%s]\n", $queue_name;

      $self->set_url($known_url);
    }
  }

  $self->get_url
    or die "no queue url set\n";

  $self->create_request;

  return $self;
}
########################################################################
sub init_defaults {
########################################################################
my ($self) = @_;
my $config = $self->get_config;
# init options from config...
if ($config) {
foreach (
qw(
handler_message_type
aws_endpoint_url
queue_max_error_retry
queue_max_messages
queue_url
queue_name
queue_create_queue
queue_visibility_timeout
queue_wait_time
)
) {
my $getter = "get_$_";
if ( $config->can($getter) ) {
my @local_name = split /_/xsm, $_;
shift @local_name;
my $var = join q{_}, @local_name;
lib/Amazon/SQS/QueueHandler.pm view on Meta::CPAN
########################################################################
my ( $self, $queue_name ) = @_;
my $queue_list = $self->get_queue_list // {};
my $queue_url = eval {
return $queue_list->{$queue_name}
if $queue_list->{$queue_name};
my $service = $self->get_service;
my $rsp = $service->createQueue( { QueueName => $queue_name } );
my $result = $rsp->getCreateQueueResult();
return $result->getQueueUrl;
};
die "could not create queue $queue_name\n$EVAL_ERROR"
if !$queue_url || $EVAL_ERROR;
$self->set_url($queue_url);
return $queue_url;
}
########################################################################
sub list_queues {
########################################################################
  # Ask the service for its queues and build a map of
  # queue url => queue name, where the name is the text after the last
  # '/' of the url. Urls without such a trailing segment are left out.
  #
  # The map is stored (as a hash reference) via set_queue_list and also
  # returned to the caller as a flat url/name list.
  my ($self) = @_;

  my $urls
    = $self->get_service()->listQueues()->getListQueuesResult()
    ->getQueueUrl();

  my %name_of;

  for my $url ( @{$urls} ) {
    my ($name) = $url =~ /\/([^\/]+)$/xsm;
    next if !defined $name;

    $name_of{$url} = $name;
  }

  $self->set_queue_list( \%name_of );

  return %name_of;
}
########################################################################
sub create_service {
########################################################################
  # Build the Amazon::SQS::Client object from this handler's endpoint
  # url, retry limit and credentials, store it via set_service and
  # return it.
  #
  # Dies with "could not create service" (followed by the underlying
  # error text, if any) when the client constructor throws or returns
  # a false value.
  my ($self) = @_;

  my $client_options = {
    ServiceURL    => $self->get_endpoint_url,
    MaxErrorRetry => $self->get_max_error_retry,
    credentials   => $self->get_credentials,
  };

  # The two leading undefs are the first two positional constructor
  # arguments; only the options hash is supplied here.
  my $client
    = eval { Amazon::SQS::Client->new( undef, undef, $client_options ) };

  if ( $EVAL_ERROR || !$client ) {
    die "could not create service\n$EVAL_ERROR";
  }

  $self->set_service($client);

  return $client;
}
########################################################################
sub decode_message {
########################################################################
  # Decode a message body according to its content type, store the
  # result via set_message and return it.
  #
  # Arguments (both optional):
  #   message_type - defaults to $self->get_message_type
  #   message_body - defaults to $self->get_message_body
  #
  # Returns:
  #   text/plain                         - the body unchanged
  #   application/json                   - the decoded JSON structure
  #   application/x-www-form-urlencoded  - hash ref of form parameters;
  #                                        multi-valued parameters become
  #                                        array refs
  #
  # Dies with "unable to decode message" (followed by the reason) when
  # the body cannot be decoded or the message type is not supported.
  my ( $self, $message_type, $message_body ) = @_;

  $message_type //= $self->get_message_type;
  $message_body //= $self->get_message_body;

  $self->get_logger->trace(
    Dumper(
      [ type => $message_type,
        body => $message_body,
      ]
    )
  );

  my $decoded_message = eval {
    return $message_body
      if $message_type eq 'text/plain';

    return JSON->new->decode($message_body)
      if $message_type eq 'application/json';

    # The type declared in @VALID_MESSAGE_TYPES is
    # 'application/x-www-form-urlencoded'. The shorter spelling is the
    # one this method used to test for and is still accepted so that
    # existing callers keep working.
    if ( $message_type eq 'application/x-www-form-urlencoded'
      || $message_type eq 'application/x-www-form-encoded' ) {
      my %vars = CGI::Simple->new($message_body)->Vars();

      # create array refs from multi-value params (NUL separated)
      for my $param ( keys %vars ) {
        next if $vars{$param} !~ /\0/;

        $vars{$param} = [ split /\0/xsm, $vars{$param} ];
      }

      return \%vars;
    }

    # Without an explicit failure the eval would yield the (false but
    # defined) result of the last comparison and the caller would get
    # an empty string instead of an error.
    die sprintf "unsupported message type [%s]\n", $message_type // q{};
  };

  die "unable to decode message\n$EVAL_ERROR"
    if !defined $decoded_message || $EVAL_ERROR;

  $self->set_message($decoded_message);

  return $decoded_message;
}
########################################################################
sub handler {
########################################################################
my ( $self, $message ) = @_;
( run in 0.759 second using v1.01-cache-2.11-cpan-39bf76dae61 )