EMDIS-ECS

 view release on metacpan or  search on metacpan

script/ecs_scan_mail  view on Meta::CPAN

    if(time() >= $processing_cutoff_time) {
        log_info("process_maildrop(): skipped due to processing time limit");
        return;
    }

    print "$DEBUG_LABEL process_maildrop()\n"
        if $ECS_CFG->{ECS_DEBUG} > 0;

    # get sorted list of files in directory
    if(not opendir(MAILDROP, $ECS_CFG->ECS_DRP_DIR)) {
        log_error("process_maildrop(): unable to open \"maildrop\" " .
                        "directory: " . $ECS_CFG->ECS_DRP_DIR);
        return;
    }
    my $file;
    my @filelist = ();
    while(defined($file = readdir(MAILDROP))) {
        push(@filelist, $file);
    }
    closedir(MAILDROP);
    @filelist = sort @filelist;

    if($#filelist >= 0) {
        log_warn("process_maildrop(): the maildrop feature is deprecated " .
            "and will removed in a future version of this software.");
    }

    # process each file
    for $file (@filelist)
    {
        last if $interrupted;

        # check whether processing time limit has been exceeded
        if(time() >= $processing_cutoff_time) {
            log_info("process_amqp_staging(): exiting early due to " .
                "processing time limit (file: $file)");
            last;
        }

        next if($file eq '.') or ($file eq '..');
        my $filename = catfile($ECS_CFG->ECS_DRP_DIR, $file);
        next unless -f $filename;
        print "$DEBUG_LABEL processing \"maildrop\" file: $filename\n"
            if $ECS_CFG->{ECS_DEBUG} > 0;
        my $msg = new EMDIS::ECS::FileBackedMessage($ECS_CFG->THIS_NODE,'',$filename);
        if(not ref $msg)
        {
            log_error("process_maildrop(): unable to load file " .
                      "$filename: $msg");
            last; # don't continue
        }
        my $err = $msg->inspect_fml();
        if($err)
        {
            log_error("process_maildrop(): unable to inspect FML in " .
                      "$filename: $err");
            last; # don't continue
        }

        # retrieve node status from node_tbl
        my $was_locked = $ECS_NODE_TBL->LOCK;
        if(not $was_locked)
        {
            # lock ECS_NODE_TBL
            if(not $ECS_NODE_TBL->lock())
            {
                log_error("process_maildrop(): unable to lock ECS_NODE_TBL: " .
                          $ECS_NODE_TBL->ERROR);
                last;
            }
        }

        my $node = $ECS_NODE_TBL->read($msg->hub_rcv);
        $ECS_NODE_TBL->unlock() unless $was_locked;

        if (not defined $node) {
           log_error("process_maildrop(): cannot read node '" . $msg->hub_rcv . "'");
           next;
        }

        # don't process the message, if the receiving node is disabled
        if (not ( (exists $node->{node_disabled}) and
                  is_yes($node->{node_disabled}) ) )
        {
           my $err = $msg->send_this_message($msg->hub_rcv);
           undef $msg;   # closes file

           if($err)
           {
              log_error("process_maildrop(): unable to send file " .
                        "$filename: $err");
              last;   # CAVE: don't continue,
                      # otherwise you risk a inconsistent DB!
           }
           unlink $filename;
        }
        else {
           log_info("process_maildrop(): skipping message '$filename' to " .
                    "node " . $msg->hub_rcv . " (node_disabled=$node->{node_disabled})."
                   );
        }
    }
}

