AnyEvent-Connection
view release on metacpan or search on metacpan
lib/AnyEvent/Connection.pm view on Meta::CPAN
=head1 CONNECT METHODS
Once connected, the following methods are proxied to the raw connection object or to AE::Handle:
=over 4
=item push_write
See AE::Handle::push_write
=item push_read
See AE::Handle::push_read
=item unshift_read
See AE::Handle::unshift_read
=item say
Same as push_write + newline
=item reply
Same as push_write + newline
=back
The following methods share one guarantee:
the callback is always called, whether processing succeeds, an error occurs, or the object is destroyed.
=over 4
=item recv($bytes, %args, cb => $cb->())
Similar to
$fh->push_read(chunk => $bytes, $cb->());
=item command($data, %args, cb => $cb->());
Similar to
$fh->push_write($data);
$fh->push_read(line => $cb->());
=back
=cut
# Constructor.  Object creation is delegated to the parent class; init() is
# then invoked on the new object.  Both calls receive the same argument list.
sub new {
    my ($class, @args) = @_;

    my $self = $class->SUPER::new(@args);
    $self->init(@args);

    return $self;
}
# Reset connection state and fill in defaults for unset options.
#
# "connected", "connecting" and "timers" are always reset.  "debug",
# "timeout" and "rawcon" receive their default when currently false;
# "reconnect" only when undefined, so an explicit 0 is kept.
sub init {
    my ($self) = @_;

    # State that is never inherited from constructor arguments.
    @{$self}{qw(connected connecting)} = (0, 0);
    $self->{timers} = {};

    # Option defaults.
    $self->{debug}   ||= 0;
    $self->{timeout} ||= 3;
    defined $self->{reconnect} or $self->{reconnect} = 1;
    $self->{rawcon} ||= 'AnyEvent::Connection::Raw';
}
#sub connected {
# warn "Connected";
# shift->event(connected => ());
#}
# Start an asynchronous TCP connection to $self->{host}:$self->{port}.
#
# Returns immediately if a connection attempt is already in progress.  Croaks
# if the object already has a type other than 'client'.
#
# On success a raw connection object of class $self->{rawcon} is built around
# the socket and the "connected" event is fired with that object followed by
# the remaining callback arguments.  On failure the "connfail" event is fired
# with the stringified $!.  After a failure, and whenever the raw connection
# later emits "disconnect", _reconnect_after() is called.
#
# Returns the value returned by tcp_connect, which is also kept in
# $self->{_}{con}{grd}.
sub connect {
    my $self = shift;
    $self->{connecting} and return;
    # Validate before flagging the attempt: croaking with "connecting" already
    # set would make every later connect() call return silently.
    croak "Only client can connect but have $self->{type}" if $self->{type} and $self->{type} ne 'client';
    $self->{connecting} = 1;
    $self->{type} = 'client';
    # The closures below are stored inside $self, so they must only hold a
    # weak reference to it.
    weaken($self);
    warn "Connecting to $self->{host}:$self->{port}..." if $self->{debug};
    # @rewrite s/sub {/cb connect {/;
    $self->{_}{con}{cb} = sub {
        # Snapshot the error text first, before any other work can change $!.
        my $error = "$!";
        # The object may be gone by the time the callback fires.
        $self or return;
        pop;    # drop the last callback argument; only the leading ones are used
        delete $self->{_}{con};
        if (my $fh = shift) {
            warn "Connected @_" if $self->{debug};
            $self->{con} = $self->{rawcon}->new(
                fh      => $fh,
                timeout => $self->{timeout},
                debug   => $self->{debug},
            );
            $self->{con}->reg_cb(
                disconnect => sub {
                    $self or return;
                    warn "Disconnected $self->{host}:$self->{port} @_" if $self->{debug};
                    $self->disconnect(@_);
                    $self->_reconnect_after();
                },
            );
            $self->{connected} = 1;
            #warn "Send connected event";
            $self->event(connected => $self->{con}, @_);
        } else {
            warn "Not connected $self->{host}:$self->{port}: $error" if $self->{debug};
            $self->event(connfail => $error);
            $self->_reconnect_after();
        }
    };
    # Second callback handed to tcp_connect; it yields the configured timeout.
    $self->{_}{con}{pre} = sub { $self->{timeout} };
    $self->{_}{con}{grd} = AnyEvent::Socket::tcp_connect(
        $self->{host}, $self->{port},
        $self->{_}{con}{cb}, $self->{_}{con}{pre}
    );
}
# Placeholder: this method is not implemented and always croaks.
sub accept {
    croak("Not implemented yet");
}
# Schedule a new connect() attempt after $self->{reconnect} seconds.
#
# When reconnecting is disabled (false "reconnect"), only the "connecting"
# flag is cleared and its new value (0) is returned.  Otherwise a one-shot
# timer is stored in $self->{timers}{reconnect}; when it fires it removes
# itself, clears "connecting" and calls connect().
sub _reconnect_after {
    weaken( my $self = shift );

    unless ( $self->{reconnect} ) {
        return $self->{connecting} = 0;
    }

    my $retry = sub {
        # The object may have been destroyed while the timer was pending.
        return unless $self;
        delete $self->{timers}{reconnect};
        $self->{connecting} = 0;
        $self->connect;
    };

    $self->{timers}{reconnect} = AnyEvent->timer(
        after => $self->{reconnect},
        cb    => $retry,
    );
}
# Forward declaration: the real body only exists while a periodic callback is
# running (see the "local" below).
sub periodic_stop;

