Arcus-Client

 view release on metacpan or  search on metacpan

lib/Arcus/Client.pm  view on Meta::CPAN

    }
    return $key;
  }
}

my $ENCODE = sub {
  my ($conf, $value, $flags) = @_;
  # serialization
  if (ref $value) {
    $value = eval { $conf->{serialize_methods}->[0]->($value) };
    if ($@) {
      warn "failed to serialize.";
    } else {
      $flags |= F_STORABLE;
    }
  }
  # compression
  if (defined($value) and
      defined($conf->{compress_methods}) and
      $conf->{compress_threshold} > 0 and
      $conf->{compress_threshold} <= length($value)) {
    my $c_val = eval { $conf->{compress_methods}->[0]->($value) };
    if ($@) {
      warn "failed to compress.";
    } elsif (length($c_val) < length($value) * $conf->{compress_ratio}) {
      $value = $c_val;
      $flags |= F_COMPRESS;
    }
  }
  return ($value, $flags);
};

my $DECODE = sub {
  my ($conf, $value, $flags) = @_;
  if (defined($flags)) {
    if (defined($value) && ($flags & F_COMPRESS)) {
      $value = eval { $conf->{compress_methods}->[1]->($value) };
      warn "failed to decompress." if ($@);
    }
    if (defined($value) && ($flags & F_STORABLE)) {
      $value = eval { $conf->{serialize_methods}->[1]->($value) };
      warn "failed to deserialize." if ($@);
    }
  }
  return ($value);
};

sub new {
  my ($class, $args) = @_;
  $args->{connect_timeout} //= 1.0; # second
  $args->{io_timeout}      //= 0.8; # second
  $args->{max_thread} //= 64;
  $args->{nowait} = 0; # TODO: Fix a C Client's noreply flags (issue: #3)
  $args->{hash_namespace} = 1;
  $args->{namespace} //= "";
  $args->{serialize_methods} //= [ \&Storable::nfreeze, \&Storable::thaw ];
  $args->{compress_threshold} //= -1;
  $args->{compress_ratio} //= 0.8;
  $args->{compress_methods} //= [ \&Compress::Zlib::memGzip,
                                  \&Compress::Zlib::memGunzip ] if $HAVE_ZLIB;
  my $arcus = $class->SUPER::new($args);
  bless($arcus, $class);
  $arcus_info{$$arcus} = $args;
  POSIX::AtFork->add_to_child(sub {
    $arcus->connect_proxy();
  });
  return $arcus;
}

sub DESTROY {
  my $arcus = shift;
  $arcus->SUPER::DESTROY;
  if ($arcus_info{$$arcus}) {
    delete $arcus_info{$$arcus};
  }
}

sub CLONE {
  my $class = shift;
  foreach my $arcus (keys %arcus_info) {
    $class->SUPER::new($arcus);
  }
}

for my $method ( qw/set add replace/ ) {
  no strict 'refs';
  my $super = 'SUPER::'.$method;
  *{$method} = sub {
    my ($arcus, $key, $value, $exptime) = @_;
    my ($conf, $flags) = ($arcus_info{$$arcus}, 0);
    return undef unless $conf;
    $key = $SANITIZE->($key);
    ($value, $flags) = $ENCODE->($conf, $value, $flags);
    return $arcus->$super($key, $value, $exptime, $flags);
  };
}

sub cas {
  my ($arcus, $key, $cas, $value, $exptime) = @_;
  my ($conf, $flags) = ($arcus_info{$$arcus}, 0);
  return undef unless $conf;
  $key = $SANITIZE->($key);
  ($value, $flags) = $ENCODE->($conf, $value, $flags);
  return $arcus->SUPER::cas($key, $cas, $value, $exptime, $flags);
}

sub cas_multi {
  my ($arcus, @kvs) = @_;
  my $ctx = wantarray;
  my $conf = $arcus_info{$$arcus};
  return undef unless $conf;
  my (@skvs, @ref);
  foreach my $elem (@kvs) {
    my ($key, $cas, $value) = @{$elem}[0, 1, 2];
    my $flags = 0;
    my $exptime = $elem->[3] ? $elem->[3] : 0;
    $key = $SANITIZE->($key);
    ($value, $flags) = $ENCODE->($conf, $value, $flags);
    push(@skvs, [$key, $cas, $value, $exptime, $flags]);
  }
  @ref = $arcus->SUPER::cas_multi(@skvs);
  return unless defined($ctx);
  return @ref if $ctx;
  my %href;
  foreach my $index (0..$#kvs) {
    $href{$kvs[$index]->[0]} = $ref[$index] if defined($ref[$index]);
  }
  return \%href;
}