# ----------------------------------------------------------------------
# Process outgoing messages in to_XX folders (if $ECS_CFG->ECS_TO_DIR)
sub process_to_XX
{
    my $processing_start_time = shift;
    my $processing_cutoff_time = shift;

    # check whether processing time limit has been exceeded
    if(time() >= $processing_cutoff_time) {
        log_info("process_to_XX(): skipped due to processing time limit");
        return;
    }

    my $to_dir = $ECS_CFG->ECS_TO_DIR;
    print "$DEBUG_LABEL process_to_XX() --> to_XX directories\n"
        if $ECS_CFG->{ECS_DEBUG} > 0;

    # run over to_XX directories:
    if ( $ECS_NODE_TBL->lock() ) {                # lock node_tbl
       my @keys = sort $ECS_NODE_TBL->keys();
       $ECS_NODE_TBL->unlock();                   # unlock node_tbl
       foreach my $node_id ( @keys ) {
          next if $node_id eq $ECS_CFG->THIS_NODE;

          if(not $ECS_NODE_TBL->lock())
          {
             log_error("process_to_XX(): unable to lock ECS_NODE_TBL: " .
                       $ECS_NODE_TBL->ERROR);
             last;
          }

script/ecs_scan_mail  view on Meta::CPAN

{
    my @a = split '_', $a;
    my @b = split '_', $b;
    my $c = $a[0] <=> $b[0];
    return ($c != 0 ? $c : $a[1] <=> $b[1]);
}

# ----------------------------------------------------------------------
# Process incoming messages and documents in "store" folder
#
# Parameters (positional):
#   $processing_start_time  - taken from @_ but not used inside this sub
#   $processing_cutoff_time - compared against time(); once it is reached
#                             the sub logs the fact and stops processing
#
# For every node in the list built by update_statistics(), stored document
# files are processed first and stored message files second, each in
# ascending seq_num order.  If the next stored item is "early" (its seq_num
# is beyond the next expected one) or a message has missing parts, the gap is
# recorded in node_tbl (q_doc_gap_seq/q_doc_gap_time for documents,
# q_gap_seq/q_gap_time for messages); when the same gap is seen again and
# more than T_RESEND_DELAY seconds have passed, DOC_RE_SEND / RE_SEND
# requests are sent.  update_statistics() is called again before returning.
# There is no explicit return value.
#
# NOTE(review): $err is assigned below without being declared in this sub;
# presumably it is declared at file scope -- confirm.
sub process_store
{
    my $processing_start_time = shift;
    my $processing_cutoff_time = shift;

    # check whether processing time limit has been exceeded
    if(time() >= $processing_cutoff_time) {
        log_info("process_store(): skipped due to processing time limit");
        return;
    }

    my @nodelist = ();
    my $nodes = {};

    print "$DEBUG_LABEL process_store()\n"
        if $ECS_CFG->{ECS_DEBUG} > 0;

    # update_statistics() is handed references to both structures; the code
    # below reads the docmsgpart, docnumparts, msgpart and numparts entries
    # of $nodes->{$node_id}, so it is expected to have filled them in
    update_statistics(\@nodelist, $nodes);

    # quick hack to help prevent single node from monopolizing processing loop
    if($#nodelist > 0)
    {
        # count of T_SCN-sized intervals since the epoch (fractional value;
        # the % operator below works on its integer part)
        my $scan_seq = time() / $ECS_CFG->T_SCN;

        # reverse list on even numbered interval
        @nodelist = reverse @nodelist
            if ($scan_seq % 2) == 0;

        # shift through nodelist, giving each node a chance to go first
        my $offset = ($scan_seq / 2) % ($#nodelist + 1);
        my $pos;
        for($pos = 0; $pos < $offset; $pos++)
        {
            my $node_id = shift @nodelist;
            push @nodelist, $node_id;
        }
    }

    # iterate through nodes and
    # check whether any of the messages are able to be processed
    NODE:
    for my $node_id (@nodelist)
    {
        # check whether processing time limit has been exceeded
        if(time() >= $processing_cutoff_time) {
            log_info("process_store(): exiting early due to processing " .
                "time limit (node: $node_id)");
            last NODE;
        }

        # retrieve node status from node_tbl
        # (the table is only locked/unlocked here if it was not already
        # locked when we arrived)
        my $was_locked = $ECS_NODE_TBL->LOCK;
        if(not $was_locked)
        {
            # lock ECS_NODE_TBL
            if(not $ECS_NODE_TBL->lock())
            {
                log_error("process_store(): unable to lock ECS_NODE_TBL: " .
                          $ECS_NODE_TBL->ERROR);
                last NODE;
            }
        }
        my $node = $ECS_NODE_TBL->read($node_id);
        $ECS_NODE_TBL->unlock() unless $was_locked;
        # don't try to process if node not found
        if(not ref $node)
        {
            log_error(
                "process_store(): unable to retrieve node $node_id status.");
            next NODE;
        }
        # is node marked as disabled?
        if((exists $node->{node_disabled}) and
            is_yes($node->{node_disabled}))
        {
            log_info("process_store(): skipping node $node_id " .
                "(node_disabled=$node->{node_disabled}).");
            next NODE;
        }
        # local copies of the last processed sequence numbers; they are
        # pre-incremented in the loops below to obtain the next expected one
        my $in_seq = $node->{in_seq};
        my $doc_in_seq = $node->{doc_in_seq};

        # process document files in sequential order by seq_num
        my $docmsgpart = $nodes->{$node_id}->{docmsgpart};
        my @docmsglist = sort compare_seq_part keys %$docmsgpart;
        # seq_num most recently handed to process_document() (-1 = none yet)
        my $processed_seq_num = -1;
        NODE_DOC_MSG_PART:
        for my $seq_part_num (@docmsglist)
        {
            # keys have the form "<seq_num>_<part_num>"
            my ($seq_num, $part_num) = split '_', $seq_part_num;

            # skip if this file is part of an already processed document
            next NODE_DOC_MSG_PART if $seq_num <= $processed_seq_num;

            # don't process now if document is still "early"
            # ($doc_in_seq is pre-incremented, so from here on it holds the
            # next expected document seq_num)
            if($seq_num > ++$doc_in_seq)
            {
                my $q_doc_gap_seq = (exists $node->{q_doc_gap_seq} ?
                    $node->{q_doc_gap_seq} : 0);
                my $q_doc_gap_time = (exists $node->{q_doc_gap_time} ?
                    $node->{q_doc_gap_time} : time());
                # if q_doc_gap_seq hasn't changed and T_RESEND_DELAY has
                # elapsed, send a batch of up to 100 DOC_RE_SEND requests
                if($seq_num == $q_doc_gap_seq)
                {
                    if((time() - $q_doc_gap_time) > $ECS_CFG->T_RESEND_DELAY)
                    {
                        # request everything from the next expected seq_num
                        # up to the one just before the early document,
                        # capped at 100 requests
                        my $max_resend_seq = $seq_num - 1;
                        $max_resend_seq = $doc_in_seq + 99
                            if $max_resend_seq > ($doc_in_seq + 99);
                        log_info(
                            "process_store(): requesting DOC_RE_SEND for " .
                            "documents $node_id:$doc_in_seq through " .
                            "$node_id:$max_resend_seq");
                        for(my $resend_seq = $doc_in_seq;
                            $resend_seq <= $max_resend_seq; $resend_seq++)
                        {
                            $err = send_ecs_message($node_id, '',
                                "msg_type=DOC_RE_SEND\n",
                                "seq_num=$resend_seq\n",
                                "# random noise: " . rand() . "\n");
                            if($err)
                            {
                                log_error(
                                    "process_store(): unable to send " .
                                    "DOC_RE_SEND request to node $node_id: $err");
                                # stop sending further requests in this batch
                                last;
                            }
                        }
                        # restart the delay timer, even if a send failed
                        $q_doc_gap_time = time();
                    }
                }
                else
                {
                    # a different gap than the one recorded: remember it and
                    # start timing from now
                    $q_doc_gap_seq = $seq_num;
                    $q_doc_gap_time = time();
                }
                # update q_doc_gap_seq and q_doc_gap_time in node_tbl
                # (the update is skipped if LOCK is still false after the
                # lock attempt; the failure has been logged by then)
                my $was_locked = $ECS_NODE_TBL->LOCK;
                if(not $was_locked)
                {
                    if(not $ECS_NODE_TBL->lock())
                    {
                        log_error("process_store(): unable to lock " .
                                  "ECS_NODE_TBL: " . $ECS_NODE_TBL->ERROR);
                    }
                }
                if($ECS_NODE_TBL->LOCK)
                {
                    # re-read the record so only the gap fields are changed
                    # NOTE(review): the result of read() is not checked
                    # before it is modified and written back -- confirm this
                    # cannot fail here
                    my $node = $ECS_NODE_TBL->read($node_id);
                    $node->{q_doc_gap_seq} = $q_doc_gap_seq;
                    $node->{q_doc_gap_time} = $q_doc_gap_time;
                    $ECS_NODE_TBL->write($node_id, $node);
                    $ECS_NODE_TBL->unlock() unless $was_locked;
                }
                # nothing further can be processed for this node's documents
                last NODE_DOC_MSG_PART;
            }

            # sanity check - docnumparts should always be 1
            if($nodes->{$node_id}->{docnumparts}->{$seq_num} != 1) {
                log_error("process_store(): unexpected error, document parts > 1 " .
                               "for $node_id:$seq_num:DOC");
                last NODE_DOC_MSG_PART;
            }

            # process document
            my $filename = $docmsgpart->{$seq_part_num};
            print "$DEBUG_LABEL processing \"store\" document file: $filename\n"
                if $ECS_CFG->{ECS_DEBUG} > 0;
            my $msg = EMDIS::ECS::Message::read_from_file($filename);
            # a non-reference result is treated as an error message
            if(not ref $msg)
            {
                log_error("process_store(): unable to read document " .
                               "from file $filename: $msg");
                last NODE_DOC_MSG_PART;
            }
            # remember the seq_num so later keys with the same seq_num are
            # skipped by the check at the top of the loop
            $processed_seq_num = $seq_num;
            $err = process_document($msg, $filename, 1);
            if($err)
            {
                # both branches stop document processing for this node; they
                # differ only in the logged text
                if($err =~ /unable to decrypt document/)
                {
                    log_error("process_store(): unable to decrypt document " .
                              "from file $filename (kept in store): $err");
                    # (don't) attempt to process as clear-text
                    # $err = process_document($msg, $filename, 0);
                    last NODE_DOC_MSG_PART;
                }
                else {
                    log_error("process_store(): unable to process document " .
                              "from file $filename: $err");
                    last NODE_DOC_MSG_PART;
                }
            }
            # processed without error: remove the stored file
            unlink $filename;

            last NODE_DOC_MSG_PART if $processing_cutoff_time <= time();
            last NODE if $interrupted;
        }

        # check whether processing time limit has been exceeded
        if(time() >= $processing_cutoff_time) {
            log_info("process_store(): exiting early due to processing " .
                "time limit (after $node_id doc)");
            last NODE;
        }

        # process message files in sequential order by seq_num
        my $msgpart = $nodes->{$node_id}->{msgpart};
        my @msglist = sort compare_seq_part keys %$msgpart;
        # reset for the message loop (-1 = nothing processed yet)
        $processed_seq_num = -1;
        NODE_MSG_PART:
        for my $seq_part_num (@msglist)
        {
            # keys have the form "<seq_num>_<part_num>"
            my ($seq_num, $part_num) = split '_', $seq_part_num;

            # skip if this file is part of an already processed message
            next NODE_MSG_PART if $seq_num <= $processed_seq_num;

            # don't process now if message is still "early"
            # ($in_seq is pre-incremented, so from here on it holds the next
            # expected message seq_num)
            if($seq_num > ++$in_seq)
            {
                my $q_gap_seq = (exists $node->{q_gap_seq} ?
                    $node->{q_gap_seq} : 0);
                my $q_gap_time = (exists $node->{q_gap_time} ?
                    $node->{q_gap_time} : time());
                # if q_gap_seq hasn't changed and T_RESEND_DELAY has
                # elapsed, send a batch of up to 100 RE_SEND requests
                if($seq_num == $q_gap_seq)
                {
                    if((time() - $q_gap_time) > $ECS_CFG->T_RESEND_DELAY)
                    {
                        # request everything from the next expected seq_num
                        # up to the one just before the early message,
                        # capped at 100 requests
                        my $max_resend_seq = $seq_num - 1;
                        $max_resend_seq = $in_seq + 99
                            if $max_resend_seq > ($in_seq + 99);
                        log_info(
                            "process_store(): requesting RE_SEND for " .
                            "messages $node_id:$in_seq through " .
                            "$node_id:$max_resend_seq");
                        for(my $resend_seq = $in_seq;
                            $resend_seq <= $max_resend_seq; $resend_seq++)
                        {
                            $err = send_ecs_message($node_id, '',
                                "msg_type=RE_SEND\n",
                                "seq_num=$resend_seq\n",
                                "# random noise: " . rand() . "\n");
                            if($err)
                            {
                                log_error(
                                    "process_store(): unable to send " .
                                    "RE_SEND request to node $node_id: $err");
                                # stop sending further requests in this batch
                                last;
                            }
                        }
                        # restart the delay timer, even if a send failed
                        $q_gap_time = time();
                    }
                }
                else
                {
                    # a different gap than the one recorded: remember it and
                    # start timing from now
                    $q_gap_seq = $seq_num;
                    $q_gap_time = time();
                }
                # update q_gap_seq and q_gap time in node_tbl
                # (skipped if LOCK is still false after the lock attempt)
                my $was_locked = $ECS_NODE_TBL->LOCK;
                if(not $was_locked)
                {
                    if(not $ECS_NODE_TBL->lock())
                    {
                        log_error("process_store(): unable to lock " .
                                  "ECS_NODE_TBL: " . $ECS_NODE_TBL->ERROR);
                    }
                }
                if($ECS_NODE_TBL->LOCK)
                {
                    # NOTE(review): read() result is used unchecked, as in
                    # the document branch above -- confirm
                    my $node = $ECS_NODE_TBL->read($node_id);
                    $node->{q_gap_seq} = $q_gap_seq;
                    $node->{q_gap_time} = $q_gap_time;
                    $ECS_NODE_TBL->write($node_id, $node);
                    $ECS_NODE_TBL->unlock() unless $was_locked;
                }
                # nothing further can be processed for this node's messages
                last NODE_MSG_PART;
            }

            # assemble message part file name array, and
            # determine whether all message parts are present
            my @msg_part_filenames = ();
            my @missing_parts = ();
            for my $pn (1..$nodes->{$node_id}->{numparts}->{$seq_num})
            {
                if(exists $nodes->{$node_id}->{msgpart}->{"${seq_num}_${pn}"})
                {
                    push @msg_part_filenames,
                        $nodes->{$node_id}->{msgpart}->{"${seq_num}_${pn}"};
                }
                else
                {
                    push @missing_parts, $pn;
                }
            }

            # if indicated, request RE_SEND of any missing message parts
            # (same gap bookkeeping as above, but the requests name the
            # individual parts as "seq_num:part")
            if($#missing_parts >= 0)
            {
                my $q_gap_seq = (exists $node->{q_gap_seq} ?
                    $node->{q_gap_seq} : 0);
                my $q_gap_time = (exists $node->{q_gap_time} ?
                    $node->{q_gap_time} : time());
                if($seq_num == $q_gap_seq)
                {
                    if((time() - $q_gap_time) > $ECS_CFG->T_RESEND_DELAY)
                    {
                        log_info(
                            "process_store(): requesting RE_SEND for " .
                            "message $node_id:$seq_num, parts " .
                            join(',', @missing_parts));
                        for my $pn (@missing_parts)
                        {
                            $err = send_ecs_message($node_id, '',
                                "msg_type=RE_SEND\n",
                                "seq_num=$seq_num:$pn\n",
                                "# random noise: " . rand() . "\n");
                            if($err)
                            {
                                log_error(
                                    "process_store(): unable to send " .
                                    "RE_SEND request to node $node_id: $err");
                                # stop sending further requests in this batch
                                last;
                            }
                        }
                        # restart the delay timer, even if a send failed
                        $q_gap_time = time();
                    }
                }
                else
                {
                    $q_gap_seq = $seq_num;
                    $q_gap_time = time();
                }
                # update q_gap_seq and q_gap time in node_tbl
                # (skipped if LOCK is still false after the lock attempt)
                my $was_locked = $ECS_NODE_TBL->LOCK;
                if(not $was_locked)
                {
                    if(not $ECS_NODE_TBL->lock())
                    {
                        log_error("process_store(): unable to lock " .
                                  "ECS_NODE_TBL: " . $ECS_NODE_TBL->ERROR);
                    }
                }
                if($ECS_NODE_TBL->LOCK)
                {
                    # NOTE(review): read() result is used unchecked -- confirm
                    my $node = $ECS_NODE_TBL->read($node_id);
                    $node->{q_gap_seq} = $q_gap_seq;
                    $node->{q_gap_time} = $q_gap_time;
                    $ECS_NODE_TBL->write($node_id, $node);
                    $ECS_NODE_TBL->unlock() unless $was_locked;
                }

                # an incomplete message blocks all later messages of the node
                last NODE_MSG_PART;
            }

            # process message
            # (the file for the current key is read to build $msg; the full
            # list of part files is passed along to process_message())
            my $filename = $msgpart->{$seq_part_num};
            print "$DEBUG_LABEL processing \"store\" file: $filename\n"
                if $ECS_CFG->{ECS_DEBUG} > 0;
            my $msg = EMDIS::ECS::Message::read_from_file($filename);
            # a non-reference result is treated as an error message
            if(not ref $msg)
            {
                log_error("process_store(): unable to read message " .
                               "from file $filename: $msg");
                last NODE_MSG_PART;
            }
            # remember the seq_num so the remaining parts of this message
            # are skipped by the check at the top of the loop
            $processed_seq_num = $seq_num;
            $err = process_message($msg, $filename, 1, \@msg_part_filenames);
            if($err)
            {
                if($err =~ /unable to decrypt message/)
                {
                    log_error("process_store(): unable to decrypt message " .
                              "from file $filename (kept in store): $err");
                    # (don't) attempt to process message as clear-text
                    # $err = process_message($msg, $filename, 0);
                    last NODE_MSG_PART;
                }
                elsif( $err =~ /received (early|duplicate) message/ ) {
                    # logged at info level only; continue with the next
                    # stored message without unlinking the part files here
                    log_info("process_store(): unable to process message " .
                             "from file $filename: $err");
                    next NODE_MSG_PART;
                }
                else {
                    log_error("process_store(): unable to process message " .
                              "from file $filename: $err");
                    last NODE_MSG_PART;
                }
            }
            # processed without error: remove all part files of the message
            foreach my $mpfn (@msg_part_filenames)
            {
                unlink $mpfn;
            }

            last NODE_MSG_PART if $processing_cutoff_time <= time();
            last NODE if $interrupted;
        }

        # check whether processing time limit has been exceeded
        if(time() >= $processing_cutoff_time) {
            log_info("process_store(): exiting early due to processing " .
                "time limit (after $node_id msg)");
            last NODE;
        }
    }

    # start from empty structures and run update_statistics() once more
    # now that files have been processed
    @nodelist = ();
    $nodes = {};
    update_statistics(\@nodelist, $nodes);
}

script/ecs_scan_mail  view on Meta::CPAN

# Move file to trash directory
# The trash directory is the "trash" subdirectory of
# $ECS_CFG->ECS_MBX_STORE_DIR; it is created if it does not exist yet.
# Parameter: $filename - path of the file to move.
# Failures are reported through log_error(); callers get no error value.
sub move_to_trash
{
    my $filename = shift;
    print "$DEBUG_LABEL move_to_trash($filename)\n"
        if $ECS_CFG->ECS_DEBUG > 0;

    my $trashdir = catdir($ECS_CFG->ECS_MBX_STORE_DIR, 'trash');
    if((not -e $trashdir) and (not mkdir($trashdir)))
    {
        # keep the mkdir error text; the -d test below may overwrite $!
        my $mkdir_err = "$!";
        # mkdir can also fail because the directory was created by someone
        # else between the -e test and the mkdir call; that case is fine
        if(not -d $trashdir)
        {
            log_error("move_to_trash(): unable to create directory " .
                      "$trashdir: $mkdir_err");
            return;
        }
    }

    my $err = move_to_dir($filename, $trashdir);
    if($err) {
        # report under this sub's own name (was mislabeled process_store())
        log_error("move_to_trash(): unable to move file " .
                       "$filename: $err");
    }
}

# ----------------------------------------------------------------------
# Delete message corresponding to specified msgnum from mailbox.
# The mailbox type is taken from $ECS_CFG->INBOX_PROTOCOL, which is matched
# as a pattern against POP3, IMAP and DIRECTORY (in that order).
# For DIRECTORY, msgnum is the 1-based position of the file in the sorted
# listing of $ECS_CFG->INBOX_DIRECTORY, not counting names that start with
# a dot or contain "Deleted" (case-insensitive).
# returns error message, if any ('' if successful);
# dies if the inbox directory cannot be opened.
sub mbox_delete_message
{
    my $msgnum = shift;
    print "$DEBUG_LABEL mbox_delete_message($msgnum)\n"
        if $ECS_CFG->ECS_DEBUG > 0;

    # keep the protocol in a named variable: a foreach alias of $_ is gone
    # once the loop ends, so it cannot be used in the final error message
    my ($protocol) = $ECS_CFG->INBOX_PROTOCOL;
    $protocol = '' unless defined $protocol;

    if($protocol =~ /POP3/) {
        $mbox->delete($msgnum);
        return '';  # successful
    }

    if($protocol =~ /IMAP/) {
        $mbox->delete_message($msgnum);
        return '';  # successful
    }

    if($protocol =~ /DIRECTORY/) {
        opendir(my $dh, $ECS_CFG->INBOX_DIRECTORY)
            or die "can't opendir " . $ECS_CFG->INBOX_DIRECTORY . ": $!";
        my @files = sort grep { !/^\./ && !/Deleted/i } readdir($dh);
        closedir($dh);

        # msgnum must select an existing entry: 0 or a negative number would
        # index from the end of @files and delete the wrong file, and a
        # number past the end would yield an undefined file name
        if((not defined $msgnum) or ($msgnum !~ /^\d+$/) or
           ($msgnum < 1) or ($msgnum > scalar(@files)))
        {
            return "mbox_delete_message(): invalid message number: " .
                (defined $msgnum ? $msgnum : '(undef)');
        }

        unlink catfile($ECS_CFG->INBOX_DIRECTORY, $files[$msgnum - 1]);
        return '';  # successful
    }

    return "Unexpected mbox-protocol: $protocol";
}

# ----------------------------------------------------------------------
# Process specified document.
# returns error message, if any
sub process_document
{
    my $msg = shift;
    my $filename = shift;
    my $decrypt = shift;
    my $err = '';

    print "$DEBUG_LABEL process_document(\$msg, $filename, $decrypt)\n"
        if $ECS_CFG->ECS_DEBUG > 0;

    # look up node in ECS_NODE_TBL
    my $was_locked = $ECS_NODE_TBL->LOCK;
    if(not $was_locked) {
        # lock ECS_NODE_TBL
        return "process_document(): unable to lock ECS_NODE_TBL: " .
            $ECS_NODE_TBL->ERROR
                unless $ECS_NODE_TBL->lock();
    }
    # store information about document currently being processed
    my $this_node = $ECS_NODE_TBL->read($ECS_CFG->THIS_NODE);
    $this_node->{proc_node} = $msg->sender;
    $this_node->{proc_seq} = $msg->seq_num;
    $this_node->{proc_file} = $filename;
    $ECS_NODE_TBL->write($ECS_CFG->THIS_NODE, $this_node);
    # retrieve node info
    my $node = $ECS_NODE_TBL->read($msg->sender);
    $ECS_NODE_TBL->unlock() unless $was_locked;  # release node_tbl lock
    if(not $node) {
        # don't process message from unknown node
        $err = "process_document(): $err; " . EOL()
            if $err;
        $err .= "process_document(): document from unknown node: " .
            $msg->sender;
        return $err;
    }

    # don't process "duplicate" document (seq_num too low)
    if($msg->seq_num < ($node->{doc_in_seq} + 1)) {
        my @err = ();
        push @err, "process_document(): received duplicate document: " .
            $msg->sender . ":" . $msg->seq_num . ":DOC";
        # move file to 'trash' folder
        my $e = move_to_dir($filename, $ECS_CFG->ECS_MBX_TRASH_DIR);
        push @err, "process_message(): $e"
            if $e;
        return join EOL(), @err;
    }

    # don't process "early" document (seq_num too high)
    if($msg->seq_num > ($node->{doc_in_seq} + 1)) {
        $err = "process_document(): $err; " . EOL()
            if $err;
        $err .= "process_document(): received early document: " .
            $msg->sender . ":" . $msg->seq_num;
        return $err;
    }

    # create document file
    # decrypt (if indicated)
    my $dmsg;
    if($decrypt) {
        $dmsg = EMDIS::ECS::Message::read_from_encrypted_file($filename);
        if(not ref $dmsg)
        {
            $err = "process_document(): unable to decrypt document: $dmsg";
        }
    }
    else {
        $dmsg = EMDIS::ECS::Message::read_from_file($filename);
        if(not ref $dmsg)
        {
            $err = "process_document(): unable to read document: $dmsg";
        }
    }
    # if error encountered, return it
    return $err if $err;

    # write cleartext document to temp file in $ECS_DAT_DIR/tmp
    my $template = $filename;
    if($template =~ /_\w{4}(\.\w+)*$/o) {
        $template =~ s/_\w{4}(\.\w+)*$/_XXXX/o;
    }
    else {
        $template .= '_XXXX';
    }
    my ($fh, $doc_fname) = tempfile($template, SUFFIX => '.doc.asc');
    print $fh $dmsg->cleartext

script/ecs_scan_mail  view on Meta::CPAN

    {
        unlink $doc_fname;
        return $err;
    }

    # store copy of decrypted payload in $ECS_MBX_IN_FML_DIR
    my $in_fml_filename = catfile($ECS_CFG->ECS_MBX_IN_FML_DIR,
        sprintf("%s_%s_%010d.doc", $msg->sender, $ECS_CFG->THIS_NODE,
                $msg->seq_num));
    if(not copy($doc_fname, $in_fml_filename))
    {
        $err = "process_document(): could not copy $doc_fname " .
            "to $in_fml_filename: $!";
        unlink $doc_fname;
        return $err;
    }
    chmod $EMDIS::ECS::FILEMODE, $in_fml_filename;

    # copy payload file to from_XX directory (if ECS_FROM_DIR is configured)
    my $from_dir = $ECS_CFG->ECS_FROM_DIR;
    if ( defined $from_dir and $from_dir ne '' ) {
        # create temporary file (without extension .doc!)
        # to prevent a race condition on the interface
        $from_dir = catdir( $from_dir, 'from_' . $msg->sender );
        my $template = sprintf( "tmp_doc_d%010d.XXXXXX", $msg->seq_num );
        my( $tmp_fh, $from_tmp_filename ) = tempfile( $template,
                                                      DIR => $from_dir,
                                                      UNLINK => 0 );
        # close the filehandle. We just want to make sure we reserve the
        # temporary filename
        close( $tmp_fh );

        # now copy the actual file content over
        if ( not copy( $doc_fname, $from_tmp_filename ) ) {
            $err = "process_document(): could not copy $doc_fname "
                 . "to $from_tmp_filename: $!";
        }

        # put the final filename together ...
        my $from_filename = catfile( $from_dir,
                                     sprintf("d%010d.doc", $msg->seq_num ) );
        # ... and rename our temporary file in the same directory
        if ( not rename( $from_tmp_filename, $from_filename ) )
        {
            $err = "process_document(): could not rename $from_tmp_filename "
                 . "to $from_filename: $!";
        }

        chmod $EMDIS::ECS::FILEMODE, $from_filename;
    }

    # remove temp file
    unlink $doc_fname;

    return $err if $err ne '';

    unlink $filename;   # remove input file after successful processing

    # message was processed
    # if needed, update $node->{doc_in_seq}
    if(not $was_locked) {
        $ECS_NODE_TBL->lock()     # lock ECS_NODE_TBL if needed
            or return "process_document(): unable to (write) lock " .
                "ECS_NODE_TBL: " . $ECS_NODE_TBL->ERROR;
    }
    $node = $ECS_NODE_TBL->read($msg->sender);
    $err = $ECS_NODE_TBL->ERROR;
    if((not $err) and (ref $node))
    {
        $node->{doc_in_seq}++;
        if($msg->seq_num == $node->{doc_in_seq})
        {
            if(is_yes($ECS_CFG->ALWAYS_ACK))
            {
                # only send DOC_MSG_ACK if $ECS_CFG->ALWAYS_ACK is set
                $err = send_ecs_message($msg->sender, '',
                    "msg_type=DOC_MSG_ACK\n",
                    "seq_num=$node->{doc_in_seq}\n",
                    "# 10-4 " . rand() . "\n");
                if($err)
                {
                    $err = "unable to send DOC_MSG_ACK $node->{doc_in_seq} " .
                        "meta-message to node " . $msg->sender . ": $err";
                }
                else
                {
                    $node->{doc_in_seq_ack} = $node->{doc_in_seq};
                }
            }
            $ECS_NODE_TBL->write($msg->sender,$node);
            $err = $ECS_NODE_TBL->ERROR;
        }
    }
    $ECS_NODE_TBL->unlock() unless $was_locked;  # release node_tbl lock
    return "process_document(): $err" if $err;

    return '';
}

# ----------------------------------------------------------------------
# Process specified meta-message.
# Builds the command line
#   <M_MSG_PROC> --config <$opt_config> <filename> <sender>
# and runs it via timelimit_cmd() with a limit of $ECS_CFG->T_MSG_PROC.
# Parameters: $msg      - message object (only its sender is used here)
#             $filename - file name passed to the command
# returns error message, if any
sub process_meta_message
{
    my $msg = shift;
    my $filename = shift;

    print "$DEBUG_LABEL process_meta_message(\$msg, $filename)\n"
        if $ECS_CFG->ECS_DEBUG > 0;

    # compose command
    # ($opt_config is passed as a sprintf argument rather than interpolated
    # into the format string, so a '%' in the config path is not interpreted
    # as a format directive)
    # NOTE(review): the command is handed over as one string; if
    # timelimit_cmd() runs it through a shell, $filename and the sender are
    # not quoted here -- confirm they cannot contain shell metacharacters
    my $cmd = sprintf("%s --config %s %s %s",
                      $ECS_CFG->M_MSG_PROC,
                      $opt_config,
                      $filename,
                      $msg->sender);
    print "$DEBUG_LABEL command: $cmd\n"
        if $ECS_CFG->ECS_DEBUG > 0;

    # execute command
    my $result = timelimit_cmd($ECS_CFG->T_MSG_PROC, $cmd);

    # format result, if needed
    # (a true result is treated as an error text and gets this sub's prefix)
    $result = "process_meta_message(): $result"
        if($result);

    return $result;
}

# ----------------------------------------------------------------------
# Process specified ECS message.
#   $msg                - message object; sender(), seq_num() and
#                         num_parts() are used here
#   $filename           - file name recorded as "proc_file" in this
#                         node's ECS_NODE_TBL entry
#   $decrypt            - if true, each message part is read via
#                         EMDIS::ECS::Message::read_from_encrypted_file()
#   $msg_part_filenames - ARRAY reference holding one file name per
#                         message part (must contain num_parts entries)
# returns error message, if any (empty string on success)
#
# NOTE(review): part of this sub's body is missing from the listing (see
# the marker inside the message part loop below); comments only describe
# the code that is visible.
sub process_message
{
    my $msg = shift;
    my $filename = shift;
    my $decrypt = shift;
    my $msg_part_filenames = shift;
    my $err = '';
    my @msgs = ();
    my $child_pid;

    # NOTE(review): $msg_part_filenames is dereferenced here, before the
    # "sanity checks" further down have verified that it is an ARRAY ref.
    print "$DEBUG_LABEL process_message(\$msg, $filename, $decrypt, (" .
        join(', ', @$msg_part_filenames) . "))\n"
        if $ECS_CFG->ECS_DEBUG > 0;

    # look up node in ECS_NODE_TBL
    # remember whether the table was already locked on entry, so that it
    # is only locked/unlocked here when this sub took the lock itself
    my $was_locked = $ECS_NODE_TBL->LOCK;
    if(not $was_locked) {
        # lock ECS_NODE_TBL
        return "process_message(): unable to lock ECS_NODE_TBL: " .
            $ECS_NODE_TBL->ERROR
                unless $ECS_NODE_TBL->lock();
    }
    # store information about message currently being processed
    my $this_node = $ECS_NODE_TBL->read($ECS_CFG->THIS_NODE);
    $this_node->{proc_node} = $msg->sender;
    $this_node->{proc_seq} = $msg->seq_num;
    $this_node->{proc_file} = $filename;
    $ECS_NODE_TBL->write($ECS_CFG->THIS_NODE, $this_node);
    # retrieve node info
    my $node = $ECS_NODE_TBL->read($msg->sender);
    $ECS_NODE_TBL->unlock() unless $was_locked;  # release node_tbl lock
    if(not $node) {
        # don't process message from unknown node
        # NOTE(review): $err is still '' at this point, so the prefixing
        # statement below never fires.
        $err = "process_message(): $err; " . EOL()
            if $err;
        $err .= "process_message(): message from unknown node: " .
            $msg->sender;
        return $err;
    }

    # don't process "duplicate" message (seq_num too low)
    if($msg->seq_num < ($node->{in_seq} + 1)) {
        my @err = ();
        push @err, "process_message(): received duplicate message: " .
            $msg->sender . ":" . $msg->seq_num;
        # move file(s) to 'trash' folder
        # (any move_to_dir() error is appended to the returned message)
        for my $f (@$msg_part_filenames)
        {
            my $e = move_to_dir($f, $ECS_CFG->ECS_MBX_TRASH_DIR);
            push @err, "process_message(): $e"
                if $e;
        }
        return join EOL(), @err;
    }

    # don't process "early" message (seq_num too high)
    if($msg->seq_num > ($node->{in_seq} + 1)) {
        # NOTE(review): as above, $err is still '' here, so the prefixing
        # statement below never fires.
        $err = "process_message(): $err; " . EOL()
            if $err;
        $err .= "process_message(): received early message: " .
            $msg->sender . ":" . $msg->seq_num;
        return $err;
    }

    # sanity checks on $msg_part_filenames
    # NOTE(review): @$msg_part_filenames has already been dereferenced by
    # the debug print and the duplicate-message branch above, so these
    # checks do not protect those earlier uses.
    return "process_message(): unexpected error: \$msg_part_filenames " .
        "not defined!"
        if not defined $msg_part_filenames;
    return "process_message(): unexpected error: \$msg_part_filenames " .
        "not an ARRAY reference!"
        if 'ARRAY' ne ref $msg_part_filenames;
    return "process_message(): unexpected error: \$msg_part_filenames " .
        "array is wrong size (expected " . $msg->num_parts . ", found " .
        scalar(@$msg_part_filenames) . ")"
        if $msg->num_parts != scalar(@$msg_part_filenames);

    # create payload file for each message part
    my @mp_payload_filename = ();
    for my $msg_part_fname (@$msg_part_filenames)
    {
        # decrypt message part (if indicated)
        my $dmsg;
        if($decrypt) {
            $dmsg = EMDIS::ECS::Message::read_from_encrypted_file($msg_part_fname);
            # a non-reference return value is treated as an error message
            if(not ref $dmsg)
            {
                $err = "process_message(): unable to decrypt message: $dmsg";
                last;
            }
        }
        else {
            # NOTE(review): the listing is truncated at this point; the
            # code that defines $cmd and $payload_filename (both used
            # below) is not visible here.

script/ecs_scan_mail  view on Meta::CPAN

        # set ADAPTER_CMD environment variable
        $ENV{ADAPTER_CMD} = $ECS_CFG->ADAPTER_CMD;

        # execute command
        $err = timelimit_cmd($ECS_CFG->T_MSG_PROC, $cmd);
        print "$DEBUG_LABEL command output:\n$EMDIS::ECS::cmd_output\n"
            if $ECS_CFG->ECS_DEBUG > 0;

        # format error message, if needed
        # TODO:  automatically send MSG_DEN response ??
        $err = "process_message(): $err"
            if($err);
    }

    # on error: discard the payload file and report the error
    if($err)
    {
        unlink $payload_filename;
        return $err;
    }

    # copy payload file to from_XX directory (if ECS_FROM_DIR is configured)
    my $from_dir = $ECS_CFG->ECS_FROM_DIR;
    if ( defined $from_dir and $from_dir ne '' ) {
        # create temporary file (without extension .msg!)
        # to prevent a race condition on the interface
        $from_dir = catdir( $from_dir, 'from_' . $msg->sender );
        my $template = sprintf( "tmp_msg_%010d.XXXXXX", $msg->seq_num );
        my( $tmp_fh, $from_tmp_filename ) = tempfile( $template,
                                                      DIR => $from_dir,
                                                      UNLINK => 0 );
        # close the filehandle. We just want to make sure we reserve the
        # temporary filename
        close( $tmp_fh );

        # now copy the actual file content over
        if ( not copy( $payload_filename, $from_tmp_filename ) ) {
            $err = "process_message(): could not copy $payload_filename "
                 . "to $from_tmp_filename: $!";
        }

        # put the final filename together ...
        my $from_filename = catfile( $from_dir,
                                     sprintf("%010d.msg", $msg->seq_num ) );
        # ... and rename our temporary file in the same directory
        # NOTE(review): the rename is attempted even when the copy above
        # failed, and a rename failure replaces the copy error in $err.
        if ( not rename( $from_tmp_filename, $from_filename ) )
        {
            $err = "process_message(): could not rename $from_tmp_filename "
                 . "to $from_filename: $!";
        }

        # NOTE(review): the result of chmod is not checked
        chmod $EMDIS::ECS::FILEMODE, $from_filename;
    }

    # remove temp file
    unlink $payload_filename;

    # report copy/rename failure (in_seq is left untouched in that case)
    return $err if $err ne '';

    # message was processed
    # if needed, update $node->{in_seq}
    if(not $was_locked) {
        $ECS_NODE_TBL->lock()     # lock ECS_NODE_TBL if needed
            or return "process_message(): unable to (write) lock " .
                "ECS_NODE_TBL: " . $ECS_NODE_TBL->ERROR;
    }
    # re-read the sender's node record under the lock
    $node = $ECS_NODE_TBL->read($msg->sender);
    $err = $ECS_NODE_TBL->ERROR;
    if((not $err) and (ref $node))
    {
        # the record is only written back when the incremented in_seq
        # matches this message's seq_num
        $node->{in_seq}++;
        if($msg->seq_num == $node->{in_seq})
        {
            if(is_yes($ECS_CFG->ALWAYS_ACK))
            {
                # only send MSG_ACK if $ECS_CFG->ALWAYS_ACK is set
                $err = send_ecs_message($msg->sender, '',
                    "msg_type=MSG_ACK\n",
                    "seq_num=$node->{in_seq}\n",
                    "# 10-4 " . rand() . "\n");
                if($err)
                {
                    $err = "unable to send MSG_ACK $node->{in_seq} " .
                        "meta-message to node " . $msg->sender . ": $err";
                }
                else
                {
                    # record the highest sequence number acknowledged
                    $node->{in_seq_ack} = $node->{in_seq};
                }
            }
            $ECS_NODE_TBL->write($msg->sender,$node);
            # NOTE(review): this assignment overwrites any MSG_ACK send
            # error stored in $err just above.
            $err = $ECS_NODE_TBL->ERROR;
        }
    }
    $ECS_NODE_TBL->unlock() unless $was_locked;  # release node_tbl lock
    return "process_message(): $err" if $err;

    return '';
}

# ----------------------------------------------------------------------
# SIGHUP handler:  request a re-read of the configuration.
# Only raises the $reload_config flag; no other work is done here.
sub sighup_handler
{
    return $reload_config = 1;
}

# ----------------------------------------------------------------------
# Interrupt handler:  note that the program has been interrupted.
# Only raises the $interrupted flag; no other work is done here.
sub sigint_handler
{
    return $interrupted = 1;
}


__END__

# embedded POD documentation

=head1 NAME

ecs_scan_mail - ECS email processing daemon

=head1 SYNOPSIS

 ecs_scan_mail

 ecs_scan_mail --once

 ecs_scan_mail --nodaemon

=head1 DESCRIPTION

This program monitors incoming ECS email.  It receives messages from
the configured POP3 or IMAP mailbox and calls appropriate functions to
trigger processing of ECS meta-messages and regular messages.

=head1 OPTIONS

=over 5

=item --config I<ecs_config_file>

Specify the location of the ECS configuration file.  By default, the program
looks for the file specified by the ECS_CONFIG_FILE environment variable;
if that environment variable is not set, it looks for a file named "ecs.cfg"
in the current directory.

=item --daemon

Spawn background process to continuously monitor remote node
communication status.  This option is enabled by default.

=item --nodaemon



( run in 0.649 second using v1.01-cache-2.11-cpan-5a3173703d6 )