AnyEvent-Pg-Pool-Multiserver

 view release on metacpan or  search on metacpan

lib/AnyEvent/Pg/Pool/Multiserver.pm  view on Meta::CPAN

package AnyEvent::Pg::Pool::Multiserver;

our $VERSION = '0.4';

use strict;
use warnings;
use utf8;
use v5.10;

use Carp qw( croak carp );
use AnyEvent;
use AnyEvent::Pg::Pool;
use Future;
use Params::Validate qw( validate_with );

use fields qw(
  pool
  local
);

use Class::XSAccessor {
  getters => {
    local => 'local'
  },
};

sub new {
  my $class  = shift;
  my $params = {@_};

  my $self = fields::new( $class );

  $params = $self->_validate_new( $params );

  my $pool = {};

  foreach my $server ( @{ $params->{servers} } ) {
    my $dbh = AnyEvent::Pg::Pool->new(
      $server->{conn},
      connection_retries => 10,
      connection_delay   => 1,
      size               => 4,
      on_error           => sub { carp 'Some error'; },
      on_transient_error => sub { carp 'Transient error'; },
      on_connect_error   => sub { carp 'Connection error'; },
    );

    croak 'server_id must be unique' if $pool->{ $server->{id} };
    $pool->{ $server->{id} } = { dbh => $dbh, name => $server->{name}, id => $server->{id} };
  }

  $self->{pool} = $pool;
  $self->{local} = $params->{local};

  return $self;
}

sub _validate_new {
  my __PACKAGE__ $self = shift;
  my $params = shift;

  $params = validate_with(
    params => $params,
    spec => {
      servers => 1,
      local   => 1,
    },
  );

  return $params;
}

sub selectall_arrayref {
  my __PACKAGE__ $self = shift;
  my $params = {@_};

  $params = $self->_validate_selectall_arrayref( $params );

  my @futures = ();

  my @pool = ();
  if ( defined $params->{server_id} ) {
    push @pool, $params->{server_id};
  }
  else {
    @pool = keys %{ $self->{pool} };
  }

  foreach my $server_id ( @pool ) {
    my $server = $self->{pool}{ $server_id };

    push @futures, $self->_get_future_push_query(
      query     => $params->{query},
      args      => $params->{args},
      server    => $server,
      cb_server => $params->{cb_server},
      type      => 'selectall_arrayref_slice',
    );
  }

  my $main_future = Future->wait_all( @futures );
  $main_future->on_done( sub {
    my @results = ();
    my @errors  = ();

    foreach my $future ( @futures ) {
      my ( $server, $result, $error ) = $future->get();

      if ( !$error ) {
        push @results, @$result;
      }
      else {
        push @errors, {
          server_name => $server->{name},
          server_id   => $server->{id},
          error       => $error,
        };
      }
    }

    $params->{cb}->(
      scalar( @results ) ? [ @results ] : undef,
      scalar( @errors ) ? [ @errors ] : undef,
    );

    undef $main_future;
  } );

  return;
}

sub _validate_selectall_arrayref {
  my __PACKAGE__ $self = shift;
  my $params = shift;

  $params = validate_with(
    params => $params,
    spec => {
      query     => 1,
      args      => 0,
      cb        => 1,
      server_id => 0,
      cb_server => 0,
    },
  );

  return $params;
}