for my $method ( qw/set_multi add_multi replace_multi/ ) {
  no strict 'refs';
  my $super = 'SUPER::'.$method;
  *{$method} = sub {
    my ($arcus, @kvs) = @_;
    my $ctx = wantarray;
    my $conf = $arcus_info{$$arcus};
    return undef unless $conf;
    my (@skvs, @ref);
    foreach my $elem (@kvs) {
      my ($key, $value) = @{$elem}[0, 1];
      my $flags = 0;
      my $exptime = $elem->[2] ? $elem->[2] : 0;
      $key = $SANITIZE->($key);
      ($value, $flags) = $ENCODE->($conf, $value, $flags);
      push(@skvs, [$key, $value, $exptime, $flags]);
    }
    @ref = $arcus->$super(@skvs);
    return unless defined($ctx);
    return @ref if $ctx;
    my %href;
    foreach my $index (0..$#kvs) {
      $href{$kvs[$index]->[0]} = $ref[$index] if defined($ref[$index]);
    }
    return \%href;
  }
}

for my $method ( qw/append prepend/ ) {
  no strict 'refs';
  my $super = 'SUPER::'.$method;
  *{$method} = sub {
    my ($arcus, $key, $value) = @_;
    my $conf = $arcus_info{$$arcus};
    return undef unless $conf;
    $key = $SANITIZE->($key);
    return $arcus->$super($key, $value);
  };
}

for my $method ( qw/append_multi prepend_multi/ ) {
  no strict 'refs';
  my $super = 'SUPER::'.$method;
  *{$method} = sub {
    my ($arcus, @kvs) = @_;
    my $ctx = wantarray;
    my $conf = $arcus_info{$$arcus};
    return undef unless $conf;
    my (@skvs, @ref);
    foreach my $elem (@kvs) {
      my ($key, $value) = @{$elem}[0, 1];
      my $flags = 0;
      my $exptime = $elem->[2] ? $elem->[2] : 0;
      $key = $SANITIZE->($key);
      push(@skvs, [$key, $value, $exptime, $flags]);
    }
    @ref = $arcus->$super(@skvs);
    return unless defined($ctx);
    return @ref if $ctx;
    my %href;
    foreach my $index (0..$#kvs) {
      $href{$kvs[$index]->[0]} = $ref[$index] if defined($ref[$index]);
    }
    return \%href;
  }
}

for my $method ( qw/incr decr/ ) {
  no strict 'refs';
  my $super = 'SUPER::'.$method;
  *{$method} = sub {
    my ($arcus, $key, $offset) = @_;
    my $conf = $arcus_info{$$arcus};
    return undef unless $conf;
    $key = $SANITIZE->($key);
    return $arcus->$super($key, $offset);
  };
}

#for my $method ( qw/incr_multi decr_multi/ ) {
#  no strict 'refs';
#  my $super = substr('SUPER::'.$method, 0, -6);
#  *{$method} = sub {
#    my ($arcus, @arr) = @_;
#    my $ctx = wantarray;
#    my (@ref, @keys);
#    my $conf = $arcus_info{$$arcus};
#    return undef unless $conf;
#    foreach my $elem (@arr) {
#      my ($key, $offset);
#      if (ref $elem) {
#        ($key, $offset) = @{$elem}[0, 1];
#      }
#      else {
#        $key = $elem;
#      }
#      $key = $SANITIZE->($key);
#      push(@ref, $arcus->$super($key, $offset));
#      push(@keys, $key);
#    }
#    return unless defined($ctx);
#    return @ref if $ctx;
#    my %href = map { $keys[$_] => $ref[$_] } 0..$#arr;
#    return \%href;
#  }
#}

sub get {
  my ($arcus, $key) = @_;
  my $conf = $arcus_info{$$arcus};
  return undef unless $conf and defined($key);
  $key = $SANITIZE->($key);
  my ($value, $flags) = $arcus->SUPER::get($key);
  return undef unless defined($value) and defined($flags);
  ($value) = $DECODE->($conf, $value, $flags);
  return $value;
}

