AnyEvent-Connection
view release on metacpan or search on metacpan
lib/AnyEvent/Connection.pm view on Meta::CPAN
5678910111213141516171819202122232425use
warnings;
};
use
Object::Event 1.21;
use
AnyEvent 5;
use
AnyEvent::Socket;
use
Carp;
# @rewrite s/^# //; # Development hacks, see L<Devel::Rewrite>
# use Devel::Leak::Cb;
=head1 NAME
AnyEvent::Connection - Base class for TCP connection-oriented clients
=cut
lib/AnyEvent/Connection.pm view on Meta::CPAN
229230231232233234235236237238239240241242243244245246247248249#sub connected {
# warn "Connected";
# shift->event(connected => ());
#}
sub
connect
{
my
$self
=
shift
;
$self
->{connecting} and
return
;
$self
->{connecting} = 1;
weaken
$self
;
croak
"Only client can connect but have $self->{type}"
if
$self
->{type} and
$self
->{type} ne
'client'
;
$self
->{type} =
'client'
;
warn
"Connecting to $self->{host}:$self->{port}..."
if
$self
->{debug};
# @rewrite s/sub {/cb connect {/;
$self
->{_}{con}{cb} =
sub
{
pop
;
delete
$self
->{_}{con};
if
(
my
$fh
=
shift
) {
warn
"Connected @_"
if
$self
->{debug};
lib/AnyEvent/Connection.pm view on Meta::CPAN
275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310
$self
->{_}{con}{cb},
$self
->{_}{con}{pre}
;
}
# accept()
#
# Placeholder that is not implemented: every call throws via croak with
# the message "Not implemented yet", reported from the caller's location.
sub accept {
    croak("Not implemented yet");
}
# _reconnect_after()
#
# Arms a one-shot timer that retries the connection.
#
# If $self->{reconnect} is false, nothing is scheduled: the "connecting"
# flag is reset to 0 and that 0 is returned.
# Otherwise a timer firing after $self->{reconnect} seconds is stored in
# $self->{timers}{reconnect}. When it fires it removes itself, clears the
# "connecting" flag and calls $self->connect.
#
# The object is held weakly, so the pending timer does not keep it alive;
# the timer callback is a no-op if the object is already gone.
sub _reconnect_after {
    my $self = shift;
    weaken($self);

    # Reconnecting is disabled: only reset the flag.
    return $self->{connecting} = 0 unless $self->{reconnect};

    $self->{timers}{reconnect} = AnyEvent->timer(
        after => $self->{reconnect},
        cb    => sub {
            return unless $self;    # object was destroyed in the meantime
            delete $self->{timers}{reconnect};
            # connect() returns early while this flag is set, so clear it first.
            $self->{connecting} = 0;
            $self->connect();
        },
    );
}
# Forward declaration only; no body is defined here.
# periodic() installs the actual implementation with
# `local *periodic_stop = sub { ... }` inside its timer callback.
sub
periodic_stop;
sub
periodic {
weaken(
my
$self
=
shift
);
my
$interval
=
shift
;
my
$cb
=
shift
;
#warn "Create periodic $interval";
$self
->{timers}{
int
$cb
} = AnyEvent->timer(
after
=>
$interval
,
interval
=>
$interval
,
cb
=>
sub
{
local
*periodic_stop
=
sub
{
warn
"Stopping periodic "
.
int
$cb
;
delete
$self
->{timers}{
int
$cb
};
undef
$cb
lib/AnyEvent/Connection.pm view on Meta::CPAN
314315316317318319320321322323324325326327328329330331332333334
},
);
defined
wantarray
and
return
AnyEvent::Util::guard(
sub
{
delete
$self
->{timers}{
int
$cb
};
undef
$cb
;
});
return
;
}
sub
after
{
weaken(
my
$self
=
shift
);
my
$interval
=
shift
;
my
$cb
=
shift
;
#warn "Create after $interval";
$self
->{timers}{
int
$cb
} = AnyEvent->timer(
after
=>
$interval
,
cb
=>
sub
{
$self
or
return
;
delete
$self
->{timers}{
int
$cb
};
$cb
->();
undef
$cb
;
lib/AnyEvent/Connection/Raw.pm view on Meta::CPAN
2345678910111213141516171819202122232425262728293031323334353637383940414243
AnyEvent::Connection::Raw;
use
common::sense 2;m{
use
strict;
use
warnings;
};
use
Object::Event 1.21;
use
AnyEvent::Handle;
use
Carp;
# @rewrite s/^# //;
# use Devel::Leak::Cb;
# _call_waiting(@error)
#
# Fails every callback that is still waiting for a reply. Each pending
# callback registered in $me->{waitingcb} is invoked as
#     $cb->(undef, @error)
# and its entry is removed from the table.
#
# Entries are stored weakened by recv() and command(), so a slot may
# already be undef; such slots are simply dropped.
#
# The entry is removed *before* the callback runs and the callback is
# called through a strong local copy. A callback that re-enters
# _call_waiting() (directly or through another disconnect) therefore
# cannot be invoked a second time, and the code reference cannot vanish
# while it is executing.
sub _call_waiting {
    my $me = shift;

    for my $k (keys %{ $me->{waitingcb} }) {
        warn "call waiting $k with @_" if $me->{debug};

        # Unregister first; copying the weak slot yields a strong reference.
        my $cb = delete $me->{waitingcb}{$k};
        $cb->(undef, @_) if $cb;
    }

    return;
}
sub
new {
my
$pkg
=
shift
;
my
$self
=
$pkg
->SUPER::new(
@_
);
$self
->{nl} =
"\015\012"
unless
defined
$self
->{nl};
$self
->{debug} = 0
unless
defined
$self
->{debug};
weaken(
my
$me
=
$self
);
# @rewrite s/sub /cb 'conn.cb.eof' /;
$self
->{cb}{
eof
} =
sub
{
$me
or
return
;
#local *__ANON__ = 'conn.cb.eof';
warn
"[\U$me->{side}\E] Eof on handle"
;
delete
$me
->{h};
$me
->event(
'disconnect'
);
$me
->_call_waiting(
"EOF from handle"
);
} ;
# @rewrite s/sub /cb 'conn.cb.err' /;
lib/AnyEvent/Connection/Raw.pm view on Meta::CPAN
129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170
warn
">> @_ "
if
$self
->{debug};
return
;
}
# Alias: make `reply` another name for the `say` subroutine.
*reply
= \
&say
;
# recv($bytes, cb => sub { ... })
#
# Queues a read of a $bytes-sized chunk at the front of the handle's read
# queue and hands the result to the `cb` callback.
#
#   - croaks if no `cb` argument was given;
#   - if there is no handle ($self->{h}), calls cb(undef, "Not connected")
#     immediately and returns whatever the callback returns;
#   - otherwise registers the callback (weakly) in $self->{waitingcb} so
#     that _call_waiting() can fail it, and schedules the read.
#
# When the chunk arrives, one leading CR and/or LF already present in the
# read buffer is discarded, the callback is unregistered, and it is called
# with the read callback's arguments minus the first one.
sub recv {
    my ($self, $bytes, %args) = @_;

    croak "no cb for recv at @{[ (caller)[1,2] ]}" unless $args{cb};
    return $args{cb}->(undef, "Not connected") unless $self->{h};

    warn "<+ read $bytes " if $self->{debug};

    # Keyed by the code reference's address; the table keeps only a weak
    # reference, the strong one lives in %args until the read completes.
    my $slot = int $args{cb};
    $self->{waitingcb}{$slot} = $args{cb};
    weaken($self->{waitingcb}{$slot});

    $self->{h}->unshift_read(chunk => $bytes, sub {
        # Also eat CRLF or LF from read buffer
        for my $eol ("\015", "\012") {
            substr($self->{h}{rbuf}, 0, 1) = ''
                if substr($self->{h}{rbuf}, 0, 1) eq $eol;
        }

        delete $self->{waitingcb}{ int $args{cb} };

        shift @_;    # drop the first argument, forward the rest
        my $cb = delete $args{cb};
        $cb->(@_);

        # Release anything else the caller passed in.
        %args = ();
    });
}
sub
command {
my
$self
=
shift
;
my
$write
=
shift
;
if
(utf8::is_utf8(
$write
)) {
utf8::encode
$write
;
}
my
%args
=
@_
;
$args
{cb} or croak
"no cb for command at @{[ (caller)[1,2] ]}"
;
$self
->{h} or
return
$args
{cb}(
undef
,
"Not connected"
),
%args
= ();
weaken(
$self
->{waitingcb}{
int
$args
{cb}} =
$args
{cb} );
#my $i if 0;
#my $c = ++$i;
warn
">> $write "
if
$self
->{debug};
$self
->{h}->push_write(
"$write$self->{nl}"
);
#$self->{h}->timeout( $self->{select_timeout} );
warn
"<? read "
if
$self
->{debug};
# @rewrite s/sub {/cb 'conn.command.read' {/;
$self
->{h}->push_read(
regex
=>
qr<\015?\012>
,
sub
{
#local *__ANON__ = 'conn.command.read';
( run in 0.284 second using v1.01-cache-2.11-cpan-9b1e4054eb1 )