AnyEvent
view release on metacpan or search on metacpan
lib/AnyEvent/Handle.pm view on Meta::CPAN
=item json => L<JSON>, L<JSON::PP> or L<JSON::XS> object
This is the json coder object used by the C<json> read and write types.
If you don't supply it, then AnyEvent::Handle will create and use a
suitable one (on demand), which will write and expect UTF-8 encoded
JSON texts (either using L<JSON::XS> or L<JSON>). The written texts are
guaranteed not to contain any newline character.
For security reasons, this encoder will likely I<not> handle numbers and
strings, only arrays and objects/hashes. The reason is that originally
JSON was self-delimited, but Dougles Crockford thought it was a splendid
idea to redefine JSON incompatibly, so this is no longer true.
For protocols that used back-to-back JSON texts, this might lead to
run-ins, where two or more JSON texts will be interpreted as one JSON
text.
For this reason, if the default encoder uses L<JSON::XS>, it will default
to not allowing anything but arrays and objects/hashes, at least for the
forseeable future (it will change at some point). This might or might not
be true for the L<JSON> module, so this might cause a security issue.
If you depend on either behaviour, you should create your own json object
and pass it in explicitly.
=item cbor => L<CBOR::XS> object
This is the cbor coder object used by the C<cbor> read and write types.
If you don't supply it, then AnyEvent::Handle will create and use a
suitable one (on demand), which will write CBOR without using extensions,
if possible.
Note that you are responsible to depend on the L<CBOR::XS> module if you
want to use this functionality, as AnyEvent does not have a dependency on
it itself.
=back
=cut
sub new {
my $class = shift;
my $self = bless { @_ }, $class;
if ($self->{fh}) {
$self->_start;
return unless $self->{fh}; # could be gone by now
} elsif ($self->{connect}) {
require AnyEvent::Socket;
$self->{peername} = $self->{connect}[0]
unless exists $self->{peername};
$self->{_skip_drain_rbuf} = 1;
{
Scalar::Util::weaken (my $self = $self);
$self->{_connect} =
AnyEvent::Socket::tcp_connect (
$self->{connect}[0],
$self->{connect}[1],
sub {
my ($fh, $host, $port, $retry) = @_;
delete $self->{_connect}; # no longer needed
if ($fh) {
$self->{fh} = $fh;
delete $self->{_skip_drain_rbuf};
$self->_start;
$self->{on_connect}
and $self->{on_connect}($self, $host, $port, sub {
delete @$self{qw(fh _tw _rtw _wtw _ww _rw _eof _queue rbuf _wbuf tls _tls_rbuf _tls_wbuf)};
$self->{_skip_drain_rbuf} = 1;
&$retry;
});
} else {
if ($self->{on_connect_error}) {
$self->{on_connect_error}($self, "$!");
$self->destroy if $self;
} else {
$self->_error ($!, 1);
}
}
},
sub {
local $self->{fh} = $_[0];
$self->{on_prepare}
? $self->{on_prepare}->($self)
: ()
}
);
}
} else {
Carp::croak "AnyEvent::Handle: either an existing fh or the connect parameter must be specified";
}
$self
}
sub _start {
my ($self) = @_;
# too many clueless people try to use udp and similar sockets
# with AnyEvent::Handle, do them a favour.
my $type = getsockopt $self->{fh}, Socket::SOL_SOCKET (), Socket::SO_TYPE ();
Carp::croak "AnyEvent::Handle: only stream sockets supported, anything else will NOT work!"
if Socket::SOCK_STREAM () != (unpack "I", $type) && defined $type;
AnyEvent::fh_unblock $self->{fh};
lib/AnyEvent/Handle.pm view on Meta::CPAN
Reset the activity timeout, as if data was received or sent.
These methods are cheap to call.
=cut
for my $dir ("", "r", "w") {
my $timeout = "${dir}timeout";
my $tw = "_${dir}tw";
my $on_timeout = "on_${dir}timeout";
my $activity = "_${dir}activity";
my $cb;
*$on_timeout = sub {
$_[0]{$on_timeout} = $_[1];
};
*$timeout = sub {
my ($self, $new_value) = @_;
$new_value >= 0
or Carp::croak "AnyEvent::Handle->$timeout called with negative timeout ($new_value), caught";
$self->{$timeout} = $new_value;
delete $self->{$tw}; &$cb;
};
*{"${dir}timeout_reset"} = sub {
$_[0]{$activity} = AE::now;
};
# main workhorse:
# reset the timeout watcher, as neccessary
# also check for time-outs
$cb = sub {
my ($self) = @_;
if ($self->{$timeout} && $self->{fh}) {
my $NOW = AE::now;
# when would the timeout trigger?
my $after = $self->{$activity} + $self->{$timeout} - $NOW;
# now or in the past already?
if ($after <= 0) {
$self->{$activity} = $NOW;
if ($self->{$on_timeout}) {
$self->{$on_timeout}($self);
} else {
$self->_error (Errno::ETIMEDOUT);
}
# callback could have changed timeout value, optimise
return unless $self->{$timeout};
# calculate new after
$after = $self->{$timeout};
}
Scalar::Util::weaken $self;
return unless $self; # ->error could have destroyed $self
$self->{$tw} ||= AE::timer $after, 0, sub {
delete $self->{$tw};
$cb->($self);
};
} else {
delete $self->{$tw};
}
}
}
#############################################################################
=back
=head2 WRITE QUEUE
AnyEvent::Handle manages two queues per handle, one for writing and one
for reading.
The write queue is very simple: you can add data to its end, and
AnyEvent::Handle will automatically try to get rid of it for you.
When data could be written and the write buffer is shorter then the low
water mark, the C<on_drain> callback will be invoked once.
=over 4
=item $handle->on_drain ($cb)
Sets the C<on_drain> callback or clears it (see the description of
C<on_drain> in the constructor).
This method may invoke callbacks (and therefore the handle might be
destroyed after it returns).
=cut
sub on_drain {
my ($self, $cb) = @_;
$self->{on_drain} = $cb;
$cb->($self)
if $cb && $self->{low_water_mark} >= (length $self->{wbuf}) + (length $self->{_tls_wbuf});
}
=item $handle->push_write ($data)
Queues the given scalar to be written. You can push as much data as
you want (only limited by the available memory and C<wbuf_max>), as
C<AnyEvent::Handle> buffers it independently of the kernel.
This method may invoke callbacks (and therefore the handle might be
destroyed after it returns).
=cut
sub _drain_wbuf {
my ($self) = @_;
if (!$self->{_ww} && length $self->{wbuf}) {
Scalar::Util::weaken $self;
my $cb = sub {
my $len = syswrite $self->{fh}, $self->{wbuf};
if (defined $len) {
substr $self->{wbuf}, 0, $len, "";
$self->{_activity} = $self->{_wactivity} = AE::now;
$self->{on_drain}($self)
if $self->{low_water_mark} >= (length $self->{wbuf}) + (length $self->{_tls_wbuf})
&& $self->{on_drain};
delete $self->{_ww} unless length $self->{wbuf};
} elsif ($! != EAGAIN && $! != EINTR && $! != EWOULDBLOCK && $! != WSAEWOULDBLOCK) {
$self->_error ($!, 1);
}
};
# try to write data immediately
$cb->() unless $self->{autocork};
# if still data left in wbuf, we need to poll
$self->{_ww} = AE::io $self->{fh}, 1, $cb
if length $self->{wbuf};
if (
defined $self->{wbuf_max}
&& $self->{wbuf_max} < length $self->{wbuf}
) {
$self->_error (Errno::ENOSPC, 1), return;
}
};
}
our %WH;
# deprecated
sub register_write_type($$) {
$WH{$_[0]} = $_[1];
}
sub push_write {
my $self = shift;
if (@_ > 1) {
my $type = shift;
@_ = ($WH{$type} ||= _load_func "$type\::anyevent_write_type"
or Carp::croak "unsupported/unloadable type '$type' passed to AnyEvent::Handle::push_write")
->($self, @_);
}
# we downgrade here to avoid hard-to-track-down bugs,
# and diagnose the problem earlier and better.
if ($self->{tls}) {
utf8::downgrade $self->{_tls_wbuf} .= $_[0];
&_dotls ($self) if $self->{fh};
} else {
lib/AnyEvent/Handle.pm view on Meta::CPAN
Instead of one of the predefined types, you can also specify the name
of a package. AnyEvent will try to load the package and then expects to
find a function named C<anyevent_read_type> inside. If it isn't found, it
progressively tries to load the parent package until it either finds the
function (good) or runs out of packages (bad).
Whenever this type is used, C<push_read> will invoke the function with the
handle object, the original callback and the remaining arguments.
The function is supposed to return a callback (usually a closure) that
works as a plain read callback (see C<< ->push_read ($cb) >>), so you can
mentally treat the function as a "configurable read type to read callback"
converter.
It should invoke the original callback when it is done reading (remember
to pass C<$handle> as first argument as all other callbacks do that,
although there is no strict requirement on this).
For examples, see the source of this module (F<perldoc -m
AnyEvent::Handle>, search for C<register_read_type>)).
=item $handle->stop_read
=item $handle->start_read
In rare cases you actually do not want to read anything from the
socket. In this case you can call C<stop_read>. Neither C<on_read> nor
any queued callbacks will be executed then. To start reading again, call
C<start_read>.
Note that AnyEvent::Handle will automatically C<start_read> for you when
you change the C<on_read> callback or push/unshift a read callback, and it
will automatically C<stop_read> for you when neither C<on_read> is set nor
there are any read requests in the queue.
In older versions of this module (<= 5.3), these methods had no effect,
as TLS does not support half-duplex connections. In current versions they
work as expected, as this behaviour is required to avoid certain resource
attacks, where the program would be forced to read (and buffer) arbitrary
amounts of data before being able to send some data. The drawback is that
some readings of the the SSL/TLS specifications basically require this
attack to be working, as SSL/TLS implementations might stall sending data
during a rehandshake.
As a guideline, during the initial handshake, you should not stop reading,
and as a client, it might cause problems, depending on your application.
=cut
sub stop_read {
my ($self) = @_;
delete $self->{_rw};
}
sub start_read {
my ($self) = @_;
unless ($self->{_rw} || $self->{_eof} || !$self->{fh}) {
Scalar::Util::weaken $self;
$self->{_rw} = AE::io $self->{fh}, 0, sub {
my $rbuf = \($self->{tls} ? my $buf : $self->{rbuf});
my $len = sysread $self->{fh}, $$rbuf, $self->{read_size}, length $$rbuf;
if ($len > 0) {
$self->{_activity} = $self->{_ractivity} = AE::now;
if ($self->{tls}) {
Net::SSLeay::BIO_write ($self->{_rbio}, $$rbuf);
&_dotls ($self);
} else {
$self->_drain_rbuf;
}
if ($len == $self->{read_size}) {
$self->{read_size} *= 2;
$self->{read_size} = $self->{max_read_size} || MAX_READ_SIZE
if $self->{read_size} > ($self->{max_read_size} || MAX_READ_SIZE);
}
} elsif (defined $len) {
delete $self->{_rw};
$self->{_eof} = 1;
$self->_drain_rbuf;
} elsif ($! != EAGAIN && $! != EINTR && $! != EWOULDBLOCK && $! != WSAEWOULDBLOCK) {
return $self->_error ($!, 1);
}
};
}
}
our $ERROR_SYSCALL;
our $ERROR_WANT_READ;
sub _tls_error {
my ($self, $err) = @_;
return $self->_error ($!, 1)
if $err == Net::SSLeay::ERROR_SYSCALL ();
my $err = Net::SSLeay::ERR_error_string (Net::SSLeay::ERR_get_error ());
# reduce error string to look less scary
$err =~ s/^error:[0-9a-fA-F]{8}:[^:]+:([^:]+):/\L$1: /;
if ($self->{_on_starttls}) {
(delete $self->{_on_starttls})->($self, undef, $err);
&_freetls;
} else {
&_freetls;
$self->_error (Errno::EPROTO, 1, $err);
}
}
# poll the write BIO and send the data if applicable
# also decode read data if possible
# this is basiclaly our TLS state machine
( run in 1.076 second using v1.01-cache-2.11-cpan-39bf76dae61 )