AnyEvent-Connection
view release on metacpan or search on metacpan
lib/AnyEvent/Connection/Raw.pm view on Meta::CPAN
# Catch-all no-op: any method call that resolves to the
# AnyEvent::Connection::Raw::destroyed package lands here and does nothing.
# NOTE(review): presumably objects are reblessed into that package once they
# are torn down -- the reblessing is not visible in this section; confirm.
sub AnyEvent::Connection::Raw::destroyed::AUTOLOAD {}
# Destructor: release everything the connection object holds.
#
# In order: drop the stored filehandle, notify pending callbacks through
# _call_waiting("destroying connection"), destroy and forget the handle in
# {h}, then empty the object hash. Emits a debug warning first when {debug}
# is set. Returns nothing.
sub DESTROY {
    my ($self) = @_;

    if ( $self->{debug} ) {
        warn "(" . int($self) . ") Destroying AE::CNN::Raw";
    }

    delete $self->{fh};

    # Waiting callbacks are notified before the handle goes away.
    $self->_call_waiting("destroying connection");

    $self->{h}->destroy if $self->{h};
    delete $self->{h};

    # Wipe every remaining slot of the object.
    %{$self} = ();

    return;
}
# Queue data for writing on the underlying handle ($self->{h}).
#
# All arguments are forwarded to the handle's push_write. Plain
# (non-reference) strings that carry Perl's internal UTF-8 flag are encoded
# to UTF-8 octets first. The encoding is applied to private copies of the
# arguments, so the caller's variables are never modified and read-only
# values such as string literals are accepted.
#
# Does nothing and returns empty when there is no handle.
sub push_write {
    my ( $self, @args ) = @_;
    $self->{h} or return;

    # $arg aliases the elements of our own @args copy, not the caller's @_.
    for my $arg (@args) {
        next if ref $arg;
        utf8::encode($arg) if utf8::is_utf8($arg);
    }

    $self->{h}->push_write(@args);
    warn ">> @args " if $self->{debug};
    return;
}
# Queue a read request on the underlying handle.
#
# The last argument is the completion callback; everything before it is
# passed to the handle's push_read unchanged. If {timeout} is set, it is
# applied to the handle before queueing and switched off again (undef) when
# the read completes. The callback is invoked as $cb->($self, @data), i.e.
# with the connection object in place of the handle.
#
# Returns empty when there is no handle, otherwise whatever the handle's
# push_read returns.
sub push_read {
    my ( $self, @read_spec ) = @_;
    my $cb = pop @read_spec;

    return unless $self->{h};

    if ( $self->{timeout} ) {
        $self->{h}->timeout( $self->{timeout} );
    }

    return $self->{h}->push_read(
        @read_spec,
        sub {
            # First argument is the handle: take it off @_ and disable its timeout.
            my $handle = shift;
            $handle->timeout(undef);
            $cb->( $self, @_ );
            undef $cb;
        }
    );
}
# Forward all arguments to the underlying handle's unshift_read.
# Returns empty when there is no handle, otherwise the handle's return value.
sub unshift_read {
    my ( $self, @read_spec ) = @_;
    return unless $self->{h};
    return $self->{h}->unshift_read(@read_spec);
}
# Write one line to the underlying handle ($self->{h}).
#
# The arguments are joined with the list separator (as "@list" interpolation
# does) and followed by the configured line terminator $self->{nl}. Plain
# (non-reference) strings that carry Perl's internal UTF-8 flag are encoded
# to UTF-8 octets first. The encoding is applied to private copies, so the
# caller's variables are never modified and read-only values such as string
# literals are accepted.
#
# Does nothing when there is no handle. Always returns empty.
sub say {
    my ( $self, @parts ) = @_;
    $self->{h} or return;

    # $part aliases the elements of our own @parts copy, not the caller's @_.
    for my $part (@parts) {
        next if ref $part;
        utf8::encode($part) if utf8::is_utf8($part);
    }

    $self->{h}->push_write( "@parts" . $self->{nl} );
    warn ">> @parts " if $self->{debug};
    return;
}

