Danga-Socket

 view release on metacpan or  search on metacpan

lib/Danga/Socket.pm  view on Meta::CPAN


  package My::Socket
  use Danga::Socket;
  use base ('Danga::Socket');
  use fields ('my_attribute');

  sub new {
     my My::Socket $self = shift;
     $self = fields::new($self) unless ref $self;
     $self->SUPER::new( @_ );

     $self->{my_attribute} = 1234;
     return $self;
  }

  sub event_err { ... }
  sub event_hup { ... }
  sub event_write { ... }
  sub event_read { ... }
  sub close { ... }

  $my_sock->tcp_cork($bool);

  # write returns 1 if all writes have gone through, or 0 if there
  # are writes in queue
  $my_sock->write($scalar);
  $my_sock->write($scalarref);
  $my_sock->write(sub { ... });  # run when previous data written
  $my_sock->write(undef);        # kick-starts

  # read max $bytecount bytes, or undef on connection closed
  $scalar_ref = $my_sock->read($bytecount);

  # watch for writability.  not needed with ->write().  write()
  # will automatically turn on watch_write when you wrote too much
  # and turn it off when done
  $my_sock->watch_write($bool);

  # watch for readability
  $my_sock->watch_read($bool);

  # if you read too much and want to push some back on
  # readable queue.  (not incredibly well-tested)
  $my_sock->push_back_read($buf); # scalar or scalar ref

  Danga::Socket->AddOtherFds(..);
  Danga::Socket->SetLoopTimeout($millisecs);
  Danga::Socket->DescriptorMap();
  Danga::Socket->WatchedSockets();  # count of DescriptorMap keys
  Danga::Socket->SetPostLoopCallback($code);
  Danga::Socket->EventLoop();

=head1 DESCRIPTION

This is an abstract base class for objects backed by a socket which
provides the basic framework for event-driven asynchronous IO,
designed to be fast.  Danga::Socket is both a base class for objects,
and an event loop.

Callers subclass Danga::Socket.  Danga::Socket's constructor registers
itself with the Danga::Socket event loop, and invokes callbacks on the
object for readability, writability, errors, and other conditions.

Because Danga::Socket uses the "fields" module, your subclasses must
too.

=head1 MORE INFO

For now, see servers using Danga::Socket for guidance.  For example:
perlbal, mogilefsd, or ddlockd.

=head1 API

Note where "C<CLASS>" is used below, normally you would call these methods as:

  Danga::Socket->method(...);

However using a subclass works too.

The CLASS methods are all methods for the event loop part of Danga::Socket,
whereas the object methods are all used on your subclasses.

=cut

###########################################################################

package Danga::Socket;
use strict;
use bytes;
use POSIX ();
use Time::HiRes ();
use IO::Handle qw();
use Fcntl qw(FD_CLOEXEC F_SETFD F_GETFD);

my $opt_bsd_resource = eval "use BSD::Resource; 1;";

use vars qw{$VERSION};
$VERSION = "1.62";

use warnings;
no  warnings qw(deprecated);

use Sys::Syscall qw(:epoll);

use fields ('sock',              # underlying socket
            'fd',                # numeric file descriptor
            'write_buf',         # arrayref of scalars, scalarrefs, or coderefs to write
            'write_buf_offset',  # offset into first array of write_buf to start writing at
            'write_buf_size',    # total length of data in all write_buf items
            'write_set_watch',   # bool: true if we internally set watch_write rather than by a subclass
            'read_push_back',    # arrayref of "pushed-back" read data the application didn't want
            'closed',            # bool: socket is closed
            'corked',            # bool: socket is corked
            'event_watch',       # bitmask of events the client is interested in (POLLIN,OUT,etc.)
            'peer_v6',           # bool: cached; if peer is an IPv6 address
            'peer_ip',           # cached stringified IP address of $sock
            'peer_port',         # cached port number of $sock
            'local_v6',          # bool: cached; if local IP address is an IPv6 address
            'local_ip',          # cached stringified IP address of local end of $sock
            'local_port',        # cached port number of local end of $sock
            'writer_func',       # subref which does writing.  must return bytes written (or undef) and set $! on errors

lib/Danga/Socket.pm  view on Meta::CPAN

    my ($class, $ref) = @_;

    if (ref $class) {
        # per-object callback
        my Danga::Socket $self = $class;
        if (defined $ref && ref $ref eq 'CODE') {
            $PLCMap{$self->{fd}} = $ref;
        } else {
            delete $PLCMap{$self->{fd}};
        }
    } else {
        # global callback
        $PostLoopCallback = (defined $ref && ref $ref eq 'CODE') ? $ref : undef;
    }
}

