Alzabo
view release on metacpan or search on metacpan
lib/Alzabo/RDBMSRules/PostgreSQL.pm view on Meta::CPAN
my @sql;
push @sql, 'DROP INDEX "' . $p{old}->name . '_pkey"';
if ( $p{new}->primary_key )
{
push @sql, ( 'CREATE UNIQUE INDEX "' . $p{new}->name . '_pkey" ON "' .
$p{new}->name . '" (' .
( join ', ',
map { '"' . $_->name . '"' } $p{new}->primary_key ) . ')' );
}
return @sql;
}
# Actually, Postgres _can_ change table names, but it's inability to
# change most aspects of a column definition make it very difficult to
# properly change a table name and then change its column definitions,
# so its easier just to recreate the table
sub can_alter_table_name
{
0;
}
# Not sure if this is possible
sub alter_table_attributes_sql
{
my $self = shift;
recreate_table_exception();
}
sub alter_column_name_sql
{
my $self = shift;
my $column = shift;
return
( 'ALTER TABLE "' . $column->table->name . '" RENAME COLUMN ' .
$column->former_name . ' TO ' . $column->name
);
}
sub reverse_engineer
{
my $self = shift;
my $schema = shift;
my $driver = $schema->driver;
foreach my $table ( $driver->tables )
{
$table =~ s/^[^\.]+\.//;
$table =~ s/^\"|\"$//g;
print STDERR "Adding table $table to schema\n"
if Alzabo::Debug::REVERSE_ENGINEER;
my $t = $schema->make_table( name => $table );
my $t_oid = $driver->one_row( sql => 'SELECT oid FROM pg_class WHERE relname = ?',
bind => $table );
my $sql = <<'EOF';
SELECT a.attname, a.attnotnull, t.typname, a.attnum, a.atthasdef, a.atttypmod
FROM pg_attribute a, pg_type t
WHERE a.attrelid = ?
AND a.atttypid = t.oid
AND a.attnum > 0
EOF
$sql .= ' AND NOT a.attisdropped' if $driver->rdbms_version ge '7.3';
$sql .= ' ORDER BY attnum';
my %cols_by_number;
foreach my $row ( $driver->rows( sql => $sql,
bind => $t_oid ) )
{
my %p;
$p{type} = $row->[2];
# has default
if ( $row->[4] )
{
$p{default} =
$driver->one_row
( sql => 'SELECT adsrc FROM pg_attrdef WHERE adrelid = ? AND adnum = ?',
bind => [ $t_oid, $row->[3] ] );
if ( $p{default} =~ /^nextval\(/ )
{
$p{sequenced} = 1;
$p{type} =~ s/(?:int(?:eger)?|numeric)/serial/;
}
else
{
# strip quotes (and type!) Postgres added
$p{default} =~ s/^'//; #'
if ( $driver->rdbms_version ge '7.4' )
{
# 'grotesque' becomes 'grotesque'::character
# varying. See
# src/backend/utils/adt/format_type.c
# This is from
# src/backend/util/adt/format_type.c
$p{default} =~ s/'(?:::[^']{3,})?$//;
$p{default} =~ s/\('(\w+)$/$1/;
}
else
{
$p{'default'} =~ s/'$//;
}
if ( $p{default} =~ /\([^\)]*\)/
|| $p{default} =~ /^(?:current_timestamp|localtime|localtimestamp|now)$/i )
{
$p{default_is_raw} = 1;
}
$p{default} = 'now()' if $p{default} eq 'now';
}
}
if ( $p{type} =~ /char/i )
{
# The real length is the value of: a.atttypmod - ((int32) sizeof(int32))
#
# Sure wish I knew how to figure this out in Perl.
# Its provided as VARHDRSZ in postgres.h but I can't
# really get at it. On my linux machine this is 4. A
# better way of doing this would be welcome.
$p{length} = $row->[5] - 4;
}
if ( lc $p{type} eq 'numeric' )
{
# see comment above.
my $num = $row->[5] - 4;
$p{length} = ($num >> 16) & 0xffff;
$p{precision} = $num & 0xffff;
}
$p{type} = 'char' if lc $p{type} eq 'bpchar';
print STDERR "Adding $row->[0] column to $table\n"
if Alzabo::Debug::REVERSE_ENGINEER;
my $col = $t->make_column( name => $row->[0],
nullable => ! $row->[1],
%p
);
if ( $col->is_integer )
{
if ( $self->_re_sequence_exists( $driver, $col ) )
{
$col->set_sequenced(1);
}
}
$cols_by_number{ $row->[3] } = $row->[0];
}
$sql = <<'EOF';
SELECT indkey
FROM pg_index
WHERE indisprimary
AND indrelid = ?
EOF
foreach my $cols ( $driver->column( sql => $sql,
bind => $t_oid ) )
{
my @cols = @cols_by_number{ split ' ', $cols };
local $" = ", ";
print STDERR "Setting @cols as primary key for $table\n"
if Alzabo::Debug::REVERSE_ENGINEER;
$t->add_primary_key( $_ ) for $t->columns( @cols );
}
my %i;
if ( $driver->rdbms_version ge '7.4' )
{
%i = $self->_74_indexes( $driver, $t, $t_oid, \%cols_by_number );
}
else
{
%i = $self->_pre_74_indexes( $driver, $t, $t_oid, \%cols_by_number );
}
foreach my $idx (values %i)
{
my @c = map { { column => $_ } } @{ $idx->{cols} };
print STDERR "Adding index "
. ( defined $idx->{'function'}
? $idx->{'function'}
: join(', ', map $_->name, @{$idx->{'cols'}} ) )
. " to $table\n"
if Alzabo::Debug::REVERSE_ENGINEER;
$t->make_index( columns => \@c,
unique => $idx->{unique},
function => $idx->{function},
);
}
$sql = <<'EOF';
SELECT consrc, array_to_string(conkey,' ')
FROM pg_constraint
WHERE conrelid = ?
AND contype = 'c'
EOF
my @att;
foreach my $row ( $driver->rows( sql => $sql,
bind => $t_oid ) )
{
my ( $con, $cols ) = @$row;
# this stuff is not needed
$con =~ s/::(\w+)//g;
# If $cols ever covers more than one value then this will fail.
if ( $cols =~ /^(\d+)$/ )
{
my $column = $cols_by_number{$1};
print STDERR qq|Adding constraint "$con" to $table.$column\n|
if Alzabo::Debug::REVERSE_ENGINEER;
$t->column($column)->add_attribute("CHECK $con");
}
else
{
print STDERR qq|Adding constraint "$con" to $table\n|
if Alzabo::Debug::REVERSE_ENGINEER;
$t->add_attribute("CHECK $con");
}
}
}
# Foreign key info is available in PG 7.3.0 and higher (could fake
# it from pg_triggers with extensive gymnastics in version 7.0 and
# higher, but that's a little iffy)
$self->_foreign_keys_to_relationships($schema)
if $driver->rdbms_version ge '7.3';
}
sub _re_sequence_exists
{
my $self = shift;
my $driver = shift;
my $col = shift;
my $seq_name = $self->_sequence_name($col);
my $sql = <<'EOF';
SELECT 1
FROM pg_class
WHERE relname = ?
AND relkind = ?
EOF
return $driver->one_row( sql => $sql,
bind => [ $seq_name, 'S' ],
);
}
sub _74_indexes
{
my $self = shift;
my $driver = shift;
my $table = shift;
my $t_oid = shift;
my $cols_by_number = shift;
my $sql = <<'EOF';
SELECT indexrelid, indisunique, indkey, indnatts
FROM pg_index
WHERE indrelid = ?
AND NOT indisprimary
EOF
my %i;
INDEX:
foreach my $row ( $driver->rows( sql => $sql,
bind => $t_oid ) )
{
my $function;
my @col_numbers;
my $spi =
$driver->one_row
( sql => "SELECT COALESCE(indexprs,'') FROM pg_index WHERE indexrelid = ?",
bind => $row->[0] );
if ( $spi )
{
SPI_EXPRESSION:
while ( my $spi_expr =
Text::Balanced::extract_bracketed( $spi, '{}', '[^{}]*' ) )
{
# A wanton lack of respect for boundaries. 'Parse' the
# PostgreSQL internal SPI language to find out what
# columns are being accessed.
push( @col_numbers,
join( ' ',
$spi_expr =~ /:varattno (\d+)/g ) );
}
}
if ( scalar( @col_numbers ) > 1 )
{
# Index objects are not prepared to handle functional
# indexes that use more than one function.
die "Alzabo " . Alzabo->VERSION . " does not support functional"
. " indexes that are not strictly a single function."
. " There are multiple functions on an index on the "
. $table->name() . " table.\n";
}
elsif ( scalar( @col_numbers ) == 1 )
{
my $func =
$driver->one_row
( sql => 'SELECT pg_catalog.pg_get_indexdef( ?, 1, true)',
bind => $row->[0] );
# XXX - not sure if this is a good idea but it makes the
# rev-eng tests pass
$func =~ s/\b(\w+)::\w+\b/$1/g;
my $col_in_func = $1;
my @function;
for my $num ( split / +/, $row->[2] )
{
if ( $num == 0 )
{
push @function, $func;
}
else
{
push @function, $cols_by_number->{$num};
push @col_numbers, $num;
}
}
$function = join ', ', @function;
}
else
{
# A regular index!
@col_numbers = split / +/, $row->[2];
}
push( @{ $i{ $row->[0] }{cols} },
$table->columns( @{ $cols_by_number }{ @col_numbers } ) );
$i{ $row->[0] }{function} = $function;
$i{ $row->[0] }{unique} = $row->[1];
}
return %i;
}
sub _pre_74_indexes
{
my $self = shift;
my $driver = shift;
my $table = shift;
my $t_oid = shift;
my $cols_by_number = shift;
my $sql = <<'EOF';
SELECT c.oid, a.attname, i.indisunique, i.indproc, i.indkey
FROM pg_index i, pg_attribute a, pg_class c
WHERE i.indrelid = ?
AND NOT i.indisprimary
AND i.indexrelid = c.oid
AND c.oid = a.attrelid
AND a.attnum > 0
ORDER BY a.attnum
EOF
my %i;
foreach my $row ( $driver->rows( sql => $sql,
bind => $t_oid ) )
{
my @col_names = @{ $cols_by_number }{ split ' ', $row->[4] };
my $function;
if ( $row->[3] && $row->[3] =~ /\w/ && $row->[3] ne '-' )
{
# some function names come out as "pg_catalog.foo"
$row->[3] =~ s/\w+\.(\w+)/$1/;
$function = uc $row->[3];
$function .= '(';
$function .= join ', ', @col_names;
$function .= ')';
}
push( @{ $i{ $row->[0] }{cols} },
$table->columns( @col_names ) );
$i{ $row->[0] }{unique} = $row->[2];
$i{ $row->[0] }{function} = $function;
}
return %i;
}
sub _foreign_keys_to_relationships
{
my ($self, $schema) = @_;
my $driver = $schema->driver;
my $constraint_sql = <<'EOF';
SELECT conrelid, confrelid,
array_to_string(conkey,' '),
array_to_string(confkey,' ')
FROM pg_constraint
WHERE contype = 'f'
EOF
my $table_sql = <<'EOF';
SELECT relname
FROM pg_class
WHERE oid = ?
EOF
my $column_sql = <<'EOF';
SELECT attname
FROM pg_attribute
WHERE attrelid = ?
AND attnum = ?
EOF
foreach my $row ( $driver->rows( sql => $constraint_sql ) )
{
my $from_table = $driver->one_row( sql => $table_sql,
bind => $row->[0] );
my $to_table = $driver->one_row( sql => $table_sql,
bind => $row->[1] );
# Column numbers are given as strings like "3 5"
my @from_cols = split ' ', $row->[2]
or die "Weird column specification $row->[2]";
my @to_cols = split ' ', $row->[3]
or die "Weird column specification $row->[3]";
# Convert column numbers to names
foreach (@from_cols)
{
$_ = $driver->one_row( sql => $column_sql,
bind => [$row->[0], $_] );
}
foreach (@to_cols)
{
$_ = $driver->one_row( sql => $column_sql,
bind => [$row->[1], $_] );
}
print STDERR "Adding $from_table foreign key to $to_table\n"
if Alzabo::Debug::REVERSE_ENGINEER;
# Convert to Alzabo objects
$from_table = $schema->table($from_table);
$to_table = $schema->table($to_table);
@from_cols = map { $from_table->column($_) } @from_cols;
@to_cols = map { $to_table->column($_) } @to_cols;
# If there's a unique constraint on the "from" columns, treat
# is as 1-to-1. Otherwise treat it as n-to-1.
my $from_unique = 0;
# Only use PK as determination of uniqueness if the FK is from
# the _whole_ PK to something else. If the FK only includes
# _part_ of the PK then it is not unique.
$from_unique = 1
if ( ( @from_cols == grep { $_->is_primary_key } @from_cols )
&&
( @from_cols == $from_table->primary_key_size ) );
$from_unique = 1
if @from_cols == grep { $_->has_attribute( attribute => 'UNIQUE' ) } @from_cols;
INDEX:
foreach my $i ( grep { $_->unique } $from_table->indexes )
{
my @i_cols = $i->columns;
next unless @i_cols == @from_cols;
for ( my $x = 0; $x < @i_cols; $x++ )
{
next INDEX unless $i_cols[$x] eq $from_cols[$x];
}
$from_unique = 1;
}
my $from_cardinality = $from_unique ? '1' : 'n';
my $from_is_dependent =
( grep { $_->nullable || defined $_->default } @from_cols ) ? 0 : 1;
my $to_is_dependent =
( grep { $_->nullable || $_->is_primary_key } @to_cols ) ? 0 : 1;
$schema->add_relationship( cardinality => [ $from_cardinality, '1' ],
table_from => $from_table,
table_to => $to_table,
columns_from => \@from_cols,
columns_to => \@to_cols,
from_is_dependent => $from_is_dependent,
to_is_dependent => $to_is_dependent,
);
}
}
sub rules_id
( run in 0.731 second using v1.01-cache-2.11-cpan-2398b32b56e )