Amazon-SQS-Client

 view release on metacpan or  search on metacpan

lib/Amazon/SQS/QueueHandler.pm  view on Meta::CPAN

use strict;
use warnings;

package Amazon::SQS::QueueHandler;

use Data::Dumper;
use English qw(-no_match_vars);

use Amazon::Credentials;
use Amazon::SQS::Model::DeleteMessageRequest;
use Amazon::SQS::Model::ReceiveMessageRequest;
use Amazon::SQS::Client;
use CGI::Simple;
use JSON;
use List::Util qw(none max);

__PACKAGE__->follow_best_practice;
__PACKAGE__->mk_accessors(
  qw(
    config
    create_queue
    credentials
    endpoint_url
    logger
    max_error_retry
    message
    message_id
    message_type
    message_body
    raw_message
    receipt_handle
    request
    region
    service
    signature_version
    queue_list
    name
    max_messages
    url
    visibility_timeout
    wait_time
  )
);

use parent qw(Class::Accessor::Fast);

our @VALID_MESSAGE_TYPES = qw(
  text/plain
  application/json
  application/x-www-form-urlencoded
);

our $DEFAULT_ENDPOINT_URL = 'https://queue.amazonaws.com';
our $MAX_MESSAGES         = 1;

our $TRUE  = 1;
our $FALSE = 0;

########################################################################
sub new {
########################################################################
  my ( $class, @args ) = @_;

  my $options = ref $args[0] ? $args[0] : {@args};

  $options->{credentials} //= Amazon::Credentials->new;
  my $self = $class->SUPER::new($options);

  $self->init_defaults();

  $self->create_service();

  if ( $self->get_name && !$self->get_url ) {
    my %queue_list = reverse $self->list_queues();

    if ( $self->get_create_queue ) {
      $self->create_queue( $self->get_name );
    }
    else {
      my $queue_url = $queue_list{ $self->get_name };

      die sprintf "no such queue [%s]\n", $self->get_name
        if !$queue_url;

      $self->set_url($queue_url);
    }
  }

  die "no queue url set\n"
    if !$self->get_url;

  $self->create_request;

  return $self;
}

########################################################################
sub init_defaults {
########################################################################
  my ($self) = @_;

  my $config = $self->get_config;

  # init options from config...
  if ($config) {
    foreach (
      qw(
      handler_message_type
      aws_endpoint_url
      queue_max_error_retry
      queue_max_messages
      queue_url
      queue_name
      queue_create_queue
      queue_visibility_timeout
      queue_wait_time
      )
    ) {
      my $getter = "get_$_";

      if ( $config->can($getter) ) {

        my @local_name = split /_/xsm, $_;
        shift @local_name;

        my $var = join q{_}, @local_name;

lib/Amazon/SQS/QueueHandler.pm  view on Meta::CPAN

########################################################################
  my ( $self, $queue_name ) = @_;

  my $queue_list = $self->get_queue_list // {};

  my $queue_url = eval {
    return $queue_list->{$queue_name}
      if $queue_list->{$queue_name};

    my $service = $self->get_service;

    my $rsp = $service->createQueue( { QueueName => $queue_name } );

    my $result = $rsp->getCreateQueueResult();

    return $result->getQueueUrl;
  };

  die "could not create queue $queue_name\n$EVAL_ERROR"
    if !$queue_url || $EVAL_ERROR;

  $self->set_url($queue_url);

  return $queue_url;
}

########################################################################
sub list_queues {
########################################################################
  my ($self) = @_;

  my $service = $self->get_service();

  my $rsp = $service->listQueues();

  my $result = $rsp->getListQueuesResult();

  my $queueUrls = $result->getQueueUrl();

  my %queue_list;

  foreach ( @{$queueUrls} ) {
    if (/\/([^\/]+)$/xsm) {
      $queue_list{$_} = $1;
    }
  }

  $self->set_queue_list( \%queue_list );

  return %queue_list;
}

########################################################################
sub create_service {
########################################################################
  my ($self) = @_;

  my %options = (
    ServiceURL    => $self->get_endpoint_url,
    MaxErrorRetry => $self->get_max_error_retry,
    credentials   => $self->get_credentials,
  );

  my $service = eval { return Amazon::SQS::Client->new( undef, undef, \%options ); };

  die "could not create service\n$EVAL_ERROR"
    if !$service || $EVAL_ERROR;

  $self->set_service($service);

  return $service;
}

########################################################################
sub decode_message {
########################################################################
  my ( $self, $message_type, $message_body ) = @_;

  $message_type //= $self->get_message_type;
  $message_body //= $self->get_message_body;

  $self->get_logger->trace(
    Dumper(
      [ type => $message_type,
        body => $message_body,
      ]
    )
  );

  my $decoded_message = eval {
    return $message_body
      if $message_type eq 'text/plain';

    return JSON->new->decode($message_body)
      if $message_type eq 'application/json';

    if ( $message_type eq 'application/x-www-form-encoded' ) {
      my %vars = CGI::Simple->new($message_body)->Vars();

      # create array refs from multi-value params
      foreach ( keys %vars ) {
        next if $vars{$_} !~ /\0/;
        $vars{$_} = [ split /\0/xsm, $vars{$_} ];
      }

      return \%vars;
    }
  };

  die "unable to decode message\n$EVAL_ERROR"
    if !defined $decoded_message || $EVAL_ERROR;

  $self->set_message($decoded_message);

  return $decoded_message;
}

########################################################################
sub handler {
########################################################################
  my ( $self, $message ) = @_;



( run in 0.759 second using v1.01-cache-2.11-cpan-39bf76dae61 )