AnyEvent-Handle-UDP

 view release on metacpan or  search on metacpan

lib/AnyEvent/Handle/UDP.pm  view on Meta::CPAN

}

for my $name (qw/on_recv on_error on_timeout on_rtimeout on_wtimeout autoflush receive_size/) {
	_insert($name, sub {
		my $self = shift;
		$self->{$name} = shift if @_;
		return $self->{$name};
	});
}

for my $name (qw/sockname peername/) {
	_insert($name, sub {
		my $self = shift;
		return $self->{fh}->$name;
	});
}

sub fh {
	my $self = shift;
	return $self->{fh};
}

sub on_drain {
	my $self = shift;
	if (@_) {
		$self->{on_drain} = shift;
		$self->_drained if not @{ $self->{buffers} };
	}
	return $self->{on_drain};
}

sub _drained {
	my $self = shift;
	$self->{on_drain}->($self) if defined $self->{on_drain};
}

for my $dir ('', 'r', 'w') {
	my $timeout = "${dir}timeout";
	my $activity = "${dir}activity";
	my $on_timeout = "on_$timeout";
	my $timer = "${dir}timer";
	my $clear_timeout = "clear_$timeout";
	my $timeout_reset = "${timeout}_reset";

	my $callback;
	$callback = sub {
		my $self = shift;
		if (not exists $self->{$timeout} or not $self->{fh}) {
			delete $self->{$timer};
			return;
		}
		my $now = AE::now;
		my $after = $self->{$activity} + $self->{$timeout} - $now;
		if ($after <= 0) {
			$self->{$activity} = $now;
			my $time = $self->{$on_timeout};
			my $error = do { local $! = Errno::ETIMEDOUT; "$!" };
			$time ? $time->($self) : $self->_error(0, $error);
			return if not exists $self->{$timeout};
		}
		Scalar::Util::weaken($self);
		return if not $self;
		$self->{$timer} = AE::timer($after, 0, sub {
			delete $self->{$timer};
			$callback->($self);
		});
	};

	_insert($timeout, sub {
		my $self = shift;
		if (@_) {
			my $value = shift;
			$self->{$timeout} = $value;
			if ($value == 0) {
				delete $self->{$timer};
				delete $self->{$timeout};
				return;
			}
			else {
				$callback->($self);
			}
		}
		return $self->{$timeout};
	});

	_insert($clear_timeout, sub {
		my $self = shift;
		delete $self->{$timeout};
		return;
	});

	_insert($timeout_reset, sub {
		my $self = shift;
		$self->{$activity} = AE::now;
	});
}

sub bind_to {
	my ($self, $addr) = @_;
	return $self->_bind_to($self->{fh}, $addr);
}

my $add_reader = sub {
	my $self = shift;
	$self->{reader} = AE::io($self->{fh}, 0, sub {
		while (exists $self->{reader} and defined (my $addr = recv $self->{fh}, my ($buffer), $self->{receive_size}, 0)) {
			$self->timeout_reset;
			$self->rtimeout_reset;
			$self->{on_recv}->($buffer, $self, $addr);
		}
		$self->_error(1, "Couldn't recv: $!") if not $non_fatal{$! + 0};
		return;
	});
};

sub _bind_to {
	my ($self, $fh, $addr) = @_;
	my $bind_to = sub {
		my ($domain, $type, $proto, $sockaddr) = @_;
		if (!Scalar::Util::openhandle($fh)) {
			socket $fh, $domain, $type, $proto or die "Could not create socket: $!";



( run in 2.603 seconds using v1.01-cache-2.11-cpan-39bf76dae61 )