Alzabo

 view release on metacpan or  search on metacpan

lib/Alzabo/RDBMSRules/PostgreSQL.pm  view on Meta::CPAN

    my @sql;
    push @sql, 'DROP INDEX "' . $p{old}->name . '_pkey"';

    if ( $p{new}->primary_key )
    {
        push @sql, ( 'CREATE UNIQUE INDEX "' . $p{new}->name . '_pkey" ON "' .
                     $p{new}->name . '" (' .
                     ( join ', ',
                       map { '"' . $_->name . '"' } $p{new}->primary_key ) . ')' );
    }

    return @sql;
}

# Actually, Postgres _can_ change table names, but it's inability to
# change most aspects of a column definition make it very difficult to
# properly change a table name and then change its column definitions,
# so its easier just to recreate the table
sub can_alter_table_name
{
    0;
}

# Not sure if this is possible
sub alter_table_attributes_sql
{
    my $self = shift;

    recreate_table_exception();
}

sub alter_column_name_sql
{
    my $self = shift;
    my $column = shift;

    return
        ( 'ALTER TABLE "' . $column->table->name . '" RENAME COLUMN ' .
          $column->former_name . ' TO ' . $column->name
        );
}

sub reverse_engineer
{
    my $self = shift;
    my $schema = shift;

    my $driver = $schema->driver;

    foreach my $table ( $driver->tables )
    {
        $table =~ s/^[^\.]+\.//;
        $table =~ s/^\"|\"$//g;

        print STDERR "Adding table $table to schema\n"
            if Alzabo::Debug::REVERSE_ENGINEER;

        my $t = $schema->make_table( name => $table );

        my $t_oid = $driver->one_row( sql => 'SELECT oid FROM pg_class WHERE relname = ?',
                                      bind => $table );

        my $sql = <<'EOF';
SELECT a.attname, a.attnotnull, t.typname, a.attnum, a.atthasdef, a.atttypmod
FROM pg_attribute a, pg_type t
WHERE a.attrelid = ?
AND a.atttypid = t.oid
AND a.attnum > 0
EOF

        $sql .= ' AND NOT a.attisdropped' if $driver->rdbms_version ge '7.3';

        $sql .= ' ORDER BY attnum';


        my %cols_by_number;
        foreach my $row ( $driver->rows( sql => $sql,
                                         bind => $t_oid ) )
        {
            my %p;

            $p{type} = $row->[2];

            # has default
            if ( $row->[4] )
            {
                $p{default} =
                    $driver->one_row
                        ( sql => 'SELECT adsrc FROM pg_attrdef WHERE adrelid = ? AND adnum = ?',
                          bind => [ $t_oid, $row->[3] ] );

                if ( $p{default} =~ /^nextval\(/ )
                {
                    $p{sequenced} = 1;
                    $p{type} =~ s/(?:int(?:eger)?|numeric)/serial/;
                }
                else
                {
                    # strip quotes (and type!) Postgres added
                    $p{default} =~ s/^'//; #'
                    if ( $driver->rdbms_version ge '7.4' )
                    {
                        # 'grotesque' becomes 'grotesque'::character
                        # varying. See
                        # src/backend/utils/adt/format_type.c

                        # This is from
                        # src/backend/util/adt/format_type.c
                        $p{default} =~ s/'(?:::[^']{3,})?$//;
                        $p{default} =~ s/\('(\w+)$/$1/;
                    }
                    else
                    {
                        $p{'default'} =~ s/'$//;
                    }

                    if ( $p{default} =~ /\([^\)]*\)/
                         || $p{default} =~ /^(?:current_timestamp|localtime|localtimestamp|now)$/i )
                    {
                        $p{default_is_raw} = 1;
                    }

                    $p{default} = 'now()' if $p{default} eq 'now';
                }
            }

            if ( $p{type} =~ /char/i )
            {
                # The real length is the value of: a.atttypmod - ((int32) sizeof(int32))
                #
                # Sure wish I knew how to figure this out in Perl.
                # Its provided as VARHDRSZ in postgres.h but I can't
                # really get at it.  On my linux machine this is 4.  A
                # better way of doing this would be welcome.
                $p{length} = $row->[5] - 4;
            }
            if ( lc $p{type} eq 'numeric' )
            {
                # see comment above.
                my $num = $row->[5] - 4;
                $p{length} = ($num >> 16) & 0xffff;
                $p{precision} = $num & 0xffff;
            }

            $p{type} = 'char' if lc $p{type} eq 'bpchar';

            print STDERR "Adding $row->[0] column to $table\n"
                if Alzabo::Debug::REVERSE_ENGINEER;

            my $col = $t->make_column( name => $row->[0],
                                       nullable => ! $row->[1],
                                       %p
                                     );

            if ( $col->is_integer )
            {
                if ( $self->_re_sequence_exists( $driver, $col ) )
                {
                    $col->set_sequenced(1);
                }
            }

            $cols_by_number{ $row->[3] } = $row->[0];
        }


        $sql = <<'EOF';
SELECT indkey
FROM pg_index
WHERE indisprimary
AND indrelid = ?
EOF

        foreach my $cols ( $driver->column( sql => $sql,
                                           bind => $t_oid ) )
        {
	    my @cols = @cols_by_number{ split ' ', $cols };
	    local $" = ", ";

	    print STDERR "Setting @cols as primary key for $table\n"
 		if Alzabo::Debug::REVERSE_ENGINEER;

	    $t->add_primary_key( $_ ) for $t->columns( @cols );
        }

	my %i;
	if ( $driver->rdbms_version ge '7.4' )
	{
            %i = $self->_74_indexes( $driver, $t, $t_oid, \%cols_by_number );
	}
        else
        {
            %i = $self->_pre_74_indexes( $driver, $t, $t_oid, \%cols_by_number );
 	}

	foreach my $idx (values %i)
 	{
	    my @c = map { { column => $_ } } @{ $idx->{cols} };

	    print STDERR "Adding index "
		. ( defined $idx->{'function'}
		    ? $idx->{'function'}
		    : join(', ', map $_->name, @{$idx->{'cols'}} ) )
		. " to $table\n"
		if Alzabo::Debug::REVERSE_ENGINEER;

 	    $t->make_index( columns  => \@c,
			    unique   => $idx->{unique},
                            function => $idx->{function},
                          );
        }

        $sql = <<'EOF';
SELECT consrc, array_to_string(conkey,' ')
FROM pg_constraint
WHERE conrelid = ?
AND contype = 'c'
EOF

        my @att;

        foreach my $row ( $driver->rows( sql => $sql,
                                         bind => $t_oid ) )
        {
            my ( $con, $cols ) = @$row;

            # this stuff is not needed
            $con =~ s/::(\w+)//g;

	    # If $cols ever covers more than one value then this will fail.
            if ( $cols =~ /^(\d+)$/ )
            {
                my $column = $cols_by_number{$1};

                print STDERR qq|Adding constraint "$con" to $table.$column\n|
                    if Alzabo::Debug::REVERSE_ENGINEER;

                $t->column($column)->add_attribute("CHECK $con");
            }
            else
            {
                print STDERR qq|Adding constraint "$con" to $table\n|
                    if Alzabo::Debug::REVERSE_ENGINEER;

                $t->add_attribute("CHECK $con");
            }
        }

    }

    # Foreign key info is available in PG 7.3.0 and higher (could fake
    # it from pg_triggers with extensive gymnastics in version 7.0 and
    # higher, but that's a little iffy)
    $self->_foreign_keys_to_relationships($schema)
        if $driver->rdbms_version ge '7.3';
}

