App-RecordStream
view release on metacpan or search on metacpan
lib/App/RecordStream/Operation/multiplex/BaseClumperCallback.pm view on Meta::CPAN
package App::RecordStream::Operation::multiplex::BaseClumperCallback;
use strict;
use warnings;
use App::RecordStream::Operation;
use App::RecordStream::Record;
use App::RecordStream::Stream::Sub;
sub new {
my $class = shift;
my $script = shift;
my $args = shift;
my $line_key = shift;
my $record_cb = shift;
my $line_cb = shift;
my $this = {
'SCRIPT' => $script,
'ARGS' => $args,
'LINE_KEY' => $line_key,
'RECORD_CB' => $record_cb,
'LINE_CB' => $line_cb,
};
bless $this, $class;
return $this;
}
sub clumper_callback_begin {
my $this = shift;
my $bucket = shift;
my $record_cb = $this->{'RECORD_CB'};
my $record_cb2 = sub {
my $r = shift;
return $record_cb->(App::RecordStream::Record->new(%$r, %$bucket));
};
my $next = App::RecordStream::Stream::Sub->new($record_cb2, $this->{'LINE_CB'});
my @args = @{$this->{'ARGS'}};
my $op = App::RecordStream::Operation::create_operation($this->{'SCRIPT'}, \@args, $next);
if(@args) {
die "Extra options to multiplex operation.";
}
if(!$op->wants_input()) {
die "Multiplex operation must want input.";
}
return $op;
}
sub clumper_callback_push_record {
my $this = shift;
my $op = shift;
my $record = shift;
my $line_key = $this->{'LINE_KEY'};
if(defined($line_key)) {
$op->accept_line(${$record->guess_key_from_spec($line_key)});
}
else {
$op->accept_record($record);
}
}
sub clumper_callback_end {
my $this = shift;
my $op = shift;
$op->finish();
}
1;
( run in 0.483 second using v1.01-cache-2.11-cpan-39bf76dae61 )