AnyEvent-Connection
view release on metacpan or search on metacpan
lib/AnyEvent/Connection.pm view on Meta::CPAN
package AnyEvent::Connection;
use common::sense 2;m{
use strict;
use warnings;
};
use Object::Event 1.21;
use base 'Object::Event';
use AnyEvent 5;
use AnyEvent::Socket;
use Carp;
use Scalar::Util qw(weaken);
use AnyEvent::Connection::Raw;
use AnyEvent::Connection::Util;
# @rewrite s/^# //; # Development hacks, see L<Devel::Rewrite>
# use Devel::Leak::Cb;
=head1 NAME
AnyEvent::Connection - Base class for tcp connectful clients
=cut
our $VERSION = '0.06';
=head1 SYNOPSIS
package MyTCPClient;
use base 'AnyEvent::Connection';
package main;
my $client = MyTCPClient->new(
host => 'localhost',
port => 12345,
);
$client->reg_cb(
connected => sub {
my ($client,$connection,$host,$port) = @_;
# ...
$client->after(
$interval, sub {
# Called after interval, if connection still alive
}
);
}
connfail = sub {
my ($client,$reason) = @_;
# ...
},
disconnect => sub {
my ($client,$reason) = @_;
},
error => sub {
my ($client,$error) = @_;
# Called in error conditions for callbackless methods
},
);
$client->connect;
=head1 EVENTS
=over 4
=item connected ($connobject, $host, $port)
Called when client get connected.
=item connfail
Called, when client fails to connect
=item disconnect
lib/AnyEvent/Connection.pm view on Meta::CPAN
=item say
Same as push_write + newline
=item reply
Same as push_write + newline
=back
For next methods there is a feature.
Callback will be called in any way, either by successful processing or by error or object destruction
=over 4
=item recv($bytes, %args, cb => $cb->())
Similar to
$fh->push_read(chunk => $bytes, $cb->());
=item command($data, %args, cb => $cb->());
Similar to
$fh->push_write($data);
$fh->push_read(line => $cb->());
=back
=cut
sub new {
my $self = shift->SUPER::new(@_);
$self->init(@_);
return $self;
}
sub init {
my $self = shift;
$self->{debug} ||= 0;
$self->{connected} = 0;
$self->{connecting} = 0;
$self->{reconnect} = 1 unless defined $self->{reconnect};
$self->{timeout} ||= 3;
$self->{timers} = {};
$self->{rawcon} ||= 'AnyEvent::Connection::Raw';
#warn "Init $self";
}
#sub connected {
# warn "Connected";
# shift->event(connected => ());
#}
sub connect {
my $self = shift;
$self->{connecting} and return;
$self->{connecting} = 1;
weaken $self;
croak "Only client can connect but have $self->{type}" if $self->{type} and $self->{type} ne 'client';
$self->{type} = 'client';
warn "Connecting to $self->{host}:$self->{port}..." if $self->{debug};
# @rewrite s/sub {/cb connect {/;
$self->{_}{con}{cb} = sub {
pop;
delete $self->{_}{con};
if (my $fh = shift) {
warn "Connected @_" if $self->{debug};
$self->{con} = $self->{rawcon}->new(
fh => $fh,
timeout => $self->{timeout},
debug => $self->{debug},
);
$self->{con}->reg_cb(
disconnect => sub {
warn "Disconnected $self->{host}:$self->{port} @_" if $self->{debug};
$self->disconnect(@_);
$self->_reconnect_after();
},
);
$self->{connected} = 1;
#warn "Send connected event";
$self->event(connected => $self->{con}, @_);
} else {
warn "Not connected $self->{host}:$self->{port}: $!" if $self->{debug};
$self->event(connfail => "$!");
$self->_reconnect_after();
}
};
$self->{_}{con}{pre} = sub { $self->{timeout} };
$self->{_}{con}{grd} =
AnyEvent::Socket::tcp_connect
$self->{host}, $self->{port},
$self->{_}{con}{cb}, $self->{_}{con}{pre}
;
}
sub accept {
croak "Not implemented yet";
}
sub _reconnect_after {
weaken( my $self = shift );
$self->{reconnect} or return $self->{connecting} = 0;
$self->{timers}{reconnect} = AnyEvent->timer(
after => $self->{reconnect},
cb => sub {
$self or return;
delete $self->{timers}{reconnect};
$self->{connecting} = 0;
$self->connect;
}
);
}
sub periodic_stop;
sub periodic {
weaken( my $self = shift );
my $interval = shift;
my $cb = shift;
#warn "Create periodic $interval";
$self->{timers}{int $cb} = AnyEvent->timer(
after => $interval,
interval => $interval,
cb => sub {
local *periodic_stop = sub {
warn "Stopping periodic ".int $cb;
delete $self->{timers}{int $cb}; undef $cb
};
$self or return;
$cb->();
},
);
defined wantarray and return AnyEvent::Util::guard(sub {
delete $self->{timers}{int $cb};
undef $cb;
});
return;
}
sub after {
weaken( my $self = shift );
my $interval = shift;
my $cb = shift;
#warn "Create after $interval";
$self->{timers}{int $cb} = AnyEvent->timer(
after => $interval,
cb => sub {
$self or return;
delete $self->{timers}{int $cb};
$cb->();
undef $cb;
},
);
defined wantarray and return AnyEvent::Util::guard(sub {
delete $self->{timers}{int $cb};
undef $cb;
});
return;
}
sub reconnect {
my $self = shift;
$self->disconnect;
$self->connect;
}
sub disconnect {
my $self = shift;
#$self->{con} or return;
#warn "Disconnecting $self->{connected} || $self->{connecting} || $self->{reconnect} by @{[ (caller)[1,2] ]}";
ref $self->{con} eq 'HASH' and warn dumper($self->{con});
$self->{con} and eval{ $self->{con}->close; };
warn if $@;
delete $self->{con};
my $wascon = $self->{connected} || $self->{connecting};
$self->{connected} = 0;
$self->{connecting} = 0;
#$self->{reconnect} = 0;
delete $self->{timers}{reconnect};
$self->event('disconnect',@_) if $wascon;
return;
}
sub AnyEvent::Connection::destroyed::AUTOLOAD {}
sub destroy {
my ($self) = @_;
$self->DESTROY;
bless $self, "AnyEvent::Connection::destroyed";
}
sub DESTROY {
my $self = shift;
warn "(".int($self).") Destroying AE::CNN" if $self->{debug};
$self->disconnect;
%$self = ();
}
BEGIN {
no strict 'refs';
for my $m (qw(push_write push_read unshift_read say reply recv command want_command)) {
( run in 1.705 second using v1.01-cache-2.11-cpan-39bf76dae61 )