FusionInventory-Agent

 view release on metacpan or  search on metacpan

lib/FusionInventory/Agent/Task/NetDiscovery.pm  view on Meta::CPAN

use FusionInventory::Agent::Task::NetDiscovery::Job;

our $VERSION = FusionInventory::Agent::Task::NetDiscovery::Version::VERSION;

# Tell whether the NetDiscovery task has to run, according to the server
# response.
#
# Every NETDISCOVERY option found in the response is validated and turned into
# a FusionInventory::Agent::Task::NetDiscovery::Job object. An option is
# rejected, with an error logged, when it has no RANGEIP, no range having both
# IPSTART and IPEND set, no first PARAM entry, or no PID in that entry.
#
# Returns 1 after storing the jobs list in $self->{jobs}. Returns nothing when
# the target is not a server, when the response holds no NETDISCOVERY option,
# or when no option led to a valid job.
sub isEnabled {
    my ($self, $response) = @_;

    my $logger = $self->{logger};

    unless ($self->{target}->isType('server')) {
        $logger->debug("NetDiscovery task not compatible with local target");
        return;
    }

    my @options = $response->getOptionsInfoByName('NETDISCOVERY');
    unless (@options) {
        $logger->debug("NetDiscovery task execution not requested");
        return;
    }

    my @jobs;

    OPTION:
    foreach my $option (@options) {
        unless ($option->{RANGEIP}) {
            $logger->error("invalid job: no IP range defined");
            next OPTION;
        }

        # keep ranges having both bounds, reporting only the first missing one
        my @ranges = grep {
            my $problem =
                !$_->{IPSTART} ? "invalid range: no first address defined" :
                !$_->{IPEND}   ? "invalid range: no last address defined"  :
                                 undef;
            $logger->error($problem) if $problem;
            !$problem;
        } @{$option->{RANGEIP}};

        unless (@ranges) {
            $logger->error("invalid job: no valid IP range defined");
            next OPTION;
        }

        # only the first PARAM entry is used
        my $params = $option->{PARAM}->[0];
        my $reason =
            !$params                 ? "invalid job: no PARAM defined" :
            !defined($params->{PID}) ? "invalid job: no PID defined"   :
                                       undef;
        if ($reason) {
            $logger->error($reason);
            next OPTION;
        }

        my $job = FusionInventory::Agent::Task::NetDiscovery::Job->new(
            logger      => $logger,
            params      => $params,
            credentials => $option->{AUTHENTICATION},
            ranges      => \@ranges,
        );
        push @jobs, $job;
    }

    unless (@jobs) {
        $logger->error("no valid job found, aborting");
        return;
    }

    $self->{jobs} = \@jobs;

    return 1;
}

# Worker thread body: process scan requests read from the $jobs queue until a
# stop request shows up.
#
# Requests are read with $jobs->dequeue(), and every processed request is
# handed back with $done->enqueue(). A request is expected to be a hash
# reference: a false value, anything which is not a plain hash reference, or a
# hash with a true 'leave' entry ends the loop.
sub _discovery_thread {
    my ($self, $jobs, $done) = @_;

    my $tid = threads->tid();
    $self->{logger}->debug("[thread $tid] creation");

    my $processed = 0;

    # keep running as long as there is a request to process
    while (my $request = $jobs->dequeue()) {

        last if ref($request) ne 'HASH' || $request->{leave};

        my $device = $self->_scanAddress($request);

        # report the entity of the request along with the scan result
        $device->{ENTITY} = $request->{entity}
            if $device && defined($request->{entity});

        # A result is only sent when a device was found, which involves IP
        # being set
        if ($device->{IP}) {
            $self->_sendResultMessage($device, $request->{pid});
        }

        $done->enqueue($request);
        $processed++;
    }

    delete $self->{logger}->{prefix};

    $self->{logger}->debug2("[thread $tid] processed $processed scans");
    $self->{logger}->debug("[thread $tid] termination");
}

