Amazon-DynamoDB
view release on metacpan or search on metacpan
lib/Amazon/DynamoDB/20120810.pm view on Meta::CPAN
package Amazon::DynamoDB::20120810;
$Amazon::DynamoDB::20120810::VERSION = '0.35';
use strict;
use warnings;
use Future;
use Future::Utils qw(repeat try_repeat);
use POSIX qw(strftime);
use JSON::MaybeXS qw(decode_json encode_json);
use MIME::Base64;
use List::Util;
use List::MoreUtils;
use B qw(svref_2object);
use HTTP::Request;
use Kavorka;
use Amazon::DynamoDB::Types;
use Type::Registry;
use VM::EC2::Security::CredentialCache;
use AWS::Signature4;
# Register type libraries in this package's Type::Registry at compile time.
# NOTE(review): presumably this is what lets the method/fun signatures in
# this file refer to types by bare name -- confirm against Kavorka's docs.
BEGIN {
my $reg = "Type::Registry"->for_me;
# -Standard is Type::Registry shorthand for the Types::Standard library.
$reg->add_types(-Standard);
# Project-specific types declared in Amazon::DynamoDB::Types.
$reg->add_types("Amazon::DynamoDB::Types");
};
# Constructor.  Every argument after the class name is taken as a key/value
# pair and stored, unvalidated, in the hash that backs the object.
sub new {
    my ($class, @settings) = @_;
    my $self = { @settings };
    return bless $self, $class;
}
# Read-only accessors.  Each one returns a slot of the object hash as it was
# supplied to the constructor, except algorithm(), which is a constant.

# Object whose request() method performs the HTTP call.
sub implementation {
    my ($self) = @_;
    return $self->{implementation};
}

# Endpoint host name.
sub host {
    my ($self) = @_;
    return $self->{host};
}

# Endpoint port; may be unset.
sub port {
    my ($self) = @_;
    return $self->{port};
}

# True when requests should use https rather than http.
sub ssl {
    my ($self) = @_;
    return $self->{ssl};
}

# Name of the signing algorithm; fixed for this API version.
sub algorithm {
    return 'AWS4-HMAC-SHA256';
}

# Value of the 'scope' slot.
sub scope {
    my ($self) = @_;
    return $self->{scope};
}

# AWS access key used when signing requests.
sub access_key {
    my ($self) = @_;
    return $self->{access_key};
}

# AWS secret key used when signing requests.
sub secret_key {
    my ($self) = @_;
    return $self->{secret_key};
}

# Failure-debugging flag.  Note that it is stored under the 'debug' key.
sub debug_failures {
    my ($self) = @_;
    return $self->{debug};
}