sub _re_sequence_exists
{
    my $self = shift;
    my $driver = shift;
    my $col = shift;

    my $seq_name = $self->_sequence_name($col);

    my $sql = <<'EOF';
SELECT 1
  FROM pg_class
 WHERE relname = ?
   AND relkind = ?
EOF

    return $driver->one_row( sql  => $sql,
                             bind => [ $seq_name, 'S' ],
                           );
}

sub _74_indexes
{
    my $self   = shift;
    my $driver = shift;
    my $table  = shift;
    my $t_oid  = shift;
    my $cols_by_number  = shift;

    my $sql = <<'EOF';
SELECT indexrelid, indisunique, indkey, indnatts
FROM pg_index
WHERE indrelid = ?
AND NOT indisprimary
EOF

    my %i;
   INDEX:
    foreach my $row ( $driver->rows( sql => $sql,
                                     bind => $t_oid ) )
    {
        my $function;
        my @col_numbers;

        my $spi =
            $driver->one_row
                ( sql => "SELECT COALESCE(indexprs,'') FROM pg_index WHERE indexrelid = ?",
                  bind => $row->[0] );

        if ( $spi )
        {
          SPI_EXPRESSION:
            while ( my $spi_expr =
                    Text::Balanced::extract_bracketed( $spi, '{}', '[^{}]*' ) )
            {
                # A wanton lack of respect for boundaries. 'Parse' the
                # PostgreSQL internal SPI language to find out what
                # columns are being accessed.
                push( @col_numbers,
                      join( ' ',
                            $spi_expr =~ /:varattno (\d+)/g ) );
            }
        }

        if ( scalar( @col_numbers ) > 1 )
        {
            # Index objects are not prepared to handle functional
            # indexes that use more than one function.
            die "Alzabo " . Alzabo->VERSION . " does not support functional"
                . " indexes that are not strictly a single function."
                . "  There are multiple functions on an index on the "
                . $table->name() . " table.\n";
        }
        elsif ( scalar( @col_numbers ) == 1 )
        {
            my $func =
                $driver->one_row
                    ( sql => 'SELECT pg_catalog.pg_get_indexdef( ?, 1, true)',
                      bind => $row->[0] );

            # XXX - not sure if this is a good idea but it makes the
            # rev-eng tests pass
            $func =~ s/\b(\w+)::\w+\b/$1/g;
            my $col_in_func = $1;

            my @function;
            for my $num ( split / +/, $row->[2] )
            {
                if ( $num == 0 )
                {
                    push @function, $func;
                }
                else
                {
                    push @function, $cols_by_number->{$num};
                    push @col_numbers, $num;
                }
            }

            $function = join ', ', @function;
        }
        else
        {
            # A regular index!
            @col_numbers = split / +/, $row->[2];
        }

        push( @{ $i{ $row->[0] }{cols} },
              $table->columns( @{ $cols_by_number }{ @col_numbers } ) );

        $i{ $row->[0] }{function} = $function;
        $i{ $row->[0] }{unique} = $row->[1];
    }

    return %i;
}

