AnyEvent-Handle-Throttle
view release on metacpan or search on metacpan
lib/AnyEvent/Handle/Throttle.pm view on Meta::CPAN
package AnyEvent::Handle::Throttle;
{
use strict;
use warnings;
use AnyEvent;
use Errno qw[EAGAIN EINTR];
use AnyEvent::Util qw[WSAEWOULDBLOCK];
use parent 'AnyEvent::Handle';
our $MAJOR = 0.00; our $MINOR = 2; our $DEV = -5; our $VERSION = sprintf('%1.3f%03d' . ($DEV ? (($DEV < 0 ? '' : '_') . '%03d') : ('')), $MAJOR, $MINOR, abs $DEV);
sub upload_limit {
$_[1] ? $_[0]->{upload_limit} = $_[1] : $_[0]->{upload_limit};
}
sub download_limit {
$_[1] ? $_[0]->{download_limit} = $_[1] : $_[0]->{download_limit};
}
sub upload_total { $_[0]->{upload_total} }
sub download_total { $_[0]->{download_total} }
sub upload_speed { $_[0]->{upload_speed} }
sub download_speed { $_[0]->{download_speed} }
my ($global_upload_total, $global_download_total,
$global_upload_limit, $global_download_limit,
$global_upload_speed, $global_download_speed
);
my $global_period = 1;
sub global_upload_limit {
$_[1] ? $global_upload_limit = $_[1] : $global_upload_limit;
}
sub global_download_limit {
$_[1] ? $global_download_limit = $_[1] : $global_download_limit;
}
sub global_upload_total {$global_upload_total}
sub global_download_total {$global_download_total}
sub global_upload_speed {$global_upload_speed}
sub global_download_speed {$global_download_speed}
my ($global_read_size, $global_write_size,
$global__upload_speed, $global__download_speed);
my $global_reset_cb = sub {
$global_read_size = $global_download_limit;
$global_write_size = $global_upload_limit || 8 * 1024;
$global_upload_speed = $global__upload_speed;
$global_download_speed = $global__download_speed;
$global__upload_speed = $global__download_speed = 0;
};
$global_reset_cb->();
our $global_reset = AE::timer(0, $global_period, $global_reset_cb);
sub _start {
my $self = shift;
my $reset = sub {
$self->{read_size} = $self->{download_limit};
$self->{write_size} = $self->{upload_limit} || 8 * 1024;
$self->{upload_speed} = $self->{_upload_speed};
$self->{download_speed} = $self->{_download_speed};
$self->{_upload_speed} = $self->{_download_speed} = 0;
};
$self->{_period} ||= 1;
$self->{_reset} = AE::timer(0, $self->{_period}, $reset);
$reset->();
$self->SUPER::_start(@_);
}
sub start_read {
my ($self) = @_;
unless ($self->{_rw} || $self->{_eof} || !$self->{fh}) {
Scalar::Util::weaken $self;
$self->{_rw} = AE::io $self->{fh}, 0, sub {
my ($read) = sort grep {defined} $global_read_size,
$self->{read_size};
my ($period) = sort grep {defined} $global_period,
$self->{_period};
if (defined $read && $read <= 0) {
$self->stop_read;
return $self->{_pause_read} = AE::timer(
$period, 0,
sub {
delete $self->{_pause_read};
$self->start_read;
}
);
}
my $rbuf = \($self->{tls} ? my $buf : $self->{rbuf});
$$rbuf ||= '';
my $len = sysread $self->{fh}, $$rbuf, $read || 8192,
length $$rbuf;
if (defined $len) {
if ($len > 0) {
$self->{read_size} -= $len;
$global_read_size -= $len;
$self->{download_total} += $len;
$global_download_total += $len;
$self->{_download_speed} += $len;
$global__download_speed += $len;
$self->{_activity} = $self->{_ractivity} = AE::now;
if ($self->{tls}) {
Net::SSLeay::BIO_write($self->{_rbio}, $$rbuf);
&_dotls($self);
}
else {
$self->_drain_rbuf;
}
}
else {
delete $self->{_rw};
$self->{_eof} = 1;
$self->_drain_rbuf;
}
}
elsif ( $! != EAGAIN
&& $! != EINTR
&& $! != WSAEWOULDBLOCK)
{ return $self->_error($!, 1);
}
}
}
}
sub _drain_wbuf {
my ($self) = @_;
if (!$self->{_ww} && $self->{wbuf} && length $self->{wbuf}) {
Scalar::Util::weaken $self;
my $cb;
my $poll = sub {
$self->{_ww} = AE::io $self->{fh}, 1, $cb
if length $self->{wbuf};
};
$cb = sub {
my ($write) = sort grep {defined} $global_write_size,
$self->{write_size};
my ($period) = sort grep {defined} $global_period,
$self->{_period};
if (defined $write && $write <= 0) {
if (length $self->{wbuf}) {
delete $self->{_ww};
return $self->{_pause_ww} = AE::timer(
0.5, 0,
sub {
delete $self->{_pause_write};
$poll->();
}
);
}
return 1;
}
my $len = syswrite $self->{fh}, $self->{wbuf}, $write;
if (defined $len) {
$self->{write_size} -= $len;
$global_write_size -= $len;
$self->{upload_total} += $len;
$global_upload_total += $len;
$self->{_upload_speed} += $len;
$global__upload_speed += $len;
substr $self->{wbuf}, 0, $len, "";
$self->{_activity} = $self->{_wactivity} = AE::now;
$self->{on_drain}($self)
if $self->{low_water_mark}
|| 0 >= length($self->{wbuf} || '')
+ length($self->{_tls_wbuf} || '')
&& $self->{on_drain};
delete $self->{_ww} unless length $self->{wbuf};
}
elsif ( $! != EAGAIN
&& $! != EINTR
&& $! != WSAEWOULDBLOCK)
{ $self->_error($!, 1);
}
};
# try to write data immediately
$cb->() unless $self->{autocork};
$poll->();
}
}
}
1;
=pod
=head1 NAME
AnyEvent::Handle::Throttle - AnyEvent::Handle subclass with user-defined up/down bandwidth cap
=head1 Synopsis
use AnyEvent;
use AnyEvent::Handle::Throttle;
my $condvar = AnyEvent->condvar;
my $handle;
$handle = AnyEvent::Handle::Throttle->new(
upload_limit => 2, # Very...
download_limit => 50, # ...slow
connect => ['google.com', 'http'],
on_error => sub {
warn "error $_[2]\n";
$_[0]->destroy;
( run in 1.110 second using v1.01-cache-2.11-cpan-39bf76dae61 )