Mojo-Redis

 view release on metacpan or  search on metacpan

lib/Mojo/Redis/Connection.pm  view on Meta::CPAN

package Mojo::Redis::Connection;
use Mojo::Base 'Mojo::EventEmitter';

use File::Spec::Functions 'file_name_is_absolute';
use Mojo::IOLoop;
use Mojo::Promise;

# A true MOJO_REDIS_DEBUG makes this class print connection diagnostics with warn()
use constant DEBUG                     => $ENV{MOJO_REDIS_DEBUG};
# Connect timeout passed to Mojo::IOLoop->client(), MOJO_REDIS_CONNECT_TIMEOUT overrides the default of 10
use constant CONNECT_TIMEOUT           => $ENV{MOJO_REDIS_CONNECT_TIMEOUT}           || 10;
# Default connect timeout for sentinels, falls back to CONNECT_TIMEOUT when the variable is unset or false
use constant SENTINELS_CONNECT_TIMEOUT => $ENV{MOJO_REDIS_SENTINELS_CONNECT_TIMEOUT} || CONNECT_TIMEOUT;

# None of the attributes has a usable default: reading one that was never set
# calls Carp::confess().
has encoding => sub { Carp::confess('encoding is required in constructor') };
has ioloop   => sub { Carp::confess('ioloop is required in constructor') };
has protocol => sub { Carp::confess('protocol is required in constructor') };
has url      => sub { Carp::confess('url is required in constructor') };

# Destructor. Disconnects only when a pid was recorded for this connection and
# it matches the current process.
sub DESTROY {
  my ($self) = @_;

  my $owner_pid = $self->{pid};
  return unless defined $owner_pid;
  return unless $owner_pid == $$;

  $self->disconnect;
}

# Rejects everything that is queued or waiting for a reply, closes the stream
# when there is one, and marks the connection as gone. Returns the invocant.
sub disconnect {
  my ($self) = @_;

  $self->_reject_queue;
  if (my $stream = $self->{stream}) { $stream->close }
  $self->{gone_away} = 1;

  return $self;
}

# Returns 1 when a stream is present and the connection is not flagged as gone
# away, 0 otherwise.
sub is_connected {
  my ($self) = @_;
  return 0 unless $self->{stream};
  return $self->{gone_away} ? 0 : 1;
}

# Queues a command without a promise and either flushes the queue (when
# connected) or starts connecting. Returns the invocant.
sub write {
  my ($self, @command) = @_;

  # Queue entries are array-refs; this one has no promise slot
  push @{$self->{write}}, [$self->_encode(@command)];

  if   ($self->is_connected) { $self->_write }
  else                       { $self->_connect }

  return $self;
}

# Queues a command together with a promise bound to this connection's ioloop,
# then either flushes the queue (when connected) or starts connecting.
# Returns the promise.
sub write_p {
  my ($self, @command) = @_;

  my $promise = Mojo::Promise->new;
  $promise->ioloop($self->ioloop);

  # The second slot of a queue entry holds the promise for the reply
  push @{$self->{write}}, [$self->_encode(@command), $promise];

  if   ($self->is_connected) { $self->_write }
  else                       { $self->_connect }

  return $promise;
}

# Starts a connection attempt unless one has already been started. Delegates
# to _reject_queue() when the connection is flagged as gone away, and to
# _discover_master() when the url has "sentinel" parameters and no master is
# known yet. Otherwise returns the invocant.
sub _connect {
  my $self = shift;

  # An id is stored as soon as a connection attempt has been started
  return $self if $self->{id};

  # Cannot reuse a connection because of transaction state and other state
  return $self->_reject_queue('Redis server has gone away') if $self->{gone_away};

  my $url = $self->{master_url} || $self->url;
  if (!$self->{master_url} and $url->query->param('sentinel')) {
    return $self->_discover_master;
  }

  # The callback below captures $self, so it is weakened first
  Scalar::Util::weaken($self);
  delete $self->{master_url};    # Make sure we forget master_url so we can reconnect
  $self->protocol->on_message($self->_parse_message_cb);

  my $on_connect = sub {
    my ($loop, $err, $stream) = @_;
    return unless $self;

    my $close_cb = $self->_on_close_cb;
    return $self->$close_cb($err) if $err;

    $stream->timeout(0);
    $stream->on($_ => $close_cb) for qw(close error);
    $stream->on(read => $self->_on_read_cb);

    # Both commands go to the front of the queue: AUTH ends up first, then
    # SELECT, then whatever was queued before the connection came up.
    my $db = $url->path->[0];
    unshift @{$self->{write}}, [$self->_encode(SELECT => $db)] if length $db;
    unshift @{$self->{write}}, [$self->_encode(AUTH => $url->password)] if length $url->password;

    @$self{qw(pid stream)} = ($$, $stream);
    $self->emit('connect');
    $self->_write;
  };

  my $args = $self->_connect_args($url, {port => 6379, timeout => CONNECT_TIMEOUT});
  $self->{id} = $self->ioloop->client($args, $on_connect);

  warn "[@{[$self->_id]}] CONNECTING $url (blocking=@{[$self->_is_blocking]})\n" if DEBUG;
  return $self;
}

