AnyEvent-Connection
view release on metacpan or search on metacpan
lib/AnyEvent/Connection.pm view on Meta::CPAN
use warnings;
};
use Object::Event 1.21;
use base 'Object::Event';
use AnyEvent 5;
use AnyEvent::Socket;
use Carp;
use Scalar::Util qw(weaken);
use AnyEvent::Connection::Raw;
use AnyEvent::Connection::Util;
# @rewrite s/^# //; # Development hacks, see L<Devel::Rewrite>
# use Devel::Leak::Cb;
=head1 NAME
AnyEvent::Connection - Base class for tcp connectful clients
=cut
lib/AnyEvent/Connection.pm view on Meta::CPAN
#sub connected {
# warn "Connected";
# shift->event(connected => ());
#}
sub connect {
my $self = shift;
$self->{connecting} and return;
$self->{connecting} = 1;
weaken $self;
croak "Only client can connect but have $self->{type}" if $self->{type} and $self->{type} ne 'client';
$self->{type} = 'client';
warn "Connecting to $self->{host}:$self->{port}..." if $self->{debug};
# @rewrite s/sub {/cb connect {/;
$self->{_}{con}{cb} = sub {
pop;
delete $self->{_}{con};
if (my $fh = shift) {
warn "Connected @_" if $self->{debug};
lib/AnyEvent/Connection.pm view on Meta::CPAN
$self->{_}{con}{cb}, $self->{_}{con}{pre}
;
}
sub accept {
croak "Not implemented yet";
}
sub _reconnect_after {
weaken( my $self = shift );
$self->{reconnect} or return $self->{connecting} = 0;
$self->{timers}{reconnect} = AnyEvent->timer(
after => $self->{reconnect},
cb => sub {
$self or return;
delete $self->{timers}{reconnect};
$self->{connecting} = 0;
$self->connect;
}
);
}
sub periodic_stop;
sub periodic {
weaken( my $self = shift );
my $interval = shift;
my $cb = shift;
#warn "Create periodic $interval";
$self->{timers}{int $cb} = AnyEvent->timer(
after => $interval,
interval => $interval,
cb => sub {
local *periodic_stop = sub {
warn "Stopping periodic ".int $cb;
delete $self->{timers}{int $cb}; undef $cb
lib/AnyEvent/Connection.pm view on Meta::CPAN
},
);
defined wantarray and return AnyEvent::Util::guard(sub {
delete $self->{timers}{int $cb};
undef $cb;
});
return;
}
sub after {
weaken( my $self = shift );
my $interval = shift;
my $cb = shift;
#warn "Create after $interval";
$self->{timers}{int $cb} = AnyEvent->timer(
after => $interval,
cb => sub {
$self or return;
delete $self->{timers}{int $cb};
$cb->();
undef $cb;
lib/AnyEvent/Connection/Raw.pm view on Meta::CPAN
AnyEvent::Connection::Raw;
use common::sense 2;m{
use strict;
use warnings;
};
use Object::Event 1.21;
use base 'Object::Event';
use AnyEvent::Handle;
use AnyEvent::Connection::Util;
use Scalar::Util qw(weaken);
use Carp;
# @rewrite s/^# //;
# use Devel::Leak::Cb;
sub _call_waiting {
my $me = shift;
for my $k (keys %{ $me->{waitingcb} }) {
warn "call waiting $k with @_" if $me->{debug};
if ($me->{waitingcb}{$k}) {
$me->{waitingcb}{$k}->(undef, @_);
}
delete $me->{waitingcb}{$k};
}
}
sub new {
my $pkg = shift;
my $self = $pkg->SUPER::new(@_);
$self->{nl} = "\015\012" unless defined $self->{nl};
$self->{debug} = 0 unless defined $self->{debug};
weaken(my $me = $self);
# @rewrite s/sub /cb 'conn.cb.eof' /;
$self->{cb}{eof} = sub {
$me or return;
#local *__ANON__ = 'conn.cb.eof';
warn "[\U$me->{side}\E] Eof on handle";
delete $me->{h};
$me->event('disconnect');
$me->_call_waiting("EOF from handle");
} ;
# @rewrite s/sub /cb 'conn.cb.err' /;
lib/AnyEvent/Connection/Raw.pm view on Meta::CPAN
warn ">> @_ " if $self->{debug};
return;
}
*reply = \&say;
sub recv {
my ($self,$bytes,%args) = @_;
$args{cb} or croak "no cb for recv at @{[ (caller)[1,2] ]}";
$self->{h} or return $args{cb}(undef,"Not connected");
warn "<+ read $bytes " if $self->{debug};
weaken( $self->{waitingcb}{int $args{cb}} = $args{cb} );
$self->{h}->unshift_read( chunk => $bytes, sub {
#local *__ANON__ = 'conn.recv.read';
# Also eat CRLF or LF from read buffer
substr( $self->{h}{rbuf}, 0, 1 ) = '' if substr( $self->{h}{rbuf}, 0, 1 ) eq "\015";
substr( $self->{h}{rbuf}, 0, 1 ) = '' if substr( $self->{h}{rbuf}, 0, 1 ) eq "\012";
delete $self->{waitingcb}{int $args{cb}};
shift; (delete $args{cb})->(@_);
%args = ();
} );
}
sub command {
my $self = shift;
my $write = shift;
if (utf8::is_utf8($write)) {
utf8::encode $write;
}
my %args = @_;
$args{cb} or croak "no cb for command at @{[ (caller)[1,2] ]}";
$self->{h} or return $args{cb}(undef,"Not connected"),%args = ();
weaken( $self->{waitingcb}{int $args{cb}} = $args{cb} );
#my $i if 0;
#my $c = ++$i;
warn ">> $write " if $self->{debug};
$self->{h}->push_write("$write$self->{nl}");
#$self->{h}->timeout( $self->{select_timeout} );
warn "<? read " if $self->{debug};
# @rewrite s/sub {/cb 'conn.command.read' {/;
$self->{h}->push_read( regex => qr<\015?\012>, sub {
#local *__ANON__ = 'conn.command.read';
( run in 0.239 second using v1.01-cache-2.11-cpan-9b1e4054eb1 )