App-RecordStream

 view release on metacpan or  search on metacpan

lib/App/RecordStream/Operation/collate.pm  view on Meta::CPAN

    my $map_snippet = App::RecordStream::DomainLanguage::Snippet->new($map_string);
    my $reduce_snippet = App::RecordStream::DomainLanguage::Snippet->new($reduce_string);
    my $squish_snippet = App::RecordStream::DomainLanguage::Snippet->new($squish_string);

    $aggregator_objects->{$name} = App::RecordStream::DomainLanguage::Library::map_reduce_aggregator($map_snippet, $reduce_snippet, $squish_snippet);
  }

  for(my $i = 0; $i < @ii_aggregators; 1) {
    my $name = $ii_aggregators[$i++];
    my $initial_string = $ii_aggregators[$i++];
    my $combine_string = $ii_aggregators[$i++];
    my $squish_string = $ii_aggregators[$i++];

    my $initial_snippet = App::RecordStream::DomainLanguage::Snippet->new($initial_string);
    my $combine_snippet = App::RecordStream::DomainLanguage::Snippet->new($combine_string);
    my $squish_snippet = App::RecordStream::DomainLanguage::Snippet->new($squish_string);

    $aggregator_objects->{$name} = App::RecordStream::DomainLanguage::Library::inject_into_aggregator($initial_snippet, $combine_snippet, $squish_snippet);
  }

  $clumper_options->check_options(App::RecordStream::Operation::collate::BaseClumperCallback->new($aggregator_objects, $incremental, $bucket, sub { $this->push_record($_[0]); }));
}

# Parse one "<name>=<code>" domain language aggregator option and register it.
#
# Arguments:
#   $dlaggregators_ref - hash ref that receives the result, keyed by <name>
#   $string            - option text; it is split at its first '='
#
# The <code> part is wrapped in an App::RecordStream::DomainLanguage::Snippet
# and evaluated as an 'AGGREGATOR'; the outcome is stored under <name>.
# Dies when $string contains no '=' at all.
sub build_dlaggregator {
  my ($dlaggregators_ref, $string) = @_;

  # Only the first '=' separates name from code; later ones belong to the code.
  my $separator_at = index($string, '=');
  if($separator_at < 0) {
    die "Bad domain language aggregator option (missing '=' to separate name and code): " . $string;
  }

  my $name = substr($string, 0, $separator_at);
  my $code = substr($string, $separator_at + 1);

  $dlaggregators_ref->{$name} = App::RecordStream::DomainLanguage::Snippet->new($code)->evaluate_as('AGGREGATOR');
}

# Hand one incoming record to the object stored under 'CLUMPER_OPTIONS'.
#
# Arguments:
#   $this   - the operation instance (hash-based)
#   $record - the record to pass along
#
# Returns whatever the delegate's accept_record() returns.
sub accept_record {
  my ($this, $record) = @_;

  my $clumper_options = $this->{'CLUMPER_OPTIONS'};
  return $clumper_options->accept_record($record);
}

# Signal end-of-stream to the object stored under 'CLUMPER_OPTIONS'.
#
# Arguments:
#   $this - the operation instance (hash-based)
#
# Returns whatever the delegate's stream_done() returns.
sub stream_done {
  my ($this) = @_;

  my $clumper_options = $this->{'CLUMPER_OPTIONS'};
  return $clumper_options->stream_done();
}

# Print usage, with special handling for a code-reference "message".
#
# Arguments:
#   $this    - the operation instance
#   $message - either something to forward to the parent class's
#              print_usage(), or a code reference to run instead
#
# When $message is true and is a CODE reference, it is invoked with no
# arguments and the process then exits with status 1. In every other case
# the call is forwarded to SUPER::print_usage and its result is returned.
sub print_usage {
  my ($this, $message) = @_;

  my $is_callback = $message && UNIVERSAL::isa($message, 'CODE');

  # Ordinary case: let the parent class handle the message.
  unless ( $is_callback ) {
    return $this->SUPER::print_usage($message);
  }

  # Callback case: run it, then bail out of the program.
  $message->();
  exit 1;
}

# Register the help topics this operation offers.
#
# Arguments:
#   $this - the operation instance
#
# First enables five existing help types through use_help_type(), in a fixed
# order, then adds two more through add_help_type():
#   'aggregators' - prints App::RecordStream::Aggregator->list_implementations()
#   'more'        - calls $this->more_help()
# The result of the final add_help_type() call is the sub's return value.
sub add_help_types {
  my ($this) = @_;

  # Order matters only in that it mirrors the sequence of registrations.
  my @shared_help_types = (
    'keyspecs',
    'keygroups',
    'keys',
    'domainlanguage',
    'clumping',
  );
  for my $help_type (@shared_help_types) {
    $this->use_help_type($help_type);
  }

  my $list_aggregators = sub { print App::RecordStream::Aggregator->list_implementations(); };
  my $show_more_help   = sub { $this->more_help() };

  $this->add_help_type('aggregators', $list_aggregators, 'List the aggregators');
  $this->add_help_type('more', $show_more_help, 'Larger help documentation');
}

sub usage {
  my $this = shift;

  my $options = [
    [ 'dlaggregator|-A ...', 'Specify a domain language aggregate.  See "Domain Language Integration" below.'],
    [ 'aggregator|-a <aggregators>', 'Colon separated list of aggregate field specifiers.  See "Aggregates" section below.'],
    [ 'mr-agg <name> <map> <reduce> <squish>', 'Specify a map reduce aggregator via 3 snippets, similar to mr_agg() from the domain language.'],
    [ 'ii-agg <name> <initial> <combine> <squish>', 'Specify an inject into aggregator via 3 snippets, similar to ii_agg() from the domain language.'],
    [ 'incremental', 'Output a record every time an input record is added to a clump (instead of every time a clump is flushed).'],
    [ '[no]-bucket', 'With --bucket outputs one record per clump, with --no-bucket outputs one record for each record that went into the clump.'],
    $this->{'CLUMPER_OPTIONS'}->main_usage(),

    [ 'list-aggregators|--list', 'Bail and output a list of aggregators' ],
    [ 'show-aggregator <aggregator>', 'Bail and output this aggregator\'s detailed usage.'],
    $this->{'CLUMPER_OPTIONS'}->help_usage(),
  ];

  my $args_string = $this->options_string($options);

  return <<USAGE
Usage: recs-collate <args> [<files>]
   __FORMAT_TEXT__
   Take records, grouped togther by --keys, and compute statistics (like
   average, count, sum, concat, etc) within those groups.

   For starting with collate, try doing single --key collates with some number
   of aggregators (list available in --list-agrregators)
   __FORMAT_TEXT__

Arguments:
$args_string

Examples:
   Count clumps of adjacent lines with matching x fields.
      recs-collate --adjacent --key x --aggregator count
   Count number of each x field value in the entire file.
      recs-collate --key x --aggregator count
   Finds the maximum latency for each date, hour pair
      recs-collate --key date,hour --aggregator worst_latency=max,latency



( run in 0.617 second using v1.01-cache-2.11-cpan-8f98c5d2c55 )