Data-EventStream

 view release on metacpan or  search on metacpan

lib/Data/EventStream/Window.pm  view on Meta::CPAN


Data::EventStream::Window - Perl extension for event processing

=head1 VERSION

This document describes Data::EventStream::Window version 0.13

=head1 DESCRIPTION

This class represents time window for which aggregator aggregates data.
Normally window objects are passed to aggregators' callbacks and user has no need to build them himself.

=head1 METHODS

=cut

=head2 $class->new

Create a new Window object. L<Data::EventStream> will do it for you.

=cut

t/lib/TestStream.pm  view on Meta::CPAN

has events => ( is => 'ro', isa => 'ArrayRef', required => 1 );

has time_sub => ( is => 'ro' );

has start_time => ( is => 'ro' );

has expected_time_length => ( is => 'ro' );

has expected_length => ( is => 'ro' );

has no_callbacks => ( is => 'ro', default => 0, );

has filter => ( is => 'ro', );

sub _store_observed_value {
    my ( $hr, $key, $value ) = @_;
    if ( defined $hr->{$key} ) {
        if ( ref $hr->{$key} eq 'ARRAY' ) {
            push @{ $hr->{$key} }, $value;
        }
        else {

t/lib/TestStream.pm  view on Meta::CPAN

    my %outs;
    my %resets;

    my $add_aggregator = sub {
        my ( $name, $agg_params ) = @_;
        $aggregator{$name} = $test->aggregator_class->new( $test->new_params );
        $es->add_aggregator(
            $aggregator{$name},
            %$agg_params,
            (
                $test->no_callbacks ? () : (
                    on_enter => sub {
                        _store_observed_value( \%ins, $name, $_[0]->value );
                    },
                    on_leave => sub {
                        _store_observed_value( \%outs, $name, $_[0]->value );
                    },
                    on_reset => sub {
                        _store_observed_value( \%resets, $name, $_[0]->value );
                    },
                )

t/lib/TestStream.pm  view on Meta::CPAN

    my $i = 1;
    for my $ev ( @{ $test->events } ) {
        my $title =
            "event $i: "
          . ( defined $ev->{time} ? "time=$ev->{time} " : "" )
          . ( defined $ev->{val}  ? " val=$ev->{val}"   : "" );
        subtest $title => sub {
            $DB::single = 1 if $ev->{debug};
            $es->set_time( $ev->{time} ) if $ev->{time} and not defined $ev->{val};
            $es->add_event( { time => $ev->{time}, val => $ev->{val} } ) if defined $ev->{val};
            unless ( $test->no_callbacks ) {
                eq_or_diff \%ins, $ev->{ins} // {}, "got expected ins";
                %ins = ();
                eq_or_diff \%outs, $ev->{outs} // {}, "got expected outs";
                %outs = ();
                eq_or_diff \%resets, $ev->{resets} // {}, "got expected resets";
                %resets = ();
            }

            if ( $ev->{vals} ) {
                my %vals;

t/statistics_continuous.t  view on Meta::CPAN

            },
        },
    },
);

TestStream->new(
    aggregator_class  => 'Data::EventStream::Statistics::Continuous',
    new_params        => { value_sub => sub { $_[0]->{val} }, time_sub => sub { $_[0]->{time} }, },
    aggregator_params => \%params,
    events            => \@events,
    no_callbacks      => 1,
    time_sub => sub { $_[0]->{time} },
)->run;

done_testing;

t/statistics_discrete.t  view on Meta::CPAN

            },
        },
    },
);

TestStream->new(
    aggregator_class  => 'Data::EventStream::Statistics::Discrete',
    new_params        => { value_sub => sub { $_[0]->{val} }, },
    aggregator_params => \%params,
    events            => \@events,
    no_callbacks      => 1,
)->run;

done_testing;



( run in 1.493 second using v1.01-cache-2.11-cpan-9b1e4054eb1 )