Amazon-DynamoDB

 view release on metacpan or  search on metacpan

lib/Amazon/DynamoDB/20120810.pm  view on Meta::CPAN

package Amazon::DynamoDB::20120810;
$Amazon::DynamoDB::20120810::VERSION = '0.35';
use strict;
use warnings;


use Future;
use Future::Utils qw(repeat try_repeat);
use POSIX qw(strftime);
use JSON::MaybeXS qw(decode_json encode_json);
use MIME::Base64;
use List::Util;
use List::MoreUtils;
use B qw(svref_2object);
use HTTP::Request;
use Kavorka;
use Amazon::DynamoDB::Types;
use Type::Registry;
use VM::EC2::Security::CredentialCache;
use AWS::Signature4;
   
BEGIN {
    my $reg = "Type::Registry"->for_me; 
    $reg->add_types(-Standard);
    $reg->add_types("Amazon::DynamoDB::Types");
};



sub new {
    my $class = shift;
    bless { @_ }, $class
}

sub implementation { shift->{implementation} }
sub host { shift->{host} }
sub port { shift->{port} }
sub ssl { shift->{ssl} }
sub algorithm { 'AWS4-HMAC-SHA256' }
sub scope { shift->{scope} }
sub access_key { shift->{access_key} }
sub secret_key { shift->{secret_key} }
sub debug_failures { shift->{debug} }

sub max_retries { shift->{max_retries} }





method create_table(TableNameType :$TableName!,
                    Int :$ReadCapacityUnits = 2, 
                    Int :$WriteCapacityUnits = 2,
                    AttributeDefinitionsType :$AttributeDefinitions,
                    KeySchemaType :$KeySchema!,
                    ArrayRef[GlobalSecondaryIndexType] :$GlobalSecondaryIndexes where { scalar(@$_) <= 5 },
                    ArrayRef[LocalSecondaryIndexType] :$LocalSecondaryIndexes
                ) {
    my %payload = (
        TableName => $TableName,
        ProvisionedThroughput => {
            ReadCapacityUnits => int($ReadCapacityUnits),
            WriteCapacityUnits => int($WriteCapacityUnits),
        }
    );

    if (defined($AttributeDefinitions)) {
        foreach my $field_name (keys %$AttributeDefinitions) {
            my $type = $AttributeDefinitions->{$field_name};

            push @{$payload{AttributeDefinitions}}, {
                AttributeName => $field_name,
                AttributeType => $type // 'S',
            }
        }

lib/Amazon/DynamoDB/20120810.pm  view on Meta::CPAN

                                'QueryFilter' => $QueryFilter,
                                'ReturnConsumedCapacity' => $ReturnConsumedCapacity,
                                'ScanIndexForward' => $ScanIndexForward,
                                'Select' => $Select,
                                'TableName' => $TableName
                            });
    

    foreach my $key_name (keys %$KeyConditions) {
        my $key_details = $KeyConditions->{$key_name};
        $payload->{KeyConditions}->{$key_name} = {
            AttributeValueList => _encode_attribute_value_list($key_details->{AttributeValueList}, $key_details->{ComparisonOperator}),
            ComparisonOperator => $key_details->{ComparisonOperator}
        };
    }

    $self->_scan_or_query_process('Query', $payload, $code, { ResultLimit => $Limit});
}





method scan (CodeRef $code,
             AttributesToGetType :$AttributesToGet,
             KeyType :$ExclusiveStartKey,
             Int :$Limit where { $_ >= 0},
             ReturnConsumedCapacityType :$ReturnConsumedCapacity,
             ScanFilterType :$ScanFilter,
             Int :$Segment where { $_ >= 0 },
             SelectType :$Select,
             TableNameType :$TableName!,
             Int :$TotalSegments where { $_ >= 1 && $_ <= 1000000 },
             Str :$FilterExpression,
             ExpressionAttributeValuesType :$ExpressionAttributeValues,
             ExpressionAttributeNamesType :$ExpressionAttributeNames,
         ) {
    my $payload = _make_payload({
                                'AttributesToGet' => $AttributesToGet,
                                'ExclusiveStartKey' => $ExclusiveStartKey,
                                'ExpressionAttributeValues' => $ExpressionAttributeValues,
                                'ExpressionAttributeNames' => $ExpressionAttributeNames,
                                'FilterExpression' => $FilterExpression,
                                'ReturnConsumedCapacity' => $ReturnConsumedCapacity,
                                'ScanFilter' => $ScanFilter,
                                'Segment' => $Segment,
                                'Select' => $Select,
                                'TableName' => $TableName,
                                'TotalSegments' => $TotalSegments
                            });

    $self->_scan_or_query_process('Scan', $payload, $code, { ResultLimit => $Limit});
}


