AnyEvent-Handle-Throttle

 view release on metacpan or  search on metacpan

lib/AnyEvent/Handle/Throttle.pm  view on Meta::CPAN

package AnyEvent::Handle::Throttle;
{
    use strict;
    use warnings;
    use AnyEvent;
    use Errno qw[EAGAIN EINTR];
    use Scalar::Util ();
    use AnyEvent::Util qw[WSAEWOULDBLOCK];
    use parent 'AnyEvent::Handle';
    # Builds the version string from its parts. With the values below the
    # sprintf format becomes '%1.3f%03d%03d' and $VERSION is '0.000002005'.
    # A positive $DEV would insert '_' before the last group; a $DEV of 0
    # would drop the last group from the format altogether.
    # NOTE(review): presumably kept on one physical line so that static
    # version extractors, which evaluate only the line assigning $VERSION,
    # still see $MAJOR/$MINOR/$DEV -- confirm before reformatting.
    our $MAJOR = 0.00; our $MINOR = 2; our $DEV = -5; our $VERSION = sprintf('%1.3f%03d' . ($DEV ? (($DEV < 0 ? '' : '_') . '%03d') : ('')), $MAJOR, $MINOR, abs $DEV);

    # Combined getter/setter for the per-handle upload limit.
    #
    # Called with a true argument, stores it in $self->{upload_limit} and
    # returns it. Called without an argument, or with a false one (0, '',
    # undef), leaves the stored value untouched and returns it.
    sub upload_limit {
        my ($self, $limit) = @_;
        return $self->{upload_limit} unless $limit;
        return $self->{upload_limit} = $limit;
    }

    # Combined getter/setter for the per-handle download limit.
    #
    # A true argument replaces $self->{download_limit}; any false argument
    # (or none) is ignored. The stored value is returned in both cases.
    sub download_limit {
        my ($self, $limit) = @_;
        if ($limit) {
            $self->{download_limit} = $limit;
        }
        return $self->{download_limit};
    }
    # Read-only accessors for the per-handle statistics kept in the object
    # hash. The *_total keys are incremented by the read/write callbacks;
    # the *_speed keys are copied from the running counters by the reset
    # callback installed in _start.
    sub upload_total {
        my ($self) = @_;
        return $self->{upload_total};
    }

    sub download_total {
        my ($self) = @_;
        return $self->{download_total};
    }

    sub upload_speed {
        my ($self) = @_;
        return $self->{upload_speed};
    }

    sub download_speed {
        my ($self) = @_;
        return $self->{download_speed};
    }
    # Process-wide state shared by every handle: running byte totals, the
    # configured limits, and the speeds published by the global reset
    # callback. All start out undefined.
    my $global_upload_total;
    my $global_download_total;
    my $global_upload_limit;
    my $global_download_limit;
    my $global_upload_speed;
    my $global_download_speed;

    # Interval handed to the global reset timer.
    my $global_period = 1;

    # Combined getter/setter for the global upload limit. The first argument
    # (the invocant) is ignored; a true second argument becomes the new
    # limit, anything false just reads the current one.
    sub global_upload_limit {
        my (undef, $limit) = @_;
        return $global_upload_limit unless $limit;
        return $global_upload_limit = $limit;
    }

    # Combined getter/setter for the global download limit; same calling
    # convention as global_upload_limit.
    sub global_download_limit {
        my (undef, $limit) = @_;
        return $global_download_limit unless $limit;
        return $global_download_limit = $limit;
    }

    # Read-only views of the shared counters and speeds.
    sub global_upload_total   { return $global_upload_total }
    sub global_download_total { return $global_download_total }
    sub global_upload_speed   { return $global_upload_speed }
    sub global_download_speed { return $global_download_speed }
    # Global per-period byte budgets and the in-progress speed counters that
    # the read/write callbacks update.
    my $global_read_size;
    my $global_write_size;
    my $global__upload_speed;
    my $global__download_speed;

    # Starts a new global accounting period: refills the budgets from the
    # configured limits (the write budget falls back to 8 * 1024 when no
    # upload limit is set), publishes the byte counts gathered during the
    # previous period as the current speeds, and zeroes the counters.
    my $global_reset_cb = sub {
        ($global_read_size, $global_write_size)
            = ($global_download_limit, $global_upload_limit || 8 * 1024);
        ($global_upload_speed, $global_download_speed)
            = ($global__upload_speed, $global__download_speed);
        $global__download_speed = 0;
        $global__upload_speed   = 0;
    };

    # Initialise once at load time, then let a repeating timer re-run the
    # callback using $global_period as its interval.
    $global_reset_cb->();
    our $global_reset = AE::timer(0, $global_period, $global_reset_cb);

    # Installs the per-handle accounting timer and then delegates to the
    # parent class' _start with the remaining arguments.
    #
    # The reset callback begins a new accounting period for this handle: it
    # refills read_size/write_size from download_limit/upload_limit (the
    # write budget falls back to 8 * 1024 when no upload limit is set),
    # publishes the bytes counted during the previous period as
    # upload_speed/download_speed, and zeroes the running counters.
    #
    # Returns whatever SUPER::_start returns.
    sub _start {
        my $self = shift;

        # The timer is stored inside the object, so its callback must not
        # hold a strong reference back to the object: that cycle would keep
        # a handle alive forever once the caller simply drops it.
        my $weak_self = $self;
        Scalar::Util::weaken($weak_self);
        my $reset = sub {
            my $handle = $weak_self or return;
            $handle->{read_size}      = $handle->{download_limit};
            $handle->{write_size}     = $handle->{upload_limit} || 8 * 1024;
            $handle->{upload_speed}   = $handle->{_upload_speed};
            $handle->{download_speed} = $handle->{_download_speed};
            $handle->{_upload_speed}  = $handle->{_download_speed} = 0;
        };
        $self->{_period} ||= 1;
        $self->{_reset} = AE::timer(0, $self->{_period}, $reset);

        # Budgets must be valid before the first I/O callback can run.
        $reset->();
        return $self->SUPER::_start(@_);
    }

    # Arms the throttled read watcher unless one is already active, EOF has
    # been seen, or there is no filehandle.
    #
    # Every time the filehandle becomes readable the callback
    #   * picks the smaller of the global and per-handle read budgets
    #     (an undefined budget means "no limit"),
    #   * pauses reading for one period when that budget is used up,
    #   * otherwise reads up to the budget (8192 bytes when unlimited),
    #     updates budgets, totals and speed counters, and hands the data to
    #     TLS or to the read-buffer drain.
    # Read errors other than EAGAIN/EINTR/WSAEWOULDBLOCK go to _error.
    sub start_read {
        my ($self) = @_;
        unless ($self->{_rw} || $self->{_eof} || !$self->{fh}) {
            Scalar::Util::weaken($self);
            $self->{_rw} = AE::io $self->{fh}, 0, sub {

                # Smallest defined value wins. The comparison has to be
                # numeric: a plain sort is lexical and puts "100" before
                # "50".
                my ($read) = sort { $a <=> $b } grep { defined $_ }
                    $global_read_size, $self->{read_size};
                my ($period) = sort { $a <=> $b } grep { defined $_ }
                    $global_period, $self->{_period};

                # Budget exhausted: stop watching and retry after a period.
                if (defined $read && $read <= 0) {
                    $self->stop_read;
                    return $self->{_pause_read} = AE::timer(
                        $period, 0,
                        sub {
                            delete $self->{_pause_read};
                            $self->start_read;
                        }
                    );
                }

                # With TLS the raw bytes go to a scratch buffer, otherwise
                # they are appended to the handle's read buffer.
                my $rbuf = \($self->{tls} ? my $buf : $self->{rbuf});

                # Only initialise a missing buffer; "||=" would also wipe a
                # buffer that currently holds the single character "0".
                $$rbuf = '' unless defined $$rbuf;
                my $len = sysread $self->{fh}, $$rbuf, $read || 8192,
                    length $$rbuf;
                if (defined $len) {
                    if ($len > 0) {

                        # An undefined budget means "unlimited" and must stay
                        # undefined; decrementing it would turn it negative
                        # and pause reading until the next reset.
                        $self->{read_size} -= $len
                            if defined $self->{read_size};
                        $global_read_size -= $len
                            if defined $global_read_size;
                        $self->{download_total}  += $len;
                        $global_download_total   += $len;
                        $self->{_download_speed} += $len;
                        $global__download_speed  += $len;
                        $self->{_activity} = $self->{_ractivity} = AE::now;
                        if ($self->{tls}) {
                            Net::SSLeay::BIO_write($self->{_rbio}, $$rbuf);

                            # _dotls is not defined in this package, so it
                            # has to be reached through method resolution;
                            # a plain function call would look it up here
                            # and die.
                            $self->_dotls;
                        }
                        else {
                            $self->_drain_rbuf;
                        }
                    }
                    else {

                        # Zero bytes read: end of file.
                        delete $self->{_rw};
                        $self->{_eof} = 1;
                        $self->_drain_rbuf;
                    }
                }
                elsif (   $! != EAGAIN
                       && $! != EINTR
                       && $! != WSAEWOULDBLOCK)
                {
                    return $self->_error($!, 1);
                }
            };
        }
    }

    # Writes as much of the write buffer as the current upload budget
    # allows, and keeps a write watcher armed while data remains.
    #
    # Does nothing when a write watcher is already active or the buffer is
    # empty. Unless autocork is set, one write is attempted immediately.
    # When the smaller of the global and per-handle write budgets is used
    # up, the watcher is removed and the drain is retried after a fixed
    # 0.5 second delay. Write errors other than
    # EAGAIN/EINTR/WSAEWOULDBLOCK go to _error.
    sub _drain_wbuf {
        my ($self) = @_;

        # Test definedness and length rather than truth, so that a buffer
        # holding just "0" is still written.
        if (   !$self->{_ww}
            && defined $self->{wbuf}
            && length $self->{wbuf})
        {
            Scalar::Util::weaken($self);
            my $cb = sub {

                # Smallest defined budget wins; compare numerically, a plain
                # sort is lexical and puts "100" before "50".
                my ($write) = sort { $a <=> $b } grep { defined $_ }
                    $global_write_size, $self->{write_size};
                if (defined $write && $write <= 0) {
                    if (length $self->{wbuf}) {
                        delete $self->{_ww};

                        # Re-enter _drain_wbuf instead of re-arming this very
                        # callback: a closure that refers to itself would
                        # never be freed. The key deleted here must be the
                        # one the timer is stored under.
                        return $self->{_pause_ww} = AE::timer(
                            0.5, 0,
                            sub {
                                delete $self->{_pause_ww};
                                $self->_drain_wbuf;
                            }
                        );
                    }
                    return 1;
                }
                my $len = syswrite $self->{fh}, $self->{wbuf}, $write;
                if (defined $len) {
                    $self->{write_size} -= $len;
                    $global_write_size  -= $len;
                    $self->{upload_total}  += $len;
                    $global_upload_total   += $len;
                    $self->{_upload_speed} += $len;
                    $global__upload_speed  += $len;
                    substr $self->{wbuf}, 0, $len, "";
                    $self->{_activity} = $self->{_wactivity} = AE::now;

                    # Bytes still queued, including data waiting in the TLS
                    # write buffer.
                    my $pending = 0;
                    for my $buf ($self->{wbuf}, $self->{_tls_wbuf}) {
                        $pending += length $buf if defined $buf;
                    }

                    # Only call on_drain when it is set, and only once the
                    # queued data is at or below the low water mark
                    # (treated as 0 when unset).
                    $self->{on_drain}($self)
                        if $self->{on_drain}
                            && ($self->{low_water_mark} || 0) >= $pending;
                    delete $self->{_ww} unless length $self->{wbuf};
                }
                elsif (   $! != EAGAIN
                       && $! != EINTR
                       && $! != WSAEWOULDBLOCK)
                {
                    $self->_error($!, 1);
                }
            };

            # try to write data immediately
            $cb->() unless $self->{autocork};

            # if data is still left in the buffer, wait for writability
            $self->{_ww} = AE::io $self->{fh}, 1, $cb
                if length $self->{wbuf};
        }
    }
}
1;

=pod

=head1 NAME

AnyEvent::Handle::Throttle - AnyEvent::Handle subclass with user-defined up/down bandwidth cap

=head1 Synopsis

    use AnyEvent;
    use AnyEvent::Handle::Throttle;
    my $condvar = AnyEvent->condvar;
    my $handle;
    $handle = AnyEvent::Handle::Throttle->new(
        upload_limit   => 2,  # Very...
        download_limit => 50, # ...slow
        connect  => ['google.com', 'http'],
        on_error => sub {
            warn "error $_[2]\n";
            $_[0]->destroy;



( run in 1.110 second using v1.01-cache-2.11-cpan-39bf76dae61 )