AnyEvent-Connection

 view release on metacpan or  search on metacpan

ex/example.pl  view on Meta::CPAN

package My::Client;

use base 'AnyEvent::Connection';

package main;

# Build the example client; every key below is handed to My::Client->new.
my $cl = My::Client->new(
	host      => '127.0.0.1',
	port      => 7,
	reconnect => 1, # NOTE(review): presumably the reconnect delay in seconds - confirm
	debug     => 0,
	timeout   => 1,
);
# Condition variable for the example script.
my $cv = AnyEvent->condvar;
# Counter initialised to zero; presumably incremented on connection failures
# further down in the script (not visible in this excerpt).
my $fails = 0;
$cl->reg_cb(
	connected => sub {
		my ($cl,$con,$host,$port) = @_;
		warn "Connected $host:$port";
		$cl->disconnect('requested');
	},

lib/AnyEvent/Connection.pm  view on Meta::CPAN

=cut

# Constructor.
#
# Delegates object creation to the parent class, then runs init() with the
# same argument list and returns the new object.
sub new {
	my ($class, @args) = @_;

	my $self = $class->SUPER::new(@args);
	$self->init(@args);

	return $self;
}

# Fill in defaults and reset the per-connection state of the object.
#
# Options already present on the object are kept when they hold a true
# value (debug, timeout, rawcon) or a defined value (reconnect).
# The connected/connecting flags and the timers table are always reset.
sub init {
	my ($self) = @_;

	# State that always starts from scratch.
	$self->{timers}     = {};
	$self->{connecting} = 0;
	$self->{connected}  = 0;

	# Options: keep what the caller supplied, otherwise use the default.
	$self->{debug}     = 0 unless $self->{debug};
	$self->{timeout}   = 3 unless $self->{timeout};
	$self->{reconnect} = 1 if !defined $self->{reconnect};
	$self->{rawcon}  ||= 'AnyEvent::Connection::Raw';
}

#sub connected {

lib/AnyEvent/Connection.pm  view on Meta::CPAN

#}

sub connect {
	my $self = shift;
	$self->{connecting} and return;
	$self->{connecting} = 1;
	weaken $self;
	croak "Only client can connect but have $self->{type}" if $self->{type} and $self->{type} ne 'client';
	$self->{type} = 'client';
	
	warn "Connecting to $self->{host}:$self->{port}..." if $self->{debug};
	# @rewrite s/sub {/cb connect {/;
	$self->{_}{con}{cb} = sub {
		pop;
		delete $self->{_}{con};
			if (my $fh = shift) {
				warn "Connected @_" if $self->{debug};
				$self->{con} = $self->{rawcon}->new(
					fh      => $fh,
					timeout => $self->{timeout},
					debug   => $self->{debug},
				);
				$self->{con}->reg_cb(
					disconnect => sub {
						warn "Disconnected $self->{host}:$self->{port} @_" if $self->{debug};
						$self->disconnect(@_);
						$self->_reconnect_after();
					},
				);
				$self->{connected} = 1;
				#warn "Send connected event";
				$self->event(connected => $self->{con}, @_);
			} else {
				warn "Not connected $self->{host}:$self->{port}: $!" if $self->{debug};
				$self->event(connfail => "$!");
				$self->_reconnect_after();
			}
	};
	$self->{_}{con}{pre} = sub { $self->{timeout} };
	$self->{_}{con}{grd} =
		AnyEvent::Socket::tcp_connect
			$self->{host}, $self->{port},
			$self->{_}{con}{cb}, $self->{_}{con}{pre}
	;

lib/AnyEvent/Connection.pm  view on Meta::CPAN

# Catch-all for objects that destroy() has reblessed into
# AnyEvent::Connection::destroyed: calls to methods that are not otherwise
# defined for that package land in this empty AUTOLOAD and do nothing.
sub AnyEvent::Connection::destroyed::AUTOLOAD {}

# Explicitly tear the object down.
#
# Runs DESTROY on the object right away and then reblesses it into
# AnyEvent::Connection::destroyed. Returns the reblessed object.
sub destroy {
	my $self = shift;

	$self->DESTROY();

	return bless $self, "AnyEvent::Connection::destroyed";
}

# Destructor: disconnects and then empties the object hash.
# Emits a diagnostic first when the debug flag is set.
sub DESTROY {
	my ($self) = @_;

	if ($self->{debug}) {
		warn "(".int($self).") Destroying AE::CNN";
	}

	$self->disconnect;

	# Drop every remaining field of the object.
	%$self = ();
}

BEGIN {
	no strict 'refs';
	for my $m (qw(push_write push_read unshift_read say reply recv command want_command)) {
		*$m = sub {
			my $self = shift;
			$self->{connected} or return $self->event( error => "Not connected for $m" );

lib/AnyEvent/Connection/Raw.pm  view on Meta::CPAN

use AnyEvent::Handle;
use AnyEvent::Connection::Util;
use Scalar::Util qw(weaken);
use Carp;
# @rewrite s/^# //;
# use Devel::Leak::Cb;

# Invoke and discard every callback registered in $me->{waitingcb}.
#
# Each callback that is still set is called with undef followed by the
# arguments given to this method; its entry is removed afterwards, and
# entries with no callback are removed as well.
sub _call_waiting {
	my ($me, @reason) = @_;

	foreach my $id (keys %{ $me->{waitingcb} }) {
		warn "call waiting $id with @reason" if $me->{debug};

		my $cb = $me->{waitingcb}{$id};
		$cb->(undef, @reason) if $cb;

		delete $me->{waitingcb}{$id};
	}
}

sub new {
	my $pkg = shift;
	my $self = $pkg->SUPER::new(@_);
	$self->{nl} = "\015\012" unless defined $self->{nl};
	$self->{debug} = 0 unless defined $self->{debug};
	weaken(my $me = $self);
	# @rewrite s/sub /cb 'conn.cb.eof' /;
	$self->{cb}{eof} = sub {
		$me or return;
		#local *__ANON__ = 'conn.cb.eof';
		warn "[\U$me->{side}\E] Eof on handle";
		delete $me->{h};
		$me->event('disconnect');
		$me->_call_waiting("EOF from handle");
	} ;

lib/AnyEvent/Connection/Raw.pm  view on Meta::CPAN


# Explicitly tear the raw connection down.
#
# Runs DESTROY on the object right away and then reblesses it into
# AnyEvent::Connection::Raw::destroyed. Returns the reblessed object.
sub destroy {
	my $self = shift;

	$self->DESTROY();

	return bless $self, "AnyEvent::Connection::Raw::destroyed";
}
# close() is an alias for destroy().
*close = \&destroy;
# Catch-all for objects that destroy() has reblessed into
# AnyEvent::Connection::Raw::destroyed: calls to methods that are not
# otherwise defined for that package land here and do nothing.
sub AnyEvent::Connection::Raw::destroyed::AUTOLOAD {}
# Destructor for the raw connection.
#
# Drops the stored filehandle, fires all still-waiting callbacks with the
# message "destroying connection", destroys the handle object if there is
# one, and finally empties the object hash. Returns nothing.
sub DESTROY {
	my ($self) = @_;

	if ($self->{debug}) {
		warn "(".int($self).") Destroying AE::CNN::Raw";
	}

	delete $self->{fh};
	$self->_call_waiting("destroying connection");

	if ($self->{h}) {
		$self->{h}->destroy;
	}
	delete $self->{h};

	# Drop every remaining field of the object.
	%$self = ();
	return;
}

# Queue data for writing on the underlying handle.
#
# All arguments are forwarded to the handle's push_write. Plain (non-reference)
# arguments that carry Perl's UTF-8 flag are encoded to UTF-8 bytes first.
# Does nothing and returns immediately when there is no handle.
sub push_write {
	my $self = shift;
	$self->{h} or return;

	# Encode copies, never @_ itself: the elements of @_ alias the caller's
	# variables, so encoding them in place would silently turn the caller's
	# character strings into bytes and would die on read-only arguments.
	my @data = @_;
	for my $item (@data) {
		utf8::encode($item) if !ref($item) and utf8::is_utf8($item);
	}

	$self->{h}->push_write(@data);
	warn ">> @data  " if $self->{debug};
}

sub push_read {
	my $self = shift;
	my $cb = pop;
	$self->{h} or return;
	$self->{h}->timeout($self->{timeout}) if $self->{timeout};
	$self->{h}->push_read(@_,sub {
		shift->timeout(undef); # disable timeout and remove handle from @_
		$cb->($self,@_);

lib/AnyEvent/Connection/Raw.pm  view on Meta::CPAN

# Write one line to the underlying handle.
#
# The arguments are joined with the list separator ($", a single space by
# default), the configured line terminator $self->{nl} is appended, and the
# result is queued with push_write. Plain (non-reference) arguments that carry
# Perl's UTF-8 flag are encoded to UTF-8 bytes first.
# Does nothing when there is no handle. Always returns nothing.
sub say {
	my $self = shift;
	$self->{h} or return;

	# Encode copies, never @_ itself: the elements of @_ alias the caller's
	# variables, so encoding them in place would silently turn the caller's
	# character strings into bytes and would die on read-only arguments.
	my @parts = @_;
	for my $item (@parts) {
		utf8::encode($item) if !ref($item) and utf8::is_utf8($item);
	}

	$self->{h}->push_write("@parts$self->{nl}");
	warn ">> @parts  " if $self->{debug};
	return;
}
# reply() is an alias for say().
*reply = \&say;

sub recv {
	my ($self,$bytes,%args) = @_;
	$args{cb}  or croak "no cb for recv at @{[ (caller)[1,2] ]}";
	$self->{h} or return $args{cb}(undef,"Not connected");
	warn "<+ read $bytes " if $self->{debug};
	weaken( $self->{waitingcb}{int $args{cb}} = $args{cb} );
	$self->{h}->unshift_read( chunk => $bytes, sub {
		#local *__ANON__ = 'conn.recv.read';
		# Also eat CRLF or LF from read buffer
		substr( $self->{h}{rbuf}, 0, 1 ) = '' if substr( $self->{h}{rbuf}, 0, 1 ) eq "\015";
		substr( $self->{h}{rbuf}, 0, 1 ) = '' if substr( $self->{h}{rbuf}, 0, 1 ) eq "\012";
		delete $self->{waitingcb}{int $args{cb}};
		shift; (delete $args{cb})->(@_);
		%args = ();
	} );

lib/AnyEvent/Connection/Raw.pm  view on Meta::CPAN

	if (utf8::is_utf8($write)) {
		utf8::encode $write;
	}
	my %args = @_;
	$args{cb}  or croak "no cb for command at @{[ (caller)[1,2] ]}";
	$self->{h} or return $args{cb}(undef,"Not connected"),%args = ();
	weaken( $self->{waitingcb}{int $args{cb}} = $args{cb} );
	
	#my $i if 0;
	#my $c = ++$i;
	warn ">> $write  " if $self->{debug};
	$self->{h}->push_write("$write$self->{nl}");
	#$self->{h}->timeout( $self->{select_timeout} );
	warn "<? read  " if $self->{debug};
	# @rewrite s/sub {/cb 'conn.command.read' {/;
	$self->{h}->push_read( regex => qr<\015?\012>, sub {
		#local *__ANON__ = 'conn.command.read';
		shift;
		for (@_) {
			chomp;
			substr($_,-1,1) = '' if substr($_, -1,1) eq "\015";
		}
		warn "<< @_  " if $self->{debug};
		delete $self->{waitingcb}{int $args{cb}};
		delete($args{cb})->(@_);
		%args = ();
		undef $self;
	} );
	#sub {
		#$self->{state}{handle}->timeout( 0 ) if $self->_qsize < 1;
		#diag "<< $c. $write: $_[1] (".$self->_qsize."), timeout ".($self->{state}{handle}->timeout ? 'enabled' : 'disabled');
		#$cb->(@_);
	#});

t/01-test.t  view on Meta::CPAN

		};
		$cv->recv;
	},
	client => sub {
		my $port = shift;
		diag "$$: cl $port";
		my $cl = Echo::Client->new(
			host  => '127.0.0.1',
			port  => $port,
			reconnect => 0.1,
			debug => 0,
		);
		my $action = 0;
		$cl->reg_cb(
			connected => sub {
				isa_ok $_[0], 'AnyEvent::Connection', 'connected client';
				isa_ok $_[1], 'AnyEvent::Connection::Raw', 'connected connection';
				is $_[2], '127.0.0.1', 'connected host';
				is $_[3], $port, 'connected port';
				if($action == 0) {
					shift->reconnect();

xt/01-test.t  view on Meta::CPAN

		};
		$cv->recv;
	},
	client => sub {
		my $port = shift;
		diag "$$: cl $port";
		my $cl = Echo::Client->new(
			host  => '127.0.0.1',
			port  => $port,
			reconnect => 0.1,
			debug => 0,
		);
		my $action = 0;
		$cl->reg_cb(
			connected => sub {
				isa_ok $_[0], 'AnyEvent::Connection', 'connected client';
				isa_ok $_[1], 'AnyEvent::Connection::Raw', 'connected connection';
				is $_[2], '127.0.0.1', 'connected host';
				is $_[3], $port, 'connected port';
				if($action == 0) {
					shift->reconnect();



( run in 0.517 second using v1.01-cache-2.11-cpan-3cd7ad12f66 )