FusionInventory-Agent

 view release on metacpan or  search on metacpan

lib/FusionInventory/Agent/Task/NetInventory.pm  view on Meta::CPAN

use parent 'FusionInventory::Agent::Task';

use Encode qw(encode);
use English qw(-no_match_vars);
use Time::HiRes qw(usleep);
use Thread::Queue v2.01;
use UNIVERSAL::require;

use FusionInventory::Agent::XML::Query;
use FusionInventory::Agent::Version;
use FusionInventory::Agent::Tools;
use FusionInventory::Agent::Tools::Hardware;
use FusionInventory::Agent::Tools::Network;
use FusionInventory::Agent::Tools::Expiration;

use FusionInventory::Agent::Task::NetInventory::Version;
use FusionInventory::Agent::Task::NetInventory::Job;

our $VERSION = FusionInventory::Agent::Task::NetInventory::Version::VERSION;

sub isEnabled {
    my ($self, $response) = @_;

    if (!$self->{target}->isType('server')) {
        $self->{logger}->debug("NetInventory task not compatible with local target");
        return;
    }

    my @options = $response->getOptionsInfoByName('SNMPQUERY');
    if (!@options) {
        $self->{logger}->debug("NetInventory task execution not requested");
        return;
    }

    my @jobs;
    foreach my $option (@options) {
        if (!$option->{DEVICE}) {
            $self->{logger}->error("invalid job: no device defined");
            next;
        }

        my @devices;
        foreach my $device (@{$option->{DEVICE}}) {
            if (!$device->{IP}) {
                $self->{logger}->error("invalid device: no address defined");
                next;
            }
            push @devices, $device;
        }

        if (!@devices) {
            $self->{logger}->error("invalid job: no valid device defined");
            next;
        }

        my $params = $option->{PARAM}->[0];

        push @jobs, FusionInventory::Agent::Task::NetInventory::Job->new(
            logger      => $self->{logger},
            params      => $params,
            credentials => $option->{AUTHENTICATION},
            devices     => \@devices
        );
    }

    if (!@jobs) {
        $self->{logger}->error("no valid job found, aborting");
        return;
    }

    $self->{jobs} = \@jobs;

    return 1;
}

