Data-Enumerable-Lazy

lib/Data/Enumerable/Lazy.pm

we want to iterate over all entries in the table, and we want the data to be
retrieved in batches of 10 elements in order to reduce the number of queries.
We don't want to compute the number of steps in advance, as that number might
be inaccurate: let's assume we're paginating over some new tweets and new
entries might be created on the fly.

  use DBI;
  use Data::Enumerable::Lazy;

  my $dbh = setup_dbh(); # Some config: returns a connected DBI handle

  my $last_id = -1;
  my $limit = 10;
  my $offset = 0;
  my $tweet_enum = Data::Enumerable::Lazy->new({
    on_has_next => sub {
      # Are there any tweets newer than the last one we have seen?
      my $sth = $dbh->prepare('SELECT count(1) FROM Tweets WHERE id > ?');
      $sth->execute($last_id);
      my ($cnt) = $sth->fetchrow_array;
      return int($cnt) > 0;
    },
    on_next => sub {
      my ($self) = @_;
      my $sth = $dbh->prepare('SELECT * FROM Tweets ORDER BY id LIMIT ? OFFSET ?');
      $sth->execute($limit, $offset);
      $offset += $limit;
      # Fetch the whole batch as an array of hashrefs, one per row
      my $tweets = $sth->fetchall_arrayref({});
      $last_id = $tweets->[-1]->{id} if @$tweets;
      # Yield a nested enum: its elements are handed out one by one
      $self->yield(Data::Enumerable::Lazy->from_list(@$tweets));
    },
    is_finite => 1,
  });

  while ($tweet_enum->has_next) {
    my $tweet = $tweet_enum->next;
    # do something with this tweet
  }

In this example the tweet consumer is abstracted away from any DBI
bookkeeping: it consumes tweet entries one by one without any prior knowledge
of the table size, and it can work on a rapidly growing dataset.

To reduce the number of queries, the data is fetched in batches of at most 10
elements.
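
The batching idea can be sketched in plain core Perl, without
Data::Enumerable::Lazy or a database. This is a minimal sketch under stated
assumptions, not the module's implementation: C<@table> is a hypothetical
in-memory stand-in for the Tweets table, C<fetch_batch> simulates a keyset
query (C<WHERE id E<gt> ? ORDER BY id LIMIT ?>), and C<make_batched_iter> is a
hypothetical helper returning a has-next/next pair of closures. Unlike the DBI
example above, the end-of-data check is folded into the batch fetch: an empty
batch means there is nothing left.

```perl
use strict;
use warnings;

# Hypothetical stand-in for one row of the Tweets table
sub make_row {
    my ($id) = @_;
    return { id => $id, text => "tweet $id" };
}

my @table   = map { make_row($_) } 1 .. 25;  # hypothetical in-memory "Tweets table"
my $queries = 0;                             # number of simulated SELECTs

# Simulates: SELECT * FROM Tweets WHERE id > ? ORDER BY id LIMIT ?
sub fetch_batch {
    my ($last_id, $limit) = @_;
    $queries++;
    my @rows  = sort { $a->{id} <=> $b->{id} } grep { $_->{id} > $last_id } @table;
    my @batch = @rows > $limit ? @rows[ 0 .. $limit - 1 ] : @rows;
    return \@batch;
}

# Returns two closures: $has_next refills the buffer with a new batch only
# when it is empty; $next hands out the buffered rows one at a time.
sub make_batched_iter {
    my ($limit) = @_;
    my $last_id = -1;
    my @buffer;
    my $has_next = sub {
        return 1 if @buffer;
        my $batch = fetch_batch($last_id, $limit);
        return 0 unless @$batch;    # empty batch: no more rows
        @buffer  = @$batch;
        $last_id = $buffer[-1]{id};
        return 1;
    };
    my $next = sub {
        return unless $has_next->();
        return shift @buffer;
    };
    return ($has_next, $next);
}

my ($has_next, $next) = make_batched_iter(10);
my @seen;
while ($has_next->()) {
    my $row = $next->();
    push @seen, $row->{id};
    # Simulate five new tweets arriving while we iterate
    if ($row->{id} == 5) {
        push @table, map { make_row($_) } 26 .. 30;
    }
}
printf "%d rows in %d queries\n", scalar(@seen), $queries;
```

Running it prints C<30 rows in 4 queries>: the five tweets added mid-iteration
are picked up, and the 30 rows cost four SELECTs (three non-empty batches plus
the final empty one) rather than one query per row.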

=head2 Redis queue consumer

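  use Redis;
  use Data::Enumerable::Lazy;

  my $redis = Redis->new;
  my $queue_enum = Data::Enumerable::Lazy->new({
    on_has_next => sub { 1 },
    on_next => sub {
      my ($self) = @_;
      # Blocking right POP: waits until an element is available.
      # 'my_queue' is a placeholder key name; a timeout of 0 waits forever.
      my (undef, $item) = $redis->brpop('my_queue', 0);
      $self->yield($item);
    },
  });

  # has_next is checked explicitly so that a false item such as "0"
  # does not end the loop
  while ($queue_enum->has_next) {
    my $queue_item = $queue_enum->next;
    # do something with the queue item
  }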

In this example the client is blocked until an element is available in the
queue, but the blocking is hidden from the client code, which consumes the
data item by item.

=head2 Kafka example

A Kafka consumer wrapper is another application of lazy computation. Lazy
enumerables fit naturally with streaming data such as Kafka. In this example
we fetch batches of messages from a Kafka topic, filter out corrupted ones,
and process the rest.

  use Kafka qw($DEFAULT_MAX_BYTES);
  use Kafka::Connection;
  use Kafka::Consumer;
  use Data::Enumerable::Lazy;

  my $kafka_consumer = Kafka::Consumer->new(
    Connection => Kafka::Connection->new( host => 'localhost', ),
  );

  my $partition = 0;
  my $offset = 0;
  my $kafka_enum = Data::Enumerable::Lazy->new({
    on_has_next => sub { 1 },
    on_next => sub {
      my ($self) = @_;
      # Fetch messages in batch: fetch($topic, $partition, $offset, $max_size)
      my $messages = $kafka_consumer->fetch(
        'topic',
        $partition,
        $offset,
        $DEFAULT_MAX_BYTES,
      );
      if ($messages && @$messages) {
        # Move past this batch so the next call fetches new messages
        $offset = $messages->[-1]->next_offset;
        # Note the grep applied: we're filtering away corrupted messages
        $self->yield(
          Data::Enumerable::Lazy->from_list(@$messages)->grep(sub { $_[0]->valid })
        );
      } else {
        # If there are no more messages, we return an empty enum: this is
        # another handy use case for nested enums.
        $self->yield(Data::Enumerable::Lazy->empty);
      }
    },
  });

  while ($kafka_enum->has_next) {
    my $message = $kafka_enum->next;
    # handle the message
  }
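
The batch-then-filter flow of the Kafka example can be sketched in plain core
Perl, with no broker and without Data::Enumerable::Lazy. This is an
illustration under assumptions, not the Kafka client's API: C<@log> is a
hypothetical in-memory partition, C<%corrupted> marks hypothetical bad
offsets, and C<make_message>, C<fetch_messages> and C<make_stream> are
hypothetical helpers. Each batch is filtered before its messages are handed
out, the offset advances past every fetched message (valid or not), and a
batch that filters down to nothing simply triggers the next fetch, much like
yielding an empty nested enum.

```perl
use strict;
use warnings;

# Hypothetical offsets of corrupted messages in our fake partition
my %corrupted = map { $_ => 1 } 3, 4, 5, 10;

# Hypothetical stand-in for the message stored at a given offset
sub make_message {
    my ($offset) = @_;
    return { offset => $offset, payload => "msg $offset", valid => !$corrupted{$offset} };
}

my @log = map { make_message($_) } 0 .. 11;    # fake partition, offsets 0..11

# Simulates a batch fetch: up to $max messages starting at $offset
sub fetch_messages {
    my ($offset, $max) = @_;
    my @batch = grep { $_->{offset} >= $offset } @log;
    splice(@batch, $max) if @batch > $max;     # keep only the first $max
    return \@batch;
}

# Returns a closure yielding valid messages one at a time. Each fetched
# batch is filtered; a batch that filters down to nothing triggers the
# next fetch. Unlike the endless consumer above, this stream ends when
# the fake log is exhausted.
sub make_stream {
    my ($offset, $max) = @_;
    my @buffer;
    return sub {
        while (!@buffer) {
            my $batch = fetch_messages($offset, $max);
            return unless @$batch;                   # end of the log
            $offset = $batch->[-1]{offset} + 1;      # move past this batch
            @buffer = grep { $_->{valid} } @$batch;  # drop corrupted ones
        }
        return shift @buffer;
    };
}

my $next = make_stream(0, 3);
my @payloads;
while (defined(my $msg = $next->())) {
    push @payloads, $msg->{payload};
}
print join(', ', @payloads), "\n";
```

With batches of three, the batch at offsets 3 to 5 is entirely corrupted and
is skipped without the consumer noticing, and offset 10 is dropped from its
batch, so the loop sees C<msg 0> to C<msg 2>, C<msg 6> to C<msg 9> and
C<msg 11>.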

=cut

=head1 INSTALLATION

To install this module, type the following:

  perl Makefile.PL
  make
  make test
  make install

=cut

use Carp;
use List::Util;


