Data-EventStream
view release on metacpan or search on metacpan
lib/Data/EventStream/Window.pm view on Meta::CPAN
Data::EventStream::Window - Perl extension for event processing
=head1 VERSION
This document describes Data::EventStream::Window version 0.13
=head1 DESCRIPTION
This class represents the time window over which an aggregator aggregates data.
Normally, window objects are passed to aggregators' callbacks, so users have no need to construct them themselves.
=head1 METHODS
=cut
=head2 $class->new
Create a new Window object. L<Data::EventStream> will do it for you.
=cut
t/lib/TestStream.pm view on Meta::CPAN
# Events replayed through the stream under test. Mandatory; must be an
# array reference.
has events => (
    is       => 'ro',
    isa      => 'ArrayRef',
    required => 1,
);

# NOTE(review): presumably a callback extracting the timestamp from an
# event -- confirm against the code that consumes it.
has time_sub => ( is => 'ro' );

# NOTE(review): presumably the initial time of the stream -- confirm.
has start_time => ( is => 'ro' );

# NOTE(review): expected window sizes (by time and by event count,
# judging by the names) -- confirm against the assertions that use them.
has expected_time_length => ( is => 'ro' );
has expected_length      => ( is => 'ro' );

# When true, the on_enter/on_leave/on_reset callbacks are not registered
# and the ins/outs/resets comparisons are skipped. Off by default.
has no_callbacks => (
    is      => 'ro',
    default => 0,
);

# NOTE(review): optional event filter; its use is not visible here.
has filter => ( is => 'ro' );
sub _store_observed_value {
my ( $hr, $key, $value ) = @_;
if ( defined $hr->{$key} ) {
if ( ref $hr->{$key} eq 'ARRAY' ) {
push @{ $hr->{$key} }, $value;
}
else {
t/lib/TestStream.pm view on Meta::CPAN
my %outs;
my %resets;
my $add_aggregator = sub {
my ( $name, $agg_params ) = @_;
$aggregator{$name} = $test->aggregator_class->new( $test->new_params );
$es->add_aggregator(
$aggregator{$name},
%$agg_params,
(
$test->no_callbacks ? () : (
on_enter => sub {
_store_observed_value( \%ins, $name, $_[0]->value );
},
on_leave => sub {
_store_observed_value( \%outs, $name, $_[0]->value );
},
on_reset => sub {
_store_observed_value( \%resets, $name, $_[0]->value );
},
)
t/lib/TestStream.pm view on Meta::CPAN
my $i = 1;
for my $ev ( @{ $test->events } ) {
my $title =
"event $i: "
. ( defined $ev->{time} ? "time=$ev->{time} " : "" )
. ( defined $ev->{val} ? " val=$ev->{val}" : "" );
subtest $title => sub {
$DB::single = 1 if $ev->{debug};
$es->set_time( $ev->{time} ) if $ev->{time} and not defined $ev->{val};
$es->add_event( { time => $ev->{time}, val => $ev->{val} } ) if defined $ev->{val};
unless ( $test->no_callbacks ) {
eq_or_diff \%ins, $ev->{ins} // {}, "got expected ins";
%ins = ();
eq_or_diff \%outs, $ev->{outs} // {}, "got expected outs";
%outs = ();
eq_or_diff \%resets, $ev->{resets} // {}, "got expected resets";
%resets = ();
}
if ( $ev->{vals} ) {
my %vals;
t/statistics_continuous.t view on Meta::CPAN
},
},
},
);
# Replay @events through the continuous statistics aggregator. The
# on_enter/on_leave/on_reset callbacks are switched off (no_callbacks => 1).
TestStream->new(
    aggregator_class  => 'Data::EventStream::Statistics::Continuous',
    aggregator_params => \%params,
    events            => \@events,

    # Constructor arguments handed to every aggregator instance.
    new_params => {
        value_sub => sub { $_[0]->{val} },
        time_sub  => sub { $_[0]->{time} },
    },

    time_sub     => sub { $_[0]->{time} },
    no_callbacks => 1,
)->run;

done_testing;
t/statistics_discrete.t view on Meta::CPAN
},
},
},
);
# Replay @events through the discrete statistics aggregator. The
# on_enter/on_leave/on_reset callbacks are switched off (no_callbacks => 1).
TestStream->new(
    aggregator_class  => 'Data::EventStream::Statistics::Discrete',
    aggregator_params => \%params,
    events            => \@events,

    # Constructor arguments handed to every aggregator instance.
    new_params => {
        value_sub => sub { $_[0]->{val} },
    },

    no_callbacks => 1,
)->run;

done_testing;
( run in 1.228 second using v1.01-cache-2.11-cpan-9b1e4054eb1 )