Data-Enumerable-Lazy
we want to iterate over all entries in the table, and we want the data to be
retrieved in batches of 10 elements in order to reduce the number of queries.
We don't want to compute the number of steps in advance, as the number might
be inaccurate: let's assume we're paginating over some new tweets and new
entries might be created on the fly.
    use DBI;
    use Data::Enumerable::Lazy;
    my $dbh = setup_dbh(); # Some config
    my $last_id = -1;
    my $limit = 10;
    my $offset = 0;
    my $tweet_enum = Data::Enumerable::Lazy->new({
      on_has_next => sub {
        # Are there any rows newer than the last one we have seen?
        my $sth = $dbh->prepare('SELECT count(1) from Tweets where id > ?');
        $sth->execute($last_id);
        my ($cnt) = $sth->fetchrow_array;
        return int($cnt) > 0;
      },
      on_next => sub {
        my ($self) = @_;
        my $sth = $dbh->prepare('SELECT * from Tweets ORDER BY id LIMIT ? OFFSET ?');
        $sth->execute($limit, $offset);
        $offset += $limit;
        # Fetch the whole batch as a list of hash references, one per row
        my @tweets = @{ $sth->fetchall_arrayref({}) };
        $last_id = $tweets[-1]->{id} if @tweets;
        $self->yield(Data::Enumerable::Lazy->from_list(@tweets));
      },
      is_finite => 1,
    });
    while ($tweet_enum->has_next) {
      my $tweet = $tweet_enum->next;
      # do something with this tweet
    }
In this example the tweet consumer is shielded from all DBI bookkeeping: it
consumes tweet entries one by one, needs no prior knowledge of the table
size, and keeps working on a rapidly growing dataset.
In order to reduce the number of queries, we fetch the data in batches of at
most 10 elements.
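
The batching idea itself can be tried without a database. The following is a minimal sketch in plain Perl, not the module's implementation: the in-memory C<@table>, the C<fetch_batch> helper and C<make_tweet_iterator> are all hypothetical names introduced for illustration. It also pages by "id greater than the last seen id" rather than by OFFSET, which is a different technique from the query above.

```perl
use strict;
use warnings;

# Hypothetical stand-in for the Tweets table: rows with increasing ids.
my @table = map { { id => $_, text => "tweet $_" } } 1 .. 25;
my $queries = 0;

# Hypothetical helper that emulates
# "SELECT * FROM Tweets WHERE id > ? ORDER BY id LIMIT ?".
sub fetch_batch {
    my ($last_id, $limit) = @_;
    $queries++;
    my @rows = grep { $_->{id} > $last_id } @table;
    splice(@rows, $limit) if @rows > $limit;   # keep at most $limit rows
    return @rows;
}

# Returns a closure that hands out one row per call and refills its
# buffer one batch at a time; it returns undef once no rows are left.
sub make_tweet_iterator {
    my ($limit) = @_;
    my $last_id = -1;
    my @buffer;
    return sub {
        unless (@buffer) {
            @buffer = fetch_batch($last_id, $limit);
            return unless @buffer;             # table exhausted
            $last_id = $buffer[-1]{id};
        }
        return shift @buffer;
    };
}

my $next_tweet = make_tweet_iterator(10);
my $seen = 0;
while (my $tweet = $next_tweet->()) {
    $seen++;                                   # consume rows one by one
}
print "rows: $seen, queries: $queries\n";
```

With 25 rows and a batch size of 10, the consumer sees every row while only four simulated queries are issued: three that return data and a final one that comes back empty.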
=head2 Redis queue consumer
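    use Redis;
    use Data::Enumerable::Lazy;
    my $redis = Redis->new;
    my $queue_enum = Data::Enumerable::Lazy->new({
      on_has_next => sub { 1 },
      on_next => sub {
        # Blocking right POP. 'queue' is a placeholder key name; a timeout
        # of 0 blocks until an element is available.
        $redis->brpop('queue', 0);
      },
    });
    while (my $queue_item = $queue_enum->next) {
      # do something with the queue item
    }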
In this example the call blocks until an element is available in the queue,
but the blocking is hidden from the code that consumes the data item by
item.
=head2 Kafka example
A Kafka consumer wrapper is another application of lazy evaluation.
Lazy enumerables pair naturally with streaming data sources like Kafka.
In this example we fetch batches of messages from a Kafka topic, grep out
the corrupted ones and proceed with the remaining messages.
    use Kafka qw($DEFAULT_MAX_BYTES);
    use Kafka::Connection;
    use Kafka::Consumer;
    use Data::Enumerable::Lazy;
    my $kafka_consumer = Kafka::Consumer->new(
      Connection => Kafka::Connection->new( host => 'localhost', ),
    );
    my $partition = 0;
    my $offset = 0;
    my $kafka_enum = Data::Enumerable::Lazy->new({
      on_has_next => sub { 1 },
      on_next => sub {
        my ($self) = @_;
        # Fetch messages in batch; fetch() takes positional arguments
        my $messages = $kafka_consumer->fetch(
          'topic',
          $partition,
          $offset,
          $DEFAULT_MAX_BYTES
        );
        if ($messages && @$messages) {
          # Continue after the last fetched message on the next call
          $offset = $messages->[-1]->next_offset;
          # Note the grep function applied: we're filtering away corrupted messages
          $self->yield(Data::Enumerable::Lazy->from_list(@$messages)->grep(sub { $_[0]->valid }));
        } else {
          # If there are no more messages, we return an empty enum, this is
          # another handy use-case for nested enums.
          $self->yield(Data::Enumerable::Lazy->empty);
        }
      },
    });
    while (my $message = $kafka_enum->next) {
      # handle the message
    }
=cut
=head1 INSTALLATION
To install this module type the following:
    perl Makefile.PL
    make
    make test
    make install
=cut
use Carp;
use List::Util;