IO-EventMux

 view release on metacpan or  search on metacpan

lib/IO/EventMux.pm  view on Meta::CPAN

        *handles = *_eventloop_handles_select;
        %eventloopvars = (    
            readfh        => IO::Select->new(),
            writefh       => IO::Select->new(),
        ); 
    }

    return bless {
        %eventloopvars,

        # GLOBAL
        auto_accept   => 1,
        auto_write    => 1,
        auto_read     => 1,
        auto_close    => 1,
        errors        => 0,
        read_size     => 65536,
        
        fhs           => { },
        sessions      => { },
        listenfh      => { },
        
        events        => [ ],
        actionq       => [ ],
        
        # FH only
        return_last   => 0,
        type          => 'stream',
        class         => 'socket',

    }, $class;
}

=head2 B<mux([$timeout])>

This method will block until ether an event occurs on one of the file handles
or the $timeout (floating point seconds) expires.  If the $timeout argument is
not present, it waits forever.  If $timeout is 0, it returns immediately.

The return value is always a hash, which always has the key 'type', indicating
what kind it is.  It will also usually carry the 'fh' key, indicating what file
handle the event happened on.

The 'type' key can have the following values:

=over

=item timeout

Nothing happened and timeout occurred.

=item error

An error occurred in connection with the file handle, such as 
"connection refused", etc.

=item accepted

A new client connected to a listening socket and the connection was accepted by
EventMux. The listening socket file handle is in the 'parent_fh' key. If the 
file handle is a unix domain socket the credentials of the user connection will be available in the keys; 'pid', 'uid' and 'gid'. 

=item ready 

A file handle is ready to be written to, this can be use full when working with
nonblocking connects so you know when the remote connection accepted the
connection.

=item accepting

A new client is trying to connect to a listening socket, but the user code must
call accept manually.  This only happens when the ManualAccept option is
set.

=item read

A socket has incoming data.  If the socket's Buffered option is set, this
will be what the buffering rule define.

The data is contained in the 'data' key of the event hash.  If recv() 
returned a sender address, it is contained in the 'sender' key and must be 
manually unpacked according to the socket domain, e.g. with 
C<Socket::unpack_sockaddr_in()>.

=item read_last

A socket last data before it was closed did not match the buffering rules, as defined by the IO::Buffered type given. he read_last type contains the result of a call to C<read_last()> on the chosen buffer type.

The default is not to return read_last and if no buffer is set read will contain this information.

=item sent

A socket has sent all the data in it's queue with the send call. This however
does not indicate that the data has reached the other end, normally only that
the data has reached the local buffer of the kernel.

=item closing

A file handle was detected to be have been closed by the other end or the file 
handle was set to be closed by the user. So EventMux stooped listening for 
events on this file handle. Event data like 'Meta' is still accessible.

The 'missing' key indicates the amount of data or packets left in the user 
space buffer when the file handle was closed. This does not indicate the amount
of data received by the other end, only that the user space buffer left. 

=item closed

A socket/pipe was disconnected/closed, the file descriptor, all internal 
references, and data store with the file handle was removed.

=item can_write

The ManualWrite option is set for the file handle, and C<select()> has
indicated that the handle can be written to.

=item can_read

The ManualRead option is set for the file handle, and C<select()> has
indicated that the handle can be read from.

lib/IO/EventMux.pm  view on Meta::CPAN

    } elsif($@ =~ /Cannot determine peer address/ and $cfg->{ready} == 0) {
        # To soon to send data, retry when we get a ready
        return;

    } elsif($@) {
        $self->push_event({ type => 'error', error => "$@", fh => $fh });   
        return;

    } elsif ($rv < length $cfg->{outbuffer}) {
        # only part of the data was sent
        substr($cfg->{outbuffer}, 0, $rv) = '';
        _eventloop_add($self, "writefh", $fh);
        
    } else {
        # all pending data was sent
        $cfg->{outbuffer} = '';
        _eventloop_remove($self, "writefh", $fh);

        if($cfg->{ready} == 0) {
            $cfg->{ready} = 1;
            $self->push_event({ type => 'ready', fh => $fh });
        }
        
        $self->push_event({type => 'sent', fh => $fh});
    }


    return $rv;
}

=head2 B<push_event($event)> 

Push event on queue

=cut

sub push_event {
    my($self, @events) = @_;
    push(@{$self->{events}}, @events);
}

=head2 B<nonblock($fh)> 

Puts socket into nonblocking mode.

=cut

sub nonblock {
    my $socket = $_[1];

    my $flags = fcntl($socket, F_GETFL, 0)
        or die "Can't get flags for socket: $!\n";
    if (not $flags & O_NONBLOCK) {
        fcntl($socket, F_SETFL, $flags | O_NONBLOCK)
            or die "Can't make socket nonblocking: $!\n";
    }
}

=head2 B<socket_creds($fh)>

Return credentials on UNIX domain sockets.

=cut

sub socket_creds {
    my ($self, $fh) = @_;
    my %creds;

    # TODO: Support localhost TCP via: /proc/net/tcp
    my $rv = getsockopt($fh, SOL_SOCKET, SO_PEERCRED);
    if(defined $rv) {
        my ($pid, $uid, $gid) = unpack('LLL', $rv);
        %creds = (pid => $pid, uid => $uid, gid => $gid);
    }

    return %creds;
}


=head2 B<socket_type($fh)>

Return socket type.

=cut

sub socket_type {
    my ($self, $fh) = @_;
   
    my $ptype = getsockopt($fh, SOL_SOCKET, SO_TYPE);
    if(defined $ptype) {
        my $type = unpack("S", $ptype);
        if($type == SOCK_STREAM) { # type = 1
            return 'stream';

        } elsif($type == SOCK_DGRAM or $type == SOCK_RAW) { # type = 2,3
            return 'dgram';
        
        } else {
            croak "Unknown socket type: $type";
        }

    } else {
        return;
    }
}


=head2 B<socket_listening($fh)>

Check if the socket is set to listening mode

=cut

sub socket_listening {
    my ($self, $fh) = @_;
    my $listening = getsockopt($fh, SOL_SOCKET, SO_ACCEPTCONN);
    if(defined $listening) {
        return unpack("I", $listening);
    } else {
        return;
    }



( run in 0.647 second using v1.01-cache-2.11-cpan-39bf76dae61 )