AnyEvent-Handle-UDP

 view release on metacpan or  search on metacpan

Changes  view on Meta::CPAN

Revision history for AnyEvent-Handle-UDP

0.050     2020-03-26 23:05:21+01:00 Europe/Amsterdam
          - Add on_bind and on_connect callbacks
          - Allow injecting the socket into the handle
          - Allow destroy to be called in on_recv callback
          - Fix error handling in address lookup

0.049     2018-05-16 23:53:00+02:00 Europe/Amsterdam
          - Remove namespace::clean dependency

0.048     2017-01-12 22:32:02+01:00 Europe/Amsterdam
          - Fixed timeout handling in destroy

Changes  view on Meta::CPAN

          Fix {,r,w}timeout_reset to actually work
          Made it work with old perl+old Socket.pm
          Switch from Test::Exception to Test::Fatal

0.037     2012-11-15 00:29:31 Europe/Amsterdam
          Handle absence of IPv6 better (hopefully)

0.036     2012-06-06 19:23:18 Europe/Amsterdam
          Make fh an IO::Socket object
          Don't insist on port number in tests
          Refactor bind/connect logic

0.035     2012-05-02 23:15:17 Europe/Amsterdam
          Make dependency on Sub::Name optional
          Stop depending on Const::Fast

0.034     2012-04-30 23:42:57 Europe/Amsterdam
          Make push_send accept an address array too
          Added support for timeouts
          Make sure buffers is predefined

examples/anyevent-udp-server.pl  view on Meta::CPAN

use strict;
use warnings;

use AnyEvent::Handle::UDP;
use AnyEvent::Log ();

# AnyEvent logs nothing by default, so lower the threshold to "debug"
$AnyEvent::Log::FILTER->level('debug');

# Handler invoked by AnyEvent for every datagram that arrives: it logs the
# payload and echoes it back to whoever sent it.
my $echo_datagram = sub {
    my ($data, $ae_handle, $client_addr) = @_;
    chomp $data;
    AE::log warn => "Received '$data' (handle: $ae_handle)";
    # Reply to the address the datagram came from
    $ae_handle->push_send("echo [$data]\n", $client_addr);
};

# AE::Handle::UDP does all the socket work for us;
# the "bind" option is what turns the handle into a server.
my $udp_server = AnyEvent::Handle::UDP->new(
    # Host and port to listen on
    bind    => ['0.0.0.0', 4000],
    on_recv => $echo_datagram,
);

lib/AnyEvent/Handle/UDP.pm  view on Meta::CPAN

# Lookup table of errno values (keyed by number) that the read watcher treats
# as harmless: a failed recv whose $! is one of these is not reported as an error.
my %non_fatal = map { ( $_ => 1 ) } Errno::EAGAIN, Errno::EWOULDBLOCK, Errno::EINTR;

# Constructor. Takes the class followed by a flat key/value option list.
# 'on_recv' is mandatory (croaks with 'on_recv not given' otherwise); all
# other options fall back to the defaults assigned below. Returns the new
# handle object.
sub new {
	my ($class, %args) = @_;

	# The receive callback is the only option without a default.
	my $on_recv = $args{on_recv} || Carp::croak('on_recv not given');

	# Address reuse stays enabled unless the caller explicitly passed a value,
	# in which case that value is normalised to a boolean.
	my $reuse_addr = 1;
	$reuse_addr = !!$args{reuse_addr} if exists $args{reuse_addr};

	my %fields = (
		on_recv      => $on_recv,
		reuse_addr   => $reuse_addr,
		receive_size => $args{receive_size} || 1500,
		family       => $args{family}       || 0,
		autoflush    => $args{autoflush}    || 0,
		on_bind      => $args{on_bind}      || sub {},
		on_connect   => $args{on_connect}   || sub {},
		# Without an injected filehandle, start from a fresh unopened glob.
		fh           => $args{fh}           || bless(Symbol::gensym(), 'IO::Socket'),
		buffers      => [],
	);
	my $self = bless \%fields, $class;

	# Optional callbacks are stored only when the caller supplied the key.
	for my $callback (qw/on_drain on_error on_timeout on_rtimeout on_wtimeout/) {
		next if not exists $args{$callback};
		$self->{$callback} = $args{$callback};
	}

	# Seed the activity timestamps with the current event-loop time.
	for my $stamp (qw/activity ractivity wactivity/) {
		$self->{$stamp} = AE::now();
	}

	if (exists $args{bind}) {
		$self->_bind_to($self->{fh}, $args{bind});
	}
	if (exists $args{connect}) {
		$self->_connect_to($self->{fh}, $args{connect});
	}

	# Timeouts go through their accessor methods rather than being stored directly.
	for my $setter (qw/timeout rtimeout wtimeout/) {
		next if not exists $args{$setter};
		$self->$setter($args{$setter});
	}

	$self->_drained;
	return $self;
}

