App-RecordStream
view release on metacpan or search on metacpan
lib/App/RecordStream/Operation/join.pm view on Meta::CPAN
"left" => \$left,
"right" => \$right,
"inner" => \$inner,
"outer" => \$outer,
"operation=s" => \$operation,
"accumulate-right" => \$accumulate_right,
};
$this->parse_options($args, $spec);
my $inputkey = shift @$args
or die "You must provide inputkey\n";
my $dbkey = shift @$args
or die "You must provide dbkey\n";
my $dbfile = shift @$args
or die "You must provide dbfile\n";
$this->{'ACCUMULATE_RIGHT'} = $accumulate_right;
$this->{'DB_KEY'} = $dbkey;
$this->{'INPUT_KEY'} = $inputkey;
$this->{'KEEP_LEFT'} = $left || $outer;
$this->{'KEEP_RIGHT'} = $right || $outer;
if ( $operation ) {
$this->{'OPERATION'} = App::RecordStream::Executor->transform_code($operation);
}
$this->create_db($dbfile, $dbkey);
$this->{'KEYS_PRINTED'} = {};
}
sub create_db {
my $this = shift;
my $file = shift;
my $key = shift;
my $db_stream = App::RecordStream::InputStream->new('FILE' => $file);
my %db;
my $record;
while($record = $db_stream->get_record()) {
my $value = $this->value_for_key($record, $key);
$db{$value} = [] unless ( $db{$value} );
push @{$db{$value}}, $record;
}
$this->{'DB'} = \%db;
}
sub value_for_key {
my $this = shift;
my $record = shift;
my $key = shift;
return join "\x1E", # ASCII record separator (RS)
map { ${$record->guess_key_from_spec($_, 0)} }
split /,/, $key;
}
sub accept_record {
my $this = shift;
my $record = shift;
my $value = $this->value_for_key($record, $this->{'INPUT_KEY'});
my $db = $this->{'DB'};
if(my $db_records = $db->{$value}) {
foreach my $db_record (@$db_records) {
if ($this->{'ACCUMULATE_RIGHT'}) {
if ($this->{'OPERATION'}) {
$this->run_expression($db_record, $record);
}
else {
foreach my $this_key (keys %$record) {
if (!exists($db_record->{$this_key})) {
$db_record->{$this_key} = $record->{$this_key};
}
}
}
}
else {
if ($this->{'OPERATION'}) {
my $output_record = App::RecordStream::Record->new(%$db_record);
$this->run_expression($output_record, $record);
$this->push_record($output_record);
}
else {
$this->push_record(App::RecordStream::Record->new(%$record, %$db_record));
}
if ($this->{'KEEP_LEFT'}) {
$this->{'KEYS_PRINTED'}->{$value} = 1;
}
}
}
}
elsif ($this->{'KEEP_RIGHT'}) {
$this->push_record($record);
}
return 1;
}
# TODO: shove down into executor
sub run_expression {
my $__MY__this = shift;
my $d = shift;
my $i = shift;
no strict;
no warnings;
eval $__MY__this->{'OPERATION'};
if ( $@ ) {
warn "Code died with $@\n";
( run in 0.684 second using v1.01-cache-2.11-cpan-39bf76dae61 )