AC-DC
view release on metacpan - search on metacpan
view release on metacpan or search on metacpan
lib/AC/DC/IO.pm view on Meta::CPAN
# -*- perl -*-
# Copyright (c) 2009 AdCopy
# Author: Jeff Weisberg
# Created: 2009-Mar-27 10:36 (EDT)
# Function: async multiplexed io
#
# $Id$
# callbacks:
# readable
# writeable
# write_buffer_empty
# timeout
# error => shut()
# shutdown
package AC::DC::IO;
use AC::DC::Debug 'io';
use AC::DC::IO::TCP;
use AC::DC::IO::UDP;
use AC::DC::IO::Forked;
use AC::DC::Callback;
use AC::DC::Sched;
use Time::HiRes 'time';
use Socket;
use Fcntl;
use POSIX;
use bytes;
use strict;
my $BUFSIZ = 8192;
my $maxfn = 0;
my $rvec = "\0\0\0\0";
my $wvec = "\0\0\0\0";
my @fileno;
my @timeout;
my $exitrequested = 0;
sub import {
my $pkg = shift;
my $param = shift;
# import a stats monitor?
if( $param && $param->{monitor} ){
*add_idle = \&{ $param->{monitor} .'::add_idle' };
}
}
sub underway {
return $maxfn;
}
sub closeall {
for my $x (@fileno){
close($x->{fd}) if $x && $x->{fd};
}
}
sub _cleanup {
for my $f (@fileno){
next unless $f;
$f->shut();
}
@fileno = ();
@timeout = ();
}
sub report {
my $txt;
for my $x (@fileno){
$txt .= fileno($x->{fd}) . "\t$x->{info}\n";
}
return $txt;
}
sub request_exit { $exitrequested = 1 }
sub init {
my $me = shift;
my $fd = shift;
$me->{fd} = $fd;
$me->_setnbio();
my $n = fileno($fd);
$fileno[ $n ] = $me;
$maxfn = $n if $n > $maxfn;
debug("init io fileno $n (/$maxfn) - $me->{info}");
}
sub shut {
my $me = shift;
$me->clear_timeout();
my $fd = $me->{fd};
return unless $fd;
my $n = fileno($fd);
debug("shutting down fileno $n $me->{info}");
$me->wantread(0);
$me->wantwrite(0);
delete $me->{fd};
close $fd;
$fileno[$n] = undef;
$me->run_callback('shutdown', undef);
delete $me->{_callback};
if( $n >= $maxfn ){
while( $maxfn && !$fileno[$maxfn] ){ $maxfn -- }
}
}
sub wantread {
my $me = shift;
my $v = shift;
return unless defined $me->{fd};
$me->{_wantread} = $v;
my $n = fileno($me->{fd});
vec($rvec,$n,1) = $v ? 1 : 0;
return ;
}
sub wantwrite {
my $me = shift;
my $v = shift;
return unless defined $me->{fd};
$me->{_wantwrite} = $v;
my $n = fileno($me->{fd});
vec($wvec,$n,1) = $v ? 1 : 0;
return ;
}
sub timeout_abs {
my $me = shift;
my $t = shift;
$me->clear_timeout() if $me->{_timeout};
return unless $t;
$me->{_timeout} = $t;
my $i = 0;
foreach my $x (@timeout){
last if $x && $x->{_timeout} > $t;
$i++;
}
splice @timeout, $i, 0, $me;
return ;
}
sub timeout_rel {
my $me = shift;
my $to = shift;
$to += $^T if $to;
$me->timeout_abs( $to );
}
sub clear_timeout {
my $me = shift;
delete $me->{_timeout};
@timeout = grep { $_ != $me } @timeout;
return ;
}
################################################################
# buffered writing
sub write {
my $me = shift;
my $data = shift;
$me->{_wbuffer} .= $data;
$me->wantwrite(1);
}
sub write_and_shut {
my $me = shift;
$me->write(@_);
$me->set_callback('write_buffer_empty', \&shut);
}
sub _writable {
my $me = shift;
return $me->run_callback('writeable', undef) unless $me->{_wbuffer};
my $len = length($me->{_wbuffer});
my $bs = $me->{wbufsize} || $BUFSIZ;
$len = $bs if $len > $bs;
my $buf = substr($me->{_wbuffer}, 0, $len);
my $i = syswrite( $me->{fd}, $buf );
if( defined $i ){
# debug("wrote $i bytes to $me->{info}");
substr($me->{_wbuffer}, 0, $i) = '';
if( length($me->{_wbuffer}) ){
$me->timeout_rel( $me->{writebuf_timeout} ) if $me->{writebuf_timeout};
}else{
$me->wantwrite(0);
$me->run_callback('write_buffer_empty', undef);
}
}else{
my $e = $!;
debug( "write failed ($e) for $me->{info}");
$me->run_callback('error', {
cause => 'write',
error => $e,
});
$me->shut();
}
}
################################################################
sub _readable {
my $me = shift;
$me->run_callback('readable', undef);
}
sub _timeout {
my $me = shift;
debug("io - timeout $me->{info}");
$me->run_callback('timeout', undef);
}
################################################################
sub _setnbio {
my $me = shift;
my $fd = $me->{fd};
fcntl($fd, F_SETFL, O_NDELAY);
}
################################################################
sub _oneloop {
my $t0 = time();
$^T = $t0;
my $r = $rvec;
my $w = $wvec;
my $t;
if( @timeout ){
$t = $timeout[0]{_timeout} - $^T;
$t = 0 if $t < 0;
}
my $i = select($r, $w, undef, $t);
if( $i == -1 ){
return if $! == EINTR;
fatal( "select failed: $!" );
}
my $t1 = time();
$^T = $t1;
# dispatch
for my $n (0 .. $maxfn){
if( vec($r, $n, 1) && vec($rvec, $n, 1) ){
my $x = $fileno[$n];
# debug("fileno $n ($x->{info}) is readable");
$x->_readable();
}
if( vec($w, $n, 1) && vec($wvec, $n, 1) ){
my $x = $fileno[$n];
# debug("fileno $n ($x->{info}) is writeable");
$x->_writable();
}
}
# timeouts
while(@timeout && $timeout[0]{_timeout} <= $^T){
my $x = shift @timeout;
debug("timed out $x->{info}");
delete $x->{_timeout};
$x->_timeout();
}
my $t2 = time();
# track idle/busy time
# debug("add idle? $t0, $t1, $t2 " . (defined &add_idle ? 'f' : '!'));
add_idle( $t1 - $t0, $t2 - $t0 ) if defined &add_idle;
}
sub mainloop {
while(1){
_oneloop();
last if $exitrequested;
}
_cleanup();
}
1;
view all matches for this distributionview release on metacpan - search on metacpan
( run in 1.245 second using v1.00-cache-2.02-grep-82fe00e-cpan-cec75d87357c )