Hadoop-HDFS-Command

 view release on metacpan or  search on metacpan

lib/Hadoop/HDFS/Command.pm  view on Meta::CPAN

package Hadoop::HDFS::Command;
$Hadoop::HDFS::Command::VERSION = '0.007';
use 5.010;
use strict;
use warnings;
use Capture::Tiny   ();
use Carp            ();
use Data::Dumper;
use DateTime::Format::Strptime;
use DateTime;
use Getopt::Long    ();
use IPC::Cmd        ();
use Ref::Util       ();
use Time::HiRes   qw( time );
use Types::Standard qw(Bool Str);

{ use Moo; }

# Path of the hdfs executable handed to the external command runner.
# The validator accepts only a true value naming an existing, executable
# file and confesses (with a stack trace) otherwise.
has cmd_hdfs => (
    is      => 'rw',
    lazy    => 1,
    default => sub { '/usr/bin/hdfs' },
    isa     => sub {
        my ($path) = @_;
        if ( ! $path || ! -e $path || ! -x _ ) {
            Carp::confess sprintf "The command `%s` either does not exist or not an executable!",
                            $path,
            ;
        }
        return;
    },
);

# Boolean switch, off by default.
# NOTE(review): presumably consulted by the logger -- confirm in _log().
has enable_log => (
    is      => 'rw',
    lazy    => 1,
    isa     => Bool,
    default => sub { 0 },
);

# Boolean switch, off by default; when set, _parse_options() emits trace
# level log entries describing its input and its result.
has trace_logs => (
    is      => 'rw',
    lazy    => 1,
    isa     => Bool,
    default => sub { 0 },
);

# Name of the user the external commands should run as. The default is a
# plain value (not a code reference), so the name of the user owning the
# process is looked up once, when this declaration is executed.
has runas => (
    is      => 'rw',
    lazy    => 1,
    isa     => Str,
    default => scalar getpwuid( $< ),
);

# Prefix the external command with `sudo -u <runas>` whenever the configured
# user differs from the user owning the current process.
#
# This must be an `around` modifier. A `before` modifier is called with its
# own @_ array, so assigning a new (longer) list to @_ inside it never
# reaches the wrapped method and the sudo prefix would be silently dropped.
#
# Arguments seen by the wrapper: the original method, the object, the
# options hashref and the command list. The return value (and the calling
# context) of the wrapped method is passed through untouched.
around ['_capture', '_capture_with_stdin'] => sub {
    my ($orig, $self, $options, @cmd) = @_;

    # Scalar context: getpwuid yields the user name only.
    my $current_user = getpwuid( $< ) // q{};

    unshift @cmd, 'sudo', '-u', $self->runas
        if $self->runas ne $current_user;

    return $self->$orig( $options, @cmd );
};

# Public entry point: dispatches a dfs sub-command to the private method
# named "_dfs_<command>".
#
# Arguments:
#   - an optional leading hashref with options (defaults to an empty hash)
#   - the command name; any leading dashes are removed ("-ls" => "ls")
#   - the remaining parameters, forwarded to the handler as-is
#
# Dies when no command name is given and croaks when no handler exists for
# it. Returns whatever the handler returns.
sub dfs {
    my $self = shift;

    my $options = {};
    $options = shift @_ if Ref::Util::is_hashref( $_[0] );

    my $cmd = shift @_;
    die "No dfs command specified" if ! $cmd;
    $cmd =~ s{ \A [-]+ }{}xms;

    my $handler = '_dfs_' . $cmd;
    if ( ! $self->can( $handler ) ) {
        Carp::croak "'$cmd' is not implemented!";
    }

    return $self->$handler( $options, @_ );
}