sub run {
    my ($self, %params) = @_;

    # Prepare client configuration if needed to send messages to the server
    $self->{_client_params} = {
        logger       => $self->{logger},
        user         => $params{user},
        password     => $params{password},
        proxy        => $params{proxy},
        ca_cert_file => $params{ca_cert_file},
        ca_cert_dir  => $params{ca_cert_dir},
        no_ssl_check => $params{no_ssl_check},
        no_compress  => $params{no_compress},
    } if !$self->{client};

    # check discovery methods available
    if (canRun('arp')) {
        $self->{arp} = 'arp -a';
    } elsif (canRun('ip')) {
        $self->{arp} = 'ip neighbor show';
    } else {
        $self->{logger}->info(
            "Can't run 'ip neighbor show' or 'arp' command, arp table detection can't be used"
        );
    }

    Net::Ping->require();
    if ($EVAL_ERROR) {
        $self->{logger}->info(
            "Can't load Net::Ping, echo ping can't be used"
        );
    }

    Net::NBName->require();
    if ($EVAL_ERROR) {
        $self->{logger}->info(
            "Can't load Net::NBName, netbios can't be used"
        );
    }

    FusionInventory::Agent::SNMP::Live->require();
    if ($EVAL_ERROR) {
        $self->{logger}->info(
            "Can't load FusionInventory::Agent::SNMP::Live, snmp detection " .
            "can't be used"
        );
    }

    # Extract greatest max_threads from jobs
    my ($max_threads) = sort { $b <=> $a } map { int($_->max_threads()) }
        @{$self->{jobs}};

    my %running_threads = ();
    my %queues = ();

    # initialize FIFOs
    my $jobs = Thread::Queue->new();
    my $done = Thread::Queue->new();

    # Start jobs by preparing range queues and counting ips
    my $max_count = 0;
    foreach my $job (@{$self->{jobs}}) {
        my $pid = $job->pid;

        my $queue = {
            max_in_queue        => $job->max_threads(),
            in_queue            => 0,
            timeout             => $job->timeout(),
            snmp_credentials    => $job->getValidCredentials(),
        };

        my $size = 0;
        my @list = ();

        # process each address block
        foreach my $range ($job->ranges()) {
            my $start = $range->{start};
            my $end   = $range->{end};
            my $block = Net::IP->new( "$start-$end" );
            if (!$block || $block->{binip} !~ /1/) {
                $self->{logger}->error(
                    "IPv4 range not supported by Net::IP: $start-$end"
                );
                next;
            }

            $self->{logger}->debug("scanning block $start-$end");

            # send initial message to the server
            $self->_sendStartMessage($pid);

            do {
                push @list, {
                    ip              => $block->ip(),
                    snmp_ports      => $range->{ports},
                    snmp_domains    => $range->{domains},
                    entity          => $range->{entity},
                };
            } while (++$block);

            # Update ip addresses size for this job
            $size += scalar(@list);
        }

        next unless $size;

        # Update queues
        $queue->{list} = \@list;
        $queues{$pid} = $queue;

        # Update total count
        $max_count += $size;

        # send block size to the server
        $self->_sendBlockMessage($pid, $size);
    }

    # Don't keep client until we have created threads, to avoid a segfault if SSL
    # is used with older openssl libs, but only if it was not already set by a script
    delete $self->{client} if $self->{_client_params};

    # Define a realistic block scan expiration : at least one minute by address
    my $target_expiration = $params{target_expiration} || 60;
    $target_expiration = 60 if ($target_expiration < 60);
    setExpirationTime( timeout => $max_count * $target_expiration );
    my $expiration = getExpirationTime();

    # no need more threads than ips to scan
    my $threads_count = $max_threads > $max_count ? $max_count : $max_threads;

    $self->{logger}->debug("creating $threads_count worker threads");
    for (my $i = 0; $i < $threads_count; $i++) {
        my $newthread = threads->create(sub { $self->_discovery_thread($jobs, $done); });
        # Keep known created threads in a hash
        $running_threads{$newthread->tid()} = $newthread ;
        usleep(50000) until ($newthread->is_running() || $newthread->is_joinable());
    }

    # Check really started threads number vs really running ones
    my @really_running  = map { $_->tid() } threads->list(threads::running);
    my @started_threads = keys(%running_threads);
    unless (@really_running == $threads_count && keys(%running_threads) == $threads_count) {
        $self->{logger}->debug(scalar(@really_running)." really running: [@really_running]");
        $self->{logger}->debug(scalar(@started_threads)." started: [@started_threads]");
    }

    my $queued_count = 0;
    my $job_count = 0;
    my $jid_len = length(sprintf("%i",$max_count));
    my $jid_pattern = "#%0".$jid_len."i";

    # We need to guarantee we don't have more than max_in_queue devices in the
    # shared queue for each job
    while (my @pids = sort { $a <=> $b } keys(%queues)) {

        # Enqueue as many ips as possible
        foreach my $pid (@pids) {
            my $queue = $queues{$pid};
            next unless @{$queue->{list}};
            next if $queue->{in_queue} >= $queue->{max_in_queue};
            my $address = shift @{$queue->{list}};
            # Update address hash with common parameters
            $address->{pid} = $pid;
            $address->{timeout} = $queue->{timeout};
            $address->{snmp_credentials} = $queue->{snmp_credentials};
            $address->{jid} = sprintf($jid_pattern, ++$job_count);
            $queue->{in_queue} ++;
            $jobs->enqueue($address);
            $queued_count++;
        }

        # as long as some of our threads are still running...
        if (keys(%running_threads)) {

            # send available results on the fly
            while (my $address = $done->dequeue_nb()) {
                my $pid = $address->{pid};
                my $queue = $queues{$pid};
                $queue->{in_queue} --;
                $queued_count--;
                unless ($queue->{in_queue} || @{$queue->{list}}) {
                    # send final message to the server before cleaning threads
                    $self->_sendStopMessage($pid);

                    delete $queues{$pid};

                    # send final message to the server
                    $self->_sendStopMessage($pid);
                }
                # Check if it's time to abort a thread
                $max_count--;
                if ($max_count < $threads_count) {
                    $jobs->enqueue({ leave => 1 });
                    $threads_count--;
                }
            }

            # wait for a little
            usleep(50000);

            if ($expiration && time > $expiration) {
                $self->{logger}->warning("Aborting netdiscovery task as it reached expiration time");
                # detach all our running worker
                foreach my $tid (keys(%running_threads)) {
                    $running_threads{$tid}->detach()
                        if $running_threads{$tid}->is_running();
                    delete $running_threads{$tid};
                }
                last;
            }

            # List our created and possibly running threads in a list to check
            my %running_threads_checklist = map { $_ => 0 }
                keys(%running_threads);

            foreach my $running (threads->list(threads::running)) {
                my $tid = $running->tid();
                # Skip if this running thread tid is not is our started list
                next unless exists($running_threads{$tid});

                # Check a thread is still running
                $running_threads_checklist{$tid} = 1 ;
            }

            # Clean our started list from thread tid that don't run anymore

lib/FusionInventory/Agent/Task/NetDiscovery.pm  view on Meta::CPAN

    } elsif ($NetPingVersion >= 2.67) {
        $type = 'timestamp';
        $np->message_type($type);
        if ($np->ping($params->{ip})) {
            $device{DNSHOSTNAME} = $params->{ip};
        }
    }

    $self->{logger}->debug(
        sprintf "- scanning %s with $type ping: %s",
        $params->{ip},
        $device{DNSHOSTNAME} ? 'success' : 'no result'
    );

    return %device;
}

# Query the netbios node status of $params->{ip} through Net::NBName.
#
# Returns nothing when no node status is obtained. Otherwise returns a device
# description as a flat key/value list which may hold:
#   WORKGROUP   - name registered with suffix 0 as GROUP
#   NETBIOSNAME - name registered with suffix 0 as UNIQUE, unless it starts
#                 with "IS~"
#   USERSESSION - name registered with suffix 3 as UNIQUE
#   MAC         - reported mac address, with '-' replaced by ':'
# Names are passed through getSanitizedString() before being stored.
sub _scanAddressByNetbios {
    my ($self, $params) = @_;

    my $address = $params->{ip};
    my $status  = Net::NBName->new()->node_status($address);

    $self->{logger}->debug(
        sprintf(
            "- scanning %s with netbios: %s",
            $address,
            $status ? 'success' : 'no result'
        )
    );
    return unless $status;

    my %device;
    foreach my $record ($status->names()) {
        my $suffix = $record->suffix();
        my $kind   = $record->G();
        my $name   = $record->name();

        if ($suffix == 0) {
            if ($kind eq 'GROUP') {
                $device{WORKGROUP} = getSanitizedString($name);
            } elsif ($kind eq 'UNIQUE' && $name !~ /^IS~/) {
                $device{NETBIOSNAME} = getSanitizedString($name);
            }
        } elsif ($suffix == 3 && $kind eq 'UNIQUE') {
            $device{USERSESSION} = getSanitizedString($name);
        }
    }

    # mac address is reported with '-' separators, ':' is used in the result
    (my $mac = $status->mac_address()) =~ tr/-/:/;
    $device{MAC} = $mac;

    return %device;
}

# Try to identify the device at $params->{ip} through SNMP.
#
# Parameters, read from the $params hash reference:
#   ip               - address to scan
#   timeout          - timeout passed to each attempt
#   snmp_credentials - array reference of credential hashes, each one having
#                      an ID entry
#   snmp_ports       - optional array reference of ports to try
#   snmp_domains     - optional array reference of domains to try
#
# One attempt is made for each domain, port and credential combination, in
# that nesting order, stopping at the first attempt which returns a hash
# reference. That device description is then returned as a flat key/value
# list, with AUTHSNMP set to the ID of the credential which worked. Nothing is
# returned when every attempt failed.
sub _scanAddressBySNMP {
    my ($self, $params) = @_;

    # a missing credentials list just means there is nothing to try
    my @credentials = @{ $params->{snmp_credentials} || [] };
    my @ports       = @{ $params->{snmp_ports}       || [] };
    my @domains     = @{ $params->{snmp_domains}     || [] };

    # When no port or no domain is requested, a single undefined placeholder
    # keeps one attempt for each credential, port and domain being then passed
    # undefined to the real scan
    @ports   = (undef) unless @ports;
    @domains = (undef) unless @domains;

    # Each combination gets its own hash: sharing the same hash references
    # between domains would leave every attempt with the last domain only
    my @tries;
    foreach my $domain (@domains) {
        foreach my $port (@ports) {
            foreach my $credential (@credentials) {
                push @tries, {
                    domain     => $domain,
                    port       => $port,
                    credential => $credential,
                };
            }
        }
    }

    foreach my $try (@tries) {
        my $credential = $try->{credential};
        my $device = $self->_scanAddressBySNMPReal(
            ip         => $params->{ip},
            port       => $try->{port},
            domain     => $try->{domain},
            timeout    => $params->{timeout},
            credential => $credential
        );

        # no result means either no host, no response, or invalid credentials
        $self->{logger}->debug(
            sprintf "- scanning %s%s with SNMP%s, credentials %d: %s",
            $params->{ip},
            $try->{port}   ? ':'.$try->{port}   : '',
            $try->{domain} ? ' '.$try->{domain} : '',
            $credential->{ID},
            ref $device eq 'HASH' ? 'success' :
                $device ? "no result, $device" : 'no result'
        );

        if (ref $device eq 'HASH') {
            $device->{AUTHSNMP} = $credential->{ID};
            return %{$device};
        }
    }

    return;
}

# Run a single SNMP identification attempt.
#
# Named parameters: ip, port, domain, timeout (1 is used when false) and
# credential, a hash reference providing VERSION, COMMUNITY, USERNAME,
# AUTHPASSPHRASE, AUTHPROTOCOL, PRIVPASSPHRASE and PRIVPROTOCOL.
#
# Returns the value provided by getDeviceInfo() when it is true, the error
# raised while creating the FusionInventory::Agent::SNMP::Live object if any,
# and nothing otherwise.
sub _scanAddressBySNMPReal {
    my ($self, %params) = @_;

    my $session = eval {
        my $credential = $params{credential};
        FusionInventory::Agent::SNMP::Live->new(
            version      => $credential->{VERSION},
            hostname     => $params{ip},
            port         => $params{port},
            domain       => $params{domain},
            timeout      => $params{timeout} || 1,
            community    => $credential->{COMMUNITY},
            username     => $credential->{USERNAME},
            authpassword => $credential->{AUTHPASSPHRASE},
            authprotocol => $credential->{AUTHPROTOCOL},
            privpassword => $credential->{PRIVPASSPHRASE},
            privprotocol => $credential->{PRIVPROTOCOL},
        );
    };

    # an exception here just means no device or wrong credentials
    return $EVAL_ERROR if $EVAL_ERROR;

    my $info = getDeviceInfo(
        snmp    => $session,
        datadir => $self->{datadir},
        logger  => $self->{logger},
    );

    return $info if $info;
    return;
}

# Send the initial message of the process identified by $pid, through
# _sendMessage(): it flags the start and reports the agent and module versions.
sub _sendStartMessage {
    my ($self, $pid) = @_;

    my %agent = (
        START        => 1,
        AGENTVERSION => $FusionInventory::Agent::Version::VERSION,
    );

    my %message = (
        AGENT         => \%agent,
        MODULEVERSION => $VERSION,
        PROCESSNUMBER => $pid,
    );

    return $self->_sendMessage(\%message);
}

# Send the final message of the process identified by $pid, through
# _sendMessage(): it flags the end and reports the module version.
sub _sendStopMessage {
    my ($self, $pid) = @_;

    my %message = (
        AGENT         => { END => 1 },
        MODULEVERSION => $VERSION,
        PROCESSNUMBER => $pid,
    );

    return $self->_sendMessage(\%message);
}

# Report, through _sendMessage(), the number of addresses ($count) involved in
# the process identified by $pid.
sub _sendBlockMessage {
    my ($self, $pid, $count) = @_;

    my %message = (
        AGENT         => { NBIP => $count },
        PROCESSNUMBER => $pid,
    );

    return $self->_sendMessage(\%message);
}

# Send a scan result, through _sendMessage(), for the process identified by
# $pid: $result is reported as the single entry of the DEVICE list, along with
# the module version.
sub _sendResultMessage {
    my ($self, $result, $pid) = @_;

    my %message = (
        DEVICE        => [ $result ],
        MODULEVERSION => $VERSION,
        PROCESSNUMBER => $pid,
    );

    return $self->_sendMessage(\%message);
}

1;



( run in 1.073 second using v1.01-cache-2.11-cpan-39bf76dae61 )