AC-DC

 view release on metacpan or  search on metacpan

lib/AC/DC/Protocol.pm  view on Meta::CPAN

       );

    # caller needs to add content. (to avoid large copy)
    return $hdr . $apb . $gpb;

}

sub _decode_common {
    my $me    = shift;
    my $how   = shift;
    my $reply = shift;
    my $data  = shift;

    my $mt = $MSGTYPE{ $reply->{type} };
    confess "unknown message type $reply->{type}\n" unless defined $mt;

    return unless $data || $reply->{data};
    my $res = $mt->{$how}->decode( $data || $reply->{data} || '' );
    return $res;
}

sub encode_request {
    my $me = shift;

    return $me->_encode_common( 'reqc', @_ );
}
sub encode_reply {
    my $me = shift;

    return $me->_encode_common( 'resc', @_ );
}

sub decode_request {
    my $me    = shift;

    return $me->_decode_common( 'reqc', @_ );
}

sub decode_reply {
    my $me    = shift;

    return $me->_decode_common( 'resc', @_ );
}

################################################################

sub _try_to_connect {
    my $s  = shift;
    my $sa = shift;
    my $to = shift;

    my $fn = fileno($s);
    my $wfd = "\0\0\0\0";
    vec($wfd, $fn, 1) = 1;

    my $i = connect($s, $sa);
    return 1 if $i;	# connected
    return unless $! == EISCONN || $! == EALREADY || $! == EINPROGRESS;

    # wait until connected or timeout
    my $is = select(undef, $wfd, undef, $to);
    return if $is == -1;
    return 1 if vec($wfd, $fn, 1);
    return;
}

sub connect_to_server {
    my $me    = shift;
    my $ipn   = shift;
    my $port  = shift;
    my $timeo = shift;

    my $s;
    socket($s, PF_INET, SOCK_STREAM, 6) || confess "cannot create socket: $!\n";
    setsockopt($s, Socket::IPPROTO_TCP(), Socket::TCP_NODELAY(), 1);

    # set non-blocking
    my $fl = fcntl($s, F_GETFL, 0);
    fcntl($s, F_SETFL, O_NDELAY);

    my $sa  = sockaddr_in($port, $ipn);
    my $to  = $timeo ? $timeo / 2 : 0.25;

    # try connecting up to 3 times
    for (1..3){
        # print STDERR "connecting\n";
        my $ok = _try_to_connect($s, $sa, $to);

        if( $ok ){
            # reset non-blocking
            fcntl($s, F_SETFL, $fl);
            return $s;
        }
    }

    my $ipa = inet_ntoa($ipn);
    confess "connect failed to $ipa:$port\n";
}

sub write_request {
    my $me    = shift;
    my $s     = shift;
    my $req   = shift;
    my $timeo = shift;

    $timeo ||= 1;

    # set non-blocking
    my $fl = fcntl($s, F_GETFL, 0);
    fcntl($s, F_SETFL, O_NDELAY);
    my $fn = fileno($s);

    my $tlen = length($req);
    my $slen = 0;

    while($tlen){
        my $wfd = "\0\0\0\0";
        vec($wfd, $fn, 1) = 1;
        my $to = $timeo;

        my $si = select(undef, $wfd, undef, $to);
        confess "write data failed: $!\n" if $si == -1;
        confess "write timeout\n" unless vec($wfd, $fn, 1);

        my $l = $tlen > $BUFSIZ ? $BUFSIZ : $tlen;
        my $i = syswrite($s, $req, $l, $slen);
        confess "write failed $!\n" unless $i >= 1;
        $tlen -= $i;
        $slen += $i;
    }

    fcntl($s, F_SETFL, $fl);
    return $slen;

}

sub read_data {
    my $me    = shift;
    my $s     = shift;
    my $size  = shift;
    my $timeo = shift;

    $timeo ||= 1;

    # set non-blocking
    my $fl = fcntl($s, F_GETFL, 0);
    fcntl($s, F_SETFL, O_NDELAY);
    my $fn = fileno($s);

    my $data;
    my $start = time();
    while( my $len = $size - length($data) ){
        $len = $BUFSIZ if $len > $BUFSIZ;
        my $rfd = "\0\0\0\0";
        vec($rfd, $fn, 1) = 1;
        my $to = $start + $timeo - time();
        my $t0 = time();

        my $si = select($rfd, undef, undef, $to);
        next if $si == -1 && $! == EINTR;
        confess "read data failed: $!\n" if $si == -1;
        confess "read timeout " . (time() - $t0) . "\n" unless vec($rfd, $fn, 1);

        my $i = sysread($s, $data, $len, length($data));
        next if !defined($i) && $! == EINTR;
        confess "read failed: connection closed (read " . length($data) . " of $len)\n" if $i == 0;
    }

    fcntl($s, F_SETFL, $fl);
    return $data;
}

################################################################

# stream fd to other fd
# return hash
sub sendfile {
    my $me    = shift;
    my $out   = shift;
    my $in    = shift;
    my $size  = shift;
    my $timeo = shift;

    # NB: sendfile(2) only supports file=>socket + file=>file
    #     not socket=>file, ...
    # RSN - elastic buffering?

    my $sha1 = Digest::SHA1->new();

    while($size){
        my $len = $size > $BUFSIZ ? $BUFSIZ : $size;
        my $buf = $me->read_data($in, $len, $timeo);
        my $i = length $buf;
        confess "read failed: $!\n" unless $i > 0;
        my $w = $me->write_request($out, $buf, $timeo);
        $size -= $i;
        $sha1->add($buf);
    }

    return $sha1->b64digest();
}

sub send_request {
    my $me    = shift;
    my $ipn   = shift;
    my $port  = shift;
    my $req   = shift;
    my $debug = shift;
    my $timeo = shift;

    $debug ||= sub {};
    $timeo ||= 0.5;
    local $SIG{ALRM} = sub{ $debug->("timeout") };

    my $s = $me->connect_to_server($ipn, $port, $timeo);

    # send request
    $debug->("sending request");
    $me->write_request($s, $req, $timeo);



( run in 0.501 second using v1.01-cache-2.11-cpan-39bf76dae61 )