sub _dfs_ls {
    my $self = shift;
    state $strp;

    my $options = shift;
    my @params  = @_;
    my @flags   = qw( d h R );
    my($arg, $paths) = $self->_parse_options(
                            \@params,
                            \@flags,
                            undef,
                            {
                                require_params => 1,
                            },
                        );

    my $want_epoch = $options->{want_epoch};
    my $cb = delete $options->{callback};

    if ( $cb ) {
        die "callback needs to be a CODE" if ! Ref::Util::is_coderef $cb;
        if ( defined wantarray ) {
            Carp::croak "You need to call this function in void context when callback is specified";
        }
    }

    my @response = $self->_capture(
        $options,
        $self->cmd_hdfs,
        qw( dfs -ls ),
        ( map { '-' . $_ } grep { $arg->{ $_ } } @flags ),
        @{ $paths },
    );

    # directory is empty
    #
    return if ! @response;

    if ( $response[0] && $response[0] =~ m{ \A Found \s+ [0-9] }xms ) {
        shift @response; # junk
    }

    my $space = q{ };

    my @rv;
    for my $line ( @response ) {
        my($mode, $replication, $user, $group, @unknown) = split m{ \s+ }xms, $line, 5;
        my @rest = map { split $space, $_ } @unknown;
        my $size;
        if ( $arg->{h}) {
            if ( $rest[0] eq '0' || $rest[1] !~ m{ [a-zA-Z_] }xms ) {
                $size = shift @rest;
            }
            else {
                $size = join $space, shift @rest, shift @rest;
            }
        }
        else {
            $size = shift @rest;
        }
        my $date   = join ' ', shift @rest, shift @rest;
        my $path   = shift( @rest ) || die "Unable to parse $line to gather the path";
        my $is_dir = $mode =~ m{ \A [d] }xms ? 1 : 0;

        my %record = (
            mode        => $mode,
            replication => $replication,
            user        => $user,
            group       => $group,
            size        => $size,
            date        => $date,
            path        => $path,
            type        => $is_dir ? 'dir' : 'file',
        );

        if ( $want_epoch ) {
            $strp ||= DateTime::Format::Strptime->new(
                            pattern   => '%Y-%m-%d %H:%M',
                            time_zone => 'CET',
                            on_error  => 'croak',
                        );

lib/Hadoop/HDFS/Command.pm  view on Meta::CPAN

}

# Handler for `dfs -setfacl`.
#
# Arguments: the options hashref (passed on to _capture) followed by the
# raw parameters. Recognised switches are -b, -k and -R; recognised valued
# options are -m, -x and --set. At least one non-option parameter (a path)
# is required, otherwise _parse_options() dies.
#
# Returns the captured output of the command, which is empty on success.
sub _dfs_setfacl {
    my ($self, $options, @params) = @_;

    my @flags = qw( b k R );
    my @args  = qw( m=s x=s set=s );

    my($arg, $paths) = $self->_parse_options(
        \@params,
        \@flags,
        \@args,
        { require_params => 1 },
    );

    # Boolean switches that were actually supplied, in declaration order.
    my @acl_flags;
    for my $flag ( @flags ) {
        push @acl_flags, '-' . $flag if $arg->{ $flag };
    }

    # Whatever is left in $arg afterwards is a valued option.
    delete @{ $arg }{ @flags };

    my @acl_args;
    for my $name ( keys %{ $arg } ) {
        # "set" is the only option spelled with a double dash.
        my $switch = $name eq 'set' ? '--set' : '-' . $name;
        push @acl_args, $switch, $arg->{ $name };
    }

    my @response = $self->_capture(
        $options,
        $self->cmd_hdfs,
        qw( dfs -setfacl ),
        @acl_flags,
        @acl_args,
        @{ $paths },
    );

    # empty on success
    return @response;
}

# Splits a raw parameter list into parsed options and remaining parameters
# by means of Getopt::Long.
#
# Arguments:
#   $params - arrayref with the raw parameters (not modified)
#   $flags  - arrayref with Getopt::Long specs of the switches, or undef
#   $opt    - arrayref with Getopt::Long specs of valued options, or undef
#   $conf   - optional hashref; a true `require_params` makes the call die
#             when nothing but options was passed
#
# Returns a hashref with the parsed options and an arrayref with whatever
# Getopt::Long left in the parameter list. Dies when parsing fails.
#
# TODO: collect dfs generic options
#
# Generic options supported are
# -conf <configuration file>     specify an application configuration file
# -D <property=value>            use value for given property
# -fs <local|namenode:port>      specify a namenode
# -jt <local|resourcemanager:port>    specify a ResourceManager
# -files <comma separated list of files>    specify comma separated files to be copied to the map reduce cluster
# -libjars <comma separated list of jars>    specify comma separated jar files to include in the classpath.
# -archives <comma separated list of archives>    specify comma separated archives to be unarchived on the compute machines.
sub _parse_options {
    my ($self, $params, $flags, $opt, $conf) = @_;
    $conf ||= {};

    # Work on a copy; a lone dash is replaced with a backslash-dash pair.
    my @remaining;
    for my $param ( @{ $params } ) {
        push @remaining, $param eq '-' ? '\-' : $param;
    }

    # Collect the option specs; anything that is not an arrayref is skipped.
    my @specs;
    for my $group ( $flags, $opt ) {
        next if ! Ref::Util::is_arrayref( $group );
        push @specs, @{ $group };
    }

    my %arg;
    my @getopt_args = ( \@remaining, \%arg, @specs );

    if ( $self->trace_logs ) {
        $self->_log( trace => '_parse_options::getopt: %s', \@getopt_args );
    }

    my $parsed = Getopt::Long::GetOptionsFromArray( @getopt_args );
    die qq{Unable to parse parameters: '@{$params}'} if ! $parsed;

    if ( ! @remaining && $conf->{require_params} ) {
        die "No parameters were specified!";
    }

    if ( $self->trace_logs ) {
        $self->_log( trace => '_parse_options::rv: %s', Dumper [ \%arg, [ @remaining ] ] );
    }

    return \%arg, [ @remaining ];
}

# Runs an external command and returns its standard output, split into
# lines by _split_on_newlines().
#
# Arguments:
#   $options - hashref; `ignore_fail` downgrades a failure to a warning and
#              `silent` additionally suppresses that warning
#   @cmd     - the command and its arguments, handed to system() as a list
#
# Dies when the command fails, unless `ignore_fail` is set, in which case
# whatever was captured on standard output is returned. Anything the
# command printed to standard error on success is relayed as a warning.
sub _capture {
    my $self    = shift;
    my $options = shift;
    my @cmd     = @_;

    $self->_log( debug => 'Executing command: %s', join(' ', @cmd) );

    my $start = time;

    # system() returns -1, with the reason in $!, when the command could not
    # be started at all. Save $! immediately, before anything overwrites it.
    my $exec_error;
    my($stdout, $stderr, $fail) = Capture::Tiny::capture {
        my $status = system( @cmd );
        $exec_error = "$!" if $status == -1;
        $status;
    };

    $self->_log( debug => 'Execution took %.3f seconds', time - $start );

    if ( $fail ) {
        my $code = $fail >> 8;
        if ( $fail == -1 ) {
            # The command never ran; shifting -1 would yield a meaningless,
            # huge number instead of a status.
            $code     = -1;
            $stderr ||= "unable to execute the command: $exec_error";
        }
        elsif ( my $signal = $fail & 127 ) {
            # Death by signal leaves the exit code at zero, which would
            # otherwise be reported as "status=0".
            $code .= " (killed by signal $signal)";
        }
        $stderr ||= '[no error]';
        my $msg = "External command (@cmd) failed with status=$code: $stderr";
        if ( $options->{ignore_fail} ) {
            if ( ! $options->{silent} ) {
                warn "[Fatal error downgraded to a warning] $msg";
            }
            return $self->_split_on_newlines( $stdout || '' );
        }
        die $msg;
    }

    if ( $stderr ) {
        warn "Warning from external command: $stderr";
    }

    return $self->_split_on_newlines( $stdout );
}

sub _capture_with_stdin {



( run in 1.471 second using v1.01-cache-2.11-cpan-99c4e6809bf )