AnyEvent-Riak

 view release on metacpan or  search on metacpan

lib/AnyEvent/Riak.pm  view on Meta::CPAN

  $command =~ s/.*://;

  my ($request_name, $request_code, $response_name, $response_code) = _command_to_req($command);
  my $method = sub { shift->_run_cmd($request_name, $request_code, $response_name, $response_code, @_) };

  # Save this method for future calls
  no strict 'refs';
  *$AUTOLOAD = $method;

  goto $method;
}

# Translate a snake_case command name (e.g. 'get_bucket') into the matching
# request/response message names ('RpbGetBucketReq' / 'RpbGetBucketResp') and
# look up their numeric codes in $message_codes.
#
# Returns the list (request_name, request_code, response_name, response_code).
# Croaks with "unknown method '<command>'" unless both codes are defined.
sub _command_to_req {
    my $command = shift;

    # Both message names share the same camel-cased stem.
    my $stem = ucfirst _to_camel($command);
    my ($request_name, $response_name) = map { "Rpb${stem}$_" } qw(Req Resp);

    my ($request_code, $response_code)
      = @{$message_codes}{ $request_name, $response_name };

    croak "unknown method '$command'"
      unless defined $request_code && defined $response_code;

    return ($request_name, $request_code, $response_name, $response_code);
}

# Send one request over the connection handle and hand the outcome to a
# callback.
#
# Arguments, after the invocant:
#   $request_name            class whose encode() serialises $args
#   $request_code            numeric message code written on the wire
#   $response_name           class whose decode() parses the reply body; when
#                            false the body is not decoded and the result is 1
#   $expected_response_code  message code the reply must carry
#   $args                    optional; when defined it is passed to encode()
#   $callback                code reference, always the LAST argument
#
# The callback is invoked as $callback->($result) on success and as
# $callback->(undef, { error_code => ..., error_message => ... }) on failure:
#   error_code -1      encoding/decoding threw, or the handle timed out
#   error_code -2      reply had an unexpected code or decoded to another class
#   error_code = $!    the handle reported an error
#   anything else      errcode/errmsg taken from a decoded RpbErrorResp
#
# Croaks if the last argument is not a code reference. Returns nothing
# meaningful itself, except that an encoding failure returns whatever the
# callback returned.
sub _run_cmd {
    my $callback = pop;
    defined $callback && ref($callback) eq 'CODE'
      or croak "last parameter must be a CodeRef callback";
    my ( $self, $request_name, $request_code, $response_name, $expected_response_code, $args ) = @_;

    my $body = '';
    if (defined $args) {
        eval { $body = "$request_name"->encode($args); 1 }
          or return $callback->(undef, { error_code => -1, error_message => $@ });
    }

    my $handle = $self->_handle;
    $handle->on_error(sub {
        my ($handle, $fatal, $message) = @_;
        # Copy $! before anything else: destroying the handle below may
        # overwrite it.
        my $error_code = $!;
        $fatal or $handle->destroy(); # force destroy even if non fatal
        $callback->(undef, { error_code => $error_code,
                             error_message => $message }) });

    $handle->timeout_reset;
    $handle->timeout($self->timeout);
    $handle->on_timeout(sub { $callback->(undef, { error_code => -1,
                                                   error_message => 'timeout' }) });

    # Frame layout: 32-bit big-endian length (body plus the one code byte),
    # then the code byte, then the body. The code is packed unsigned ('C') so
    # that values above 127 neither warn nor wrap.
    $handle->push_write(  pack('N', bytes::length($body) + 1)
                        . pack('C', $request_code) . $body
                       );
    $handle->timeout_reset;

    # Read the 4-byte length prefix first, then exactly that many bytes.
    $handle->push_read( chunk => 4, sub {
         my $len = unpack "N", $_[1];
         $_[0]->timeout_reset;
         $_[0]->unshift_read( chunk => $len, sub {
             $_[0]->timeout_reset;
             $_[0]->timeout(0); # reply fully received: disable the timeout

             # Unsigned unpack, so a code byte above 127 compares equal to the
             # positive value stored in $message_codes.
             my ( $response_code, $response_body ) = unpack( 'C a*', $_[1] );

             if ($response_code == $message_codes->{RpbErrorResp}) {
                 # A decode failure is reported through the callback, like an
                 # encode failure, instead of dying inside the event loop.
                 my $decoded_message;
                 eval { $decoded_message = RpbErrorResp->decode($response_body); 1 }
                   or return $callback->(undef, { error_code => -1, error_message => $@ });
                 return $callback->(undef, { error_code => $decoded_message->errcode,
                                             error_message => $decoded_message->errmsg });
             }
             if ($response_code != $expected_response_code) {
                 return $callback->(undef, {
                   error_code => -2,
                   error_message =>   "wrong response (got: '$response_code', "
                                    . "expected: '$expected_response_code')" });
             }

             my $result;
             if ($response_name) {
                 eval { $result = $response_name->decode($response_body); 1 }
                   or return $callback->(undef, { error_code => -1, error_message => $@ });
                 ref($result) eq $response_name
                   or return $callback->(undef, {
                     error_code => -2,
                     error_message =>   "wrong response (got: '" . ref($result) . "', "
                                      . "expected: '$response_name')" });
             } else {
                 # No response class to decode with: success is signalled by 1.
                 $result = 1;
             }
             return $callback->($result);
         });
     });
    $handle->timeout_reset;
    return;
}

# Convert snake_case to camelCase: every underscore that is directly followed
# by a lowercase ASCII letter is dropped and that letter is upper-cased.
# Anything else (including the first character) is left untouched.
sub _to_camel {
    my $camel = shift;
    $camel =~ s{ _ ([a-z]) }{\u$1}gx;
    return $camel;
}



# Now, some methods that are not generic

# Run the 'set_bucket' command. The response name is deliberately passed as
# undef, so _run_cmd does not decode the reply body and reports success as 1.
# Remaining arguments (request data, then the callback) are forwarded as-is.
sub set_bucket {
    my ($req_name, $req_code, undef, $resp_code) = _command_to_req('set_bucket');
    my $self = shift;
    return $self->_run_cmd($req_name, $req_code, undef, $resp_code, @_);
}

# Run the 'reset_bucket' command. The response name is deliberately passed as
# undef, so _run_cmd does not decode the reply body and reports success as 1.
# Remaining arguments (request data, then the callback) are forwarded as-is.
sub reset_bucket {
    my ($req_name, $req_code, undef, $resp_code) = _command_to_req('reset_bucket');
    my $self = shift;
    return $self->_run_cmd($req_name, $req_code, undef, $resp_code, @_);
}

# Send an RpbGetBucketTypeReq. The reply is expected to be an RpbGetBucketResp,
# so the names are spelled out here rather than derived from the method name.
# Remaining arguments (request data, then the callback) are forwarded as-is.
sub get_bucket_type {
    my $self = shift;
    my ($req, $resp) = qw(RpbGetBucketTypeReq RpbGetBucketResp);
    return $self->_run_cmd(
        $req,  $message_codes->{$req},
        $resp, $message_codes->{$resp},
        @_,
    );
}

# Placeholder: this method has no implementation yet and always croaks.
sub set_bucket_type {
    croak('not implemented yet');
}




( run in 1.675 second using v1.01-cache-2.11-cpan-e93a5daba3e )