method make_request(Str :$target,
                    HashRef :$payload,
                ) {
    my $api_version = '20120810';
    my $host = $self->host;
    my $req = HTTP::Request->new(
        POST => (($self->ssl) ? 'https' : 'http') . '://' . $self->host . ($self->port ? (':' . $self->port) : '') . '/'
    );
    $req->header( host => $host );
    # Amazon requires ISO-8601 basic format
    my $now = time;
    my $http_date = strftime('%Y%m%dT%H%M%SZ', gmtime($now));
    my $date = strftime('%Y%m%d', gmtime($now));

    $req->protocol('HTTP/1.1');
    $req->header( 'Date' => $http_date );
    $req->header( 'x-amz-target', 'DynamoDB_'. $api_version. '.'. $target );
    $req->header( 'content-type' => 'application/x-amz-json-1.0' );
    $payload = encode_json($payload);
    $req->content($payload);
    $req->header( 'Content-Length' => length($payload));
    
    if ($self->{use_iam_role}) {
        my $creds = VM::EC2::Security::CredentialCache->get();
        defined($creds) || die("Unable to retrieve IAM role credentials");
        $self->{access_key} = $creds->accessKeyId;
        $self->{secret_key} = $creds->secretAccessKey;
        $req->header('x-amz-security-token' => $creds->sessionToken);
    }        

    my $signer = AWS::Signature4->new(-access_key => $self->access_key,
                                      -secret_key => $self->secret_key);
    
    $signer->sign($req);
    return $req;
}

method _request(HTTP::Request $req) {
    $self->implementation->request($req);
}


# Since scan and query have the same type of responses share the processing.
method _scan_or_query_process (Str $target,
                               HashRef $payload,
                               CodeRef $code,
                               HashRef $args) {
    my $finished = 0;
    my $records_seen = 0;
    my $repeat = try_repeat {
        
        # Since we're may be making more than one request in this repeat loop
        # decrease our limit of results to scan in each call by the number 
        # of records remaining that the overall request wanted ot pull.
        if (defined($args->{ResultLimit})) {
            $payload->{Limit} = $args->{ResultLimit} - $records_seen;
        }

        my $req = $self->make_request(
            target => $target,
            payload => $payload,
        );
        
        $self->_process_request(
            $req,
            sub {
                my $result = shift;
                my $data = decode_json($result);
                
                for my $entry (@{$data->{Items}}) {
                    $code->(_decode_item_attributes($entry));
                }

                $records_seen += scalar(@{$data->{Items}});
                if ((defined($args->{ResultLimit}) && $records_seen >= $args->{ResultLimit})) {
                    $finished = 1;
                } 

                if (!defined($data->{LastEvaluatedKey})) {
                    $finished = 1;
                } else {
                    if (!$finished) {
                        $payload->{ExclusiveStartKey} = $data->{LastEvaluatedKey};                    
                    }
                }
                
                if (defined($data->{LastEvaluatedKey}) && $finished) {
                    $data->{LastEvaluatedKey} = _decode_item_attributes($data->{LastEvaluatedKey});
                }


                return $data;
            })
            ->on_fail(sub {
                          $finished = 1;
                      });
    } until => sub { $finished };
}

lib/Amazon/DynamoDB/20120810.pm  view on Meta::CPAN

                }
                $type = 'SS';
                last;
            }
        } else {
            ref($v) eq 'SCALAR' || Carp::confess("Reference found but not a scalar");
            $type = 'B';
        }
    } else {
        my $flags = B::svref_2object(\$v)->FLAGS;
        if ($flags & B::SVp_POK) {
            $type = 'S';
        } elsif ($flags & (B::SVp_IOK | B::SVp_NOK)) {
            $type = 'N';
        } else {
            $type = 'S';
        }
    }
    
    if ($type eq 'N' || $type eq 'S') {
        defined($v) || Carp::confess("Attempt to encode undefined value");
        return ($type, "$v");
    } elsif ($type eq 'B') {
        return ($type, MIME::Base64::encode_base64(${$v}, ''));
    } elsif ($type eq 'NS' || $type eq 'SS') {
        return ($type, [map { "$_" } @$v]);
    } elsif ($type eq 'BS') {
        return ($type, [map { MIME::Base64::encode_base64(${$_}, '') } @$v]);
    } else {
        die("Unknown type for quoting and escaping: $type");
    }
}