# Builds the hash-ref of arguments for Mojo::IOLoop->client().
#
# An absolute file name as host is passed on as "path"; anything else becomes
# "address" (default "localhost") plus "port" (from the url, else from
# $defaults). "timeout" comes from $defaults, else CONNECT_TIMEOUT.
sub _connect_args {
  my ($self, $url, $defaults) = @_;

  my $address = $url->host || 'localhost';
  my %args
    = file_name_is_absolute($address)
    ? (path => $address)
    : (address => $address, port => $url->port || $defaults->{port});

  $args{timeout} = $defaults->{timeout} || CONNECT_TIMEOUT;
  return \%args;
}

# Asks a sentinel for the address of the current master and, once it is known,
# connects to that master through _connect().
#
# The "sentinel" query parameters of the url are rotated on every call, so each
# retry talks to the next sentinel in the list. The master name sent to the
# sentinel is the host part of the url. Returns the invocant.
#
# NOTE(review): $self is never weakened in this sub, so the "return unless
# $self" guard in the connect callback cannot trigger, and the callbacks keep
# the connection object alive while discovery is running. Confirm whether that
# is intended before changing it.
sub _discover_master {
  my $self      = shift;
  my $url       = $self->url->clone;
  my $sentinels = $url->query->every_param('sentinel');
  my $timeout   = $url->query->param('sentinel_connect_timeout') || SENTINELS_CONNECT_TIMEOUT;

  $url->host_port(shift @$sentinels);
  $self->url->query->param(sentinel => [@$sentinels, $url->host_port]);    # Round-robin sentinel list
  $self->protocol->on_message($self->_parse_message_cb);
  $self->{id} = $self->ioloop->client(
    $self->_connect_args($url, {port => 16379, timeout => $timeout}),
    sub {
      my ($loop, $err, $stream) = @_;
      return unless $self;
      return $self->_discover_master if $err;

      $stream->timeout(0);
      $stream->on(close => sub { $self->_discover_master unless $self->{master_url} });
      $stream->on(error => sub { $self->_discover_master });
      $stream->on(read  => $self->_on_read_cb);

      $self->{stream} = $stream;

      # Bind the promise to the loop that drives this connection, like
      # write_p() does. A promise without an explicit ioloop would schedule
      # its callbacks on the default loop instead, which is a different loop
      # when this connection uses its own ioloop.
      my $p = Mojo::Promise->new->ioloop($self->ioloop);

      # Resulting queue order: AUTH (optional), SENTINEL, undef, then the
      # commands that were queued before discovery started.
      unshift @{$self->{write}}, undef;    # prevent _write() from writing commands
      unshift @{$self->{write}}, [$self->_encode(SENTINEL => 'get-master-addr-by-name', $self->url->host), $p];
      unshift @{$self->{write}}, [$self->_encode(AUTH     => $url->password)] if length $url->password;

      # Keeps _parse_message_cb() from flushing the remaining queue to the sentinel
      $self->{write_lock} = 1;
      $p->then(
        sub {
          my $host_port = shift;
          delete $self->{id};
          delete $self->{write_lock};

          # Anything but a two element array-ref means: try the next sentinel
          return $self->_discover_master unless ref $host_port and @$host_port == 2;
          $self->{master_url} = $self->url->clone->host($host_port->[0])->port($host_port->[1]);
          $self->{stream}->close;
          $self->_connect;
        },
        sub { $self->_discover_master },
      );

      $self->_write;
    }
  );

  warn "[@{[$self->_id]}] SENTINEL DISCOVERY $url (blocking=@{[$self->_is_blocking]})\n" if DEBUG;
  return $self;
}

