PGObject-Util-LogRep-TestDecoding


lib/PGObject/Util/LogRep/TestDecoding.pm


    my $msg = parse_msg($repmsg); # procedural interface
    # tells you the operation, transaction status etc.

    # or the OO interface which gives more functionality

    my $decoder = PGObject::Util::LogRep::TestDecoding->new(
        schema=> ['myschema'], txn_status => 0
    );
    handle_message($decoder->parse($repmsg)) if $decoder->matches($repmsg);

=head1 DESCRIPTION

This module provides parsing capabilities for the test_decoding plugin for
PostgreSQL's logical replication.  The test_decoding plugin does not recognize
or handle publications and simply replicates everything.

Messages follow formats like:

  table public.data: INSERT: id[integer]:3 data[text]:'5'

or for begin or commit messages:

  BEGIN 529
  COMMIT 529

Transactions are always processed sequentially in the test decoding stack.
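
As a rough illustration of the two line shapes above, the following sketch
classifies a line with plain regular expressions.  It is not this module's
parser.  C<classify_line> is a hypothetical helper, and it assumes that schema
and table names are simple unquoted identifiers.

```perl
use strict;
use warnings;

# Hypothetical helper, not part of this module: classify one test_decoding
# line as a transaction control line or a DML line.
# Assumption: schema and table names are simple unquoted identifiers.
sub classify_line {
    my ($line) = @_;
    if ($line =~ /\A(BEGIN|COMMIT)(?:\s+(\d+))?\z/) {
        # txnid is undef when the line carries no transaction number
        return { type => 'txn', txn_cmd => $1, txnid => $2 };
    }
    if ($line =~ /\Atable (\w+)\.(\w+): (INSERT|UPDATE|DELETE):\s*(.*)\z/) {
        return {
            type      => 'dml',
            schema    => $1,
            tablename => $2,
            operation => $3,
            columns   => $4,    # raw, unparsed column list
        };
    }
    return;    # unrecognized line
}

my $rec = classify_line("table public.data: INSERT: id[integer]:3 data[text]:'5'");
print "$rec->{operation} on $rec->{schema}.$rec->{tablename}\n";
# prints "INSERT on public.data"
```

The hash keys here are chosen for the sketch only and should not be read as
the structure returned by C<parse_msg>.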

This module can be combined with C<AnyEvent::PGRecvLogical> to create programs
which process PostgreSQL's logical replication streams.

Note that PostgreSQL's logical replication sends out transactions in commit
order.  If transaction information is important (which it might not be for some
applications), this module assumes that it will process most messages.

=head1 EXPORT

=over

=item parse_msg # single message / non-OO parser

=back

=cut

BEGIN { our @EXPORT_OK = ('parse_msg'); }

=head1 ATTRIBUTES/ACCESSORS

These are for the OO interface only.  They are read-only after the object is
created but can be set in the constructor.  If you need to change them, create
a new object instead.

=head2 schema Maybe[ArrayRef[Str]]

Undef or an arrayref of scalars.  If it is set, then C<matches> returns true if
the message matches any table in any schema mentioned.

=cut

has schema => (is => 'ro', isa => Maybe[ArrayRef[Str]]);

=head2 txn_status Bool

Whether to report transaction status.

=cut

has txn_status => (is => 'ro', isa => Bool);

=head2 tables Maybe[ArrayRef[Str]]

A list of fully qualified tablenames to match against.  Note that this filter
operates along with the schema filter and if either matches, the match is
met.
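
The either-or behaviour of the two filters can be pictured with the following
sketch.  C<passes_filters> is a hypothetical stand-in, not this module's
implementation.  What happens when neither filter is set is not described here,
so the sketch simply returns false in that case as an assumption.

```perl
use strict;
use warnings;

# Hypothetical stand-in for the filter check; not this module's code.
# $filters holds optional 'schema' and 'tables' arrayrefs, mirroring the
# constructor arguments documented above.
sub passes_filters {
    my ($filters, $schema, $table) = @_;
    my %want_schema = map { $_ => 1 } @{ $filters->{schema} // [] };
    my %want_table  = map { $_ => 1 } @{ $filters->{tables} // [] };
    # A hit on either filter is enough.
    # Assumption: with no filters set at all, nothing matches.
    return ($want_schema{$schema} || $want_table{"$schema.$table"}) ? 1 : 0;
}

my $filters = { schema => ['myschema'], tables => ['public.data'] };
print passes_filters($filters, 'myschema', 'anything'), "\n";   # 1 (schema filter)
print passes_filters($filters, 'public',   'data'),     "\n";   # 1 (tables filter)
print passes_filters($filters, 'public',   'other'),    "\n";   # 0 (neither)
```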

=cut

has tables => (is => 'ro', isa => Maybe[ArrayRef[Str]]);

=head2 current_txn (calculated)

Logical replication sends messages out for transactions in commit order.
Assuming the transaction numbers have been requested, this will produce the
transaction number of the most recent BEGIN statement.  Note that this
information is only available when certain options are passed so it may return
C<undef>.
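
A minimal sketch of this bookkeeping, using only core Perl, might look like
the following.  C<track_txn> is a hypothetical helper written for
illustration and does not use this module's accessor.

```perl
use strict;
use warnings;

# Hypothetical helper: tag each line with the transaction number of the most
# recent BEGIN line.  A BEGIN without a number resets the value to undef.
sub track_txn {
    my (@lines) = @_;
    my $current_txn;
    my @tagged;
    for my $line (@lines) {
        if ($line =~ /\ABEGIN(?:\s+(\d+))?\z/) {
            $current_txn = $1;    # undef when no number was sent
        }
        push @tagged, { line => $line, txn => $current_txn };
    }
    return @tagged;
}

my @tagged = track_txn(
    'BEGIN 529',
    "table public.data: INSERT: id[integer]:3 data[text]:'5'",
    'COMMIT 529',
);
print "$tagged[1]{txn}\n";    # prints 529
```

Because DML lines carry no transaction identifier of their own, the value
seen at a DML line is only meaningful if the preceding BEGIN was processed.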

=cut

has current_txn => (is => 'rw', writer => '_set_current_txn');

=head1 GRAMMAR

Test_decoding lines come in two basic formats:  transaction control lines and
DML lines.  The former have a syntax like C<BEGIN 123> (or C<COMMIT 123>).

The DML records have a longer and more complex format.  They begin with
the word "table" followed by a fully qualified tablename, then an operation,
then a column list in name[type]:value format.  Identifiers can be SQL escaped.
So can literals.

Since transactions are handled sequentially in commit order, the DML records do
not carry transaction identifiers in them.
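
To make the name[type]:value format and the SQL escaping concrete, here is a
simplified regex-based sketch.  It is not the grammar used by this module.
C<parse_columns> and C<unquote> are hypothetical helpers, and the sketch
assumes that quotes inside quoted identifiers and literals are escaped by
doubling them.

```perl
use strict;
use warnings;

# Strip the surrounding quote characters and undouble embedded ones.
# Text that is not wrapped in the given quote character is returned as is.
sub unquote {
    my ($text, $q) = @_;
    return $text unless $text =~ /\A\Q$q\E(.*)\Q$q\E\z/s;
    (my $inner = $1) =~ s/\Q$q$q\E/$q/g;
    return $inner;
}

# Hypothetical helper: turn a column list into a hashref of name => value.
sub parse_columns {
    my ($cols) = @_;
    my $ident = qr/"(?:[^"]|"")+"|[A-Za-z0-9_]+/;
    my %row;
    # Each column is name[type]:value, with type possibly ending in [].
    while ($cols =~ /\G\s*($ident)\[([^\[\]]+(?:\[\])?)\]:('(?:[^']|'')*'|\S+)/gc) {
        my ($name, $value) = ($1, $3);
        $row{ unquote($name, '"') } = unquote($value, q{'});
    }
    return \%row;
}

my $row = parse_columns(
    q{id[integer]:3 data[text]:'it''s' "my col"[character varying]:'x y'}
);
print "$_ => $row->{$_}\n" for sort keys %$row;
```

The column type is matched but discarded in this sketch, and every value is
kept as a plain string.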

=cut


my $grammar = <<'_ENDGRAMMAR' ;
           { my $retval = {}; 1; }
    record : dmlrec | txnrec
           { $retval; }
    txnrec : txncmd txnid(?)
           { $retval->{"txn_cmd"} = $item[1]; $retval; } 
           { $retval->{"type"} = "txn"; $retval; }
    dmlrec : header operation ":" col(s)
           { $retval->{"type"} = "dml"; $retval; }
           { $retval->{"operation"} = $item{operation}; $retval }
    header : "table " schema "." tablename ": "
    col : column(s)
    schema : sqlident
           { $retval->{"schema"} = $item[1]; $retval; }
    tablename : sqlident
           { $retval->{"tablename"} = $item[1]; $retval; }
    column : /\s?/ colname "[" coltype "]" ":" value
           { $retval->{row_data}->{$item{"colname"}} = $item{"value"} }
    colname : sqlident
    coltype : schemaq(?) sqlident array(?)
    schemaq : sqlident '.'
    array   : '[]'
    value   : literal
    sqlident : /[a-zA-Z0-9()_ ]+/ | /"([^"]|"")+"/
    literal : /\w+/ | /'([^']|'')+'/
    txnid : /\d+/
           { $retval->{txnid} = $item[1]; $retval; }
    txncmd : "BEGIN" | "COMMIT"
    operation : "INSERT" | "UPDATE" | "DELETE"
_ENDGRAMMAR


