Database-Async
view release on metacpan or search on metacpan
lib/Database/Async/ORM.pm view on Meta::CPAN
use Database::Async::ORM::Extension;
use Log::Any qw($log);
# Constructor. Starts with empty schema and extension lists; any key/value
# pairs passed by the caller are applied afterwards and so take precedence
# over those defaults.
sub new {
    my ($class, @args) = @_;
    my $self = {
        schema    => [],
        extension => [],
        @args,
    };
    return bless $self, $class;
}
# Appends a schema to this instance's schema list.
# Returns the result of the push, i.e. the new number of schemas.
sub add_schema {
    my $self = shift;
    my $schema = shift;
    return push @{ $self->{schema} }, $schema;
}
# Appends an extension to this instance's extension list.
# Returns the result of the push, i.e. the new number of extensions.
sub add_extension {
    my $self = shift;
    my $extension = shift;
    return push @{ $self->{extension} }, $extension;
}
# Returns every registered schema as a list (the count in scalar context).
sub schemata {
    my ($self) = @_;
    return @{ $self->{schema} };
}
# Returns every registered schema as a list (the count in scalar context).
# Reads the same underlying list as schemata().
sub schema_list {
    my ($self) = @_;
    return @{ $self->{schema} };
}
# Returns every registered extension as a list (the count in scalar context).
sub extension_list {
    my ($self) = @_;
    return @{ $self->{extension} };
}
# Looks up a registered schema by its ->name.
# Returns the first schema whose name matches; dies with a message listing
# the names of all known schemas when there is no match.
sub schema_by_name {
    my ($self, $name) = @_;
    my @matches = grep { $_->name eq $name } $self->schemata;
    unless (@matches) {
        my $known = join(',', map $_->name, $self->schemata);
        die 'cannot find schema ' . $name . ', have these instead: ' . $known;
    }
    return $matches[0];
}
# Returns the stored schema definitions hashref, creating an empty one on
# first access so callers always get a reference back.
sub schema_definitions {
    my ($self) = @_;
    $self->{schema_definitions} //= {};
    return $self->{schema_definitions};
}
# Returns a DDL generator object. Any arguments (callers in this file pass
# the database handle) are ignored for now: the PostgreSQL implementation is
# hardcoded, and eventually the engine should be queried for this instead.
# The DDL class is loaded on first use.
sub ddl_for {
    my $ddl_class = 'Database::Async::Engine::PostgreSQL::DDL';
    require Database::Async::Engine::PostgreSQL::DDL;
    return $ddl_class->new;
}
# Applies pending schema changes to a live database.
#
# Parameters:
#   $db      - database handle; this code calls ->query(...) on it and then
#              ->void or ->row_hashrefs->as_list on the result
#   @actions - objects that are Database::Async::ORM::Table, ::Schema or
#              ::Type instances; anything else raises 'unknown thing ...'
#
# Steps:
#   1. Optional extensions are installed outside the transaction; a failure
#      is logged as a warning and otherwise ignored.
#   2. Required extension names are validated, then everything else runs
#      between `begin` and `commit`: required extensions, followed by
#      creation SQL for each action that the lookup query reports as missing.
#
# Being an `async sub`, this is meant to be awaited by the caller; on success
# it returns nothing useful. Dies on an invalid required extension name, an
# unknown action type, or whatever exceptions the awaited queries raise.
#
# NOTE(review): no `rollback` is issued when a step fails after `begin`;
# presumably the caller discards or resets the connection - confirm.
async sub apply_database_changes {
    my ($self, $db, @actions) = @_;
    my $ddl = $self->ddl_for($db);

    my @optional = grep { $_->is_optional } $self->extension_list;
    my @required = grep { not $_->is_optional } $self->extension_list;

    # Optional extensions first, and we don't care if any fail
    for my $ext (@optional) {
        my ($name) = $ext->name;
        try {
            die 'invalid name for extension: ' . $name unless $name =~ /^[a-zA-Z0-9_-]+$/;
            await $db->query(qq{create extension if not exists "$name" cascade})->void;
        } catch {
            $log->warnf('Failed to install optional extension %s, ignoring: %s', $name, $@);
        }
    }

    # Required extension names get interpolated into SQL as well, so apply the
    # same check used for optional ones. Doing this before `begin` means a bad
    # name never leaves a transaction open.
    for my $ext (@required) {
        my ($name) = $ext->name;
        die 'invalid name for extension: ' . $name unless $name =~ /^[a-zA-Z0-9_-]+$/;
    }

    # All remaining steps are in a single transaction
    await $db->query(q{begin})->void;
    for my $ext (@required) {
        my ($name) = $ext->name;
        await $db->query(qq{create extension if not exists "$name" cascade})->void;
    }

    my @out;
    for my $action (@actions) {
        if($action->isa('Database::Async::ORM::Table')) {
            $log->tracef('Table definition for %s', $action->name);
            my ($sql, @bind) = $ddl->table_info($action);
            # @bind holds placeholder names; translate them to actual values
            my %map = (
                schema => $action->schema->name,
                table  => $action->name
            );
            my @data = map { $map{$_} } @bind;
            my (@fields) = await $db->query(
                $sql => @data
            )->row_hashrefs
             ->as_list;
            # No rows back means the table is not there yet
            push @out, $ddl->create_table($action) unless @fields;
        } elsif($action->isa('Database::Async::ORM::Schema')) {
            $log->tracef('Create schema %s', $action->name);
            my ($sql, @bind) = $ddl->schema_info($action);
            my %map = (
                schema => $action->name,
            );
            my @data = map { $map{$_} } @bind;
            my (@schema) = await $db->query(
                $sql => @data
            )->row_hashrefs
             ->as_list;
            push @out, $ddl->create_schema($action) unless @schema;
        } elsif($action->isa('Database::Async::ORM::Type')) {
            $log->tracef('Create type %s', $action->name);
            my ($sql, @bind) = $ddl->type_info($action);
            my %map = (
                schema => $action->schema->name,
                type   => $action->name
            );
            my @data = map { $map{$_} } @bind;
            my ($existing_type) = await $db->query(
                $sql => @data
            )->row_hashrefs
             ->as_list;
            push @out, $ddl->create_type($action) unless $existing_type;
        } else {
            die 'unknown thing ' . $action;
        }
    }

    # Make sure that we have no empty queries in the list... should not be necessary,
    # perhaps this should just bail out instead.
    @out = grep { length } @out;
    $log->debugf('Applying %d pending database migrations', 0 + @out);
    for my $query (@out) {
        $log->tracef('Apply SQL: %s', $query);
        await $db->query($query)->void;
    }
    await $db->query(q{commit})->void;
    $log->debugf('Applied %d database migrations', 0 + @out);
    return;
}
# Builds the SQL statements for the given definitions without running them.
#
# Parameters:
#   $db      - passed through to ddl_for(); not otherwise used here
#   @actions - objects that are Database::Async::ORM::Table, ::Schema or
#              ::Type instances; anything else raises 'unknown thing ...'
#
# Unlike apply_database_changes, nothing is looked up in the database, so a
# creation statement is emitted for every action.
#
# Returns the list of non-empty SQL strings: optional extensions first, then
# `begin`, required extensions, one statement per action, and `commit`.
# Dies if a required extension has an invalid name; an optional extension
# with an invalid name is logged as a warning and left out.
sub database_changes_as_sql {
    my ($self, $db, @actions) = @_;
    my $ddl = $self->ddl_for($db);
    my @out;

    # Optional extensions first, and we don't care if any fail. Only optional
    # ones take this tolerant path, so a required extension with a bad name is
    # no longer reported as an ignorable "optional" failure before the die below.
    for my $ext (grep { $_->is_optional } $self->extension_list) {
        my ($name) = $ext->name;
        try {
            die 'invalid name for extension: ' . $name unless $name =~ /^[a-zA-Z0-9_-]+$/;
            push @out, qq{create extension if not exists "$name" cascade};
        } catch {
            $log->warnf('Failed to install optional extension %s, ignoring: %s', $name, $@);
        }
    }

    # All remaining steps are in a single transaction
    push @out, q{begin};
    for my $ext (grep { not $_->is_optional } $self->extension_list) {
        my ($name) = $ext->name;
        die 'invalid name for extension: ' . $name unless $name =~ /^[a-zA-Z0-9_-]+$/;
        push @out, qq{create extension if not exists "$name" cascade};
    }

    for my $action (@actions) {
        if($action->isa('Database::Async::ORM::Table')) {
            $log->tracef('Create table %s', $action->name);
            push @out, $ddl->create_table($action);
        } elsif($action->isa('Database::Async::ORM::Schema')) {
            $log->tracef('Create schema %s', $action->name);
            push @out, $ddl->create_schema($action);
        } elsif($action->isa('Database::Async::ORM::Type')) {
            $log->tracef('Create type %s', $action->name);
            push @out, $ddl->create_type($action);
        } else {
            die 'unknown thing ' . $action;
        }
    }
    push @out, q{commit};

    # Make sure that we have no empty queries in the list... should not be necessary,
    # perhaps this should just bail out instead.
    return grep { length } @out;
}
=head2 load_from

Loads schemas, tables, types and any other available objects from
a source. The following sources are currently supported:

=over 4

=item * hashref

=item * YAML file

=item * directory of YAML files

=back

You can call this multiple times to accumulate objects from various
different sources.

Returns the current L<Database::Async::ORM> instance.

=cut
sub load_from {
my ($self, $source, $loader) = @_;
die 'needs a source to load from' unless defined $source;
my $cfg = ref($source) ? $source : $self->read_from($source, $loader);
$self->{schema_definitions} = $cfg;
$log->tracef('Loaded config %s', $cfg);
my @pending;
for my $extension_name ($cfg->{extensions}{required}->@*) {
my $extension = Database::Async::ORM::Extension->new(
defined_in => $cfg->{extensions}{defined_in},
name => $extension_name,
( run in 1.692 second using v1.01-cache-2.11-cpan-d7f47b0818f )