AnyEvent-RipeRedis

 view release on metacpan or  search on metacpan

lib/AnyEvent/RipeRedis.pm  view on Meta::CPAN

  };
}

sub _create_on_eof {
  my $self = shift;

  weaken($self);

  return sub {
    my $err = _new_error( 'Connection closed by remote host.',
        E_CONN_CLOSED_BY_REMOTE_HOST );
    $self->_disconnect($err);
  };
}

sub _create_on_handle_error {
  my $self = shift;

  weaken($self);

  return sub {
    my $err_msg = pop;

    my $err = _new_error( $err_msg, E_IO );
    $self->_disconnect($err);
  };
}

sub _create_on_read {
  my $self = shift;

  weaken($self);

  my $str_len;
  my @bufs;
  my $bufs_num = 0;

  return sub {
    my $handle = shift;

    MAIN: while (1) {
      return if $handle->destroyed;

      my $reply;
      my $err_code;

      if ( defined $str_len ) {
        if ( length( $handle->{rbuf} ) < $str_len + EOL_LENGTH ) {
          return;
        }

        $reply = substr( $handle->{rbuf}, 0, $str_len, '' );
        substr( $handle->{rbuf}, 0, EOL_LENGTH, '' );
        if ( $self->{utf8} ) {
          utf8::decode($reply);
        }

        undef $str_len;
      }
      else {
        my $eol_pos = index( $handle->{rbuf}, EOL );

        if ( $eol_pos < 0 ) {
          return;
        }

        $reply = substr( $handle->{rbuf}, 0, $eol_pos, '' );
        my $type = substr( $reply, 0, 1, '' );
        substr( $handle->{rbuf}, 0, EOL_LENGTH, '' );

        if ( $type ne '+' && $type ne ':' ) {
          if ( $type eq '$' ) {
            if ( $reply >= 0 ) {
              $str_len = $reply;
              next;
            }

            undef $reply;
          }
          elsif ( $type eq '*' ) {
            if ( $reply > 0 ) {
              push( @bufs,
                { reply      => [],
                  err_code   => undef,
                  chunks_cnt => $reply,
                }
              );
              $bufs_num++;

              next;
            }
            elsif ( $reply == 0 ) {
              $reply = [];
            }
            else {
              undef $reply;
            }
          }
          elsif ( $type eq '-' ) {
            $err_code = E_OPRN_ERROR;
            if ( $reply =~ m/^([A-Z]{3,}) / ) {
              if ( exists $ERR_PREFS_MAP{$1} ) {
                $err_code = $ERR_PREFS_MAP{$1};
              }
            }
          }
          else {
            my $err = _new_error( 'Unexpected reply received.',
                E_UNEXPECTED_DATA );
            $self->_disconnect($err);

            return;
          }
        }
      }

      while ( $bufs_num > 0 ) {
        my $curr_buf = $bufs[-1];
        if ( defined $err_code ) {
          unless ( ref($reply) ) {
            $reply = _new_error( $reply, $err_code );
          }
          $curr_buf->{err_code} = E_OPRN_ERROR;
        }
        push( @{ $curr_buf->{reply} }, $reply );
        if ( --$curr_buf->{chunks_cnt} > 0 ) {
          next MAIN;



( run in 1.580 second using v1.01-cache-2.11-cpan-8f98c5d2c55 )