view release on metacpan or search on metacpan
lib/AnyEvent/Tools/Buffer.pm view on Meta::CPAN
{
my $class = shift;
croak "usage: buffer(on_flush => sub { ... }, ...)" if @_ % 2;
my (%opts) = @_;
my $self = bless {
queue => [],
exists => {},
timer => undef,
lock => 0,
do_flush => 0,
unique_cb => undef,
} => ref($class) || $class;
$self->on_flush($opts{on_flush});
$self->size($opts{size} || 0);
$self->interval($opts{interval} || 0);
$self->unique_cb($opts{unique_cb});
return $self;
}
# interval([$ival])
#
# Combined getter/setter for the flush delay (the value _check_buffer hands
# to AE::timer as the "after" argument).
#
#   $self->interval        - returns the stored interval, changes nothing
#   $self->interval($ival) - stores $ival and returns it; a false $ival
#                            also discards any timer held in $self->{timer}
sub interval
{
    my $self = shift;

    # Getter form: only the invocant was passed.
    return $self->{interval} unless @_;

    my $ival = shift;

    # A false interval means timed flushing is off, so drop a pending timer.
    $self->{timer} = undef if !$ival;

    $self->{interval} = $ival;
    return $ival;
}
# on_flush($cb)
#
# Stores the flush callback in $self->{on_flush} and returns it.
# A true $cb must be a CODE reference, otherwise the call croaks.
# A false $cb (including no argument at all) is stored as-is, which leaves
# the buffer without a callback.
sub on_flush
{
    my $self = shift;
    my $cb   = shift;

    if ($cb) {
        croak "callback must be CODEREF" unless ref($cb) eq 'CODE';
    }

    $self->{on_flush} = $cb;
    return $self->{on_flush};
}
lib/AnyEvent/Tools/Buffer.pm view on Meta::CPAN
sub flush
{
my ($self) = @_;
return unless @{ $self->{queue} };
return unless $self->{on_flush};
if ($self->{lock}) {
$self->{do_flush} = 1;
return;
}
undef $self->{timer};
my $queue = $self->{queue};
$self->{queue} = [];
$self->{exists} = {};
my $guard = guard sub {
return unless $self; # it can be destroyed
$self->{lock} = 0;
if ($self->{do_flush}) {
lib/AnyEvent/Tools/Buffer.pm view on Meta::CPAN
# _check_buffer()
#
# Decides whether the queue has to be flushed now or a flush timer has to
# be armed. Always returns nothing.
#
#  * does nothing while $self->{lock} is set or no on_flush callback exists;
#  * with an empty queue it only discards $self->{timer};
#  * when a size limit is configured and the queue has reached it,
#    flushes immediately;
#  * otherwise, if no timer is pending and an interval is configured,
#    arms a one-shot timer that calls flush.
sub _check_buffer
{
    my $self = shift;

    return if $self->{lock} or not $self->{on_flush};

    my $queue = $self->{queue};

    # Nothing buffered: a pending timer would have nothing to flush.
    if (!@$queue) {
        undef $self->{timer};
        return;
    }

    # Size-triggered flush.
    if ($self->size and @$queue >= $self->size) {
        $self->flush;
        return;
    }

    # Either a timer is already pending or timed flushing is disabled.
    return if $self->{timer} or not $self->interval;

    # NOTE(review): the callback closes over $self while the watcher is
    # stored inside $self, so the object references itself until
    # $self->{timer} is cleared again.
    $self->{timer} = AE::timer($self->interval, 0, sub { $self->flush });
    return;
}
1;
t/01_mutex.t view on Meta::CPAN
}
# Measures how long the mutex reports itself as locked.
# A 0.2s poller samples is_locked for 5 seconds; a lock is requested at
# t=1s and its guard is dropped 2s later, so about 10 samples (2s / 0.2s)
# are expected to see the mutex locked.
{
my $mutex = mutex;
my ($counter, $total) = (0, 0);
my $cv = condvar AnyEvent;
my ($timer1, $timer2, $timer3);
# Poller: $total counts every tick, $counter only ticks seen while locked.
$timer1 = AE::timer 0, 0.2 => sub {
$total++;
if ($mutex->is_locked) {
$counter++;
}
};
# After 1s request the lock and keep the guard $g for 2 more seconds.
$timer2 = AE::timer 1, 0 => sub {
$mutex->lock(sub {
my ($g) = @_;
undef $timer2;
my $timer;
$timer = AE::timer 2, 0 => sub {
# presumably dropping $g is what releases the mutex - confirm in mutex docs
undef $g;
undef $timer;
};
});
return;
};
# Stop the event loop after 5s.
$timer3 = AE::timer 5, 0 => sub {
$cv->send;
};
$cv->recv;
# Accept 9..12 locked samples to tolerate timer jitter.
ok $counter < 13 && $counter > 8,
"Mutex was locked correct time ($counter/$total)";
}
{
my $cv = condvar AnyEvent;
my $mutex = mutex;
my $idle;
my %res;
$mutex->lock(sub {
my $start_time = time;
my $mutex_guard = shift;
my $timer;
$timer = AE::timer 0.1, 0 => sub {
$res{1} = { start => $start_time, stop => time};
undef $timer;
undef $mutex_guard;
};
});
$mutex->lock(sub {
my $start_time = time;
my $mutex_guard = shift;
my $timer;
$timer = AE::timer 0.1, 0 => sub {
$res{2} = { start => $start_time, stop => time};
undef $timer;
undef $mutex_guard;
};
});
$mutex->lock(sub {
my $start_time = time;
my $mutex_guard = shift;
my $timer;
$timer = AE::timer 0.1, 0 => sub {
$res{3} = { start => $start_time, stop => time};
undef $timer;
undef $mutex_guard;
};
});
$idle = AE::timer 0, 0.05 => sub {
return unless 3 == keys %res;
undef $idle;
$cv->send;
};
$cv->recv;
ok abs($res{1}{start} - $res{2}{start}) > 0.09,
"First and second processes followed sequentially";
ok $res{1}{stop} < $res{2}{start},
t/01_mutex.t view on Meta::CPAN
# Cancelling a queued lock request.
# Three lock requests are issued back to back; the value returned for the
# second one ($mguard) is dropped after .05s, while the first callback is
# still holding its guard (released at .1s). The second callback must never
# run, the first and third must both run.
{
my $cv = condvar AnyEvent;
my $error;
my $mutex = mutex;
my $counter = 0;
# Request 1: keeps its guard for .1s, then counts itself.
$mutex->lock(sub {
my ($guard) = @_;
my $timer;
$timer = AE::timer .1, 0 => sub {
undef $guard;
undef $timer;
$counter++;
};
});
# Request 2: its callback flags an error if it is ever invoked.
# presumably undefining the returned $mguard withdraws the request - confirm
my $mguard = $mutex->lock(sub {
$error = 1;
});
# Request 3: counts itself as soon as it gets the lock.
$mutex->lock(sub {
$counter++;
});
my $timer;
# Drop request 2 at .05s, i.e. before request 1 lets go of its guard.
$timer = AE::timer .05, 0 => sub {
undef $timer;
undef $mguard;
};
# End the test after 0.5s.
my $timer2 = AE::timer 0.5, 0 => sub {
$cv->send;
};
$cv->recv;
ok !$error, "Cancel lock request";
ok $counter == 2, "All lock requests were handled";
}
t/02_rw_mutex.t view on Meta::CPAN
use_ok 'AnyEvent';
use_ok 'AnyEvent::Tools', ':mutex';
}
{
my $mutex = rw_mutex;
my $cv = condvar AnyEvent;
my $counter = 0;
my $done_counter = 0;
my $timer;
$timer = AE::timer 0.13, 0 => sub { $cv->send };
$mutex->rlock(sub {
my ($g) = @_;
my $t;
my $mcounter = 0;
$t = AE::timer 0.01, 0.01 => sub {
$mcounter++;
if ($mcounter++ >= 10) {
undef $t;
undef $g;
$done_counter++;
$cv->send if $done_counter == 2;
return;
}
$counter++;
};
});
$mutex->rlock(sub {
my ($g) = @_;
my $t;
my $mcounter = 0;
$t = AE::timer 0.01, 0.01 => sub {
$mcounter++;
if ($mcounter++ >= 10) {
undef $t;
undef $g;
$done_counter++;
$cv->send if $done_counter == 2;
return;
}
$counter++;
};
t/02_rw_mutex.t view on Meta::CPAN
{
my $mutex = rw_mutex;
my $cv = condvar AnyEvent;
my %res;
my $time = time;
$mutex->rlock(sub {
my ($g) = @_;
$res{'first-start'} = time - $time;
my $t;
$t = AE::timer 0.3, 0 => sub {
$res{'first-stop'} = time - $time;
undef $g;
undef $t;
};
});
$mutex->rlock(sub {
$res{'second'} = time - $time;
});
$mutex->wlock(sub {
my ($g) = @_;
$res{'third-start'} = time - $time;
my $t;
$t = AE::timer 0.2, 0 => sub {
$res{'third-stop'} = time - $time;
undef $g;
undef $t;
};
});
$mutex->rlock(sub {
my ($g) = @_;
$res{'fourth-start'} = time - $time;
my $t;
$t = AE::timer 0.2, 0 => sub {
$res{'fourth-stop'} = time - $time;
undef $g;
undef $t;
$cv->send;
};
});
$mutex->rlock(sub {
$res{'fifth'} = time - $time;
});
t/02_rw_mutex.t view on Meta::CPAN
{
my $cv = condvar AnyEvent;
my $mutex = rw_mutex;
$mutex->rlock_limit(2);
my @res;
for my $step (1 .. 20) {
$mutex->rlock(sub {
my ($g) = @_;
my $t;
$t = AE::timer .1, 0, sub {
push @res, time;
undef $t;
undef $g;
$cv->send if $step == 20;
};
});
}
$cv->recv;
t/03_repeat.t view on Meta::CPAN
$cv->recv;
ok $count == 0, "Repeat 0 times";
}
# async_repeat with a count of 0 and no finish callback: the iteration
# callback must never be invoked.
{
my $cv = condvar AnyEvent;
my $count = 0;
async_repeat 0, sub { $count++ };
my $timer_end;
# No finish callback exists to signal completion, so just wait 0.5s.
$timer_end = AE::timer 0.5, 0 => sub {
undef $timer_end;
$cv->send;
};
$cv->recv;
ok $count == 0, "Repeat 0 times without endfucntion";
}
{
my $cv = condvar AnyEvent;
my $count = 0;
async_repeat 10, sub { $count++ }, sub { $cv->send };
t/03_repeat.t view on Meta::CPAN
$cv->recv;
ok $count == 10, "Repeat 10 times without endfucntion";
}
# async_repeat 10 times without a finish callback, where every iteration
# keeps its guard $g for 0.05s inside a timer before dropping it.
{
my $cv = condvar AnyEvent;
my $count = 0;
async_repeat 10, sub {
# Callback arguments: guard, iteration number, first flag, last flag.
my ($g, $no, $first, $last) = @_;
my $timer;
$timer = AE::timer 0.05, 0 => sub {
undef $g;
undef $timer;
$count++;
# The iteration flagged as last ends the test.
$cv->send if $last;
};
};
$cv->recv;
ok $count == 10,
"Repeat 10 times with catching guards and without endfucntion";
}
# Checks the pacing of async_repeat when each iteration holds its guard:
# every iteration records when it started and how long its 0.05s timer took.
{
my %res;
my $cv = condvar AnyEvent;
my $count = 0;
my $repeat_guard;
$repeat_guard = async_repeat 10, sub {
my ($g, $no, $first, $last) = @_;
my $timer;
# Start time of this iteration.
my $time = time;
$timer = AE::timer 0.05, 0 => sub {
# Results are keyed by the completion counter, not by $no.
$res{$count} = {
time => time - $time,
start_time => $time,
no => $count
};
$count++;
undef $g;
undef $timer;
$cv->send if $last;
};
};
$cv->recv;
# Consecutive iterations must start at least ~0.045s apart (0.05s minus
# some tolerance) - presumably because the next one waits for the guard.
ok 9 == grep({ $res{$_}{start_time} - $res{$_ - 1}{start_time} > 0.045 }
1 .. 9), "Hold guard test";
# Every timer must have run for about its full delay.
ok 10 == grep({ $res{$_}{time} >= .045 } 0 .. 9),
"All timers have done";
}
# Cancelling async_repeat by dropping the guard it returned.
# 15 iterations are requested; the guard is undefined inside the 10th call
# (when $count is 9), so exactly 10 calls are expected and the finish
# callback must not run.
{
my $cv = condvar AnyEvent;
my $end_called = 0;
my $count = 0;
my $repeat_guard;
$repeat_guard = async_repeat 15, sub {
# The per-iteration guard stays in @_ here and is never taken.
undef $repeat_guard if $count == 9;
$count++;
}, sub {
$end_called = 1;
};
# Give the loop 0.5s, then evaluate.
my $timer = AE::timer 0.5, 0 => sub { $cv->send };
$cv->recv;
ok $count == 10, "Main guard is undefined before local guard";
ok $end_called == 0,
"Finish callback won't be called if repeating is canceled";
}
# Same cancellation scenario, but each call first drops its own copy of the
# per-iteration guard and only then undefines the main guard.
{
my $cv = condvar AnyEvent;
my $count = 0;
my $repeat_guard;
$repeat_guard = async_repeat 15, sub {
my ($g) = @_;
undef $g;
# Cancel inside the 10th call.
undef $repeat_guard if $count == 9;
$count++;
};
# Give the loop 0.5s, then evaluate.
my $timer = AE::timer 0.5, 0 => sub { $cv->send };
$cv->recv;
ok $count == 10, "Main guard is undefined after local guard";
}
# Cancelling from within the iteration flagged as last: even then the
# finish callback must not be invoked.
{
my $cv = condvar AnyEvent;
my $repeat_guard;
my $end_called = 0;
$repeat_guard = async_repeat 15,
sub {
my ($g, $idx, $first, $last) = @_;
# Drop the main guard before releasing the iteration guard.
undef $repeat_guard if $last;
undef $g;
},
sub {
$end_called = 1
};
# Give the loop 0.5s, then evaluate.
my $timer = AE::timer 0.5, 0 => sub { $cv->send };
$cv->recv;
ok $end_called == 0, "Cancel repeating inside the last iteration";
}
# Cancelling async_repeat while iteration guards are held in timers.
# Each iteration keeps $g for 0.05s; in the 10th timer callback the main
# guard is dropped BEFORE the iteration guard is released.
{
my $cv = condvar AnyEvent;
my $count = 0;
my $repeat_guard;
$repeat_guard = async_repeat 15, sub {
my ($g, $no, $first, $last) = @_;
my $timer;
# NOTE(review): $time is assigned but not read anywhere in this block.
my $time = time;
$timer = AE::timer 0.05, 0 => sub {
undef $repeat_guard if $count == 9;
$count++;
undef $g;
undef $timer;
};
};
# Wait long enough for all 15 iterations (16 * 0.05s) had they not been
# cancelled.
my $timer = AE::timer 0.05 * 16, 0 => sub { $cv->send };
$cv->recv;
# Print the actual count only when the check fails.
diag $count unless ok $count == 10,
"Cancel repeating with catching guards";
}
# Variant of the held-guard cancellation test: here the iteration guard is
# released FIRST and the main guard is dropped afterwards, in the 10th
# timer callback.
{
my $cv = condvar AnyEvent;
my $count = 0;
my $repeat_guard;
$repeat_guard = async_repeat 15, sub {
my ($g, $no, $first, $last) = @_;
my $timer;
# NOTE(review): $time is assigned but not read anywhere in this block.
my $time = time;
$timer = AE::timer 0.05, 0 => sub {
undef $g;
undef $timer;
undef $repeat_guard if $count == 9;
$count++;
};
};
# Wait long enough for all 15 iterations (16 * 0.05s) had they not been
# cancelled.
my $timer = AE::timer 0.05 * 16, 0 => sub { $cv->send };
$cv->recv;
# Print the actual count only when the check fails.
diag $count unless ok $count == 10,
"Cancel repeating with catching guards, after freeing guard";
}
{
my $cv = condvar AnyEvent;
my %res;
async_repeat 4,
sub {
my ($g, $no, $first, $last) = @_;
my $timer;
my $time = time;
$res{$no}{start} = $time;
$timer = AE::timer 0.05, 0 => sub {
$res{$no}{finish} = time;
$res{$no}{time} = $res{$no}{finish} - $time;
undef $timer;
undef $g;
};
},
sub {
$res{finish} = time;
$cv->send;
};
$cv->recv;
t/04_foreach_array.t view on Meta::CPAN
$cv->recv;
ok $count == 0, "async_for with empty array";
}
# async_for over an empty array reference without a finish callback: the
# item callback must never run.
{
my $cv = condvar AnyEvent;
my $count = 0;
async_for [], sub { $count++ };
# No finish callback exists to signal completion, so just wait 0.5s.
my $t = AE::timer 0.5, 0 => sub { $cv->send };
$cv->recv;
ok $count == 0, "async_for with empty array, without endfucntion";
}
{
my $cv = condvar AnyEvent;
my %res;
my $number = 0;
t/04_foreach_array.t view on Meta::CPAN
sub {
my ($g, $value, $index, $first, $last) = @_;
$res{$index} = {
value => $value,
first => $first,
last => $last,
called => $number++,
time => time,
};
my $timer;
$timer = AE::timer .05, 0 => sub {
undef $timer;
undef $g;
};
},
sub {
$cv->send;
};
$cv->recv;
t/06_pool.t view on Meta::CPAN
my $busy = 0;
my $cnt = 1;
my $idle;
$idle = AE::idle sub {
$pool->get(sub {
my ($guard, $object) = @_;
$busy++;
push @res, { b => $busy, t => time };
my $timer;
$timer = AE::timer 0.1, 0 => sub {
$busy--;
undef $timer;
undef $guard;
if (@res >= 40) {
undef $idle;
$cv->send;
}
};
});
undef $idle if $cnt++ >= 40;
t/06_pool.t view on Meta::CPAN
{
my $cv = condvar AnyEvent;
my $pool = pool qw( a b );
my $order = 0;
my @res;
my $dtime = 0;
my $ano = $pool->push('c');
my $t;
$t = AE::timer 0.7, 0 => sub {
$pool->delete($ano => sub { $dtime = time });
undef $t;
};
for (0 .. 10) {
$pool->get(sub {
my ($guard, $object) = @_;
my $timer;
$timer = AE::timer 0.5, 0 => sub {
push @res, { obj => $object, time => time, order => $order++ };
undef $timer;
undef $guard;
$cv->send if @res == 11;
};
});
}
$cv->recv;
ok 2 == grep({ $_->{obj} eq 'c' } @res), "delete method works fine";
t/07_buffer.t view on Meta::CPAN
my $cv = condvar AnyEvent;
my $number = 1;
my $count = 0;
my $b;
my $idle;
$b = buffer
size => 5,
on_flush => sub {
my ($g, $a) = @_;
if ($count++ == 3) {
my $timer;
$timer = AE::timer 0.0005, 0 => sub {
$b->unshift_back($a);
undef $g;
undef $timer;
};
return;
}
push @res, $a;
if (@res == 5) {
undef $idle;
$cv->send;
}
};