EMDIS-ECS
view release on metacpan or search on metacpan
script/ecs_scan_mail view on Meta::CPAN
}
# ----------------------------------------------------------------------
# We need to adjust the index based on the number of deleted messages
# for DIRECTORY protocol
sub indexAdj {
    # Arguments:
    #   1) message index as seen by the caller's loop
    #   2) number of messages already deleted
    # Returns the index to use for the next inbox operation.
    my ($msg_index, $deleted_count) = @_;

    # When the configured inbox protocol matches DIRECTORY, shift the index
    # down by the number of deleted messages.
    if ($ECS_CFG->INBOX_PROTOCOL =~ /DIRECTORY/) {
        return $msg_index - $deleted_count;
    }

    # Any other protocol: the index is passed through unchanged.
    return $msg_index;
}
# ----------------------------------------------------------------------
# scan email inbox and process messages
sub scan_mail
{
my $mbox_err;
my $loopnum = 0;
my $msgcount = -1;
my $ignored_msgcount = 0;
until ($mbox_err or ($msgcount - $ignored_msgcount) == 0 or $loopnum >= 9)
{
$ignored_msgcount = 0;
$loopnum++;
($mbox_err, $msgcount) = mbox_open();
if($mbox_err) {
log_error("scan_mail(): unable to open email inbox: $mbox_err");
mbox_close();
last;
}
print "$DEBUG_LABEL msgcount = $msgcount\n"
if $ECS_CFG->ECS_DEBUG > 0;
if(not $mbox_err) {
my $numDelMsg = 0; # init the number of deleted msg to 0
my $fname_dttm = format_datetime(
time, '%04d%02d%02d_%02d%02d%02d');
for my $msgnum (1..$msgcount) {
last if $interrupted; # exit msgnum loop if int'd
# read email message
my ($err, $raw_msg) = mbox_get_message(indexAdj($msgnum,$numDelMsg));
if($err) {
log_error("scan_mail(): unable to get message $msgnum " .
"from email inbox: $err");
next;
}
# construct EMDIS::ECS::Message object
my $msg = new EMDIS::ECS::Message($raw_msg);
if((not ref $msg) or (not $msg->is_ecs_message() and not $msg->is_document())) {
my $template = sprintf('%s_%d_%04d_XXXX',
$fname_dttm, $loopnum, $msgnum);
my ($fh, $filename) = tempfile($template,
DIR => catdir($ECS_CFG->ECS_DAT_DIR, 'mboxes', 'trash'),
SUFFIX => '.msg');
print $fh $raw_msg
or $err = "unable to write file $filename: $!";
close $fh;
chmod $EMDIS::ECS::FILEMODE, $filename;
log_info(
"scan_mail(): received non-ECS message: $filename\n");
mbox_delete_message(indexAdj($msgnum,$numDelMsg));
$numDelMsg++; # We just deleted a msg
$ignored_msgcount++;
next;
}
last if $interrupted; # exit msgnum loop if int'd
# write message to temp file in $ECS_DAT_DIR/tmp
my $template = sprintf('%s_%d_%04d_XXXX',
$fname_dttm, $loopnum, $msgnum);
my ($fh, $filename) = tempfile($template,
DIR => $ECS_CFG->ECS_TMP_DIR,
SUFFIX => '.msg');
print $fh $msg->full_msg()
or $err = "unable to write file $filename: $!";
close $fh;
chmod $EMDIS::ECS::FILEMODE, $filename;
if($err) {
log_error("scan_mail(): unable to create tempfile: $err");
next;
}
# check if sender is this_node
if ($msg->sender eq $ECS_CFG->THIS_NODE) {
# mailtoadmin - tell admin mail is in $filename
print "$DEBUG_LABEL processing recipient = sender file: $filename\n"
if $ECS_CFG->{ECS_DEBUG} > 0;
send_admin_email("recipient and sender are the same." ,
" The message can be found in $filename");
mbox_delete_message(indexAdj($msgnum,$numDelMsg));
$numDelMsg++; # We just deleted a msg
$ignored_msgcount++;
next;
}
#check if sender is really a node
$ECS_NODE_TBL->lock(); # lock node_tbl
my @nodes = sort $ECS_NODE_TBL->keys();
$ECS_NODE_TBL->unlock(); # unlock node_tbl
my $sender = $msg->sender;
if ( ! ( grep { $_ =~ /$sender/ } @nodes)) {
# mailtoadmin - tell admin that sender is not a node
print "$DEBUG_LABEL making sure sender is a node\n"
if $ECS_CFG->{ECS_DEBUG} > 0;
send_admin_email(
"ecs_scan_mail: node not found: $sender (@nodes)! "
. "The message can be found in $filename\n");
mbox_delete_message(indexAdj($msgnum,$numDelMsg));
$numDelMsg++; # We just deleted a msg
$ignored_msgcount++;
next;
}
# copy message to "in" folder
$err = copy_to_dir($filename, $ECS_CFG->ECS_MBX_IN_DIR);
if($err) {
log_error("scan_mail(): unable to copy file $filename " .
"to " . $ECS_CFG->ECS_MBX_IN_DIR . ": $err");
$ignored_msgcount++;
next;
}
# if configured, copy message to "in_bck" folder
if($ECS_CFG->BCK_DIR ne 'NONE') {
$err = copy_to_dir($filename, $ECS_CFG->BCK_DIR);
if($err) {
log_error("scan_mail(): unable to back up file " .
"$filename to " . $ECS_CFG->BCK_DIR .
": $err");
}
}
if($msg->is_meta_message()) {
# process meta-message
$err = process_meta_message($msg, $filename);
script/ecs_scan_mail view on Meta::CPAN
if $err;
$err .= "process_document(): document from unknown node: " .
$msg->sender;
return $err;
}
# don't process "duplicate" document (seq_num too low)
if($msg->seq_num < ($node->{doc_in_seq} + 1)) {
my @err = ();
push @err, "process_document(): received duplicate document: " .
$msg->sender . ":" . $msg->seq_num . ":DOC";
# move file to 'trash' folder
my $e = move_to_dir($filename, $ECS_CFG->ECS_MBX_TRASH_DIR);
push @err, "process_message(): $e"
if $e;
return join EOL(), @err;
}
# don't process "early" document (seq_num too high)
if($msg->seq_num > ($node->{doc_in_seq} + 1)) {
$err = "process_document(): $err; " . EOL()
if $err;
$err .= "process_document(): received early document: " .
$msg->sender . ":" . $msg->seq_num;
return $err;
}
# create document file
# decrypt (if indicated)
my $dmsg;
if($decrypt) {
$dmsg = EMDIS::ECS::Message::read_from_encrypted_file($filename);
if(not ref $dmsg)
{
$err = "process_document(): unable to decrypt document: $dmsg";
}
}
else {
$dmsg = EMDIS::ECS::Message::read_from_file($filename);
if(not ref $dmsg)
{
$err = "process_document(): unable to read document: $dmsg";
}
}
# if error encountered, return it
return $err if $err;
# write cleartext document to temp file in $ECS_DAT_DIR/tmp
my $template = $filename;
if($template =~ /_\w{4}(\.\w+)*$/o) {
$template =~ s/_\w{4}(\.\w+)*$/_XXXX/o;
}
else {
$template .= '_XXXX';
}
my ($fh, $doc_fname) = tempfile($template, SUFFIX => '.doc.asc');
print $fh $dmsg->cleartext
or $err = "process_document: Unable to write file " .
"$doc_fname: $!";
close $fh;
chmod $EMDIS::ECS::FILEMODE, $doc_fname;
# if error encountered, remove temp file and return error
if($err)
{
unlink $doc_fname;
return $err;
}
# store copy of decrypted payload in $ECS_MBX_IN_FML_DIR
my $in_fml_filename = catfile($ECS_CFG->ECS_MBX_IN_FML_DIR,
sprintf("%s_%s_%010d.doc", $msg->sender, $ECS_CFG->THIS_NODE,
$msg->seq_num));
if(not copy($doc_fname, $in_fml_filename))
{
$err = "process_document(): could not copy $doc_fname " .
"to $in_fml_filename: $!";
unlink $doc_fname;
return $err;
}
chmod $EMDIS::ECS::FILEMODE, $in_fml_filename;
# copy payload file to from_XX directory (if ECS_FROM_DIR is configured)
my $from_dir = $ECS_CFG->ECS_FROM_DIR;
if ( defined $from_dir and $from_dir ne '' ) {
# create temporary file (without extension .doc!)
# to prevent a race condition on the interface
$from_dir = catdir( $from_dir, 'from_' . $msg->sender );
my $template = sprintf( "tmp_doc_d%010d.XXXXXX", $msg->seq_num );
my( $tmp_fh, $from_tmp_filename ) = tempfile( $template,
DIR => $from_dir,
UNLINK => 0 );
# close the filehandle. We just want to make sure we reserve the
# temporary filename
close( $tmp_fh );
# now copy the actual file content over
if ( not copy( $doc_fname, $from_tmp_filename ) ) {
$err = "process_document(): could not copy $doc_fname "
. "to $from_tmp_filename: $!";
}
# put the final filename together ...
my $from_filename = catfile( $from_dir,
sprintf("d%010d.doc", $msg->seq_num ) );
# ... and rename our temporary file in the same directory
if ( not rename( $from_tmp_filename, $from_filename ) )
{
$err = "process_document(): could not rename $from_tmp_filename "
. "to $from_filename: $!";
}
chmod $EMDIS::ECS::FILEMODE, $from_filename;
}
# remove temp file
unlink $doc_fname;
return $err if $err ne '';
unlink $filename; # remove input file after successful processing
# message was processed
# if needed, update $node->{doc_in_seq}
if(not $was_locked) {
$ECS_NODE_TBL->lock() # lock ECS_NODE_TBL if needed
or return "process_document(): unable to (write) lock " .
"ECS_NODE_TBL: " . $ECS_NODE_TBL->ERROR;
}
$node = $ECS_NODE_TBL->read($msg->sender);
$err = $ECS_NODE_TBL->ERROR;
if((not $err) and (ref $node))
{
$node->{doc_in_seq}++;
if($msg->seq_num == $node->{doc_in_seq})
{
if(is_yes($ECS_CFG->ALWAYS_ACK))
{
# only send DOC_MSG_ACK if $ECS_CFG->ALWAYS_ACK is set
$err = send_ecs_message($msg->sender, '',
"msg_type=DOC_MSG_ACK\n",
"seq_num=$node->{doc_in_seq}\n",
"# 10-4 " . rand() . "\n");
if($err)
{
$err = "unable to send DOC_MSG_ACK $node->{doc_in_seq} " .
"meta-message to node " . $msg->sender . ": $err";
}
else
{
$node->{doc_in_seq_ack} = $node->{doc_in_seq};
}
}
$ECS_NODE_TBL->write($msg->sender,$node);
$err = $ECS_NODE_TBL->ERROR;
}
}
$ECS_NODE_TBL->unlock() unless $was_locked; # release node_tbl lock
return "process_document(): $err" if $err;
return '';
}
# ----------------------------------------------------------------------
# Process specified meta-message.
# returns error message, if any
sub process_meta_message
{
my $msg = shift;
my $filename = shift;
print "$DEBUG_LABEL process_meta_message(\$msg, $filename)\n"
if $ECS_CFG->ECS_DEBUG > 0;
script/ecs_scan_mail view on Meta::CPAN
{
my $e = move_to_dir($f, $ECS_CFG->ECS_MBX_TRASH_DIR);
push @err, "process_message(): $e"
if $e;
}
return join EOL(), @err;
}
# don't process "early" message (seq_num too high)
if($msg->seq_num > ($node->{in_seq} + 1)) {
$err = "process_message(): $err; " . EOL()
if $err;
$err .= "process_message(): received early message: " .
$msg->sender . ":" . $msg->seq_num;
return $err;
}
# sanity checks on $msg_part_filenames
return "process_message(): unexpected error: \$msg_part_filenames " .
"not defined!"
if not defined $msg_part_filenames;
return "process_message(): unexpected error: \$msg_part_filenames " .
"not an ARRAY reference!"
if 'ARRAY' ne ref $msg_part_filenames;
return "process_message(): unexpected error: \$msg_part_filenames " .
"array is wrong size (expected " . $msg->num_parts . ", found " .
scalar(@$msg_part_filenames) . ")"
if $msg->num_parts != scalar(@$msg_part_filenames);
# create payload file for each message part
my @mp_payload_filename = ();
for my $msg_part_fname (@$msg_part_filenames)
{
# decrypt message part (if indicated)
my $dmsg;
if($decrypt) {
$dmsg = EMDIS::ECS::Message::read_from_encrypted_file($msg_part_fname);
if(not ref $dmsg)
{
$err = "process_message(): unable to decrypt message: $dmsg";
last;
}
}
else {
$dmsg = EMDIS::ECS::Message::read_from_file($msg_part_fname);
if(not ref $dmsg)
{
$err = "process_message(): unable to read message: $dmsg";
last;
}
}
# write FML message payload to temp file in $ECS_DAT_DIR/tmp
my $template = $filename;
$template =~ s/_\w{4}(\.\w+)*$/_XXXX/;
my ($fh, $mp_payload_fname) = tempfile($template, SUFFIX => '.fml');
print $fh $dmsg->cleartext
or $err = "process_message: Unable to write file " .
"$mp_payload_fname: $!";
close $fh;
chmod $EMDIS::ECS::FILEMODE, $mp_payload_fname;
push @mp_payload_filename, $mp_payload_fname;
if($err)
{
last;
}
}
# if error encountered, remove any partial payload files and return error
if($err)
{
for my $fname (@mp_payload_filename)
{
unlink $fname;
}
return $err;
}
my $payload_filename;
# concatenate decrypted message part files as needed
if($#mp_payload_filename == 0)
{
$payload_filename = $mp_payload_filename[0];
}
else
{
my $fh;
my $template = catfile($ECS_CFG->ECS_TMP_DIR,
sprintf("%s_%s_%010d_XXXX", $msg->sender, $ECS_CFG->THIS_NODE,
$msg->seq_num));
($fh, $payload_filename) = tempfile($template, SUFFIX => '.fml');
binmode($fh);
for my $fname (@mp_payload_filename)
{
open(PART, $fname)
or $err = "process_message(): Unable to open message " .
"part file $fname: $!";
last if $err;
binmode(PART);
while(1)
{
my $buffer;
my $readlen = sysread PART, $buffer, 65536;
if(not defined $readlen)
{
$err = "process_message(): unexpected problem reading " .
"file $fname: $!";
last;
}
last if $readlen <= 0;
if(not print $fh $buffer)
{
$err = "process_message(): unexpected problem writing " .
"file $payload_filename: $!";
last;
}
}
close(PART);
last if $err;
}
$fh->close();
# remove temp files
for my $fname (@mp_payload_filename)
{
unlink $fname;
}
}
if($err)
{
unlink $payload_filename;
return $err;
}
# store copy of decrypted FML in $ECS_MBX_IN_FML_DIR
my $in_fml_filename = catfile($ECS_CFG->ECS_MBX_IN_FML_DIR,
sprintf("%s_%s_%010d.fml", $msg->sender, $ECS_CFG->THIS_NODE,
$msg->seq_num));
if(not copy($payload_filename, $in_fml_filename))
{
$err = "process_message(): could not copy $payload_filename " .
"to $in_fml_filename: $!";
unlink $payload_filename;
return $err;
}
chmod $EMDIS::ECS::FILEMODE, $in_fml_filename;
# process file via ADAPTER_CMD (if ADAPTER_CMD is configured)
if( defined $ECS_CFG->ADAPTER_CMD and $ECS_CFG->ADAPTER_CMD ne '' )
{
# compose command
my $cmd = sprintf("%s %s %s %s",
$ECS_CFG->MSG_PROC,
$payload_filename,
$msg->sender,
$msg->seq_num);
print "$DEBUG_LABEL command: $cmd\n"
if $ECS_CFG->ECS_DEBUG > 0;
# set ADAPTER_CMD environment variable
$ENV{ADAPTER_CMD} = $ECS_CFG->ADAPTER_CMD;
# execute command
$err = timelimit_cmd($ECS_CFG->T_MSG_PROC, $cmd);
print "$DEBUG_LABEL command output:\n$EMDIS::ECS::cmd_output\n"
if $ECS_CFG->ECS_DEBUG > 0;
# format error message, if needed
# TODO: automatically send MSG_DEN response ??
$err = "process_message(): $err"
if($err);
}
if($err)
{
unlink $payload_filename;
return $err;
}
# copy payload file to from_XX directory (if ECS_FROM_DIR is configured)
my $from_dir = $ECS_CFG->ECS_FROM_DIR;
if ( defined $from_dir and $from_dir ne '' ) {
# create temporary file (without extension .msg!)
# to prevent a race condition on the interface
$from_dir = catdir( $from_dir, 'from_' . $msg->sender );
my $template = sprintf( "tmp_msg_%010d.XXXXXX", $msg->seq_num );
my( $tmp_fh, $from_tmp_filename ) = tempfile( $template,
DIR => $from_dir,
UNLINK => 0 );
# close the filehandle. We just want to make sure we reserve the
# temporary filename
close( $tmp_fh );
# now copy the actual file content over
if ( not copy( $payload_filename, $from_tmp_filename ) ) {
$err = "process_message(): could not copy $payload_filename "
. "to $from_tmp_filename: $!";
}
# put the final filename together ...
my $from_filename = catfile( $from_dir,
sprintf("%010d.msg", $msg->seq_num ) );
# ... and rename our temporary file in the same directory
if ( not rename( $from_tmp_filename, $from_filename ) )
{
$err = "process_message(): could not rename $from_tmp_filename "
. "to $from_filename: $!";
}
chmod $EMDIS::ECS::FILEMODE, $from_filename;
}
# remove temp file
unlink $payload_filename;
return $err if $err ne '';
# message was processed
# if needed, update $node->{in_seq}
if(not $was_locked) {
$ECS_NODE_TBL->lock() # lock ECS_NODE_TBL if needed
or return "process_message(): unable to (write) lock " .
"ECS_NODE_TBL: " . $ECS_NODE_TBL->ERROR;
}
$node = $ECS_NODE_TBL->read($msg->sender);
$err = $ECS_NODE_TBL->ERROR;
if((not $err) and (ref $node))
{
$node->{in_seq}++;
if($msg->seq_num == $node->{in_seq})
{
if(is_yes($ECS_CFG->ALWAYS_ACK))
{
# only send MSG_ACK if $ECS_CFG->ALWAYS_ACK is set
$err = send_ecs_message($msg->sender, '',
"msg_type=MSG_ACK\n",
"seq_num=$node->{in_seq}\n",
"# 10-4 " . rand() . "\n");
if($err)
{
$err = "unable to send MSG_ACK $node->{in_seq} " .
"meta-message to node " . $msg->sender . ": $err";
}
else
{
$node->{in_seq_ack} = $node->{in_seq};
}
}
$ECS_NODE_TBL->write($msg->sender,$node);
$err = $ECS_NODE_TBL->ERROR;
}
}
$ECS_NODE_TBL->unlock() unless $was_locked; # release node_tbl lock
return "process_message(): $err" if $err;
return '';
}
# ----------------------------------------------------------------------
# Re-read configuration when SIGHUP received.
sub sighup_handler {
    # Takes no arguments of interest.  The handler does nothing except
    # raise the $reload_config flag; no configuration is read here.
    # NOTE(review): presumably the flag is checked elsewhere in the
    # script to trigger the actual reload -- confirm against the main loop.
    $reload_config = 1;
}
# ----------------------------------------------------------------------
# Set flag indicating program has been interrupted.
sub sigint_handler
{
$interrupted = 1;
( run in 0.727 second using v1.01-cache-2.11-cpan-39bf76dae61 )