sub get_multi {
  my ($arcus, @keys) = @_;
  my $conf = $arcus_info{$$arcus};
  return undef unless $conf;
  my @skeys;
  my %kmap;
  for my $key (@keys) {
    my $skey = $SANITIZE->($key) if defined($key);
    next unless defined($skey);

    push(@skeys, $skey);
    $kmap{$skey} = $key;
  }
  my $result = $arcus->SUPER::get_multi(@skeys);
  my %href;
  while (my ($key, $arr) = each %{$result}) {
    my ($value, $flags) = @{$arr}[0, 1];
    ($value) = $DECODE->($conf, $value, $flags);
    $href{$kmap{$key}} = $value if defined($value);
  }
  return \%href;
}

sub gets {
  my ($arcus, $key) = @_;
  my $conf = $arcus_info{$$arcus};
  return undef unless $conf and defined($key);
  $key = $SANITIZE->($key);
  my ($cas, $value, $flags) = $arcus->SUPER::gets($key);
  return undef unless defined($cas) and defined($value) and defined($flags);
  ($value) = $DECODE->($conf, $value, $flags);
  return [$cas, $value];
}

sub gets_multi {
  my ($arcus, @keys) = @_;
  my $conf = $arcus_info{$$arcus};
  return undef unless $conf;
  my @skeys;
  my %kmap;
  for my $key (@keys) {
    my $skey = $SANITIZE->($key) if defined($key);
    next unless defined($skey);

    push(@skeys, $skey);
    $kmap{$skey} = $key;
  }
  my $result = $arcus->SUPER::gets_multi(@skeys);
  my %href;
  while (my ($key, $arr) = each %{$result}) {
    my ($cas, $value, $flags) = @{$arr}[0, 1, 2];
    ($value) = $DECODE->($conf, $value, $flags);
    $href{$kmap{$key}} = [$cas, $value] if defined($cas) and defined($value);
  }
  return \%href;
}

sub get_or_set {
  my($arcus, $key, $callback, $expire) = @_;
  my $value = $arcus->get($key);
  unless (defined($value)) {
    my $ret_expire;
    ($value, $ret_expire) = $callback->();
    $arcus->set($key, $value, $expire || $ret_expire);
  }
  return $value;
}

sub delete {
  my ($arcus, $key) = @_;
  my $conf = $arcus_info{$$arcus};
  return undef unless $conf;
  $key = $SANITIZE->($key);
  return $arcus->SUPER::delete($key);
}

#sub delete_multi {
#  my ($arcus, @keys) = @_;
#  my $ctx = wantarray;
#  my @ref;
#  my $conf = $arcus_info{$$arcus};
#  return undef unless $conf;
#  foreach my $key (@keys) {
#    $key = $SANITIZE->($key);
#    push(@ref, $arcus->SUPER::delete($key));
#  }
#  return unless defined($ctx);
#  return @ref if $ctx;
#  my %href = map { $keys[$_] => $ref[$_] } 0..$#keys;
#  return \%href;
#}

# This is necessary to use plack framework
sub remove {
  my ($self, $key, $exptime) = @_;
  return $self->delete($key, $exptime);
}

1;
__END__

=head1 NAME

Arcus::Client - Perl client for arcus cache cluster

=head1 SYNOPSIS

  use Arcus::Client;

  my $cache = Arcus::Client->new({
    zk_address => [ "localhost:2181", "localhost:2182", "localhost:2183" ],
    service_code => "test",
    namespace => "my:",
    connect_timeout => 1.5,
    io_timeout => 0.7,
    compress_threshold => 100_000,
    compress_ratio => 0.9,
    compress_methods => [ \&IO::Compress::Gzip::gzip,
                          \&IO::Uncompress::Gunzip::gunzip ],
    serialize_methods => [ \&Storable::freeze, \&Storable::thaw ],
  });

  # Get server versions
  my $versions = $cache->server_versions;
  while (my ($server, $version) = each %$versions) {
    #...
  }

  # Store scalar values
  $cache->add("sadd1", "v1");
  $cache->add_multi(["saddr2", "v2"], ["sadd3", "v3", 100]);

  $cache->set("sset1", "v1");
  $cache->set_multi(["sset2", "v2"], ["sset3", "v3", 10]);

  $cache->replace("sset1", 10);
  $cache->replace_multi(["sset2", "r2"],["sset3", "r3"]);

  # Store arbitrary Perl data structures
  $cache->set("hset1", {a => 1, b => 2});
  $cache->set_multi(["hset2", {c => 3}], ["lset1", [0, 1, 2]]);

  # Append/Prepend to values
  $cache->prepend("sadd1", "pre1");
  $cache->prepend_multi(["sadd2", "pre2"], ["sadd3", "pre3"]);



( run in 0.596 second using v1.01-cache-2.11-cpan-98e64b0badf )