Database-Async-Engine-PostgreSQL

 view release on metacpan or  search on metacpan

lib/Database/Async/Engine/PostgreSQL.pm  view on Meta::CPAN


=over 4

=item * C<SSL_REQUIRE>

=item * C<SSL_PREFER>

=item * C<SSL_DISABLE>

=back

=cut

# Accessor: returns whatever is currently stored in the instance's `ssl`
# slot (undef if nothing has been stored yet).
sub ssl {
    my ($self) = @_;
    return $self->{ssl};
}

=head2 read_len

Buffer read length. Higher values mean we will attempt to read more
data for each I/O loop iteration.

Defaults to 2 megabytes.

=cut

# Accessor with lazy default: the first call on an instance that has no
# defined `read_len` stores 2 MiB (2 * 1024 * 1024 bytes) in the instance,
# and every call returns the stored value. A defined value, including 0,
# is never overwritten.
sub read_len {
    my ($self) = @_;
    unless (defined $self->{read_len}) {
        $self->{read_len} = 2 * 1024 * 1024;
    }
    return $self->{read_len};
}

=head2 write_len

Buffer write length. Higher values mean we will attempt to write more
data for each I/O loop iteration.

Defaults to 2 megabytes.

=cut

# Accessor with lazy default: returns the instance's `write_len`, storing
# 2 MiB (2 * 1024 * 1024 bytes) first if no defined value is present.
# A defined value, including 0, is returned untouched.
sub write_len {
    my ($self) = @_;
    return $self->{write_len} if defined $self->{write_len};
    $self->{write_len} = 2 * 1024 * 1024;
    return $self->{write_len};
}

=head2 connect

Establish a connection to the server.

Returns a L<Future> which resolves to the L<IO::Async::Stream>
once ready.

=cut

async sub connect {
    # Opens the transport to the server described by the instance URI,
    # runs the SSL negotiation step, wires the resulting stream up to the
    # protocol handlers and sends the startup request.
    #
    # Takes no arguments beyond the invocant.
    # Resolves to the stream returned by negotiate_ssl().
    # Dies if the `connected` flag is already set, if no URI object is
    # available, or if the `sslmode` query parameter is not a known mode.
    my ($self) = @_;
    my $loop = $self->loop;

    # The flag is only raised at the very end of this method, so a true
    # value here means an earlier call already completed.
    my $connected = $self->connected;
    die 'We think we are already connected, and that is bad' if $connected->as_numeric;

    # Initial connection is made directly through the URI
    # parameters. When a service name is configured it is used to fill
    # in $self->{uri}, but only if that slot is still empty.
    $self->{uri} ||= $self->uri_for_service($self->service) if $self->service;
    my $uri = $self->uri;
    die 'bad URI' unless ref $uri;
    $log->tracef('URI for connection is %s', "$uri");

    # Read host and port once. The host may legitimately be absent (that
    # selects the default UNIX socket below), so normalise undef to an
    # empty string rather than triggering "uninitialized value" warnings
    # in the join and the string comparison.
    my $host = $uri->host // '';
    my $port = $uri->port;
    my $endpoint = join ':', $host, $port;

    $log->tracef('Will connect to %s', $endpoint);
    $self->{ssl} = do {
        my $mode = $uri->query_param('sslmode') // 'prefer';
        $Protocol::Database::PostgreSQL::Constants::SSL_NAME_MAP{$mode} // die 'unknown SSL mode ' . $mode;
    };

    # Three cases, selected by the host part of the URI:
    #  - a true host that does not start with '/' or '@': TCP to host:port
    #  - an empty host: UNIX socket under /var/run/postgresql
    #  - anything else: the host is treated as the directory holding the
    #    UNIX socket
    my @connect_params;
    if ($host and not $host =~ m!^[/@]!) {
        @connect_params = (
            service     => $port,
            host        => $host,
            socktype    => 'stream',
        );
    } elsif ($host eq '') {
        @connect_params = (
            addr => {
                family   => 'unix',
                socktype => 'stream',
                path     => '/var/run/postgresql/.s.PGSQL.' . $port,
            }
        );
    } else {
        @connect_params = (
            addr => {
                family   => 'unix',
                socktype => 'stream',
                path     => $host . '/.s.PGSQL.' . $port,
            }
        );
    }
    my $sock = await $loop->connect(@connect_params);

    # Purely diagnostic: report the endpoints in a form that suits the
    # socket domain we ended up with.
    my $domain = $sock->sockdomain;
    if ($domain == Socket::PF_INET or $domain == Socket::PF_INET6) {
        my $local  = join ':', $sock->sockhost_service(1);
        my $remote = join ':', $sock->peerhost_service(1);
        $log->tracef('Connected to %s as %s from %s', $endpoint, $remote, $local);
    } elsif ($domain == Socket::PF_UNIX) {
        $log->tracef('Connected to %s as %s', $endpoint, $sock->peerpath);
    }

    # We start with a null handler for read, because our behaviour varies depending on
    # whether we want to go through the SSL dance or not.
    $self->add_child(
        my $stream = IO::Async::Stream->new(
            handle   => $sock,
            on_read  => sub { 0 }
        )
    );

    # SSL is conveniently simple: a prefix exchange before the real session starts,
    # and the user can decide whether SSL is mandatory or optional.
    # The stream handed back replaces the one created above.
    $stream = await $self->negotiate_ssl(
        stream => $stream,
    );

    # Only a weak reference to the stream is kept on the instance.
    Scalar::Util::weaken($self->{stream} = $stream);

    # Every item emitted by the outgoing source is written to the stream,
    # after resetting the ready_for_query state to the empty string.
    # NOTE(review): this callback closes over $self strongly, unlike the
    # weak curry used for on_read below - confirm that this does not keep
    # the engine alive longer than intended.
    $self->outgoing->each(sub {
        $log->tracef('Write bytes [%v02x]', $_);
        $self->ready_for_query->set_string('');
        $self->stream->write("$_");
        return;
    });

    # Swap the placeholder read handler for the real one and apply the
    # configured buffer sizes.
    $stream->configure(
        on_read   => $self->curry::weak::on_read,
        read_len  => $self->read_len,
        write_len => $self->write_len,
        autoflush => 0,
    );

    $log->tracef('Send initial request with user %s', $uri->user);

    # This is where the extensible options for initial connection are applied:
    # we have already handled SSL by this point, so we exclude this from the
    # list and pass everything else directly to the startup packet.
    my %qp = $uri->query_params;
    delete $qp{sslmode};

    # An application_name given in the URI wins over the instance default.
    # Because %qp comes last, any `database` or `user` query parameter
    # also overrides the values supplied here.
    $qp{application_name} //= $self->application_name;
    $self->protocol->send_startup_request(
        database         => $self->database_name,
        user             => $self->database_user,
        %qp
    );
    $connected->set_numeric(1);
    return $stream;
}

=head2 service_conf_path

Return the expected location for the pg_service.conf file.

=cut

sub service_conf_path {
    my ($class) = @_;
    return $ENV{PGSERVICEFILE} if exists $ENV{PGSERVICEFILE};
    return $ENV{PGSYSCONFDIR} . '/pg_service.conf' if exists $ENV{PGSYSCONFDIR};
    my $path = File::HomeDir->my_home . '/.pg_service.conf';



( run in 1.071 second using v1.01-cache-2.11-cpan-5735350b133 )