Hadoop-HDFS-Command
view release on metacpan or search on metacpan
lib/Hadoop/HDFS/Command.pm view on Meta::CPAN
package Hadoop::HDFS::Command;
$Hadoop::HDFS::Command::VERSION = '0.007';
use 5.010;
use strict;
use warnings;
use Capture::Tiny ();
use Carp ();
use Data::Dumper;
use DateTime::Format::Strptime;
use DateTime;
use Getopt::Long ();
use IPC::Cmd ();
use Ref::Util ();
use Time::HiRes qw( time );
use Types::Standard qw(Bool Str);
{ use Moo; }
has cmd_hdfs => (
is => 'rw',
isa => sub {
my $val = shift;
return if $val && -e $val && -x _;
Carp::confess sprintf "The command `%s` either does not exist or not an executable!",
$val,
;
},
default => sub { '/usr/bin/hdfs' },
lazy => 1,
);
has enable_log => (
is => 'rw',
isa => Bool,
default => sub { 0 },
lazy => 1,
);
has trace_logs => (
is => 'rw',
isa => Bool,
default => sub { 0 },
lazy => 1,
);
has runas => (
is => 'rw',
isa => Str,
default => scalar getpwuid $<,
lazy => 1,
);
before ['_capture', '_capture_with_stdin'] => sub {
my ($self, $options, @cmd) = @_;
unshift @cmd, 'sudo', '-u', $self->runas
unless $self->runas eq getpwuid $<;
@_ = ($self, $options, @cmd);
};
sub dfs {
my $self = shift;
my $options = Ref::Util::is_hashref $_[0] ? shift( @_ ) : {};
(my $cmd = shift || die "No dfs command specified") =~ s{ \A [-]+ }{}xms;
my $method = '_dfs_' . $cmd;
Carp::croak "'$cmd' is not implemented!" if ! $self->can( $method );
$self->$method( $options, @_ );
}
sub _dfs_ls {
my $self = shift;
state $strp;
my $options = shift;
my @params = @_;
my @flags = qw( d h R );
my($arg, $paths) = $self->_parse_options(
\@params,
\@flags,
undef,
{
require_params => 1,
},
);
my $want_epoch = $options->{want_epoch};
my $cb = delete $options->{callback};
if ( $cb ) {
die "callback needs to be a CODE" if ! Ref::Util::is_coderef $cb;
if ( defined wantarray ) {
Carp::croak "You need to call this function in void context when callback is specified";
}
}
my @response = $self->_capture(
$options,
$self->cmd_hdfs,
qw( dfs -ls ),
( map { '-' . $_ } grep { $arg->{ $_ } } @flags ),
@{ $paths },
);
# directory is empty
#
return if ! @response;
if ( $response[0] && $response[0] =~ m{ \A Found \s+ [0-9] }xms ) {
shift @response; # junk
}
my $space = q{ };
my @rv;
for my $line ( @response ) {
my($mode, $replication, $user, $group, @unknown) = split m{ \s+ }xms, $line, 5;
my @rest = map { split $space, $_ } @unknown;
my $size;
if ( $arg->{h}) {
if ( $rest[0] eq '0' || $rest[1] !~ m{ [a-zA-Z_] }xms ) {
$size = shift @rest;
}
else {
$size = join $space, shift @rest, shift @rest;
}
}
else {
$size = shift @rest;
}
my $date = join ' ', shift @rest, shift @rest;
my $path = shift( @rest ) || die "Unable to parse $line to gather the path";
my $is_dir = $mode =~ m{ \A [d] }xms ? 1 : 0;
my %record = (
mode => $mode,
replication => $replication,
user => $user,
group => $group,
size => $size,
date => $date,
path => $path,
type => $is_dir ? 'dir' : 'file',
);
if ( $want_epoch ) {
$strp ||= DateTime::Format::Strptime->new(
pattern => '%Y-%m-%d %H:%M',
time_zone => 'CET',
on_error => 'croak',
);
lib/Hadoop/HDFS/Command.pm view on Meta::CPAN
}
sub _dfs_setfacl {
my $self = shift;
my $options = shift;
my @params = @_;
my @flags = qw( b k R );
my @args = qw( m=s x=s set=s );
my($arg, $paths) = $self->_parse_options(
\@params,
\@flags,
\@args,
{
require_params => 1,
},
);
my @acl_flags = map { '-' . $_ } grep { $arg->{ $_ } } @flags;
delete @{ $arg }{ @flags };
my @acl_args = map {
my $key = $_ eq 'set'? '--set' : '-' . $_;
$key => $arg->{ $_ }
} keys %{ $arg };
my @response = $self->_capture(
$options,
$self->cmd_hdfs,
qw( dfs -setfacl ),
@acl_flags,
@acl_args,
@{ $paths },
);
# empty on success
@response;
}
sub _parse_options {
my $self = shift;
# TODO: collect dfs generic options
#
# Generic options supported are
# -conf <configuration file> specify an application configuration file
# -D <property=value> use value for given property
# -fs <local|namenode:port> specify a namenode
# -jt <local|resourcemanager:port> specify a ResourceManager
# -files <comma separated list of files> specify comma separated files to be copied to the map reduce cluster
# -libjars <comma separated list of jars> specify comma separated jar files to include in the classpath.
# -archives <comma separated list of archives> specify comma separated archives to be unarchived on the compute machines.
my($params, $flags, $opt, $conf) = @_;
$conf ||= {};
my @params = map { $_ eq '-' ? '\-' : $_ } @{ $params };
my @getopt_args = (
\@params,
\my %arg,
(
map { Ref::Util::is_arrayref $_ ? @{ $_ } : () }
$flags,
$opt,
),
);
if ( $self->trace_logs ) {
$self->_log( trace => '_parse_options::getopt: %s', \@getopt_args );
}
Getopt::Long::GetOptionsFromArray(
@getopt_args
) || die qq{Unable to parse parameters: '@{$params}'};
if ( $conf->{require_params} && ! @params ) {
die "No parameters were specified!";
}
if ( $self->trace_logs ) {
$self->_log( trace => '_parse_options::rv: %s', Dumper [ \%arg, [ @params ] ] );
}
return \%arg, [ @params ];
}
sub _capture {
my $self = shift;
my $options = shift;
my @cmd = @_;
$self->_log( debug => 'Executing command: %s', join(' ', @cmd) );
my $start = time;
my($stdout, $stderr, $fail) = Capture::Tiny::capture {
system( @cmd );
};
$self->_log( debug => 'Execution took %.3f seconds', time - $start );
if ( $fail ) {
my $code = $fail >> 8;
$stderr ||= '[no error]';
my $msg = "External command (@cmd) failed with status=$code: $stderr";
if ( $options->{ignore_fail} ) {
if ( ! $options->{silent} ) {
warn "[Fatal error downgraded to a warning] $msg";
}
return $self->_split_on_newlines( $stdout || '' );
}
die $msg;
}
if ( $stderr ) {
warn "Warning from external command: $stderr";
}
return $self->_split_on_newlines( $stdout );
}
sub _capture_with_stdin {
( run in 1.471 second using v1.01-cache-2.11-cpan-99c4e6809bf )