App-RecordStream
view release on metacpan or search on metacpan
lib/App/RecordStream/Operation/xform.pm view on Meta::CPAN
package App::RecordStream::Operation::xform;
our $VERSION = "4.0.25";
use strict;
use base qw(App::RecordStream::Operation);
use App::RecordStream::Executor::Getopt;
use App::RecordStream::Executor;
sub init {
my $this = shift;
my $args = shift;
my $executor_options = App::RecordStream::Executor::Getopt->new();
my $before = 0;
my $after = 0;
my $post_snippet;
my $pre_snippet;
my $spec = {
'B|before=n' => \$before,
'A|after=n' => \$after,
'C|context=n' => sub { $before = $_[1]; $after = $_[1]; },
'post-snippet=s' => \$post_snippet,
'pre-snippet=s' => \$pre_snippet,
$executor_options->arguments(),
};
$this->parse_options($args, $spec, ['bundling']);
my $expression = $executor_options->get_string($args);
my $executor = $this->create_executor($expression, $post_snippet, $pre_snippet);
$this->{'BEFORE'} = $before;
$this->{'AFTER'} = $after;
$this->{'EXECUTOR'} = $executor;
$this->{'XFORM_REF'} = $executor->get_code_ref('xform');
$this->{'BEFORE_ARRAY'} = [];
$this->{'AFTER_ARRAY'} = [];
$this->{'SPOOLED_INPUT'} = [];
$this->{'SPOOLED_OUTPUT'} = [];
$executor->execute_method('pre_xform');
$this->handle_spools();
}
sub create_executor {
my $this = shift;
my $snippet = shift;
my $post_snippet = shift || '';
my $pre_snippet = shift || '';
my $args = {
xform => {
code => <<" CODE",
\$line++;
$snippet
; # Safe from a trailing comment in \$snippet
\$r
CODE
arg_names => [qw(r filename B A)],
},
post_xform => {
code => $post_snippet,
},
pre_xform => {
code => $pre_snippet,
},
};
my $executor;
eval {
$executor = App::RecordStream::Executor->new($args);
};
if ( $@ ) {
die "FATAL: Problem compiling a snippet: $@";
}
# Initialize the annonymous sub refs to contain $this
$executor->set_executor_method('push_input', sub {
$this->push_input(@_);
});
$executor->set_executor_method('push_output', sub {
$this->push_output(@_);
});
return $executor;
}
sub push_input {
my $this = shift;
push @{$this->{'SPOOLED_INPUT'}}, @_;
}
sub push_output {
my $this = shift;
$this->{'SUPPRESS_R'} = 1;
push @{$this->{'SPOOLED_OUTPUT'}}, @_;
}
sub accept_record {
my $this = shift;
my $record = shift;
my $before = $this->{'BEFORE'};
my $after = $this->{'AFTER'};
if ( $before == 0 && $after == 0 ) {
return $this->run_record_with_context($record);
}
my $before_array = $this->{'BEFORE_ARRAY'};
my $after_array = $this->{'AFTER_ARRAY'};
my $current_record = $this->{'CURRENT_RECORD'};
push @$after_array, $record;
if ( scalar @$after_array > $after ) {
my $new_record = shift @$after_array;
unshift @$before_array, $current_record if ( $current_record );
$current_record = $new_record;
}
if ( scalar @$after_array > $after ) {
my $new_record = shift @$after_array;
pop @$before_array if ( scalar @$before_array > $before );
unshift @$before_array, $current_record if ( $current_record );
$current_record = $new_record;
}
$this->{'CURRENT_RECORD'} = $current_record;
pop @$before_array if ( scalar @$before_array > $before );
if ( !$current_record ) {
return 1;
}
$this->{'CURRENT_RECORD'} = $current_record;
return $this->run_record_with_context($current_record, $before_array, $after_array);
}
sub stream_done {
my $this = shift;
my $after_array = $this->{'AFTER_ARRAY'};
if ( scalar @$after_array > 0 ) {
my $before = $this->{'BEFORE'};
my $before_array = $this->{'BEFORE_ARRAY'};
my $current_record = $this->{'CURRENT_RECORD'};
while ( scalar @$after_array ) {
my $new_record = shift @$after_array;
unshift @$before_array, $current_record if ( $current_record );
$current_record = $new_record;
pop @$before_array if ( scalar @$before_array > $before );
$this->run_record_with_context($current_record, $before_array, $after_array);
}
}
$this->{'EXECUTOR'}->execute_method('post_xform');
$this->handle_spools();
}
sub run_record_with_context {
my $this = shift;
my $record = shift;
my $before = shift;
my $after = shift;
my $value = $this->run_xform_with_record($record, $before, $after);
if ( ! $this->{'SUPPRESS_R'} ) {
if ( ref($value) eq 'ARRAY' ) {
foreach my $new_record (@$value) {
if ( ref($new_record) eq 'HASH' ) {
$this->push_record(App::RecordStream::Record->new($new_record));
}
else {
$this->push_record($new_record);
}
}
}
else {
$this->push_record($value);
}
}
return $this->handle_spools();
}
sub has_spooled_data {
my $this = shift;
return (scalar @{$this->{'SPOOLED_INPUT'}} > 0) || (scalar @{$this->{'SPOOLED_OUTPUT'}} > 0);
}
sub handle_spools {
my $this = shift;
$this->{'SUPPRESS_R'} = 0;
while ( @{$this->{'SPOOLED_OUTPUT'}} ) {
my $new_record = shift @{$this->{'SPOOLED_OUTPUT'}};
if ( ref($new_record) eq 'HASH' ) {
$new_record = App::RecordStream::Record->new($new_record);
}
$this->push_record($new_record);
}
while ( @{$this->{'SPOOLED_INPUT'}} ) {
my $new_record = shift @{$this->{'SPOOLED_INPUT'}};
if ( ref($new_record) eq 'HASH' ) {
$new_record = App::RecordStream::Record->new($new_record);
}
if (! $this->accept_record($new_record) ) {
#we've requested a stop, clear the input and return 0
$this->{'SPOOLED_INPUT'} = [];
return 0;
}
}
return 1;
}
sub run_xform_with_record {
my $this = shift;
my $record = shift;
my $before = shift;
my $after = shift;
if ( $before ) {
$before = [@$before];
$after = [@$after];
}
# Must copy before and after due to autovivification in the case of:
# {{after}} = $A->[0]->{'foo'}
# (which is unintintional vivification into the array in stream_done)
return $this->{'XFORM_REF'}->(
$record,
App::RecordStream::Operation::get_current_filename(),
$before,
$after,
);
}
sub add_help_types {
my $this = shift;
$this->use_help_type('snippet');
$this->use_help_type('keyspecs');
}
sub usage {
my $this = shift;
my $options = [
App::RecordStream::Executor::options_help(),
['A NUM', 'Make NUM records after this one available in $A (closest record to current in first position)'],
['B NUM', 'Make NUM records before this one available in $B (closest record to current in first position)'],
['C NUM', 'Make NUM records after this one available in $A and $B, as per -A NUM and -B NUM'],
['post-snippet SNIP', 'A snippet to run once the stream has completed'],
['pre-snippet SNIP', 'A snippet to run before the stream starts'],
];
my $args_string = $this->options_string($options);
return <<USAGE;
Usage: recs-xform <args> <expr> [<files>]
__FORMAT_TEXT__
<expr> is evaluated as perl on each record of input (or records from
<files>) with \$r set to a App::RecordStream::Record object and \$line set to the current
line number (starting at 1). All records are printed back out (changed as
they may be).
If \$r is set to an ARRAY ref in the expr, then the values of the array will
be treated as records and outputed one to a line. The values of the array
may either be a hash ref or a App::RecordStream::Record object. The
( run in 0.540 second using v1.01-cache-2.11-cpan-39bf76dae61 )