AnyEvent-Handle-UDP
view release on metacpan or search on metacpan
Revision history for AnyEvent-Handle-UDP
0.050 2020-03-26 23:05:21+01:00 Europe/Amsterdam
- Add on_bind and on_connect callbacks
- Allow injecting the socket into the handle
- Allow destroy to be called in on_recv callback
- Fix error handling in address lookup
0.049 2018-05-16 23:53:00+02:00 Europe/Amsterdam
- Remove namespace::clean dependency
0.048 2017-01-12 22:32:02+01:00 Europe/Amsterdam
- Fixed timeout handling in destroy
Fix {,r,w}timeout_reset to actually work
Made it work with old perl+old Socket.pm
Switch from Test::Exception to Test::Fatal
0.037 2012-11-15 00:29:31 Europe/Amsterdam
Handle absence of IPv6 better (hopefully)
0.036 2012-06-06 19:23:18 Europe/Amsterdam
Make fh an IO::Socket object
Don't insist on port number in tests
Refactor bind/connect logic
0.035 2012-05-02 23:15:17 Europe/Amsterdam
Make dependency on Sub::Name optional
Stop depending on Const::Fast
0.034 2012-04-30 23:42:57 Europe/Amsterdam
Make push_send accept an address array too
Added support for timeouts
Make sure buffers is predefined
examples/anyevent-udp-server.pl view on Meta::CPAN
use strict;
use warnings;
use AnyEvent::Handle::UDP;
use AnyEvent::Log ();
# Default for AnyEvent is to log nothing
$AnyEvent::Log::FILTER->level('debug');
# AE::Handle::UDP does all for us:
# be sure to use the "bind" option!
my $udp_server = AnyEvent::Handle::UDP->new(
# Bind to this host and port
bind => ['0.0.0.0', 4000],
# AnyEvent will run this callback when getting some input
on_recv => sub {
my ($data, $ae_handle, $client_addr) = @_;
chomp $data;
AE::log warn => "Received '$data' (handle: $ae_handle)";
# Send back the command echoed to the client who contacted us
$ae_handle->push_send("echo [$data]\n", $client_addr);
}
);
lib/AnyEvent/Handle/UDP.pm view on Meta::CPAN
my %non_fatal = map { ( $_ => 1 ) } Errno::EAGAIN, Errno::EWOULDBLOCK, Errno::EINTR;
sub new {
my ($class, %args) = @_;
my $self = bless {
on_recv => $args{on_recv} || Carp::croak('on_recv not given'),
reuse_addr => exists $args{reuse_addr} ? !!$args{reuse_addr} : 1,
receive_size => $args{receive_size} || 1500,
family => $args{family} || 0,
autoflush => $args{autoflush} || 0,
on_bind => $args{on_bind} || sub {},
on_connect => $args{on_connect} || sub {},
fh => $args{fh} || bless(Symbol::gensym(), 'IO::Socket'),
buffers => [],
}, $class;
$self->{$_} = $args{$_} for grep { exists $args{$_} } qw/on_drain on_error on_timeout on_rtimeout on_wtimeout/;
$self->{$_} = AE::now() for qw/activity ractivity wactivity/;
$self->_bind_to($self->{fh}, $args{bind}) if exists $args{bind};
$self->_connect_to($self->{fh}, $args{connect}) if exists $args{connect};
$self->$_($args{$_}) for grep { exists $args{$_} } qw/timeout rtimeout wtimeout/;
$self->_drained;
return $self;
}
sub _insert {
my ($name, $sub) = @_;
lib/AnyEvent/Handle/UDP.pm view on Meta::CPAN
delete $self->{$timeout};
return;
});
_insert($timeout_reset, sub {
my $self = shift;
$self->{$activity} = AE::now;
});
}
sub bind_to {
my ($self, $addr) = @_;
return $self->_bind_to($self->{fh}, $addr);
}
my $add_reader = sub {
my $self = shift;
$self->{reader} = AE::io($self->{fh}, 0, sub {
while (exists $self->{reader} and defined (my $addr = recv $self->{fh}, my ($buffer), $self->{receive_size}, 0)) {
$self->timeout_reset;
$self->rtimeout_reset;
$self->{on_recv}->($buffer, $self, $addr);
}
$self->_error(1, "Couldn't recv: $!") if not $non_fatal{$! + 0};
return;
});
};
sub _bind_to {
my ($self, $fh, $addr) = @_;
my $bind_to = sub {
my ($domain, $type, $proto, $sockaddr) = @_;
if (!Scalar::Util::openhandle($fh)) {
socket $fh, $domain, $type, $proto or die "Could not create socket: $!";
AnyEvent::Util::fh_nonblocking $fh, 1;
setsockopt $fh, Socket::SOL_SOCKET, Socket::SO_REUSEADDR, 1 or die "Couldn't set so_reuseaddr: $!" if $self->{reuse_addr};
$add_reader->($self);
}
if (bind $fh, $sockaddr) {
$self->{on_bind}->();
}
else {
die "Could not bind: $!";
}
};
if (ref $addr) {
my ($host, $port) = @{$addr};
_on_addr($self, $fh, $host, $port, $bind_to);
}
else {
eval { $bind_to->(Socket::sockaddr_family($addr), Socket::SOCK_DGRAM, 0, $addr); 1 }
or $self->_error(1, $@);
}
return;
}
sub connect_to {
my ($self, $addr) = @_;
return $self->_connect_to($self->{fh}, $addr);
}
lib/AnyEvent/Handle/UDP.pm view on Meta::CPAN
AnyEvent::Handle::UDP - client/server UDP handles for AnyEvent
=head1 VERSION
version 0.050
=head1 SYNOPSIS
my $echo_server = AnyEvent::Handle::UDP->new(
bind => ['0.0.0.0', 4000],
on_recv => sub {
my ($data, $ae_handle, $client_addr) = @_;
$ae_handle->push_send($data, $client_addr);
},
);
=head1 DESCRIPTION
This module is an abstraction around UDP sockets for use with AnyEvent.
=head1 ATTRIBUTES
=head2 on_recv
The callback for when a package arrives. It takes three arguments: the datagram, the handle and the address the datagram was received from.
=head2 on_error
The callback for when an error occurs. It takes three arguments: the handle, a boolean indicating the error is fatal or not, and the error message.
=head2 on_bind
The callback for when the bind has been performed (this may be after object construction if address lookup is involved).
=head2 on_connect
The callback for when the connect has been performed (this may be after object construction if address lookup is involved).
=head2 on_drain
This sets the callback that is called when the send buffer becomes empty. The callback takes the handle as its only argument.
=head2 autoflush
lib/AnyEvent/Handle/UDP.pm view on Meta::CPAN
=head2 receive_size
The buffer size for the receiving in bytes. It defaults to 1500, which is slightly more than the MTA on ethernet.
=head2 family
Sets the socket family. The default is C<0>, which means either IPv4 or IPv6. The values C<4> and C<6> mean IPv4 and IPv6 respectively.
=head2 fh
The underlying filehandle. Note that this doesn't cooperate with the C<connect> and C<bind> parameters.
=head2 reuse_addr
If true will enable quick reuse of the bound address
=head2 timeout
=head2 rtimeout
=head2 wtimeout
lib/AnyEvent/Handle/UDP.pm view on Meta::CPAN
=head2 new
Create a new UDP handle. As arguments it accepts any attribute, as well as these two:
=over 4
=item * connect
Set the address to which datagrams are sent by default, and the only address from which datagrams are received. It must be either a packed sockaddr struct or an arrayref containing a hostname and a portnumber.
=item * bind
The address to bind the socket to. It must be either a packed sockaddr struct or an arrayref containing a hostname and a portnumber.
=back
All except C<on_recv> are optional, though using either C<connect> or C<bind> (or both) is strongly recommended unless you give it a connected/bound C<fh>.
=head2 bind_to($address)
Bind to the specified addres. Note that a bound socket may be rebound to another address. C<$address> must be in the same form as the bind argument to new.
=head2 connect_to($address)
Connect to the specified address. Note that a connected socket may be reconnected to another address. C<$address> must be in the same form as the connect argument to new.
=head2 push_send($message, $to = undef, $cv = AnyEvent::CondVar->new)
Try to send a message. If a socket is not connected a receptient address must also be given. If it is connected giving a receptient may not work as expected, depending on your platform. It returns C<$cv>, which will become true when C<$message> is se...
=head2 timeout_reset
t/10-basics.t view on Meta::CPAN
use warnings FATAL => 'all';
use Test::More tests => 3;
use AnyEvent::Handle::UDP;
use Socket qw/unpack_sockaddr_in/;
use IO::Socket::INET;
alarm 3;
{
my $cb = AE::cv;
my $server = AnyEvent::Handle::UDP->new(bind => [ localhost => 0 ], on_recv => $cb);
my $port = (unpack_sockaddr_in($server->sockname))[0];
my $client = IO::Socket::INET->new(PeerHost => 'localhost', PeerPort => $port, Proto => 'udp');
send $client, "Hello", 0;
is($cb->recv, "Hello", 'received "Hello"');
}
{
my $cb = AE::cv;
my $server = AnyEvent::Handle::UDP->new(bind => [ localhost => 0 ], on_recv => sub {
my ($message, $handle, $client_addr) = @_;
is($message, "Hello", "received \"Hello\"");
$handle->push_send("World", $client_addr);
});
my $client = AnyEvent::Handle::UDP->new(connect => $server->sockname, on_recv => $cb);
$client->push_send("Hello");
is($cb->recv, "World", 'received "World"');
}
t/20-timeout.t view on Meta::CPAN
use AnyEvent::Handle::UDP;
use IO::Socket::INET;
alarm 12;
{
my $cb = AE::cv;
my $cb2 = AE::cv;
my $server = AnyEvent::Handle::UDP->new(
bind => [ localhost => 0 ],
on_recv => $cb,
timeout => 3, on_timeout => sub { $cb->croak("Timeout") },
rtimeout => 4.5, on_rtimeout => sub { $cb2->croak("Read Timeout") }
);
my $start_time = AE::now;
like(exception { $cb->recv }, qr/Timeout/, 'Receive throws a timeout');
cmp_ok AE::now, '>=', $start_time + 3, 'Three seconds have passed';
like(exception { $cb2->recv }, qr/Read Timeout/, 'Receive throws a timeout again');
cmp_ok AE::now, '>=', $start_time + 4.5, '1.5 more seconds have passed';
$server->timeout_reset;
( run in 1.036 second using v1.01-cache-2.11-cpan-2398b32b56e )