REST-Neo4p

 view release on metacpan or  search on metacpan

lib/REST/Neo4p/Batch.pm  view on Meta::CPAN

    REST::Neo4p::NotSuppException->throw("Batch mode not supported on Neo4j server v4.0+");
  }
  my @errors;
  REST::Neo4p::CommException->throw("Not connected\n") unless REST::Neo4p->connected;
  warn 'Agent already in batch_mode on batch() call' if ($agent->batch_mode);
  REST::Neo4p::LocalException->throw("batch requires argument 'keep_objs' or 'discard_objs'\n") unless ($action && grep(/^$action$/,qw/keep_objs discard_objs/));
  $agent->batch_mode(1);
  $coderef->();
  my $tmpfh = $agent->execute_batch_chunk;
  my $jsonr = JSON::XS->new->utf8;
  my $buf;
  $tmpfh->read($buf, $BUFSIZE);
  $jsonr->incr_parse($buf);
  my $res = j_parse($jsonr);
  die "j_parse: expecting BATCH stream" unless ($res->[0] eq 'BATCH');
  my $str = $res->[1]->();
  while (my $obj = drop($str)) {
    use experimental qw/smartmatch/;
    $obj = $obj->[1];
    given ($obj) {
      when (!!ref($obj)) {
	if ($obj->{status} !~ m/^2../) {
	  warn "Error at id ".$obj->{id}." from ".$obj->{from}.": status ".$obj->{status} if $REST::Neo4p::VERBOSE;
	  push @errors, REST::Neo4p::Neo4jException->new(
	    code=>$obj->{status},
	    message => 'Server returned '.$obj->{status}.' at job id '.$obj->{id}.' from '.$obj->{from}, neo4j_message=>$obj->{message}
	   );
	}
	elsif (!$obj->{status}) {
	  $obj->{status} = 599;
	  warn "Error at id ".$obj->{id}." from ".$obj->{from}.": status ".$obj->{status} if $REST::Neo4p::VERBOSE;
	  push @errors, REST::Neo4p::Neo4jException->new(
	    code=>$obj->{status},
	    message => 'Server returned no status at job id '.$obj->{id}.' from '.$obj->{from}, neo4j_message=>$obj->{message}
	   );
	}
	else {
	  _register_object($obj) if $action eq 'keep_objs';
	}
      }
      when ('PENDING') {
	$tmpfh->read($buf,$BUFSIZE);
	$jsonr->incr_parse($buf)
      }
      when (!defined) {
	last;
      }
      default {
	die "j_parse: batch response ended prematurely";
      }
    }

  }
  $agent->batch_mode(undef);
  return @errors;
}

# create new nodes, relationships as they are encountered
#
# TODO: handling indexes, queries? Prevent queries in batch mode?
# TODO: use JSON streaming from file

# _register_object($decoded_batch_resp)
#
# Convert one decoded batch job result into an entity object and copy it
# over the entry recorded for that job in the batch object table.
#
# Argument:
#   $decoded_batch_resp - hashref for a single job result; the keys read
#                         here are 'id', 'status' and 'body'.
#
# Returns nothing (bare return) in every case.
#
# Results with an empty body or a non-2xx status are skipped silently.
# Bodies that are not hashes (e.g. a JSON array or a plain string) and hash
# bodies of an unrecognized shape are skipped with a warning, emitted only
# when $REST::Neo4p::VERBOSE is true.
sub _register_object {
  my ($decoded_batch_resp) = @_;
  my ($id, $body) = @{$decoded_batch_resp}{qw(id body)};
  return unless $body;
  return if ($decoded_batch_resp->{status} !~ m/^2../); # ignore an error here

  # A successful job may carry a body that is not a JSON object. Such a
  # body cannot be any of the entities handled below, and dereferencing it
  # as a hash would be fatal, so treat it like any other unknown result.
  unless (ref($body) eq 'HASH') {
    warn "Don't understand object in batch response: id ".$id if $REST::Neo4p::VERBOSE;
    return;
  }

  my $obj;
  if ($body->{template}) {
    # a 'template' key is taken to mean an index representation
    $obj = REST::Neo4p::Index->new_from_json_response($body);
  }
  elsif ($body->{from} and $body->{from} =~ /properties/) {
    # property results are deliberately ignored
    # NOTE(review): this inspects a 'from' key inside the body, not the
    # job-level 'from' of the batch result -- confirm which one is intended.
  }
  elsif ($body->{self} and $body->{self} =~ m|node/[0-9]+$|) {
    $obj = REST::Neo4p::Node->new_from_json_response($body);
  }
  elsif ($body->{self} and $body->{self} =~ m|relationship/[0-9]+$|) {
    $obj = REST::Neo4p::Relationship->new_from_json_response($body);
  }
  else {
    warn "Don't understand object in batch response: id ".$id if $REST::Neo4p::VERBOSE;
  }

  if ($obj) {
    # Entries are keyed by the job id wrapped in braces ("{<id>}"). The entry
    # is removed from the table and the scalar it refers to is overwritten
    # with the scalar behind the newly built object.
    # (presumably the entry is the placeholder created while in batch mode
    # -- verify against REST::Neo4p::Entity)
    my $batch_objs = $REST::Neo4p::Entity::ENTITY_TABLE->{batch_objs};
    if ( my $batch_obj = delete $batch_objs->{ "{$id}" } ) {
      $$batch_obj = $$obj;
    }
  }
  return;
}

=head1 NAME

REST::Neo4p::Batch - Mixin for batch processing

=head1 SYNOPSIS

 use REST::Neo4p;
 use REST::Neo4p::Batch;
 use List::MoreUtils qw(pairwise);

 my @bunch = map { "new_node_$_" } (1..100);
 my @nodes;
 batch {
  my $idx = REST::Neo4p::Index->new('node','bunch');
  @nodes = map { REST::Neo4p::Node->new({name => $_}) } @bunch;
  pairwise { $idx->add_entry($a, name => $b) } @nodes, @bunch;
  $nodes[$_]->relate_to($nodes[$_+1],'next_node') for (0..$#nodes-1);
 } 'keep_objs';

 $idx = REST::Neo4p->get_index_by_name('node','bunch');
 ($the_99th_node) = $nodes[98];
 ($points_to_100th_node) = $the_99th_node->get_outgoing_relationships;
 ($the_100th_node) = $idx->find_entries( name => 'new_node_100');


=head1 DESCRIPTION

REST::Neo4p::Batch adds some syntactic sugar allowing ordinary
REST::Neo4p code to be processed through the Neo4j REST batch API.



( run in 1.581 second using v1.01-cache-2.11-cpan-39bf76dae61 )