Amazon-SQS-Client

 view release on metacpan or  search on metacpan

lib/Amazon/SQS/QueueHandler.pm  view on Meta::CPAN

  my ($self) = @_;

  my $service = $self->get_service();

  my $rsp = $service->listQueues();

  my $result = $rsp->getListQueuesResult();

  my $queueUrls = $result->getQueueUrl();

  my %queue_list;

  foreach ( @{$queueUrls} ) {
    if (/\/([^\/]+)$/xsm) {
      $queue_list{$_} = $1;
    }
  }

  $self->set_queue_list( \%queue_list );

  return %queue_list;
}

########################################################################
sub create_service {
########################################################################
  # Creates the Amazon::SQS::Client used by this handler.
  #
  # The client is configured from the handler's endpoint URL, maximum
  # error retry count and credentials, stored with set_service() and
  # returned to the caller.
  #
  # Dies with "could not create service" (followed by the underlying
  # error, if any) when the constructor throws or yields a false value.
  my ($self) = @_;

  my $client_options = {
    ServiceURL    => $self->get_endpoint_url,
    MaxErrorRetry => $self->get_max_error_retry,
    credentials   => $self->get_credentials,
  };

  # the first two constructor arguments are passed as undef; the
  # credentials travel in the options hash instead
  my $client = eval { Amazon::SQS::Client->new( undef, undef, $client_options ) };

  if ( !$client || $EVAL_ERROR ) {
    die "could not create service\n$EVAL_ERROR";
  }

  $self->set_service($client);

  return $client;
}

########################################################################
sub decode_message {
########################################################################
  # Decodes a message body according to its content type and stores the
  # result with set_message().
  #
  # Parameters:
  #   $message_type - content type; defaults to $self->get_message_type
  #   $message_body - raw body;     defaults to $self->get_message_body
  #
  # Supported types:
  #   text/plain                        - body is returned unchanged
  #   application/json                  - body is parsed by JSON->new->decode
  #   application/x-www-form-encoded    - body is parsed by CGI::Simple into
  #   application/x-www-form-urlencoded   a hash ref; parameters with several
  #                                       values become array refs
  #
  # Returns the decoded message.
  #
  # Dies with "unable to decode message" (followed by the reason) when
  # the type is missing or unsupported, the decoder throws, or the
  # decoded value is undefined.
  my ( $self, $message_type, $message_body ) = @_;

  $message_type //= $self->get_message_type;
  $message_body //= $self->get_message_body;

  $self->get_logger->trace(
    Dumper(
      [ type => $message_type,
        body => $message_body,
      ]
    )
  );

  my $decoded_message = eval {
    die "no message type\n"
      if !defined $message_type;

    return $message_body
      if $message_type eq 'text/plain';

    return JSON->new->decode($message_body)
      if $message_type eq 'application/json';

    # 'x-www-form-encoded' is the spelling this module has always used;
    # the standard 'x-www-form-urlencoded' name is accepted as well
    if ( $message_type eq 'application/x-www-form-encoded'
      || $message_type eq 'application/x-www-form-urlencoded' ) {
      my %vars = CGI::Simple->new($message_body)->Vars();

      # create array refs from multi-value params
      foreach my $name ( keys %vars ) {
        next if $vars{$name} !~ /\0/xsm;
        $vars{$name} = [ split /\0/xsm, $vars{$name} ];
      }

      return \%vars;
    }

    # Falling off the end of the eval would yield the false but defined
    # result of the last comparison, so an unknown type would silently
    # decode to an empty string. Fail explicitly instead.
    die "unsupported message type: $message_type\n";
  };

  die "unable to decode message\n$EVAL_ERROR"
    if !defined $decoded_message || $EVAL_ERROR;

  $self->set_message($decoded_message);

  return $decoded_message;
}

########################################################################
sub handler {
########################################################################
  # Default message handler: writes a dump of the message to the log at
  # info level and returns $TRUE. Handler classes provide their own
  # handler() in place of this one.
  my ( $self, $message ) = @_;

  my $logger = $self->get_logger;

  $logger->info( Dumper( [ message => $message ] ) );

  return $TRUE;
}

