ETL-Yertl
view release on metacpan or search on metacpan
lib/ETL/Yertl/InputSeries.pm view on Meta::CPAN
#pod
#pod =head1 SEE ALSO
#pod
#pod L<ETL::Yertl::FormatStream>
#pod
#pod =cut
use ETL::Yertl;
use base 'IO::Async::Notifier';
use Carp qw( croak );
use Scalar::Util qw( weaken );
sub configure {
my ( $self, %args ) = @_;
# Args to pass to streams as we create them
# XXX: This is why I would prefer already having the streams
# created!
for my $arg ( qw( format ) ) {
$self->{stream_args}{ $arg } = delete $args{ $arg } if $args{ $arg };
}
lib/ETL/Yertl/InputSeries.pm view on Meta::CPAN
sub _add_to_loop {
my ( $self ) = @_;
$self->can_event( "on_doc" )
or croak "Expected either an on_doc callback or to be able to ->on_doc";
$self->_shift_stream;
}
sub _shift_stream {
my ( $self ) = @_;
weaken $self;
my $stream = shift @{ $self->{streams} };
my $fh;
if ( !ref $stream ) {
open $fh, '<', $stream or die "Could not open $stream for reading: $!";
}
elsif ( ref $stream eq 'GLOB' ) {
$fh = $stream;
}
else {
die "Unknown stream type '$stream': Should be path or filehandle";
lib/ETL/Yertl/Transform.pm view on Meta::CPAN
#pod transform_doc => sub { ... },
#pod ) >> $output;
#pod
#pod =head1 SEE ALSO
#pod
#pod L<ETL::Yertl>, L<ETL::Yertl::FormatStream>
#pod
#pod =cut
use ETL::Yertl;
use Scalar::Util qw( weaken );
use Carp qw( croak );
use base 'IO::Async::Notifier';
# override pipe, <<, and >> to set on_read_doc and on_write_doc
# handlers appropriately
use overload
'>>' => \&_set_output,
'<<' => \&_set_input,
'|' => \&_pipe,
lib/ETL/Yertl/Transform.pm view on Meta::CPAN
#pod later, so that transforms can be given new sources/destinations.
#pod
#pod =cut
sub configure {
my ( $self, %args ) = @_;
if ( $args{source} ) {
# Register ourselves with the source
my $source = $self->{source} = delete $args{source};
weaken $self;
$source->configure(
on_doc => sub {
my ( $source, $doc, $eof ) = @_;
local $_ = $doc;
my @docs = $self->invoke_event( transform_doc => $doc );
# ; say "Writing docs from return: " . join ", ", @docs;
# ; use Data::Dumper;
# ; say STDERR Dumper( \@docs );
$self->write( $_ ) for grep { $_ } @docs;
return;
( run in 0.935 second using v1.01-cache-2.11-cpan-65fba6d93b7 )