EMDIS-ECS

 view release on metacpan or  search on metacpan

lib/EMDIS/ECS/FileBackedMessage.pm  view on Meta::CPAN

        # filename ending in .doc or .doc.xml indicates document (not message)
        $this->{is_ecs_message} = '';
        $this->{is_meta_message} = '';
        $this->{is_document} = 1;
    }

    $this->{temp_files} = [];
    $this->{is_closed} = 0;

    # if $filename not specified, read input from stdin
    if(not $filename)
    {
        # read from stdin, create temp file
        my $template = sprintf('%s_XXXX', format_datetime(time,
            '%04d%02d%02d_%02d%02d%02d'));
        return "Unable to create temp file from stdin: ECS is not configured!"
            unless ecs_is_configured();
        my $fh;
        ($fh, $filename) = tempfile($template,
            DIR => catdir($ECS_CFG->ECS_TMP_DIR),
            SUFFIX => '.msg');
        binmode(STDIN);
        binmode($fh);
        while(1)
        {
            my $buffer;

            my $readlen = read STDIN, $buffer, 65536;
            if(not defined $readlen)
            {
                $err = "Unexpected problem reading STDIN: $!";
                last;
            }

            last if $readlen == 0;

            if(not print $fh $buffer)
            {
                $err = "Unexpected problem writing file $filename: $!";
                last;
            }
        }
        close $fh;
        if($err)
        {
            unlink $filename;
            return $err;
        }
        push @{$this->{temp_files}}, $filename;
    }

    $this->{filename} = $filename;
    my $file_handle;
    return "Unable to open input file $filename: $!"
        unless  open $file_handle, "+< $filename";
    $this->{file_handle} = $file_handle;
    binmode $file_handle;

    # get exclusive lock (with retry loop)
    # protects against reading a file while another process is writing it
    my $locked = '';
    for my $retry (1..5)
    {
        $locked = flock $file_handle, LOCK_EX | LOCK_NB;
        last if $locked;
    }
    if(!$locked)
    {
        $err = "Unable to lock input file $filename: $!";
        close $file_handle;
        return $err;
    }

    my $email_headers = '';
    my $data_offset = 0;

    # attempt to read email headers only if sender_node_id not yet defined
    if(not exists $this->{sender_node_id})
    {
        # attempt to read email headers from file, determine data offset
        my $buf;
        while(1)
        {
            my $bytecount = read $file_handle, $buf, 1;

            if(not defined $bytecount)
            {
                $err = "Unexpected problem reading from file $filename: $!";
                last;
            }

            if($bytecount > 0)
            {
                $email_headers .= $buf;
                $data_offset++;

                # first empty line ends potential email header
                last if $email_headers =~ /\r?\n\r?\n$/so;
            }
            elsif($bytecount == 0 or $data_offset >= 1048576)
            {
                # assume file does not contain email header
                # if EOF encountered or no empty line found in first X bytes
                $data_offset = 0;
                last;
            }
        }
        if($err)
        {
            close $file_handle;
            return $err;
        }
    }

    if($data_offset > 0)
    {
        # convert headers to more easily parseable format, store in this obj
        $email_headers =~ s/\r?\n/\n/go;

        # look for "Subject" line
        if($email_headers =~ /^Subject:\s*(.+?)$/imo)
        {
            $this->{subject} = $1;
            $this->{email_headers} = $email_headers;
            $this->{data_offset} = $data_offset;
        }
    }

lib/EMDIS/ECS/FileBackedMessage.pm  view on Meta::CPAN

    }
    elsif($fml =~ /^\s*msg_type\s*=\s*\S+/isom)
    {
        $this->{is_ecs_message} = 1;
        $this->{is_meta_message} = 1;
        $this->{is_document} = '';
        return '';
    }
    else
    {
        $this->{is_ecs_message} = '';
        $this->{is_meta_message} = '';
        $this->{is_document} = '';
        return '';
    }

    # Note: this code only understands the simple forms of FML assignments
    # (not the extended /FIELDS form)

    # look for HUB_RCV
    if($fml =~ /HUB_RCV\s*=\s*([^,;]+)/iso) # presumes [^,;] in HUB_RCV
    {
        $this->{hub_rcv} = dequote(trim($1));
    }

    # look for HUB_SND
    if($fml =~ /HUB_SND\s*=\s*([^,;]+)/iso) # presumes [^,;] in HUB_SND
    {
        $this->{hub_snd} = dequote(trim($1));
    }

    return '';
}

