Database-Async

 view release on metacpan or  search on metacpan

lib/Database/Async/ORM.pm  view on Meta::CPAN

use Database::Async::ORM::Extension;

use Log::Any qw($log);

sub new {
    my $class = shift;
    bless {
        schema => [],
        extension => [],
        @_
    }, $class
}

sub add_schema {
    my ($self, $schema) = @_;
    push $self->{schema}->@*, $schema;
}

sub add_extension {
    my ($self, $extension) = @_;
    push $self->{extension}->@*, $extension;
}

sub schemata {
    shift->{schema}->@*
}

sub schema_list {
    shift->{schema}->@*
}

sub extension_list {
    shift->{extension}->@*
}

sub schema_by_name {
    my ($self, $name) = @_;
    my ($schema) = grep { $_->name eq $name } $self->schemata or die 'cannot find schema ' . $name . ', have these instead: ' . join(',', map $_->name, $self->schemata);
    return $schema;
}

sub schema_definitions { shift->{schema_definitions} //= {} }

# Currently hardcoded to PostgreSQL, eventually we should be able to query
# the engine for this information.
sub ddl_for {
    require Database::Async::Engine::PostgreSQL::DDL;
    return Database::Async::Engine::PostgreSQL::DDL->new;
}

async sub apply_database_changes {
    my ($self, $db, @actions) = @_;

    my $ddl = $self->ddl_for($db);

    # Optional extensions first, and we don't care if any fail
    for my $ext ($self->extension_list) {
        my ($name) = $ext->name;
        try {
            die 'invalid name for extension: ' . $name unless $name =~ /^[a-zA-Z0-9_-]+$/;
            await $db->query(qq{create extension if not exists "$name" cascade})->void if $ext->is_optional;
        } catch {
            $log->warnf('Failed to install optional extension %s, ignoring: %s', $name, $@);
        }
    }

    # All remaining steps are in a single transaction
    await $db->query(q{begin})->void;

    for my $ext (grep { not $_->is_optional } $self->extension_list) {
        my ($name) = $ext->name;
        await $db->query(qq{create extension if not exists "$name" cascade})->void;
    }

    my @out;
    for my $action (@actions) {
        if($action->isa('Database::Async::ORM::Table')) {
            $log->tracef('Table definition for %s', $action->name);
            my ($sql, @bind) = $ddl->table_info($action);
            my %map = (
                schema => $action->schema->name,
                table  => $action->name
            );
            my @data = map { $map{$_} } @bind;
            my (@fields) = await $db->query(
                $sql => @data
            )->row_hashrefs
             ->as_list;
            push @out, $ddl->create_table($action) unless @fields;
        } elsif($action->isa('Database::Async::ORM::Schema')) {
            $log->tracef('Create schema %s', $action->name);
            my ($sql, @bind) = $ddl->schema_info($action);
            my %map = (
                schema => $action->name,
            );
            my @data = map { $map{$_} } @bind;
            my (@schema) = await $db->query(
                $sql => @data
            )->row_hashrefs
             ->as_list;
            push @out, $ddl->create_schema($action) unless @schema;
        } elsif($action->isa('Database::Async::ORM::Type')) {
            $log->tracef('Create type %s', $action->name);
            my ($sql, @bind) = $ddl->type_info($action);
            my %map = (
                schema => $action->schema->name,
                type   => $action->name
            );
            my @data = map { $map{$_} } @bind;
            my ($existing_type) = await $db->query(
                $sql => @data
            )->row_hashrefs
             ->as_list;
            push @out, $ddl->create_type($action) unless $existing_type;
        } else {
            die 'unknown thing ' . $action;
        }
    }

    # Make sure that we have no empty queries in the list... should not be necessary,
    # perhaps this should just bail out instead.
    @out = grep { length } @out;

    $log->debugf('Applying %d pending database migrations', 0 + @out);
    for my $query (@out) {
        $log->tracef('Apply SQL: %s', $query);
        await $db->query($query)->void;
    }

    await $db->query(q{commit})->void;
    $log->debugf('Applied %d database migrations', 0 + @out);
    return;
}

sub database_changes_as_sql {
    my ($self, $db, @actions) = @_;

    my $ddl = $self->ddl_for($db);

    my @out;
    # Optional extensions first, and we don't care if any fail
    for my $ext ($self->extension_list) {
        my ($name) = $ext->name;
        try {
            die 'invalid name for extension: ' . $name unless $name =~ /^[a-zA-Z0-9_-]+$/;
            push @out, qq{create extension if not exists "$name" cascade} if $ext->is_optional;
        } catch {
            $log->warnf('Failed to install optional extension %s, ignoring: %s', $name, $@);
        }
    }

    # All remaining steps are in a single transaction
    push @out, q{begin};

    for my $ext (grep { not $_->is_optional } $self->extension_list) {
        my ($name) = $ext->name;
        die 'invalid name for extension: ' . $name unless $name =~ /^[a-zA-Z0-9_-]+$/;
        push @out, qq{create extension if not exists "$name" cascade};
    }

    for my $action (@actions) {
        if($action->isa('Database::Async::ORM::Table')) {
            $log->tracef('Create table %s', $action->name);
            push @out, $ddl->create_table($action);
        } elsif($action->isa('Database::Async::ORM::Schema')) {
            $log->tracef('Create schema %s', $action->name);
            push @out, $ddl->create_schema($action);
        } elsif($action->isa('Database::Async::ORM::Type')) {
            $log->tracef('Create type %s', $action->name);
            push @out, $ddl->create_type($action);
        } else {
            die 'unknown thing ' . $action;
        }
    }

    push @out, q{commit};

    # Make sure that we have no empty queries in the list... should not be necessary,
    # perhaps this should just bail out instead.
    return grep { length } @out;
}

=head2 load_from

Loads schema, tables, types and any other available objects from
a source - currently supports the following:

=over 4

=item * hashref

=item * YAML file

=item * directory of YAML files

=back

You can call this multiple times to accumulate objects from various
different sources.

Returns the current L<Database::Async::ORM> instance.

=cut

sub load_from {
    my ($self, $source, $loader) = @_;
    die 'needs a source to load from' unless defined $source;

    my $cfg = ref($source) ? $source : $self->read_from($source, $loader);
    $self->{schema_definitions} = $cfg;
    $log->tracef('Loaded config %s', $cfg);

    my @pending;

    for my $extension_name ($cfg->{extensions}{required}->@*) {
        my $extension = Database::Async::ORM::Extension->new(
            defined_in => $cfg->{extensions}{defined_in},
            name       => $extension_name,



( run in 1.692 second using v1.01-cache-2.11-cpan-d7f47b0818f )