Data-AnyXfer

 view release on metacpan or  search on metacpan

lib/Data/AnyXfer/Elastic/Import/SpawnTask/Remote.pm  view on Meta::CPAN

of the extra information needed by this module.

=cut


# Descriptor of the remote machine tasks are shipped to and run on.
# Must be a Data::AnyXfer::Elastic::Import::SpawnTask::Remote::Host
# instance; it is mandatory and may be replaced after construction.
has remote_host_instance => (
    required => 1,
    is       => 'rw',
    isa      =>
        InstanceOf['Data::AnyXfer::Elastic::Import::SpawnTask::Remote::Host'],
);


# REMOTE FILE TRANSFER ROUTINES

sub remote_process_temp_directory {
    # Returns a Path::Class::Dir of the form
    #   <tmpdir>/es_spawntask_remote_p<PID>
    # Both the tmpdir and the PID ($$) are taken from the *local* process
    # evaluating this call, so each local process gets its own directory.
    # NOTE(review): the result is used as a path on the remote host, which
    # assumes the remote host has the same temp directory layout.
    my $path = sprintf '%s/es_spawntask_remote_p%s', File::Spec->tmpdir, $$;

    return Path::Class::dir($path);
}


sub _remote_transfer_object {
    my ( $class, $local_path, $spawn_args ) = @_;

    # Copy a local Path::Class file or directory into this process's
    # remote temp directory (see remote_process_temp_directory) and return
    # a Path::Class object pointing at where it now lives on the remote.
    #
    # $local_path - Path::Class::File or Path::Class::Dir to send
    # $spawn_args - hashref of spawn task args; it must carry
    #               remote_host_instance for _sync_source_to_target

    # the PID-specific directory keeps concurrent runs from colliding
    my $target_dir = $class->remote_process_temp_directory;

    # XXX : we ship the working dir (already decompressed data) because
    # decompressing a 10GB LZMA dataset and writing it back to disk,
    # plus transferring 200MB, takes longer than transferring 10GB of
    # already decompressed data
    $class->_sync_source_to_target(
        %{$spawn_args},
        remote_target => $target_dir,
        local_source  => $local_path,
    );

    # rsync places the source (by its basename) inside the target dir
    if ( $local_path->is_dir ) {
        return $target_dir->subdir( $local_path->basename );
    }
    return $target_dir->file( $local_path->basename );
}


sub _sync_source_to_target {
    my ( $class, %args ) = @_;

    # rsync a local file or directory to a directory on the remote host.
    #
    # Named args:
    #   remote_host_instance - host object providing host, user, port and
    #                          identity_file
    #   remote_target        - destination directory path on the remote
    #   local_source         - local file/dir to send
    # Any other keys are ignored.
    #
    # Returns 1 on success; croaks (with rsync's stderr when available)
    # if rsync exits with a non-zero status.

    my $remote_host = $args{remote_host_instance};

    # base rsync invocation
    my @rsync_cmd = (
        Core::Path::Utils->rsync,    # rsync binary
        '-a',                        # archive mode
        '-v',                        # verbose
        '-q',                        # quiet: suppress non-error messages
        '--no-p',                    # don't preserve permissions
        '--no-g',                    # don't preserve groups
        '--chmod=ugo=rwX',           # rw for all; x only on dirs/executables
    );

    # destination in rsync's [user@]host:path form
    my $remote_target = sprintf '%s:%s', $remote_host->host,
        $args{remote_target};
    if ( my $user = $remote_host->user ) {
        $remote_target = $user . '@' . $remote_target;
    }

    # Override the remote shell rsync uses so we can pass the port (and
    # identity file, if any). Both branches must go through '-e';
    # otherwise rsync would treat the ssh string as another source path.
    my $ssh_command;
    if ( my $identity_file = $remote_host->identity_file ) {
        $ssh_command = sprintf 'ssh -o StrictHostKeyChecking=no -i "%s" -p %s',
            $identity_file,
            $remote_host->port;
    }
    else {
        $ssh_command = sprintf 'ssh -p %s', $remote_host->port;
    }
    push @rsync_cmd, '-e', $ssh_command;

    # source and destination go last (stringify Path::Class objects)
    push @rsync_cmd, "$args{local_source}", $remote_target;

    # run3 itself only dies if rsync cannot be launched; the exit status
    # is left in $? and must be checked explicitly
    my ( $in, $out, $err ) = ( undef, undef, undef );
    IPC::Run3::run3( \@rsync_cmd, \$in, \$out, \$err );
    my $status = $?;

    if ( $status != 0 ) {
        croak( ( defined $err && length $err )
            ? $err
            : sprintf 'rsync exited with status %d', $status >> 8 );
    }
    return 1;
}