# Run $cb every $interval seconds (first run after $interval seconds).
#
#   $self->periodic($interval, $cb);
#   my $guard = $self->periodic($interval, $cb);
#
# The timer is kept in $self->{timers} under the numeric address of $cb.
# While $cb is running, periodic_stop() is temporarily defined in this
# package and cancels the timer that is currently firing.
#
# In non-void context an AnyEvent::Util guard is returned whose code block
# cancels the timer; in void context nothing is returned.
sub periodic {
    weaken( my $self = shift );
    my $interval = shift;
    my $cb = shift;
    # Compute the slot key once.  $cb is undefined on cancellation, and
    # recomputing "int $cb" afterwards would yield 0 and hit the wrong slot.
    my $id = int $cb;
    #warn "Create periodic $interval";
    $self->{timers}{$id} = AnyEvent->timer(
        after    => $interval,
        interval => $interval,
        cb       => sub {
            # The object may be gone by the time the timer fires.
            $self or return;
            local *periodic_stop = sub {
                # Diagnostics are gated on the debug flag, as elsewhere in this class.
                warn "Stopping periodic $id" if $self and $self->{debug};
                delete $self->{timers}{$id} if $self;
                undef $cb;
            };
            $cb->();
        },
    );
    defined wantarray and return AnyEvent::Util::guard(sub {
        delete $self->{timers}{$id} if $self;
        undef $cb;
    });
    return;
}
# Run $cb once, $interval seconds from now.
#
#   $self->after($interval, $cb);
#   my $guard = $self->after($interval, $cb);
#
# The timer is kept in $self->{timers} under the numeric address of $cb and
# removes itself just before $cb is invoked.
#
# In non-void context an AnyEvent::Util guard is returned whose code block
# cancels the timer; in void context nothing is returned.
sub after {
    weaken( my $self = shift );
    my $interval = shift;
    my $cb = shift;
    # Compute the slot key once.  $cb is undefined after it has run, and
    # recomputing "int $cb" in the guard would yield 0 and hit the wrong slot.
    my $id = int $cb;
    #warn "Create after $interval";
    $self->{timers}{$id} = AnyEvent->timer(
        after => $interval,
        cb    => sub {
            # The object may be gone by the time the timer fires.
            $self or return;
            delete $self->{timers}{$id};
            $cb->();
            undef $cb;
        },
    );
    defined wantarray and return AnyEvent::Util::guard(sub {
        delete $self->{timers}{$id} if $self;
        undef $cb;
    });
    return;
}
# Drop the current connection, then start a new connection attempt.
# Returns whatever connect() returns.
sub reconnect {
    my ($self) = @_;

    $self->disconnect;
    return $self->connect;
}
# Close and drop the raw connection and reset the connection state.
#
# Any arguments are passed on to the "disconnect" event, which is fired only
# if the object was connected or connecting when called.  A pending reconnect
# timer is removed.  Always returns nothing.
sub disconnect {
    my $self = shift;
    #$self->{con} or return;
    #warn "Disconnecting $self->{connected} || $self->{connecting} || $self->{reconnect} by @{[ (caller)[1,2] ]}";
    # Diagnostic for the case where "con" holds a plain hash instead of an
    # object (dumper() is expected to be provided elsewhere in this file).
    ref $self->{con} eq 'HASH' and warn dumper($self->{con});
    if ($self->{con}) {
        # Report a failed close, but only when the eval really ran: testing $@
        # unconditionally would report a stale error left by unrelated code.
        eval { $self->{con}->close; 1 } or warn;
    }
    delete $self->{con};
    my $wascon = $self->{connected} || $self->{connecting};
    $self->{connected} = 0;
    $self->{connecting} = 0;
    #$self->{reconnect} = 0;
    delete $self->{timers}{reconnect};
    $self->event('disconnect',@_) if $wascon;
    return;
}
# Catch-all for the AnyEvent::Connection::destroyed class: every method call
# on such an object is a silent no-op that returns nothing.
sub AnyEvent::Connection::destroyed::AUTOLOAD { return }
# Explicitly tear the object down: run its DESTROY method now, then rebless
# it into AnyEvent::Connection::destroyed.  Returns the reblessed object.
sub destroy {
    my $self = shift;

    $self->DESTROY;

    return bless $self, 'AnyEvent::Connection::destroyed';
}
# Destructor: disconnects and then empties the object hash, which releases
# everything stored in it.
sub DESTROY {
    my $self = shift;
    # A destructor can run at any point in the caller's code.  disconnect()
    # uses eval and may do I/O, so protect the global status variables
    # (see "Destructors" in perlobj).
    local ($., $@, $!, $^E, $?);
    warn "(".int($self).") Destroying AE::CNN" if $self->{debug};
    $self->disconnect;
    %$self = ();
}
# Generate the proxy methods at compile time.  Each one forwards its
# arguments to the same-named method of the raw connection in $self->{con};
# when the object is not connected it fires an "error" event instead and
# returns that event call's result.
BEGIN {
    no strict 'refs';
    my @proxied = qw(push_write push_read unshift_read say reply recv command want_command);
    foreach my $name (@proxied) {
        *{$name} = sub {
            my $self = shift;
            unless ( $self->{connected} ) {
                return $self->event( error => "Not connected for $name" );
            }
            $self->{con}->$name(@_);
        };
    }
}
=head1 AUTHOR
Mons Anderson, C<< <mons at cpan.org> >>
=head1 BUGS
Please report any bugs or feature requests to C<bug-anyevent-connection at rt.cpan.org>, or through
the web interface at L<http://rt.cpan.org/NoAuth/ReportBug.html?Queue=AnyEvent-Connection>. I will be notified, and then you'll
automatically be notified of progress on your bug as I make changes.
=head1 SUPPORT
You can find documentation for this module with the perldoc command.
perldoc AnyEvent::Connection
You can also look for information at:
=over 4
=item * RT: CPAN's request tracker
L<http://rt.cpan.org/NoAuth/Bugs.html?Dist=AnyEvent-Connection>
=item * AnnoCPAN: Annotated CPAN documentation
L<http://annocpan.org/dist/AnyEvent-Connection>
=item * CPAN Ratings
L<http://cpanratings.perl.org/d/AnyEvent-Connection>
=item * Search CPAN
L<http://search.cpan.org/dist/AnyEvent-Connection/>
=back
=head1 ACKNOWLEDGEMENTS
( run in 0.893 second using v1.01-cache-2.11-cpan-d59ab9ce9b0 )