# reply is an alias for say.
*reply = \&say;
# Read exactly $bytes bytes from the connection and pass them to a callback.
#
# Usage: $self->recv($bytes, cb => sub { ... });
#
# Croaks when no cb is given. Without a handle the callback is invoked
# immediately as cb(undef, "Not connected"). Otherwise a weak reference to
# the callback is stored in {waitingcb} (keyed by the callback's numeric
# address) and a chunk read is put at the front of the handle's read queue.
# When the chunk arrives, one leading CR and then one leading LF are removed
# from the handle's read buffer, the {waitingcb} entry is dropped and the
# callback receives the chunk.
sub recv {
    my ( $self, $bytes, %args ) = @_;

    croak "no cb for recv at @{[ (caller)[1,2] ]}" unless $args{cb};
    return $args{cb}( undef, "Not connected" ) unless $self->{h};

    if ( $self->{debug} ) {
        warn "<+ read $bytes ";
    }

    # Register the pending callback; the stored copy is weak, %args keeps
    # the strong reference.
    my $waiting_key = int $args{cb};
    $self->{waitingcb}{$waiting_key} = $args{cb};
    weaken( $self->{waitingcb}{$waiting_key} );

    return $self->{h}->unshift_read(
        chunk => $bytes,
        sub {
            # Also eat CRLF or LF from read buffer
            for my $eol ( "\015", "\012" ) {
                substr( $self->{h}{rbuf}, 0, 1, '' )
                    if substr( $self->{h}{rbuf}, 0, 1 ) eq $eol;
            }

            delete $self->{waitingcb}{$waiting_key};

            shift;    # drop the handle, leaving only the data in @_
            my $cb = delete $args{cb};
            $cb->(@_);
            %args = ();
        }
    );
}
# Send one command line and deliver the one-line reply to a callback.
#
# Usage: $self->command($line, cb => sub { my ($reply) = @_; ... });
#
# $line is encoded to UTF-8 octets when it carries Perl's internal UTF-8
# flag, then written followed by $self->{nl}. Croaks when no cb is given.
# Without a handle the callback is invoked immediately as
# cb(undef, "Not connected"). Otherwise a weak reference to the callback is
# stored in {waitingcb} until the reply arrives, and a read up to the next
# LF (optionally preceded by CR) is queued. The callback receives the reply
# with its line terminator removed.
sub command {
    my $self  = shift;
    my $write = shift;
    if (utf8::is_utf8($write)) {
        utf8::encode $write;
    }
    my %args = @_;
    $args{cb} or croak "no cb for command at @{[ (caller)[1,2] ]}";
    # Not connected: report through the callback, then release %args.
    $self->{h} or return $args{cb}(undef,"Not connected"),%args = ();
    weaken( $self->{waitingcb}{int $args{cb}} = $args{cb} );
    warn ">> $write " if $self->{debug};
    $self->{h}->push_write("$write$self->{nl}");
    warn "<? read " if $self->{debug};
    # @rewrite s/sub {/cb 'conn.command.read' {/;
    $self->{h}->push_read( regex => qr<\015?\012>, sub {
        shift;    # drop the handle, leaving only the data in @_
        for (@_) {
            # Strip the terminator explicitly instead of with chomp, whose
            # behaviour depends on the global $/ at the time the event loop
            # runs this callback.
            s/(?:\015?\012|\015)\z//;
        }
        warn "<< @_ " if $self->{debug};
        delete $self->{waitingcb}{int $args{cb}};
        delete($args{cb})->(@_);
        %args = ();
        # Release the closure's reference to the connection object.
        undef $self;
    } );
}
# Serverside feature
#
# Wait for the next line from the peer (terminated by LF, optionally
# preceded by CR), emit it via $self->event( command => $line ) with the
# terminator removed, and then queue itself again for the following line.
# Does nothing when there is no handle.
sub want_command {
    my $self = shift;
    $self->{h} or return;
    # @rewrite s/sub {/cb 'conn.wand_command.read' {/;
    $self->{h}->push_read( regex => qr<\015?\012>, sub {
        shift;    # drop the handle, leaving only the data in @_
        for (@_) {
            # Strip the terminator explicitly instead of with chomp, whose
            # behaviour depends on the global $/ at the time the event loop
            # runs this callback.
            s/(?:\015?\012|\015)\z//;
        }
        $self->event( command => @_ );
        # Re-arm for the next command line.
        $self->want_command;
    });
}
1;
( run in 1.061 second using v1.01-cache-2.11-cpan-5735350b133 )