Alt-CWB-ambs
view release on metacpan or search on metacpan
lib/CWB/CQP.pm view on Meta::CPAN
package CWB::CQP;
# -*-cperl-*-
=head1 NAME
CWB::CQP - Interact with a CQP process running in the background
=head1 SYNOPSIS
B<TODO: Update synopsis!>
use CWB::CQP;
# start CQP server process in the background
$cqp = new CWB::CQP;
$cqp = new CWB::CQP("-r /corpora/registry", "-I /global/init.cqp");
# check for specified or newer CQP version
$ok = $cqp->check_version($major, $minor, $beta);
# execute CQP command (blocking mode) and check for error
@lines = $cqp->exec($my_cmd);
unless ($cqp->ok) {
@cqp_error_message = $cqp->error_message;
my_error_handler();
}
# it's easier to use an automatic error handler
$cqp->set_error_handler(\&my_error_handler); # user-defined
$cqp->set_error_handler('die'); # built-in, useful for one-off scripts
# read TAB-delimited table from count, group, tabulate, ...
@table = $cqp->exec_rows($my_cmd);
# run CQP command in background (non-blocking mode)
$cqp->run($my_cmd);
if ($cqp->ready) { # specify optional timeout in seconds
my $line = $cqp->getline;
my @fields = $cqp->getrow; # TAB-delimited output
}
@lines = $cqp->getlines(10); # reads 10 lines, blocking if necessary
# execute in query lock mode (to improve security of CGI scripts)
$cqp->begin_query;
# execute untrusted CQP queries
$cqp->end_query;
@lines = $cqp->exec_query($untrusted_query); # convenience wrapper
# dump/undump a named query into/from a table of corpus positions
@matches = $cqp->dump("Last" [, $from, $to]);
$cqp->undump("Copy", @matches); # produces copy of "Last"
# activate CQP progress messages during query execution
$cqp->progress_on;
$status = $cqp->progress; # after starting CQP command with run()
($total, $pass, $n_passes, $msg, $percent) = $cqp->progress_info;
$cqp->progress_off;
$cqp->set_progress_handler(\&my_progress_handler); # user-defined handler
# close down CQP server (exits gracefully)
undef $cqp;
=head1 DESCRIPTION
A B<CWB::CQP> object represents an instance of the corpus query processor CQP
running as a background process. By calling suitable methods on this object,
arbitrary CQP commands can be executed and their output can be captured.
The C<STDERR> stream of the CQP process is monitored for error messages,
which can automatically trigger an error handler.
Every B<CWB::CQP> object has its own CQP background process and communication is
fully asynchronous. This enables scripts to perform other actions while a long
CQP command is executing, or to run multiple CQP instances in parallel.
=cut
use warnings;
use strict;
use sigtrap qw(die PIPE); # catch write errors to background CQP process
## $SIG{'CHLD'} = 'IGNORE'; # it would be nice to reap child processes automatically, but this seems to mess up closing pipes
use CWB;
use Carp;
use FileHandle;
use IPC::Open3;
use IO::Select;
## package global variables
our @CQP_options = "-c"; # always run CQP in child mode
our $CQP_version = "2.2.101"; # required version of CQP (checked at startup)
=head1 METHODS
The following methods are available:
=over 4
=item I<$cqp> = B<new> CWB::CQP;
=item I<$cqp> = B<new> CWB::CQP '-r /corpora/registry', '-l /data/cqpresults';
Spawn new CQP background process. The object I<$cqp> can then be used to communicate with
this CQP instance. Optional arguments of the B<new> method are passed as command-line
options to CQP. Use at your own risk.
=cut
## CWB::CQP object constructor
sub new {
my $class = shift; # class name
my $self = {}; # namespace for new CQP class object
my @options = @_; # CQP command-line options (use at your own risk)
# split options with values, e.g. "-r /my/registry" => "-r", "/my/registry" (doesn't work for multiple options in one string)
@options = map { (/^(--?[A-Za-z0-9]+)\s+(.+)$/) ? ($1, $2) : $_ } @options;
## run CQP server in the background
my $in = $self->{'in'} = new FileHandle; # stdin of CQP
my $out = $self->{'out'} = new FileHandle; # stdout of CQP
my $err = $self->{'err'} = new FileHandle; # stderr of CQP
my $pid = open3($in, $out, $err, $CWB::CQP, @CQP_options, @options);
$self->{'pid'} = $pid; # child process ID (so process can be killed if necessary)
$in->autoflush(1); # make sure that commands sent to CQP are always flushed immediately
my ($need_major, $need_minor, $need_beta) = split /\./, $CQP_version; # required CQP version
$need_beta = 0 unless $need_beta;
my $version_string = $out->getline; # child mode (-c) should print version on startup
chomp $version_string;
croak "ERROR: CQP backend startup failed ('$CWB::CQP @CQP_options @options')\n"
unless $version_string =~ /^CQP\s+(?:\w+\s+)*([0-9]+)\.([0-9]+)(?:\.b?([0-9]+))?(?:\s+(.*))?$/;
$self->{'major_version'} = $1;
$self->{'minor_version'} = $2;
$self->{'beta_version'} = $3 || 0;
$self->{'compile_date'} = $4 || "unknown";
croak "ERROR: CQP version too old, need at least v$CQP_version ($version_string)\n"
unless ($1 > $need_major or
$1 == $need_major
and ($2 > $need_minor or
($2 == $need_minor and $3 >= $need_beta)));
## command execution
$self->{'command'} = undef; # CQP command string that is currently being processed (undef = last command has been completed)
$self->{'lines'} = []; # array of output lines read from CQP process
$self->{'buffer'} = ""; # read buffer for standard output from CQP process
$self->{'block_size'} = 256; # block size for reading from CQP's output and error streams
$self->{'query_lock'} = undef;# holds random key while query lock mode is active
## error handling (messages on stderr)
$self->{'error_handler'} = undef; # set to subref for user-defined error handler
$self->{'status'} = 'ok'; # status of last executed command ('ok' or 'error')
$self->{'error_message'} = []; # arrayref to array containing message produced by last command (if any)
## handling of CQP progress messages
$self->{'progress'} = 0; # whether progress messages are activated
$self->{'progress_handler'} = undef; # optional callback for progress messages
$self->{'progress_info'} = []; # contains last available progress information: [$total_percent, $pass, $n_passes, $message, $percent]
## debugging (prints more or less everything on stdout)
$self->{'debug'} = 0;
## select vectors for CQP output (stdout, stderr, stdout|stderr)
$self->{'select_err'} = new IO::Select($err);
$self->{'select_out'} = new IO::Select($out);
$self->{'select_any'} = new IO::Select($err, $out);
## CQP object setup complete
bless($self, $class);
## the following command will collect and ignore any output which may have been produced during startup
$self->exec("set PrettyPrint off"); # pretty-printing should be turned off for non-interactive use
return $self;
}
=item B<undef> I<$cqp>;
Exit CQP background process gracefully by issuing an C<exit;> command.
This is done automatically when the variable I<$cqp> goes out of scope.
Note that there may be a slight delay while B<CWB::CQP> waits for the CQP
process to terminate.
=cut
sub DESTROY {
my $self = shift;
if ($self->{'command'}) {
while ($self->_update) {} # read pending output from active command
}
my $out = $self->{'out'};
if (defined $out) {
$out->print("exit"); # exit CQP backend
$out->close;
}
my $in = $self->{'in'};
if (defined $in) {
$in->close;
}
my $pid = $self->{'pid'};
waitpid $pid, 0; # wait for CQP to exit and reap background process
## **TODO** -- this may hang in some cases; is there a safe workaround?
}
=item I<$ok> = I<$cqp>->B<check_version>(I<$major>, I<$minor>, I<$beta>);
Check for minimum required CQP version, i.e. the background process has
to be CQP version I<$major>.I<$minor>.I<$beta> or newer.
I<$minor> and I<$beta> may be omitted, in which case they default to 0.
Note that the B<CWB::CQP> module automatically checks whether the CQP version
is compatible with its own requirements when a new object is created.
The B<check_version> method can subsequently be used to check for a more
recent release that provides functionality needed by the Perl script.
=cut
sub check_version {
my $self = shift;
my ($major, $minor, $beta) = @_;
$minor = 0 unless defined $minor;
$beta = 0 unless defined $beta;
my $maj = $self->{'major_version'};
my $min = $self->{'minor_version'};
my $bet = $self->{'beta_version'};
if ($maj > $major or
($maj == $major
and ($min > $minor or
($min == $minor and $bet >= $beta)))
) {
return 1;
}
else {
return 0;
}
}
=item I<$version_string> = I<$cqp>->B<version>;
Returns formatted version string for the CQP background process, e.g. C<2.2.99> or C<3.0>.
=cut
sub version {
my $self = shift;
my $version = $self->{'major_version'}.".".$self->{'minor_version'};
my $beta = $self->{'beta_version'};
$version .= ".$beta"
if $beta > 0;
return $version;
}
## INTERNAL:
## $lines_read = $self->_update([$timeout]);
## This is the main "workhorse" of the CWB::CQP module. It checks for output from CQP process
## (stdout and stderr), updates progress status, fills internal buffers, and calls error and
## progress handlers if necessary. The optional $timeout specifies how many seconds to wait for
## output; the default is 0 seconds, i.e. non-blocking mode, while a negative value blocks.
## NB: $lines_read includes the .EOL. terminator line, so it is safe to keep calling _update()
## until a non-zero value is returned (even if a CQP command fails with an error message).
sub _update {
my $self = shift;
my $timeout = shift || 0;
$timeout = undef
if $timeout < 0;
my $stderr_buffer = "";
my $lines = 0; # how many lines have successfully been read from stdout
while ($self->{'select_any'}->can_read($timeout)) {
## STDERR -- read all available output on stderr first
if ($self->{'select_err'}->can_read(0)) {
sysread $self->{'err'}, $stderr_buffer, $self->{'block_size'}, length($stderr_buffer); # append to $stderr_buffer
}
## STDOUT -- if there is no more data on stderr, we should be able to read from stdout
elsif ($self->{'select_out'}->can_read(0)) {
sysread $self->{'out'}, $self->{'buffer'}, $self->{'block_size'}, length($self->{'buffer'}); # append to object's input buffer
if ($self->{'buffer'} =~ /\n/) {
## if there's a complete line in the input buffer, split off all lines
my @new_lines = split /\n/, $self->{'buffer'}, -1; # make sure that last line is handled correctly if buffer ends in \n
$self->{'buffer'} = pop @new_lines; # last entry is incomplete line ("" if buffer ended in \n) => return to input buffer
foreach my $line (@new_lines) {
## skip blank line printed after each CQP command
next if $line eq "";
## handle progress messages if ProgressBar has been activated
if ($self->{'progress'} and $line =~ /^-::-PROGRESS-::-/) {
my ($pass, $n_passes, $message);
(undef, $pass, $n_passes, $message) = split /\t/, $line;
my $percent = ($message =~ /([0-9]+)\%\s*complete/) ? $1+0 : undef; # extract progress percentage, if present
my $total_percent = (100 * ($pass - 1) + ($percent || 0)) / $n_passes; # estimate total progress ($percent assumed to be 0% if not given)
$self->{'progress_info'} = [$total_percent, $pass, $n_passes, $message, $percent];
my $handler = $self->{'progress_handler'};
if (ref($handler) eq 'CODE') {
$handler->($total_percent, $pass, $n_passes, $message, $percent); # call user-defined progress handler
}
}
## regular output lines are collected in object's line buffer
else {
push @{$self->{'lines'}}, $line;
$lines++;
}
}
}
last if $lines > 0; # if we have read a line and there is no output on stderr, return from function
}
## ERROR -- we should never reach this point
else {
die "CWB::CQP: INTERNAL ERROR in _update() -- no data on stdout or stderr of CQP child process";
}
}
if ($stderr_buffer ne "") {
$self->{'status'} = 'error'; # any output on stderr indicates that something went wrong
push @{$self->{'error_message'}}, split /\n/, $stderr_buffer; # append output on stderr to error message
$self->error(@{$self->{'error_message'}}); # may call error handler and abort, or print message and continue
}
return $lines;
}
=item I<$cqp>->B<run>(I<$cmd>);
Start a single CQP command I<$cmd> in the background. This method returns immediately.
Command output can then be read with the B<getline>, B<getlines> and B<getrow> methods.
If asynchronous communication is desired, use B<ready> to check whether output is available.
It is an error to B<run> a new command before the output of the previous command has completely
been processed.
=cut
sub run {
croak 'USAGE: $cqp->run($cmd]);'
unless @_ == 2;
my $self = shift;
my $cmd = shift;
my $debug = $self->{'debug'};
$cmd =~ s/\n+/ /g; # make sure there are no newline characters (to be on the safe side)
$cmd =~ s/(;\s*)+$//; # ";" will be added when $cmd is sent to CQP
my $active_cmd = $self->{'command'};
croak "Error: new CQP command issued while '$active_cmd' is still being processed"
if $active_cmd;
$self->{'command'} = "$cmd;";
$self->{'status'} = 'ok';
$self->{'buffer'} = "";
$self->{'lines'} = [];
$self->{'error_message'} = [];
print "CQP << $cmd;\n"
if $debug;
$self->{'in'}->print("$cmd;\n .EOL.;\n"); # append .EOL. command to mark end of CQP output
}
=item I<$num_of_lines> = I<$cqp>->B<ready>;
=item I<$num_of_lines> = I<$cqp>->B<ready>(I<$timeout>);
Check if output from current CQP command is available for reading with B<getline> etc.,
returning the number of lines currently held in the input buffer (possibly including an
end-of-output marker line that will not be returned by B<getline> etc.). If there is no
active command, returns B<undef>.
The first form of the command returns immediately. The second form waits up to I<$timeout>
seconds for CQP output to become available. Use a negative I<$timeout> for blocking mode.
=cut
sub ready {
my $self = shift;
my $timeout = shift;
my $lines = @{$self->{'lines'}};
return $lines # output has already been buffered => ready to read
if $lines > 0;
return undef # no command active => undefined state
unless $self->{'command'};
return $self->_update($timeout); # try to read from CQP process & return number of lines available (NB: line buffer was empty before)
}
## INTERNAL: reset internal status after command has been completed, check that there is no extra output
( run in 1.622 second using v1.01-cache-2.11-cpan-d8267643d1d )