AnyEvent-Riak
view release on metacpan or search on metacpan
lib/AnyEvent/Riak.pm view on Meta::CPAN
$command =~ s/.*://;
my ($request_name, $request_code, $response_name, $response_code) = _command_to_req($command);
my $method = sub { shift->_run_cmd($request_name, $request_code, $response_name, $response_code, @_) };
# Save this method for future calls
no strict 'refs';
*$AUTOLOAD = $method;
goto $method;
}
sub _command_to_req {
my ($command) = @_;
my $request_name = 'Rpb' . ucfirst(_to_camel($command)) . 'Req';
my $response_name = 'Rpb' . ucfirst(_to_camel($command)) . 'Resp';
my $request_code = $message_codes->{$request_name};
my $response_code = $message_codes->{$response_name};
defined $request_code && defined $response_code
or croak "unknown method '$command'";
return $request_name, $request_code, $response_name, $response_code;
}
sub _run_cmd {
my $callback = pop;
defined $callback && ref($callback) eq 'CODE'
or croak "last parameter must be a CoderRef callback";
my ( $self, $request_name, $request_code, $response_name, $expected_response_code, $args ) = @_;
my $body = '';
if (defined $args) {
eval { $body = "$request_name"->encode($args); 1 }
or return $callback->(undef, { error_code => -1, error_message => $@ });
}
my $handle = $self->_handle;
$handle->on_error(sub {
my ($handle, $fatal, $message) = @_;
$fatal or $handle->destroy(); # force destroy even if non fatal
$callback->(undef, { error_code => $!,
error_message => $message }) });
$handle->timeout_reset;
$handle->timeout($self->timeout);
$handle->on_timeout(sub { $callback->(undef, { error_code => -1,
error_message => 'timeout' }) });
$handle->push_write( pack('N', bytes::length($body) + 1)
. pack('c', $request_code) . $body
);
$handle->timeout_reset;
$handle->push_read( chunk => 4, sub {
my $len = unpack "N", $_[1];
$_[0]->timeout_reset;
$_[0]->unshift_read( chunk => $len, sub {
$_[0]->timeout_reset;
$_[0]->timeout(0);
my ( $response_code, $response_body ) = unpack( 'c a*', $_[1] );
if ($response_code == $message_codes->{RpbErrorResp}) {
my $decoded_message = RpbErrorResp->decode($response_body);
return $callback->(undef, { error_code => $decoded_message->errcode,
error_message => $decoded_message->errmsg });
}
if ($response_code != $expected_response_code) {
return $callback->(undef, {
error_code => -2,
error_message => "wrong response (got: '$response_code', "
. "expected: '$expected_response_code')" });
}
# my ($ret, $more_to_come) = ( 1, );
my $result;
if ($response_name) {
$result = $response_name->decode($response_body);
ref($result) eq $response_name
or return $callback->(undef, {
error_code => -2,
error_message => "wrong response (got: '" . ref($result) . "', "
. "expected: '$response_name')" });
} else {
$result = 1;
}
return $callback->($result);
});
});
$handle->timeout_reset;
return;
}
sub _to_camel {
my ($str) = @_;
$str =~ s/_([a-z])/uc($1)/ge;
return $str;
}
# Now, some methods that are not generic
sub set_bucket {
my ($request_name, $request_code, $response_name, $response_code) = _command_to_req('set_bucket');
shift->_run_cmd($request_name, $request_code, undef, $response_code, @_);
}
sub reset_bucket {
my ($request_name, $request_code, $response_name, $response_code) = _command_to_req('reset_bucket');
shift->_run_cmd($request_name, $request_code, undef, $response_code, @_);
}
sub get_bucket_type {
my $request_name = 'RpbGetBucketTypeReq';
my $response_name = 'RpbGetBucketResp';
shift->_run_cmd($request_name, $message_codes->{$request_name},
$response_name, $message_codes->{$response_name}, @_);
}
sub set_bucket_type {
croak "not implemented yet";
}
( run in 1.675 second using v1.01-cache-2.11-cpan-e93a5daba3e )