# Internal function: run the post-event callback, send read events
# for pushed-back data, and close pending connections.  returns 1
# if event loop should continue, or 0 to shut it all down.
sub PostEventLoop {
    # fire read events for objects with pushed-back read data
    my $loop = 1;
    while ($loop) {
        $loop = 0;
        foreach my $fd (keys %PushBackSet) {
            my Danga::Socket $pob = $PushBackSet{$fd};

            # a previous event_read invocation could've closed a
            # connection that we already evaluated in "keys
            # %PushBackSet", so skip ones that seem to have
            # disappeared.  this is expected.
            next unless $pob;

            die "ASSERT: the $pob socket has no read_push_back" unless @{$pob->{read_push_back}};
            next unless (! $pob->{closed} &&
                         $pob->{event_watch} & POLLIN);
            $loop = 1;
            $pob->event_read;
        }
    }

    # now we can close sockets that wanted to close during our event processing.
    # (we didn't want to close them during the loop, as we didn't want fd numbers
    #  being reused and confused during the event loop)
    while (my $sock = shift @ToClose) {
        my $fd = fileno($sock);

        # close the socket.  (not a Danga::Socket close)
        $sock->close;

        # and now we can finally remove the fd from the map.  see
        # comment above in _cleanup.
        delete $DescriptorMap{$fd};
    }


    # by default we keep running, unless a postloop callback (either per-object
    # or global) cancels it
    my $keep_running = 1;

    # per-object post-loop-callbacks
    for my $plc (values %PLCMap) {
        $keep_running &&= $plc->(\%DescriptorMap, \%OtherFds);
    }

    # now we're at the very end, call callback if defined
    if (defined $PostLoopCallback) {
        $keep_running &&= $PostLoopCallback->(\%DescriptorMap, \%OtherFds);
    }

    return $keep_running;
}

#####################################################################
### Danga::Socket-the-object code
#####################################################################

=head2 OBJECT METHODS

=head2 C<< CLASS->new( $socket ) >>

Create a new Danga::Socket subclass object for the given I<socket> which will
react to events on it during the C<EventLoop>.

This is normally (always?) called from your subclass via:

  $class->SUPER::new($socket);

=cut
sub new {
    my Danga::Socket $self = shift;
    $self = fields::new($self) unless ref $self;

    my $sock = shift;

    $self->{sock}        = $sock;
    my $fd = fileno($sock);

    Carp::cluck("undef sock and/or fd in Danga::Socket->new.  sock=" . ($sock || "") . ", fd=" . ($fd || ""))
        unless $sock && $fd;

    $self->{fd}          = $fd;
    $self->{write_buf}      = [];
    $self->{write_buf_offset} = 0;
    $self->{write_buf_size} = 0;
    $self->{closed} = 0;
    $self->{corked} = 0;
    $self->{read_push_back} = [];

    $self->{event_watch} = POLLERR|POLLHUP|POLLNVAL;

    _InitPoller();

    if ($HaveEpoll) {
        epoll_ctl($Epoll, EPOLL_CTL_ADD, $fd, $self->{event_watch})
            and die "couldn't add epoll watch for $fd\n";
    }
    elsif ($HaveKQueue) {
        # Add them to the queue but disabled for now
        $KQueue->EV_SET($fd, IO::KQueue::EVFILT_READ(),
                        IO::KQueue::EV_ADD() | IO::KQueue::EV_DISABLE());



( run in 0.882 second using v1.01-cache-2.11-cpan-75ffa21a3d4 )