EMDIS-ECS
view release on metacpan or search on metacpan
script/ecs_scan_mail view on Meta::CPAN
if(time() >= $processing_cutoff_time) {
log_info("process_maildrop(): skipped due to processing time limit");
return;
}
print "$DEBUG_LABEL process_maildrop()\n"
if $ECS_CFG->{ECS_DEBUG} > 0;
# get sorted list of files in directory
if(not opendir(MAILDROP, $ECS_CFG->ECS_DRP_DIR)) {
log_error("process_maildrop(): unable to open \"maildrop\" " .
"directory: " . $ECS_CFG->ECS_DRP_DIR);
return;
}
my $file;
my @filelist = ();
while(defined($file = readdir(MAILDROP))) {
push(@filelist, $file);
}
closedir(MAILDROP);
@filelist = sort @filelist;
if($#filelist >= 0) {
log_warn("process_maildrop(): the maildrop feature is deprecated " .
"and will removed in a future version of this software.");
}
# process each file
for $file (@filelist)
{
last if $interrupted;
# check whether processing time limit has been exceeded
if(time() >= $processing_cutoff_time) {
log_info("process_amqp_staging(): exiting early due to " .
"processing time limit (file: $file)");
last;
}
next if($file eq '.') or ($file eq '..');
my $filename = catfile($ECS_CFG->ECS_DRP_DIR, $file);
next unless -f $filename;
print "$DEBUG_LABEL processing \"maildrop\" file: $filename\n"
if $ECS_CFG->{ECS_DEBUG} > 0;
my $msg = new EMDIS::ECS::FileBackedMessage($ECS_CFG->THIS_NODE,'',$filename);
if(not ref $msg)
{
log_error("process_maildrop(): unable to load file " .
"$filename: $msg");
last; # don't continue
}
my $err = $msg->inspect_fml();
if($err)
{
log_error("process_maildrop(): unable to inspect FML in " .
"$filename: $err");
last; # don't continue
}
# retrieve node status from node_tbl
my $was_locked = $ECS_NODE_TBL->LOCK;
if(not $was_locked)
{
# lock ECS_NODE_TBL
if(not $ECS_NODE_TBL->lock())
{
log_error("process_maildrop(): unable to lock ECS_NODE_TBL: " .
$ECS_NODE_TBL->ERROR);
last;
}
}
my $node = $ECS_NODE_TBL->read($msg->hub_rcv);
$ECS_NODE_TBL->unlock() unless $was_locked;
if (not defined $node) {
log_error("process_maildrop(): cannot read node '" . $msg->hub_rcv . "'");
next;
}
# don't process the message, if the receiving node is disabled
if (not ( (exists $node->{node_disabled}) and
is_yes($node->{node_disabled}) ) )
{
my $err = $msg->send_this_message($msg->hub_rcv);
undef $msg; # closes file
if($err)
{
log_error("process_maildrop(): unable to send file " .
"$filename: $err");
last; # CAVE: don't continue,
# otherwise you risk a inconsistent DB!
}
unlink $filename;
}
else {
log_info("process_maildrop(): skipping message '$filename' to " .
"node " . $msg->hub_rcv . " (node_disabled=$node->{node_disabled})."
);
}
}
}
# ----------------------------------------------------------------------
# Process outgoing messages in to_XX folders (if $ECS_CFG->ECS_TO_DIR)
sub process_to_XX
{
my $processing_start_time = shift;
my $processing_cutoff_time = shift;
# check whether processing time limit has been exceeded
if(time() >= $processing_cutoff_time) {
log_info("process_to_XX(): skipped due to processing time limit");
return;
}
my $to_dir = $ECS_CFG->ECS_TO_DIR;
print "$DEBUG_LABEL process_to_XX() --> to_XX directories\n"
if $ECS_CFG->{ECS_DEBUG} > 0;
# run over to_XX directories:
if ( $ECS_NODE_TBL->lock() ) { # lock node_tbl
my @keys = sort $ECS_NODE_TBL->keys();
$ECS_NODE_TBL->unlock(); # unlock node_tbl
foreach my $node_id ( @keys ) {
next if $node_id eq $ECS_CFG->THIS_NODE;
if(not $ECS_NODE_TBL->lock())
{
log_error("process_to_XX(): unable to lock ECS_NODE_TBL: " .
$ECS_NODE_TBL->ERROR);
last;
}
script/ecs_scan_mail view on Meta::CPAN
{
my @a = split '_', $a;
my @b = split '_', $b;
my $c = $a[0] <=> $b[0];
return ($c != 0 ? $c : $a[1] <=> $b[1]);
}
# ----------------------------------------------------------------------
# Process incoming messages and documents in "store" folder
sub process_store
{
my $processing_start_time = shift;
my $processing_cutoff_time = shift;
# check whether processing time limit has been exceeded
if(time() >= $processing_cutoff_time) {
log_info("process_store(): skipped due to processing time limit");
return;
}
my @nodelist = ();
my $nodes = {};
print "$DEBUG_LABEL process_store()\n"
if $ECS_CFG->{ECS_DEBUG} > 0;
update_statistics(\@nodelist, $nodes);
# quick hack to help prevent single node from monopolizing processing loop
if($#nodelist > 0)
{
my $scan_seq = time() / $ECS_CFG->T_SCN;
# reverse list on even numbered interval
@nodelist = reverse @nodelist
if ($scan_seq % 2) == 0;
# shift through nodelist, giving each node a chance to go first
my $offset = ($scan_seq / 2) % ($#nodelist + 1);
my $pos;
for($pos = 0; $pos < $offset; $pos++)
{
my $node_id = shift @nodelist;
push @nodelist, $node_id;
}
}
# iterate through nodes and
# check whether any of the messages are able to be processed
NODE:
for my $node_id (@nodelist)
{
# check whether processing time limit has been exceeded
if(time() >= $processing_cutoff_time) {
log_info("process_store(): exiting early due to processing " .
"time limit (node: $node_id)");
last NODE;
}
# retrieve node status from node_tbl
my $was_locked = $ECS_NODE_TBL->LOCK;
if(not $was_locked)
{
# lock ECS_NODE_TBL
if(not $ECS_NODE_TBL->lock())
{
log_error("process_store(): unable to lock ECS_NODE_TBL: " .
$ECS_NODE_TBL->ERROR);
last NODE;
}
}
my $node = $ECS_NODE_TBL->read($node_id);
$ECS_NODE_TBL->unlock() unless $was_locked;
# don't try to process if node not found
if(not ref $node)
{
log_error(
"process_store(): unable to retrieve node $node_id status.");
next NODE;
}
# is node marked as disabled?
if((exists $node->{node_disabled}) and
is_yes($node->{node_disabled}))
{
log_info("process_store(): skipping node $node_id " .
"(node_disabled=$node->{node_disabled}).");
next NODE;
}
my $in_seq = $node->{in_seq};
my $doc_in_seq = $node->{doc_in_seq};
# process document files in sequential order by seq_num
my $docmsgpart = $nodes->{$node_id}->{docmsgpart};
my @docmsglist = sort compare_seq_part keys %$docmsgpart;
my $processed_seq_num = -1;
NODE_DOC_MSG_PART:
for my $seq_part_num (@docmsglist)
{
my ($seq_num, $part_num) = split '_', $seq_part_num;
# skip if this file is part of an already processed document
next NODE_DOC_MSG_PART if $seq_num <= $processed_seq_num;
# don't process now if document is still "early"
if($seq_num > ++$doc_in_seq)
{
my $q_doc_gap_seq = (exists $node->{q_doc_gap_seq} ?
$node->{q_doc_gap_seq} : 0);
my $q_doc_gap_time = (exists $node->{q_doc_gap_time} ?
$node->{q_doc_gap_time} : time());
# if q_doc_gap_seq hasn't changed and T_RESEND_DELAY has
# elapsed, send a batch of up to 100 DOC_RE_SEND requests
if($seq_num == $q_doc_gap_seq)
{
if((time() - $q_doc_gap_time) > $ECS_CFG->T_RESEND_DELAY)
{
my $max_resend_seq = $seq_num - 1;
$max_resend_seq = $doc_in_seq + 99
if $max_resend_seq > ($doc_in_seq + 99);
log_info(
"process_store(): requesting DOC_RE_SEND for " .
"documents $node_id:$doc_in_seq through " .
"$node_id:$max_resend_seq");
for(my $resend_seq = $doc_in_seq;
$resend_seq <= $max_resend_seq; $resend_seq++)
{
$err = send_ecs_message($node_id, '',
"msg_type=DOC_RE_SEND\n",
"seq_num=$resend_seq\n",
"# random noise: " . rand() . "\n");
if($err)
{
log_error(
"process_store(): unable to send " .
"DOC_RE_SEND request to node $node_id: $err");
last;
}
}
$q_doc_gap_time = time();
}
}
else
{
$q_doc_gap_seq = $seq_num;
$q_doc_gap_time = time();
}
# update q_doc_gap_seq and q_doc_gap_time in node_tbl
my $was_locked = $ECS_NODE_TBL->LOCK;
if(not $was_locked)
{
if(not $ECS_NODE_TBL->lock())
{
log_error("process_store(): unable to lock " .
"ECS_NODE_TBL: " . $ECS_NODE_TBL->ERROR);
}
}
if($ECS_NODE_TBL->LOCK)
{
my $node = $ECS_NODE_TBL->read($node_id);
$node->{q_doc_gap_seq} = $q_doc_gap_seq;
$node->{q_doc_gap_time} = $q_doc_gap_time;
$ECS_NODE_TBL->write($node_id, $node);
$ECS_NODE_TBL->unlock() unless $was_locked;
}
last NODE_DOC_MSG_PART;
}
# sanity check - docnumparts should always be 1
if($nodes->{$node_id}->{docnumparts}->{$seq_num} != 1) {
log_error("process_store(): unexpected error, document parts > 1 " .
"for $node_id:$seq_num:DOC");
last NODE_DOC_MSG_PART;
}
# process document
my $filename = $docmsgpart->{$seq_part_num};
print "$DEBUG_LABEL processing \"store\" document file: $filename\n"
if $ECS_CFG->{ECS_DEBUG} > 0;
my $msg = EMDIS::ECS::Message::read_from_file($filename);
if(not ref $msg)
{
log_error("process_store(): unable to read document " .
"from file $filename: $msg");
last NODE_DOC_MSG_PART;
}
$processed_seq_num = $seq_num;
$err = process_document($msg, $filename, 1);
if($err)
{
if($err =~ /unable to decrypt document/)
{
log_error("process_store(): unable to decrypt document " .
"from file $filename (kept in store): $err");
# (don't) attempt to process as clear-text
# $err = process_document($msg, $filename, 0);
last NODE_DOC_MSG_PART;
}
else {
log_error("process_store(): unable to process document " .
"from file $filename: $err");
last NODE_DOC_MSG_PART;
}
}
unlink $filename;
last NODE_DOC_MSG_PART if $processing_cutoff_time <= time();
last NODE if $interrupted;
}
# check whether processing time limit has been exceeded
if(time() >= $processing_cutoff_time) {
log_info("process_store(): exiting early due to processing " .
"time limit (after $node_id doc)");
last NODE;
}
# process message files in sequential order by seq_num
my $msgpart = $nodes->{$node_id}->{msgpart};
my @msglist = sort compare_seq_part keys %$msgpart;
$processed_seq_num = -1;
NODE_MSG_PART:
for my $seq_part_num (@msglist)
{
my ($seq_num, $part_num) = split '_', $seq_part_num;
# skip if this file is part of an already processed message
next NODE_MSG_PART if $seq_num <= $processed_seq_num;
# don't process now if message is still "early"
if($seq_num > ++$in_seq)
{
my $q_gap_seq = (exists $node->{q_gap_seq} ?
$node->{q_gap_seq} : 0);
my $q_gap_time = (exists $node->{q_gap_time} ?
$node->{q_gap_time} : time());
# if q_gap_seq hasn't changed and T_RESEND_DELAY has
# elapsed, send a batch of up to 100 RE_SEND requests
if($seq_num == $q_gap_seq)
{
if((time() - $q_gap_time) > $ECS_CFG->T_RESEND_DELAY)
{
my $max_resend_seq = $seq_num - 1;
$max_resend_seq = $in_seq + 99
if $max_resend_seq > ($in_seq + 99);
log_info(
"process_store(): requesting RE_SEND for " .
"messages $node_id:$in_seq through " .
"$node_id:$max_resend_seq");
for(my $resend_seq = $in_seq;
$resend_seq <= $max_resend_seq; $resend_seq++)
{
$err = send_ecs_message($node_id, '',
"msg_type=RE_SEND\n",
"seq_num=$resend_seq\n",
"# random noise: " . rand() . "\n");
if($err)
{
log_error(
"process_store(): unable to send " .
"RE_SEND request to node $node_id: $err");
last;
}
}
$q_gap_time = time();
}
}
else
{
$q_gap_seq = $seq_num;
$q_gap_time = time();
}
# update q_gap_seq and q_gap time in node_tbl
my $was_locked = $ECS_NODE_TBL->LOCK;
if(not $was_locked)
{
if(not $ECS_NODE_TBL->lock())
{
log_error("process_store(): unable to lock " .
"ECS_NODE_TBL: " . $ECS_NODE_TBL->ERROR);
}
}
if($ECS_NODE_TBL->LOCK)
{
my $node = $ECS_NODE_TBL->read($node_id);
$node->{q_gap_seq} = $q_gap_seq;
$node->{q_gap_time} = $q_gap_time;
$ECS_NODE_TBL->write($node_id, $node);
$ECS_NODE_TBL->unlock() unless $was_locked;
}
last NODE_MSG_PART;
}
# assemble message part file name array, and
# determine whether all message parts are present
my @msg_part_filenames = ();
my @missing_parts = ();
for my $pn (1..$nodes->{$node_id}->{numparts}->{$seq_num})
{
if(exists $nodes->{$node_id}->{msgpart}->{"${seq_num}_${pn}"})
{
push @msg_part_filenames,
$nodes->{$node_id}->{msgpart}->{"${seq_num}_${pn}"};
}
else
{
push @missing_parts, $pn;
}
}
# if indicated, request RE_SEND of any missing message parts
if($#missing_parts >= 0)
{
my $q_gap_seq = (exists $node->{q_gap_seq} ?
$node->{q_gap_seq} : 0);
my $q_gap_time = (exists $node->{q_gap_time} ?
$node->{q_gap_time} : time());
if($seq_num == $q_gap_seq)
{
if((time() - $q_gap_time) > $ECS_CFG->T_RESEND_DELAY)
{
log_info(
"process_store(): requesting RE_SEND for " .
"message $node_id:$seq_num, parts " .
join(',', @missing_parts));
for my $pn (@missing_parts)
{
$err = send_ecs_message($node_id, '',
"msg_type=RE_SEND\n",
"seq_num=$seq_num:$pn\n",
"# random noise: " . rand() . "\n");
if($err)
{
log_error(
"process_store(): unable to send " .
"RE_SEND request to node $node_id: $err");
last;
}
}
$q_gap_time = time();
}
}
else
{
$q_gap_seq = $seq_num;
$q_gap_time = time();
}
# update q_gap_seq and q_gap time in node_tbl
my $was_locked = $ECS_NODE_TBL->LOCK;
if(not $was_locked)
{
if(not $ECS_NODE_TBL->lock())
{
log_error("process_store(): unable to lock " .
"ECS_NODE_TBL: " . $ECS_NODE_TBL->ERROR);
}
}
if($ECS_NODE_TBL->LOCK)
{
my $node = $ECS_NODE_TBL->read($node_id);
$node->{q_gap_seq} = $q_gap_seq;
$node->{q_gap_time} = $q_gap_time;
$ECS_NODE_TBL->write($node_id, $node);
$ECS_NODE_TBL->unlock() unless $was_locked;
}
last NODE_MSG_PART;
}
# process message
my $filename = $msgpart->{$seq_part_num};
print "$DEBUG_LABEL processing \"store\" file: $filename\n"
if $ECS_CFG->{ECS_DEBUG} > 0;
my $msg = EMDIS::ECS::Message::read_from_file($filename);
if(not ref $msg)
{
log_error("process_store(): unable to read message " .
"from file $filename: $msg");
last NODE_MSG_PART;
}
$processed_seq_num = $seq_num;
$err = process_message($msg, $filename, 1, \@msg_part_filenames);
if($err)
{
if($err =~ /unable to decrypt message/)
{
log_error("process_store(): unable to decrypt message " .
"from file $filename (kept in store): $err");
# (don't) attempt to process message as clear-text
# $err = process_message($msg, $filename, 0);
last NODE_MSG_PART;
}
elsif( $err =~ /received (early|duplicate) message/ ) {
log_info("process_store(): unable to process message " .
"from file $filename: $err");
next NODE_MSG_PART;
}
else {
log_error("process_store(): unable to process message " .
"from file $filename: $err");
last NODE_MSG_PART;
}
}
foreach my $mpfn (@msg_part_filenames)
{
unlink $mpfn;
}
last NODE_MSG_PART if $processing_cutoff_time <= time();
last NODE if $interrupted;
}
# check whether processing time limit has been exceeded
if(time() >= $processing_cutoff_time) {
log_info("process_store(): exiting early due to processing " .
"time limit (after $node_id msg)");
last NODE;
}
}
@nodelist = ();
$nodes = {};
update_statistics(\@nodelist, $nodes);
}
script/ecs_scan_mail view on Meta::CPAN
# Move file to trash directory
sub move_to_trash
{
my $filename = shift;
print "$DEBUG_LABEL move_to_trash($filename)\n"
if $ECS_CFG->ECS_DEBUG > 0;
my $trashdir = catdir($ECS_CFG->ECS_MBX_STORE_DIR, 'trash');
mkdir $trashdir unless -e $trashdir;
my $err = move_to_dir($filename, $trashdir);
if($err) {
log_error("process_store(): unable to move file " .
"$filename: $err");
}
}
# ----------------------------------------------------------------------
# Delete message corresponding to specified msgnum from mailbox.
# returns error message, if any
sub mbox_delete_message
{
my $msgnum = shift;
print "$DEBUG_LABEL mbox_delete_message($msgnum)\n"
if $ECS_CFG->ECS_DEBUG > 0;
for ($ECS_CFG->INBOX_PROTOCOL) {
/POP3/ and do {
$mbox->delete($msgnum);
return ''; # successful
};
/IMAP/ and do {
$mbox->delete_message($msgnum);
return ''; # successful
};
/DIRECTORY/ and do {
opendir (DIR, $ECS_CFG->INBOX_DIRECTORY)
or die "can't opendir " . $ECS_CFG->INBOX_DIRECTORY . ": $!";
my @files = sort grep { !/^\./ && !/Deleted/i }readdir(DIR);
closedir (DIR);
unlink catdir($ECS_CFG->INBOX_DIRECTORY, $files[$msgnum - 1]);
return ''; # successful
};
}
return "Unexpected mbox-protocol: $_";
}
# ----------------------------------------------------------------------
# Process specified document.
# returns error message, if any
sub process_document
{
my $msg = shift;
my $filename = shift;
my $decrypt = shift;
my $err = '';
print "$DEBUG_LABEL process_document(\$msg, $filename, $decrypt)\n"
if $ECS_CFG->ECS_DEBUG > 0;
# look up node in ECS_NODE_TBL
my $was_locked = $ECS_NODE_TBL->LOCK;
if(not $was_locked) {
# lock ECS_NODE_TBL
return "process_document(): unable to lock ECS_NODE_TBL: " .
$ECS_NODE_TBL->ERROR
unless $ECS_NODE_TBL->lock();
}
# store information about document currently being processed
my $this_node = $ECS_NODE_TBL->read($ECS_CFG->THIS_NODE);
$this_node->{proc_node} = $msg->sender;
$this_node->{proc_seq} = $msg->seq_num;
$this_node->{proc_file} = $filename;
$ECS_NODE_TBL->write($ECS_CFG->THIS_NODE, $this_node);
# retrieve node info
my $node = $ECS_NODE_TBL->read($msg->sender);
$ECS_NODE_TBL->unlock() unless $was_locked; # release node_tbl lock
if(not $node) {
# don't process message from unknown node
$err = "process_document(): $err; " . EOL()
if $err;
$err .= "process_document(): document from unknown node: " .
$msg->sender;
return $err;
}
# don't process "duplicate" document (seq_num too low)
if($msg->seq_num < ($node->{doc_in_seq} + 1)) {
my @err = ();
push @err, "process_document(): received duplicate document: " .
$msg->sender . ":" . $msg->seq_num . ":DOC";
# move file to 'trash' folder
my $e = move_to_dir($filename, $ECS_CFG->ECS_MBX_TRASH_DIR);
push @err, "process_message(): $e"
if $e;
return join EOL(), @err;
}
# don't process "early" document (seq_num too high)
if($msg->seq_num > ($node->{doc_in_seq} + 1)) {
$err = "process_document(): $err; " . EOL()
if $err;
$err .= "process_document(): received early document: " .
$msg->sender . ":" . $msg->seq_num;
return $err;
}
# create document file
# decrypt (if indicated)
my $dmsg;
if($decrypt) {
$dmsg = EMDIS::ECS::Message::read_from_encrypted_file($filename);
if(not ref $dmsg)
{
$err = "process_document(): unable to decrypt document: $dmsg";
}
}
else {
$dmsg = EMDIS::ECS::Message::read_from_file($filename);
if(not ref $dmsg)
{
$err = "process_document(): unable to read document: $dmsg";
}
}
# if error encountered, return it
return $err if $err;
# write cleartext document to temp file in $ECS_DAT_DIR/tmp
my $template = $filename;
if($template =~ /_\w{4}(\.\w+)*$/o) {
$template =~ s/_\w{4}(\.\w+)*$/_XXXX/o;
}
else {
$template .= '_XXXX';
}
my ($fh, $doc_fname) = tempfile($template, SUFFIX => '.doc.asc');
print $fh $dmsg->cleartext
script/ecs_scan_mail view on Meta::CPAN
{
unlink $doc_fname;
return $err;
}
# store copy of decrypted payload in $ECS_MBX_IN_FML_DIR
my $in_fml_filename = catfile($ECS_CFG->ECS_MBX_IN_FML_DIR,
sprintf("%s_%s_%010d.doc", $msg->sender, $ECS_CFG->THIS_NODE,
$msg->seq_num));
if(not copy($doc_fname, $in_fml_filename))
{
$err = "process_document(): could not copy $doc_fname " .
"to $in_fml_filename: $!";
unlink $doc_fname;
return $err;
}
chmod $EMDIS::ECS::FILEMODE, $in_fml_filename;
# copy payload file to from_XX directory (if ECS_FROM_DIR is configured)
my $from_dir = $ECS_CFG->ECS_FROM_DIR;
if ( defined $from_dir and $from_dir ne '' ) {
# create temporary file (without extension .doc!)
# to prevent a race condition on the interface
$from_dir = catdir( $from_dir, 'from_' . $msg->sender );
my $template = sprintf( "tmp_doc_d%010d.XXXXXX", $msg->seq_num );
my( $tmp_fh, $from_tmp_filename ) = tempfile( $template,
DIR => $from_dir,
UNLINK => 0 );
# close the filehandle. We just want to make sure we reserve the
# temporary filename
close( $tmp_fh );
# now copy the actual file content over
if ( not copy( $doc_fname, $from_tmp_filename ) ) {
$err = "process_document(): could not copy $doc_fname "
. "to $from_tmp_filename: $!";
}
# put the final filename together ...
my $from_filename = catfile( $from_dir,
sprintf("d%010d.doc", $msg->seq_num ) );
# ... and rename our temporary file in the same directory
if ( not rename( $from_tmp_filename, $from_filename ) )
{
$err = "process_document(): could not rename $from_tmp_filename "
. "to $from_filename: $!";
}
chmod $EMDIS::ECS::FILEMODE, $from_filename;
}
# remove temp file
unlink $doc_fname;
return $err if $err ne '';
unlink $filename; # remove input file after successful processing
# message was processed
# if needed, update $node->{doc_in_seq}
if(not $was_locked) {
$ECS_NODE_TBL->lock() # lock ECS_NODE_TBL if needed
or return "process_document(): unable to (write) lock " .
"ECS_NODE_TBL: " . $ECS_NODE_TBL->ERROR;
}
$node = $ECS_NODE_TBL->read($msg->sender);
$err = $ECS_NODE_TBL->ERROR;
if((not $err) and (ref $node))
{
$node->{doc_in_seq}++;
if($msg->seq_num == $node->{doc_in_seq})
{
if(is_yes($ECS_CFG->ALWAYS_ACK))
{
# only send DOC_MSG_ACK if $ECS_CFG->ALWAYS_ACK is set
$err = send_ecs_message($msg->sender, '',
"msg_type=DOC_MSG_ACK\n",
"seq_num=$node->{doc_in_seq}\n",
"# 10-4 " . rand() . "\n");
if($err)
{
$err = "unable to send DOC_MSG_ACK $node->{doc_in_seq} " .
"meta-message to node " . $msg->sender . ": $err";
}
else
{
$node->{doc_in_seq_ack} = $node->{doc_in_seq};
}
}
$ECS_NODE_TBL->write($msg->sender,$node);
$err = $ECS_NODE_TBL->ERROR;
}
}
$ECS_NODE_TBL->unlock() unless $was_locked; # release node_tbl lock
return "process_document(): $err" if $err;
return '';
}
# ----------------------------------------------------------------------
# Process specified meta-message.
# returns error message, if any
sub process_meta_message
{
my $msg = shift;
my $filename = shift;
print "$DEBUG_LABEL process_meta_message(\$msg, $filename)\n"
if $ECS_CFG->ECS_DEBUG > 0;
# compose command
my $cmd = sprintf("%s --config $opt_config %s %s",
$ECS_CFG->M_MSG_PROC,
$filename,
$msg->sender);
print "$DEBUG_LABEL command: $cmd\n"
if $ECS_CFG->ECS_DEBUG > 0;
# execute command
my $result = timelimit_cmd($ECS_CFG->T_MSG_PROC, $cmd);
# format result, if needed
$result = "process_meta_message(): $result"
if($result);
return $result;
}
# ----------------------------------------------------------------------
# Process specified ECS message.
# returns error message, if any
sub process_message
{
my $msg = shift;
my $filename = shift;
my $decrypt = shift;
my $msg_part_filenames = shift;
my $err = '';
my @msgs = ();
my $child_pid;
print "$DEBUG_LABEL process_message(\$msg, $filename, $decrypt, (" .
join(', ', @$msg_part_filenames) . "))\n"
if $ECS_CFG->ECS_DEBUG > 0;
# look up node in ECS_NODE_TBL
my $was_locked = $ECS_NODE_TBL->LOCK;
if(not $was_locked) {
# lock ECS_NODE_TBL
return "process_message(): unable to lock ECS_NODE_TBL: " .
$ECS_NODE_TBL->ERROR
unless $ECS_NODE_TBL->lock();
}
# store information about message currently being processed
my $this_node = $ECS_NODE_TBL->read($ECS_CFG->THIS_NODE);
$this_node->{proc_node} = $msg->sender;
$this_node->{proc_seq} = $msg->seq_num;
$this_node->{proc_file} = $filename;
$ECS_NODE_TBL->write($ECS_CFG->THIS_NODE, $this_node);
# retrieve node info
my $node = $ECS_NODE_TBL->read($msg->sender);
$ECS_NODE_TBL->unlock() unless $was_locked; # release node_tbl lock
if(not $node) {
# don't process message from unknown node
$err = "process_message(): $err; " . EOL()
if $err;
$err .= "process_message(): message from unknown node: " .
$msg->sender;
return $err;
}
# don't process "duplicate" message (seq_num too low)
if($msg->seq_num < ($node->{in_seq} + 1)) {
my @err = ();
push @err, "process_message(): received duplicate message: " .
$msg->sender . ":" . $msg->seq_num;
# move file(s) to 'trash' folder
for my $f (@$msg_part_filenames)
{
my $e = move_to_dir($f, $ECS_CFG->ECS_MBX_TRASH_DIR);
push @err, "process_message(): $e"
if $e;
}
return join EOL(), @err;
}
# don't process "early" message (seq_num too high)
if($msg->seq_num > ($node->{in_seq} + 1)) {
$err = "process_message(): $err; " . EOL()
if $err;
$err .= "process_message(): received early message: " .
$msg->sender . ":" . $msg->seq_num;
return $err;
}
# sanity checks on $msg_part_filenames
return "process_message(): unexpected error: \$msg_part_filenames " .
"not defined!"
if not defined $msg_part_filenames;
return "process_message(): unexpected error: \$msg_part_filenames " .
"not an ARRAY reference!"
if 'ARRAY' ne ref $msg_part_filenames;
return "process_message(): unexpected error: \$msg_part_filenames " .
"array is wrong size (expected " . $msg->num_parts . ", found " .
scalar(@$msg_part_filenames) . ")"
if $msg->num_parts != scalar(@$msg_part_filenames);
# create payload file for each message part
my @mp_payload_filename = ();
for my $msg_part_fname (@$msg_part_filenames)
{
# decrypt message part (if indicated)
my $dmsg;
if($decrypt) {
$dmsg = EMDIS::ECS::Message::read_from_encrypted_file($msg_part_fname);
if(not ref $dmsg)
{
$err = "process_message(): unable to decrypt message: $dmsg";
last;
}
}
else {
script/ecs_scan_mail view on Meta::CPAN
# set ADAPTER_CMD environment variable
$ENV{ADAPTER_CMD} = $ECS_CFG->ADAPTER_CMD;
# execute command
$err = timelimit_cmd($ECS_CFG->T_MSG_PROC, $cmd);
print "$DEBUG_LABEL command output:\n$EMDIS::ECS::cmd_output\n"
if $ECS_CFG->ECS_DEBUG > 0;
# format error message, if needed
# TODO: automatically send MSG_DEN response ??
$err = "process_message(): $err"
if($err);
}
if($err)
{
unlink $payload_filename;
return $err;
}
# copy payload file to from_XX directory (if ECS_FROM_DIR is configured)
my $from_dir = $ECS_CFG->ECS_FROM_DIR;
if ( defined $from_dir and $from_dir ne '' ) {
# create temporary file (without extension .msg!)
# to prevent a race condition on the interface
$from_dir = catdir( $from_dir, 'from_' . $msg->sender );
my $template = sprintf( "tmp_msg_%010d.XXXXXX", $msg->seq_num );
my( $tmp_fh, $from_tmp_filename ) = tempfile( $template,
DIR => $from_dir,
UNLINK => 0 );
# close the filehandle. We just want to make sure we reserve the
# temporary filename
close( $tmp_fh );
# now copy the actual file content over
if ( not copy( $payload_filename, $from_tmp_filename ) ) {
$err = "process_message(): could not copy $payload_filename "
. "to $from_tmp_filename: $!";
}
# put the final filename together ...
my $from_filename = catfile( $from_dir,
sprintf("%010d.msg", $msg->seq_num ) );
# ... and rename our temporary file in the same directory
if ( not rename( $from_tmp_filename, $from_filename ) )
{
$err = "process_message(): could not rename $from_tmp_filename "
. "to $from_filename: $!";
}
chmod $EMDIS::ECS::FILEMODE, $from_filename;
}
# remove temp file
unlink $payload_filename;
return $err if $err ne '';
# message was processed
# if needed, update $node->{in_seq}
if(not $was_locked) {
$ECS_NODE_TBL->lock() # lock ECS_NODE_TBL if needed
or return "process_message(): unable to (write) lock " .
"ECS_NODE_TBL: " . $ECS_NODE_TBL->ERROR;
}
$node = $ECS_NODE_TBL->read($msg->sender);
$err = $ECS_NODE_TBL->ERROR;
if((not $err) and (ref $node))
{
$node->{in_seq}++;
if($msg->seq_num == $node->{in_seq})
{
if(is_yes($ECS_CFG->ALWAYS_ACK))
{
# only send MSG_ACK if $ECS_CFG->ALWAYS_ACK is set
$err = send_ecs_message($msg->sender, '',
"msg_type=MSG_ACK\n",
"seq_num=$node->{in_seq}\n",
"# 10-4 " . rand() . "\n");
if($err)
{
$err = "unable to send MSG_ACK $node->{in_seq} " .
"meta-message to node " . $msg->sender . ": $err";
}
else
{
$node->{in_seq_ack} = $node->{in_seq};
}
}
$ECS_NODE_TBL->write($msg->sender,$node);
$err = $ECS_NODE_TBL->ERROR;
}
}
$ECS_NODE_TBL->unlock() unless $was_locked; # release node_tbl lock
return "process_message(): $err" if $err;
return '';
}
# ----------------------------------------------------------------------
# Re-read configuration when SIGHUP received.
sub sighup_handler
{
$reload_config = 1;
}
# ----------------------------------------------------------------------
# Set flag indicating program has been interrupted.
sub sigint_handler
{
$interrupted = 1;
}
__END__
# embedded POD documentation
=head1 NAME
ecs_scan_mail - ECS email processing daemon
=head1 SYNOPSIS
ecs_scan_mail
ecs_scan_mail --once
ecs_scan_mail --nodaemon
=head1 DESCRIPTION
This program monitors incoming ECS email. It receives messages from
the configured POP3 or IMAP mailbox and calls appropriate functions to
trigger processing of ECS meta-messages and regular messages.
=head1 OPTIONS
=over 5
=item --config I<ecs_config_file>
Specify the location of the ECS configuration file. By default, the program
looks for the file specified by the ECS_CONFIG_FILE environment variable;
if that environment variable is not set, it looks for a file named "ecs.cfg"
in the current directory.
=item --daemon
Spawn background process to continuously monitor remote node
communication status. This option is enabled by default.
=item --nodaemon
( run in 0.649 second using v1.01-cache-2.11-cpan-5a3173703d6 )