Data-Enumerable-Lazy


      use DBI;
      use Data::Enumerable::Lazy;

      my $dbh = setup_dbh(); # hypothetical helper returning a connected DBI handle

      my $last_id = -1;
      my $limit = 10;
      my $offset = 0;
      my $tweet_enum = Data::Enumerable::Lazy->new({
        # Are there rows we have not fetched yet?
        on_has_next => sub {
          my $sth = $dbh->prepare('SELECT count(1) FROM Tweets WHERE id > ?');
          $sth->execute($last_id);
          my ($cnt) = $sth->fetchrow_array;
          return int($cnt) > 0;
        },
        # Fetch the next batch of at most $limit rows, each row as a hashref
        on_next => sub {
          my ($self) = @_;
          my $sth = $dbh->prepare('SELECT * FROM Tweets ORDER BY id LIMIT ? OFFSET ?');
          $sth->execute($limit, $offset);
          $offset += $limit;
          my $tweets = $sth->fetchall_arrayref({});
          $last_id = $tweets->[-1]->{id} if @$tweets;
          # Yielding a nested enumerable lets the consumer receive tweets one at a time
          $self->yield(Data::Enumerable::Lazy->from_list(@$tweets));
        },
        is_finite => 1,
      });

      while ($tweet_enum->has_next) {
        my $tweet = $tweet_enum->next;
        # do something with this tweet
      }

    In this example the tweet consumer is abstracted from all DBI
    bookkeeping: it receives tweets one by one, needs no prior knowledge of
    the table size, and can work on a rapidly growing dataset.

    To reduce the number of queries, rows are fetched in batches of at most
    10 ($limit).
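
    The batching can be modelled without a database. The sketch below is
    plain Perl, not the module's implementation: a hypothetical in-memory
    @table stands in for the Tweets table, and a hypothetical fetch_batch()
    replaces the SQL query. It pages with `id > $last_id' (the condition the
    on_has_next query already uses) instead of OFFSET, and counts simulated
    round trips to show that 25 rows cost three batch fetches.

```perl
use strict;
use warnings;

# Hypothetical in-memory stand-in for the Tweets table: ids 1..25, sorted by id.
my @table = map { +{ id => $_, text => "tweet $_" } } 1 .. 25;

my $limit   = 10;   # batch size, as in the DBI example
my $last_id = -1;   # highest id fetched so far
my @buffer;         # rows fetched but not yet handed to the consumer
my $queries = 0;    # number of simulated round trips to the database

# Hypothetical replacement for
#   SELECT * FROM Tweets WHERE id > ? ORDER BY id LIMIT ?
sub fetch_batch {
    my ($after_id, $n) = @_;
    $queries++;
    my @rows = grep { $_->{id} > $after_id } @table;
    splice @rows, $n if @rows > $n;    # keep at most $n rows
    return @rows;
}

# Like on_has_next: buffered rows remain, or the table has rows past $last_id.
sub has_next {
    return 1 if @buffer;
    my $remaining = grep { $_->{id} > $last_id } @table;   # like SELECT count(1)
    return $remaining > 0;
}

# Like next: refill the buffer with one batch only when it is empty.
sub next_row {
    unless (@buffer) {
        @buffer  = fetch_batch($last_id, $limit);
        $last_id = $buffer[-1]{id} if @buffer;
    }
    return shift @buffer;
}

my @seen;
push @seen, next_row()->{id} while has_next();

print "consumed ", scalar(@seen), " rows in $queries queries\n";
# prints "consumed 25 rows in 3 queries"
```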

  Redis queue consumer
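      use Redis;
      use Data::Enumerable::Lazy;

      my $redis = Redis->new;
      my $queue_enum = Data::Enumerable::Lazy->new({
        # The queue is treated as an infinite stream
        on_has_next => sub { 1 },
        on_next => sub {
          my ($self) = @_;
          # Blocking right POP: 'queue' is an example key name,
          # a timeout of 0 waits until an element arrives
          my (undef, $item) = $redis->brpop('queue', 0);
          $self->yield($item);
        },
      });

      # defined() keeps the loop going for false-looking items such as "0"
      while (defined(my $queue_item = $queue_enum->next)) {
        # do something with the queue item
      }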

    In this example the enumerable blocks until an element is available in
    the queue, but the blocking is hidden from the consumer, which simply
    receives the data item by item.
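
    A sketch of how a blocking source can hide behind an iterator, assuming
    a fake queue instead of Redis: the hypothetical @polls list returns undef
    while the queue is "empty", and the hypothetical blocking_pop() keeps
    polling until an item appears, roughly the waiting that BRPOP performs on
    the Redis server. The consumer only calls next and never sees the empty
    polls.

```perl
use strict;
use warnings;

# Hypothetical fake queue: each poll yields an item, or undef when the queue
# is momentarily empty.
my @polls = (undef, 'job-1', undef, undef, 'job-2', 'job-3');
my $empty_polls = 0;

# Keep polling until an element is available, so the caller never sees "empty".
# A real BRPOP does this waiting inside Redis instead of in a client loop.
sub blocking_pop {
    while (1) {
        die "fake queue exhausted\n" unless @polls;
        my $item = shift @polls;
        return $item if defined $item;
        $empty_polls++;
    }
}

# Infinite iterator: has_next is always true, next waits until data arrives.
my %queue_iter = (
    has_next => sub { 1 },
    next     => \&blocking_pop,
);

# The stream is infinite, so this demo stops after three items.
my @consumed;
while ($queue_iter{has_next}->() && @consumed < 3) {
    push @consumed, $queue_iter{next}->();
}

print "@consumed (skipped $empty_polls empty polls)\n";
# prints "job-1 job-2 job-3 (skipped 3 empty polls)"
```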

  Kafka example
    A Kafka consumer wrapper is another application of lazy evaluation.
    Lazy enumerables pair naturally with streaming data sources such as
    Kafka. In this example we fetch batches of messages from a Kafka topic,
    filter out the corrupted ones and process the remaining messages.

      use Kafka qw($DEFAULT_MAX_BYTES);
      use Kafka::Connection;
      use Kafka::Consumer;
      use Data::Enumerable::Lazy;

      my $kafka_consumer = Kafka::Consumer->new(
        Connection => Kafka::Connection->new( host => 'localhost', ),
      );

      my $partition = 0;
      my $offset = 0;
      my $kafka_enum = Data::Enumerable::Lazy->new({
        on_has_next => sub { 1 },
        on_next => sub {
          my ($self) = @_;
          # Fetch messages in batch; fetch() takes positional arguments
          my $messages = $kafka_consumer->fetch(
            'topic',
            $partition,
            $offset,
            $DEFAULT_MAX_BYTES,
          );
          if ($messages && @$messages) {
            # Advance the offset so the next call does not refetch this batch
            $offset = $messages->[-1]->next_offset;
            # Note the grep applied: we're filtering away corrupted messages
            $self->yield(
              Data::Enumerable::Lazy->from_list(@$messages)->grep(sub { $_[0]->valid })
            );
          } else {
            # If there are no more messages, we return an empty enum; this is
            # another handy use-case for nested enums.
            $self->yield(Data::Enumerable::Lazy->empty);
          }
        },
      });

      while (my $message = $kafka_enum->next) {
        # handle the message
      }
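
    The fetch, offset-advance and filter steps can be simulated in plain
    Perl. In this sketch the hypothetical @log array stands in for one
    partition, valid => 0 marks a corrupted message, and the hypothetical
    $batch_size replaces the byte limit $DEFAULT_MAX_BYTES. Unlike the
    infinite enumerable above, the sketch stops once the fake log is
    drained, so it can run on its own.

```perl
use strict;
use warnings;

# Hypothetical stand-in for one Kafka partition: offsets 0..9, where every
# message with offset % 4 == 3 is "corrupted".
my @log = map { +{ offset => $_, valid => ($_ % 4 != 3), payload => "msg $_" } } 0 .. 9;

my $batch_size = 4;   # hypothetical; the real fetch() is limited by bytes
my $offset     = 0;   # next offset to fetch
my @buffer;           # valid messages from the current batch

# Hypothetical fetch: up to $batch_size messages starting at $from.
sub fetch_batch {
    my ($from) = @_;
    my @batch = grep { $_->{offset} >= $from } @log;
    splice @batch, $batch_size if @batch > $batch_size;
    return \@batch;
}

# Return the next valid message, or undef once the fake log is drained.
sub next_message {
    until (@buffer) {
        my $batch = fetch_batch($offset);
        return undef unless @$batch;
        $offset = $batch->[-1]{offset} + 1;        # never refetch this batch
        @buffer = grep { $_->{valid} } @$batch;    # filter out corrupted messages
    }
    return shift @buffer;
}

my @handled;
while (defined(my $msg = next_message())) {
    push @handled, $msg->{offset};
}

print "handled offsets: @handled\n";
# prints "handled offsets: 0 1 2 4 5 6 8 9"
```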

INSTALLATION
    To install this module, type the following:

       perl Makefile.PL
       make
       make test
       make install

OPTIONS
  on_next($self, $element) :: CodeRef -> Data::Enumerable::Lazy | Any
    `on_next' is a code ref: a callback that is called every time the
    generator needs a new piece of data. The enumerable buffers the result
    of the previous call, and `on_next()' is called again only when no
    elements are left in the buffer.
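
    The buffering rule can be illustrated with a toy class. MiniLazy below
    is hypothetical and much simpler than Data::Enumerable::Lazy (its
    on_next returns a plain list instead of calling yield), but it follows
    the same rule: elements are served from the buffer, and on_next is
    called only once the buffer is empty.

```perl
use strict;
use warnings;

package MiniLazy;    # hypothetical toy class, not Data::Enumerable::Lazy

sub new {
    my ($class, $opts) = @_;
    my $self = { %$opts, buffer => [] };
    return bless $self, $class;
}

# More data exists if the buffer is non-empty or on_has_next says so.
sub has_next {
    my ($self) = @_;
    return 1 if @{ $self->{buffer} };
    return $self->{on_has_next}->($self);
}

# Serve from the buffer; call on_next only when the buffer is empty.
sub next {
    my ($self) = @_;
    $self->{buffer} = [ $self->{on_next}->($self) ] unless @{ $self->{buffer} };
    return shift @{ $self->{buffer} };
}

package main;

my $calls = 0;
my $n     = 0;
my $enum  = MiniLazy->new({
    on_has_next => sub { $n < 6 },
    on_next     => sub {
        $calls++;
        my @batch = ($n + 1, $n + 2, $n + 3);   # three elements per call
        $n += 3;
        return @batch;
    },
});

my @out;
push @out, $enum->next while $enum->has_next;

print "@out (on_next called $calls times)\n";
# prints "1 2 3 4 5 6 (on_next called 2 times)"
```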

    `$element' is defined when the current collection is a continuation of
    another enumerable, e.g.:

      my $enum = Data::Enumerable::Lazy->from_list(1, 2, 3);
      my $enum2 = $enum->continue({
        on_next => sub { my ($self, $i) = @_; $self->yield($i * $i) }
      });
      $enum2->to_list; # generates 1, 4, 9

    In this case `$i' is defined and comes from the original enumerable,
    `$enum'.
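
    The flow of `$element' can be sketched without the module. list_iter()
    and continue_iter() below are hypothetical helpers, not the module's
    API: each element pulled from the source iterator is handed to the
    callback, and its return value becomes the next element of the
    continuation. For brevity the callback receives only the element, not
    `$self' as in the module.

```perl
use strict;
use warnings;

# Hypothetical source iterator over a fixed list (plays the role of from_list).
sub list_iter {
    my @items = @_;
    return {
        has_next => sub { scalar @items },
        next     => sub { shift @items },
    };
}

# Hypothetical continuation: each element pulled from $source is passed to
# $on_next, and its return value is the continuation's next element.
sub continue_iter {
    my ($source, $on_next) = @_;
    return {
        has_next => $source->{has_next},
        next     => sub { $on_next->($source->{next}->()) },
    };
}

my $squares = continue_iter(list_iter(1, 2, 3), sub { my ($i) = @_; $i * $i });

my @result;
push @result, $squares->{next}->() while $squares->{has_next}->();

print "@result\n";   # prints "1 4 9"
```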


