Net-CascadeCopy

 view release on metacpan or  search on metacpan

lib/Net/CascadeCopy.pm  view on Meta::CPAN

package Net::CascadeCopy;
use strict;
use warnings;

our $VERSION = '0.2.6'; # VERSION

use Mouse;

use Benchmark;
use Log::Log4perl qw(:easy);
use POSIX ":sys_wait_h"; # imports WNOHANG
use Proc::Queue size => 32, debug => 0, trace => 0, delay => 1;

my $logger = get_logger( 'default' );

has data         => ( is => 'ro', isa => 'HashRef', default => sub { return {} } );

has total_time   => ( is => 'rw', isa => 'Num', default => 0 );

has ssh          => ( is => 'ro', isa => 'Str', default => "ssh"   );
has ssh_args     => ( is => 'ro', isa => 'Str', default => "-x -A" );

has command      => ( is => 'ro', isa => 'Str', required => 1  );
has command_args => ( is => 'ro', isa => 'Str', default  => "" );

has source_path  => ( is => 'ro', isa => 'Str', required => 1 );
has target_path  => ( is => 'ro', isa => 'Str', required => 1 );

has output       => ( is => 'ro', isa => 'Str', default => "" );

# maximum number of failures per server
has max_failures => ( is => 'ro', isa => 'Num', default => 3 );

# maximum processes per remote server
has max_forks    => ( is => 'ro', isa => 'Num', default => 2 );

# keep track of child processes
has children     => ( is => 'ro', isa => 'HashRef', default => sub { return {} } );

# for testing purposes
has transfer_map => ( is => 'ro', isa => 'HashRef', default => sub { return {} } );

# sort order
has sort_order   => ( is => 'ro', isa => 'HashRef', default => sub { return {} } );

sub add_group {
    my ( $self, $group, $servers_a ) = @_;

    $logger->info( "Adding group: $group: ",
                   join( ", ", @$servers_a ),
               );

    # initialize data structures
    for my $server ( @{ $servers_a } ) {
        $self->data->{remaining}->{ $group }->{$server} = 1;

        unless ( defined $self->sort_order->{ $group }->{ $server } ) {
            $self->sort_order->{ $group }->{ $server } = scalar keys %{ $self->sort_order->{ $group } };
        }
    }

    # first server to transfer from is the current server
    $self->data->{available}->{ $group }->{localhost} = 1;

    # initialize data structures
    $self->data->{completed}->{ $group } = [];

}

sub get_groups {
    my ( $self ) = @_;

    my @groups;

    for my $group ( keys %{ $self->sort_order } ) {
        push @groups, $group;
    }

    return @groups;
}

sub transfer {
    my ( $self ) = @_;

    my $transfer_start = new Benchmark;

  LOOP:
    while ( 1 ) {
        last LOOP unless $self->_transfer_loop( $transfer_start );
        sleep 1;
    }
}

sub _transfer_loop {
    my ( $self, $transfer_start ) = @_;

    $self->_check_for_completed_processes();

    # keep track if there are any remaining servers in any groups
    my ( $remaining_flag, $available_flag );

    # handle completed processes
    if ( ! scalar keys %{ $self->data->{remaining} } && ! $self->data->{running} ) {
        my $transfer_end = new Benchmark;



( run in 2.370 seconds using v1.01-cache-2.11-cpan-39bf76dae61 )