AnyEvent-Connection
view release on metacpan or search on metacpan
ex/example.pl view on Meta::CPAN
456789101112131415161718192021222324package
My::Client;
package
main;
my
$cl
= My::Client->new(
host
=>
'127.0.0.1'
,
port
=> 7,
reconnect
=> 1,
debug
=> 0,
timeout
=> 1,
);
my
$cv
= AnyEvent->condvar;
my
$fails
= 0;
$cl
->reg_cb(
connected
=>
sub
{
my
(
$cl
,
$con
,
$host
,
$port
) =
@_
;
warn
"Connected $host:$port"
;
$cl
->disconnect(
'requested'
);
},
lib/AnyEvent/Connection.pm view on Meta::CPAN
210211212213214215216217218219220221222223224225226227228229230=cut
# Constructor.
#
# Object creation is delegated to the parent class; afterwards init() is
# called with the very same argument list so it can fill in defaults.
# Returns the new object.
sub new {
    my $class = shift;
    my $self  = $class->SUPER::new(@_);
    $self->init(@_);
    return $self;
}
# Put a freshly constructed object into its initial state.
#
# Connection state (connected, connecting, timers) is always reset.
# Options are only defaulted:
#   reconnect - 1 unless a defined value is already present (so 0 is kept)
#   debug     - 0 when unset or false
#   timeout   - 3 when unset or false (a timeout of 0 therefore becomes 3)
#   rawcon    - 'AnyEvent::Connection::Raw' when unset or false
sub init {
    my ($self) = @_;

    # State that never survives (re)initialisation.
    @{$self}{qw(connected connecting)} = (0, 0);
    $self->{timers} = {};

    # Options the caller may have supplied.
    $self->{reconnect} = 1 if !defined $self->{reconnect};
    $self->{debug}     = 0 unless $self->{debug};
    $self->{timeout}   = 3 unless $self->{timeout};

    #warn "Init $self";
    $self->{rawcon} ||= 'AnyEvent::Connection::Raw';
}
#sub connected {
lib/AnyEvent/Connection.pm view on Meta::CPAN
233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276#}
sub
connect
{
my
$self
=
shift
;
$self
->{connecting} and
return
;
$self
->{connecting} = 1;
weaken
$self
;
croak
"Only client can connect but have $self->{type}"
if
$self
->{type} and
$self
->{type} ne
'client'
;
$self
->{type} =
'client'
;
warn
"Connecting to $self->{host}:$self->{port}..."
if
$self
->{debug};
# @rewrite s/sub {/cb connect {/;
$self
->{_}{con}{cb} =
sub
{
pop
;
delete
$self
->{_}{con};
if
(
my
$fh
=
shift
) {
warn
"Connected @_"
if
$self
->{debug};
$self
->{con} =
$self
->{rawcon}->new(
fh
=>
$fh
,
timeout
=>
$self
->{timeout},
debug
=>
$self
->{debug},
);
$self
->{con}->reg_cb(
disconnect
=>
sub
{
warn
"Disconnected $self->{host}:$self->{port} @_"
if
$self
->{debug};
$self
->disconnect(
@_
);
$self
->_reconnect_after();
},
);
$self
->{connected} = 1;
#warn "Send connected event";
$self
->event(
connected
=>
$self
->{con},
@_
);
}
else
{
warn
"Not connected $self->{host}:$self->{port}: $!"
if
$self
->{debug};
$self
->event(
connfail
=>
"$!"
);
$self
->_reconnect_after();
}
};
$self
->{_}{con}{pre} =
sub
{
$self
->{timeout} };
$self
->{_}{con}{grd} =
AnyEvent::Socket::tcp_connect
$self
->{host},
$self
->{port},
$self
->{_}{con}{cb},
$self
->{_}{con}{pre}
;
lib/AnyEvent/Connection.pm view on Meta::CPAN
# Catch-all for objects that destroy() has reblessed into
# AnyEvent::Connection::destroyed: the empty body turns every method call on
# such an object (including the implicit DESTROY) into a no-op.
367368369370371372373374375376377378379380381382383384385386387sub
AnyEvent::Connection::destroyed::AUTOLOAD {}
# Explicitly tear the object down.
#
# Runs the regular DESTROY logic right away, then reblesses the object into
# AnyEvent::Connection::destroyed so that later method calls hit that
# package's empty AUTOLOAD instead of operating on a dead object.
# Returns the reblessed object.
sub destroy {
    my $self = shift;
    $self->DESTROY;
    return bless $self, "AnyEvent::Connection::destroyed";
}
# Destructor: optionally logs (when the debug flag is set), disconnects, and
# finally empties the object's hash so nothing it referenced is kept alive.
sub DESTROY {
    my ($self) = @_;

    if ( $self->{debug} ) {
        warn "(" . int($self) . ") Destroying AE::CNN";
    }

    $self->disconnect;

    # Drop every field held by the object.
    %{$self} = ();
}
BEGIN {
no
strict
'refs'
;
for
my
$m
(
qw(push_write push_read unshift_read say reply recv command want_command)
) {
*$m
=
sub
{
my
$self
=
shift
;
$self
->{connected} or
return
$self
->event(
error
=>
"Not connected for $m"
);
lib/AnyEvent/Connection/Raw.pm view on Meta::CPAN
101112131415161718192021222324252627282930313233343536373839404142use
AnyEvent::Handle;
use
Carp;
# @rewrite s/^# //;
# use Devel::Leak::Cb;
# Fail every pending ("waiting") callback registered in $me->{waitingcb}.
#
# Arguments after the invocant describe the failure; each callback is invoked
# as $cb->(undef, @details). Entries whose value is false (for example a
# weakened callback that has already been freed) are simply dropped.
#
# Each entry is removed from the table BEFORE its callback runs. A callback
# may re-enter this method (e.g. by destroying the connection from inside the
# callback); detaching first guarantees every callback fires at most once.
# Holding the callback in a lexical also keeps a weakened entry alive for the
# duration of the call.
#
# Does nothing when no callback table exists. Returns nothing.
sub _call_waiting {
    my $me = shift;

    # Work on the table itself so the loop stays valid even if a callback
    # empties the object hash while we are iterating.
    my $waiting = $me->{waitingcb} or return;

    for my $k ( keys %$waiting ) {
        warn "call waiting $k with @_" if $me->{debug};
        my $cb = delete $waiting->{$k};
        $cb->( undef, @_ ) if $cb;
    }
    return;
}
sub
new {
my
$pkg
=
shift
;
my
$self
=
$pkg
->SUPER::new(
@_
);
$self
->{nl} =
"\015\012"
unless
defined
$self
->{nl};
$self
->{debug} = 0
unless
defined
$self
->{debug};
weaken(
my
$me
=
$self
);
# @rewrite s/sub /cb 'conn.cb.eof' /;
$self
->{cb}{
eof
} =
sub
{
$me
or
return
;
#local *__ANON__ = 'conn.cb.eof';
warn
"[\U$me->{side}\E] Eof on handle"
;
delete
$me
->{h};
$me
->event(
'disconnect'
);
$me
->_call_waiting(
"EOF from handle"
);
} ;
lib/AnyEvent/Connection/Raw.pm view on Meta::CPAN
# Explicitly tear the raw connection down.
#
# Runs the regular DESTROY logic right away, then reblesses the object into
# AnyEvent::Connection::Raw::destroyed so that later method calls hit that
# package's empty AUTOLOAD instead of operating on a dead object.
# Returns the reblessed object.
sub destroy {
    my $self = shift;
    $self->DESTROY;
    return bless $self, "AnyEvent::Connection::Raw::destroyed";
}
# close() is an alias for destroy(): both names refer to the same subroutine.
*close
= \
&destroy
;
# Catch-all for objects that destroy() has reblessed into
# AnyEvent::Connection::Raw::destroyed: the empty body turns every method
# call on such an object (including the implicit DESTROY) into a no-op.
sub
AnyEvent::Connection::Raw::destroyed::AUTOLOAD {}
# Destructor for a raw connection.
#
# Steps, in order:
#   1. log when the debug flag is set;
#   2. drop the stored file handle ({fh});
#   3. notify all waiting callbacks with "destroying connection";
#   4. destroy and drop the handle object ({h}), if there is one;
#   5. empty the object's hash.
# Returns nothing.
sub DESTROY {
    my ($self) = @_;

    if ( $self->{debug} ) {
        warn "(" . int($self) . ") Destroying AE::CNN::Raw";
    }

    delete $self->{fh};
    $self->_call_waiting("destroying connection");

    if ( my $handle = $self->{h} ) {
        $handle->destroy;
    }
    delete $self->{h};

    %{$self} = ();
    return;
}
# Queue data for writing on the underlying handle ($self->{h}).
#
# All arguments are forwarded to the handle's push_write(). Plain (non
# reference) strings that carry Perl's UTF-8 flag are UTF-8 encoded first;
# references are passed through untouched.
#
# The encoding is done on copies. @_ aliases the caller's variables, so
# encoding in place would silently turn the caller's character strings into
# byte strings, and would die with "Modification of a read-only value" when
# a string literal is passed.
#
# Silently does nothing when there is no handle. Returns nothing.
sub push_write {
    my $self = shift;
    $self->{h} or return;

    my @out = map {
        my $item = $_;    # copy: never touch the caller's value
        utf8::encode($item) if !ref($item) && utf8::is_utf8($item);
        $item;
    } @_;

    $self->{h}->push_write(@out);
    warn ">> @out " if $self->{debug};
    return;
}
sub
push_read {
my
$self
=
shift
;
my
$cb
=
pop
;
$self
->{h} or
return
;
$self
->{h}->timeout(
$self
->{timeout})
if
$self
->{timeout};
$self
->{h}->push_read(
@_
,
sub
{
shift
->timeout(
undef
);
# disable timeout and remove handle from @_
$cb
->(
$self
,
@_
);
lib/AnyEvent/Connection/Raw.pm view on Meta::CPAN
# Write one line to the underlying handle ($self->{h}).
#
# The arguments are joined with the list separator ($", a single space by
# default), the configured line terminator $self->{nl} is appended, and the
# result is handed to the handle's push_write(). Plain (non reference)
# strings that carry Perl's UTF-8 flag are UTF-8 encoded first.
#
# The encoding is done on copies. @_ aliases the caller's variables, so
# encoding in place would silently turn the caller's character strings into
# byte strings, and would die with "Modification of a read-only value" when
# a string literal is passed.
#
# Silently does nothing when there is no handle. Returns nothing.
sub say {
    my $self = shift;
    $self->{h} or return;

    my @out = map {
        my $item = $_;    # copy: never touch the caller's value
        utf8::encode($item) if !ref($item) && utf8::is_utf8($item);
        $item;
    } @_;

    $self->{h}->push_write("@out$self->{nl}");
    warn ">> @out " if $self->{debug};
    return;
}
# reply() is an alias for say(): both names refer to the same subroutine.
*reply
= \
&say
;
sub
recv
{
my
(
$self
,
$bytes
,
%args
) =
@_
;
$args
{cb} or croak
"no cb for recv at @{[ (caller)[1,2] ]}"
;
$self
->{h} or
return
$args
{cb}(
undef
,
"Not connected"
);
warn
"<+ read $bytes "
if
$self
->{debug};
weaken(
$self
->{waitingcb}{
int
$args
{cb}} =
$args
{cb} );
$self
->{h}->unshift_read(
chunk
=>
$bytes
,
sub
{
#local *__ANON__ = 'conn.recv.read';
# Also eat CRLF or LF from read buffer
substr
(
$self
->{h}{rbuf}, 0, 1 ) =
''
if
substr
(
$self
->{h}{rbuf}, 0, 1 ) eq
"\015"
;
substr
(
$self
->{h}{rbuf}, 0, 1 ) =
''
if
substr
(
$self
->{h}{rbuf}, 0, 1 ) eq
"\012"
;
delete
$self
->{waitingcb}{
int
$args
{cb}};
shift
; (
delete
$args
{cb})->(
@_
);
%args
= ();
} );
lib/AnyEvent/Connection/Raw.pm view on Meta::CPAN
154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186if
(utf8::is_utf8(
$write
)) {
utf8::encode
$write
;
}
my
%args
=
@_
;
$args
{cb} or croak
"no cb for command at @{[ (caller)[1,2] ]}"
;
$self
->{h} or
return
$args
{cb}(
undef
,
"Not connected"
),
%args
= ();
weaken(
$self
->{waitingcb}{
int
$args
{cb}} =
$args
{cb} );
#my $i if 0;
#my $c = ++$i;
warn
">> $write "
if
$self
->{debug};
$self
->{h}->push_write(
"$write$self->{nl}"
);
#$self->{h}->timeout( $self->{select_timeout} );
warn
"<? read "
if
$self
->{debug};
# @rewrite s/sub {/cb 'conn.command.read' {/;
$self
->{h}->push_read(
regex
=>
qr<\015?\012>
,
sub
{
#local *__ANON__ = 'conn.command.read';
shift
;
for
(
@_
) {
chomp
;
substr
(
$_
,-1,1) =
''
if
substr
(
$_
, -1,1) eq
"\015"
;
}
warn
"<< @_ "
if
$self
->{debug};
delete
$self
->{waitingcb}{
int
$args
{cb}};
delete
(
$args
{cb})->(
@_
);
%args
= ();
undef
$self
;
} );
#sub {
#$self->{state}{handle}->timeout( 0 ) if $self->_qsize < 1;
#diag "<< $c. $write: $_[1] (".$self->_qsize."), timeout ".($self->{state}{handle}->timeout ? 'enabled' : 'disabled');
#$cb->(@_);
#});
t/01-test.t view on Meta::CPAN
505152535455565758596061626364656667686970
};
$cv
->
recv
;
},
client
=>
sub
{
my
$port
=
shift
;
diag
"$$: cl $port"
;
my
$cl
= Echo::Client->new(
host
=>
'127.0.0.1'
,
port
=>
$port
,
reconnect
=> 0.1,
debug
=> 0,
);
my
$action
= 0;
$cl
->reg_cb(
connected
=>
sub
{
isa_ok
$_
[0],
'AnyEvent::Connection'
,
'connected client'
;
isa_ok
$_
[1],
'AnyEvent::Connection::Raw'
,
'connected connection'
;
is
$_
[2],
'127.0.0.1'
,
'connected host'
;
is
$_
[3],
$port
,
'connected port'
;
if
(
$action
== 0) {
shift
->reconnect();
xt/01-test.t view on Meta::CPAN
555657585960616263646566676869707172737475
};
$cv
->
recv
;
},
client
=>
sub
{
my
$port
=
shift
;
diag
"$$: cl $port"
;
my
$cl
= Echo::Client->new(
host
=>
'127.0.0.1'
,
port
=>
$port
,
reconnect
=> 0.1,
debug
=> 0,
);
my
$action
= 0;
$cl
->reg_cb(
connected
=>
sub
{
isa_ok
$_
[0],
'AnyEvent::Connection'
,
'connected client'
;
isa_ok
$_
[1],
'AnyEvent::Connection::Raw'
,
'connected connection'
;
is
$_
[2],
'127.0.0.1'
,
'connected host'
;
is
$_
[3],
$port
,
'connected port'
;
if
(
$action
== 0) {
shift
->reconnect();
( run in 0.586 second using v1.01-cache-2.11-cpan-10033ea8487 )