Data-Enumerable-Lazy
use DBI;
use Data::Enumerable::Lazy;

my $dbh = setup_dbh(); # Some config: your own DBI->connect(...) setup
my $last_id = -1;
my $limit = 10;
my $offset = 0;
my $tweet_enum = Data::Enumerable::Lazy->new({
  on_has_next => sub {
    my $sth = $dbh->prepare('SELECT count(1) FROM Tweets WHERE id > ?');
    $sth->execute($last_id);
    my ($cnt) = $sth->fetchrow_array;
    return int($cnt) > 0;
  },
  on_next => sub {
    my ($self) = @_;
    my $sth = $dbh->prepare('SELECT * FROM Tweets ORDER BY id LIMIT ? OFFSET ?');
    $sth->execute($limit, $offset);
    $offset += $limit;
    # Fetch the whole batch as a list of hashrefs, one per row
    my @tweets = @{ $sth->fetchall_arrayref({}) };
    $last_id = $tweets[-1]->{id} if @tweets;
    $self->yield(Data::Enumerable::Lazy->from_list(@tweets));
  },
  is_finite => 1,
});
while ($tweet_enum->has_next) {
  my $tweet = $tweet_enum->next;
  # do something with this tweet
}
In this example the tweet consumer is shielded from all of the DBI
bookkeeping: it consumes tweets one by one with no prior knowledge of
the table size, so it also works on a rapidly growing dataset.
To reduce the number of queries, the data is fetched in batches of at
most 10 rows.
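The batching idea can be tried in plain Perl, without a database or the module. This is a minimal sketch under stated assumptions, not the module's implementation: an in-memory array stands in for the Tweets table, fetch_batch and make_row_iterator are hypothetical helpers, and the LIMIT/OFFSET query is swapped for keyset pagination (WHERE id > ?), which uses $last_id directly.

```perl
use strict;
use warnings;

# Hypothetical stand-in for the Tweets table: 25 rows with increasing ids.
my @table = map { +{ id => $_, text => "tweet $_" } } 1 .. 25;
my $queries = 0;    # counts simulated round trips to the "database"

# Simulates 'SELECT * FROM Tweets WHERE id > ? ORDER BY id LIMIT ?'.
sub fetch_batch {
    my ($last_id, $limit) = @_;
    $queries++;
    my @rows = grep { $_->{id} > $last_id } @table;
    splice(@rows, $limit) if @rows > $limit;    # keep at most $limit rows
    return @rows;
}

# Returns a closure yielding one row per call (undef when exhausted); the
# buffer is refilled with at most $limit rows only after it has been drained.
sub make_row_iterator {
    my ($limit) = @_;
    my $last_id = -1;
    my @buffer;
    return sub {
        if (!@buffer) {
            @buffer = fetch_batch($last_id, $limit);
            return undef unless @buffer;    # no rows past $last_id
            $last_id = $buffer[-1]{id};     # remember where this batch ended
        }
        return shift @buffer;
    };
}

my $next_row = make_row_iterator(10);
my @seen;
while (defined(my $row = $next_row->())) {
    push @seen, $row->{id};
}
print scalar(@seen), " rows in $queries queries\n";
```

The consumer loop sees all 25 rows one at a time while only four simulated queries run: three batches plus a final empty one that ends the iteration.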
Redis queue consumer
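use Redis;
use Data::Enumerable::Lazy;

my $redis = Redis->new;
my $queue_enum = Data::Enumerable::Lazy->new({
  on_has_next => sub { 1 },
  on_next => sub {
    my ($self) = @_;
    # Blocking right POP on the (hypothetical) key 'queue'; timeout 0 waits
    # forever. BRPOP returns the key name followed by the popped value.
    my (undef, $item) = $redis->brpop('queue', 0);
    $self->yield($item);
  },
});
while (my $queue_item = $queue_enum->next) {
  # do something with the queue item
}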
In this example the client blocks until an element is available in the
queue, but the blocking is hidden from the code that consumes the data
item by item.
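The same pattern, a blocking read hidden behind an iterator, can be tried without a Redis server. This sketch assumes a Unix-like system with fork: a pipe plays the role of the Redis list and a forked child plays the producer; none of these pieces come from the original example.

```perl
use strict;
use warnings;
use IO::Handle;

# A pipe stands in for the Redis list; a forked child is the producer.
pipe(my $reader, my $writer) or die "pipe: $!";
my $pid = fork();
die "fork: $!" unless defined $pid;

if ($pid == 0) {                           # child: producer
    close $reader;
    $writer->autoflush(1);
    for my $job (qw(alpha beta gamma)) {
        select(undef, undef, undef, 0.2);  # items arrive with a delay
        print {$writer} "$job\n";
    }
    close $writer;
    exit 0;
}
close $writer;                             # parent: consumer

# readline blocks until the producer writes a line (or closes the pipe),
# so the waiting happens inside the iterator, not in the consumer loop.
my $next_item = sub {
    my $line = readline($reader);
    return undef unless defined $line;     # producer closed the pipe
    chomp $line;
    return $line;
};

my @items;
while (defined(my $item = $next_item->())) {
    push @items, $item;
}
waitpid($pid, 0);
print "received: @items\n";
```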
Kafka example
A Kafka consumer wrapper is another application of lazy evaluation:
lazy enumerables work naturally with streaming data such as Kafka. In
this example we fetch batches of messages from a Kafka topic, filter out
the corrupted ones and process the rest.
use Kafka qw($DEFAULT_MAX_BYTES);
use Kafka::Connection;
use Kafka::Consumer;
use Data::Enumerable::Lazy;

my $kafka_consumer = Kafka::Consumer->new(
  Connection => Kafka::Connection->new( host => 'localhost', ),
);
my $partition = 0;
my $offset = 0;
my $kafka_enum = Data::Enumerable::Lazy->new({
  on_has_next => sub { 1 },
  on_next => sub {
    my ($self) = @_;
    # Fetch messages in batch (positional args: topic, partition, offset, max bytes)
    my $messages = $kafka_consumer->fetch(
      'topic',
      $partition,
      $offset,
      $DEFAULT_MAX_BYTES,
    );
    if ($messages && @$messages) {
      # Continue from the offset that follows the last fetched message
      $offset = $messages->[-1]->next_offset;
      # Note the grep function applied: we're filtering away corrupted messages
      $self->yield(
        Data::Enumerable::Lazy->from_list(@$messages)->grep(sub { $_[0]->valid })
      );
    } else {
      # If there are no more messages, we return an empty enum, this is
      # another handy use-case for nested enums.
      $self->yield(Data::Enumerable::Lazy->empty);
    }
  },
});
while (my $message = $kafka_enum->next) {
  # handle the message
}
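The fetch, filter and advance cycle of this consumer can be sketched in plain Perl. This is a hypothetical illustration, not Kafka's or the module's API: @partition is an in-memory list of hashrefs whose valid flag stands in for a message's validity check, and fetch_messages and make_message_iterator are made-up helpers.

```perl
use strict;
use warnings;

# Hypothetical in-memory partition: every fourth message is "corrupted".
my @partition = map {
    +{ offset => $_, payload => "msg $_", valid => ($_ % 4 != 0) }
} 0 .. 9;
my $fetches = 0;

# Simulated fetch: up to $max messages starting at $offset (arrayref).
sub fetch_messages {
    my ($offset, $max) = @_;
    $fetches++;
    my @batch = grep { $_->{offset} >= $offset } @partition;
    splice(@batch, $max) if @batch > $max;    # keep at most $max messages
    return \@batch;
}

# Yields valid messages one at a time, fetching a new batch only when the
# buffer is empty and skipping batches whose messages are all corrupted.
sub make_message_iterator {
    my ($batch_size) = @_;
    my $offset = 0;
    my @buffer;
    return sub {
        while (!@buffer) {
            my $batch = fetch_messages($offset, $batch_size);
            return undef unless @$batch;              # partition exhausted
            $offset = $batch->[-1]{offset} + 1;       # continue after this batch
            @buffer = grep { $_->{valid} } @$batch;   # drop corrupted messages
        }
        return shift @buffer;
    };
}

my $next_message = make_message_iterator(3);
my @offsets;
while (defined(my $msg = $next_message->())) {
    push @offsets, $msg->{offset};
}
print "valid offsets: @offsets ($fetches fetches)\n";
```

Here the loop that refills the buffer plays the part of the empty nested enum: a batch that filters down to nothing simply triggers the next fetch.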
INSTALLATION
To install this module, type the following:
    perl Makefile.PL
    make
    make test
    make install
OPTIONS
on_next($self, $element) :: CodeRef -> Data::Enumerable::Lazy | Any
`on_next' is a code ref: a callback invoked every time the generator
needs a new piece of data. The enumerable buffers the result of the
previous call and calls `on_next()' again only when no elements are
left in the buffer.
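A stripped-down model of this buffering contract may help. It is a sketch under assumptions, not the module's code: make_lazy is a hypothetical function returning a hash of closures, and its on_next returns a plain list instead of calling yield.

```perl
use strict;
use warnings;

# Hypothetical mini-enumerable (not Data::Enumerable::Lazy itself).
# on_next returns a list of zero or more elements; they are buffered,
# and on_next runs again only once the buffer has been drained.
sub make_lazy {
    my (%args) = @_;
    my ($on_has_next, $on_next) = @args{qw(on_has_next on_next)};
    my @buffer;
    # Refill the buffer if it is empty and more data is available;
    # returns the number of buffered elements.
    my $fill = sub {
        while (!@buffer && $on_has_next->()) {
            @buffer = $on_next->();
        }
        return scalar @buffer;
    };
    return {
        has_next => sub { $fill->() > 0 },
        next     => sub { $fill->() ? shift @buffer : undef },
    };
}

my $calls = 0;    # how many times on_next ran
my $chunk = 0;    # which chunk to produce next
my $enum = make_lazy(
    on_has_next => sub { $chunk < 3 },
    on_next     => sub {
        $calls++;
        my $c = $chunk++;
        return map { $c * 10 + $_ } 1 .. 2;    # two elements per call
    },
);

my @out;
push @out, $enum->{next}->() while $enum->{has_next}->();
print "@out (on_next called $calls times)\n";
```

Six elements come out, but on_next runs only three times, once per drained buffer.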
`$element' is defined when the current collection is a continuation of
another enumerable. For example:
my $enum = Data::Enumerable::Lazy->from_list(1, 2, 3);
my $enum2 = $enum->continue({
on_next => sub { my ($self, $i) = @_; $self->yield($i * $i) }
});
$enum2->to_list; # generates 1, 4, 9
In this case $i is defined and comes from the original enumerable $enum.
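The continuation mechanism can be approximated with closures. This is a hypothetical sketch: from_list, continue_with and to_list are made-up helpers that only mimic the names above, iterators signal exhaustion by returning an empty list, and the callback receives only the element (the real on_next also receives $self).

```perl
use strict;
use warnings;

# Hypothetical helpers that only mimic the module's method names.
# An iterator is a closure returning one element, or an empty list when done.
sub from_list {
    my @items = @_;
    return sub { @items ? (shift @items) : () };
}

# Pulls elements from $source and passes each one to $on_next; every
# element of the list $on_next returns is emitted in turn (so returning
# an empty list skips that input, and several values are flattened).
sub continue_with {
    my ($source, $on_next) = @_;
    my @buffer;
    return sub {
        while (!@buffer) {
            my @el = $source->();
            return () unless @el;    # source exhausted
            @buffer = $on_next->($el[0]);
        }
        return shift @buffer;
    };
}

# Drains an iterator into a list.
sub to_list {
    my ($iter) = @_;
    my @out;
    while (my @el = $iter->()) {
        push @out, $el[0];
    }
    return @out;
}

my @squares = to_list(continue_with(from_list(1, 2, 3), sub { my ($i) = @_; $i * $i }));
print "@squares\n";
```

Because the callback's return list is flattened, the same helper can also filter (return an empty list) or expand (return several values) each element of the source.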