Amazon-SQS-Simple-AnyEvent

 view release on metacpan or  search on metacpan

lib/Amazon/SQS/Simple/AnyEvent.pm  view on Meta::CPAN

        $cb->(@messages);
    };
    
    return $self->_dispatch(\%params, [qw(Message)], $on_response);

}

#------------------------------------------------------------------------------

# Asynchronously delete up to ten messages from the queue in one request.
#
# Arguments (after the invocant):
#   $messages - arrayref of message objects; each must provide the
#               MessageId and ReceiptHandle accessors
#   %params   - optional extra request parameters (key/value list)
#   $cb       - callback, always the LAST argument; it is handed on to
#               _dispatch unchanged
#
# Returns whatever _dispatch returns.  When $messages is empty the sub
# returns immediately: nothing is dispatched and the callback is not invoked.
sub DeleteMessageBatch {
    my ($self, $messages, @rest) = @_;
    my $cb     = pop @rest;
    my %params = @rest;

    return if !@$messages;
    $params{Action} = 'DeleteMessageBatch';

    # Only the first ten messages are sent; anything beyond that is dropped
    # with a warning once the entries have been built.
    my $over_limit = @$messages > 10;
    my @batch      = $over_limit ? @{$messages}[0 .. 9] : @$messages;

    my $index = 0;
    for my $entry (@batch) {
        $index++;
        my $prefix = "DeleteMessageBatchRequestEntry.$index";
        $params{"$prefix.Id"}            = $entry->MessageId;
        $params{"$prefix.ReceiptHandle"} = $entry->ReceiptHandle;
    }

    warn "Batch deletion limited to 10 messages" if $over_limit;

    return $self->_dispatch(\%params, $cb);
}

#------------------------------------------------------------------------------

# Build, sign and asynchronously send one SQS request, then pass the parsed
# reply to the caller's callback.
#
# Arguments (after the invocant):
#   $params      - optional hashref of request parameters
#   $force_array - optional arrayref handed to XMLin as ForceArray
#   $cb          - callback, always the LAST argument.  It is called with the
#                  parsed response hashref on a 2xx status, or with no
#                  arguments on failure, in which case $ERROR holds the reason.
#
# Returns whatever http_get/http_post return (per the AnyEvent::HTTP docs
# that is a cancellation guard when called in non-void context).
sub _dispatch {
    my $self        = shift;
    my $cb          = pop;
    my $params      = shift || {};
    my $force_array = shift || [];

    my $url = $self->{Endpoint};
    my $post_request = 0;

    # Caller-supplied parameters come last so they override the defaults.
    $params = {
        AWSAccessKeyId      => $self->{AWSAccessKeyId},
        Version             => $self->{Version},
        %$params,
    };

    if (!$params->{Timestamp} && !$params->{Expires}) {
        $params->{Timestamp} = Amazon::SQS::Simple::Base::_timestamp();
    }

    # Message bodies larger than _max_get_msg_size are sent via POST.
    if ($params->{MessageBody} && length($params->{MessageBody}) > $self->_max_get_msg_size) {
        $post_request = 1;
    }

    my ($query, @auth_headers) = $self->_get_signed_query($params, $post_request);

    # NOTE(review): per the AnyEvent::HTTP/AnyEvent::TLS docs the "low"
    # profile does not verify the peer certificate -- consider "high".
    my %http_opts = (tls_ctx => "low");

    $self->_debug_log($query);

    my $on_response = sub {
        my ($content, $headers) = @_;
        $self->_debug_log($content);

        if ($headers->{Status} !~ /^2/) {
            $ERROR = "$headers->{Status} $headers->{Reason}";
            return $cb->();
        }

        # XMLin dies on malformed input.  Trap that here so the failure is
        # reported through the callback instead of being thrown inside the
        # event loop, where the caller would never hear about it.
        my $href;
        my $parsed = eval {
            $href = XMLin($content, ForceArray => $force_array, KeyAttr => {});
            1;
        };
        if (!$parsed) {
            $ERROR = "Unable to parse response: $@";
            return $cb->();
        }

        return $cb->($href);
    };

    if ($post_request) {
        my $headers = {
            'Content-Type' => 'application/x-www-form-urlencoded;charset=utf-8',
            @auth_headers,
        };

        # http_post takes the request body as its second positional
        # argument; it does not understand an LWP-style 'Content' header.
        return http_post($url, $query, headers => $headers, %http_opts, $on_response);
    }

    my $headers = {
        "Content-Type" => "text/plain;charset=utf-8",
        @auth_headers,
    };

    return http_get("$url/?$query", headers => $headers, %http_opts, $on_response);
}


1;

__END__

=head1 NAME

Amazon::SQS::Simple::AnyEvent - A non-blocking API to Amazon's SQS

=head1 SYNOPSIS

  use Amazon::SQS::Simple;
  use Amazon::SQS::Simple::AnyEvent;

  my $sqs = Amazon::SQS::Simple->new($access_key, $secret_key);
  my $queue = $sqs->GetQueue($endpoint);

  my $cb = sub {my $message = shift};

  my $msg   = $queue->ReceiveMessage();     # Blocking
  my $guard = $queue->ReceiveMessage($cb);  # Non-blocking

  # do something else...

=head1 DESCRIPTION

This module adds non-blocking capabilities to L<Amazon::SQS::Simple>
via L<AnyEvent>. It works by hijacking and replacing methods inside
the C<Amazon::SQS::Simple> namespace. However, this could easily break
if the internals of L<Amazon::SQS::Simple> change.  Also, this code is
alpha quality with no automated tests. You have been warned.



( run in 0.892 second using v1.01-cache-2.11-cpan-39bf76dae61 )