# Turns a command and its arguments into a message built by the protocol
# object: an array ("*") of bulk strings ("$"). Every element is encoded with
# the "encoding" attribute first, unless that attribute is false.
sub _encode {
  my ($self, @parts) = @_;
  my $encoding = $self->encoding;

  my @bulk;
  for my $part (@parts) {
    push @bulk, {type => '$', data => $encoding ? Mojo::Util::encode($encoding, $part) : $part};
  }

  return $self->protocol->encode({type => '*', data => \@bulk});
}

# Returns the stored id, or the string "0" when there is none.
sub _id {
  my ($self) = @_;
  return $self->{id} || '0';
}

# Returns 0 when the ioloop attribute is the Mojo::IOLoop singleton, 1 otherwise.
sub _is_blocking {
  my ($self) = @_;
  return 0 if $self->ioloop eq Mojo::IOLoop->singleton;
  return 1;
}

# Builds the handler that _connect() registers for the stream "close" and
# "error" events, and also calls directly when connecting fails. The handler
# holds only a weak reference to the connection.
sub _on_close_cb {
  my ($self) = @_;
  Scalar::Util::weaken($self);

  my $on_close = sub {
    my ($stream, $err) = @_;
    return unless $self;

    for my $key (qw(id stream)) { delete $self->{$key} }
    $self->{gone_away} = 1;
    $self->_reject_queue($err);

    # "close" is only emitted when the handler got a single argument, i.e. no error
    $self->emit('close') if @_ == 1;

    # $self is weak, so it is checked again after emit()
    warn qq([@{[$self->_id]}] @{[$err ? "ERROR $err" : "CLOSED"]}\n) if $self and DEBUG;
  };

  return $on_close;
}

# Builds the stream "read" handler: every chunk is handed to the protocol
# parser. The handler holds only a weak reference to the connection.
sub _on_read_cb {
  my ($self) = @_;
  Scalar::Util::weaken($self);

  my $on_read = sub {
    my ($stream, $chunk) = @_;
    return unless $self;

    if (DEBUG) {

      # Show line endings as literal \r\n in the debug output
      (my $printable = $chunk) =~ s!\r\n!\\r\\n!g;
      warn "[@{[$self->_id]}] >>> ($printable)\n";
    }

    $self->protocol->parse($chunk);
  };

  return $on_read;
}

# Builds the callback that is handed to the protocol object with on_message().
#
# For every parsed message the callback:
#   1. flushes the write queue, unless "write_lock" is set,
#   2. unpacks the message into plain Perl data,
#   3. takes the oldest entry off the "waiting" list and resolves or rejects
#      that promise; without a promise a "response" or "error" event is
#      emitted instead.
#
# The callback holds only a weak reference to the connection, and does nothing
# once the connection object is gone.
sub _parse_message_cb {
  my $self = shift;

  Scalar::Util::weaken($self);
  return sub {

    # Same guard as the stream callbacks: the protocol object can be held by
    # someone else and outlive the connection.
    return unless $self;

    my ($protocol, @messages) = @_;
    my $encoding = $self->encoding;
    $self->_write unless $self->{write_lock};

    # Returns ($error) or (undef, \@values). Recurses into nested arrays with
    # __SUB__, and the first error reply aborts the whole message.
    my $unpack = sub {
      my @res;

      while (my $m = shift @_) {
        if ($m->{type} eq '-') {
          return $m->{data}, undef;
        }
        elsif ($m->{type} eq ':') {
          push @res, 0 + $m->{data};    # Integer reply, force numeric
        }
        elsif ($m->{type} eq '*' and ref $m->{data} eq 'ARRAY') {
          my ($err, $res) = __SUB__->(@{$m->{data}});
          return $err if defined $err;
          push @res, $res;
        }

        # Only bulk string replies can contain binary-safe encoded data
        elsif ($m->{type} eq '$' and $encoding and defined $m->{data}) {
          push @res, Mojo::Util::decode($encoding, $m->{data});
        }
        else {
          push @res, $m->{data};
        }
      }

      return undef, \@res;
    };

    my ($err, $res) = $unpack->(@messages);

    # Entries without a promise are stored as undef by _write()
    my $p = shift @{$self->{waiting} || []};
    return $p ? $p->reject($err)       : $self->emit(error    => $err) unless $res;
    return $p ? $p->resolve($res->[0]) : $self->emit(response => $res->[0]);
  };
}