########################################################################
sub get_next_message {
########################################################################
  # Polls the queue once with the stored request and decodes the first
  # message of the response.
  #
  # On success the raw message, receipt handle, body and message id are
  # recorded on the handler, the body is decoded with decode_message()
  # and the decoded message is stored and returned.
  #
  # Returns nothing when there is no response, no receive result, or no
  # message carrying a message id.
  my ($self) = @_;

  # forget the message decoded on a previous poll
  $self->set_message(undef);

  my $sqs     = $self->get_service;
  my $request = $self->get_request;
  my $rsp     = $sqs->receiveMessage($request);

  if ( !( $rsp && $rsp->isSetReceiveMessageResult() ) ) {
    return;
  }

  my $messages = $rsp->getReceiveMessageResult()->getMessage();

  # only the first message of the response is used
  my $first = ( $messages // [] )->[0];

  if ( !( ref $first && $first->isSetMessageId() ) ) {
    return;
  }

  $self->set_raw_message($first);
  $self->set_receipt_handle( $first->getReceiptHandle );
  $self->set_message_body( $first->getBody() );
  $self->set_message_id( $first->getMessageId() );

  my $decoded = $self->decode_message();

  $self->set_message($decoded);

  return $decoded;
}

########################################################################
sub create_request {
########################################################################
  # Returns the ReceiveMessageRequest used to poll the queue, creating
  # it and caching it with set_request() on first use.
  #
  # The request is built from the handler's queue URL, maximum number of
  # messages (never less than 1), visibility timeout and wait time
  # (0 when unset).
  #
  # Returns the request object, whether it was cached or newly created.
  my ($self) = @_;

  my $request = $self->get_request;

  return $request
    if $request;

  # max() acts as a floor here: at least one message is always requested.
  # NOTE(review): get_next_message only consumes the first message of a
  # response, so a max_messages setting above 1 leaves the remaining
  # messages unprocessed - confirm whether this should be capped at 1.
  my $max_messages       = max( 1, $self->get_max_messages() // 1 );
  my $wait_time          = $self->get_wait_time // 0;
  my $visibility_timeout = $self->get_visibility_timeout;

  $request = Amazon::SQS::Model::ReceiveMessageRequest->new(
    { QueueUrl            => $self->get_url,
      MaxNumberOfMessages => $max_messages,
      VisibilityTimeout   => $visibility_timeout,
      WaitTimeSeconds     => $wait_time,
    }
  );

  $self->set_request($request);

  # hand back the new request, just like the cached path above
  return $request;
}

########################################################################
sub change_message_visibility {
########################################################################
  # Changes the visibility timeout of the message identified by the
  # handler's current receipt handle.
  #
  # Parameters:
  #   $timeout - new visibility timeout, passed through unchanged as
  #              VisibilityTimeout
  #
  # Returns nothing.
  my ( $self, $timeout ) = @_;

  my $service = $self->get_service;

  # Pass the parameters as a single request argument (a hash ref), the
  # same calling shape this class uses for receiveMessage and
  # deleteMessage, rather than as a flat key/value list.
  $service->changeMessageVisibility(
    { QueueUrl          => $self->get_url,
      ReceiptHandle     => $self->get_receipt_handle,
      VisibilityTimeout => $timeout,
    }
  );

  return;
}

########################################################################
sub delete_message {
########################################################################
  my ( $self, $handle ) = @_;

  $handle //= $self->get_receipt_handle;

  my $logger = $self->get_logger;

  my $rsp = eval {
    $self->get_service->deleteMessage(
      Amazon::SQS::Model::DeleteMessageRequest->new(
        { QueueUrl      => $self->get_url,
          ReceiptHandle => $handle
        }
      )
    );

lib/Amazon/SQS/QueueHandler.pm  view on Meta::CPAN

    if $rsp && !$EVAL_ERROR;

  die $err
    if !ref $err || ref $err ne 'Amazon::SQS::Exception';

  my $err_message = <<'END_OF_ERROR';
Exception: %s
Response Status Code: %s
Error Code: %s
Error Type: %s
Request ID: %s
END_OF_ERROR

  die sprintf $err_message,
    $err->getMessage,
    $err->getStatusCode,
    $err->getErrorCode,
    $err->getErrorType,
    $err->getRequestId;

  return;
}

1;

__END__

=pod

=head1 NAME

Amazon::SQS::QueueHandler - base class for creating SQS message queue handlers

=head1 SYNOPSIS

 package MyHandler;

 use parent qw(Amazon::SQS::QueueHandler);

 sub handler {
   my ($self, $message) = @_;
  
   return 1; # delete the message
 }

 1;

=head1 DESCRIPTION

Base class for creating queue handlers that work with the
F<QueueDaemon.pl> script.  You provide a handler class that processes
SQS messages. The F<QueueDaemon.pl> script handles the plumbing.

=head1 METHODS AND SUBROUTINES

=head2 handler

 handler(message)

You provide your own handler method that receives a message to
process. The message is the decoded body of the message placed on the
SQS queue by some other process. Messages can be sent as plain text,
JSON strings or x-www-form-encoded strings.

Generally speaking, by default your handler should return a true value
if you want the message deleted and a false value if you want the
message to be returned to the queue. There are, however, various
options available with the F<QueueDaemon.pl> script that control this
behavior.

=head2 change_message_visibility

 change_message_visibility(timeout)

Changes the message visibility timeout. In some circumstances you may
want to either extend the time the message remains invisible or
shorten the time before it becomes available again. Use this method
when your handler receives the message to alter the visibility of the
message to other workers.

=head1 NOTES

As a subclass of L<Amazon::SQS::QueueHandler>, your class has access to
the methods of its parent. Most notably you might want to use the
logger which is an instance of a L<Log::Log4perl> logger.

 sub handler {
   my ($self, $message) = @_;

   $self->get_logger->info('...got a message!');
   ...
 }

The logging level was set either in your configuration file or on the
command line when you invoked the F<QueueDaemon.pl> script.

=head1 SEE ALSO

L<Amazon::SQS::Config>

=head1 AUTHOR

Rob Lauer - <bigfoot@cpan.org>

=cut



( run in 0.874 second using v1.01-cache-2.11-cpan-39bf76dae61 )