POE-Component-Amazon-S3

 view release on metacpan or  search on metacpan

lib/POE/Component/Amazon/S3.pm  view on Meta::CPAN

package POE::Component::Amazon::S3;

use strict;

use Carp qw(carp croak);
use Data::Dump qw(dump);
use Digest::HMAC_SHA1;
use HTTP::Date;
use HTTP::Request;
use MIME::Base64 qw(encode_base64);
use POE;
use POE::Component::Client::HTTP;
use URI::Escape qw(uri_escape);
use XML::LibXML;
use XML::LibXML::XPathContext;

# Module version; spawn() also appends it to the User-Agent string of the
# HTTP client components it starts.
our $VERSION = '0.01';

# Header-name prefixes.  NOTE(review): presumably used to recognise Amazon
# headers and user metadata headers when building/signing requests -- the
# code that reads them is not in this part of the file; confirm there.
my $AMAZON_HEADER_PREFIX = 'x-amz-';
my $METADATA_PREFIX      = 'x-amz-meta-';

# block size when downloading/uploading to files
# (spawn() also passes it as the Streaming size of the download HTTP client)
my $BLOCK_SIZE           = 4096;

# max keys to fetch each time when calling list_bucket_all
my $MAX_KEYS_PER_CHUNK   = 100;

# unique id counter for list_bucket_all to store temporary results
my $LIST_ID              = 1;

# Class method: start the component.
#
# Takes a key/value list.  aws_access_key_id and aws_secret_access_key are
# required; an optional 'alias' is read later by _start.  The parameters
# (plus a fresh XML::LibXML parser stored under 'libxml') become the object
# whose methods handle the session's events.
#
# Side effects: spawns two POE::Component::Client::HTTP sessions, aliased
# 'ua' (regular requests) and 'ua-streaming' (chunked downloads), and
# creates the POE session itself.
#
# Croaks on an odd number of parameters or on missing credentials.
# Returns nothing.
sub spawn {
    my ( $class, @args ) = @_;

    if ( @args % 2 ) {
        croak "$class requires an even number of parameters";
    }

    my %params = @args;

    if ( !$params{aws_access_key_id} || !$params{aws_secret_access_key} ) {
        croak "$class requires aws_access_key_id and aws_secret_access_key";
    }

    # The parser always replaces any caller-supplied 'libxml' entry.
    my $self = bless { %params, libxml => XML::LibXML->new }, $class;

    # Settings shared by both HTTP clients.
    my @client_defaults = (
        Agent   => "POE-Component-Amazon-S3/$VERSION",
        Timeout => 30,
    );

    # A non-streaming HTTP client for most requests
    POE::Component::Client::HTTP->spawn(
        @client_defaults,
        Alias => 'ua',
    );

    # A streaming HTTP client for downloads
    POE::Component::Client::HTTP->spawn(
        @client_defaults,
        Alias     => 'ua-streaming',
        Streaming => $BLOCK_SIZE,
    );

    # Every event the session accepts maps to the method of the same name.
    my @handled_events = qw(
        _start
        shutdown

        add_bucket
        add_bucket_done
        buckets
        buckets_done
        delete_bucket
        delete_bucket_done

        add_key
        add_key_done
        head_key
        head_key_done
        list_bucket
        list_bucket_done
        list_bucket_all
        list_bucket_all_chunk
        get_acl
        get_acl_done
        get_key
        get_key_done
        delete_key
        delete_key_done
        set_acl
        set_acl_got_current
        set_acl_done
    );

    POE::Session->create(
        object_states => [ $self => \@handled_events ],
    );

    return;
}

# POE _start handler: register the session alias.
# Uses the 'alias' parameter given to spawn(), falling back to 's3'.
sub _start {
    my $kernel = $_[KERNEL];
    my $self   = $_[OBJECT];

    my $alias = $self->{alias} || 's3';

    $kernel->alias_set($alias);
}

# Event handler: shut the component down.
#
# Posts 'shutdown' to both HTTP client sessions ('ua' and 'ua-streaming')
# and removes this session's alias.
sub shutdown {
    my ( $kernel, $self ) = @_[ KERNEL, OBJECT ];

    # Shut down our HTTP clients
    $kernel->post( ua             => 'shutdown' );
    $kernel->post( 'ua-streaming' => 'shutdown' );

    # Remove the alias registered by _start.  The fallback has to be the
    # same 's3' default that _start uses; removing any other name leaves
    # the default alias in place.
    $kernel->alias_remove( $self->{alias} || 's3' );
}

