AC-DC
view release on metacpan - search on metacpan
view release on metacpan or search on metacpan
lib/AC/DC/Protocol.pm view on Meta::CPAN
# -*- perl -*-
# Copyright (c) 2009 AdCopy
# Author: Jeff Weisberg
# Created: 2009-Sep-10 13:37 (EDT)
# Function:
#
# $Id$
package AC::DC::Protocol;
use Carp qw(croak confess);
use Digest::SHA1;
use Fcntl;
use POSIX;
use Socket;
use Time::HiRes 'time';
use strict;
# header:
# proto version(32)
# message type(32)
# auth length(32)
# data length(32)
# content length(32)
# msgidno(32)
# flags(32): is-reply(0), want-reply(1), is-error(2), data-encrypted(3), content-encrypted(4)
#
# followed by:
# Auth PB(auth-length)
# Data PB(data-length)
# Content(content-length)
my $VERSION = 0x41433032;
my $BUFSIZ = 65536;
my %MSGTYPE;
my %MSGREV;
# status => { num => 0, reqc => '', resc => 'ACPStdReply' },
sub header_size { return 28 }
sub new {
my $class = shift;
return bless { @_ }, $class;
}
sub add_msg {
my $class = shift;
my $name = shift;
my $num = shift;
my $reqc = shift;
my $resc = shift;
my $d = {
name => $name,
num => $num,
reqc => $reqc,
resc => $resc,
};
$MSGTYPE{$name} = $d;
$MSGREV{$num} = $name;
}
################################################################
sub encode_header {
my $me = shift;
my %p = @_;
# type, auth_length, data_length, content_length, msgidno,
# is_reply, want_reply, is_error
my $mt = $MSGTYPE{ $p{type} };
confess "unknown message type $p{type}\n" unless defined $mt;
my $flags = ( $p{is_reply} ? 1 : 0 )
| ( $p{want_reply} ? 2 : 0 )
| ( $p{is_error} ? 4 : 0 )
| ( $p{data_encrypted} ? 8 : 0 )
| ( $p{content_encrypted} ? 16 : 0 );
return pack( "NNNNNNN",
$VERSION, $mt->{num}, $p{auth_length}, $p{data_length}, $p{content_length}, $p{msgidno}, $flags );
}
sub decode_header {
my $me = shift;
my $headr = shift;
my( $ver, $mt, $al, $dl, $cl, $id, $fl )
= unpack("NNNNNNN", $headr);
my %p = (
auth_length => $al,
data_length => $dl,
content_length => $cl,
msgidno => $id,
type => $MSGREV{$mt},
);
confess "unknown protocol version $ver\n" unless $ver == $VERSION;
confess "unknown protocol message $mt\n" unless $p{type};
$p{is_reply} = ($fl & 1) ? 1 : 0;
$p{want_reply} = ($fl & 2) ? 1 : 0;
$p{is_error} = ($fl & 4) ? 1 : 0;
$p{data_encrypted} = ($fl & 8) ? 1 : 0;
$p{content_encrypted} = ($fl & 16) ? 1 : 0;
return \%p;
}
sub encrypt {
my $me = shift;
# NYI - placeholder
}
sub decrypt {
my $me = shift;
my $auth = shift;
my $buf = shift;
# NYI - placeholder
}
sub _encode_common {
my $me = shift;
my $how = shift;
my $proto = shift;
my $data = shift;
my $cont = shift; # reference
my $auth = shift; # NYI
my $mt = $MSGTYPE{ $proto->{type} };
confess "unknown message type $proto->{type}\n" unless defined $mt;
my $apb = $auth ? ACPAuth->encode( $auth ) : '';
my $gpb = $data ? $mt->{$how}->encode( $data ) : '';
if( $proto->{data_encrypted} && $gpb ){
$gpb = $me->encrypt( $auth, $gpb );
}
my $hdr = $me->encode_header(
type => $proto->{type},
want_reply => $proto->{want_reply},
is_reply => $proto->{is_reply},
msgidno => $proto->{msgidno},
data_encrypted => $proto->{data_encrypted},
content_encrypted => $proto->{content_encrypted},
auth_length => length($apb),
data_length => length($gpb),
content_length => ($cont ? length($$cont) : 0),
);
# caller needs to add content. (to avoid large copy)
return $hdr . $apb . $gpb;
}
sub _decode_common {
my $me = shift;
my $how = shift;
my $reply = shift;
my $data = shift;
my $mt = $MSGTYPE{ $reply->{type} };
confess "unknown message type $reply->{type}\n" unless defined $mt;
return unless $data || $reply->{data};
my $res = $mt->{$how}->decode( $data || $reply->{data} || '' );
return $res;
}
sub encode_request {
my $me = shift;
return $me->_encode_common( 'reqc', @_ );
}
sub encode_reply {
my $me = shift;
return $me->_encode_common( 'resc', @_ );
}
sub decode_request {
my $me = shift;
return $me->_decode_common( 'reqc', @_ );
}
sub decode_reply {
my $me = shift;
return $me->_decode_common( 'resc', @_ );
}
################################################################
sub _try_to_connect {
my $s = shift;
my $sa = shift;
my $to = shift;
my $fn = fileno($s);
my $wfd = "\0\0\0\0";
vec($wfd, $fn, 1) = 1;
my $i = connect($s, $sa);
return 1 if $i; # connected
return unless $! == EISCONN || $! == EALREADY || $! == EINPROGRESS;
# wait until connected or timeout
my $is = select(undef, $wfd, undef, $to);
return if $is == -1;
return 1 if vec($wfd, $fn, 1);
return;
}
sub connect_to_server {
my $me = shift;
my $ipn = shift;
my $port = shift;
my $timeo = shift;
my $s;
socket($s, PF_INET, SOCK_STREAM, 6) || confess "cannot create socket: $!\n";
setsockopt($s, Socket::IPPROTO_TCP(), Socket::TCP_NODELAY(), 1);
# set non-blocking
my $fl = fcntl($s, F_GETFL, 0);
fcntl($s, F_SETFL, O_NDELAY);
my $sa = sockaddr_in($port, $ipn);
my $to = $timeo ? $timeo / 2 : 0.25;
# try connecting up to 3 times
for (1..3){
# print STDERR "connecting\n";
my $ok = _try_to_connect($s, $sa, $to);
if( $ok ){
# reset non-blocking
fcntl($s, F_SETFL, $fl);
return $s;
}
}
my $ipa = inet_ntoa($ipn);
confess "connect failed to $ipa:$port\n";
}
sub write_request {
my $me = shift;
my $s = shift;
my $req = shift;
my $timeo = shift;
$timeo ||= 1;
# set non-blocking
my $fl = fcntl($s, F_GETFL, 0);
fcntl($s, F_SETFL, O_NDELAY);
my $fn = fileno($s);
my $tlen = length($req);
my $slen = 0;
while($tlen){
my $wfd = "\0\0\0\0";
vec($wfd, $fn, 1) = 1;
my $to = $timeo;
my $si = select(undef, $wfd, undef, $to);
confess "write data failed: $!\n" if $si == -1;
confess "write timeout\n" unless vec($wfd, $fn, 1);
my $l = $tlen > $BUFSIZ ? $BUFSIZ : $tlen;
my $i = syswrite($s, $req, $l, $slen);
confess "write failed $!\n" unless $i >= 1;
$tlen -= $i;
$slen += $i;
}
fcntl($s, F_SETFL, $fl);
return $slen;
}
sub read_data {
my $me = shift;
my $s = shift;
my $size = shift;
my $timeo = shift;
$timeo ||= 1;
# set non-blocking
my $fl = fcntl($s, F_GETFL, 0);
fcntl($s, F_SETFL, O_NDELAY);
my $fn = fileno($s);
my $data;
my $start = time();
while( my $len = $size - length($data) ){
$len = $BUFSIZ if $len > $BUFSIZ;
my $rfd = "\0\0\0\0";
vec($rfd, $fn, 1) = 1;
my $to = $start + $timeo - time();
my $t0 = time();
my $si = select($rfd, undef, undef, $to);
next if $si == -1 && $! == EINTR;
confess "read data failed: $!\n" if $si == -1;
confess "read timeout " . (time() - $t0) . "\n" unless vec($rfd, $fn, 1);
my $i = sysread($s, $data, $len, length($data));
next if !defined($i) && $! == EINTR;
confess "read failed: connection closed (read " . length($data) . " of $len)\n" if $i == 0;
}
fcntl($s, F_SETFL, $fl);
return $data;
}
################################################################
# stream fd to other fd
# return hash
sub sendfile {
my $me = shift;
my $out = shift;
my $in = shift;
my $size = shift;
my $timeo = shift;
# NB: sendfile(2) only supports file=>socket + file=>file
# not socket=>file, ...
# RSN - elastic buffering?
my $sha1 = Digest::SHA1->new();
while($size){
my $len = $size > $BUFSIZ ? $BUFSIZ : $size;
my $buf = $me->read_data($in, $len, $timeo);
my $i = length $buf;
confess "read failed: $!\n" unless $i > 0;
my $w = $me->write_request($out, $buf, $timeo);
$size -= $i;
$sha1->add($buf);
}
return $sha1->b64digest();
}
sub send_request {
my $me = shift;
my $ipn = shift;
my $port = shift;
my $req = shift;
my $debug = shift;
my $timeo = shift;
$debug ||= sub {};
$timeo ||= 0.5;
local $SIG{ALRM} = sub{ $debug->("timeout") };
my $s = $me->connect_to_server($ipn, $port, $timeo);
# send request
$debug->("sending request");
$me->write_request($s, $req, $timeo);
# get response or timeout
$debug->("reading header");
my $buf = $me->read_data($s, header_size(), $timeo);
my $p = $me->decode_header($buf);
# get auth
if( $p->{auth_length} ){
# read gpb
$debug->("reading auth $p->{auth_length}");
my $data = $me->read_data($s, $p->{auth_length}, $timeo);
$p->{auth} = $data;
}
# get data
if( $p->{data_length} ){
# read gpb
$debug->("reading data $p->{data_length}");
my $data = $me->read_data($s, $p->{data_length}, $timeo);
$p->{data} = $data;
}
# get content
if( $p->{content_length} ){
$debug->("reading content $p->{content_length}");
my $data = $me->read_data($s, $p->{content_length}, $timeo);
$p->{content} = $data;
}
return $p;
}
1;
view all matches for this distributionview release on metacpan - search on metacpan
( run in 0.673 second using v1.00-cache-2.02-grep-82fe00e-cpan-cec75d87357c )