ETL-Yertl

 view release on metacpan or  search on metacpan

lib/ETL/Yertl/InputSeries.pm  view on Meta::CPAN

#pod
#pod =head1 SEE ALSO
#pod
#pod L<ETL::Yertl::FormatStream>
#pod
#pod =cut

use ETL::Yertl;
use base 'IO::Async::Notifier';
use Carp qw( croak );
use Scalar::Util qw( weaken );

sub configure {
    my ( $self, %args ) = @_;

    # Args to pass to streams as we create them
    # XXX: This is why I would prefer already having the streams
    # created!
    for my $arg ( qw( format ) ) {
        $self->{stream_args}{ $arg } = delete $args{ $arg } if $args{ $arg };
    }

lib/ETL/Yertl/InputSeries.pm  view on Meta::CPAN


sub _add_to_loop {
    my ( $self ) = @_;
    $self->can_event( "on_doc" )
        or croak "Expected either an on_doc callback or to be able to ->on_doc";
    $self->_shift_stream;
}

sub _shift_stream {
    my ( $self ) = @_;
    weaken $self;
    my $stream = shift @{ $self->{streams} };
    my $fh;
    if ( !ref $stream ) {
        open $fh, '<', $stream or die "Could not open $stream for reading: $!";
    }
    elsif ( ref $stream eq 'GLOB' ) {
        $fh = $stream;
    }
    else {
        die "Unknown stream type '$stream': Should be path or filehandle";

lib/ETL/Yertl/Transform.pm  view on Meta::CPAN

#pod         transform_doc => sub { ... },
#pod     ) >> $output;
#pod
#pod =head1 SEE ALSO
#pod
#pod L<ETL::Yertl>, L<ETL::Yertl::FormatStream>
#pod
#pod =cut

use ETL::Yertl;
use Scalar::Util qw( weaken );
use Carp qw( croak );

use base 'IO::Async::Notifier';

# override pipe, <<, and >> to set on_read_doc and on_write_doc
# handlers appropriately
use overload
    '>>' => \&_set_output,
    '<<' => \&_set_input,
    '|' => \&_pipe,

lib/ETL/Yertl/Transform.pm  view on Meta::CPAN

#pod later, so that transforms can be given new sources/destinations.
#pod
#pod =cut

sub configure {
    my ( $self, %args ) = @_;

    if ( $args{source} ) {
        # Register ourselves with the source
        my $source = $self->{source} = delete $args{source};
        weaken $self;
        $source->configure(
            on_doc => sub {
                my ( $source, $doc, $eof ) = @_;
                local $_ = $doc;
                my @docs = $self->invoke_event( transform_doc => $doc );
                # ; say "Writing docs from return: " . join ", ", @docs;
                # ; use Data::Dumper;
                # ; say STDERR Dumper( \@docs );
                $self->write( $_ ) for grep { $_ } @docs;
                return;



( run in 0.935 second using v1.01-cache-2.11-cpan-65fba6d93b7 )