### Bucket methods

# Event handler: create a bucket.
#
# ARG0 - name of the event to send back to the sender when done
# ARG1 - hashref of options:
#          bucket    - bucket name (required; croaks when missing)
#          acl_short - optional value, checked with _validate_acl_short and
#                      sent as the 'x-amz-acl' header
#          pass      - optional arrayref of extra values for the callback
#
# Queues a PUT request on the 'ua' HTTP client; the response is handled by
# add_bucket_done.
sub add_bucket {
    my $kernel = $_[KERNEL];
    my $self   = $_[OBJECT];
    my $sender = $_[SENDER];
    my ( $event, $conf ) = @_[ ARG0, ARG1 ];

    my $bucket = $conf->{bucket};
    if ( !$bucket ) {
        croak 'must specify bucket';
    }

    $self->_validate_acl_short( $conf->{acl_short} ) if $conf->{acl_short};

    # Only send the ACL header when one was requested.
    my %headers;
    $headers{'x-amz-acl'} = $conf->{acl_short} if $conf->{acl_short};

    my $request = $self->_make_request( PUT => $bucket, \%headers );

    # Save callback info
    my %callback = (
        sender => $sender,
        event  => $event,
        pass   => $conf->{pass} || [],
    );

    $kernel->post( ua => request => add_bucket_done => $request, \%callback );
}

# Response handler for add_bucket.
#
# ARG0 - request packet; element 1 is the callback info saved by add_bucket
# ARG1 - response packet; element 0 is the response object
#
# Posts the saved event back to the original sender with
# ( result of _check_error, response, extra 'pass' values ).
sub add_bucket_done {
    my $kernel = $_[KERNEL];
    my $self   = $_[OBJECT];
    my ( $request_packet, $response_packet ) = @_[ ARG0, ARG1 ];

    my $callback = $request_packet->[1];
    my $response = $response_packet->[0];

    my $result = $self->_check_error($response);

    my $target = $callback->{sender};
    my $event  = $callback->{event};

    $kernel->post( $target, $event, $result, $response, @{ $callback->{pass} } );
}

