Data-Riak-Fast

 view release on metacpan or  search on metacpan

lib/Data/Riak/Fast/MapReduce.pm  view on Meta::CPAN

                  for(var i in v) {
                    for(var w in v[i]) {
                      if(w in r) r[w] += v[i][w];
                      else r[w] = v[i][w];
                    }
                  }
                  return [r];
                }
                ",
            ),
        ]
    });

    my $results = $mr->mapreduce;

=head2 inputs

Inputs to this query.  There are few allowable forms.

For a single bucket:

  inputs => "bucketname"

For a bucket and key (or many!):

  inputs => [ [ "bucketname", "keyname" ] ]

  inputs => [ [ "bucketname", "keyname" ], [ "bucketname", "keyname2" ] ]
  
And finally:

  inputs => [ [ "bucketname", "keyname", "keyData" ] ]

=cut

has inputs => (
    is => 'ro',
    isa => 'ArrayRef | Str | HashRef',
    required => 1
);

=head2 phases

An arrayref of phases that will be executed in order.  The phases should be
one of L<Data::Riak::Fast::MapReduce::Phase::Link>,
L<Data::Riak::Fast::MapReduce::Phase::Map>, or L<Data::Riak::Fast::MapReduce::Phase::Reduce>.

=cut

has phases => (
    is => 'ro',
    isa => 'ArrayRef[Data::Riak::Fast::MapReduce::Phase]',
    required => 1
);

=head1 METHOD
=head2 mapreduce

Execute the mapreduce query.

To enable streaming, do the following:

    my $results = $mr->mapreduce(chunked => 1);

=cut

sub mapreduce {
    my ($self, %options) = @_;
  
    return $self->riak->send_request({
        content_type => 'application/json',
        method => 'POST',
        uri => 'mapred',
        data => encode_json({
            inputs => $self->inputs,
            query => [ map { { $_->phase => $_->pack } } @{ $self->phases } ]
        }),
        ($options{'chunked'}
            ? (query => { chunked => 'true' })
            : ()),
    });
}

__PACKAGE__->meta->make_immutable;
no Mouse;

1;

__END__



( run in 0.702 second using v1.01-cache-2.11-cpan-524268b4103 )