Coro-Amazon-SimpleDB

 view release on metacpan or  search on metacpan

lib/Coro/Amazon/SimpleDB.pm  view on Meta::CPAN

            )
            and not $request->isSetDomainName
            ;
        return $request;
    }

    croak "can't normalize '".(ref $request)."' request to an Amazon::SimpleDB::Model";
}


sub _process_request {
    my $self = shift;
    my $request = $self->_normalize_sdb_request(shift);
    my $method = $request->client_request_method
        or croak "no processing for request of type '".(ref $request)."'";
    return $self->sdb->$method($request);
}


sub add_pending {
    my $self = shift;
    $self->pending->{$_} = $_ for @_;
    return $self;
}

sub remove_pending {
    my $self = shift;
    delete $self->pending->{$_} for @_;
    return $self;
}

sub has_pending { !!%{ shift->pending } }


sub poll {
    my $self = shift;
    async {
        CHECK_LOOP: {
            # Keep polling as long as there are pending requests.
            if ($self->has_pending) {
                Coro::AnyEvent::sleep 0.1;
                redo CHECK_LOOP;
            }
            EV::unloop;
        }
    };
    EV::loop;
    return $self;
}


sub async_requests {
    my ($self, @requests) = @_;

    my $debug = $self->DEBUG;
    require Time::HiRes and Time::HiRes->import('time') if $debug;
    my ($start, $duration) = (0, 0);
    my @responses = ();
    $self->bug("starting async enqueues");
    $start = time() if $debug;
    for ($[ .. $#requests) {
        my $idx = $_;
        my $request = $requests[$idx];
        $self->bug("adding request $request");
        my $coro = async {
            my ($start, $duration) = (0, 0);
            $self->bug("starting request for $request");
            $start = time() if $debug;
            $responses[$idx] = eval { $self->_process_request($request) };
            # Store the exception instead of the response (which
            # should be undef) if there was a problem.
            $responses[$idx] = $@ if $@;
            $duration = time() - $start if $debug;
            $self->bug("completed request for $request in $duration secs");
        };
        $self->add_pending($coro);
        $coro->on_destroy(sub { $self->remove_pending($coro) });
    }
    $duration = time() - $start if $debug;
    $self->bug("completed async enqueues in $duration secs, starting coro polling");
    $self->poll;

    return \@responses;
}


sub async_get_items {
    my ($self, @items) = @_;
    my $responses = $self->async_requests(@items);
    my %items = map {
        my $item_name = $items[$_];
        my $response = $responses->[$_];
        my $attributes
            = (ref $response eq 'Amazon::SimpleDB::Model::GetAttributesResponse') ?
                  {
                      map {
                          defined $_->getName ? ($_->getName, $_->getValue) : ()
                      } @{ $response->getGetAttributesResult->getAttribute }
                  }
            :     $response
            ;
        ($item_name => $attributes);
    } $[ .. $#items;
    return \%items;
}



1;

__END__

=head1 NAME

Coro::Amazon::SimpleDB - Use C<Amazon::SimpleDB::Client> to do asynchronous requests


=head1 VERSION

Version 0.01


=head1 SYNOPSIS

An asynchronous layer on top of Amazon's SimpleDB library.

  use Coro::Amazon::SimpleDB;

  my $sdb = Coro::Amazon::SimpleDB->new;
  $sdb->aws_access_key($aws_access_key_id);
  $sdb->aws_secret_access_key($aws_secret_access_key);
  $sdb->domain_name($aws_simpledb_domain);

  my $attributes = $sdb->async_get_items('name', 'rank', 'serial-number');
  my $full_name = join(' ', @{ $attributes->{name} }{'first', 'last'};


=head1 METHODS

=head2 new

Create and return a new instance.  The usual idiom is:

  my $sdb = Coro::Amazon::SimpleDB->new(
    aws_access_key        => $key,
    aws_secret_access_key => $secret_key,
    domain_name           => $domain,
  );

  # ... do stuff with the instance


=head2 async_requests

The main method of the asynchronous interface.  This method takes a
list of item names, hash refs representing requests, or request
objects and asynchronously requests them then polls and gathers the
results, returning the response objects (or exception objects if the
call failed) in an array ref ordered identically to the corresponding
requests.  The call will succeed even if some or all of the requests
failed.  It is up to the caller to check each entry in the response
array to see if the call succeeded.



( run in 0.825 second using v1.01-cache-2.11-cpan-5735350b133 )