sub buckets {
    my ( $kernel, $self, $sender, $event, $conf ) = @_[ KERNEL, OBJECT, SENDER, ARG0, ARG1 ];

    my $request = $self->_make_request( GET => '' );

    # Save callback info
    my $pass = {
        sender => $sender,
        event  => $event,
        pass   => $conf->{pass} || [],

lib/POE/Component/Amazon/S3.pm  view on Meta::CPAN

    my $bucket = delete $conf->{bucket};
    my $key    = delete $conf->{key} || '';
    
    croak 'must specify bucket' unless $bucket;
    # Key is optional
    
    my $request = $self->_make_request( GET => $self->_uri( $bucket, $key ) . '?acl' );
    
    # Save callback info
    my $pass = {
        sender => $sender,
        event  => $event,
        pass   => $conf->{pass} || [],
    };

    $kernel->post( ua => request => get_acl_done => $request, $pass );
}

# Response handler for get_acl.
#
# ARG0 - request packet; element 1 is the callback info saved by get_acl
# ARG1 - response packet; element 0 is the response object
#
# When _check_error returns a true value, the response body is handed to
# _parse_acl and that result is reported instead.  Posts the saved event
# back to the original sender with ( result, response, extra 'pass' values ).
sub get_acl_done {
    my $kernel = $_[KERNEL];
    my $self   = $_[OBJECT];
    my ( $request_packet, $response_packet ) = @_[ ARG0, ARG1 ];

    my $callback = $request_packet->[1];
    my $response = $response_packet->[0];

    my $result = $self->_check_error($response);
    $result = $self->_parse_acl( $response->content ) if $result;

    my $target = $callback->{sender};
    my $event  = $callback->{event};

    $kernel->post( $target, $event, $result, $response, @{ $callback->{pass} } );
}

# Event handler: download a key.
#
# ARG0 - name of the event to send back to the sender when done
# ARG1 - hashref of options ('bucket', 'key' and 'file' are removed from it):
#          bucket - bucket name (required)
#          key    - key name (required)
#          file   - optional path; get_key_done writes the body there
#          pass   - optional arrayref of extra values for the callback
#
# Croaks when bucket or key is missing.  Queues a GET on the streaming
# HTTP client; chunks are collected by get_key_done.
sub get_key {
    my $kernel = $_[KERNEL];
    my $self   = $_[OBJECT];
    my $sender = $_[SENDER];
    my ( $event, $conf ) = @_[ ARG0, ARG1 ];

    my $bucket = delete $conf->{bucket};
    my $key    = delete $conf->{key};
    my $file   = delete $conf->{file};

    if ( !$bucket ) {
        croak 'must specify bucket';
    }
    if ( !$key ) {
        croak 'must specify key';
    }

    my $uri     = $self->_uri( $bucket, $key );
    my $request = $self->_make_request( GET => $uri );

    # Save callback info
    my %callback = (
        sender => $sender,
        event  => $event,
        file   => $file,
        pass   => $conf->{pass} || [],
    );

    $kernel->post( 'ua-streaming' => request => get_key_done => $request, \%callback );
}

# Streaming response handler for get_key.  Called once per chunk of body
# data and one final time, without a chunk, when the response is complete.
#
# ARG0 - request packet: [ request object, callback info saved by get_key ]
# ARG1 - response packet: [ response object, chunk of body data or undef ]
#
# While chunks arrive they are written to the 'file' given to get_key
# (only for 2xx responses), or appended to the response content otherwise.
# After the last chunk the saved event is posted back to the original
# sender with ( result of _check_error, response, extra 'pass' values ).
#
# File problems (open, write, close) are reported with carp; they do not
# change the values sent to the callback.
sub get_key_done {
    my ( $kernel, $self, $req, $res ) = @_[ KERNEL, OBJECT, ARG0, ARG1 ];

    my $request  = $req->[0];
    my $pass     = $req->[1];

    my $response = $res->[0];
    my $chunk    = $res->[1];

    # Test definedness, not truth: a data chunk may legitimately be "0"
    # (or empty) and must not be mistaken for the end of the stream.
    if ( defined $chunk ) {
        if ( $pass->{file} && $response->code =~ /^2\d\d$/ ) {
            # Save chunks to file, only if response is good.
            # The handle is kept on the request object between calls.
            if ( !$request->{_fh} && !$request->{_fh_failed} ) {
                if ( open my $fh, '>', $pass->{file} ) {
                    # Object data is raw bytes; avoid newline translation.
                    binmode $fh;
                    $request->{_fh} = $fh;
                }
                else {
                    carp "unable to open $pass->{file} for writing: $!";

                    # Remember the failure so we neither retry nor warn
                    # again for every following chunk.
                    $request->{_fh_failed} = 1;
                }
            }

            if ( $request->{_fh} ) {
                # syswrite may write less than requested; keep going until
                # the whole chunk is on disk or an error occurs.
                my $length = length $chunk;
                my $offset = 0;

                while ( $offset < $length ) {
                    my $written = syswrite $request->{_fh}, $chunk,
                        $length - $offset, $offset;

                    if ( !$written ) {
                        carp "error writing to $pass->{file}: $!";
                        last;
                    }

                    $offset += $written;
                }
            }
        }
        else {
            # Save chunks to response object
            $response->content( $response->content() . $chunk );
        }

        return;
    }

    # We're all done
    if ( my $fh = delete $request->{_fh} ) {
        close $fh
            or carp "error closing $pass->{file}: $!";
    }
    delete $request->{_fh_failed};

    my $return = $self->_check_error( $response );

    $kernel->post(
        $pass->{sender},
        $pass->{event},
        $return,
        $response,
        @{ $pass->{pass} },
    );
}

sub delete_key {
    my ( $kernel, $self, $sender, $event, $conf ) = @_[ KERNEL, OBJECT, SENDER, ARG0, ARG1 ];
    
    my $bucket = delete $conf->{bucket};
    my $key    = delete $conf->{key};
    
    croak 'must specify bucket' unless $bucket;
    croak 'must specify key' unless $key;
    
    my $request = $self->_make_request( DELETE => $self->_uri( $bucket, $key ) );

    # Save callback info
    my $pass = {



( run in 0.517 second using v1.01-cache-2.11-cpan-39bf76dae61 )