sub _pre_74_indexes
{
    my $self   = shift;
    my $driver = shift;
    my $table  = shift;
    my $t_oid  = shift;
    my $cols_by_number  = shift;

    my $sql = <<'EOF';
SELECT c.oid, a.attname, i.indisunique, i.indproc, i.indkey
FROM pg_index i, pg_attribute a, pg_class c
WHERE i.indrelid = ?
AND NOT i.indisprimary
AND i.indexrelid = c.oid
AND c.oid = a.attrelid
AND a.attnum > 0
ORDER BY a.attnum
EOF

    my %i;
    foreach my $row ( $driver->rows( sql => $sql,
                                     bind => $t_oid ) )
    {
        my @col_names = @{ $cols_by_number }{ split ' ', $row->[4] };

        my $function;
        if ( $row->[3] && $row->[3] =~ /\w/ && $row->[3] ne '-' )
        {
            # some function names come out as "pg_catalog.foo"
            $row->[3] =~ s/\w+\.(\w+)/$1/;
            $function = uc $row->[3];
            $function .= '(';

            $function .= join ', ', @col_names;

            $function .= ')';
        }

        push( @{ $i{ $row->[0] }{cols} },
              $table->columns( @col_names ) );

        $i{ $row->[0] }{unique} = $row->[2];
        $i{ $row->[0] }{function} = $function;
    }

    return %i;
}

