App-Glacier

 view release on metacpan or  search on metacpan

lib/App/Glacier/Command/Put.pm  view on Meta::CPAN

    $self->error(@msg);
    if ($self->{_options}{multiple}) {
	die "__UPLOAD_FAILED__";
    } else {
	exit $code;
    }
}

sub _upload {
    my ($self, $vaultname, $localname, $remotename) = @_;

    $remotename = basename($localname) unless defined($remotename);

    my $st = stat($localname)
	or $self->abend(EX_NOINPUT, "can't stat \"$localname\": $!");
    unless (S_ISREG($st->mode)) {
	$self->abend(EX_NOPERM, "\"$localname\" is not a regular file");
    }
    my $size = $st->size;
    if ($size == 0) {
	$self->abend(EX_NOPERM, "\"$localname\": file has zero size");
    }
    
    my $dir = $self->directory($vaultname);
    my $id = ($size < $self->cf_transfer_param(qw(upload single-part-size)))
	       ? $self->_upload_simple($vaultname, $localname, $remotename)
               : $self->_upload_multipart($vaultname, $localname, $remotename);
    return if $self->dry_run;
    $self->debug(1, "ID $id\n");
    $dir->add_version($remotename, { ArchiveId => $id,
				     Size => $size,
				     CreationDate => new App::Glacier::DateTime,
				     ArchiveDescription => $remotename });
}

sub _upload_simple {
    my ($self, $vaultname, $localname, $remotename) = @_;

    $self->debug(1, "uploading $localname in single part");
    return if $self->dry_run;

    my $p = new App::Glacier::Progress(1,
				       prefix => $localname,
				       show_none => 1)
	unless $self->{_options}{quiet};
    my $archive_id = $self->glacier->Upload_archive($vaultname, $localname,
						    $remotename);	
    $p->finish('uploaded') if $p;
    
    if ($self->glacier->lasterr) {
	$self->abend(EX_FAILURE, "upload failed: ",
		     $self->glacier->last_error_message);
    }
    return $archive_id;
}

sub _upload_multipart {
    my ($self, $vaultname, $localname, $remotename) = @_;
    my $glacier = $self->glacier;
    
    use threads;
    use threads::shared;

    my $archive_size = -s $localname;
    my $part_size =
	$glacier->calculate_multipart_upload_partsize($archive_size);
    
    $self->abend(EX_FAILURE, "$localname is too big for upload")
	if $part_size == 0;

    # Number of parts to upload:
    my $total_parts = int(($archive_size + $part_size - 1) / $part_size);
    
    # Compute number of threads
    my $njobs = $self->{_options}{jobs}
                || $self->cf_transfer_param(qw(upload jobs));

    # Number of parts to upload by each job;
    my $job_parts = int(($total_parts + $njobs - 1) / $njobs);
    
    $self->debug(1,
	 "uploading $localname in chunks of $part_size bytes, in $njobs jobs");
    return if $self->dry_run;
    
    open(my $fd, '<', $localname)
	or $self->abort(EX_FAILURE, "can't open $localname: $!");
    binmode($fd);
    my $upload_id = $glacier->multipart_upload_init($vaultname, $part_size,
						    $remotename);
    $self->debug(1, "Upload ID: $upload_id");

    use Fcntl qw(SEEK_SET);
    
    my @part_hashes :shared = ();
    my $p = new App::Glacier::Progress($total_parts,
				       prefix => $localname)
	unless $self->{_options}{quiet};
    
    for (my $i = 0; $i < $njobs; $i++) {
	my $thr = threads->create(
	    sub {
		my ($job_idx) = @_;
		# Number of part to start from
		my $part_idx = $job_idx * $job_parts;
		# Offset in file
		my $off = $part_idx * $part_size;
		# Number of retries in case of failure
		my $retries = $self->cf_transfer_param(qw(upload retries));
		
		for (my $j = 0; $j < $job_parts;
		     $j++, $part_idx++, $off += $part_size) {
		    last if $off >= $archive_size;
		    my $part;
		    {
			lock @part_hashes;
			seek($fd, $off, SEEK_SET);
			my $rb = sysread($fd, $part, $part_size);
			if ($rb == 0) {
			    $self->abend(EX_OSERR,
					 "failed to read part $part_idx: $!");
			}
		    }
		
		    my $res;
		    for (my $try = 0;;) {
			$res = $glacier->Multipart_upload_upload_part(
			                 $vaultname,
			                 $upload_id,
			                 $part_size,
			                 $part_idx,
			                 \$part
			    );
			if ($glacier->lasterr) {
			    if (++$try < $retries) {
				$self->debug(1, "part $part_idx: ",
					     $glacier->last_error_message);
				$self->debug(1, "retrying");
			    } else {
				$self->error("failed to upload part $part_idx: ",
					     $glacier->last_error_message);
				return 0;
			    }
			} else {
			    last;
			}
		    }
			
		    $part_hashes[$part_idx] = $res;
		    $p->update if $p;		    
		}
		return 1;
	    }, $i);
    }    

    $self->debug(2, "waiting for dowload to finish");
    foreach my $thr (threads->list) {
	# FIXME: better error handling
	$thr->join() or croak "thread $thr failed";
    }
    $p->finish('uploaded') if $p;
    
    # Capture archive id or error code
    $self->debug(2, "finalizing the upload");
    my $archive_id = $glacier->Multipart_upload_complete(
					 $vaultname, $upload_id,
					 \@part_hashes,
					 $archive_size);

    if ($glacier->lasterr) {
	$glacier->multipart_upload_abort($vaultname, $upload_id);
	$self->abend(EX_FAILURE, "upload failed: ",
		     $glacier->last_error_message);
    }
    
    # Check if we have a valid $archive_id
    unless ($archive_id =~ /^[a-zA-Z0-9_\-]{10,}$/) {
	$glacier->multipart_upload_abort($vaultname, $upload_id);
	$self->abend(EX_FAILURE, "upload completion failed");
    }

    return $archive_id;
}

1;



( run in 1.510 second using v1.01-cache-2.11-cpan-0bb4e1dffa6 )