AnyEvent-Connection

 view release on metacpan or  search on metacpan

lib/AnyEvent/Connection.pm  view on Meta::CPAN

use warnings;
};
use Object::Event 1.21;
use base 'Object::Event';

use AnyEvent 5;
use AnyEvent::Socket;

use Carp;

use Scalar::Util qw(weaken);
use AnyEvent::Connection::Raw;
use AnyEvent::Connection::Util;
# @rewrite s/^# //; # Development hacks, see L<Devel::Rewrite>
# use Devel::Leak::Cb;

=head1 NAME

AnyEvent::Connection - Base class for TCP connection-oriented clients

=cut

lib/AnyEvent/Connection.pm  view on Meta::CPAN


#sub connected {
#	warn "Connected";
#	shift->event(connected => ());
#}

sub connect {
	my $self = shift;
	$self->{connecting} and return;
	$self->{connecting} = 1;
	weaken $self;
	croak "Only client can connect but have $self->{type}" if $self->{type} and $self->{type} ne 'client';
	$self->{type} = 'client';
	
	warn "Connecting to $self->{host}:$self->{port}..." if $self->{debug};
	# @rewrite s/sub {/cb connect {/;
	$self->{_}{con}{cb} = sub {
		pop;
		delete $self->{_}{con};
			if (my $fh = shift) {
				warn "Connected @_" if $self->{debug};

lib/AnyEvent/Connection.pm  view on Meta::CPAN

			$self->{_}{con}{cb}, $self->{_}{con}{pre}
	;
}

# Placeholder for the server side of a connection: always throws
# "Not implemented yet", reported from the caller's point of view.
sub accept {
	Carp::croak('Not implemented yet');
}


# Arrange for another connect attempt after the delay stored in
# $self->{reconnect}.
#
# When no reconnect delay is configured, the "connecting" flag is simply
# cleared (and the assigned 0 is returned). Otherwise a one-shot timer is
# stored in $self->{timers}{reconnect} and returned; when it fires it removes
# itself, clears the "connecting" flag and calls connect().
sub _reconnect_after {
	my $conn = shift;
	# Only a weak reference is kept, so the timer callback below does not
	# keep the object alive.
	weaken($conn);

	unless ($conn->{reconnect}) {
		return $conn->{connecting} = 0;
	}

	my $retry = sub {
		# The object may be gone by the time the timer fires.
		return unless $conn;
		delete $conn->{timers}{reconnect};
		$conn->{connecting} = 0;
		$conn->connect;
	};

	return $conn->{timers}{reconnect} = AnyEvent->timer(
		after => $conn->{reconnect},
		cb    => $retry,
	);
}

sub periodic_stop;
sub periodic {
	weaken( my $self = shift );
	my $interval = shift;
	my $cb = shift;
	#warn "Create periodic $interval";
	$self->{timers}{int $cb} = AnyEvent->timer(
		after => $interval,
		interval => $interval,
		cb => sub {
			local *periodic_stop = sub {
				warn "Stopping periodic ".int $cb;
				delete $self->{timers}{int $cb}; undef $cb

lib/AnyEvent/Connection.pm  view on Meta::CPAN

		},
	);
	defined wantarray and return AnyEvent::Util::guard(sub {
		delete $self->{timers}{int $cb};
		undef $cb;
	});
	return;
}

sub after {
	weaken( my $self = shift );
	my $interval = shift;
	my $cb = shift;
	#warn "Create after $interval";
	$self->{timers}{int $cb} = AnyEvent->timer(
		after => $interval,
		cb => sub {
			$self or return;
			delete $self->{timers}{int $cb};
			$cb->();
			undef $cb;

lib/AnyEvent/Connection/Raw.pm  view on Meta::CPAN

	AnyEvent::Connection::Raw;

use common::sense 2;m{
use strict;
use warnings;
};
use Object::Event 1.21;
use base 'Object::Event';
use AnyEvent::Handle;
use AnyEvent::Connection::Util;
use Scalar::Util qw(weaken);
use Carp;
# @rewrite s/^# //;
# use Devel::Leak::Cb;

# Fail every callback currently registered in $me->{waitingcb}.
#
# Each live callback is invoked as $cb->(undef, @args), where @args are the
# arguments given to this method (callers in this file pass an error text).
# Entries whose value is false (e.g. a weak reference that has already been
# cleared) are just removed.
sub _call_waiting {
	my $me = shift;
	for my $k (keys %{ $me->{waitingcb} }) {
		warn "call waiting $k with @_" if $me->{debug};
		# Unregister *before* invoking: a callback that re-enters this method
		# must not be called a second time, and a callback that dies must not
		# stay registered. delete() also hands us a strong copy of the
		# (possibly weak) reference, so the code cannot vanish mid-call.
		my $cb = delete $me->{waitingcb}{$k};
		$cb->(undef, @_) if $cb;
	}
	return;
}

sub new {
	my $pkg = shift;
	my $self = $pkg->SUPER::new(@_);
	$self->{nl} = "\015\012" unless defined $self->{nl};
	$self->{debug} = 0 unless defined $self->{debug};
	weaken(my $me = $self);
	# @rewrite s/sub /cb 'conn.cb.eof' /;
	$self->{cb}{eof} = sub {
		$me or return;
		#local *__ANON__ = 'conn.cb.eof';
		warn "[\U$me->{side}\E] Eof on handle";
		delete $me->{h};
		$me->event('disconnect');
		$me->_call_waiting("EOF from handle");
	} ;
	# @rewrite s/sub /cb 'conn.cb.err' /;

lib/AnyEvent/Connection/Raw.pm  view on Meta::CPAN

	warn ">> @_  " if $self->{debug};
	return;
}
*reply = \&say;

# Read a fixed-size chunk from the connection and hand it to a callback.
#
# Arguments:
#   $bytes      - size passed to the handle's "chunk" read request
#   cb => CODE  - required; croaks if missing
#
# If there is no handle ($self->{h}), the callback is invoked immediately as
# $cb->(undef, "Not connected"). Otherwise the read is queued at the front of
# the handle's read queue and, once satisfied, the callback receives whatever
# the handle passes after itself (presumably the data read - see the
# AnyEvent::Handle "chunk" read type). While pending, the callback is
# registered weakly in $self->{waitingcb} so _call_waiting can fail it.
sub recv {
	my ($self,$bytes,%args) = @_;
	$args{cb}  or croak "no cb for recv at @{[ (caller)[1,2] ]}";
	$self->{h} or return $args{cb}(undef,"Not connected");
	warn "<+ read $bytes " if $self->{debug};
	# Weak on purpose: the strong reference lives in %args, captured below.
	weaken( $self->{waitingcb}{int $args{cb}} = $args{cb} );
	# NOTE(review): the closure holds $self strongly while the read is queued
	# on $self->{h}; left as is because callers may rely on that lifetime.
	$self->{h}->unshift_read( chunk => $bytes, sub {
		#local *__ANON__ = 'conn.recv.read';
		# Work on the handle that delivered the data rather than $self->{h}:
		# the latter may have been deleted or replaced meanwhile, and
		# dereferencing it would autovivify a bogus plain hash in its place.
		my $handle = shift;
		# Also eat CRLF or LF from read buffer
		$handle->{rbuf} =~ s/\A\015?\012?//;
		delete $self->{waitingcb}{int $args{cb}};
		(delete $args{cb})->(@_);
		%args = ();
	} );
}

sub command {
	my $self = shift;
	my $write = shift;
	if (utf8::is_utf8($write)) {
		utf8::encode $write;
	}
	my %args = @_;
	$args{cb}  or croak "no cb for command at @{[ (caller)[1,2] ]}";
	$self->{h} or return $args{cb}(undef,"Not connected"),%args = ();
	weaken( $self->{waitingcb}{int $args{cb}} = $args{cb} );
	
	#my $i if 0;
	#my $c = ++$i;
	warn ">> $write  " if $self->{debug};
	$self->{h}->push_write("$write$self->{nl}");
	#$self->{h}->timeout( $self->{select_timeout} );
	warn "<? read  " if $self->{debug};
	# @rewrite s/sub {/cb 'conn.command.read' {/;
	$self->{h}->push_read( regex => qr<\015?\012>, sub {
		#local *__ANON__ = 'conn.command.read';



( run in 0.239 second using v1.01-cache-2.11-cpan-9b1e4054eb1 )