AC-DC

 view release on metacpan or  search on metacpan

lib/AC/DC/IO.pm  view on Meta::CPAN

# -*- perl -*-

# Copyright (c) 2009 AdCopy
# Author: Jeff Weisberg
# Created: 2009-Mar-27 10:36 (EDT)
# Function: async multiplexed io
#
# $Id$

# callbacks:
#  readable
#  writeable
#  write_buffer_empty
#  timeout
#  error	=> shut()
#  shutdown

package AC::DC::IO;

use AC::DC::Debug 'io';
use AC::DC::IO::TCP;
use AC::DC::IO::UDP;
use AC::DC::IO::Forked;
use AC::DC::Callback;
use AC::DC::Sched;
use Time::HiRes 'time';
use Socket;
use Fcntl;
use POSIX;
use bytes;
use strict;

my $BUFSIZ = 8192;

my $maxfn  = 0;
my $rvec   = "\0\0\0\0";
my $wvec   = "\0\0\0\0";
my @fileno;
my @timeout;
my $exitrequested = 0;


sub import {
    my $pkg   = shift;
    my $param = shift;

    # import a stats monitor?
    if( $param && $param->{monitor} ){
        *add_idle = \&{ $param->{monitor} .'::add_idle' };
    }
}

sub underway {
    return $maxfn;
}

sub closeall {

    for my $x (@fileno){
        close($x->{fd}) if $x && $x->{fd};
    }
}

sub _cleanup {

    for my $f (@fileno){
        next unless $f;
        $f->shut();
    }
    @fileno = ();
    @timeout = ();
}

sub report {

lib/AC/DC/IO.pm  view on Meta::CPAN

}

sub timeout_rel {
    my $me = shift;
    my $to = shift;

    $to += $^T if $to;
    $me->timeout_abs( $to );
}

sub clear_timeout {
    my $me = shift;

    delete $me->{_timeout};
    @timeout = grep { $_ != $me } @timeout;
    return ;
}

################################################################
# buffered writing

sub write {
    my $me   = shift;
    my $data = shift;

    $me->{_wbuffer} .= $data;
    $me->wantwrite(1);
}

sub write_and_shut {
    my $me = shift;

    $me->write(@_);
    $me->set_callback('write_buffer_empty', \&shut);
}


sub _writable {
    my $me = shift;

    return $me->run_callback('writeable', undef) unless $me->{_wbuffer};

    my $len = length($me->{_wbuffer});
    my $bs = $me->{wbufsize} || $BUFSIZ;
    $len = $bs if $len > $bs;
    my $buf = substr($me->{_wbuffer}, 0, $len);
    my $i = syswrite( $me->{fd}, $buf );

    if( defined $i ){
        # debug("wrote $i bytes to $me->{info}");
        substr($me->{_wbuffer}, 0, $i) = '';
        if( length($me->{_wbuffer}) ){
            $me->timeout_rel( $me->{writebuf_timeout} ) if $me->{writebuf_timeout};
        }else{
            $me->wantwrite(0);
            $me->run_callback('write_buffer_empty', undef);
        }
    }else{
        my $e = $!;
        debug( "write failed ($e) for $me->{info}");
        $me->run_callback('error', {
            cause	=> 'write',
            error	=> $e,
        });
        $me->shut();
    }
}

################################################################

sub _readable {
    my $me = shift;

    $me->run_callback('readable', undef);
}

sub _timeout {
    my $me = shift;

    debug("io - timeout $me->{info}");
    $me->run_callback('timeout', undef);
}

################################################################

sub _setnbio {
    my $me = shift;

    my $fd = $me->{fd};
    fcntl($fd, F_SETFL, O_NDELAY);
}

################################################################

sub _oneloop {

    my $t0 = time();
    $^T = $t0;
    my $r = $rvec;
    my $w = $wvec;

    my $t;
    if( @timeout ){
        $t = $timeout[0]{_timeout} - $^T;
        $t = 0 if $t < 0;
    }

    my $i = select($r, $w, undef, $t);

    if( $i == -1 ){
        return if $! == EINTR;
        fatal( "select failed: $!" );
    }

    my $t1 = time();
    $^T = $t1;

    # dispatch
    for my $n (0 .. $maxfn){
        if( vec($r, $n, 1) && vec($rvec, $n, 1) ){
            my $x = $fileno[$n];
            # debug("fileno $n ($x->{info}) is readable");
            $x->_readable();



( run in 0.716 second using v1.01-cache-2.11-cpan-39bf76dae61 )