# Empties the "waiting" and "write" lists and rejects every promise found in
# them with $err, or with a default message when $err is false. Returns the
# invocant.
sub _reject_queue {
  my ($self, $err) = @_;
  my $reason = $err || 'Premature connection close';

  # Promises of commands that have been written and wait for a reply
  my $waiting = delete $self->{waiting};
  for my $p (@{$waiting || []}) {
    $p->reject($reason) if $p;
  }

  # Commands that have not been written: the promise is the second slot
  my $queued = delete $self->{write};
  for my $op (@{$queued || []}) {
    my $p = $op->[1] or next;
    $p->reject($reason);
  }

  return $self;
}

# Writes the queued commands to the stream, in queue order.
#
# A queue entry is an array-ref: the encoded message first, then an optional
# promise. The promise slot (undef when there is none) is appended to the
# "waiting" list before the message is written, so replies can be matched to
# it in order. A false entry, such as the undef marker queued by
# _discover_master(), is removed and ends the loop, leaving the rest queued.
sub _write {
  my $self = shift;

  while (my $op = shift @{$self->{write}}) {
    do { local $_ = $op->[0]; s!\r\n!\\r\\n!g; warn "[@{[$self->_id]}] <<< ($_)\n" } if DEBUG;
    push @{$self->{waiting}}, $op->[1];
    $self->{stream}->write($op->[0]);
  }
}

1;

=encoding utf8

=head1 NAME

Mojo::Redis::Connection - Low level connection class for talking to Redis

=head1 SYNOPSIS

  use Mojo::Redis::Connection;

  my $conn = Mojo::Redis::Connection->new(
               ioloop   => Mojo::IOLoop->singleton,
               protocol => Protocol::Redis::Faster->new(api => 1),
               url      => Mojo::URL->new("redis://localhost"),
             );

  $conn->write_p("GET some_key")->then(sub { print "some_key=$_[0]" })->wait;

=head1 DESCRIPTION

L<Mojo::Redis::Connection> is a low level driver for writing and reading data
from a Redis server.

You probably want to use L<Mojo::Redis> instead of this class.

=head1 EVENTS

=head2 close

  $cb = $conn->on(close => sub { my ($conn) = @_; });

Emitted when the connection to the redis server gets closed.

=head2 connect

  $cb = $conn->on(connect => sub { my ($conn) = @_; });

Emitted right after a connection is established to the Redis server. At this
point the AUTH and SELECT commands (if any) are queued, but not yet written.

=head2 error

  $cb = $conn->on(error => sub { my ($conn, $error) = @_; });

Emitted if there's a connection error or the Redis server emits an error, and
there's not a promise to handle the message.

=head2 response

  $cb = $conn->on(response => sub { my ($conn, $res) = @_; });

Emitted when receiving a message from the Redis server.

=head1 ATTRIBUTES

=head2 encoding

  $str  = $conn->encoding;
  $conn = $conn->encoding("UTF-8");

Holds the character encoding to use for data from/to Redis. Set to C<undef>
to disable encoding/decoding data. Without an encoding set, Redis expects and
returns bytes. See also L<Mojo::Redis/encoding>.

=head2 ioloop

  $loop = $conn->ioloop;
  $conn = $conn->ioloop(Mojo::IOLoop->new);

Holds an instance of L<Mojo::IOLoop>.

=head2 protocol

  $protocol = $conn->protocol;
  $conn     = $conn->protocol(Protocol::Redis::XS->new(api => 1));

Holds a protocol object, such as L<Protocol::Redis::Faster> that is used to
generate and parse Redis messages.

=head2 url

  $url  = $conn->url;
  $conn = $conn->url(Mojo::URL->new->host("/tmp/redis.sock")->path("/5"));
  $conn = $conn->url("redis://localhost:6379/1");

=head1 METHODS

=head2 disconnect

  $conn = $conn->disconnect;

Used to disconnect from the Redis server.

=head2 is_connected

  $bool = $conn->is_connected;

True if a connection to the Redis server is established.

=head2 write

  $conn = $conn->write(@command_and_args);

Used to write a message to the redis server. Calling this method should result
in either a L</error> or L</response> event.

This is useful when replies should be handled through events instead of a
promise.

=head2 write_p

  $promise = $conn->write_p(@command_and_args);

Will write a command to the Redis server and establish a connection if not
already connected and returns a L<Mojo::Promise>.

=head1 SEE ALSO

L<Mojo::Redis>

=cut



( run in 0.399 second using v1.01-cache-2.11-cpan-3989ada0592 )