sub _get_future_push_query {
  my __PACKAGE__ $self = shift;
  my $params = {@_};

  my $future = Future->new();

  my $watcher;

  $watcher = $params->{server}{dbh}->push_query(
    query => $params->{query},
    args  => $params->{args},
    on_error => sub {
      carp shift;
      $future->done( $params->{server}, undef, 'Push error' );
      undef $watcher;
    },
    on_result => sub {
      my $p   = shift;
      my $w   = shift;
      my $res = shift;

      if ( $res->errorMessage ) {
        carp $res->errorMessage;
        $future->done( $params->{server}, undef, $res->errorMessage );
        undef $watcher;
        return;
      }

      my $result;

      if ( $params->{type} eq 'selectall_arrayref_slice' ) {
        $result = _fetchall_arrayref_slice( $params->{server}{id}, $res );
      }
      elsif ( $params->{type} eq 'selectrow_hashref' ) {
        $result = _fetchrow_hashref( $params->{server}{id}, $res );
      }
      elsif ( $params->{type} eq 'selectrow_array' ) {
        $result = _fetchrow_array( $params->{server}{id}, $res );
      }
      elsif ( $params->{type} eq 'do' ) {
        $result = _fetch_do( $params->{server}{id}, $res );
      }

      my $cb = sub {
        $future->done( $params->{server}, $result );
        undef $watcher;
      };

lib/AnyEvent/Pg/Pool/Multiserver.pm  view on Meta::CPAN

sub _fetchall_arrayref_slice {
  my $id  = shift;
  my $res = shift;

  my $result = [];

  if ( $res->nRows ) {
    foreach my $row ( $res->rowsAsHashes ) {
      $row->{_server_id} = $id;
      push @$result, $row;
    }
  }

  return $result;
}

sub _fetchrow_hashref {
  my $id  = shift;
  my $res = shift;

  my $result;

  if ( $res->nRows ) {
    $result = $res->rowAsHash(0);
    $result->{_server_id} = $id;
  }

  return $result;
}

sub _fetchrow_array {
  my $id  = shift;
  my $res = shift;

  my $result;

  if ( $res->nRows ) {
    $result = [ $id, $res->row(0) ];
  }

  return $result;
}

sub _fetch_do {
  my $id  = shift;
  my $res = shift;

  my $result;

  if ( $res->cmdRows ) {
    $result = [ $id, $res->cmdRows ];
  }

  return $result;
}

sub selectrow_hashref {
  my __PACKAGE__ $self = shift;
  my $params = {@_};

  $params = $self->_validate_selectrow_hashref( $params );

  my $future = $self->_get_future_push_query(
    query     => $params->{query},
    args      => $params->{args},
    server    => $self->{pool}{ $params->{server_id} },
    cb_server => $params->{cb_server},
    type      => 'selectrow_hashref',
  );

  $future->on_done( sub {
    my ( $server, $result, $error ) = $future->get();

    if ( !$error ) {
      $params->{cb}->( $result, undef );
    }
    else {
      $params->{cb}->( undef, {
        server_name => $server->{name},
        server_id   => $server->{id},
        error       => $error,
      } );
    }

    undef $future;
  } );

  return;
}

sub _validate_selectrow_hashref {
  my __PACKAGE__ $self = shift;
  my $params = shift;

  $params = validate_with(
    params => $params,
    spec => {
      query     => 1,
      args      => 0,
      cb        => 1,
      server_id => 1,
      cb_server => 0,
    },
  );

  return $params;
}

sub selectrow_array {
  my __PACKAGE__ $self = shift;
  my $params = {@_};

  $params = $self->_validate_selectrow_array( $params );

  my $future = $self->_get_future_push_query(
    query     => $params->{query},
    args      => $params->{args},
    server    => $self->{pool}{ $params->{server_id} },
    cb_server => $params->{cb_server},
    type      => 'selectrow_array',
  );

  $future->on_done( sub {
    my ( $server, $result, $error ) = $future->get();

    if ( !$error ) {
      $params->{cb}->( $result, undef );
    }
    else {
      $params->{cb}->( undef, {
        server_name => $server->{name},
        server_id   => $server->{id},
        error       => $error,
      } );
    }

    undef $future;
  } );

  return;
}

sub _validate_selectrow_array {
  my __PACKAGE__ $self = shift;
  my $params = shift;

  $params = validate_with(
    params => $params,
    spec => {
      query     => 1,
      args      => 0,
      cb        => 1,
      server_id => 1,
      cb_server => 0,
    },
  );

  return $params;
}

sub do {
  my __PACKAGE__ $self = shift;
  my $params = {@_};

  $params = $self->_validate_do( $params );

  my $future = $self->_get_future_push_query(
    query     => $params->{query},
    args      => $params->{args},
    server    => $self->{pool}{ $params->{server_id} },
    cb_server => $params->{cb_server},
    type      => 'do',
  );

  $future->on_done( sub {
    my ( $server, $result, $error ) = $future->get();

    if ( !$error ) {
      $params->{cb}->( $result, undef );
    }
    else {
      $params->{cb}->( undef, {
        server_name => $server->{name},
        server_id   => $server->{id},
        error       => $error,
      } );
    }

    undef $future;
  } );

  return;
}

sub _validate_do {
  my __PACKAGE__ $self = shift;
  my $params = shift;

  $params = validate_with(
    params => $params,
    spec => {
      query     => 1,
      args      => 0,
      cb        => 1,
      server_id => 1,
      cb_server => 0,
    },
  );

  return $params;
}

1;

=head1 NAME

AnyEvent::Pg::Pool::Multiserver - Asyncronious multiserver requests to Postgresql with AnyEvent::Pg

=head1 SYNOPSIS

  my $servers = [
    {
      id   => 1,
      name => 'remote 1',
      conn => 'host=remote1 port=5432 dbname=mydb user=myuser password=mypass',
    },
    {
      id   => 2,
      name => 'remote 2',
      conn => 'host=remote2 port=5432 dbname=mydb user=myuser password=mypass',
    },
  ];
  my $pool = AnyEvent::Pg::Pool::Multiserver->new( servers => $servers, local => 1 );

  # multi-server request

  $pool->selectall_arrayref(
    query  => 'SELECT val FROM ( SELECT 1 AS val ) tmp WHERE tmp.val = $1;',
    args   => [ 1 ],
    cb     => sub {
      my $results = shift;
      my $errors  = shift;

      if ( $errors ) {
        foreach my $srv ( @$errors ) {
          say "err $srv->{error} with $srv->{server_name} $srv->{server_id}";
        }
      }

      if ( $results ) {
        foreach my $val ( @$results ) {
          say "server_id=$val->{_server_id} value=$val->{val}";
        }
      }
    },
  );

  # single-server request



( run in 2.236 seconds using v1.01-cache-2.11-cpan-99c4e6809bf )