App-RecordStream
view release on metacpan or search on metacpan
lib/App/RecordStream/Operation/collate/BaseClumperCallback.pm view on Meta::CPAN
use App::RecordStream::Aggregator;
use App::RecordStream::Record;
sub new {
my $class = shift;
my $aggregators = shift;
my $incremental = shift;
my $bucket = shift;
my $record_cb = shift;
my $this = {
'AGGREGATORS' => $aggregators,
'INCREMENTAL' => $incremental,
'BUCKET' => $bucket,
'RECORD_CB' => $record_cb,
};
bless $this, $class;
return $this;
}
sub clumper_callback_begin {
my $this = shift;
my $bucket = shift;
return [$bucket, $this->{'BUCKET'} ? undef : [], App::RecordStream::Aggregator::map_initial($this->{'AGGREGATORS'})];
}
sub clumper_callback_push_record {
my $this = shift;
my $cookie = shift;
my $record = shift;
push @{$cookie->[1]}, $record if(!$this->{'BUCKET'});
$cookie->[2] = App::RecordStream::Aggregator::map_combine($this->{'AGGREGATORS'}, $cookie->[2], $record);
if($this->{'INCREMENTAL'}) {
$this->clumper_callback_end($cookie);
}
}
sub clumper_callback_end {
my $this = shift;
my $cookie = shift;
for my $proto_result ($this->{'BUCKET'} ? ($cookie->[0]) : @{$cookie->[1]}) {
my $result = {
# first, the bucket or original record
%$proto_result,
# then, the aggregators
%{App::RecordStream::Aggregator::map_squish($this->{'AGGREGATORS'}, $cookie->[2])},
};
my $record = App::RecordStream::Record->new();
for my $key (keys(%$result))
{
my $value = $result->{$key};
${$record->guess_key_from_spec($key)} = $value;
}
$this->{'RECORD_CB'}->($record);
}
}
1;
( run in 0.738 second using v1.01-cache-2.11-cpan-39bf76dae61 )