sub _insert {
	my ($name, $sub) = @_;

lib/AnyEvent/Handle/UDP.pm  view on Meta::CPAN

		delete $self->{$timeout};
		return;
	});

	_insert($timeout_reset, sub {
		my $self = shift;
		$self->{$activity} = AE::now;
	});
}

# Public method: bind this handle's own filehandle to $addr by delegating to
# _bind_to. Returns whatever _bind_to returns.
sub bind_to {
	my $self = shift;
	my ($addr) = @_;
	my $socket = $self->{fh};
	return $self->_bind_to($socket, $addr);
}

# Installs the read watcher for a handle and stores it in $self->{reader}.
# Takes the handle object as its only argument.
#
# Each time the watcher fires, datagrams are read until recv fails. For every
# datagram both the general and the read timeout are reset and on_recv is
# called with (data, handle, sender address).
my $add_reader = sub {
	my $self = shift;
	$self->{reader} = AE::io($self->{fh}, 0, sub {
		# The reader slot is re-checked on every round; presumably on_recv may
		# tear the handle down (the changelog mentions destroy being allowed
		# inside on_recv), after which reading must stop.
		while (exists $self->{reader}) {
			my $buffer;
			my $addr = recv($self->{fh}, $buffer, $self->{receive_size}, 0);
			if (not defined $addr) {
				# Look at $! only directly after a failed recv. Checking it
				# after the loop would also run when the loop ended because
				# the reader was removed, where $! is stale and could produce
				# a spurious fatal error.
				$self->_error(1, "Couldn't recv: $!") if not $non_fatal{$! + 0};
				last;
			}
			$self->timeout_reset;
			$self->rtimeout_reset;
			$self->{on_recv}->($buffer, $self, $addr);
		}
		return;
	});
};

# Bind $fh to $addr on behalf of $self.
#
# $addr is either a reference, unpacked as (host, port) and handed to _on_addr
# together with the bind closure, or a packed sockaddr that is bound right
# away. In the packed case any exception is passed to _error as a fatal error.
# Always returns nothing.
sub _bind_to {
	my ($self, $fh, $addr) = @_;

	# Performs the actual bind for one concrete socket address, creating and
	# configuring the socket first if $fh is not an open handle yet.
	my $do_bind = sub {
		my ($domain, $type, $proto, $sockaddr) = @_;
		unless (Scalar::Util::openhandle($fh)) {
			socket($fh, $domain, $type, $proto) or die "Could not create socket: $!";
			AnyEvent::Util::fh_nonblocking($fh, 1);
			if ($self->{reuse_addr}) {
				setsockopt($fh, Socket::SOL_SOCKET(), Socket::SO_REUSEADDR(), 1) or die "Couldn't set so_reuseaddr: $!";
			}
			# A freshly created socket has no read watcher yet.
			$add_reader->($self);
		}
		bind($fh, $sockaddr) or die "Could not bind: $!";
		$self->{on_bind}->();
	};

	if (not ref $addr) {
		# Packed sockaddr: derive the family from the address itself.
		my $ok = eval {
			$do_bind->(Socket::sockaddr_family($addr), Socket::SOCK_DGRAM(), 0, $addr);
			1;
		};
		$self->_error(1, $@) if not $ok;
		return;
	}

	my ($host, $port) = @{$addr};
	_on_addr($self, $fh, $host, $port, $do_bind);
	return;
}

# Public method: connect this handle's own filehandle to $addr by delegating
# to _connect_to. Returns whatever _connect_to returns.
sub connect_to {
	my $self = shift;
	my ($addr) = @_;
	my $socket = $self->{fh};
	return $self->_connect_to($socket, $addr);
}

lib/AnyEvent/Handle/UDP.pm  view on Meta::CPAN


AnyEvent::Handle::UDP - client/server UDP handles for AnyEvent

=head1 VERSION

version 0.050

=head1 SYNOPSIS

 my $echo_server = AnyEvent::Handle::UDP->new(
     bind => ['0.0.0.0', 4000],
     on_recv => sub {
         my ($data, $ae_handle, $client_addr) = @_;
         $ae_handle->push_send($data, $client_addr);
     },
 );

=head1 DESCRIPTION

This module is an abstraction around UDP sockets for use with AnyEvent.

=head1 ATTRIBUTES

=head2 on_recv

The callback for when a datagram arrives. It takes three arguments: the datagram, the handle and the address the datagram was received from.

=head2 on_error

The callback for when an error occurs. It takes three arguments: the handle, a boolean indicating whether the error is fatal, and the error message.

=head2 on_bind

The callback for when the bind has been performed (this may be after object construction if address lookup is involved).

=head2 on_connect

The callback for when the connect has been performed (this may be after object construction if address lookup is involved).

