Net-Hadoop-YARN
view release on metacpan or search on metacpan
lib/Net/Hadoop/YARN/ResourceManager/Scheduler/UserApps.pm view on Meta::CPAN
package Net::Hadoop::YARN::ResourceManager::Scheduler::UserApps;
$Net::Hadoop::YARN::ResourceManager::Scheduler::UserApps::VERSION = '0.203';
use 5.10.0;
use strict;
use warnings;
use Data::Dumper ();
use Moo;
use POSIX ();
use Ref::Util ();
use Scalar::Util ();
use Time::Duration ();
use Net::Hadoop::YARN::ResourceManager;
has rm_object => (
is => 'rw',
isa => sub {
my $thing = shift;
my $type = 'Net::Hadoop::YARN::ResourceManager';
if ( ! $thing
|| ! Scalar::Util::blessed $thing
|| ! $thing->isa( $type )
) {
die "rm_object is not a $type";
}
},
default => sub {
Net::Hadoop::YARN::ResourceManager->new(
( $ENV{YARN_RESOURCE_MANAGER} ? (
servers => [ split /,/, $ENV{YARN_RESOURCE_MANAGER} ]
) : () )
);
},
);
sub collect {
my $self = shift;
my $user = shift || die "No user name was specified";
my $apps = $self->rm_object->apps( user => $user );
if ( ! Ref::Util::is_arrayref $apps ) {
if ( Ref::Util::is_hashref $apps ) {
if ( my $check = $apps->{apps} ) {
if ( ! keys %{ $check }) {
$apps = [];
}
else {
die sprintf "[TODO-1] Don't know what to do with %s",
Data::Dumper::Dumper [ $user => $apps ],
;
}
}
}
else {
die sprintf "[TODO-2] Don't know what to do with %s",
Data::Dumper::Dumper [ $user => $apps ],
;
}
}
my $format_epoch = sub {
my $epoch = shift || die "No epoch specified!";
return POSIX::strftime "%a %b %d %Y %H:%M:%S %Z", localtime $epoch;
};
my %apps_by_state;
foreach my $app ( @{ $apps } ) {
foreach my $resource ( qw(
allocatedMB
allocatedVCores
) ) {
$app->{ $resource } = 0 if $app->{ $resource } eq '-1';
}
if ( $app->{allocatedMB} ) {
$app->{allocatedMB_fmt} = $self->format_bytes( $app->{allocatedMB} * 1024**2 );
}
if ( $app->{allocatedVCores} ) {
$app->{allocatedVCores_fmt} = sprintf '%s vCore%s',
$app->{allocatedVCores},
$app->{allocatedVCores} > 1 ? 's' : '',
;
}
# TODO
# [STRING]"applicationTags"
# the value is something like "oozie-59a27f107d250c9822fd45e87fd40db8"
# which is not the job id.
foreach my $hash_or_string ( qw(
diagnostics
applicationTags
)) {
next if ! exists $app->{ $hash_or_string };
# This is a bug in the REST layer
if ( Ref::Util::is_hashref $app->{ $hash_or_string }
&& ! keys %{ $app->{ $hash_or_string } }
) {
$app->{ $hash_or_string } = '';
}
}
# https://www.cloudera.com/documentation/enterprise/latest/topics/cm_dg_yarn_applications.html
foreach my $duration_field ( qw(
vcoreSeconds
elapsedTime
memorySeconds
) ) {
next if ! exists $app->{ $duration_field };
if ( $app->{ $duration_field } ) {
$app->{ $duration_field . '_fmt' } = Time::Duration::duration(
$duration_field eq 'elapsedTime'
? $app->{ $duration_field } / 1000
: $app->{ $duration_field }
);
}
}
foreach my $time_field ( qw(
finishedTime
startedTime
) ) {
next if ! exists $app->{ $time_field };
if ( $app->{ $time_field } ) {
$app->{ $time_field . '_fmt' } = $format_epoch->( $app->{ $time_field } / 1000);
}
}
if ( $app->{name} =~ m{ \Q-oozie-oozi-W\E \z }xms ) {
my %name = map { @{ $_ } > 1 ? @{ $_ } : ( $_->[0] => 1 ) }
map { [ split m{ [=] }xms, $_, 2 ] }
split m{ [:] }xms, $app->{name};
$name{workflow_name} = delete $name{W} if $name{W};
$name{action_name} = delete $name{A} if $name{A};
$name{action_type} = delete $name{T} if $name{T};
$name{id} = delete $name{ID} if $name{ID};
$app->{oozie_meta} = \%name;
$app->{oozie_id} = $name{id} if $name{id};
}
push @{ $apps_by_state{ $app->{state} } ||= [] }, $app;
}
my %total_res;
foreach my $app ( @{ $apps_by_state{RUNNING} }) {
$total_res{allocatedMB} += $app->{allocatedMB};
$total_res{allocatedVCores} += $app->{allocatedVCores};
}
if ( $total_res{allocatedMB} ) {
$total_res{allocatedMB_fmt} = $self->format_bytes( $total_res{allocatedMB} * 1024**2 );
}
if ( $total_res{allocatedVCores} ) {
$total_res{allocatedVCores_fmt} = sprintf '%s vCore%s',
$total_res{allocatedVCores},
$total_res{allocatedVCores} > 1 ? 's' : '',
;
}
my @grouped;
foreach my $ordered_state (qw(
RUNNING
ACCEPTED
FINISHED
KILLED
FAILED
)) {
push @grouped, {
state => lc( $ordered_state ),
state_fmt => ucfirst( lc $ordered_state ),
apps => delete( $apps_by_state{$ordered_state} ) || [],
},
}
# TODO: possibly needs to be removed if we are sure that the code above
# is handling all of the possible states. So, this is a "just in case" part
#
push @grouped, {
state => 'rest',
apps => [ map { @{ $_ } } values %apps_by_state ],
};
# Spark jobs are returned like this for whatever reason.
if ( my $apps = $grouped[-1]->{apps} ) {
if ( Ref::Util::is_arrayref $apps && Ref::Util::is_arrayref $apps->[0] ) {
$grouped[-1]->{apps} = [ @{ $apps->[0] } ];
}
}
return {
grouped_apps => [ grep { @{ $_->{apps} } > 0 } @grouped ],
total_apps => scalar @{ $apps },
resources => \%total_res,
user => $user,
};
}
sub format_bytes {
my $self = shift;
my $bytes = shift;
return sprintf '%.2f GB', $bytes / 1024**3;
}
1;
__END__
=pod
=encoding UTF-8
=head1 NAME
Net::Hadoop::YARN::ResourceManager::Scheduler::UserApps
=head1 VERSION
version 0.203
=head1 SYNOPSIS
use Net::Hadoop::YARN::ResourceManager::Scheduler::UserApps;
my $uapps = Net::Hadoop::YARN::ResourceManager::Scheduler::UserApps->new(%opt);
my $stats = $uapps->collect( $user_name );
=head1 DESCRIPTION
User application stats in the Resource Manager.
=head1 NAME
Net::Hadoop::YARN::ResourceManager::Scheduler::UserApps - User application stats in the Resource Manager
=head1 METHODS
=head2 new
Available options:
=over 4
=item rm_object
The default C<rm_object> (resouce manager) can be overridden with this option.
The object needs to be a subclass of C<Net::Hadoop::YARN::ResourceManager>.
( run in 2.093 seconds using v1.01-cache-2.11-cpan-39bf76dae61 )