AnyEvent-Tools

 view release on metacpan or  search on metacpan

lib/AnyEvent/Tools/Buffer.pm  view on Meta::CPAN

{
    my $class = shift;
    croak "usage: buffer(on_flush => sub { ... }, ...)" if @_ % 2;


    my (%opts) = @_;

    my $self = bless {
        queue       => [],
        exists      => {},
        timer       => undef,
        lock        => 0,
        do_flush    => 0,
        unique_cb   => undef,
    } => ref($class) || $class;

    $self->on_flush($opts{on_flush});
    $self->size($opts{size} || 0);
    $self->interval($opts{interval} || 0);
    $self->unique_cb($opts{unique_cb});

    return $self;
}

sub interval
{
    my ($self, $ival) = @_;
    return $self->{interval} if @_ == 1;
    undef $self->{timer} unless $ival;
    return $self->{interval} = $ival;
}

sub on_flush
{
    my ($self, $cb) = @_;
    croak "callback must be CODEREF" if $cb and 'CODE' ne ref $cb;
    return $self->{on_flush} = $cb;
}

lib/AnyEvent/Tools/Buffer.pm  view on Meta::CPAN


sub flush
{
    my ($self) = @_;
    return unless @{ $self->{queue} };
    return unless $self->{on_flush};
    if ($self->{lock}) {
        $self->{do_flush} = 1;
        return;
    }
    undef $self->{timer};
    my $queue = $self->{queue};
    $self->{queue} = [];
    $self->{exists} = {};
    my $guard = guard sub {

        return unless $self;  # it can be destroyed

        $self->{lock} = 0;

        if ($self->{do_flush}) {

lib/AnyEvent/Tools/Buffer.pm  view on Meta::CPAN



sub _check_buffer
{
    my ($self) = @_;

    return if $self->{lock};
    return unless $self->{on_flush};

    unless (@{ $self->{queue} }) {
        undef $self->{timer};
        return;
    }

    if ($self->size) {
        if (@{ $self->{queue} } >= $self->size) {
            $self->flush;
            return;
        }
    }

    return if $self->{timer};
    return unless $self->interval;
    $self->{timer} = AE::timer $self->interval, 0 => sub { $self->flush };
    return;
}

1;

t/01_mutex.t  view on Meta::CPAN

}


{
    my $mutex = mutex;

    my ($counter, $total) = (0, 0);

    my $cv = condvar AnyEvent;

    my ($timer1, $timer2, $timer3);
    $timer1 = AE::timer 0, 0.2 => sub {
        $total++;
        if ($mutex->is_locked) {
            $counter++;
        }
    };

    $timer2 = AE::timer 1, 0 => sub {
        $mutex->lock(sub {
            my ($g) = @_;
            undef $timer2;
            my $timer;
            $timer = AE::timer 2, 0 => sub {
                undef $g;
                undef $timer;
            };
        });
        return;
    };

    $timer3 = AE::timer 5, 0 => sub {
        $cv->send;
    };

    $cv->recv;

    ok $counter < 13 && $counter > 8,
        "Mutex was locked correct time ($counter/$total)";
}

{
    my $cv = condvar AnyEvent;
    my $mutex = mutex;
    my $idle;
    my %res;

    $mutex->lock(sub {
        my $start_time = time;
        my $mutex_guard = shift;
        my $timer;
        $timer = AE::timer 0.1, 0 => sub {
            $res{1} = { start => $start_time, stop => time};
            undef $timer;
            undef $mutex_guard;
        };
    });
    $mutex->lock(sub {
        my $start_time = time;
        my $mutex_guard = shift;
        my $timer;
        $timer = AE::timer 0.1, 0 => sub {
            $res{2} = { start => $start_time, stop => time};
            undef $timer;
            undef $mutex_guard;
        };
    });
    $mutex->lock(sub {
        my $start_time = time;
        my $mutex_guard = shift;
        my $timer;
        $timer = AE::timer 0.1, 0 => sub {
            $res{3} = { start => $start_time, stop => time};
            undef $timer;
            undef $mutex_guard;
        };
    });

    $idle = AE::timer 0, 0.05 => sub {
        return unless 3 == keys %res;
        undef $idle;
        $cv->send;
    };

    $cv->recv;

    ok abs($res{1}{start} - $res{2}{start}) > 0.09,
        "First and second processes followed sequentially";
    ok $res{1}{stop} < $res{2}{start},

t/01_mutex.t  view on Meta::CPAN


{
    my $cv = condvar AnyEvent;
    my $error;

    my $mutex = mutex;
    my $counter = 0;

    $mutex->lock(sub {
        my ($guard) = @_;
        my $timer;
        $timer = AE::timer .1, 0 => sub {
            undef $guard;
            undef $timer;
            $counter++;
        };
    });

    my $mguard = $mutex->lock(sub {
        $error = 1;
    });

    $mutex->lock(sub {
        $counter++;
    });

    my $timer;
    $timer = AE::timer .05, 0 => sub {
        undef $timer;
        undef $mguard;
    };

    my $timer2 = AE::timer 0.5, 0 => sub {
        $cv->send;
    };

    $cv->recv;

    ok !$error, "Cancel lock request";
    ok $counter == 2, "All lock requests were handled";
}

t/02_rw_mutex.t  view on Meta::CPAN

    use_ok 'AnyEvent';
    use_ok 'AnyEvent::Tools', ':mutex';
}

{
    my $mutex = rw_mutex;
    my $cv = condvar AnyEvent;

    my $counter = 0;
    my $done_counter = 0;
    my $timer;

    $timer = AE::timer 0.13, 0 => sub { $cv->send };

    $mutex->rlock(sub {
        my ($g) = @_;
        my $t;
        my $mcounter = 0;

        $t = AE::timer 0.01, 0.01 => sub {
            $mcounter++;
            if ($mcounter++ >= 10) {
                undef $t;
                undef $g;
                $done_counter++;
                $cv->send if $done_counter == 2;
                return;
            }
            $counter++;
        };
    });

    $mutex->rlock(sub {
        my ($g) = @_;
        my $t;
        my $mcounter = 0;

        $t = AE::timer 0.01, 0.01 => sub {
            $mcounter++;
            if ($mcounter++ >= 10) {
                undef $t;
                undef $g;
                $done_counter++;
                $cv->send if $done_counter == 2;
                return;
            }
            $counter++;
        };

t/02_rw_mutex.t  view on Meta::CPAN

{
    my $mutex = rw_mutex;
    my $cv = condvar AnyEvent;
    my %res;

    my $time = time;
    $mutex->rlock(sub {
        my ($g) = @_;
        $res{'first-start'} = time - $time;
        my $t;
        $t = AE::timer 0.3, 0 => sub {
            $res{'first-stop'} = time - $time;
            undef $g;
            undef $t;
        };
    });
    $mutex->rlock(sub {
        $res{'second'} = time - $time;
    });

    $mutex->wlock(sub {
        my ($g) = @_;
        $res{'third-start'} = time - $time;
        my $t;
        $t = AE::timer 0.2, 0 => sub {
            $res{'third-stop'} = time - $time;
            undef $g;
            undef $t;
        };
    });
    $mutex->rlock(sub {
        my ($g) = @_;
        $res{'fourth-start'} = time - $time;
        my $t;
        $t = AE::timer 0.2, 0 => sub {
            $res{'fourth-stop'} = time - $time;
            undef $g;
            undef $t;
            $cv->send;
        };
    });
    $mutex->rlock(sub {
        $res{'fifth'} = time - $time;
    });

t/02_rw_mutex.t  view on Meta::CPAN

{
    my $cv = condvar AnyEvent;
    my $mutex = rw_mutex;
    $mutex->rlock_limit(2);
    my @res;

    for my $step (1 .. 20) {
        $mutex->rlock(sub {
            my ($g) = @_;
            my $t;
            $t = AE::timer .1, 0, sub {
                push @res, time;
                undef $t;
                undef $g;

                $cv->send if $step == 20;
            };
        });
    }

    $cv->recv;

t/03_repeat.t  view on Meta::CPAN

    $cv->recv;

    ok $count == 0, "Repeat 0 times";
}


{
    my $cv = condvar AnyEvent;
    my $count = 0;
    async_repeat 0, sub { $count++ };
    my $timer_end;
    $timer_end = AE::timer 0.5, 0 => sub {
        undef $timer_end;
        $cv->send;
    };
    $cv->recv;
    ok $count == 0, "Repeat 0 times without endfucntion";
}

{
    my $cv = condvar AnyEvent;
    my $count = 0;
    async_repeat 10, sub { $count++ }, sub { $cv->send };

t/03_repeat.t  view on Meta::CPAN

    $cv->recv;

    ok $count == 10, "Repeat 10 times without endfucntion";
}

{
    my $cv = condvar AnyEvent;
    my $count = 0;
    async_repeat 10, sub {
        my ($g, $no, $first, $last) = @_;
        my $timer;
        $timer = AE::timer 0.05, 0 => sub {
            undef $g;
            undef $timer;
            $count++;
            $cv->send if $last;
        };

    };
    $cv->recv;

    ok $count == 10,
        "Repeat 10 times with catching guards and without endfucntion";
}

{
    my %res;
    my $cv = condvar AnyEvent;
    my $count = 0;
    my $repeat_guard;
    $repeat_guard = async_repeat 10, sub {
        my ($g, $no, $first, $last) = @_;
        my $timer;
        my $time = time;
        $timer = AE::timer 0.05, 0 => sub {
            $res{$count} = {
                time       => time - $time,
                start_time => $time,
                no         => $count
            };

            $count++;
            undef $g;
            undef $timer;
            $cv->send if $last;
        };
    };

    $cv->recv;


    ok 9 == grep({ $res{$_}{start_time} - $res{$_ - 1}{start_time} > 0.045  }
        1 .. 9), "Hold guard test";

    ok 10 == grep({ $res{$_}{time} >= .045 } 0 .. 9),
        "All timers have done";

}

{
    my $cv = condvar AnyEvent;
    my $end_called = 0;
    my $count = 0;
    my $repeat_guard;
    $repeat_guard = async_repeat 15, sub {
        undef $repeat_guard if $count == 9;
        $count++;
    }, sub {
        $end_called = 1;
    };

    my $timer = AE::timer 0.5, 0 => sub { $cv->send };
    $cv->recv;

    ok $count == 10, "Main guard is undefined before local guard";
    ok $end_called == 0,
        "Finish callback won't be called if repeating is canceled";
}

{
    my $cv = condvar AnyEvent;
    my $count = 0;
    my $repeat_guard;
    $repeat_guard = async_repeat 15, sub {
        my ($g) = @_;
        undef $g;
        undef $repeat_guard if $count == 9;
        $count++;
    };

    my $timer = AE::timer 0.5, 0 => sub { $cv->send };
    $cv->recv;

    ok $count == 10, "Main guard is undefined after local guard";
}

{
    my $cv = condvar AnyEvent;
    my $repeat_guard;
    my $end_called = 0;
    $repeat_guard = async_repeat 15,
        sub {
            my ($g, $idx, $first, $last) = @_;
            undef $repeat_guard if $last;
            undef $g;
        },
        sub {
            $end_called = 1
        };

    my $timer = AE::timer 0.5, 0 => sub { $cv->send };
    $cv->recv;

    ok $end_called == 0, "Cancel repeating inside the last iteration";
}


{
    my $cv = condvar AnyEvent;
    my $count = 0;
    my $repeat_guard;
    $repeat_guard = async_repeat 15, sub {
        my ($g, $no, $first, $last) = @_;
        my $timer;
        my $time = time;
        $timer = AE::timer 0.05, 0 => sub {
            undef $repeat_guard if $count == 9;
            $count++;
            undef $g;
            undef $timer;

        };
    };

    my $timer = AE::timer 0.05 * 16, 0 => sub { $cv->send };
    $cv->recv;

    diag $count unless ok $count == 10,
                            "Cancel repeating with catching guards";
}

{
    my $cv = condvar AnyEvent;
    my $count = 0;
    my $repeat_guard;
    $repeat_guard = async_repeat 15, sub {
        my ($g, $no, $first, $last) = @_;
        my $timer;
        my $time = time;
        $timer = AE::timer 0.05, 0 => sub {
            undef $g;
            undef $timer;
            undef $repeat_guard if $count == 9;
            $count++;

        };
    };

    my $timer = AE::timer 0.05 * 16, 0 => sub { $cv->send };
    $cv->recv;


    diag $count unless  ok $count == 10,
                "Cancel repeating with catching guards, after freeing guard";
}


{
    my $cv = condvar AnyEvent;
    my %res;
    async_repeat 4,
        sub {
            my ($g, $no, $first, $last) = @_;
            my $timer;
            my $time = time;
            $res{$no}{start} = $time;

            $timer = AE::timer 0.05, 0 => sub {
                $res{$no}{finish} = time;
                $res{$no}{time} = $res{$no}{finish} - $time;
                undef $timer;
                undef $g;

            };
        },
        sub {
            $res{finish} = time;
            $cv->send;
        };

    $cv->recv;

t/04_foreach_array.t  view on Meta::CPAN

    $cv->recv;

    ok $count == 0, "async_for with empty array";
}

{
    my $cv = condvar AnyEvent;
    my $count = 0;

    async_for [], sub { $count++ };
    my $t = AE::timer 0.5, 0 => sub { $cv->send };
    $cv->recv;

    ok $count == 0, "async_for with empty array, without endfucntion";
}

{
    my $cv = condvar AnyEvent;
    my %res;
    my $number = 0;

t/04_foreach_array.t  view on Meta::CPAN

        sub {
            my ($g, $value, $index, $first, $last) = @_;
            $res{$index} = {
                value   => $value,
                first   => $first,
                last    => $last,
                called  => $number++,
                time    => time,
            };

            my $timer;

            $timer = AE::timer .05, 0 => sub {
                undef $timer;
                undef $g;
            };
        },
        sub {
            $cv->send;
        };


    $cv->recv;

t/06_pool.t  view on Meta::CPAN


    my $busy = 0;
    my $cnt = 1;
    my $idle;

    $idle = AE::idle  sub {
        $pool->get(sub {
            my ($guard, $object) = @_;
            $busy++;
            push @res, { b => $busy, t => time };
            my $timer;
            $timer = AE::timer 0.1, 0 => sub {
                $busy--;
                undef $timer;
                undef $guard;

                if (@res >= 40) {
                    undef $idle;
                    $cv->send;
                }
            };
        });

        undef $idle if $cnt++ >= 40;

t/06_pool.t  view on Meta::CPAN


{
    my $cv = condvar AnyEvent;
    my $pool = pool qw( a b );
    my $order = 0;
    my @res;
    my $dtime = 0;

    my $ano = $pool->push('c');
    my $t;
    $t = AE::timer 0.7, 0 => sub {
        $pool->delete($ano => sub { $dtime = time });
        undef $t;
    };

    for (0 .. 10) {
        $pool->get(sub {
            my ($guard, $object) = @_;
            my $timer;
            $timer = AE::timer 0.5, 0 => sub {
                push @res, { obj => $object, time => time, order => $order++ };
                undef $timer;
                undef $guard;
                $cv->send if @res == 11;
            };
        });
    }


    $cv->recv;

    ok 2 == grep({ $_->{obj} eq 'c' } @res), "delete method works fine";

t/07_buffer.t  view on Meta::CPAN

    my $cv = condvar AnyEvent;
    my $number = 1;
    my $count = 0;
    my $b;
    my $idle;
    $b = buffer
        size => 5,
        on_flush => sub {
            my ($g, $a) = @_;
            if ($count++ == 3) {
                my $timer;
                $timer = AE::timer 0.0005, 0 => sub {
                    $b->unshift_back($a);
                    undef $g;
                    undef $timer;
                };
                return;
            }
            push @res, $a;
            if (@res == 5) {
                undef $idle;
                $cv->send;
            }
        };



( run in 0.502 second using v1.01-cache-2.11-cpan-49f99fa48dc )