AnyEvent-Handle-UDP
view release on metacpan or search on metacpan
lib/AnyEvent/Handle/UDP.pm view on Meta::CPAN
}
# Generate one combined getter/setter per plain attribute. Each generated
# sub stores its first argument (when one is given) under the attribute's
# key in the object hash and always returns the currently stored value.
# The sub is handed to _insert() together with the attribute name
# (presumably _insert installs it as a method of that name -- it is
# defined outside this section).
foreach my $attribute (qw/on_recv on_error on_timeout on_rtimeout on_wtimeout autoflush receive_size/) {
    my $accessor = sub {
        my ($self, @args) = @_;
        if (@args) {
            $self->{$attribute} = $args[0];
        }
        return $self->{$attribute};
    };
    _insert($attribute, $accessor);
}
# Generate sockname() and peername(): each one simply forwards to the
# method of the same name on the underlying filehandle stored in {fh}
# and returns whatever that method returns.
foreach my $method (qw/sockname peername/) {
    my $delegate = sub {
        my ($self) = @_;
        my $socket = $self->{fh};
        return $socket->$method;
    };
    _insert($method, $delegate);
}
# Read-only accessor: returns the filehandle stored in the object's
# {fh} slot (undef when none is stored).
sub fh {
    my ($self) = @_;
    return $self->{fh};
}
# Get or set the on_drain callback.
#
# Called without arguments it only returns the current callback. Called
# with an argument it stores that value as the new callback and, when the
# {buffers} array is empty at that moment, immediately invokes _drained
# so the new callback is notified right away. The stored callback is
# returned in both cases.
sub on_drain {
    my ($self, @args) = @_;
    return $self->{on_drain} if not @args;

    $self->{on_drain} = $args[0];
    unless (@{ $self->{buffers} }) {
        $self->_drained;
    }
    return $self->{on_drain};
}
# Invoke the on_drain callback, if one is defined, passing the handle
# itself as the only argument. Yields the callback's result, or a false
# value when no callback is set.
sub _drained {
    my ($self) = @_;
    my $handler = $self->{on_drain};
    return defined($handler) && $handler->($self);
}
# Generate the three families of inactivity-timeout methods. For each
# prefix ('' = general, 'r', 'w') the following are passed to _insert()
# (defined outside this section; presumably it installs them as methods):
#
#   <prefix>timeout([$seconds])  get/set the timeout; 0 disables it
#   clear_<prefix>timeout()      disable the timeout
#   <prefix>timeout_reset()      record "activity happened now"
#
# State lives in the object hash under the keys "<prefix>timeout",
# "<prefix>activity" (time of last activity) and "<prefix>timer" (the
# pending AE::timer watcher).
for my $dir ('', 'r', 'w') {
    my $timeout       = "${dir}timeout";
    my $activity      = "${dir}activity";
    my $on_timeout    = "on_$timeout";
    my $timer         = "${dir}timer";
    my $clear_timeout = "clear_$timeout";
    my $timeout_reset = "${timeout}_reset";

    # (Re)arm the timer for this direction, firing the timeout first if
    # the deadline has already passed. Declared separately from its
    # assignment because the sub refers to itself from the timer closure.
    my $callback;
    $callback = sub {
        my $self = shift;

        # Timeout disabled or no filehandle: nothing to watch.
        if (not exists $self->{$timeout} or not $self->{fh}) {
            delete $self->{$timer};
            return;
        }

        # Seconds remaining until the deadline (last activity + timeout).
        my $now   = AE::now;
        my $after = $self->{$activity} + $self->{$timeout} - $now;

        if ($after <= 0) {
            # Deadline reached: count the timeout itself as activity and
            # notify on_<prefix>timeout, or report ETIMEDOUT via _error
            # when no such callback is set.
            $self->{$activity} = $now;
            my $time  = $self->{$on_timeout};
            my $error = do { local $! = Errno::ETIMEDOUT; "$!" };
            $time ? $time->($self) : $self->_error(0, $error);

            # The handler may have disabled the timeout.
            return if not exists $self->{$timeout};

            # Activity was just reset, so the next deadline is one full
            # timeout period away. Without this the timer below would be
            # armed with a non-positive delay and cause one spurious
            # immediate wake-up before the real timer gets scheduled.
            $after = $self->{$timeout};
        }

        # The timer closure must not keep the handle alive.
        Scalar::Util::weaken($self);
        return if not $self;    # the handler may have destroyed the handle

        $self->{$timer} = AE::timer($after, 0, sub {
            delete $self->{$timer};
            $callback->($self);
        });
    };

    # Getter/setter. Setting 0 removes both the timeout and its timer and
    # returns nothing; any other value is stored and the timer (re)armed.
    _insert($timeout, sub {
        my $self = shift;
        if (@_) {
            my $value = shift;
            $self->{$timeout} = $value;
            if ($value == 0) {
                delete $self->{$timer};
                delete $self->{$timeout};
                return;
            }
            else {
                $callback->($self);
            }
        }
        return $self->{$timeout};
    });

    # Disable the timeout. The pending timer is dropped as well (as the
    # setter does for a value of 0) instead of being left to fire once
    # more just to clean itself up.
    _insert($clear_timeout, sub {
        my $self = shift;
        delete $self->{$timer};
        delete $self->{$timeout};
        return;
    });

    # Record activity "now", pushing the deadline back by a full period.
    _insert($timeout_reset, sub {
        my $self = shift;
        $self->{$activity} = AE::now;
    });
}
# Bind the handle's own socket to the given address. This is a thin
# wrapper that passes the stored filehandle and the address on to
# _bind_to and returns its result.
sub bind_to {
    my $self    = shift;
    my $address = shift;
    return $self->_bind_to($self->{fh}, $address);
}
# Install the read watcher for a handle (invoked with the handle as its
# first argument). The watcher is stored in {reader}; deleting that key
# stops reading.
#
# Whenever the socket becomes readable, datagrams are received one after
# another (at most {receive_size} bytes each). For every datagram the
# general and read timeouts are reset and on_recv is called with
# ($buffer, $self, $sender_address).
#
# NOTE(review): the watcher callback closes over a strong $self, so the
# handle is kept alive for as long as {reader} exists -- confirm that
# this is intentional before changing it.
my $add_reader = sub {
    my $self = shift;
    # Second argument 0: watch the handle for reading, not writing.
    $self->{reader} = AE::io($self->{fh}, 0, sub {
        # Stop as soon as a callback has removed the reader.
        while (exists $self->{reader}) {
            my $addr = recv $self->{fh}, my ($buffer), $self->{receive_size}, 0;
            if (not defined $addr) {
                # Only look at $! when recv itself failed. Checking it
                # after the loop ended because the reader was removed
                # would read a stale value and could report a bogus error.
                $self->_error(1, "Couldn't recv: $!") if not $non_fatal{$! + 0};
                return;
            }
            $self->timeout_reset;
            $self->rtimeout_reset;
            $self->{on_recv}->($buffer, $self, $addr);
        }
        return;
    });
};
sub _bind_to {
my ($self, $fh, $addr) = @_;
my $bind_to = sub {
my ($domain, $type, $proto, $sockaddr) = @_;
if (!Scalar::Util::openhandle($fh)) {
socket $fh, $domain, $type, $proto or die "Could not create socket: $!";
( run in 2.603 seconds using v1.01-cache-2.11-cpan-39bf76dae61 )