fun _decode_type_and_value(Str $type, Any $value) {
    if ($type eq 'S' || $type eq 'SS') {
        return $value;
    } elsif ($type eq 'N') {
        return  0+$value;
    } elsif ($type eq 'B') {
        return MIME::Base64::decode_base64($value);
    } elsif ($type eq 'BS') {
        return [map { MIME::Base64::decode_base64($_) } @$value];
    } elsif ($type eq 'NS') {
        return [map { 0+$_} @$value];
    } else {
        die("Don't know how to decode type: $type");
    }
}


fun _decode_item_attributes(Maybe[HashRef] $item) {
    my $r;
    foreach my $key (keys %$item) {
        my $type = (keys %{$item->{$key}})[0];
        my $value = $item->{$key}->{$type};
        $r->{$key} = _decode_type_and_value($type, $item->{$key}->{$type});
    }
    return $r;
}

method _process_request(HTTP::Request $req, CodeRef $done?) {
    my $current_retry = 0;
    my $do_retry = 1;
    try_repeat {
        $do_retry = 0;
        
        my $sleep_amount = 0;
        if ($current_retry > 0) {
            $sleep_amount = (2 ** $current_retry * 50)/1000;
        }

        my $complete = sub {
            $self->_request($req)->transform(
                fail => sub {
                    my ($status, $resp, $req)= @_;
                    my $r;
                    if (defined($resp) && defined($resp->code)) {
                        if ($resp->code == 500) {
                            $do_retry = 1;
                            $current_retry++;
                        } elsif ($resp->code == 400) {
                            my $json = $resp->can('decoded_content')
                                ? $resp->decoded_content
                                : $resp->body; # Mojo
                            $r = decode_json($json);
                            if ($r->{__type} =~ /ProvisionedThroughputExceededException$/) {
                                # Need to sleep
                                $do_retry = 1;
                                $current_retry++;
                                    
                                
                            } else {
                                # extract the type into a better prettyier name.
                                if ($r->{__type} =~ /^com\.amazonaws\.dynamodb\.v20120810#(.+)$/) {
                                    $r->{type} = $1;
                                }
                            }
                        }
                    }
                    
                    if (defined($self->max_retries()) && $current_retry > $self->max_retries()) {
                        $do_retry = 0;
                    }

                    if (!$do_retry) {
                        if ($self->debug_failures()) {
                            print "DynamoDB Failure: $status\n";
                            if (defined($resp)) {
                                print "response:\n";
                                print $resp->as_string() . "\n";
                            }
                            if (defined($req)) {
                                print "Request:\n";
                                print $req->as_string() . "\n";
                            }
                        }
                        return $r || $status;
                    }
                },
                done => $done);
        };

lib/Amazon/DynamoDB/20120810.pm  view on Meta::CPAN


Additional Parameters:

=over

=item * ResultLimit - limit on the total number of results to return.

=back

  $ddb->batch_get_item(
    sub {
        my ($table, $item) = @_;
    },
    RequestItems => {
        $table_name => {
            ConsistentRead => 'true',
            AttributesToGet => ['user_id', 'name'],
            Keys => [
                {
                    user_id => 1,
                },
            ],
        }
    })

=head2 scan

Scan a table for values with an optional filter expression.

Amazon Documentation:

L<http://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_Scan.html>

Additional parameters:

=back

  $ddb->scan(
    sub {
      my $item = shift;
      push @found_items, $item;
    },
    TableName => $table_name,
    ScanFilter => {
      user_id => {
        ComparisonOperator => 'NOT_NULL',
      }
    });

=head1 NAME

Amazon::DynamoDB::20120810 - interact with DynamoDB using API version 20120810

=head1 METHODS - Internal 

The following methods are intended for internal use and are documented
purely for completeness - for normal operations see L</METHODS> instead.

=head2 make_request

Generates an L<HTTP::Request>.

=head1 FUNCTIONS - Internal

=head2 _encode_type_and_value

Returns an appropriate type (N, S, SS etc.) and stringified/encoded value for the given
value.

DynamoDB only uses strings even if there is a Numeric value specified,
so while the type will be expressed as a Number the value will be
stringified.

C<http://docs.aws.amazon.com/amazondynamodb/latest/developerguide/DataFormat.html>

=head1 AUTHORS

=over 4

=item *

Rusty Conover <rusty@luckydinosaur.com>

=item *

Tom Molesworth <cpan@entitymodel.com>

=back

=head1 COPYRIGHT AND LICENSE

This software is copyright (c) 2013 by Tom Molesworth, copyright (c) 2014 Lucky Dinosaur LLC. L<http://www.luckydinosaur.com>.

This is free software; you can redistribute it and/or modify it under
the same terms as the Perl 5 programming language system itself.

=cut



( run in 0.657 second using v1.01-cache-2.11-cpan-437f7b0c052 )