PGObject-Util-LogRep-TestDecoding
lib/PGObject/Util/LogRep/TestDecoding.pm
my $msg = parse_msg($repmsg); # procedural interface
# tells you the operation, transaction status etc.
# or the OO interface which gives more functionality
my $decoder = PGObject::Util::LogRep::TestDecoding->new(
schema=> ['myschema'], txn_status => 0
);
handle_message($decoder->parse($repmsg)) if $decoder->matches($repmsg);
=head1 DESCRIPTION
This module provides parsing capabilities for the test_decoding plugin for
PostgreSQL's logical replication. The test_decoding plugin does not recognize
or handle publications and simply replicates everything.
Messages follow formats like:
table public.data: INSERT: id[integer]:3 data[text]:'5'
or for begin or commit messages:
BEGIN 529
COMMIT 529
Transactions are always processed sequentially in the test decoding stack.
This module can be combined with C<AnyEvent::PGRecvLogical> to create programs
which process PostgreSQL's logical replication streams.
Note that PostgreSQL's logical replication sends out transactions in commit
order and this module assumes that it will process most messages if transaction
information is important (which it might not be for some applications).
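
As a rough illustration of the two message shapes described above, the
following sketch classifies a line with plain regular expressions. It does not
use this module; C<classify_line> is a hypothetical helper, and the patterns
assume unquoted schema and table names.

```perl
use strict;
use warnings;

# Hypothetical helper, not part of this module: classify one test_decoding line.
sub classify_line {
    my ($line) = @_;

    # Transaction control lines: BEGIN or COMMIT, optionally followed by an id.
    if ($line =~ /\A(BEGIN|COMMIT)(?:\s+(\d+))?\s*\z/) {
        return { type => 'txn', txn_cmd => $1, txnid => $2 };
    }

    # DML lines: "table schema.name: OPERATION: columns..."
    # Assumption: identifiers are unquoted and contain no '.' or ':'.
    if ($line =~ /\Atable\s+([^.]+)\.([^:]+):\s+(INSERT|UPDATE|DELETE):/) {
        return { type => 'dml', schema => $1, tablename => $2, operation => $3 };
    }

    return;    # unrecognised line
}

my $rec = classify_line("table public.data: INSERT: id[integer]:3 data[text]:'5'");
print "$rec->{type} $rec->{schema}.$rec->{tablename} $rec->{operation}\n";
```

The module itself uses a full grammar (see L</GRAMMAR>), which also covers
quoted identifiers that this sketch ignores.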
=head1 EXPORT
=over
=item parse_msg # single message / non-OO parser
=back
=cut
BEGIN { our @EXPORT_OK = ('parse_msg'); }
=head1 ATTRIBUTES/ACCESSORS
These are for the OO interface only. They are read-only after the object is
created but can be set in the constructor. If you need to change them, create
a new object instead.
=head2 schema Maybe[ArrayRef[Str]]
Undef or an arrayref of scalars. If it is set, then C<matches> returns true if
the message refers to any table in any of the schemas listed.
=cut
has schema => (is => 'ro', isa => Maybe[ArrayRef[Str]]);
=head2 txn_status Bool
Whether to report transaction status.
=cut
has txn_status => (is => 'ro', isa => Bool);
=head2 tables Maybe[ArrayRef[Str]]
A list of fully qualified tablenames to match against. Note that this filter
operates along with the schema filter: if either filter matches, the message
matches.
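
The either-or behaviour of the two filters could be sketched as follows.
C<table_matches> is a hypothetical stand-in written for this example, not the
module's own C<matches> method, and the rule that an unfiltered decoder matches
everything is an assumption.

```perl
use strict;
use warnings;

# Hypothetical stand-in for the filter logic described above; the module's
# own matches() method may differ in detail.
sub table_matches {
    my ($schema, $table, %filter) = @_;
    my @schemas = @{ $filter{schema} // [] };
    my @tables  = @{ $filter{tables} // [] };

    # Assumption: with no filters configured, every table matches.
    return 1 unless @schemas || @tables;

    # Either filter is sufficient on its own.
    return 1 if grep { $_ eq $schema } @schemas;
    return 1 if grep { $_ eq "$schema.$table" } @tables;
    return 0;
}

my %filter = (schema => ['myschema'], tables => ['public.data']);
for my $pair (['myschema', 'foo'], ['public', 'data'], ['public', 'other']) {
    my ($schema, $table) = @$pair;
    printf "%s.%s: %s\n", $schema, $table,
        table_matches($schema, $table, %filter) ? 'match' : 'no match';
}
```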
=cut
has tables => (is => 'ro', isa => Maybe[ArrayRef[Str]]);
=head2 current_txn (calculated)
Logical replication sends messages out for transactions in commit order.
Assuming the transaction numbers have been requested, this will produce the
transaction number of the most recent BEGIN statement. Note that this
information is only available when certain options are passed so it may return
C<undef>.
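
To illustrate the idea, the sketch below tracks the id of the most recent
C<BEGIN> and attaches it to the DML lines that follow. It is a standalone
example rather than the module's implementation; the C<@stream> contents are
sample data.

```perl
use strict;
use warnings;

# Sample data in the formats shown in the DESCRIPTION.
my @stream = (
    'BEGIN 529',
    "table public.data: INSERT: id[integer]:3 data[text]:'5'",
    'COMMIT 529',
);

my $current_txn;    # undef until a BEGIN carrying an id is seen
my @tagged;
for my $line (@stream) {
    if ($line =~ /\ABEGIN(?:\s+(\d+))?\z/) {
        $current_txn = $1;    # stays undef if the id was not sent
        next;
    }
    next if $line =~ /\ACOMMIT\b/;
    push @tagged, [ $current_txn, $line ];
}

printf "%s => %s\n", $_->[0] // 'unknown', $_->[1] for @tagged;
```

Because transactions arrive one after another in commit order, a single
scalar is enough to hold the state.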
=cut
has current_txn => (is => 'rw', writer => '_set_current_txn');
=head1 GRAMMAR
Test_decoding lines come in two basic formats: transaction control lines and
DML lines. The former have a syntax like C<BEGIN 123> (or C<COMMIT 123>).
The DML records are longer and more complex. Their format begins with the
word "table" followed by a fully qualified tablename, then an operation, then
a column list in name[type]:value format. Identifiers can be SQL escaped, and
so can literals.
Since transactions are handled sequentially in commit order, the DML records do
not carry transaction identifiers in them.
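
The name[type]:value column format can be illustrated with a small
regex-based sketch. C<parse_columns> is a hypothetical helper, independent of
the grammar below. It assumes unquoted column names, and it unescapes doubled
single quotes in literals, which may differ from what the module returns.

```perl
use strict;
use warnings;

# Hypothetical helper: split a column list into name => { type, value }.
sub parse_columns {
    my ($cols) = @_;
    my %row;

    # name "[" type "]" ":" value, where the type may end in "[]" for arrays
    # and the value is either a quoted literal or a run of non-space characters.
    while ($cols =~ /\G\s*([^\[\s]+)\[((?:[^\[\]]|\[\])+)\]:('(?:[^']|'')*'|\S+)/gc) {
        my ($name, $type, $value) = ($1, $2, $3);

        # Strip the surrounding quotes and undo SQL quote doubling.
        if ($value =~ /\A'(.*)'\z/s) {
            ($value = $1) =~ s/''/'/g;
        }
        $row{$name} = { type => $type, value => $value };
    }
    return \%row;
}

my $row = parse_columns("id[integer]:3 data[text]:'it''s'");
print "$_ ($row->{$_}{type}): $row->{$_}{value}\n" for sort keys %$row;
```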
=cut
my $grammar = <<'_ENDGRAMMAR' ;
{ my $retval = {}; 1; }
record : dmlrec | txnrec
{ $retval; }
txnrec : txncmd txnid(?)
{ $retval->{"txn_cmd"} = $item[1]; $retval; }
{ $retval->{"type"} = "txn"; $retval; }
dmlrec : header operation ":" col(s)
{ $retval->{"type"} = "dml"; $retval; }
{ $retval->{"operation"} = $item{operation}; $retval }
header : "table " schema "." tablename ": "
col : column(s)
schema : sqlident
{ $retval->{"schema"} = $item[1]; $retval; }
tablename : sqlident
{ $retval->{"tablename"} = $item[1]; $retval; }
column : /\s?/ colname "[" coltype "]" ":" value
{ $retval->{row_data}->{$item{"colname"}} = $item{"value"} }
colname : sqlident
coltype : schemaq(?) sqlident array(?)
schemaq : sqlident '.'
array : '[]'
value : literal
sqlident : /[a-zA-Z0-9()_ ]+/ | /"([^"]|"")+"/
literal : /\w+/ | /'([^']|'')+'/
txnid : /\d+/
{ $retval->{txnid} = $item[1]; $retval; }
txncmd : "BEGIN" | "COMMIT"
operation : "INSERT" | "UPDATE" | "DELETE"
_ENDGRAMMAR