Amazon-SQS-Client
view release on metacpan or search on metacpan
lib/Amazon/SQS/QueueHandler.pm view on Meta::CPAN
my ($self) = @_;
my $service = $self->get_service();
my $rsp = $service->listQueues();
my $result = $rsp->getListQueuesResult();
my $queueUrls = $result->getQueueUrl();
my %queue_list;
foreach ( @{$queueUrls} ) {
if (/\/([^\/]+)$/xsm) {
$queue_list{$_} = $1;
}
}
$self->set_queue_list( \%queue_list );
return %queue_list;
}
########################################################################
sub create_service {
########################################################################
my ($self) = @_;
my %options = (
ServiceURL => $self->get_endpoint_url,
MaxErrorRetry => $self->get_max_error_retry,
credentials => $self->get_credentials,
);
my $service = eval { return Amazon::SQS::Client->new( undef, undef, \%options ); };
die "could not create service\n$EVAL_ERROR"
if !$service || $EVAL_ERROR;
$self->set_service($service);
return $service;
}
########################################################################
sub decode_message {
########################################################################
my ( $self, $message_type, $message_body ) = @_;
$message_type //= $self->get_message_type;
$message_body //= $self->get_message_body;
$self->get_logger->trace(
Dumper(
[ type => $message_type,
body => $message_body,
]
)
);
my $decoded_message = eval {
return $message_body
if $message_type eq 'text/plain';
return JSON->new->decode($message_body)
if $message_type eq 'application/json';
if ( $message_type eq 'application/x-www-form-encoded' ) {
my %vars = CGI::Simple->new($message_body)->Vars();
# create array refs from multi-value params
foreach ( keys %vars ) {
next if $vars{$_} !~ /\0/;
$vars{$_} = [ split /\0/xsm, $vars{$_} ];
}
return \%vars;
}
};
die "unable to decode message\n$EVAL_ERROR"
if !defined $decoded_message || $EVAL_ERROR;
$self->set_message($decoded_message);
return $decoded_message;
}
########################################################################
sub handler {
########################################################################
my ( $self, $message ) = @_;
$self->get_logger->info( Dumper( [ message => $message ] ) );
return $TRUE;
}
########################################################################
sub get_next_message {
########################################################################
my ($self) = @_;
$self->set_message(undef);
my $service = $self->get_service;
my $request = $self->get_request;
my $response = $service->receiveMessage($request);
return
if !$response || !$response->isSetReceiveMessageResult();
my $receiveMessageResult = $response->getReceiveMessageResult();
my $messageList = $receiveMessageResult->getMessage();
my ($message) = @{ $messageList // [] };
return
if !ref $message || !$message->isSetMessageId();
$self->set_raw_message($message);
$self->set_receipt_handle( $message->getReceiptHandle );
$self->set_message_body( $message->getBody() );
$self->set_message_id( $message->getMessageId() );
my $decoded_message = $self->decode_message();
$self->set_message($decoded_message);
return $decoded_message;
}
########################################################################
sub create_request {
########################################################################
my ($self) = @_;
return $self->get_request
if $self->get_request;
my $max_messages = max( 1, $self->get_max_messages ); # max of 1 currently
my $wait_time = $self->get_wait_time // 0;
my $visibility_timeout = $self->get_visibility_timeout;
my $request = Amazon::SQS::Model::ReceiveMessageRequest->new(
{ QueueUrl => $self->get_url,
MaxNumberOfMessages => $max_messages,
VisibilityTimeout => $visibility_timeout,
WaitTimeSeconds => $wait_time,
}
);
$self->set_request($request);
return;
}
########################################################################
sub change_message_visibility {
########################################################################
my ( $self, $timeout ) = @_;
my $service = $self->get_service;
$service->changeMessageVisibility(
QueueUrl => $self->get_url,
ReceiptHandle => $self->get_receipt_handle,
VisibilityTimeout => $timeout,
);
return;
}
########################################################################
sub delete_message {
########################################################################
my ( $self, $handle ) = @_;
$handle //= $self->get_receipt_handle;
my $logger = $self->get_logger;
my $rsp = eval {
$self->get_service->deleteMessage(
Amazon::SQS::Model::DeleteMessageRequest->new(
{ QueueUrl => $self->get_url,
ReceiptHandle => $handle
}
)
);
lib/Amazon/SQS/QueueHandler.pm view on Meta::CPAN
if $rsp && !$EVAL_ERROR;
die $err
if !ref $err || ref $err ne 'Amazon::SQS::Exception';
my $err_message = <<'END_OF_ERROR';
Exception: %s
Response Status Code: %s
Error Code: %s
Error Type: %s
Request ID: %s
END_OF_ERROR
die sprintf $err_message,
$err->getMessage,
$err->getStatusCode,
$err->getErrorCode,
$err->getErrorType,
$err->getRequestId;
return;
}
1;
__END__
=pod
=head1 NAME
Amazon::SQS::QueueHandler - base class for creating SQS message queue handlers
=head1 SYNOPSIS
package MyHandler;
use parent qw(Amazon::SQS::QueueHandler);
sub handler {
my ($self, $message) = @_;
return 1; # delete the message
}
1;
=head1 DESCRIPTION
Base class for creating queue handlers that work with the
F<QueueDaemon.pl> script. You provide a handler class that processes
SQS messages. The F<QueueDaemon.pl> script handles the plumbing.
=head1 METHODS AND SUBROUTINES
=head2 handler
handler(message)
You provide your own handler message that receives a message to
process. The message is the decoded body of the message placed on the
SQS queue by some other process. Messages can be sent as plain text,
JSON strings or x-www-form-encoded strings.
Generally speaking, by default your handler should return a true value
if you want the message deleted and a non-zero value if you want the
message to be returned to the queue. There are various options
available with the F<QueueDaemon.pl> script that control this behavior
however.
=head2 change_message_visibility
change_message_visibility(timeout)
Changes the message visibility timeout. You may find that in some
circumstances you would like to either extend the time the message
remains invisible or you want to shorten the time it becomes
available. Use this method when your handler receives the message to
alter the visibility of the message to other workers.
=head1 NOTES
As a subclass of L<Amazon::SQS::QueueHandler>, your class has access to
the methods of its parent. Most notably you might want to use the
logger which is an instance of a L<Log::Log4perl> logger.
sub handler {
my ($self, $message) = @_;
$self->get_logger->info('...got a message!');
...
}
The logging level was set either in your configuration file or on the
command line when you invoked the F<QueueDaemon.pl> script.
=head1 SEE ALSO
L<Amazon::SQS::Config>
=head1 AUTHOR
Rob Lauer - <bigfoot@cpan.org>
=cut
( run in 0.874 second using v1.01-cache-2.11-cpan-39bf76dae61 )