AnyEvent-Connection
view release on metacpan or search on metacpan
ex/example.pl view on Meta::CPAN
package My::Client;
use base 'AnyEvent::Connection';
package main;
my $cl = My::Client->new(
host => '127.0.0.1',
port => 7,
reconnect => 1,
debug => 0,
timeout => 1,
);
my $cv = AnyEvent->condvar;
my $fails = 0;
$cl->reg_cb(
connected => sub {
my ($cl,$con,$host,$port) = @_;
warn "Connected $host:$port";
$cl->disconnect('requested');
},
lib/AnyEvent/Connection.pm view on Meta::CPAN
=cut
# Constructor.
#
# Delegates object creation to the parent class, passing the arguments
# through untouched, then calls init() with the same arguments.
# Returns the new object.
sub new {
    my $class = shift;
    my $self  = $class->SUPER::new(@_);
    $self->init(@_);
    return $self;
}
# Fill in the object's initial state.
#
# Always reset:  connected (0), connecting (0), timers (empty hash).
# Defaulted only when currently false:  debug (0), timeout (3),
#                                       rawcon ('AnyEvent::Connection::Raw').
# Defaulted only when undefined:        reconnect (1), so an explicit 0 is
#                                       kept.
# Any arguments after the object are ignored.
sub init {
    my ($self) = @_;

    # State that is reset unconditionally.
    $self->{connected}  = 0;
    $self->{connecting} = 0;
    $self->{timers}     = {};

    # Options that fall back to a default when false.
    $self->{debug}   = 0 unless $self->{debug};
    $self->{timeout} = 3 unless $self->{timeout};

    # reconnect may legitimately be 0, so only undef is replaced.
    $self->{reconnect} = 1 if !defined $self->{reconnect};

    $self->{rawcon} ||= 'AnyEvent::Connection::Raw';
}
#sub connected {
lib/AnyEvent/Connection.pm view on Meta::CPAN
#}
sub connect {
my $self = shift;
$self->{connecting} and return;
$self->{connecting} = 1;
weaken $self;
croak "Only client can connect but have $self->{type}" if $self->{type} and $self->{type} ne 'client';
$self->{type} = 'client';
warn "Connecting to $self->{host}:$self->{port}..." if $self->{debug};
# @rewrite s/sub {/cb connect {/;
$self->{_}{con}{cb} = sub {
pop;
delete $self->{_}{con};
if (my $fh = shift) {
warn "Connected @_" if $self->{debug};
$self->{con} = $self->{rawcon}->new(
fh => $fh,
timeout => $self->{timeout},
debug => $self->{debug},
);
$self->{con}->reg_cb(
disconnect => sub {
warn "Disconnected $self->{host}:$self->{port} @_" if $self->{debug};
$self->disconnect(@_);
$self->_reconnect_after();
},
);
$self->{connected} = 1;
#warn "Send connected event";
$self->event(connected => $self->{con}, @_);
} else {
warn "Not connected $self->{host}:$self->{port}: $!" if $self->{debug};
$self->event(connfail => "$!");
$self->_reconnect_after();
}
};
$self->{_}{con}{pre} = sub { $self->{timeout} };
$self->{_}{con}{grd} =
AnyEvent::Socket::tcp_connect
$self->{host}, $self->{port},
$self->{_}{con}{cb}, $self->{_}{con}{pre}
;
lib/AnyEvent/Connection.pm view on Meta::CPAN
# Catch-all for objects that destroy() has reblessed into
# AnyEvent::Connection::destroyed: any method called on such an object
# resolves to this empty sub and does nothing.
sub AnyEvent::Connection::destroyed::AUTOLOAD {}
# Explicitly tear the object down.
#
# Invokes DESTROY on the object, then reblesses it into
# AnyEvent::Connection::destroyed.  Returns the reblessed object.
sub destroy {
    my $self = shift;
    $self->DESTROY();
    return bless $self, 'AnyEvent::Connection::destroyed';
}
# Destructor (also called directly by destroy()).
#
# Emits a debug line when $self->{debug} is set, calls disconnect(), and
# then empties the object's hash.
sub DESTROY {
    my ($self) = @_;
    if ($self->{debug}) {
        warn "(".int($self).") Destroying AE::CNN";
    }
    $self->disconnect();
    %{$self} = ();
}
BEGIN {
no strict 'refs';
for my $m (qw(push_write push_read unshift_read say reply recv command want_command)) {
*$m = sub {
my $self = shift;
$self->{connected} or return $self->event( error => "Not connected for $m" );
lib/AnyEvent/Connection/Raw.pm view on Meta::CPAN
use AnyEvent::Handle;
use AnyEvent::Connection::Util;
use Scalar::Util qw(weaken);
use Carp;
# @rewrite s/^# //;
# use Devel::Leak::Cb;
# _call_waiting(@reason)
#
# Invoke every callback currently registered in $me->{waitingcb} as
# $cb->(undef, @reason) and remove it from the table.  Entries whose value
# is false (for example an undefined slot) are removed without being called.
# Emits a debug line per entry when $me->{debug} is set.
sub _call_waiting {
    my $me = shift;
    for my $k (keys %{ $me->{waitingcb} }) {
        warn "call waiting $k with @_" if $me->{debug};
        # Remove the entry BEFORE invoking it.  A callback may re-enter this
        # method (e.g. by destroying the connection); if the entry were still
        # registered at that point the same callback would be called again,
        # possibly recursing without bound.
        my $cb = delete $me->{waitingcb}{$k};
        $cb->(undef, @_) if $cb;
    }
}
sub new {
my $pkg = shift;
my $self = $pkg->SUPER::new(@_);
$self->{nl} = "\015\012" unless defined $self->{nl};
$self->{debug} = 0 unless defined $self->{debug};
weaken(my $me = $self);
# @rewrite s/sub /cb 'conn.cb.eof' /;
$self->{cb}{eof} = sub {
$me or return;
#local *__ANON__ = 'conn.cb.eof';
warn "[\U$me->{side}\E] Eof on handle";
delete $me->{h};
$me->event('disconnect');
$me->_call_waiting("EOF from handle");
} ;
lib/AnyEvent/Connection/Raw.pm view on Meta::CPAN
# Explicitly tear the raw connection down.
#
# Invokes DESTROY on the object, then reblesses it into
# AnyEvent::Connection::Raw::destroyed.  Returns the reblessed object.
sub destroy {
    my $self = shift;
    $self->DESTROY();
    return bless $self, 'AnyEvent::Connection::Raw::destroyed';
}
# close() is another name for destroy().
*close = \&destroy;
# Catch-all for objects that destroy() has reblessed into
# AnyEvent::Connection::Raw::destroyed: any method called on such an object
# resolves to this empty sub and does nothing.
sub AnyEvent::Connection::Raw::destroyed::AUTOLOAD {}
# Destructor (also called directly by destroy()).
#
# In order: emits a debug line when $self->{debug} is set, drops the stored
# filehandle, notifies all waiting callbacks with the reason
# "destroying connection", destroys and drops the handle object if there is
# one, and finally empties the object's hash.  Returns nothing.
sub DESTROY {
    my ($self) = @_;
    if ($self->{debug}) {
        warn "(".int($self).") Destroying AE::CNN::Raw";
    }
    delete $self->{fh};
    $self->_call_waiting("destroying connection");
    if ($self->{h}) {
        $self->{h}->destroy;
    }
    delete $self->{h};
    %{$self} = ();
    return;
}
# push_write(@args)
#
# Forward @args to push_write on the underlying handle ($self->{h}).
# Plain (non-reference) arguments that carry Perl's UTF-8 flag are encoded
# to UTF-8 bytes first; references are passed through unchanged.
# Does nothing and returns when there is no handle.  Emits a debug line
# with the written arguments when $self->{debug} is set.
sub push_write {
    my $self = shift;
    $self->{h} or return;
    # Encode copies, never @_ itself: the elements of @_ alias the caller's
    # variables, so encoding in place would silently turn the caller's
    # character strings into bytes and would die on read-only arguments
    # such as string literals.
    my @out = @_;
    for my $item (@out) {
        utf8::encode($item) if !ref($item) and utf8::is_utf8($item);
    }
    $self->{h}->push_write(@out);
    warn ">> @out " if $self->{debug};
    return;
}
sub push_read {
my $self = shift;
my $cb = pop;
$self->{h} or return;
$self->{h}->timeout($self->{timeout}) if $self->{timeout};
$self->{h}->push_read(@_,sub {
shift->timeout(undef); # disable timeout and remove handle from @_
$cb->($self,@_);
lib/AnyEvent/Connection/Raw.pm view on Meta::CPAN
# say(@parts)
#
# Write @parts, joined with the list separator, followed by the line
# terminator $self->{nl}, to the underlying handle ($self->{h}).
# Plain (non-reference) parts that carry Perl's UTF-8 flag are encoded to
# UTF-8 bytes first.  Does nothing when there is no handle.  Emits a debug
# line with the written parts when $self->{debug} is set.  Returns nothing.
sub say {
    my $self = shift;
    $self->{h} or return;
    # Encode copies, never @_ itself: the elements of @_ alias the caller's
    # variables, so encoding in place would silently turn the caller's
    # character strings into bytes and would die on read-only arguments
    # such as string literals.
    my @out = @_;
    for my $item (@out) {
        utf8::encode($item) if !ref($item) and utf8::is_utf8($item);
    }
    $self->{h}->push_write("@out$self->{nl}");
    warn ">> @out " if $self->{debug};
    return;
}
# reply() is another name for say().
*reply = \&say;
sub recv {
my ($self,$bytes,%args) = @_;
$args{cb} or croak "no cb for recv at @{[ (caller)[1,2] ]}";
$self->{h} or return $args{cb}(undef,"Not connected");
warn "<+ read $bytes " if $self->{debug};
weaken( $self->{waitingcb}{int $args{cb}} = $args{cb} );
$self->{h}->unshift_read( chunk => $bytes, sub {
#local *__ANON__ = 'conn.recv.read';
# Also eat CRLF or LF from read buffer
substr( $self->{h}{rbuf}, 0, 1 ) = '' if substr( $self->{h}{rbuf}, 0, 1 ) eq "\015";
substr( $self->{h}{rbuf}, 0, 1 ) = '' if substr( $self->{h}{rbuf}, 0, 1 ) eq "\012";
delete $self->{waitingcb}{int $args{cb}};
shift; (delete $args{cb})->(@_);
%args = ();
} );
lib/AnyEvent/Connection/Raw.pm view on Meta::CPAN
if (utf8::is_utf8($write)) {
utf8::encode $write;
}
my %args = @_;
$args{cb} or croak "no cb for command at @{[ (caller)[1,2] ]}";
$self->{h} or return $args{cb}(undef,"Not connected"),%args = ();
weaken( $self->{waitingcb}{int $args{cb}} = $args{cb} );
#my $i if 0;
#my $c = ++$i;
warn ">> $write " if $self->{debug};
$self->{h}->push_write("$write$self->{nl}");
#$self->{h}->timeout( $self->{select_timeout} );
warn "<? read " if $self->{debug};
# @rewrite s/sub {/cb 'conn.command.read' {/;
$self->{h}->push_read( regex => qr<\015?\012>, sub {
#local *__ANON__ = 'conn.command.read';
shift;
for (@_) {
chomp;
substr($_,-1,1) = '' if substr($_, -1,1) eq "\015";
}
warn "<< @_ " if $self->{debug};
delete $self->{waitingcb}{int $args{cb}};
delete($args{cb})->(@_);
%args = ();
undef $self;
} );
#sub {
#$self->{state}{handle}->timeout( 0 ) if $self->_qsize < 1;
#diag "<< $c. $write: $_[1] (".$self->_qsize."), timeout ".($self->{state}{handle}->timeout ? 'enabled' : 'disabled');
#$cb->(@_);
#});
t/01-test.t view on Meta::CPAN
};
$cv->recv;
},
client => sub {
my $port = shift;
diag "$$: cl $port";
my $cl = Echo::Client->new(
host => '127.0.0.1',
port => $port,
reconnect => 0.1,
debug => 0,
);
my $action = 0;
$cl->reg_cb(
connected => sub {
isa_ok $_[0], 'AnyEvent::Connection', 'connected client';
isa_ok $_[1], 'AnyEvent::Connection::Raw', 'connected connection';
is $_[2], '127.0.0.1', 'connected host';
is $_[3], $port, 'connected port';
if($action == 0) {
shift->reconnect();
xt/01-test.t view on Meta::CPAN
};
$cv->recv;
},
client => sub {
my $port = shift;
diag "$$: cl $port";
my $cl = Echo::Client->new(
host => '127.0.0.1',
port => $port,
reconnect => 0.1,
debug => 0,
);
my $action = 0;
$cl->reg_cb(
connected => sub {
isa_ok $_[0], 'AnyEvent::Connection', 'connected client';
isa_ok $_[1], 'AnyEvent::Connection::Raw', 'connected connection';
is $_[2], '127.0.0.1', 'connected host';
is $_[3], $port, 'connected port';
if($action == 0) {
shift->reconnect();
( run in 0.517 second using v1.01-cache-2.11-cpan-3cd7ad12f66 )