# Upper bound on retries; undef means no bound is configured.
sub max_retries {
    my ($self) = @_;
    return $self->{max_retries};
}
method create_table(TableNameType :$TableName!,
Int :$ReadCapacityUnits = 2,
Int :$WriteCapacityUnits = 2,
AttributeDefinitionsType :$AttributeDefinitions,
KeySchemaType :$KeySchema!,
ArrayRef[GlobalSecondaryIndexType] :$GlobalSecondaryIndexes where { scalar(@$_) <= 5 },
ArrayRef[LocalSecondaryIndexType] :$LocalSecondaryIndexes
) {
my %payload = (
TableName => $TableName,
ProvisionedThroughput => {
ReadCapacityUnits => int($ReadCapacityUnits),
WriteCapacityUnits => int($WriteCapacityUnits),
}
);
if (defined($AttributeDefinitions)) {
foreach my $field_name (keys %$AttributeDefinitions) {
my $type = $AttributeDefinitions->{$field_name};
push @{$payload{AttributeDefinitions}}, {
AttributeName => $field_name,
AttributeType => $type // 'S',
}
}
lib/Amazon/DynamoDB/20120810.pm view on Meta::CPAN
'QueryFilter' => $QueryFilter,
'ReturnConsumedCapacity' => $ReturnConsumedCapacity,
'ScanIndexForward' => $ScanIndexForward,
'Select' => $Select,
'TableName' => $TableName
});
foreach my $key_name (keys %$KeyConditions) {
my $key_details = $KeyConditions->{$key_name};
$payload->{KeyConditions}->{$key_name} = {
AttributeValueList => _encode_attribute_value_list($key_details->{AttributeValueList}, $key_details->{ComparisonOperator}),
ComparisonOperator => $key_details->{ComparisonOperator}
};
}
$self->_scan_or_query_process('Query', $payload, $code, { ResultLimit => $Limit});
}
# Run a Scan against a table, invoking $code once for every item returned.
#
# $code is the per-item callback; all remaining arguments are named and
# TableName is the only required one.  Limit is not placed in the payload
# here: it is handed on as ResultLimit, and _scan_or_query_process derives
# the per-request Limit from it.
#
# Returns whatever _scan_or_query_process returns.
method scan (CodeRef $code,
             AttributesToGetType :$AttributesToGet,
             KeyType :$ExclusiveStartKey,
             Int :$Limit where { $_ >= 0},
             ReturnConsumedCapacityType :$ReturnConsumedCapacity,
             ScanFilterType :$ScanFilter,
             Int :$Segment where { $_ >= 0 },
             SelectType :$Select,
             TableNameType :$TableName!,
             Int :$TotalSegments where { $_ >= 1 && $_ <= 1000000 },
             Str :$FilterExpression,
             ExpressionAttributeValuesType :$ExpressionAttributeValues,
             ExpressionAttributeNamesType :$ExpressionAttributeNames,
            ) {
    # Collect every request field, set or not, and let _make_payload turn
    # them into the payload (presumably dropping unset ones -- TODO confirm).
    my %request_fields = (
        'TableName'                 => $TableName,
        'Select'                    => $Select,
        'AttributesToGet'           => $AttributesToGet,
        'ScanFilter'                => $ScanFilter,
        'FilterExpression'          => $FilterExpression,
        'ExpressionAttributeNames'  => $ExpressionAttributeNames,
        'ExpressionAttributeValues' => $ExpressionAttributeValues,
        'ExclusiveStartKey'         => $ExclusiveStartKey,
        'Segment'                   => $Segment,
        'TotalSegments'             => $TotalSegments,
        'ReturnConsumedCapacity'    => $ReturnConsumedCapacity,
    );
    my $payload = _make_payload(\%request_fields);

    my %process_options = (ResultLimit => $Limit);
    return $self->_scan_or_query_process('Scan', $payload, $code, \%process_options);
}
# Build a signed HTTP::Request for one DynamoDB API call.
#
# Named parameters:
#   target  - operation name; sent in the x-amz-target header prefixed with
#             'DynamoDB_20120810.'.
#   payload - hashref that is JSON-encoded to form the request body.
#
# Returns the HTTP::Request after it has been signed with AWS::Signature4.
# Dies when use_iam_role is set and no IAM role credentials can be retrieved.
#
# Side effect: with use_iam_role set, the object's access_key and secret_key
# slots are overwritten with the role's credentials on every call.
method make_request(Str :$target,
                    HashRef :$payload,
                   ) {
    my $api_version = '20120810';
    my $host        = $self->host;
    my $port        = $self->port;

    # Endpoint URL: scheme://host[:port]/
    my $scheme = $self->ssl ? 'https' : 'http';
    my $url    = $scheme . '://' . $host . ($port ? (':' . $port) : '') . '/';
    my $req    = HTTP::Request->new(POST => $url);

    # NOTE(review): the Host header carries no port even when one is
    # configured -- confirm this is what the signing/endpoint side expects.
    $req->header( host => $host );

    # Amazon requires ISO-8601 basic format
    my $http_date = strftime('%Y%m%dT%H%M%SZ', gmtime(time));

    $req->protocol('HTTP/1.1');
    $req->header( 'Date' => $http_date );
    $req->header( 'x-amz-target', 'DynamoDB_'. $api_version. '.'. $target );
    $req->header( 'content-type' => 'application/x-amz-json-1.0' );

    # encode_json yields encoded bytes, so length() here is a byte count.
    my $body = encode_json($payload);
    $req->content($body);
    $req->header( 'Content-Length' => length($body));

    if ($self->{use_iam_role}) {
        # Fetch the role credentials on every request and remember them on
        # the object so the signer below picks them up.
        my $creds = VM::EC2::Security::CredentialCache->get();
        die("Unable to retrieve IAM role credentials") unless defined($creds);
        $self->{access_key} = $creds->accessKeyId;
        $self->{secret_key} = $creds->secretAccessKey;
        $req->header('x-amz-security-token' => $creds->sessionToken);
    }

    my $signer = AWS::Signature4->new(-access_key => $self->access_key,
                                      -secret_key => $self->secret_key);
    $signer->sign($req);
    return $req;
}
# Hand a prepared request to the configured implementation object and return
# whatever its request() method returns.
method _request(HTTP::Request $req) {
    my $transport = $self->implementation;
    return $transport->request($req);
}
# Shared driver for Scan and Query, whose responses have the same shape.
#
# Issues the request repeatedly until a response carries no
# LastEvaluatedKey, the overall result limit has been reached, or a request
# fails.
#
# Parameters:
#   $target  - operation name passed to make_request.
#   $payload - request payload hashref; modified in place (Limit and
#              ExclusiveStartKey are set on it between requests).
#   $code    - callback invoked once per returned item, with the item run
#              through _decode_item_attributes.
#   $args    - options hashref; only ResultLimit is read here.
#
# Returns the value produced by try_repeat (a Future, per Future::Utils).
method _scan_or_query_process (Str $target,
HashRef $payload,
CodeRef $code,
HashRef $args) {
my $finished = 0;
my $records_seen = 0;
my $repeat = try_repeat {
# Since we may be making more than one request in this repeat loop,
# lower the limit sent with each call to the number of records still
# needed to satisfy the overall request.
if (defined($args->{ResultLimit})) {
$payload->{Limit} = $args->{ResultLimit} - $records_seen;
}
my $req = $self->make_request(
target => $target,
payload => $payload,
);
$self->_process_request(
$req,
sub {
my $result = shift;
my $data = decode_json($result);
for my $entry (@{$data->{Items}}) {
$code->(_decode_item_attributes($entry));
}
$records_seen += scalar(@{$data->{Items}});
# Stop once the caller's overall limit has been satisfied.
if ((defined($args->{ResultLimit}) && $records_seen >= $args->{ResultLimit})) {
$finished = 1;
}
# No LastEvaluatedKey means there are no further pages; otherwise, unless
# we are already stopping, resume the next request from that key.
if (!defined($data->{LastEvaluatedKey})) {
$finished = 1;
} else {
if (!$finished) {
$payload->{ExclusiveStartKey} = $data->{LastEvaluatedKey};
}
}
# Stopping while a key is still present (the limit was hit mid-way):
# decode the key in the returned data.
if (defined($data->{LastEvaluatedKey}) && $finished) {
$data->{LastEvaluatedKey} = _decode_item_attributes($data->{LastEvaluatedKey});
}
return $data;
})
->on_fail(sub {
# A failed request ends the loop.
$finished = 1;
});
} until => sub { $finished };
}
lib/Amazon/DynamoDB/20120810.pm view on Meta::CPAN
}
$type = 'SS';
last;
}
} else {
ref($v) eq 'SCALAR' || Carp::confess("Reference found but not a scalar");
$type = 'B';
}
} else {
my $flags = B::svref_2object(\$v)->FLAGS;
if ($flags & B::SVp_POK) {
$type = 'S';
} elsif ($flags & (B::SVp_IOK | B::SVp_NOK)) {
$type = 'N';
} else {
$type = 'S';
}
}
if ($type eq 'N' || $type eq 'S') {
defined($v) || Carp::confess("Attempt to encode undefined value");
return ($type, "$v");
} elsif ($type eq 'B') {
return ($type, MIME::Base64::encode_base64(${$v}, ''));
} elsif ($type eq 'NS' || $type eq 'SS') {
return ($type, [map { "$_" } @$v]);
} elsif ($type eq 'BS') {
return ($type, [map { MIME::Base64::encode_base64(${$_}, '') } @$v]);
} else {
die("Unknown type for quoting and escaping: $type");
}
}
# Convert one DynamoDB typed value into a plain Perl value.
#
#   S, SS - returned unchanged
#   N     - numified
#   NS    - new arrayref with every element numified
#   B     - base64-decoded
#   BS    - new arrayref with every element base64-decoded
#
# Dies on any other type code.
fun _decode_type_and_value(Str $type, Any $value) {
    my %decoder_for = (
        S  => sub { $_[0] },
        SS => sub { $_[0] },
        N  => sub { 0+$_[0] },
        NS => sub { [map { 0+$_ } @{ $_[0] }] },
        B  => sub { MIME::Base64::decode_base64($_[0]) },
        BS => sub { [map { MIME::Base64::decode_base64($_) } @{ $_[0] }] },
    );

    my $decode = $decoder_for{$type}
        or die("Don't know how to decode type: $type");
    return $decode->($value);
}
# Decode a DynamoDB item of the form { name => { TYPE => value }, ... } into
# a plain hashref { name => decoded_value, ... }.
#
# Each attribute's type code is taken from the first key of its inner hash,
# and the value is converted with _decode_type_and_value.
#
# Returns undef, not an empty hashref, when there are no attributes to
# decode, because the result hash only comes into being on first assignment.
fun _decode_item_attributes(Maybe[HashRef] $item) {
    my $r;
    foreach my $key (keys %$item) {
        my $type = (keys %{$item->{$key}})[0];
        # Look the typed value up once and pass that on.
        my $value = $item->{$key}->{$type};
        $r->{$key} = _decode_type_and_value($type, $value);
    }
    return $r;
}
method _process_request(HTTP::Request $req, CodeRef $done?) {
my $current_retry = 0;
my $do_retry = 1;
try_repeat {
$do_retry = 0;
my $sleep_amount = 0;
if ($current_retry > 0) {
$sleep_amount = (2 ** $current_retry * 50)/1000;
}
my $complete = sub {
$self->_request($req)->transform(
fail => sub {
my ($status, $resp, $req)= @_;
my $r;
if (defined($resp) && defined($resp->code)) {
if ($resp->code == 500) {
$do_retry = 1;
$current_retry++;
} elsif ($resp->code == 400) {
my $json = $resp->can('decoded_content')
? $resp->decoded_content
: $resp->body; # Mojo
$r = decode_json($json);
if ($r->{__type} =~ /ProvisionedThroughputExceededException$/) {
# Need to sleep
$do_retry = 1;
$current_retry++;
} else {
# extract the type into a better prettyier name.
if ($r->{__type} =~ /^com\.amazonaws\.dynamodb\.v20120810#(.+)$/) {
$r->{type} = $1;
}
}
}
}
if (defined($self->max_retries()) && $current_retry > $self->max_retries()) {
$do_retry = 0;
}
if (!$do_retry) {
if ($self->debug_failures()) {
print "DynamoDB Failure: $status\n";
if (defined($resp)) {
print "response:\n";
print $resp->as_string() . "\n";
}
if (defined($req)) {
print "Request:\n";
print $req->as_string() . "\n";
}
}
return $r || $status;
}
},
done => $done);
};
lib/Amazon/DynamoDB/20120810.pm view on Meta::CPAN
Additional Parameters:
=over
=item * ResultLimit - limit on the total number of results to return.
=back
$ddb->batch_get_item(
sub {
my ($table, $item) = @_;
},
RequestItems => {
$table_name => {
ConsistentRead => 'true',
AttributesToGet => ['user_id', 'name'],
Keys => [
{
user_id => 1,
},
],
}
})
=head2 scan
Scan a table for values with an optional filter expression.
Amazon Documentation:
L<http://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_Scan.html>
Additional parameters:
=over
=item * Limit - limit on the total number of results to return.
=back
$ddb->scan(
sub {
my $item = shift;
push @found_items, $item;
},
TableName => $table_name,
ScanFilter => {
user_id => {
ComparisonOperator => 'NOT_NULL',
}
});
=head1 NAME
Amazon::DynamoDB::20120810 - interact with DynamoDB using API version 20120810
=head1 METHODS - Internal
The following methods are intended for internal use and are documented
purely for completeness - for normal operations see L</METHODS> instead.
=head2 make_request
Generates an L<HTTP::Request>.
=head1 FUNCTIONS - Internal
=head2 _encode_type_and_value
Returns an appropriate type (N, S, SS etc.) and stringified/encoded value for the given
value.
DynamoDB only uses strings even if there is a Numeric value specified,
so while the type will be expressed as a Number the value will be
stringified.
L<http://docs.aws.amazon.com/amazondynamodb/latest/developerguide/DataFormat.html>
=head1 AUTHORS
=over 4
=item *
Rusty Conover <rusty@luckydinosaur.com>
=item *
Tom Molesworth <cpan@entitymodel.com>
=back
=head1 COPYRIGHT AND LICENSE
This software is copyright (c) 2013 by Tom Molesworth, copyright (c) 2014 Lucky Dinosaur LLC. L<http://www.luckydinosaur.com>.
This is free software; you can redistribute it and/or modify it under
the same terms as the Perl 5 programming language system itself.
=cut
( run in 0.657 second using v1.01-cache-2.11-cpan-437f7b0c052 )