Net-CascadeCopy
view release on metacpan or search on metacpan
lib/Net/CascadeCopy.pm view on Meta::CPAN
package Net::CascadeCopy;
use strict;
use warnings;
our $VERSION = '0.2.6'; # VERSION
use Mouse;
use Benchmark;
use Log::Log4perl qw(:easy);
use POSIX ":sys_wait_h"; # imports WNOHANG
use Proc::Queue size => 32, debug => 0, trace => 0, delay => 1;
my $logger = get_logger( 'default' );
has data => ( is => 'ro', isa => 'HashRef', default => sub { return {} } );
has total_time => ( is => 'rw', isa => 'Num', default => 0 );
has ssh => ( is => 'ro', isa => 'Str', default => "ssh" );
has ssh_args => ( is => 'ro', isa => 'Str', default => "-x -A" );
has command => ( is => 'ro', isa => 'Str', required => 1 );
has command_args => ( is => 'ro', isa => 'Str', default => "" );
has source_path => ( is => 'ro', isa => 'Str', required => 1 );
has target_path => ( is => 'ro', isa => 'Str', required => 1 );
has output => ( is => 'ro', isa => 'Str', default => "" );
# maximum number of failures per server
has max_failures => ( is => 'ro', isa => 'Num', default => 3 );
# maximum processes per remote server
has max_forks => ( is => 'ro', isa => 'Num', default => 2 );
# keep track of child processes
has children => ( is => 'ro', isa => 'HashRef', default => sub { return {} } );
# for testing purposes
has transfer_map => ( is => 'ro', isa => 'HashRef', default => sub { return {} } );
# sort order
has sort_order => ( is => 'ro', isa => 'HashRef', default => sub { return {} } );
sub add_group {
my ( $self, $group, $servers_a ) = @_;
$logger->info( "Adding group: $group: ",
join( ", ", @$servers_a ),
);
# initialize data structures
for my $server ( @{ $servers_a } ) {
$self->data->{remaining}->{ $group }->{$server} = 1;
unless ( defined $self->sort_order->{ $group }->{ $server } ) {
$self->sort_order->{ $group }->{ $server } = scalar keys %{ $self->sort_order->{ $group } };
}
}
# first server to transfer from is the current server
$self->data->{available}->{ $group }->{localhost} = 1;
# initialize data structures
$self->data->{completed}->{ $group } = [];
}
sub get_groups {
my ( $self ) = @_;
my @groups;
for my $group ( keys %{ $self->sort_order } ) {
push @groups, $group;
}
return @groups;
}
sub transfer {
my ( $self ) = @_;
my $transfer_start = new Benchmark;
LOOP:
while ( 1 ) {
last LOOP unless $self->_transfer_loop( $transfer_start );
sleep 1;
}
}
sub _transfer_loop {
my ( $self, $transfer_start ) = @_;
$self->_check_for_completed_processes();
# keep track if there are any remaining servers in any groups
my ( $remaining_flag, $available_flag );
# handle completed processes
if ( ! scalar keys %{ $self->data->{remaining} } && ! $self->data->{running} ) {
my $transfer_end = new Benchmark;
( run in 2.370 seconds using v1.01-cache-2.11-cpan-39bf76dae61 )