App-RecordStream

 view release on metacpan or  search on metacpan

lib/App/RecordStream/Operation/join.pm  view on Meta::CPAN

    "left"             => \$left,
    "right"            => \$right,
    "inner"            => \$inner,
    "outer"            => \$outer,
    "operation=s"      => \$operation,
    "accumulate-right" => \$accumulate_right,
  };

  $this->parse_options($args, $spec);

  my $inputkey = shift @$args
    or die "You must provide inputkey\n";

  my $dbkey = shift @$args
    or die "You must provide dbkey\n";

  my $dbfile = shift @$args
    or die "You must provide dbfile\n";

  $this->{'ACCUMULATE_RIGHT'} = $accumulate_right;
  $this->{'DB_KEY'}           = $dbkey;
  $this->{'INPUT_KEY'}        = $inputkey;
  $this->{'KEEP_LEFT'}        = $left || $outer;
  $this->{'KEEP_RIGHT'}       = $right || $outer;


  if ( $operation ) {
    $this->{'OPERATION'} = App::RecordStream::Executor->transform_code($operation);
  }

  $this->create_db($dbfile, $dbkey);

  $this->{'KEYS_PRINTED'} = {};
}

sub create_db {
  my $this = shift;
  my $file = shift;
  my $key  = shift;

  my $db_stream = App::RecordStream::InputStream->new('FILE' => $file);
  my %db;
  my $record;

  while($record = $db_stream->get_record()) {
    my $value = $this->value_for_key($record, $key);

    $db{$value} = [] unless ( $db{$value} );
    push @{$db{$value}}, $record;
  }

  $this->{'DB'} = \%db;
}

sub value_for_key {
  my $this   = shift;
  my $record = shift;
  my $key    = shift;

  return join "\x1E", # ASCII record separator (RS)
          map { ${$record->guess_key_from_spec($_, 0)} }
        split /,/, $key;
}

sub accept_record {
  my $this   = shift;
  my $record = shift;

  my $value = $this->value_for_key($record, $this->{'INPUT_KEY'});

  my $db = $this->{'DB'};

  if(my $db_records = $db->{$value}) {
    foreach my $db_record (@$db_records) {
      if ($this->{'ACCUMULATE_RIGHT'}) {
        if ($this->{'OPERATION'}) {
          $this->run_expression($db_record, $record);
        }
        else {
          foreach my $this_key (keys %$record) {
            if (!exists($db_record->{$this_key})) {
              $db_record->{$this_key} = $record->{$this_key};
            }
          }
        }
      }
      else {
        if ($this->{'OPERATION'}) {
          my $output_record = App::RecordStream::Record->new(%$db_record);
          $this->run_expression($output_record, $record);
          $this->push_record($output_record);
        }
        else {
          $this->push_record(App::RecordStream::Record->new(%$record, %$db_record));
        }

        if ($this->{'KEEP_LEFT'}) {
          $this->{'KEYS_PRINTED'}->{$value} = 1;
        }
      }
    }
  }
  elsif ($this->{'KEEP_RIGHT'}) {
    $this->push_record($record);
  }

  return 1;
}

# TODO: shove down into executor
sub run_expression {
  my $__MY__this = shift;
  my $d    = shift;
  my $i    = shift;

  no strict;
  no warnings;
  eval $__MY__this->{'OPERATION'};

  if ( $@ ) {
    warn "Code died with $@\n";



( run in 0.684 second using v1.01-cache-2.11-cpan-39bf76dae61 )