# ----------------------------------------------------------------------
sub send_this_message
{
    my $this = shift;
    return "send_this_message() must only be called as an instance method!"
        unless ref $this;
    return "send_this_message(): this FileBackedMessage object is closed!"
        if $this->{is_closed};
    return "send_this_message(): this FileBackedMessage object represents " .
        "only a partial message!"
        if defined $this->{num_parts} and $this->{num_parts} > 1;

    # initialize
    my $rcv_node_id = shift;
    my $is_re_send = shift;
    my $part_num = shift;
    return "send_this_message(): ECS has not been configured."
        unless ecs_is_configured();
    my $cfg = $ECS_CFG;
    my $node_tbl = $ECS_NODE_TBL;
    my $err = '';

    return "send_this_message(): Missing \$rcv_node_id!"
        unless defined $rcv_node_id and $rcv_node_id;

    # lock node_tbl, look up $rcv_node_id
    my $was_locked = $node_tbl->LOCK;
    if(not $was_locked)
    {
        $node_tbl->lock()  # lock node_tbl
            or return "send_this_message(): unable to lock node_tbl: " .
                $node_tbl->ERROR;
    }
    my $node = $node_tbl->read($rcv_node_id);
    if(not $node)
    {
        $node_tbl->unlock() unless $was_locked;  # unlock node_tbl if needed
        return "send_this_message(): node not found: $rcv_node_id";
    }
    if(not $node->{addr})
    {
        $node_tbl->unlock() unless $was_locked;  # unlock node_tbl if needed
        return "send_this_message(): addr not defined for node: $rcv_node_id";
    }

    # compute or assign message seq_num
    my $seq_num = '';
    if($is_re_send and not $this->{is_document})
    {
        # sanity checks
        if(not defined $this->{seq_num})
        {
            $node_tbl->unlock() unless $was_locked; # unlock node_tbl if needed
            return "send_this_message(): seq_num not defined for RE_SEND";
        }
        if($this->{seq_num} > $node->{out_seq})
        {
            $node_tbl->unlock() unless $was_locked; # unlock node_tbl if needed
            return "send_this_message(): seq_num for RE_SEND (" .
                $this->{seq_num} . ") is greater than out_seq for node " .
                "$rcv_node_id (" . $node->{out_seq} . ")!";
        }
        $seq_num = $this->{seq_num};
    }
    elsif($is_re_send and $this->{is_document})
    {
        # sanity checks
        if(not defined $this->{seq_num})
        {
            $node_tbl->unlock() unless $was_locked; # unlock node_tbl if needed
            return "send_this_message(): seq_num not defined for DOC_RE_SEND";
        }
        if($this->{seq_num} > $node->{doc_out_seq})
        {
            $node_tbl->unlock() unless $was_locked; # unlock node_tbl if needed
            return "send_this_message(): seq_num for DOC_RE_SEND (" .
                $this->{seq_num} . ") is greater than doc_out_seq for node " .
                "$rcv_node_id (" . $node->{doc_out_seq} . ")!";
        }
        $seq_num = $this->{seq_num};
    }
    elsif($this->{is_document})
    {
        # automatically get next (doc) sequence number
        $node->{doc_out_seq}++;
        $seq_num = $node->{doc_out_seq};
    }
    elsif(not $this->{is_meta_message})
    {
        # only allow $part_num to be specified if this is a RE_SEND request
        if($part_num)
        {
            $node_tbl->unlock() unless $was_locked; # unlock node_tbl if needed
            return "send_this_message(): part_num specified ($part_num), for " .
                "non- RE_SEND request!";
        }
        # automatically get next (msg) sequence number
        $node->{out_seq}++;
        $seq_num = $node->{out_seq};
    }

    # compute message part size
    my $msg_part_size = $cfg->MSG_PART_SIZE_DFLT;
    if(defined $node->{msg_part_size} and $node->{msg_part_size} > 0)
    {
        $msg_part_size = $node->{msg_part_size};
    }

    # compute data size
    my $file_size = (stat $this->{file_handle})[7];
    my $data_size = $file_size - $this->{data_offset};
    if($data_size <= 0)
    {
        $node_tbl->unlock() unless $was_locked;  # unlock node_tbl if needed
        return "send_this_message(): data_size is <= 0 ($data_size)!";
    }

    # for document, force num_parts = 1
    if($this->{is_document})
    {
        $msg_part_size = $data_size;
    }

    # compute num_parts
    my $num_parts = int($data_size / $msg_part_size);
    $num_parts++ if ($data_size % $msg_part_size) > 0;
    # num_parts should be 1 for meta message
    if($this->{is_meta_message} and $num_parts > 1)
    {
        $node_tbl->unlock() unless $was_locked;  # unlock node_tbl if needed
        return "send_this_message(): num_parts cannot be > 1 for meta message!";
    }
    # $part_num cannot be greater than $num_parts
    if(defined $part_num and $part_num and $part_num > $num_parts)
    {
        $node_tbl->unlock() unless $was_locked;  # unlock node_tbl if needed
        return "send_this_message(): part_num ($part_num) cannot be greater " .
            "than num_parts ($num_parts)!";
    }

    # compute base subject
    my $subject = $cfg->MAIL_MRK . ':' . $cfg->THIS_NODE;
    $subject .= ":$seq_num" if $seq_num;
    $subject .= ":DOC" if $this->{is_document};

    if($is_re_send)
    {
        # to save disk space, don't copy message to file for RE_SEND
        log_info("send_this_message(): transmitting $rcv_node_id RE_SEND " .
                 "message $seq_num" . ($part_num ? ":$part_num" : '') . "\n");
    }
    else
    {
        # copy message to file (for non- RE_SEND)

        my $filename;

        if($this->{is_meta_message})
        {
            # copy meta message to mboxes/out subdirectory
            $filename = sprintf("%s_%s_%s.msg",
                                $cfg->THIS_NODE, $rcv_node_id, "META");
            my $dirname = $cfg->ECS_MBX_OUT_DIR;
            # create directory if it doesn't already exist
            mkdir $dirname unless -e $dirname;
            $filename = catfile($dirname, $filename);
        }
        else
        {
            # copy regular message or document file to mboxes/out_NODE subdirectory
            if($this->{is_document})
            {
                $filename = format_doc_filename($rcv_node_id, $seq_num);
            }
            else
            {
                $filename = format_msg_filename($rcv_node_id, $seq_num);
            }
            # create directory if it doesn't already exist
            my $dirname = dirname($filename);
            mkdir $dirname unless -e $dirname;
        }

        # don't overwrite $filename file if it already exists
        my $fh;
        if(-e $filename)
        {
            my $template = $filename . "_XXXXXX";
            ($fh, $filename) = tempfile($template);
            return "send_this_message(): unable to open _XXXX file: " .
                "$filename"
                    unless $fh;
        }
        else
        {
            $fh = new IO::File;
            return "send_this_message(): unable to open file: " .
                "$filename"
                unless $fh->open("> $filename");
        }

        print $fh "Subject: $subject\n";
        print $fh "To: $node->{addr}\n";
        print $fh "From: " . $cfg->SMTP_FROM . "\n\n";
        # copy data to $fh
        $err = "Unable to position file pointer for file $this->{filename}" .
            " to position $this->{data_offset}: $!"
            unless seek $this->{file_handle}, $this->{data_offset}, 0;
        my $buffer;
        while(1)
        {
            if($err)
            {
                $node_tbl->unlock() unless $was_locked;  # unlock if needed
                close $fh;
                unlink $filename;
                return $err;
            }

            my $bytecount = read $this->{file_handle}, $buffer, 65536;
            if(not defined $bytecount)
            {
                $err = "send_this_message(): Problem reading input file " .
                    "$this->{filename}: $!";
            }
            elsif($bytecount == 0)
            {
                last; # EOF
            }
            else
            {
                print $fh $buffer
                    or $err = "send_this_message(): Problem writing output " .
                        "file $filename: $!";
            }
        }
        close $fh;
        chmod $FILEMODE, $filename;
    }

    my $custom_headers = {};
    $custom_headers->{'x-emdis-hub-rcv'} = $rcv_node_id;
    $custom_headers->{'x-emdis-hub-snd'} = $cfg->THIS_NODE;

    if($num_parts == 1)
    {
        # read all data, send single email message
        $err = "send_this_message(): Unable to position file pointer for " .
            "file $this->{filename} to position $this->{data_offset}: $!"
            unless seek $this->{file_handle}, $this->{data_offset}, 0;

        if(not $err)
        {
            my $all_data;
            my $bytecount = read $this->{file_handle}, $all_data, $data_size;

            if(not defined $bytecount)
            {
                $err = "send_this_message(): Problem reading input file " .
                    "$this->{filename}: $!";
            }
            elsif($bytecount != $data_size)
            {
                $err = "send_this_message(): Problem reading from input file " .
                    "$this->{filename}: expected $msg_part_size bytes, " .
                        "found $bytecount bytes.";
            }
            elsif($this->{is_meta_message}
                  and ($node->{encr_meta} !~ /true/io))
            {
                # don't encrypt meta-message
                if(is_yes($cfg->ENABLE_AMQP) and exists $node->{amqp_addr_meta} and $node->{amqp_addr_meta}) {
                    # send meta-message via AMQP (if indicated by node config)
                    $err = send_amqp_message(

lib/EMDIS/ECS/FileBackedMessage.pm  view on Meta::CPAN

                        "$this->{filename}: $!";
                }
                elsif($part_num < $num_parts and $bytecount != $msg_part_size)
                {
                    $err = "send_this_message(): Problem reading $rcv_node_id " .
                        "message part $part_num/$num_parts from input file " .
                        "$this->{filename}: expected $msg_part_size bytes, " .
                        "found $bytecount bytes.";
                }
                elsif($bytecount <= 0)
                {
                    $err = "send_this_message(): Problem reading $rcv_node_id " .
                        "message part $part_num/$num_parts from input file " .
                            "$this->{filename}: found $bytecount bytes.";
                }
                else
                {
                    # send encrypted email message
                    $err = send_encrypted_message(
                        $node->{encr_typ},
                        $node->{addr_r},
                        $node->{addr},
                        $node->{encr_out_keyid},
                        $node->{encr_out_passphrase},
                        $node,
                        "$subject:$part_num/$num_parts",
                        $custom_headers,
                        $part_data);
                }
            }

            if($err)
            {
                if($parts_sent == 0)
                {
                    # nothing sent yet, so quit now (possible smtp problem?)
                    last;
                }
                else
                {
                    # part of message was sent, so log error and continue
                    log_error($err);
                    $err = '';
                }
            }
            else
            {
                $parts_sent++;
            }
        }
    }

    if(not $err)
    {
        # update node last_out, possibly out_seq
        $node->{last_out} = time();
        $err = $node_tbl->ERROR
            unless $node_tbl->write($rcv_node_id, $node);
    }
    $node_tbl->unlock()  # unlock node_tbl if needed
        unless $was_locked;

    return $err;
}

1;

__DATA__

# embedded POD documentation
# for more info:  man perlpod

=head1 NAME

EMDIS::ECS::FileBackedMessage - an ECS email message

=head1 SYNOPSIS

 use EMDIS::ECS::FileBackedMessage;

 $msg = new EMDIS::ECS::FileBackedMessage($message_file);
 die "unable to define message: $msg\n" unless ref $msg;

 $msg = EMDIS::ECS::FileBackedMessage($sender_node_id, $seq_num, $data_file);
 die "unable to define message: $msg\n" unless ref $msg;

 $msg->send_this_message($rcv_node_id);

=head1 DESCRIPTION

ECS file-backed message object, capable of handling very large messages.

The send_this_message subroutine of this object knows how to split a large data
file into multiple encrypted email messages, as specified by the EMDISCORD RFC.

=head1 SEE ALSO

EMDIS::ECS, EMDIS::ECS::Config, EMDIS::ECS::LockedHash, EMDIS::ECS::Message

=head1 AUTHOR

Joel Schneider <jschneid@nmdp.org>

=head1 COPYRIGHT AND LICENSE

THIS PACKAGE IS PROVIDED "AS IS" AND WITHOUT ANY EXPRESS OR IMPLIED 
WARRANTIES, INCLUDING, WITHOUT LIMITATION, THE IMPLIED WARRANTIES OF 
MERCHANTIBILITY AND FITNESS FOR A PARTICULAR PURPOSE.

Copyright (C) 2010-2021 National Marrow Donor Program. All rights reserved.

See LICENSE file for license details.

=head1 HISTORY

ECS, the EMDIS Communication System, was originally designed and
implemented by the ZKRD (http://www.zkrd.de/).  This Perl implementation
of ECS was originally developed by the National Marrow Donor Program
(http://www.marrow.org/).



( run in 0.735 second using v1.01-cache-2.11-cpan-ceb78f64989 )