Ryu
view release on metacpan or search on metacpan
lib/Ryu/Source.pm view on Meta::CPAN
=over 4
=item * items to put into one end
=item * processing to attach to the other end
=back
For the first, call L</emit>:
use Future::AsyncAwait;
# 1s drifting periodic timer
while(1) {
await $loop->delay_future(after => 1);
$src->emit('');
}
For the second, this would be L</each>:
$src->each(sub { print "Had timer tick\n" });
So far, not so useful - the power of this type of reactive programming is in the
ability to chain and combine disparate event sources.
At this point, L<https://rxmarbles.com> is worth a visit - this provides a clear
visual demonstration of how to combine multiple event streams using the chaining
methods. Most of the API here is modelled after similar principles.
First, the L</map> method: this provides a way to transform each item into
something else:
$src->map(do { my $count = 0; sub { ++$count } })
->each(sub { print "Count is now $_\n" })
Next, L</filter> provides an equivalent to Perl's L<grep|perlfunc/grep> functionality:
$src->map(do { my $count = 0; sub { ++$count } })
->filter(sub { $_ % 2 })
->each(sub { print "Count is now at an odd number: $_\n" })
You can stack these:
$src->map(do { my $count = 0; sub { ++$count } })
->filter(sub { $_ % 2 })
->filter(sub { $_ % 5 })
->each(sub { print "Count is now at an odd number which is not divisible by 5: $_\n" })
or:
$src->map(do { my $count = 0; sub { ++$count } })
->map(sub { $_ % 3 ? 'fizz' : $_ })
->map(sub { $_ % 5 ? 'buzz' : $_ })
->each(sub { print "An imperfect attempt at the fizz-buzz game: $_\n" })
=cut
no indirect;
use sort qw(stable);
use Scalar::Util ();
use Ref::Util ();
use List::Util ();
use List::UtilsBy;
use Encode ();
use Syntax::Keyword::Try;
use Future;
use Future::Queue;
use curry::weak;
use Ryu::Buffer;
use Log::Any qw($log);
=head1 GLOBALS
=head2 $FUTURE_FACTORY
This is a coderef which should return a new L<Future>-compatible instance.
Example overrides might include:
$Ryu::Source::FUTURE_FACTORY = sub { Mojo::Future->new->set_label($_[1]) };
=cut
# Default factory for new futures. The first argument is ignored here; the
# second is applied as the label on a freshly constructed Future, and the
# result of that set_label call is what the factory returns.
our $FUTURE_FACTORY = sub {
    my (undef, $label) = @_;
    return Future->new->set_label($label);
};
=head2 %ENCODER
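An encoder is a coderef which takes input and returns output. Each entry in this hash is a factory for one: it is called with any options for the encoder, and returns the coderef which will encode the value found in C<$_>.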
=cut
our %ENCODER = (
utf8 => sub {
sub {
Encode::encode_utf8($_)
}
},
json => sub {
require JSON::MaybeXS;
my $json = JSON::MaybeXS->new(@_);
sub {
$json->encode($_)
}
},
csv => sub {
require Text::CSV;
my $csv = Text::CSV->new(@_);
sub {
die $csv->error_input unless $csv->combine(@$_);
$csv->string
}
},
base64 => sub {
require MIME::Base64;
sub {
MIME::Base64::encode_base64($_, '');
}
lib/Ryu/Source.pm view on Meta::CPAN
=head2 map
A bit like L<perlfunc/map>.
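Takes a single parameter - the coderef to execute for each item. This should return
a scalar value which will be used as the next item.
A plain string can be passed instead of a coderef: it will be called as a method name
on items which are objects, and used as a hash key on anything else.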
Often useful in conjunction with a C<< do >> block to provide a closure.
Examples:
$src->map(do {
my $idx = 0;
sub {
[ @$_, ++$idx ]
}
})
=cut
# Transform each incoming item and emit the result on a new chained source.
#
# $code is either a coderef or a plain string. It is invoked as $_->$code
# (always in scalar context) whenever the item is a blessed object or $code is
# a reference; an unblessed item combined with a plain-string $code is treated
# as a hashref, with the string used as the key.
#
# Returns whatever each_while_source returns.
sub map : method {
    my ($self, $code) = @_;
    # The label is the unqualified name of this sub
    my $src = $self->chained(label => (caller 0)[3] =~ /::([^:]+)$/);
    my $transform = sub {
        # The hash lookup is the only case which does not invoke $code
        return $src->emit($_->{$code})
            unless Scalar::Util::blessed($_) or ref $code;
        return $src->emit(scalar $_->$code);
    };
    return $self->each_while_source($transform, $src);
}
=head2 flat_map
Similar to L</map>, but will flatten out some items:
=over 4
=item * an arrayref will be expanded out to emit the individual elements
=item * for a L<Ryu::Source>, passes on any emitted elements
=back
This also means you can "merge" items from a series of sources.
Note that this is not recursive - an arrayref of arrayrefs will be expanded out
into the child arrayrefs, but no further.
Failure on any input source will cause this source to be marked as failed as well.
=cut
# Apply $code to every incoming item and flatten what it returns onto a new
# chained source, which is returned.
#
# $code is either a plain coderef, or anything else (typically a method name)
# followed by extra arguments, in which case it is called as $_->$code(@args).
#
# Each value returned by $code is handled as follows:
#  - a plain arrayref has its elements emitted one at a time;
#  - an instance of this package has its items forwarded as they arrive.
# NOTE(review): values matching neither case are not emitted at all - confirm
# this is intended.
#
# The chained source tracks pending completion futures in $src->{waiting}: it
# finishes once that hash is empty, and fails as soon as any of them fails.
sub flat_map {
# Remove invocant and $code from @_ so that only the extra arguments remain
my ($self, $code) = splice @_, 0, 2;
# Upgrade ->flat_map(method => args...) to a coderef
if(!Ref::Util::is_plain_coderef($code)) {
my $method = $code;
my @args = @_;
$code = sub { $_->$method(@args) }
}
my $src = $self->chained(label => (caller 0)[3] =~ /::([^:]+)$/);
# The closures below go through this weak copy rather than capturing $src itself
Scalar::Util::weaken(my $weak_sauce = $src);
# Registers a completion future on the waiting list, keyed by its stringified
# form, and attaches the handler that decides when $src finishes or fails.
my $add = sub {
my $v = shift;
# Nothing to do if the chained source no longer exists
my $src = $weak_sauce or return;
my $k = "$v";
# Stores the return value of on_ready (presumably the future itself - confirm)
$src->{waiting}{$k} = $v->on_ready(sub {
my ($f) = @_;
return unless my $src = $weak_sauce;
# Any failed input source should propagate failure immediately
if($f->is_failed) {
# Clear out our waitlist, since we don't want to hold those references any more
delete $src->{waiting};
$src->fail($f->failure) unless $src->is_ready;
return;
}
# This input is done; once no inputs remain, the chained source is finished
delete $src->{waiting}{$k};
$src->finish unless %{$src->{waiting}};
});
$log->tracef("Added %s which will bring our count to %d", $k, 0 + keys %{$src->{waiting}});
};
# The upstream source goes on the waiting list first
# ($self->_completed is presumably its completion future - confirm)
$add->($self->_completed);
$self->each_while_source(sub {
my $src = $weak_sauce or return;
for ($code->($_)) {
my $item = $_;
if(Ref::Util::is_plain_arrayref($item)) {
$log->tracef("Have an arrayref of %d items", 0 + @$item);
for(@$item) {
# Stop emitting as soon as the chained source is ready
last if $src->is_ready;
$src->emit($_);
}
} elsif(Scalar::Util::blessed($item) && $item->isa(__PACKAGE__)) {
$log->tracef("This item is a source");
# If the chained source becomes ready before this inner source does, pass the
# inner source's completion to on_ready on the callback's first argument.
# NOTE(review): presumably this propagates the outcome to $item - confirm
$src->on_ready(sub {
return if $item->is_ready;
$log->tracef("Marking %s as ready because %s was", $item->describe, $src->describe);
shift->on_ready($item->_completed);
});
# Keep the chained source open until this inner source completes
$add->($item->_completed);
# Forward everything the inner source emits
$item->each_while_source(sub {
my $src = $weak_sauce or return;
$src->emit($_)
}, $src)->on_ready(sub {
# Release this closure's reference to the inner source once the object
# returned by each_while_source is ready
undef $item;
});
}
}
}, $src);
# Hand back the chained source
$src
}
=head2 split
Splits the input on the given delimiter.
By default, will split into characters.
Note that each item will be processed separately - the buffer won't be
retained across items, see L</by_line> for that.
=cut
# Break each incoming item apart with Perl's built-in split and emit every
# piece individually on a new chained source.
#
# $delim defaults to an empty pattern, which yields one piece per character.
# Returns whatever each_while_source returns.
sub split : method {
    my ($self, $delim) = @_;
    $delim = qr// unless defined $delim;
    # The label is the unqualified name of this sub
    my $src = $self->chained(label => (caller 0)[3] =~ /::([^:]+)$/);
    return $self->each_while_source(
        sub {
            my @pieces = split $delim, $_;
            $src->emit($_) for @pieces;
        },
        $src
    );
}
=head2 chunksize
Splits input into fixed-size chunks.
Note that output is always guaranteed to be a full chunk - if there is partial input
at the time the input stream finishes, those extra bytes will be discarded.
=cut
sub chunksize : method {
my ($self, $size) = @_;
die 'need positive chunk size parameter' unless $size && $size > 0;
my $buffer = '';
my $src = $self->chained(label => (caller 0)[3] =~ /::([^:]+)$/);
$self->each_while_source(sub {
( run in 4.552 seconds using v1.01-cache-2.11-cpan-39bf76dae61 )