Database-Async-Engine-PostgreSQL

 view release on metacpan or  search on metacpan

lib/Database/Async/Engine/PostgreSQL.pm  view on Meta::CPAN

        $log->tracef('This is SSL, let us upgrade');
        $stream = await $self->loop->SSL_upgrade(
            handle          => $stream,
            # SSL defaults...
            SSL_server      => 0,
            SSL_hostname    => $self->uri->host,
            SSL_verify_mode => IO::Socket::SSL::SSL_VERIFY_NONE(),
            # Pass through anything SSL-related unchanged, the user knows
            # better than we do
            (map {; $_ => $self->{$_} } grep { /^SSL_/ } keys %$self)
        );
        $log->tracef('Upgrade complete');
    } elsif($resp eq 'N') {
        # N for "no SSL"...
        $log->tracef('No to SSL');
        die 'Server does not support SSL' if $self->ssl == SSL_REQUIRE;
    } else {
        # anything else is unexpected
        die 'Unknown response to SSL request';
    }
    return $stream;
}

# Accessor for the replication flag. When nothing has been set yet the
# value 0 is stored on the instance and returned.
sub is_replication {
    my ($self) = @_;
    return $self->{is_replication} //= 0;
}
# Accessor for the application name. When nothing has been set yet the
# string 'perl' is stored on the instance and returned.
sub application_name {
    my ($self) = @_;
    return $self->{application_name} //= 'perl';
}

=head2 uri_for_dsn

Returns a L<URI> corresponding to the given L<database source name|https://en.wikipedia.org/wiki/Data_source_name>.

May throw an exception if we don't have a valid string.

=cut

# Converts a DBI-style DSN (`DBI:Pg:key=value;key=value;...`) into a URI.
#
# Parameters:
#   $class - invocant, not otherwise used here
#   $dsn   - the DSN string; the `DBI:Pg:` prefix is matched case-insensitively
#
# Returns the URI object, starting from postgresql://postgres@localhost/postgres
# and overridden by any of host, port, user, password or dbname found in the DSN.
# Other keys are parsed but not applied.
#
# Dies if the prefix is missing or a non-empty segment is not `key=value`.
sub uri_for_dsn {
    my ($class, $dsn) = @_;
    die 'invalid DSN, expecting DBI:Pg:...'
        unless defined($dsn) && $dsn =~ s/^DBI:Pg://i;

    my %args;
    # Empty segments (e.g. from a trailing `;`) carry no information, skip them
    for my $pair (grep { length } split /;/, $dsn) {
        # Only the first `=` separates key from value, so that values which
        # themselves contain `=` (passwords, options) survive intact. The
        # explicit limit also preserves an empty value such as `host=`.
        my ($key, $value) = split /=/, $pair, 2;
        die "invalid DSN, expecting key=value but found '$pair'"
            unless defined $value;
        $args{$key} = $value;
    }

    my $uri = URI->new('postgresql://postgres@localhost/postgres');
    for my $field (qw(host port user password dbname)) {
        next unless exists $args{$field};
        $uri->$field($args{$field});
    }
    return $uri;
}

# Builds a URI from the configuration that `$class->find_service($service)`
# returns for the given service name.
#
# Recognised keys are removed from the configuration hashref as they are
# applied, so anything left in it afterwards was not consumed here.
sub uri_for_service {
    my ($class, $service) = @_;
    my $cfg = $class->find_service($service);

    # Begin from the usual defaults (mirroring libpq unless there is a strong
    # reason to differ)
    my $uri = URI->new('postgresql://postgres@localhost/postgres');

    # Fields with a direct accessor on the URI object
    for my $field (qw(host port user password dbname)) {
        next unless exists $cfg->{$field};
        $uri->$field(delete $cfg->{$field});
    }

    # `hostaddr` is applied after `host`, so it wins when both are present
    if(exists $cfg->{hostaddr}) {
        $uri->host(delete $cfg->{hostaddr});
    }

    # The remaining supported settings travel as query parameters. This list
    # is not exhaustive and is likely to grow (e.g. text/binary protocol mode).
    my @query_keys = qw(
        application_name
        fallback_application_name
        keepalives
        options
        sslmode
        replication
    );
    for my $key (@query_keys) {
        next unless exists $cfg->{$key};
        $uri->query_param($key => delete $cfg->{$key});
    }

    return $uri;
}

=head2 stream

The L<IO::Async::Stream> representing the database connection.

=cut

# Plain read accessor for the `stream` slot on the instance.
sub stream {
    my ($self) = @_;
    return $self->{stream};
}

=head2 on_read

Process incoming database packets.

Expects the following parameters:

=over 4

=item * C<$stream> - the L<IO::Async::Stream> we are receiving data on

=item * C<$buffref> - a scalar reference to the current input data buffer

=item * C<$eof> - true if we have reached the end of input

=back

=cut

sub on_read {
    my ($self, $stream, $buffref, $eof) = @_;

    try {
        $log->tracef('Have server message of length %d', length ${$buffref});

        # Keep pulling messages out of the buffer until the protocol layer
        # has nothing further to give us, passing each one to the
        # incoming sink as we go.
        while(1) {
            my $packet = $self->protocol->extract_message($buffref)
                or last;
            $log->tracef('Message: %s', $packet);
            $self->incoming->emit($packet);
        }
    } catch($err) {
        # Not something we expect to hit. Once it does, our view of the
        # connection state cannot be trusted, so close immediately rather
        # than risk carrying on into a COMMIT or similarly dangerous step.
        $log->errorf('Failed to handle read, connection is no longer in a valid state: %s', $err);
        $stream->close_now;
    } finally {
        # Runs whether or not the block above failed: end of input means
        # we are no longer connected.
        if($eof) {
            $self->connected->set_numeric(0);
        }
    }

    # Always 0 - presumably the IO::Async::Stream convention for "call again
    # only when more data arrives"; confirm against the IO::Async docs.
    return 0;
}

=head2 ryu

Provides a L<Ryu::Async> instance.

=cut



( run in 0.997 second using v1.01-cache-2.11-cpan-39bf76dae61 )