Danga-Socket
view release on metacpan or search on metacpan
lib/Danga/Socket.pm view on Meta::CPAN
package My::Socket;
use Danga::Socket;
use base ('Danga::Socket');
use fields ('my_attribute');
sub new {
my My::Socket $self = shift;
$self = fields::new($self) unless ref $self;
$self->SUPER::new( @_ );
$self->{my_attribute} = 1234;
return $self;
}
sub event_err { ... }
sub event_hup { ... }
sub event_write { ... }
sub event_read { ... }
sub close { ... }
$my_sock->tcp_cork($bool);
# write returns 1 if all writes have gone through, or 0 if there
# are writes in queue
$my_sock->write($scalar);
$my_sock->write($scalarref);
$my_sock->write(sub { ... }); # run when previous data written
$my_sock->write(undef); # kick-starts
# read at most $bytecount bytes; returns undef if the connection is closed
$scalar_ref = $my_sock->read($bytecount);
# watch for writability. not needed with ->write(). write()
# will automatically turn on watch_write when you wrote too much
# and turn it off when done
$my_sock->watch_write($bool);
# watch for readability
$my_sock->watch_read($bool);
# if you read too much and want to push some back on
# readable queue. (not incredibly well-tested)
$my_sock->push_back_read($buf); # scalar or scalar ref
Danga::Socket->AddOtherFds(..);
Danga::Socket->SetLoopTimeout($millisecs);
Danga::Socket->DescriptorMap();
Danga::Socket->WatchedSockets(); # count of DescriptorMap keys
Danga::Socket->SetPostLoopCallback($code);
Danga::Socket->EventLoop();
=head1 DESCRIPTION
This is an abstract base class for objects backed by a socket which
provides the basic framework for event-driven asynchronous IO,
designed to be fast. Danga::Socket is both a base class for objects,
and an event loop.
Callers subclass Danga::Socket. Danga::Socket's constructor registers
itself with the Danga::Socket event loop, and invokes callbacks on the
object for readability, writability, errors, and other conditions.
Because Danga::Socket uses the "fields" module, your subclasses must
too.
=head1 MORE INFO
For now, see servers using Danga::Socket for guidance. For example:
perlbal, mogilefsd, or ddlockd.
=head1 API
Note that where "C<CLASS>" is used below, you would normally call these methods as:
Danga::Socket->method(...);
However using a subclass works too.
The CLASS methods are all methods for the event loop part of Danga::Socket,
whereas the object methods are all used on your subclasses.
=cut
###########################################################################
package Danga::Socket;
use strict;
use bytes;
use POSIX ();
use Time::HiRes ();
use IO::Handle qw();
use Fcntl qw(FD_CLOEXEC F_SETFD F_GETFD);
my $opt_bsd_resource = eval "use BSD::Resource; 1;";
use vars qw{$VERSION};
$VERSION = "1.62";
use warnings;
no warnings qw(deprecated);
use Sys::Syscall qw(:epoll);
use fields ('sock', # underlying socket
'fd', # numeric file descriptor
'write_buf', # arrayref of scalars, scalarrefs, or coderefs to write
'write_buf_offset', # offset into first array of write_buf to start writing at
'write_buf_size', # total length of data in all write_buf items
'write_set_watch', # bool: true if we internally set watch_write rather than by a subclass
'read_push_back', # arrayref of "pushed-back" read data the application didn't want
'closed', # bool: socket is closed
'corked', # bool: socket is corked
'event_watch', # bitmask of events the client is interested in (POLLIN,OUT,etc.)
'peer_v6', # bool: cached; if peer is an IPv6 address
'peer_ip', # cached stringified IP address of $sock
'peer_port', # cached port number of $sock
'local_v6', # bool: cached; if local IP address is an IPv6 address
'local_ip', # cached stringified IP address of local end of $sock
'local_port', # cached port number of local end of $sock
'writer_func', # subref which does writing. must return bytes written (or undef) and set $! on errors
lib/Danga/Socket.pm view on Meta::CPAN
my ($class, $ref) = @_;
if (ref $class) {
# per-object callback
my Danga::Socket $self = $class;
if (defined $ref && ref $ref eq 'CODE') {
$PLCMap{$self->{fd}} = $ref;
} else {
delete $PLCMap{$self->{fd}};
}
} else {
# global callback
$PostLoopCallback = (defined $ref && ref $ref eq 'CODE') ? $ref : undef;
}
}
# Internal function: run the post-event callback, send read events
# for pushed-back data, and close pending connections. returns 1
# if event loop should continue, or 0 to shut it all down.
sub PostEventLoop {
    # Runs three phases in a fixed order: (1) re-deliver read events for
    # sockets holding pushed-back data, (2) perform the socket closes that
    # were deferred during event processing, (3) ask the post-loop
    # callbacks whether the event loop should keep going.
    #
    # Returns a true value to continue, or the false value handed back by
    # the first callback that asked to stop.

    # Phase 1: sweep %PushBackSet repeatedly until a complete pass fires
    # no event_read at all.
    while (1) {
        my $delivered = 0;

        for my $fd (keys %PushBackSet) {
            my Danga::Socket $pob = $PushBackSet{$fd};

            # The key list was snapshotted before the sweep; an event_read
            # fired earlier in this pass may have removed this entry.
            # That is expected, so just move on.
            next if !$pob;

            die "ASSERT: the $pob socket has no read_push_back"
                if !@{$pob->{read_push_back}};

            # Only sockets that are still open and currently interested in
            # readability get a synthetic read event.
            next if $pob->{closed};
            next unless $pob->{event_watch} & POLLIN;

            $delivered = 1;
            $pob->event_read;
        }

        last unless $delivered;
    }

    # Phase 2: closes were postponed until now so that fd numbers could not
    # be reused (and confused) while events were still being dispatched.
    while (my $handle = shift @ToClose) {
        my $closing_fd = fileno($handle);

        # Plain handle close, not Danga::Socket's close method.
        $handle->close;

        # Only after the real close is the descriptor dropped from the map.
        delete $DescriptorMap{$closing_fd};
    }

    # Phase 3: we continue by default; any callback (per-object first, then
    # the global one) may veto by returning false. Once vetoed, no further
    # callbacks are invoked.
    my $continue = 1;

    foreach my $callback (values %PLCMap) {
        $continue = $callback->(\%DescriptorMap, \%OtherFds) if $continue;
    }

    # The global callback is consulted last, and is looked up only after
    # the per-object callbacks have had their turn.
    if (defined $PostLoopCallback) {
        $continue = $PostLoopCallback->(\%DescriptorMap, \%OtherFds) if $continue;
    }

    return $continue;
}
#####################################################################
### Danga::Socket-the-object code
#####################################################################
=head2 OBJECT METHODS
=head2 C<< CLASS->new( $socket ) >>
Create a new Danga::Socket subclass object for the given I<socket> which will
react to events on it during the C<EventLoop>.
This is normally (always?) called from your subclass via:
$class->SUPER::new($socket);
=cut
sub new {
my Danga::Socket $self = shift;
$self = fields::new($self) unless ref $self;
my $sock = shift;
$self->{sock} = $sock;
my $fd = fileno($sock);
Carp::cluck("undef sock and/or fd in Danga::Socket->new. sock=" . ($sock || "") . ", fd=" . ($fd || ""))
unless $sock && $fd;
$self->{fd} = $fd;
$self->{write_buf} = [];
$self->{write_buf_offset} = 0;
$self->{write_buf_size} = 0;
$self->{closed} = 0;
$self->{corked} = 0;
$self->{read_push_back} = [];
$self->{event_watch} = POLLERR|POLLHUP|POLLNVAL;
_InitPoller();
if ($HaveEpoll) {
epoll_ctl($Epoll, EPOLL_CTL_ADD, $fd, $self->{event_watch})
and die "couldn't add epoll watch for $fd\n";
}
elsif ($HaveKQueue) {
# Add them to the queue but disabled for now
$KQueue->EV_SET($fd, IO::KQueue::EVFILT_READ(),
IO::KQueue::EV_ADD() | IO::KQueue::EV_DISABLE());
( run in 0.882 second using v1.01-cache-2.11-cpan-75ffa21a3d4 )