IPC-Transit
view release on metacpan or search on metacpan
lib/IPC/Transit.pm view on Meta::CPAN
}
my $qname = $args{qname};
die "IPC::Transit::receive: parameter 'qname' required"
unless $qname;
die "IPC::Transit::receive: parameter 'qname' must be a scalar"
if ref $qname;
if( not $args{override_local} and
$local_queues and
$local_queues->{$qname}) {
my $m = shift @{$local_queues->{$qname}};
return $m->{message};
}
my $ret = eval {
my $flags = IPC::Transit::Internal::_get_flags('nowait') if $args{nonblock};
my $from_queue = IPC::Transit::Internal::_initialize_queue(%args);
my $ref = { #just doing this so we can pass the possibly big serialized
#data around as a reference
serialized_wire_data => '',
};
if(not $from_queue->rcv($ref->{serialized_wire_data}, 102400000, 0, $flags)) {
return undef;
}
if(not defined $ref->{serialized_wire_data}) {
print STDERR "IPC::Transit::receive: received message had no data";
return undef;
}
my ($header_length, $wire_headers) = _parse_wire_header($ref);
if(not defined $wire_headers) {
print STDERR 'IPC::Transit::receive: received message had no wire headers: ' . substr($ref->{serialized_wire_data}, 0, 30) . "\n";
return undef;
}
if(not defined $header_length) {
print STDERR 'IPC::Transit::receive: received message had no header length: ' . substr($ref->{serialized_wire_data}, 0, 30) . "\n";
return undef;
}
sync_serialized_wire_data($wire_headers, $ref);
my $message = {
wire_headers => $wire_headers,
serialized_message => substr(
$ref->{serialized_wire_data},
$header_length + length($header_length) + 1,
9999999, # :(
),
};
my $used_default_public = 1;
if($message->{wire_headers}->{n}) {
#we be encrypted
#validate $IPC::Transit::my_keys->{private}
#
my $source = $message->{wire_headers}->{S};
my $public_key;
if($IPC::Transit::public_keys->{$source}) {
$public_key = $IPC::Transit::public_keys->{$source};
$used_default_public = 0;
} else {
$public_key = $IPC::Transit::public_keys->{default};
}
my @private_keys = ($IPC::Transit::my_keys->{default});
push @private_keys, $IPC::Transit::my_keys->{private}
if $IPC::Transit::my_keys->{private};
my $nonce = decode_base64($message->{wire_headers}->{n});
my $public_keys;
if(not ref $public_key) {
$public_keys = [$public_key];
} else {
$public_keys = $public_key;
}
push @$public_keys, $IPC::Transit::public_keys->{default};
my $cleartext;
PUBLIC:
foreach my $public (@$public_keys) {
foreach my $private_key (@private_keys) {
$cleartext = crypto_box_open(
$message->{serialized_message},
$nonce,
decode_base64($public),
decode_base64($private_key),
);
last PUBLIC if $cleartext;
}
}
$message->{serialized_message} = $cleartext;
}
return undef unless _thaw($message);
$message->{message}->{'.ipc_transit_meta'}->{encrypt_source} =
$message->{wire_headers}->{S} if $message->{wire_headers}->{S};
$message->{message}->{'.ipc_transit_meta'}->{encrypt_source} = 'default'
if $used_default_public;
return $message if $args{raw};
return $message->{message};
};
die $@ if $@;
return $ret;
}
# Replace the wire data held in $ref with the contents of the spill file
# named by the 'f' wire header, when there is one.
#
# Parameters:
#   $wire_headers - hash ref of parsed wire headers; a true, readable 'f'
#                   entry names the file to load.
#   $ref          - hash ref whose 'serialized_wire_data' entry is
#                   overwritten with the file contents.
#
# Nothing happens unless $wire_headers->{f} is set and readable. The read is
# guarded by a 5 second alarm. The file is unlinked afterwards whether or not
# the read succeeded. Failures are reported on STDERR and never thrown, so a
# bad spill file cannot abort the caller.
#
# Returns the result of unlink when a file was processed, otherwise nothing.
sub sync_serialized_wire_data {
    my ($wire_headers, $ref) = @_;
    my $filename = $wire_headers->{f};
    return unless $filename and -r $filename;

    eval {
        local $SIG{ALRM} = sub { die "timed out\n"; };
        alarm 5;
        open my $fh, '<', $filename
            or die "failed to open $filename for reading: $!";
        # read() returns 0 at end of file and undef on error; only undef is
        # a failure, so an empty file must not be reported as one.
        my $bytes_read = read $fh, $ref->{serialized_wire_data}, 1024000000;
        die "failed to read from $filename: $!" unless defined $bytes_read;
        close $fh or die "failed to close $filename: $!";
        # Cancel the timer while our handler is still installed.
        alarm 0;
    };
    my $error = $@;
    alarm 0;    # also covers the path where the eval died early
    print STDERR "IPC::Transit::sync_serialized_wire_data: $error" if $error;
    return unlink $filename;
}
# Forward an already packed message to another host over HTTP.
#
# First-generation logic: the caller is expected to hand over only messages
# that are destined for another box.
#
# $message is a hash ref. Its 'serialized_wire_data' entry is sent as the
# 'message' form field, and the 'destination' entry of its
# '.ipc_transit_meta' block supplies the host, which is contacted on port
# 9816 at /message.
#
# Returns whatever HTTP::Lite's request() returned. A failed request is
# reported on STDERR rather than thrown.
sub post_remote {
    my ($message) = @_;

    my $client = HTTP::Lite->new;
    $client->prepare_post({ message => $message->{serialized_wire_data} });

    my $destination = $message->{message}->{'.ipc_transit_meta'}->{destination};
    my $endpoint = 'http://' . $destination . ':9816/message';

    my $response;
    eval {
        $response = $client->request($endpoint)
            or die "Unable to get document: $!";
    };
    if ($@) {
        print STDERR "IPC::Transit::post_remote: (\$url=$endpoint) failed: $@\n";
    }
    return $response;
}
sub no_local_queue {
my %args;
{ my @args = @_;
die 'IPC::Transit::no_local_queue: even number of arguments required'
lib/IPC/Transit.pm view on Meta::CPAN
return IPC::Transit::Internal::_queue_exists($qname);
}
# Split the wire header off the front of a serialized message.
#
# The wire format starts with a decimal header length, one separator
# character, and then that many characters of header text.
#
# Parameters:
#   $ref - hash ref whose 'serialized_wire_data' entry holds the raw message.
#          It is passed by reference so the possibly large payload is not
#          copied.
#
# Returns the list ($header_length, $wire_headers), where $wire_headers is
# the result of deserialize_wire_meta() on the header text. Returns
# (undef, undef), after a note on STDERR, when the data is missing, does not
# start with a length prefix, or is shorter than the prefix claims.
sub _parse_wire_header {
    my $ref = shift;

    # \A rather than ^ with /m: the length prefix must be at the very start
    # of the data, because all offsets below are counted from position 0.
    if (not defined $ref->{serialized_wire_data}
            or $ref->{serialized_wire_data} !~ /\A(\d+)/) {
        my $preview = defined $ref->{serialized_wire_data}
            ? substr($ref->{serialized_wire_data}, 0, 60)
            : '';
        print STDERR 'IPC::Transit::_parse_wire_header: malformed message received: ' . $preview . "\n";
        return (undef, undef);
    }
    my $header_length = $1;

    # The header text begins after the digits and the single separator.
    my $header_start = length($header_length) + 1;
    if (length($ref->{serialized_wire_data}) < $header_start + $header_length) {
        print STDERR 'IPC::Transit::_parse_wire_header: truncated wire header received: ' . substr($ref->{serialized_wire_data}, 0, 60) . "\n";
        return (undef, undef);
    }

    my $wire_headers = deserialize_wire_meta(
        substr($ref->{serialized_wire_data}, $header_start, $header_length)
    );
    return ($header_length, $wire_headers);
}
# Register an in-process queue under the given name.
#
# Arguments are key/value pairs:
#   qname - name of the queue; required, must be a plain scalar.
#
# An existing queue of the same name is left untouched, so repeated calls
# are harmless.
#
# Returns 1. Dies when given an odd number of arguments, or when qname is
# missing or is a reference (the same checks receive() applies).
sub local_queue {
    my %args;
    {   my @args = @_;
        die 'IPC::Transit::local_queue: even number of arguments required'
            if scalar @args % 2;
        %args = @args;
    }
    my $qname = $args{qname};
    # Without these checks a missing qname would register a queue under the
    # empty key, which receive() refuses to read from.
    die "IPC::Transit::local_queue: parameter 'qname' required"
        unless $qname;
    die "IPC::Transit::local_queue: parameter 'qname' must be a scalar"
        if ref $qname;
    $local_queues = {} unless $local_queues;
    $local_queues->{$qname} = [] unless $local_queues->{$qname};
    return 1;
}
# Turn $args->{message} into wire data, optionally encrypting it, and store
# the result in $args->{serialized_wire_data}.
#
# Takes a single hash ref and modifies it in place; among the entries it
# sets are 'serialized_message', 'message_length', 'serialized_wire_data'
# and, depending on the path taken, 'nonce', 'source' and 'local_filename'.
# It also adds a '.ipc_transit_meta' block to the message itself.
#
# Returns nothing. Dies when encryption is requested but no hostname can be
# determined, or when writing the spill file for an oversized message fails.
sub pack_message {
my $args = shift;
# Make sure the message carries a meta block to fill in below.
$args->{message}->{'.ipc_transit_meta'} = {}
unless $args->{message}->{'.ipc_transit_meta'};
# Copy every true-valued argument named in the translation table to the key
# the table maps it to.
foreach my $key (keys %$wire_header_arg_translate) {
next unless $args->{$key};
$args->{$wire_header_arg_translate->{$key}} = $args->{$key};
}
# Any argument that is neither a wire header argument nor a standard
# argument is carried inside the message's meta block.
foreach my $key (keys %$args) {
next if $wire_header_args->{$key};
next if $std_args->{$key};
$args->{message}->{'.ipc_transit_meta'}->{$key} = $args->{$key};
}
if($args->{encrypt}) {
$args->{message}->{'.ipc_transit_meta'}->{destination} = $args->{destination};
}
$args->{message}->{'.ipc_transit_meta'}->{source_hostname} = _get_my_hostname();
# Encryption needs a hostname to put in 'source' below; refuse to go on
# without one.
if($args->{encrypt}) {
my $sender = _get_my_hostname();
if(not $sender) {
die 'encrypt selected but unable to determine hostname. Set $IPC::Transit::my_hostname to override';
}
}
if($args->{encrypt}) {
# The nonce is kept base64 encoded in $args; the raw value goes to
# crypto_box below.
my $nonce = crypto_box_nonce();
$args->{nonce} = encode_base64($nonce);
# Prefer our own private key over the default one, and record in the meta
# block which of the two was used.
my $my_private_key;
if($IPC::Transit::my_keys->{private}) {
$my_private_key = $IPC::Transit::my_keys->{private};
$args->{message}->{'.ipc_transit_meta'}->{signed_destination} = 'my_private';
} else {
$my_private_key = $IPC::Transit::my_keys->{default};
$args->{message}->{'.ipc_transit_meta'}->{signed_destination} = 'default';
}
# Use the public key registered for the destination, else the default one.
my $their_public_key;
if($IPC::Transit::public_keys->{$args->{destination}}) {
$their_public_key = $IPC::Transit::public_keys->{$args->{destination}};
} else {
$their_public_key = $IPC::Transit::public_keys->{default};
}
# The meta block must be complete before this point: everything set on the
# message above is in place when _freeze runs.
# NOTE(review): _freeze is defined elsewhere; presumably it serializes
# $args->{message} - confirm.
$args->{serialized_message} = _freeze($args);
my $cipher_text = crypto_box(
$args->{serialized_message},
$nonce,
decode_base64($their_public_key),
decode_base64($my_private_key)
);
# From here on the serialized message is the cipher text.
$args->{serialized_message} = $cipher_text;
$args->{source} = _get_my_hostname();
} else {
$args->{serialized_message} = _freeze($args);
}
$args->{message_length} = length $args->{serialized_message};
# Oversized messages are written to a temporary file; only the wire header
# is left in serialized_wire_data.
if($args->{message_length} > $IPC::Transit::max_message_size) {
my $s;
eval {
my $fh;
($fh, $args->{local_filename}) = _get_tmp_file();
# Serialized after local_filename is set, so the header is built from
# arguments that include it.
# NOTE(review): presumably local_filename becomes the 'f' header that
# sync_serialized_wire_data reads - confirm against the translation table.
$s = serialize_wire_meta($args);
# The file holds the header followed by the payload.
print $fh "$s$args->{serialized_message}"
or die "failed to write to file $args->{local_filename}: $!";
close $fh or die "failed to close $args->{local_filename}: $!";
chmod 0666, $args->{local_filename};
};
if($@) {
# Do not leave a partial spill file behind.
unlink $args->{local_filename};
die "IPC::Transit::pack_message: failed: $@";
}
$args->{serialized_wire_data} = $s;
return;
}
# Normal case: header and payload travel together.
my $s = serialize_wire_meta($args);
$args->{serialized_wire_data} = "$s$args->{serialized_message}";
return;
}
# Build the wire header for a message.
#
# Parameters:
#   $args - hash ref of message arguments. A key contributes to the header
#           only when $wire_header_arg_translate maps it to a header name
#           that has a true entry in $wire_header_args.
#
# An entry of 1 in $wire_header_args accepts any value; a hash ref entry
# accepts only values that are true keys of that hash.
#
# Returns the string "<length>:<header>", where <header> is a comma separated
# list of name=value pairs and <length> is its length. Fields are emitted in
# sorted key order so that the same arguments always give the same header.
# Values are not escaped, so a value containing ',' cannot be represented.
#
# Dies when a value is not among those allowed for its header.
sub serialize_wire_meta {
    my $args = shift;
    my @fields;
    foreach my $key (sort keys %$args) {
        my $translated_key = $wire_header_arg_translate->{$key};
        next unless $translated_key and $wire_header_args->{$translated_key};

        my $allowed = $wire_header_args->{$translated_key};
        my $value = $args->{$key};
        if (not ref $allowed and $allowed == 1) {
            push @fields, "$translated_key=$value";
        } elsif (ref $allowed and $allowed->{$value}) {
            push @fields, "$translated_key=$value";
        } else {
            # Report the value that was actually checked; it is stored under
            # the untranslated key.
            die "passed wire argument $translated_key had value of $value not of allowed type";
        }
    }
    my $header = join ',', @fields;
    my $header_length = length $header;
    return "$header_length:$header";
}
# Parse wire header text into a hash.
#
# Parameters:
#   $header - comma separated list of name=value pairs, as produced by
#             serialize_wire_meta() without its length prefix.
#
# Returns a hash ref mapping each name to its value. A part with no '='
# maps its name to undef. An undefined or empty header gives an empty hash.
sub deserialize_wire_meta {
    my $header = shift;
    my %meta;
    return \%meta unless defined $header and length $header;

    foreach my $part (split /,/, $header) {
        # Split on the first '=' only, so that values which themselves
        # contain '=' (base64 padding, for one) come back intact.
        my ($key, $val) = split /=/, $part, 2;
        $meta{$key} = $val;
    }
    return \%meta;
}
lib/IPC/Transit.pm view on Meta::CPAN
=head2 receive(qname => 'some_queue', nonblock => [0|1], override_local => [0|1])
This function fetches a hash reference from 'some_queue' and returns it.
By default, it will block until a reference is available. Setting nonblock
to a true value will cause this to return immediately with 'undef' if
no messages are available.
override_local defaults to false; if set to true, receive skips any
in-process local queue registered for qname and always reads from the
underlying IPC queue.
=head2 stat(qname => 'some_queue')
Returns various stats about the passed queue name, per IPC::Msg::stat:
print Dumper IPC::Transit::stat(qname => 'test');
$VAR1 = {
'ctime' => 1335141770,
'cuid' => 1000,
'lrpid' => 0,
'uid' => 1000,
'lspid' => 0,
'mode' => 438,
'qnum' => 0,
'cgid' => 1000,
'rtime' => 0,
'qbytes' => 16384,
'stime' => 0,
'gid' => 1000
}
=head2 stats()
Return an array of hash references, each containing the information
obtained by the stat() call, one entry for each queue on the system.
=head2 CRYPTO
On send(), if the 'encrypt' argument is set, IPC::Transit will sign and
encrypt the message before it is sent. The necessary configs, including
relevant keys, are set in some global variables.
See an actual example of this in action under ex/crypto.pl
Please note that this module does not directly assist with the always
onerous task of key distribution.
=head3 $IPC::Transit::my_hostname
If not set, this defaults to the output of the module Sys::Hostname.
This value is placed into the message by the sender, and used by the
receiver to lookup the public key of the sender.
=head3 $IPC::Transit::my_keys
This is a hash reference initially populated, in the attribute 'default',
with the private half of a default key pair. For actual secure
communication, a new key pair must be generated on both sides, and the
sender's private key needs to be placed here:
$IPC::Transit::my_keys->{private} = $real_private_key
=head3 $IPC::Transit::public_keys
As above, this is a hash reference initially populated, in the attribute
'default', with the public half of a default key pair. For actual secure
communication, a new key pair must be generated on both sides, and the
receiver's public key needs to be placed here:
$IPC::Transit::public_keys->{$receiver_hostname} = $real_public_key_from_receiver
$receiver_hostname must exactly match what is passed into the 'destination'
field of send().
All of these keys must be base64-encoded 32-byte keys, as used by
the Crypt::Sodium package.
=head3 IPC::Transit::gen_key_pair()
This returns a two element array representing a public/private key pair,
properly base64 encoded for use in $IPC::Transit::my_keys and
$IPC::Transit::public_keys
=head1 SEE ALSO
A zillion other queueing systems.
=head1 TODO
Implement nonblock flag for send()
=head1 BUGS
Patches, flames, opinions, enhancement ideas are all welcome.
I am not satisfied with not supporting Windows, but it is considered
secondary. I am open to the possibility of adding abstractions for this
kind of support as long as it doesn't impact the primary goals.
=head1 COPYRIGHT
Copyright (c) 2012, 2013, 2016 Dana M. Diederich. All Rights Reserved.
=head1 LICENSE
This module is free software. It may be used, redistributed
and/or modified under the terms of the Perl Artistic License
(see http://www.perl.com/perl/misc/Artistic.html)
=head1 AUTHOR
Dana M. Diederich <dana@realms.org>
=cut
( run in 0.923 second using v1.01-cache-2.11-cpan-f56aa216473 )