AnyEvent-Handle-UDP
view release on metacpan or search on metacpan
lib/AnyEvent/Handle/UDP.pm view on Meta::CPAN
on_connect => $args{on_connect} || sub {},
fh => $args{fh} || bless(Symbol::gensym(), 'IO::Socket'),
buffers => [],
}, $class;
$self->{$_} = $args{$_} for grep { exists $args{$_} } qw/on_drain on_error on_timeout on_rtimeout on_wtimeout/;
$self->{$_} = AE::now() for qw/activity ractivity wactivity/;
$self->_bind_to($self->{fh}, $args{bind}) if exists $args{bind};
$self->_connect_to($self->{fh}, $args{connect}) if exists $args{connect};
$self->$_($args{$_}) for grep { exists $args{$_} } qw/timeout rtimeout wtimeout/;
$self->_drained;
return $self;
}
sub _insert {
my ($name, $sub) = @_;
no strict 'refs';
*{$name} = $subname->($name, $sub);
}
for my $name (qw/on_recv on_error on_timeout on_rtimeout on_wtimeout autoflush receive_size/) {
_insert($name, sub {
my $self = shift;
$self->{$name} = shift if @_;
return $self->{$name};
});
}
for my $name (qw/sockname peername/) {
_insert($name, sub {
my $self = shift;
return $self->{fh}->$name;
});
}
sub fh {
my $self = shift;
return $self->{fh};
}
sub on_drain {
my $self = shift;
if (@_) {
$self->{on_drain} = shift;
$self->_drained if not @{ $self->{buffers} };
}
return $self->{on_drain};
}
sub _drained {
my $self = shift;
$self->{on_drain}->($self) if defined $self->{on_drain};
}
for my $dir ('', 'r', 'w') {
my $timeout = "${dir}timeout";
my $activity = "${dir}activity";
my $on_timeout = "on_$timeout";
my $timer = "${dir}timer";
my $clear_timeout = "clear_$timeout";
my $timeout_reset = "${timeout}_reset";
my $callback;
$callback = sub {
my $self = shift;
if (not exists $self->{$timeout} or not $self->{fh}) {
delete $self->{$timer};
return;
}
my $now = AE::now;
my $after = $self->{$activity} + $self->{$timeout} - $now;
if ($after <= 0) {
$self->{$activity} = $now;
my $time = $self->{$on_timeout};
my $error = do { local $! = Errno::ETIMEDOUT; "$!" };
$time ? $time->($self) : $self->_error(0, $error);
return if not exists $self->{$timeout};
}
Scalar::Util::weaken($self);
return if not $self;
$self->{$timer} = AE::timer($after, 0, sub {
delete $self->{$timer};
$callback->($self);
});
};
_insert($timeout, sub {
my $self = shift;
if (@_) {
my $value = shift;
$self->{$timeout} = $value;
if ($value == 0) {
delete $self->{$timer};
delete $self->{$timeout};
return;
}
else {
$callback->($self);
}
}
return $self->{$timeout};
});
_insert($clear_timeout, sub {
my $self = shift;
delete $self->{$timeout};
return;
});
_insert($timeout_reset, sub {
my $self = shift;
$self->{$activity} = AE::now;
});
}
sub bind_to {
my ($self, $addr) = @_;
return $self->_bind_to($self->{fh}, $addr);
}
my $add_reader = sub {
my $self = shift;
$self->{reader} = AE::io($self->{fh}, 0, sub {
while (exists $self->{reader} and defined (my $addr = recv $self->{fh}, my ($buffer), $self->{receive_size}, 0)) {
$self->timeout_reset;
$self->rtimeout_reset;
$self->{on_recv}->($buffer, $self, $addr);
}
$self->_error(1, "Couldn't recv: $!") if not $non_fatal{$! + 0};
return;
});
};
sub _bind_to {
my ($self, $fh, $addr) = @_;
my $bind_to = sub {
my ($domain, $type, $proto, $sockaddr) = @_;
if (!Scalar::Util::openhandle($fh)) {
socket $fh, $domain, $type, $proto or die "Could not create socket: $!";
AnyEvent::Util::fh_nonblocking $fh, 1;
setsockopt $fh, Socket::SOL_SOCKET, Socket::SO_REUSEADDR, 1 or die "Couldn't set so_reuseaddr: $!" if $self->{reuse_addr};
$add_reader->($self);
}
if (bind $fh, $sockaddr) {
$self->{on_bind}->();
}
else {
die "Could not bind: $!";
}
};
if (ref $addr) {
my ($host, $port) = @{$addr};
_on_addr($self, $fh, $host, $port, $bind_to);
( run in 2.156 seconds using v1.01-cache-2.11-cpan-e1769b4cff6 )