Database-Async-Engine-PostgreSQL
view release on metacpan or search on metacpan
lib/Database/Async/Engine/PostgreSQL.pm view on Meta::CPAN
$log->tracef('This is SSL, let us upgrade');
$stream = await $self->loop->SSL_upgrade(
handle => $stream,
# SSL defaults...
SSL_server => 0,
SSL_hostname => $self->uri->host,
SSL_verify_mode => IO::Socket::SSL::SSL_VERIFY_NONE(),
# Pass through anything SSL-related unchanged, the user knows
# better than we do
(map {; $_ => $self->{$_} } grep { /^SSL_/ } keys %$self)
);
$log->tracef('Upgrade complete');
} elsif($resp eq 'N') {
# N for "no SSL"...
$log->tracef('No to SSL');
die 'Server does not support SSL' if $self->ssl == SSL_REQUIRE;
} else {
# anything else is unexpected
die 'Unknown response to SSL request';
}
return $stream;
}
sub is_replication { shift->{is_replication} //= 0 }
sub application_name { shift->{application_name} //= 'perl' }
=head2 uri_for_dsn
Returns a L<URI> corresponding to the given L<database source name|https://en.wikipedia.org/wiki/Data_source_name>.
May throw an exception if we don't have a valid string.
=cut
sub uri_for_dsn {
my ($class, $dsn) = @_;
die 'invalid DSN, expecting DBI:Pg:...' unless $dsn =~ s/^DBI:Pg://i;
my %args = split /[=;]/, $dsn;
my $uri = URI->new('postgresql://postgres@localhost/postgres');
$uri->$_(delete $args{$_}) for grep exists $args{$_}, qw(host port user password dbname);
$uri
}
sub uri_for_service {
my ($class, $service) = @_;
my $cfg = $class->find_service($service);
# Start with common default values (i.e. follow libpq behaviour unless there's a strong reason not to)
my $uri = URI->new('postgresql://postgres@localhost/postgres');
# Standard fields supported by URI::pg
$uri->$_(delete $cfg->{$_}) for grep exists $cfg->{$_}, qw(host port user password dbname);
# ... note that `hostaddr` takes precedence over plain `host`
$uri->host(delete $cfg->{hostaddr}) if exists $cfg->{hostaddr};
# Everything else is handled via query parameters, this list is non-exhaustive and likely to be
# extended in future (e.g. text/binary protocol mode)
$uri->query_param($_ => delete $cfg->{$_}) for grep exists $cfg->{$_}, qw(
application_name
fallback_application_name
keepalives
options
sslmode
replication
);
$uri
}
=head2 stream
The L<IO::Async::Stream> representing the database connection.
=cut
sub stream { shift->{stream} }
=head2 on_read
Process incoming database packets.
Expects the following parameters:
=over 4
=item * C<$stream> - the L<IO::Async::Stream> we are receiving data on
=item * C<$buffref> - a scalar reference to the current input data buffer
=item * C<$eof> - true if we have reached the end of input
=back
=cut
sub on_read {
my ($self, $stream, $buffref, $eof) = @_;
try {
$log->tracef('Have server message of length %d', length $$buffref);
while(my $msg = $self->protocol->extract_message($buffref)) {
$log->tracef('Message: %s', $msg);
$self->incoming->emit($msg);
}
} catch($e) {
# This really shouldn't happen, but since we can't trust our current state we should drop
# the connection ASAP, and avoid any chance of barrelling through to a COMMIT or other
# risky operation.
$log->errorf('Failed to handle read, connection is no longer in a valid state: %s', $e);
$stream->close_now;
} finally {
$self->connected->set_numeric(0) if $eof;
}
return 0;
}
=head2 ryu
Provides a L<Ryu::Async> instance.
=cut
( run in 0.997 second using v1.01-cache-2.11-cpan-39bf76dae61 )