POE-Component-Amazon-S3
view release on metacpan or search on metacpan
lib/POE/Component/Amazon/S3.pm view on Meta::CPAN
package POE::Component::Amazon::S3;
use strict;
use Carp qw(carp croak);
use Data::Dump qw(dump);
use Digest::HMAC_SHA1;
use HTTP::Date;
use HTTP::Request;
use MIME::Base64 qw(encode_base64);
use POE;
use POE::Component::Client::HTTP;
use URI::Escape qw(uri_escape);
use XML::LibXML;
use XML::LibXML::XPathContext;
our $VERSION = '0.01';
my $AMAZON_HEADER_PREFIX = 'x-amz-';
my $METADATA_PREFIX = 'x-amz-meta-';
# block size when downloading/uploading to files
my $BLOCK_SIZE = 4096;
# max keys to fetch each time when calling list_bucket_all
my $MAX_KEYS_PER_CHUNK = 100;
# unique id counter for list_bucket_all to store temporary results
my $LIST_ID = 1;
sub spawn {
my $class = shift;
croak "$class requires an even number of parameters" if @_ % 2;
my %params = @_;
croak "$class requires aws_access_key_id and aws_secret_access_key"
unless $params{aws_access_key_id} && $params{aws_secret_access_key};
$params{libxml} = XML::LibXML->new;
my $self = bless \%params, $class;
# A non-streaming HTTP client for most requests
POE::Component::Client::HTTP->spawn(
Agent => 'POE-Component-Amazon-S3/' . $VERSION,
Alias => 'ua',
Timeout => 30,
);
# A streaming HTTP client for downloads
POE::Component::Client::HTTP->spawn(
Agent => 'POE-Component-Amazon-S3/' . $VERSION,
Alias => 'ua-streaming',
Timeout => 30,
Streaming => $BLOCK_SIZE,
);
POE::Session->create(
object_states => [
$self => [
qw/
_start
shutdown
add_bucket
add_bucket_done
buckets
buckets_done
delete_bucket
delete_bucket_done
add_key
add_key_done
head_key
head_key_done
list_bucket
list_bucket_done
list_bucket_all
list_bucket_all_chunk
get_acl
get_acl_done
get_key
get_key_done
delete_key
delete_key_done
set_acl
set_acl_got_current
set_acl_done
/
],
],
);
return;
}
sub _start {
my ( $kernel, $self ) = @_[ KERNEL, OBJECT ];
$kernel->alias_set( $self->{alias} || 's3' );
}
sub shutdown {
my ( $kernel, $self ) = @_[ KERNEL, OBJECT ];
# Shut down our HTTP clients
$kernel->post( ua => 'shutdown' );
$kernel->post( 'ua-streaming' => 'shutdown' );
$kernel->alias_remove( $self->{alias} || 'amazon-s3' );
}
### Bucket methods
sub add_bucket {
my ( $kernel, $self, $sender, $event, $conf ) = @_[ KERNEL, OBJECT, SENDER, ARG0, ARG1 ];
my $bucket = $conf->{bucket};
croak 'must specify bucket' unless $bucket;
if ($conf->{acl_short}){
$self->_validate_acl_short($conf->{acl_short});
}
my $header_ref = ($conf->{acl_short})
? {'x-amz-acl' => $conf->{acl_short}}
: {};
my $request = $self->_make_request( PUT => $bucket, $header_ref );
# Save callback info
my $pass = {
sender => $sender,
event => $event,
pass => $conf->{pass} || [],
};
$kernel->post( ua => request => add_bucket_done => $request, $pass );
}
sub add_bucket_done {
my ( $kernel, $self, $req, $res ) = @_[ KERNEL, OBJECT, ARG0, ARG1 ];
my $request = $req->[0];
my $pass = $req->[1];
my $response = $res->[0];
my $return = $self->_check_error( $response );
$kernel->post(
$pass->{sender},
$pass->{event},
$return,
$response,
@{ $pass->{pass} },
);
}
sub buckets {
my ( $kernel, $self, $sender, $event, $conf ) = @_[ KERNEL, OBJECT, SENDER, ARG0, ARG1 ];
my $request = $self->_make_request( GET => '' );
# Save callback info
my $pass = {
sender => $sender,
event => $event,
pass => $conf->{pass} || [],
lib/POE/Component/Amazon/S3.pm view on Meta::CPAN
my $bucket = delete $conf->{bucket};
my $key = delete $conf->{key} || '';
croak 'must specify bucket' unless $bucket;
# Key is optional
my $request = $self->_make_request( GET => $self->_uri( $bucket, $key ) . '?acl' );
# Save callback info
my $pass = {
sender => $sender,
event => $event,
pass => $conf->{pass} || [],
};
$kernel->post( ua => request => get_acl_done => $request, $pass );
}
sub get_acl_done {
my ( $kernel, $self, $req, $res ) = @_[ KERNEL, OBJECT, ARG0, ARG1 ];
my $request = $req->[0];
my $pass = $req->[1];
my $response = $res->[0];
my $return = $self->_check_error( $response );
if ( $return ) {
$return = $self->_parse_acl( $response->content );
}
$kernel->post(
$pass->{sender},
$pass->{event},
$return,
$response,
@{ $pass->{pass} },
);
}
sub get_key {
my ( $kernel, $self, $sender, $event, $conf ) = @_[ KERNEL, OBJECT, SENDER, ARG0, ARG1 ];
my $bucket = delete $conf->{bucket};
my $key = delete $conf->{key};
my $file = delete $conf->{file};
croak 'must specify bucket' unless $bucket;
croak 'must specify key' unless $key;
my $request = $self->_make_request( GET => $self->_uri( $bucket, $key ) );
# Save callback info
my $pass = {
sender => $sender,
event => $event,
file => $file,
pass => $conf->{pass} || [],
};
$kernel->post( 'ua-streaming' => request => get_key_done => $request, $pass );
}
sub get_key_done {
my ( $kernel, $self, $req, $res ) = @_[ KERNEL, OBJECT, ARG0, ARG1 ];
my $request = $req->[0];
my $pass = $req->[1];
my $response = $res->[0];
my $chunk = $res->[1];
if ( $chunk ) {
if ( $pass->{file} && $response->code =~ /^2\d\d$/ ) {
# Save chunks to file, only if response is good
if ( !$request->{_fh} ) {
open my $fh, '>', $pass->{file};
$request->{_fh} = $fh;
}
syswrite $request->{_fh}, $chunk;
}
else {
# Save chunks to response object
$response->content( $response->content() . $chunk );
}
return;
}
else {
# We're all done
if ( $request->{_fh} ) {
$request->{_fh}->close();
delete $request->{_fh};
}
}
my $return = $self->_check_error( $response );
$kernel->post(
$pass->{sender},
$pass->{event},
$return,
$response,
@{ $pass->{pass} },
);
}
sub delete_key {
my ( $kernel, $self, $sender, $event, $conf ) = @_[ KERNEL, OBJECT, SENDER, ARG0, ARG1 ];
my $bucket = delete $conf->{bucket};
my $key = delete $conf->{key};
croak 'must specify bucket' unless $bucket;
croak 'must specify key' unless $key;
my $request = $self->_make_request( DELETE => $self->_uri( $bucket, $key ) );
# Save callback info
my $pass = {
( run in 0.517 second using v1.01-cache-2.11-cpan-39bf76dae61 )