App-RecordStream
view release on metacpan or search on metacpan
lib/App/RecordStream/Operation/collate.pm view on Meta::CPAN
my $map_snippet = App::RecordStream::DomainLanguage::Snippet->new($map_string);
my $reduce_snippet = App::RecordStream::DomainLanguage::Snippet->new($reduce_string);
my $squish_snippet = App::RecordStream::DomainLanguage::Snippet->new($squish_string);
$aggregator_objects->{$name} = App::RecordStream::DomainLanguage::Library::map_reduce_aggregator($map_snippet, $reduce_snippet, $squish_snippet);
}
for(my $i = 0; $i < @ii_aggregators; 1) {
my $name = $ii_aggregators[$i++];
my $initial_string = $ii_aggregators[$i++];
my $combine_string = $ii_aggregators[$i++];
my $squish_string = $ii_aggregators[$i++];
my $initial_snippet = App::RecordStream::DomainLanguage::Snippet->new($initial_string);
my $combine_snippet = App::RecordStream::DomainLanguage::Snippet->new($combine_string);
my $squish_snippet = App::RecordStream::DomainLanguage::Snippet->new($squish_string);
$aggregator_objects->{$name} = App::RecordStream::DomainLanguage::Library::inject_into_aggregator($initial_snippet, $combine_snippet, $squish_snippet);
}
$clumper_options->check_options(App::RecordStream::Operation::collate::BaseClumperCallback->new($aggregator_objects, $incremental, $bucket, sub { $this->push_record($_[0]); }));
}
# Parse one "name=code" domain language aggregator option and register it.
#
# Arguments:
#   $dlaggregators_ref - hash ref that receives the new entry, keyed by name
#   $string            - option text; everything before the first '=' is the
#                        name, everything after it is the snippet code
#
# The code portion is wrapped in an App::RecordStream::DomainLanguage::Snippet
# and evaluated as an 'AGGREGATOR'; the result is stored under the name.
# Dies if the option text contains no '=' separator.
sub build_dlaggregator {
    my ($dlaggregators_ref, $string) = @_;

    # The substitution strips the "name=" prefix from our private copy of the
    # option, leaving only the snippet code behind in $string.
    $string =~ s/^([^=]*)=//
        or die "Bad domain language aggregator option (missing '=' to separate name and code): " . $string;
    my $name = $1;

    my $snippet = App::RecordStream::DomainLanguage::Snippet->new($string);
    $dlaggregators_ref->{$name} = $snippet->evaluate_as('AGGREGATOR');
}
# Hand a single incoming record to the clumper options object held in
# $this->{'CLUMPER_OPTIONS'} and return whatever its accept_record() returns.
sub accept_record {
    my ($this, $record) = @_;

    my $clumper_options = $this->{'CLUMPER_OPTIONS'};
    return $clumper_options->accept_record($record);
}
# Signal end of input by calling stream_done() on the clumper options object
# held in $this->{'CLUMPER_OPTIONS'}; its return value is passed through.
sub stream_done {
    my ($this) = @_;

    my $clumper_options = $this->{'CLUMPER_OPTIONS'};
    return $clumper_options->stream_done();
}
# Print usage information, with support for callback-style messages.
#
# When $message is a true value that UNIVERSAL::isa reports as 'CODE', it is
# invoked with no arguments and the process exits with status 1. Any other
# message (including none) is passed on to the parent class's print_usage().
sub print_usage {
    my ($this, $message) = @_;

    my $is_callback = $message && UNIVERSAL::isa($message, 'CODE');
    return $this->SUPER::print_usage($message) unless $is_callback;

    $message->();
    exit 1;
}
# Register the help topics this operation offers.
#
# First enables the existing help types (keyspecs, keygroups, keys,
# domainlanguage, clumping) via use_help_type(), in that order, then adds two
# more via add_help_type():
#   aggregators - prints App::RecordStream::Aggregator->list_implementations()
#   more        - calls $this->more_help()
sub add_help_types {
    my ($this) = @_;

    for my $existing_topic (qw(keyspecs keygroups keys domainlanguage clumping)) {
        $this->use_help_type($existing_topic);
    }

    my $list_aggregators = sub { print App::RecordStream::Aggregator->list_implementations(); };
    $this->add_help_type('aggregators', $list_aggregators, 'List the aggregators');

    # The closure captures $this so the extended help runs against this object.
    my $show_more_help = sub { $this->more_help() };
    return $this->add_help_type('more', $show_more_help, 'Larger help documentation');
}
sub usage {
my $this = shift;
my $options = [
[ 'dlaggregator|-A ...', 'Specify a domain language aggregate. See "Domain Language Integration" below.'],
[ 'aggregator|-a <aggregators>', 'Colon separated list of aggregate field specifiers. See "Aggregates" section below.'],
[ 'mr-agg <name> <map> <reduce> <squish>', 'Specify a map reduce aggregator via 3 snippets, similar to mr_agg() from the domain language.'],
[ 'ii-agg <name> <initial> <combine> <squish>', 'Specify an inject into aggregator via 3 snippets, similar to ii_agg() from the domain language.'],
[ 'incremental', 'Output a record every time an input record is added to a clump (instead of every time a clump is flushed).'],
[ '[no]-bucket', 'With --bucket outputs one record per clump, with --no-bucket outputs one record for each record that went into the clump.'],
$this->{'CLUMPER_OPTIONS'}->main_usage(),
[ 'list-aggregators|--list', 'Bail and output a list of aggregators' ],
[ 'show-aggregator <aggregator>', 'Bail and output this aggregator\'s detailed usage.'],
$this->{'CLUMPER_OPTIONS'}->help_usage(),
];
my $args_string = $this->options_string($options);
return <<USAGE
Usage: recs-collate <args> [<files>]
__FORMAT_TEXT__
Take records, grouped togther by --keys, and compute statistics (like
average, count, sum, concat, etc) within those groups.
For starting with collate, try doing single --key collates with some number
of aggregators (list available in --list-agrregators)
__FORMAT_TEXT__
Arguments:
$args_string
Examples:
Count clumps of adjacent lines with matching x fields.
recs-collate --adjacent --key x --aggregator count
Count number of each x field value in the entire file.
recs-collate --key x --aggregator count
Finds the maximum latency for each date, hour pair
recs-collate --key date,hour --aggregator worst_latency=max,latency
( run in 0.617 second using v1.01-cache-2.11-cpan-8f98c5d2c55 )