sub _foreign_keys_to_relationships
{
    my ($self, $schema) = @_;
    my $driver = $schema->driver;

    my $constraint_sql = <<'EOF';
SELECT conrelid, confrelid,
    array_to_string(conkey,' '),
    array_to_string(confkey,' ')
FROM pg_constraint
WHERE contype = 'f'
EOF

    my $table_sql = <<'EOF';
SELECT relname
FROM pg_class
WHERE oid = ?
EOF

    my $column_sql = <<'EOF';
SELECT attname
FROM pg_attribute
WHERE attrelid = ?
  AND attnum = ?
EOF

    foreach my $row ( $driver->rows( sql => $constraint_sql ) )
    {
        my $from_table = $driver->one_row( sql => $table_sql,
                                           bind => $row->[0] );
        my $to_table   = $driver->one_row( sql => $table_sql,
                                           bind => $row->[1] );

	# Column numbers are given as strings like "3 5"
	my @from_cols = split ' ', $row->[2]
 	    or die "Weird column specification $row->[2]";

	my @to_cols   = split ' ', $row->[3]
 	    or die "Weird column specification $row->[3]";

        # Convert column numbers to names
        foreach (@from_cols)
        {
            $_ = $driver->one_row( sql => $column_sql,
                                   bind => [$row->[0], $_] );
        }
        foreach (@to_cols)
        {
            $_ = $driver->one_row( sql => $column_sql,
                                   bind => [$row->[1], $_] );
        }

        print STDERR "Adding $from_table foreign key to $to_table\n"
            if Alzabo::Debug::REVERSE_ENGINEER;

        # Convert to Alzabo objects
        $from_table = $schema->table($from_table);
        $to_table   = $schema->table($to_table);
        @from_cols = map { $from_table->column($_) } @from_cols;
        @to_cols   = map {   $to_table->column($_) } @to_cols;

        # If there's a unique constraint on the "from" columns, treat
        # is as 1-to-1.  Otherwise treat it as n-to-1.
        my $from_unique = 0;

        # Only use PK as determination of uniqueness if the FK is from
        # the _whole_ PK to something else.  If the FK only includes
        # _part_ of the PK then it is not unique.
        $from_unique = 1
            if ( ( @from_cols == grep { $_->is_primary_key } @from_cols )
                 &&
                 ( @from_cols == $from_table->primary_key_size ) );

        $from_unique = 1
            if @from_cols == grep { $_->has_attribute( attribute => 'UNIQUE' ) } @from_cols;

      INDEX:
        foreach my $i ( grep { $_->unique } $from_table->indexes )
        {
            my @i_cols = $i->columns;

            next unless @i_cols == @from_cols;

            for ( my $x = 0; $x < @i_cols; $x++ )
            {
                next INDEX unless $i_cols[$x] eq $from_cols[$x];
            }

            $from_unique = 1;
        }

        my $from_cardinality = $from_unique ? '1' : 'n';

        my $from_is_dependent =
            ( grep { $_->nullable || defined $_->default } @from_cols ) ? 0 : 1;
        my $to_is_dependent =
            ( grep { $_->nullable || $_->is_primary_key } @to_cols ) ? 0 : 1;

        $schema->add_relationship( cardinality => [ $from_cardinality, '1' ],
                                   table_from => $from_table,
                                   table_to   => $to_table,
                                   columns_from => \@from_cols,
                                   columns_to   => \@to_cols,
                                   from_is_dependent => $from_is_dependent,
                                   to_is_dependent => $to_is_dependent,
                                 );
    }
}

sub rules_id



( run in 0.731 second using v1.01-cache-2.11-cpan-2398b32b56e )