Ryu

 view release on metacpan or  search on metacpan

lib/Ryu/Source.pm  view on Meta::CPAN

=over 4

=item * items to put into one end

=item * processing to attach to the other end

=back

For the first, call L</emit>:

 use Future::AsyncAwait;
 # 1s drifting periodic timer
 while(1) {
  await $loop->delay_future(after => 1);
  $src->emit('');
 }

For the second, this would be L</each>:

 $src->each(sub { print "Had timer tick\n" });

So far, not so useful - the power of this type of reactive programming is in the
ability to chain and combine disparate event sources.

At this point, L<https://rxmarbles.com> is worth a visit - this provides a clear
visual demonstration of how to combine multiple event streams using the chaining
methods. Most of the API here is modelled after similar principles.

First, the L</map> method: this provides a way to transform each item into
something else:

 $src->map(do { my $count = 0; sub { ++$count } })
     ->each(sub { print "Count is now $_\n" })

Next, L</filter> provides an equivalent to Perl's L<perlfunc/grep> functionality:

 $src->map(do { my $count = 0; sub { ++$count } })
     ->filter(sub { $_ % 2 })
     ->each(sub { print "Count is now at an odd number: $_\n" })

You can stack these:

 $src->map(do { my $count = 0; sub { ++$count } })
     ->filter(sub { $_ % 2 })
     ->filter(sub { $_ % 5 })
     ->each(sub { print "Count is now at an odd number which is not divisible by 5: $_\n" })

or:

 $src->map(do { my $count = 0; sub { ++$count } })
     ->map(sub { $_ % 3 ? 'fizz' : $_ })
     ->map(sub { $_ % 5 ? 'buzz' : $_ })
     ->each(sub { print "An imperfect attempt at the fizz-buzz game: $_\n" })

=cut

no indirect;
use sort qw(stable);

use Scalar::Util ();
use Ref::Util ();
use List::Util ();
use List::UtilsBy;
use Encode ();
use Syntax::Keyword::Try;
use Future;
use Future::Queue;
use curry::weak;

use Ryu::Buffer;

use Log::Any qw($log);

=head1 GLOBALS

=head2 $FUTURE_FACTORY

This is a coderef which should return a new L<Future>-compatible instance.

Example overrides might include:

 $Ryu::Source::FUTURE_FACTORY = sub { Mojo::Future->new->set_label($_[1]) };

=cut

# Default factory: creates a plain Future and labels it with the second
# argument passed to the factory (the first argument is ignored).
our $FUTURE_FACTORY = sub {
    my (undef, $label) = @_;
    return Future->new->set_label($label);
};

=head2 %ENCODER

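An encoder is a coderef which takes its input from C<$_> and returns the encoded
output. Each entry in this hash is a factory: it is called with any options for
the encoding, and returns the encoder coderef.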

=cut

our %ENCODER = (
    utf8 => sub {
        sub {
            Encode::encode_utf8($_)
        }
    },
    json => sub {
        require JSON::MaybeXS;
        my $json = JSON::MaybeXS->new(@_);
        sub {
            $json->encode($_)
        }
    },
    csv => sub {
        require Text::CSV;
        my $csv = Text::CSV->new(@_);
        sub {
            die $csv->error_input unless $csv->combine(@$_);
            $csv->string
        }
    },
    base64 => sub {
        require MIME::Base64;
        sub {
            MIME::Base64::encode_base64($_, '');
        }

lib/Ryu/Source.pm  view on Meta::CPAN


=head2 map

A bit like L<perlfunc/map>.

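Takes a single parameter - the coderef to execute for each item. This should return
a scalar value which will be used as the next item.

A plain string can be passed instead of a coderef: for blessed items it is called
as a method name on the item, for anything else it is used as a hash key to look
up in the item.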

Often useful in conjunction with a C<< do >> block to provide a closure.

Examples:

 $src->map(do {
   my $idx = 0;
   sub {
    [ @$_, ++$idx ]
   }
 })

=cut

# Transforms each item from this source and emits the result on a new
# chained source.
#
# $code is either a coderef (run with the item in $_), or a plain string:
# a method name for blessed items, a hash key for everything else.
# Returns whatever each_while_source returns for the chained source.
sub map : method {
    my ($self, $code) = @_;

    my $src = $self->chained(label => (caller 0)[3] =~ /::([^:]+)$/);
    my $transform = sub {
        my $next;
        if(!Scalar::Util::blessed($_) && !ref($code)) {
            # Unblessed item with a string: treat the string as a hash key
            $next = $_->{$code};
        } else {
            # Coderef, or method name on an object - the scalar assignment
            # forces scalar context so exactly one value is produced
            $next = $_->$code;
        }
        $src->emit($next)
    };
    return $self->each_while_source($transform, $src);
}

=head2 flat_map

Similar to L</map>, but will flatten out some items:

=over 4

=item * an arrayref will be expanded out to emit the individual elements

=item * for a L<Ryu::Source>, passes on any emitted elements

=back

This also means you can "merge" items from a series of sources.

Note that this is not recursive - an arrayref of arrayrefs will be expanded out
into the child arrayrefs, but no further.

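Failure on any input source will cause this source to be marked as failed as well.

Any value returned by the coderef which is neither a plain arrayref nor a
L<Ryu::Source> is discarded.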

=cut

# Runs $code against each incoming item (available in $_) and flattens the
# results onto a new chained source:
#
#  * a plain arrayref has each of its elements emitted individually
#  * an instance of this class has its items passed through as they arrive
#
# Any other result is ignored. Instead of a coderef, a method name followed by
# arguments can be given: ->flat_map(method => @args) calls that method on each
# item.
#
# The chained source tracks the completion futures it is still waiting on in
# $src->{waiting}. It finishes once that set becomes empty, and fails as soon
# as any one of them fails.
#
# Returns the new chained source.
sub flat_map {
    my ($self, $code) = splice @_, 0, 2;

    # Upgrade ->flat_map(method => args...) to a coderef
    if(!Ref::Util::is_plain_coderef($code)) {
        my $method = $code;
        my @args = @_;
        $code = sub { $_->$method(@args) }
    }

    my $src = $self->chained(label => (caller 0)[3] =~ /::([^:]+)$/);

    # The callbacks below only capture a weak reference, so they do not keep
    # the chained source alive on their own
    Scalar::Util::weaken(my $weak_sauce = $src);

    # Registers a completion future which must be ready before we can finish
    my $add = sub {
        my $v = shift;
        my $src = $weak_sauce or return;

        my $k = "$v";
        # Record the entry *before* attaching the callback: if $v is already
        # ready the callback runs immediately, and storing the entry afterwards
        # would leave a stale item in the waitlist that nothing ever removes -
        # which would stop this source from ever finishing.
        # Future->on_ready returns its invocant, so the stored value is the
        # same one that the combined assignment used to store.
        $src->{waiting}{$k} = $v;
        $v->on_ready(sub {
            my ($f) = @_;
            return unless my $src = $weak_sauce;

            # Any failed input source should propagate failure immediately
            if($f->is_failed) {
                # Clear out our waitlist, since we don't want to hold those references any more
                delete $src->{waiting};
                $src->fail($f->failure) unless $src->is_ready;
                return;
            }

            delete $src->{waiting}{$k};
            $src->finish unless %{$src->{waiting}};
        });
        $log->tracef("Added %s which will bring our count to %d", $k, 0 + keys %{$src->{waiting}});
    };

    # The upstream source itself is the first thing we wait for
    $add->($self->_completed);
    $self->each_while_source(sub {
        my $src = $weak_sauce or return;
        for ($code->($_)) {
            my $item = $_;
            if(Ref::Util::is_plain_arrayref($item)) {
                $log->tracef("Have an arrayref of %d items", 0 + @$item);
                for(@$item) {
                    # Stop early if the output was completed while emitting
                    last if $src->is_ready;
                    $src->emit($_);
                }
            } elsif(Scalar::Util::blessed($item) && $item->isa(__PACKAGE__)) {
                $log->tracef("This item is a source");
                # Once the output is ready, pass that state on to an inner
                # source which is still active
                $src->on_ready(sub {
                    return if $item->is_ready;
                    $log->tracef("Marking %s as ready because %s was", $item->describe, $src->describe);
                    shift->on_ready($item->_completed);
                });
                $add->($item->_completed);
                $item->each_while_source(sub {
                    my $src = $weak_sauce or return;
                    $src->emit($_)
                }, $src)->on_ready(sub {
                    # Drop our reference to the inner source
                    undef $item;
                });
            }
        }
    }, $src);
    $src
}

=head2 split

Splits the input on the given delimiter.

By default, will split into characters.

Note that each item will be processed separately - the buffer won't be
retained across items, see L</by_line> for that.

=cut

# Splits each incoming item on $delim and emits every resulting piece on a
# new chained source. Without a delimiter, an empty pattern is used.
# Each item is split on its own - nothing is buffered between items.
# Returns whatever each_while_source returns for the chained source.
sub split : method {
    my ($self, $delim) = @_;
    $delim = qr// unless defined $delim;

    my $src = $self->chained(label => (caller 0)[3] =~ /::([^:]+)$/);
    return $self->each_while_source(sub {
        # Explicitly the built-in split, not this method
        my @pieces = CORE::split($delim, $_);
        $src->emit($_) for @pieces;
    }, $src);
}

=head2 chunksize

Splits input into fixed-size chunks.

Note that output is always guaranteed to be a full chunk - if there is partial input
at the time the input stream finishes, those extra bytes will be discarded.

=cut

sub chunksize : method {
    my ($self, $size) = @_;
    die 'need positive chunk size parameter' unless $size && $size > 0;

    my $buffer = '';
    my $src = $self->chained(label => (caller 0)[3] =~ /::([^:]+)$/);
    $self->each_while_source(sub {



( run in 4.552 seconds using v1.01-cache-2.11-cpan-39bf76dae61 )