# OVERRIDE SPAWN TASK FUNCTIONALITY

# Override run to transfer the datafile working dir to
# the remote host

sub run {
    my ( $class, %args ) = @_;

    # When a datafile is supplied, push its storage working dir to the
    # remote host first, then point datafile_dir at the remote copy before
    # delegating to the parent class's run.

    if ( my $datafile = $args{datafile} ) {
        my $remote_dir = $class->_remote_transfer_object(
            $datafile->storage->working_dir, \%args );

        # stored as a plain string so it can be encoded into the argsfile
        $args{datafile_dir} = $remote_dir->stringify;
    }

    return $class->SUPER::run(%args);
}


# Override run with argsfile to transfer it to the remote host

sub _run_self_with_argsfile {
    my ( $class, $local_argsfile, $args ) = @_;

    # Ship the argsfile to the remote host, then hand the parent class's
    # implementation the remote path instead of the local one.
    my $remote_argsfile
        = $class->_remote_transfer_object( $local_argsfile, $args );

    return $class->SUPER::_run_self_with_argsfile( $remote_argsfile, $args );
}


# Override exec command to run on the remote host

sub _exec_command {
    my ( $class, $command, $args ) = @_;

    # Run $command in the background on the remote host and return a
    # remote process object for it.
    #
    # The command is written to a local bash script (named by the MD5 of
    # the command), shipped to the remote, made executable and started in
    # a detached screen session. Its PID is then looked up with pgrep.
    # Croaks if no PID can be found.
    #
    # NOTE(review): the meaning of the leading 1/0 argument to the host's
    # run method is defined by the host class - confirm there.

    my $host = $args->{remote_host_instance};

    # local script file, named after the command's digest
    my $script_name  = Digest::MD5::md5_hex($command);
    my $local_script = Core::tmp_dir()->file($script_name);

    # XXX : the trailing sleep keeps the script alive long enough that it
    # cannot finish between being started and its PID being looked up
    my $script_body = "#!/usr/bin/env bash\n" . $command . "\nsleep 5";
    $local_script->spew($script_body);

    # ship it and make it user executable on the remote
    my $remote_script = $class->_remote_transfer_object( $local_script, $args );
    $host->run( 1, "chmod u+x $remote_script" );

    # start it detached in the background on the remote
    $host->run( 0, "screen -d -m $remote_script" );

    # Look up the PID; only the leading digits of the output are used.
    # NOTE(review): the screen wrapper's command line also contains the
    # script path, so which matching PID comes first depends on pgrep.
    my $pgrep_output = $host->run( 0, qq{pgrep -f "$remote_script"} );
    my ($pid) = $pgrep_output =~ /^(\d+)/;

    # no PID means the script already exited, so it most likely failed
    if ( !$pid ) {
        croak "No PID found for remote command. "
            . "It probably failed (file: $remote_script)";
    }

    # wrap the running remote process
    return $class->_create_remote_process(
        spawntask_args => $args,
        remote_host    => $host,
        pid            => $pid,
    );
}

sub _create_remote_process {
    my ( $class, %args ) = @_;

    # Build a PROCESS_CLASS instance from the remaining args, attaching a
    # cleanup_sub that deletes the remote temp directory.
    #
    # spawntask_args is removed from the constructor args and kept only
    # for the cleanup closure, which needs its remote_host_instance.
    my $spawntask_args = delete $args{spawntask_args};

    my $cleanup = sub {
        # Remove the remote temp dir with a perl one-liner.
        # NOTE(review): $^X is the *local* perl's path, which assumes the
        # same perl path exists on the remote. The directory is also
        # computed when the cleanup runs, so it uses the PID of whichever
        # process invokes this closure.
        my $rm_command = sprintf q!%s -MFile::Path -E 'rmtree(q[%s])'!,
            $^X, $class->remote_process_temp_directory;

        return $spawntask_args->{remote_host_instance}->run( 0, $rm_command );
    };

    return PROCESS_CLASS()->new( %args, cleanup_sub => $cleanup );
}

1;

=head1 COPYRIGHT

This software is copyright (c) 2019, Anthony Lucas.

This is free software; you can redistribute it and/or modify it under the same terms as the Perl 5 programming language system itself.

=cut



( run in 1.560 second using v1.01-cache-2.11-cpan-99c4e6809bf )