AnyEvent-Redis-RipeRedis

 view release on metacpan or  search on metacpan

lib/AnyEvent/Redis/RipeRedis.pm  view on Meta::CPAN

    }
  };
}

# Builds and returns the end-of-file callback for this client.
#
# The returned code reference ignores its arguments and calls
# $self->_disconnect() with a fixed message and the
# E_CONN_CLOSED_BY_REMOTE_HOST error code.
#
# $self is weakened before the closure captures it (presumably via
# Scalar::Util::weaken, so that the callback does not hold a strong reference
# to the object -- confirm against the module's imports).
sub _get_on_eof {
  my ( $self ) = @_;

  weaken( $self );

  my $on_eof = sub {
    $self->_disconnect(
      'Connection closed by remote host.',
      E_CONN_CLOSED_BY_REMOTE_HOST
    );
  };

  return $on_eof;
}

# Builds and returns the I/O error callback for this client.
#
# The returned code reference takes the LAST argument it is called with as the
# error message and passes it to $self->_disconnect() together with the E_IO
# error code. Any preceding arguments are ignored.
# NOTE(review): presumably installed as the handle's on_error callback, where
# the message is the final argument -- confirm against the caller.
#
# $self is weakened before the closure captures it (presumably via
# Scalar::Util::weaken, so that the callback does not hold a strong reference
# to the object -- confirm against the module's imports).
sub _get_handle_on_error {
  my ( $self ) = @_;

  weaken( $self );

  my $on_error = sub {
    # Only the trailing argument (the error text) is of interest here.
    my $err_msg = pop @_;

    $self->_disconnect( $err_msg, E_IO );
  };

  return $on_error;
}

sub _get_on_read {
  my $self = shift;

  weaken( $self );

  my $str_len;
  my @bufs;
  my $bufs_num = 0;

  return sub {
    my $handle = shift;

    MAIN: while ( 1 ) {
      if ( $handle->destroyed() ) {
        return;
      }

      my $reply;
      my $err_code;

      if ( defined $str_len ) {
        if ( length( $handle->{rbuf} ) < $str_len + EOL_LEN ) {
          return;
        }

        $reply = substr( $handle->{rbuf}, 0, $str_len, '' );
        substr( $handle->{rbuf}, 0, EOL_LEN, '' );
        if ( defined $self->{encoding} ) {
          $reply = $self->{encoding}->decode( $reply );
        }
        undef $str_len;
      }
      else {
        my $eol_pos = index( $handle->{rbuf}, EOL );

        if ( $eol_pos < 0 ) {
          return;
        }

        $reply = substr( $handle->{rbuf}, 0, $eol_pos, '' );
        my $type = substr( $reply, 0, 1, '' );
        substr( $handle->{rbuf}, 0, EOL_LEN, '' );

        if ( $type ne '+' && $type ne ':' ) {
          if ( $type eq '$' ) {
            if ( $reply >= 0 ) {
              $str_len = $reply;

              next;
            }

            undef $reply;
          }
          elsif ( $type eq '*' ) {
            if ( $reply > 0 ) {
              push( @bufs,
                { data       => [],
                  err_code   => undef,
                  chunks_cnt => $reply,
                }
              );
              $bufs_num++;

              next;
            }
            elsif ( $reply == 0 ) {
              $reply = [];
            }
            else {
              undef $reply;
            }
          }
          elsif ( $type eq '-' ) {
            $err_code = E_OPRN_ERROR;
            if ( $reply =~ m/^([A-Z]{3,}) / ) {
              if ( exists $ERR_PREFS_MAP{$1} ) {
                $err_code = $ERR_PREFS_MAP{$1};
              }
            }
          }
          else {
            $self->_disconnect( 'Unexpected reply received.',
                E_UNEXPECTED_DATA );

            return;
          }
        }
      }

      while ( $bufs_num > 0 ) {
        my $curr_buf = $bufs[-1];
        if ( defined $err_code ) {
          unless ( ref($reply) ) {
            $reply
                = AnyEvent::Redis::RipeRedis::Error->new( $reply, $err_code );
          }
          $curr_buf->{err_code} = E_OPRN_ERROR;
        }
        push( @{ $curr_buf->{data} }, $reply );
        if ( --$curr_buf->{chunks_cnt} > 0 ) {



( run in 2.278 seconds using v1.01-cache-2.11-cpan-8f98c5d2c55 )