Algorithm-TokenBucket
view release on metacpan or search on metacpan
lib/Algorithm/TokenBucket.pm view on Meta::CPAN
package Algorithm::TokenBucket;
use 5.008;
use warnings;
use strict;
our $VERSION = 0.38;
use Time::HiRes qw/time/;
=head1 NAME
Algorithm::TokenBucket - Token bucket rate limiting algorithm
=head1 SYNOPSIS
use Algorithm::TokenBucket;
# configure a bucket to limit a stream up to 100 items per hour
# with bursts of 5 items max
my $bucket = Algorithm::TokenBucket->new(100 / 3600, 5);
# wait until we are allowed to process 3 items
until ($bucket->conform(3)) {
sleep 0.1;
# do things
}
# process 3 items because we now can
process(3);
# leak (flush) bucket
$bucket->count(3); # same as $bucket->count(1) for 1..3;
if ($bucket->conform(10)) {
die;
# because a bucket with the burst size of 5
# will never conform to 10
}
my $time = Time::HiRes::time;
while (Time::HiRes::time - $time < 7200) { # two hours
# be bursty
if ($bucket->conform(5)) {
process(5);
$bucket->count(5);
}
}
# we're likely to have processed 200 items (and hogged CPU)
Storable::store $bucket, 'bucket.stored';
my $bucket1 =
Algorithm::TokenBucket->new(@{Storable::retrieve('bucket.stored')});
=head1 DESCRIPTION
The Token Bucket algorithm is a flexible way of imposing a rate limit
against a stream of items. It is also very easy to combine several
rate-limiters in an C<AND> or C<OR> fashion.
Each bucket has a constant memory footprint because the algorithm is based
on the C<information rate>. Other rate limiters may keep track of
I<ALL> incoming items in memory. It allows them to be more accurate.
FYI, the C<conform>, C<count>, C<information rate>, and C<burst size> terms
are taken from the L<metering primitives|http://linux-ip.net/gl/tcng/node62.html>
page of the L<Linux Traffic Control - Next Generation|http://linux-ip.net/gl/tcng/>
system documentation.
=head1 INTERFACE
=cut
use fields qw/info_rate burst_size _tokens _last_check_time/;
=head2 METHODS
=over 4
=item new($$;$$)
The constructor requires at least the C<rate of information> in items per
second and the C<burst size> in items as its input parameters. It can also
take the current token counter and last check time but this usage is mostly
intended for restoring a saved bucket. See L</state()>.
=cut
sub new {
my $class = shift;
fields::new($class)->_init(@_);
}
sub _init {
my Algorithm::TokenBucket $self = shift;
@$self{qw/info_rate burst_size _tokens _last_check_time/} = @_;
$self->{_last_check_time} ||= time;
$self->{_tokens} ||= 0;
return $self;
}
=item state()
Returns the state of the bucket as a list. Use it for storing purposes.
Buckets also natively support freezing and thawing with L<Storable> by
providing C<STORABLE_*> callbacks.
=cut
sub state {
my Algorithm::TokenBucket $self = shift;
return @$self{qw/info_rate burst_size _tokens _last_check_time/};
}
use constant PACK_FORMAT => "d4"; # "F4" is not 5.6 compatible
sub STORABLE_freeze {
my ($self, $cloning) = @_;
return pack(PACK_FORMAT(), $self->state);
}
sub STORABLE_thaw {
my ($self, $cloning, $state) = @_;
return $self->_init(unpack(PACK_FORMAT(), $state));
}
sub _token_flow {
my Algorithm::TokenBucket $self = shift;
my $time = time;
$self->{_tokens} +=
($time - $self->{_last_check_time}) * $self->{info_rate};
if ($self->{_tokens} > $self->{burst_size}) {
$self->{_tokens} = $self->{burst_size};
}
$self->{_last_check_time} = $time;
}
=item conform($)
This method returns true if the bucket contains at least I<N> tokens and
false otherwise. In the case that it is true, it is allowed to transmit or
process I<N> items (not exactly right because I<N> can be fractional) from
the stream. A bucket never conforms to an I<N> greater than C<burst size>.
=cut
sub conform {
my Algorithm::TokenBucket $self = shift;
my $size = shift;
$self->_token_flow;
return $self->{_tokens} >= $size;
}
=item count($)
This method removes I<N> (or all if there are fewer than I<N> available)
tokens from the bucket. It does not return a meaningful value.
=cut
sub count {
my Algorithm::TokenBucket $self = shift;
my $size = shift;
$self->_token_flow;
($self->{_tokens} -= $size) < 0 and $self->{_tokens} = 0;
}
=item until($)
This method returns the number of seconds until I<N> tokens can be removed
from the bucket. It is especially useful in multitasking environments like
L<POE> where you cannot busy-wait. One can safely schedule the next
C<< conform($N) >> check in C<< until($N) >> seconds instead of checking
repeatedly.
Note that C<until()> does not take into account C<burst size>. This means
that a bucket will not conform to I<N> even after sleeping for C<< until($N) >>
seconds if I<N> is greater than C<burst size>.
=cut
sub until {
my Algorithm::TokenBucket $self = shift;
my $size = shift;
$self->_token_flow;
if ($self->{_tokens} >= $size) {
# can conform() right now
return 0;
} else {
my $needed = $size - $self->{_tokens};
return ($needed / $self->{info_rate});
}
}
=item get_token_count()
Returns the current number of tokens in the bucket. This method may be
useful for inspection or debugging purposes. You should not examine
the state of the bucket for rate limiting purposes.
This number will frequently be fractional so it is not exactly a
"count".
=cut
sub get_token_count {
my Algorithm::TokenBucket $self = shift;
$self->_token_flow;
return $self->{_tokens};
}
1;
__END__
=back
=head1 EXAMPLES
Imagine a rate limiter for a mail sending application. We would like to
allow 2 mails per minute but no more than 20 mails per hour.
my $rl1 = Algorithm::TokenBucket->new(2/60, 1);
my $rl2 = Algorithm::TokenBucket->new(20/3600, 10);
# "bursts" of 10 to ease the lag but $rl1 enforces
# 2 per minute, so it won't flood
while (my $mail = get_next_mail) {
until ($rl1->conform(1) && $rl2->conform(1)) {
busy_wait;
}
$mail->take_off;
$rl1->count(1); $rl2->count(1);
}
Now, let's fix the CPU-hogging example from L</SYNOPSIS> using
the L</until($)> method.
my $bucket = Algorithm::TokenBucket->new(100 / 3600, 5);
my $time = Time::HiRes::time;
while (Time::HiRes::time - $time < 7200) { # two hours
# be bursty
Time::HiRes::sleep $bucket->until(5);
if ($bucket->conform(5)) { # should always be true
process(5);
$bucket->count(5);
}
}
# we're likely to have processed 200 items (without hogging the CPU)
=head1 BUGS
Documentation lacks the actual algorithm description. See links or read
the source (there are about 20 lines of sparse Perl in several subs).
C<until($N)> does not return infinity if C<$N> is greater than C<burst
size>. Sleeping for infinity seconds is both useless and hard to debug.
=head1 ACKNOWLEDGMENTS
Yuval Kogman contributed the L</until($)> method, proper L<Storable> support
and other things.
Alexey Shrub contributed the L</get_token_count()> method.
Paul Cochrane contributed various documentation and infrastructure fixes.
=head1 COPYRIGHT AND LICENSE
This software is copyright (C) 2016 by Alex Kapranoff.
This is free software; you can redistribute it and/or modify it under
the terms GNU General Public License version 3.
=head1 AUTHOR
Alex Kapranoff, E<lt>alex@kapranoff.ruE<gt>
=head1 SEE ALSO
=over 4
=item https://web.archive.org/web/20050320184218/http://www.eecs.harvard.edu/cs143/assignments/pa1/
=item http://en.wikipedia.org/wiki/Token_bucket
=item http://linux-ip.net/gl/tcng/node54.html
=item http://linux-ip.net/gl/tcng/node62.html
=item L<Schedule::RateLimit>
=item L<Algorithm::FloodControl>
=item L<Object::RateLimiter>
=back
=cut
( run in 0.371 second using v1.01-cache-2.11-cpan-172d661cebc )