AnyEvent-Connection

 view release on metacpan or  search on metacpan

lib/AnyEvent/Connection/Raw.pm  view on Meta::CPAN

	$self->{timeout} ||= 30;
	$self->{h} = AnyEvent::Handle->new(
		fh        => $self->{fh},
		autocork  => 1,
		keepalive => 1,
		on_eof    => $self->{cb}{eof},
		on_error  => $self->{cb}{err},
	);
	$self;
}

sub destroy {
	my ($self) = @_;
	$self->DESTROY;
	bless $self, "AnyEvent::Connection::Raw::destroyed";
}
*close = \&destroy;
sub AnyEvent::Connection::Raw::destroyed::AUTOLOAD {}
sub DESTROY {
	my $self = shift;
	warn "(".int($self).") Destroying AE::CNN::Raw" if $self->{debug};
	delete $self->{fh};
	$self->_call_waiting("destroying connection");
	$self->{h} and $self->{h}->destroy;
	delete $self->{h};
	%$self = ();
	return;
}

sub push_write {
	my $self = shift;
	$self->{h} or return;
	for (@_) {
		if (!ref and utf8::is_utf8($_)) {
			$_ = $_;
			utf8::encode $_;
		}
	}
	$self->{h}->push_write(@_);
	warn ">> @_  " if $self->{debug};
}

sub push_read {
	my $self = shift;
	my $cb = pop;
	$self->{h} or return;
	$self->{h}->timeout($self->{timeout}) if $self->{timeout};
	$self->{h}->push_read(@_,sub {
		shift->timeout(undef); # disable timeout and remove handle from @_
		$cb->($self,@_);
		undef $cb;
	});
}

sub unshift_read {
	my $self = shift;
	$self->{h} or return;
	$self->{h}->unshift_read(@_);
}

sub say {
	my $self = shift;
	$self->{h} or return;
	for (@_) {
		if (!ref and utf8::is_utf8($_)) {
			$_ = $_;
			utf8::encode $_;
		}
	}
	$self->{h}->push_write("@_$self->{nl}");
	warn ">> @_  " if $self->{debug};
	return;
}
*reply = \&say;

sub recv {
	my ($self,$bytes,%args) = @_;
	$args{cb}  or croak "no cb for recv at @{[ (caller)[1,2] ]}";
	$self->{h} or return $args{cb}(undef,"Not connected");
	warn "<+ read $bytes " if $self->{debug};
	weaken( $self->{waitingcb}{int $args{cb}} = $args{cb} );
	$self->{h}->unshift_read( chunk => $bytes, sub {
		#local *__ANON__ = 'conn.recv.read';
		# Also eat CRLF or LF from read buffer
		substr( $self->{h}{rbuf}, 0, 1 ) = '' if substr( $self->{h}{rbuf}, 0, 1 ) eq "\015";
		substr( $self->{h}{rbuf}, 0, 1 ) = '' if substr( $self->{h}{rbuf}, 0, 1 ) eq "\012";
		delete $self->{waitingcb}{int $args{cb}};
		shift; (delete $args{cb})->(@_);
		%args = ();
	} );
}

sub command {
	my $self = shift;
	my $write = shift;
	if (utf8::is_utf8($write)) {
		utf8::encode $write;
	}
	my %args = @_;
	$args{cb}  or croak "no cb for command at @{[ (caller)[1,2] ]}";
	$self->{h} or return $args{cb}(undef,"Not connected"),%args = ();
	weaken( $self->{waitingcb}{int $args{cb}} = $args{cb} );
	
	#my $i if 0;
	#my $c = ++$i;
	warn ">> $write  " if $self->{debug};
	$self->{h}->push_write("$write$self->{nl}");
	#$self->{h}->timeout( $self->{select_timeout} );
	warn "<? read  " if $self->{debug};
	# @rewrite s/sub {/cb 'conn.command.read' {/;
	$self->{h}->push_read( regex => qr<\015?\012>, sub {
		#local *__ANON__ = 'conn.command.read';
		shift;
		for (@_) {
			chomp;
			substr($_,-1,1) = '' if substr($_, -1,1) eq "\015";
		}
		warn "<< @_  " if $self->{debug};
		delete $self->{waitingcb}{int $args{cb}};
		delete($args{cb})->(@_);
		%args = ();



( run in 0.391 second using v1.01-cache-2.11-cpan-d7a12ab2c7f )