=head2 on_drain

This sets the callback that is called when the send buffer becomes empty. The callback takes the handle as its only argument.

=head2 autoflush

lib/AnyEvent/Handle/UDP.pm  view on Meta::CPAN

=head2 receive_size

The size of the receive buffer in bytes. It defaults to 1500, the MTU on Ethernet, which is slightly more than the largest UDP payload that fits in a single Ethernet frame.

=head2 family

Sets the socket family. The default is C<0>, which means either IPv4 or IPv6. The values C<4> and C<6> mean IPv4 and IPv6 respectively.

=head2 fh

The underlying filehandle. Note that this doesn't cooperate with the C<connect> and C<bind> parameters.

=head2 reuse_addr

If true, enables quick reuse of the bound address. It defaults to true.

=head2 timeout

=head2 rtimeout

=head2 wtimeout

lib/AnyEvent/Handle/UDP.pm  view on Meta::CPAN

=head2 new

Create a new UDP handle. As arguments it accepts any attribute, as well as these two:

=over 4

=item * connect

Set the address to which datagrams are sent by default, and the only address from which datagrams are received. It must be either a packed sockaddr struct or an arrayref containing a hostname and a port number.

=item * bind

The address to bind the socket to. It must be either a packed sockaddr struct or an arrayref containing a hostname and a port number.

=back

All except C<on_recv> are optional, though using either C<connect> or C<bind> (or both) is strongly recommended unless you give it a connected/bound C<fh>.

=head2 bind_to($address)

Bind to the specified address. Note that a bound socket may be rebound to another address. C<$address> must be in the same form as the bind argument to new.

=head2 connect_to($address)

Connect to the specified address. Note that a connected socket may be reconnected to another address. C<$address> must be in the same form as the connect argument to new.

=head2 push_send($message, $to = undef, $cv = AnyEvent::CondVar->new)

Try to send a message. If a socket is not connected a recipient address must also be given. If it is connected giving a recipient may not work as expected, depending on your platform. It returns C<$cv>, which will become true when C<$message> is se...

=head2 timeout_reset

t/10-basics.t  view on Meta::CPAN

use warnings FATAL => 'all';
use Test::More tests => 3;
use AnyEvent::Handle::UDP;
use Socket qw/unpack_sockaddr_in/;
use IO::Socket::INET;

# Schedule a SIGALRM in three seconds so a hung test does not block forever.
alarm(3);

# A datagram sent by a plain IO::Socket::INET client must reach on_recv.
{
	my $received = AE::cv();
	my $server = AnyEvent::Handle::UDP->new(bind => [ localhost => 0 ], on_recv => $received);

	# Port 0 was requested, so read back the port that was actually assigned.
	my ($port) = unpack_sockaddr_in($server->sockname);

	my $client = IO::Socket::INET->new(PeerHost => 'localhost', PeerPort => $port, Proto => 'udp');
	send($client, 'Hello', 0);

	my $payload = $received->recv;
	is($payload, 'Hello', 'received "Hello"');
}

# Round trip between two AnyEvent::Handle::UDP objects: the client sends
# "Hello", the server answers "World" to the sender's address.
{
	my $reply = AE::cv();

	# Server side: verify the request, then answer the client.
	my $answer_world = sub {
		my ($message, $handle, $client_addr) = @_;
		is($message, 'Hello', 'received "Hello"');
		$handle->push_send('World', $client_addr);
	};

	my $server = AnyEvent::Handle::UDP->new(bind => [ localhost => 0 ], on_recv => $answer_world);
	my $client = AnyEvent::Handle::UDP->new(connect => $server->sockname, on_recv => $reply);
	$client->push_send('Hello');

	my $answer = $reply->recv;
	is($answer, 'World', 'received "World"');
}

t/20-timeout.t  view on Meta::CPAN


use AnyEvent::Handle::UDP;
use IO::Socket::INET;

alarm 12;

{
	my $cb = AE::cv;
	my $cb2 = AE::cv;
	my $server = AnyEvent::Handle::UDP->new(
		bind => [ localhost => 0 ],
		on_recv => $cb, 
		timeout => 3,    on_timeout => sub { $cb->croak("Timeout") },
		rtimeout => 4.5, on_rtimeout => sub { $cb2->croak("Read Timeout") }
	);
	my $start_time = AE::now;
	like(exception { $cb->recv }, qr/Timeout/, 'Receive throws a timeout');
	cmp_ok AE::now, '>=', $start_time + 3, 'Three seconds have passed';
	like(exception { $cb2->recv }, qr/Read Timeout/, 'Receive throws a timeout again');
	cmp_ok AE::now, '>=', $start_time + 4.5, '1.5 more seconds have passed';
	$server->timeout_reset;



( run in 1.036 second using v1.01-cache-2.11-cpan-2398b32b56e )