sub _inventory_thread {
    my ($self, $jobs, $done) = @_;

    my $id = threads->tid();
    $self->{logger}->debug("[thread $id] creation");

    # run as long as they are a job to process
    while (my $job = $jobs->dequeue()) {

        last unless ref($job) eq 'HASH';
        last if $job->{leave};

        my $device = $job->{device};

        my $result;
        eval {
            $result = $self->_queryDevice($job);
        };
        if ($EVAL_ERROR) {
            chomp $EVAL_ERROR;
            $result = {
                ERROR => {
                    ID      => $device->{ID},
                    MESSAGE => $EVAL_ERROR
                }
            };

            $result->{ERROR}->{TYPE} = $device->{TYPE} if $device->{TYPE};

            # Inserted back device PID in result if set by server
            $result->{PID} = $device->{PID} if defined($device->{PID});

            $self->{logger}->error("[thread $id] $EVAL_ERROR");
        }

        # Get result PID from result
        my $pid = delete $result->{PID};

        # Directly send the result message from the thread, but use job pid if
        # it was not set in result
        $self->_sendResultMessage($result, $pid || $job->{pid});

        $done->enqueue($job);
    }

    delete $self->{logger}->{prefix};

lib/FusionInventory/Agent/Task/NetInventory.pm  view on Meta::CPAN

    $self->{_client_params} = {
        logger       => $self->{logger},
        user         => $params{user},
        password     => $params{password},
        proxy        => $params{proxy},
        ca_cert_file => $params{ca_cert_file},
        ca_cert_dir  => $params{ca_cert_dir},
        no_ssl_check => $params{no_ssl_check},
        no_compress  => $params{no_compress},
    } if !$self->{client};

    # Extract greatest max_threads from jobs
    my ($max_threads) = sort { $b <=> $a } map { int($_->max_threads()) }
        @{$self->{jobs}};

    my %running_threads = ();

    # initialize FIFOs
    my $jobs = Thread::Queue->new();
    my $done = Thread::Queue->new();

    # count devices and check skip_start_stop
    my $devices_count   = 0;
    my $skip_start_stop = 0;
    foreach my $job (@{$self->{jobs}}) {
        $devices_count += $job->count();
        # newer server won't need START message if PID is provided on <DEVICE/>
        $skip_start_stop = any { defined($_->{PID}) } $job->devices()
            unless $skip_start_stop;
    }

    # Define a job expiration: 15 minutes by device to scan is large enough
    setExpirationTime( timeout => $devices_count * 900 );
    my $expiration = getExpirationTime();

    # no need more threads than devices to scan
    my $threads_count = $max_threads > $devices_count ? $devices_count : $max_threads;

    $self->{logger}->debug("creating $threads_count worker threads");
    for (my $i = 0; $i < $threads_count; $i++) {
        my $newthread = threads->create(sub { $self->_inventory_thread($jobs, $done); });
        # Keep known created threads in a hash
        $running_threads{$newthread->tid()} = $newthread ;
        usleep(50000) until ($newthread->is_running() || $newthread->is_joinable());
    }

    # Check really started threads number vs really running ones
    my @really_running  = map { $_->tid() } threads->list(threads::running);
    my @started_threads = keys(%running_threads);
    unless (@really_running == $threads_count && keys(%running_threads) == $threads_count) {
        $self->{logger}->debug(scalar(@really_running)." really running: [@really_running]");
        $self->{logger}->debug(scalar(@started_threads)." started: [@started_threads]");
    }

    my %queues = ();
    my $pid_index = 1;

    # Start jobs by preparing queues
    foreach my $job (@{$self->{jobs}}) {

        # SNMP credentials
        my $credentials = $job->credentials();

        # set pid
        my $pid = $job->pid() || $pid_index++;

        # send initial message to server unless it supports newer protocol
        $self->_sendStartMessage($pid) unless $skip_start_stop;

        # prepare queue
        my $queue = $queues{$pid} || {
            max_in_queue    => $job->max_threads(),
            in_queue        => 0,
            todo            => []
        };
        foreach my $device ($job->devices()) {
            push @{$queue->{todo}}, {
                pid         => $pid,
                device      => $device,
                timeout     => $job->timeout(),
                credentials => $credentials->{$device->{AUTHSNMP_ID}}
            };
        }

        # Only keep queue if we have a device to scan
        $queues{$pid} = $queue
            if @{$queue->{todo}};
    }

    my $queued_count = 0;
    my $job_count = 0;
    my $jid_len = length(sprintf("%i",$devices_count));
    my $jid_pattern = "#%0".$jid_len."i";

    # We need to guaranty we don't have more than max_in_queue device in shared
    # queue for each job
    while (my @pids = sort { $a <=> $b } keys(%queues)) {

        # Enqueue as device as possible
        foreach my $pid (@pids) {
            my $queue = $queues{$pid};
            next unless @{$queue->{todo}};
            next if $queue->{in_queue} >= $queue->{max_in_queue};
            my $device = shift @{$queue->{todo}};
            $queue->{in_queue} ++;
            $device->{jid} = sprintf($jid_pattern, ++$job_count);
            $jobs->enqueue($device);
            $queued_count++;
        }

        # as long as some of our threads are still running...
        if (keys(%running_threads)) {

            # send available results on the fly
            while (my $device = $done->dequeue_nb()) {
                my $pid = $device->{pid};
                my $queue = $queues{$pid};
                $queue->{in_queue} --;
                $queued_count--;
                unless ($queue->{in_queue} || @{$queue->{todo}}) {
                    # send final message to the server before cleaning threads unless it supports newer protocol
                    $self->_sendStopMessage($pid) unless $skip_start_stop;

                    delete $queues{$pid};

                    # send final message to the server unless it supports newer protocol
                    $self->_sendStopMessage($pid) unless $skip_start_stop;
                }
                # Check if it's time to abort a thread
                $devices_count--;
                if ($devices_count < $threads_count) {
                    $jobs->enqueue({ leave => 1 });
                    $threads_count--;
                }
            }

            # wait for a little
            usleep(50000);

            if ($expiration && time > $expiration) {
                $self->{logger}->warning("Aborting netinventory job as it reached expiration time");

lib/FusionInventory/Agent/Task/NetInventory.pm  view on Meta::CPAN

    setExpirationTime();
}

sub _sendMessage {
    my ($self, $content) = @_;

    my $message = FusionInventory::Agent::XML::Query->new(
        deviceid => $self->{deviceid} || 'foo',
        query    => 'SNMPQUERY',
        content  => $content
    );

    # task-specific client, if needed
    $self->{client} = FusionInventory::Agent::HTTP::Client::OCS->new(%{$self->{_client_params}})
        if !$self->{client};

    $self->{client}->send(
        url     => $self->{target}->getUrl(),
        message => $message
    );
}

sub _sendStartMessage {
    my ($self, $pid) = @_;

    $self->_sendMessage({
        AGENT => {
            START        => 1,
            AGENTVERSION => $FusionInventory::Agent::Version::VERSION,
        },
        MODULEVERSION => $VERSION,
        PROCESSNUMBER => $pid
    });
}

sub _sendStopMessage {
    my ($self, $pid) = @_;

    $self->_sendMessage({
        AGENT => {
            END => 1,
        },
        MODULEVERSION => $VERSION,
        PROCESSNUMBER => $pid
    });
}

sub _sendResultMessage {
    my ($self, $result, $pid) = @_;

    $self->_sendMessage({
        DEVICE        => $result,
        MODULEVERSION => $VERSION,
        PROCESSNUMBER => $pid || 0
    });
}

sub _queryDevice {
    my ($self, $params) = @_;

    my $credentials = $params->{credentials};
    my $device      = $params->{device};
    my $logger      = $self->{logger};
    my $id          = threads->tid();
    $logger->{prefix} = "[thread $id] $params->{jid}, ";
    $logger->debug(
        "scanning $device->{ID}: $device->{IP}" .
        ( $device->{PORT} ? ' on port ' . $device->{PORT} : '' ) .
        ( $device->{PROTOCOL} ? ' via ' . $device->{PROTOCOL} : '' )
    );

    my $snmp;
    if ($device->{FILE}) {
        FusionInventory::Agent::SNMP::Mock->require();
        eval {
            $snmp = FusionInventory::Agent::SNMP::Mock->new(
                ip   => $device->{IP},
                file => $device->{FILE}
            );
        };
        die "SNMP emulation error: $EVAL_ERROR" if $EVAL_ERROR;
    } else {
        eval {
            FusionInventory::Agent::SNMP::Live->require();
            $snmp = FusionInventory::Agent::SNMP::Live->new(
                version      => $credentials->{VERSION},
                hostname     => $device->{IP},
                port         => $device->{PORT},
                domain       => $device->{PROTOCOL},
                timeout      => $params->{timeout} || 15,
                community    => $credentials->{COMMUNITY},
                username     => $credentials->{USERNAME},
                authpassword => $credentials->{AUTHPASSPHRASE},
                authprotocol => $credentials->{AUTHPROTOCOL},
                privpassword => $credentials->{PRIVPASSPHRASE},
                privprotocol => $credentials->{PRIVPROTOCOL},
            );
        };
        die "SNMP communication error: $EVAL_ERROR" if $EVAL_ERROR;
    }

    my $result = getDeviceFullInfo(
         id      => $device->{ID},
         type    => $device->{TYPE},
         snmp    => $snmp,
         model   => $params->{model},
         logger  => $self->{logger},
         datadir => $self->{datadir}
    );

    # Inserted back device PID in result if set by server
    $result->{PID} = $device->{PID} if defined($device->{PID});

    return $result;
}

1;

__END__

=head1 NAME

FusionInventory::Agent::Task::NetInventory - Remote inventory support for FusionInventory Agent

=head1 DESCRIPTION

This task extracts various information from remote hosts through SNMP
protocol:

=over

=item *

printer cartridges and counters status

=item *

router/switch ports status

=item *

relations between devices and router/switch ports

=back

This task requires a GLPI server with FusionInventory plugin.

=head1 AUTHORS

Copyright (C) 2009 David Durieux
Copyright (C) 2010-2012 FusionInventory Team



( run in 0.770 second using v1.01-cache-2.11-cpan-39bf76dae61 )