Hadoop-HDFS-Command
view release on metacpan or search on metacpan
lib/Hadoop/HDFS/Command.pm view on Meta::CPAN
unshift @cmd, 'sudo', '-u', $self->runas
unless $self->runas eq getpwuid $<;
@_ = ($self, $options, @cmd);
};
# Dispatch an "hdfs dfs" sub-command to the matching _dfs_<command> method.
#
# Usage: $hdfs->dfs( [\%options,] $command, @command_args )
#
# An optional leading hash reference is taken as the options and handed to
# the handler as its first argument (an empty hash ref otherwise). Leading
# dashes are stripped from $command, so "ls", "-ls" and "--ls" all resolve to
# _dfs_ls. Dies when no command is given; croaks when there is no
# _dfs_<command> method. Returns whatever the handler returns.
sub dfs {
    my $self    = shift;
    my $options = Ref::Util::is_hashref( $_[0] ) ? shift @_ : {};

    my $cmd = shift || die "No dfs command specified";
    $cmd =~ s{ \A [-]+ }{}xms;

    my $handler = "_dfs_$cmd";
    $self->can( $handler )
        or Carp::croak "'$cmd' is not implemented!";

    return $self->$handler( $options, @_ );
}
# Wrapper around "hdfs dfs -ls" that parses the listing into records.
#
# Usage: $self->_dfs_ls( \%options, [-d] [-h] [-R] @paths )
#
# The flags -d, -h and -R are passed through to hdfs when set.
#
# Options (keys of %options):
#   callback   - CODE ref called with a hash ref for every entry. Processing
#                stops as soon as it returns a false value. When set, the
#                method must be called in void context and returns nothing.
#                The key is removed from %options.
#   want_epoch - also add an "epoch" key, parsed from the listed date with
#                the pattern '%Y-%m-%d %H:%M'. A date that fails to parse is
#                only logged at debug level.
#   time_zone  - time zone the listed dates are interpreted in when
#                want_epoch is set. Defaults to 'CET' for backward
#                compatibility; set it to the zone the hdfs client prints in.
#   The options hash is also handed to _capture().
#
# Returns a list of hash refs with the keys mode, replication, user, group,
# size, date, path and type ('dir' or 'file'), plus epoch on request. Returns
# nothing when the listing is empty or a callback is used. Dies when a line
# has no path column.
sub _dfs_ls {
    my $self = shift;
    # Date parsers are only built when needed; keep one per time zone.
    state %strp_for;
    my $options = shift;
    my @params  = @_;
    my @flags   = qw( d h R );

    my($arg, $paths) = $self->_parse_options(
        \@params,
        \@flags,
        undef,
        {
            require_params => 1,
        },
    );

    my $want_epoch = $options->{want_epoch};
    my $time_zone  = $options->{time_zone} // 'CET';
    my $cb         = delete $options->{callback};

    if ( $cb ) {
        die "callback needs to be a CODE" if ! Ref::Util::is_coderef( $cb );
        if ( defined wantarray ) {
            Carp::croak "You need to call this function in void context when callback is specified";
        }
    }

    my @response = $self->_capture(
        $options,
        $self->cmd_hdfs,
        qw( dfs -ls ),
        ( map { '-' . $_ } grep { $arg->{ $_ } } @flags ),
        @{ $paths },
    );

    # hdfs prints a "Found N items" header before the contents of each
    # directory it lists, i.e. once per directory argument. Dropping only
    # the first line would leave later headers to be parsed as entries (and
    # die for lack of a path) whenever several paths are listed.
    @response = grep { $_ !~ m{ \A Found \s+ [0-9] }xms } @response;

    # nothing listed (e.g. an empty directory)
    #
    return if ! @response;

    my $space = q{ };
    my @rv;

    LINE:
    for my $line ( @response ) {
        my($mode, $replication, $user, $group, @unknown) = split m{ \s+ }xms, $line, 5;
        my @rest = map { split $space, $_ } @unknown;

        my $size;
        if (   $arg->{h}
            && @rest > 1
            && $rest[0] ne '0'
            && $rest[1] =~ m{ [a-zA-Z_] }xms
        ) {
            # human readable size printed with a detached unit, e.g. "1.2 K"
            $size = join $space, splice @rest, 0, 2;
        }
        else {
            $size = shift @rest;
        }

        my $date = join ' ', shift @rest, shift @rest;

        # "0" is a valid (relative) path name, so only undef/'' is an error.
        my $path = shift @rest;
        die "Unable to parse $line to gather the path"
            if ! defined $path || $path eq q{};

        my $is_dir = $mode =~ m{ \A [d] }xms ? 1 : 0;
        my %record = (
            mode        => $mode,
            replication => $replication,
            user        => $user,
            group       => $group,
            size        => $size,
            date        => $date,
            path        => $path,
            type        => $is_dir ? 'dir' : 'file',
        );

        if ( $want_epoch ) {
            my $strp = $strp_for{ $time_zone } ||= DateTime::Format::Strptime->new(
                pattern   => '%Y-%m-%d %H:%M',
                time_zone => $time_zone,
                on_error  => 'croak',
            );
            eval {
                $record{epoch} = $strp->parse_datetime( $date )->epoch;
                1;
            } or do {
                my $eval_error = $@ || 'Zombie error';
                $self->_log( debug => 'Failed to convert %s into an epoch: %s',
                    $date,
                    $eval_error,
                );
            };
        }

        if ( @rest ) {
            # interpret as the rest of the path as spaces in paths are ok
            # possibly this will need to be revisited in the future.
            #
            $record{path} = join $space, $record{path}, @rest;
        }

        if ( $cb ) {
            # The callback controls the flow: a false return value stops
            # the processing of the remaining lines.
            if ( ! $cb->( \%record ) ) {
                $self->_log( info => 'Terminating the ls processing as the user callback did not return a true value.');
                last LINE;
            }
            next LINE;
        }

        push @rv, { %record };
    }

    return if $cb;
    return @rv;
}
# Wrapper around "hdfs dfs -du".
#
# Usage: $self->_dfs_du( \%options, [-h] [-s] @paths )
#
# Every output line is split on runs of two or more whitespace characters
# (so human readable sizes such as "1.2 K" stay in one piece) and turned
# into a hash ref with:
#   size                - the first column
#   name                - the last column (the path)
#   disk_space_consumed - the middle column, only when the line has one
#
# Dies when the command produced no output at all.
sub _dfs_du {
    my ( $self, $options, @params ) = @_;
    my @flags = qw( h s );

    my ( $arg, $paths ) = $self->_parse_options(
        \@params,
        \@flags,
        undef,
        { require_params => 1 },
    );

    my @switches = map { "-$_" } grep { $arg->{ $_ } } @flags;

    my @lines = $self->_capture(
        $options,
        $self->cmd_hdfs,
        'dfs', '-du',
        @switches,
        @{ $paths },
    );
    die "No output collected from -du command" if ! @lines;

    my @records;
    for my $line ( @lines ) {
        my @columns = split m{ \s{2,} }xms, $line;
        my %entry = (
            size => shift @columns,
            name => pop @columns,
        );
        $entry{disk_space_consumed} = shift @columns if @columns;
        push @records, \%entry;
    }
    return @records;
}
# Wrapper around "hdfs dfs -mv": moves the first given path to the second.
#
# Usage: $self->_dfs_mv( \%options, $source, $target )
#
# No flags are accepted and any path after the target is ignored. Dies when
# the source or target is missing (or false), and whenever _capture dies for
# the external command. Returns nothing.
sub _dfs_mv {
    my ( $self, $options, @params ) = @_;

    my ( undef, $paths ) = $self->_parse_options(
        \@params,
        [],
        undef,
        { require_params => 1 },
    );

    my ( $source, $target ) = splice @{ $paths }, 0, 2;
    $source or die "Source path not specified";
    $target or die "Target path not specified";

    # The output is not interesting; failures surface as exceptions.
    $self->_capture(
        $options,
        $self->cmd_hdfs,
        'dfs', '-mv',
        $source, $target,
    );

    return;
}
sub _dfs_rm {
my $self = shift;
my $options = shift;
my @params = @_;
my @flags = qw( f r skipTrash );
my($arg, $paths) = $self->_parse_options(
\@params,
\@flags,
undef,
{
require_params => 1,
},
);
my @response = $self->_capture(
$options,
$self->cmd_hdfs,
qw( dfs -rm ),
( map { '-' . $_ } grep { $arg->{ $_ } } @flags ),
@{ $paths },
);
# just a confirmation message
return @response;
lib/Hadoop/HDFS/Command.pm view on Meta::CPAN
$self->cmd_hdfs,
qw( dfs -chown ),
( map { '-' . $_ } grep { $arg->{ $_ } } @flags ),
@{ $paths },
);
# just a confirmation message
return @response
}
# Wrapper around "hdfs dfs -get".
#
# Usage: $self->_dfs_get( \%options, [-p] [-ignoreCrc] [-crc] @paths )
#
# The enabled flags are passed through as "-<flag>" before the paths.
# Returns the lines printed by the command (normally at most a short
# confirmation message).
sub _dfs_get {
    my ( $self, $options, @params ) = @_;
    my @flags = qw( p ignoreCrc crc );

    my ( $arg, $paths ) = $self->_parse_options(
        \@params,
        \@flags,
        undef,
        { require_params => 1 },
    );

    my @enabled = grep { $arg->{ $_ } } @flags;

    my @lines = $self->_capture(
        $options,
        $self->cmd_hdfs,
        'dfs', '-get',
        ( map { "-$_" } @enabled ),
        @{ $paths },
    );

    return @lines;
}
# Wrapper around "hdfs dfs -getfacl".
#
# Usage: $self->_dfs_getfacl( \%options, [-R] @paths )
#
# Lines of the form "# key: value" (e.g. "# file: /x", "# owner: bob")
# become top-level keys of the returned hash ref. Every other line is
# appended verbatim to the array ref under "entries"; that key is absent
# when there are no such lines.
#
# NOTE(review): all output is folded into one hash, so if it covers several
# files (e.g. with -R or several paths) later header values overwrite
# earlier ones and all entries end up in a single list.
sub _dfs_getfacl {
    my ( $self, $options, @params ) = @_;
    my @flags = qw( R );

    my ( $arg, $paths ) = $self->_parse_options(
        \@params,
        \@flags,
        undef,
        { require_params => 1 },
    );

    my @lines = $self->_capture(
        $options,
        $self->cmd_hdfs,
        'dfs', '-getfacl',
        ( map { "-$_" } grep { $arg->{ $_ } } @flags ),
        @{ $paths },
    );

    my %acl;
    for my $line ( @lines ) {
        my ( $header ) = $line =~ m{ \A [#] \s+ (.*) \z }xms;
        if ( ! defined $header ) {
            push @{ $acl{entries} ||= [] }, $line;
            next;
        }
        my ( $key, $value ) = split m{ [:] \s+ }xms, $header, 2;
        $acl{ $key } = $value;
    }

    return \%acl;
}
# Wrapper around "hdfs dfs -setfacl".
#
# Usage: $self->_dfs_setfacl( \%options, [-b] [-k] [-R]
#                             [-m <spec> | -x <spec> | --set <spec>] @paths )
#
# The boolean switches (b, k, R) are emitted first. Every key left in the
# parsed arguments after removing them is emitted as an option with its
# value ("set" as "--set", anything else as "-<key>"); presumably these are
# only m, x and set from the "m=s x=s set=s" specs -- depends on
# _parse_options. Returns the command output (empty on success).
sub _dfs_setfacl {
    my ( $self, $options, @params ) = @_;
    my @flags = qw( b k R );
    my @args  = qw( m=s x=s set=s );

    my ( $arg, $paths ) = $self->_parse_options(
        \@params,
        \@flags,
        \@args,
        { require_params => 1 },
    );

    my @acl_flags = map { "-$_" } grep { $arg->{ $_ } } @flags;

    # Whatever remains after dropping the switches takes a value.
    delete @{ $arg }{ @flags };
    my @acl_args;
    for my $name ( keys %{ $arg } ) {
        my $switch = $name eq 'set' ? '--set' : "-$name";
        push @acl_args, $switch, $arg->{ $name };
    }

    my @response = $self->_capture(
        $options,
        $self->cmd_hdfs,
        'dfs', '-setfacl',
        @acl_flags,
        @acl_args,
        @{ $paths },
    );

    # empty on success
    return @response;
}
sub _parse_options {
my $self = shift;
# TODO: collect dfs generic options
#
# Generic options supported are
# -conf <configuration file> specify an application configuration file
# -D <property=value> use value for given property
# -fs <local|namenode:port> specify a namenode
# -jt <local|resourcemanager:port> specify a ResourceManager
# -files <comma separated list of files> specify comma separated files to be copied to the map reduce cluster
# -libjars <comma separated list of jars> specify comma separated jar files to include in the classpath.
# -archives <comma separated list of archives> specify comma separated archives to be unarchived on the compute machines.
my($params, $flags, $opt, $conf) = @_;
lib/Hadoop/HDFS/Command.pm view on Meta::CPAN
# TODO: use a single capture method.
my $options = shift;
my @cmd = @_;
my $stdin = delete $options->{stdin};
$self->_log( debug => 'Executing command(IPC): %s', join(' ', @cmd) );
my $start = time;
my $res = IPC::Cmd::run_forked(
\@cmd,
{
( $stdin ? (
child_stdin => $stdin,
) : () ),
#timeout => $timeout,
terminate_on_parent_sudden_death => 1,
}
);
$self->_log( debug => 'Execution took %.3f seconds', time - $start );
my($stdout, $stderr, $fail);
my $success = defined $res->{exit_code}
&& $res->{exit_code} == 0
&& ! $res->{timeout};
$fail = $success ? 0 : $res->{exit_code};
$stderr = $res->{stderr};
$stdout = $res->{stdout};
if ( $fail ) {
my $code = $fail >> 8;
$stderr ||= $res->{err_msg} || '[no error]';
my $msg = "External command (@cmd) failed with status=$code: $stderr";
if ( $options->{ignore_fail} ) {
if ( ! $options->{silent} ) {
warn "[Fatal error downgraded to a warning] $msg";
}
return $self->_split_on_newlines( $stdout || '' );
}
die $msg;
}
if ( $stderr ) {
warn "Warning from external command: $stderr";
}
return $self->_split_on_newlines( $stdout );
}
# Split command output into lines.
#
# Leading and trailing whitespace of the whole text is removed first, then
# the text is split on runs of newlines, so blank lines never yield empty
# elements. Whitespace inside the text (including indentation of lines
# after the first) is kept. In scalar context returns the number of lines.
sub _split_on_newlines {
    my ( undef, $text ) = @_;
    ( my $trimmed = $text ) =~ s{ \A \s+ | \s+ \z }{}xmsg;
    return split m{ \n+ }xms, $trimmed;
}
# Print a log line to STDERR when enable_log is true.
#
# Usage: $self->_log( $level, $template, @params )
#
# The line is "[LEVEL] " . $template . "\n" with the level upper-cased, used
# as a printf format for @params. Literal "%" characters in the template
# therefore have to be written as "%%". Returns nothing when logging is
# disabled.
sub _log {
    my ( $self, $level, $tmpl, @param ) = @_;
    $self->enable_log or return;

    my $format = sprintf "[%s] %s\n", uc $level, $tmpl;
    printf STDERR $format, @param;
}
# A module loaded via use/require must end by returning a true value.
1;
__END__
=pod
=encoding UTF-8
=head1 NAME
Hadoop::HDFS::Command
=head1 VERSION
version 0.007
=head1 SYNOPSIS
use Hadoop::HDFS::Command;
my $hdfs = Hadoop::HDFS::Command->new;
my @rv = $hdfs->$command( @command_args );
=head1 DESCRIPTION
This is a simple wrapper around the hdfs command line tools, making them
easier to call from Perl and their output easier to parse.
The interface is only partially implemented at the moment (see the
implemented wrappers below).
You can always use WebHDFS for similar operations instead of falling
back to the command line. However, there are several benefits to using the
CLI: (i) you end up with a single C<JVM> invocation, so the response
might be faster; (ii) some functionality / endpoints might be buggy in WebHDFS
but work with the CLI (for example, escaping certain values is broken
in some versions, but works with the CLI).
=head1 NAME
Hadoop::HDFS::Command - Wrappers for various hadoop hdfs cli commands
=head1 METHODS
=head2 new
The constructor. Available attributes are listed below.
=head3 cmd_hdfs
Default value is C</usr/bin/hdfs>. This option needs to be altered if you have
( run in 0.871 second using v1.01-cache-2.11-cpan-71847e10f99 )