Alt-CWB-ambs

 view release on metacpan or  search on metacpan

lib/CWB/CQP.pm  view on Meta::CPAN

package CWB::CQP;
# -*-cperl-*-

=head1 NAME

CWB::CQP - Interact with a CQP process running in the background

=head1 SYNOPSIS

B<TODO: Update synopsis!>

  use CWB::CQP;

  # start CQP server process in the background
  $cqp = new CWB::CQP;
  $cqp = new CWB::CQP("-r /corpora/registry", "-I /global/init.cqp");

  # check for specified or newer CQP version
  $ok = $cqp->check_version($major, $minor, $beta);

  # execute CQP command (blocking mode) and check for error
  @lines = $cqp->exec($my_cmd);
  unless ($cqp->ok) {
    @cqp_error_message = $cqp->error_message;
    my_error_handler();
  }

  # it's easier to use an automatic error handler
  $cqp->set_error_handler(\&my_error_handler); # user-defined
  $cqp->set_error_handler('die'); # built-in, useful for one-off scripts

  # read TAB-delimited table from count, group, tabulate, ...
  @table = $cqp->exec_rows($my_cmd);

  # run CQP command in background (non-blocking mode)
  $cqp->run($my_cmd);
  if ($cqp->ready) {  # specify optional timeout in seconds
    my $line = $cqp->getline;
    my @fields = $cqp->getrow; # TAB-delimited output
  }
  @lines = $cqp->getlines(10); # reads 10 lines, blocking if necessary

  # execute in query lock mode (to improve security of CGI scripts)
  $cqp->begin_query;
    # execute untrusted CQP queries
  $cqp->end_query;
  
  @lines = $cqp->exec_query($untrusted_query); # convenience wrapper
  
  # dump/undump a named query into/from a table of corpus positions
  @matches = $cqp->dump("Last" [, $from, $to]);
  $cqp->undump("Copy", @matches);  # produces copy of "Last"

  # activate CQP progress messages during query execution
  $cqp->progress_on;
  $status = $cqp->progress; # after starting CQP command with run()
  ($total, $pass, $n_passes, $msg, $percent) = $cqp->progress_info;
  $cqp->progress_off;

  $cqp->set_progress_handler(\&my_progress_handler); # user-defined handler

  # close down CQP server (exits gracefully)
  undef $cqp;

=head1 DESCRIPTION

A B<CWB::CQP> object represents an instance of the corpus query processor CQP
running as a background process.  By calling suitable methods on this object,
arbitrary CQP commands can be executed and their output can be captured.   
The C<STDERR> stream of the CQP process is monitored for error messages,
which can automatically trigger an error handler.

Every B<CWB::CQP> object has its own CQP background process and communication is
fully asynchronous.  This enables scripts to perform other actions while a long
CQP command is executing, or to run multiple CQP instances in parallel.

=cut

use warnings;
use strict;

use sigtrap qw(die PIPE);       # catch write errors to background CQP process
## $SIG{'CHLD'} = 'IGNORE'; # it would be nice to reap child processes automatically, but this seems to mess up closing pipes

use CWB;
use Carp;
use FileHandle;
use IPC::Open3;
use IO::Select;

## package global variables
our @CQP_options = "-c";        # always run CQP in child mode
our $CQP_version = "2.2.101";    # required version of CQP (checked at startup)

=head1 METHODS

The following methods are available:

=over 4

=item I<$cqp> = B<new> CWB::CQP;

=item I<$cqp> = B<new> CWB::CQP '-r /corpora/registry', '-l /data/cqpresults';

Spawn new CQP background process.  The object I<$cqp> can then be used to communicate with 
this CQP instance.  Optional arguments of the B<new> method are passed as command-line
options to CQP.  Use at your own risk.

=cut

## CWB::CQP object constructor
sub new {
  my $class = shift;            # class name
  my $self = {};                # namespace for new CQP class object
  my @options = @_;             # CQP command-line options (use at your own risk)
  # split options with values, e.g. "-r /my/registry" => "-r", "/my/registry" (doesn't work for multiple options in one string)
  @options = map { (/^(--?[A-Za-z0-9]+)\s+(.+)$/) ? ($1, $2) : $_ } @options;

  ## run CQP server in the background
  my $in = $self->{'in'} = new FileHandle;   # stdin of CQP
  my $out = $self->{'out'} = new FileHandle; # stdout of CQP
  my $err = $self->{'err'} = new FileHandle; # stderr of CQP
  my $pid = open3($in, $out, $err, $CWB::CQP, @CQP_options, @options);
  $self->{'pid'} = $pid; # child process ID (so process can be killed if necessary)
  $in->autoflush(1); # make sure that commands sent to CQP are always flushed immediately

  my ($need_major, $need_minor, $need_beta) = split /\./, $CQP_version; # required CQP version
  $need_beta = 0 unless $need_beta;

  my $version_string = $out->getline; # child mode (-c) should print version on startup
  chomp $version_string;
  croak "ERROR: CQP backend startup failed ('$CWB::CQP @CQP_options @options')\n"
    unless $version_string =~ /^CQP\s+(?:\w+\s+)*([0-9]+)\.([0-9]+)(?:\.b?([0-9]+))?(?:\s+(.*))?$/;
  $self->{'major_version'} = $1;
  $self->{'minor_version'} = $2;
  $self->{'beta_version'} = $3 || 0;
  $self->{'compile_date'} = $4 || "unknown";
  croak "ERROR: CQP version too old, need at least v$CQP_version ($version_string)\n"
    unless ($1 > $need_major or
            $1 == $need_major
            and ($2 > $need_minor or
                 ($2 == $need_minor and $3 >= $need_beta)));

  ## command execution
  $self->{'command'} = undef; # CQP command string that is currently being processed (undef = last command has been completed)
  $self->{'lines'} = [];      # array of output lines read from CQP process
  $self->{'buffer'} = "";     # read buffer for standard output from CQP process
  $self->{'block_size'} = 256;  # block size for reading from CQP's output and error streams
  $self->{'query_lock'} = undef;# holds random key while query lock mode is active

  ## error handling (messages on stderr)
  $self->{'error_handler'} = undef; # set to subref for user-defined error handler
  $self->{'status'} = 'ok';         # status of last executed command ('ok' or 'error')
  $self->{'error_message'} = [];    # arrayref to array containing message produced by last command (if any)

  ## handling of CQP progress messages
  $self->{'progress'} = 0;             # whether progress messages are activated
  $self->{'progress_handler'} = undef; # optional callback for progress messages
  $self->{'progress_info'} = [];       # contains last available progress information: [$total_percent, $pass, $n_passes, $message, $percent]

  ## debugging (prints more or less everything on stdout)
  $self->{'debug'} = 0;

  ## select vectors for CQP output (stdout, stderr, stdout|stderr)
  $self->{'select_err'} = new IO::Select($err);
  $self->{'select_out'} = new IO::Select($out);
  $self->{'select_any'} = new IO::Select($err, $out);

  ## CQP object setup complete
  bless($self, $class);

  ## the following command will collect and ignore any output which may have been produced during startup
  $self->exec("set PrettyPrint off"); # pretty-printing should be turned off for non-interactive use

  return $self;
}

=item B<undef> I<$cqp>;

Exit CQP background process gracefully by issuing an C<exit;> command.
This is done automatically when the variable I<$cqp> goes out of scope.
Note that there may be a slight delay while B<CWB::CQP> waits for the CQP
process to terminate.

=cut

sub DESTROY {
  my $self = shift;

  if ($self->{'command'}) {
    while ($self->_update) {} # read pending output from active command
  }
  my $out = $self->{'out'};
  if (defined $out) {
    $out->print("exit");        # exit CQP backend
    $out->close;
  }
  my $in = $self->{'in'};
  if (defined $in) {
    $in->close;
  }
  my $pid = $self->{'pid'};
  waitpid $pid, 0; # wait for CQP to exit and reap background process
  ## **TODO** -- this may hang in some cases; is there a safe workaround?
}

=item I<$ok> = I<$cqp>->B<check_version>(I<$major>, I<$minor>, I<$beta>);

Check for minimum required CQP version, i.e. the background process has
to be CQP version I<$major>.I<$minor>.I<$beta> or newer.
I<$minor> and I<$beta> may be omitted, in which case they default to 0.
Note that the B<CWB::CQP> module automatically checks whether the CQP version
is compatible with its own requirements when a new object is created.
The B<check_version> method can subsequently be used to check for a more
recent release that provides functionality needed by the Perl script.

=cut

sub check_version {
  my $self = shift;
  my ($major, $minor, $beta) = @_;
  $minor = 0 unless defined $minor;
  $beta = 0 unless defined $beta;

  my $maj = $self->{'major_version'};
  my $min = $self->{'minor_version'};
  my $bet = $self->{'beta_version'};
  if ($maj > $major or
      ($maj == $major
       and ($min > $minor or
            ($min == $minor and $bet >= $beta)))
     ) {
    return 1;
  }
  else {
    return 0;
  }
}

=item I<$version_string> = I<$cqp>->B<version>;

Returns formatted version string for the CQP background process, e.g. C<2.2.99> or C<3.0>.

=cut

sub version {
  my $self = shift;
  my $version = $self->{'major_version'}.".".$self->{'minor_version'};
  my $beta = $self->{'beta_version'};
  $version .= ".$beta"
    if $beta > 0;
  return $version;
}

## INTERNAL:
##    $lines_read = $self->_update([$timeout]);
## This is the main "workhorse" of the CWB::CQP module.  It checks for output from CQP process
## (stdout and stderr), updates progress status, fills internal buffers, and calls error and
## progress handlers if necessary.  The optional $timeout specifies how many seconds to wait for
## output; the default is 0 seconds, i.e. non-blocking mode, while a negative value blocks.
## NB: $lines_read includes the .EOL. terminator line, so it is safe to keep calling _update()
## until a non-zero value is returned (even if a CQP command fails with an error message).
sub _update {
  my $self = shift;
  my $timeout = shift || 0;
  $timeout = undef
    if $timeout < 0;
  my $stderr_buffer = "";
  my $lines = 0; # how many lines have successfully been read from stdout

  while ($self->{'select_any'}->can_read($timeout)) {
    ## STDERR -- read all available output on stderr first
    if ($self->{'select_err'}->can_read(0)) {
      sysread $self->{'err'}, $stderr_buffer, $self->{'block_size'}, length($stderr_buffer); # append to $stderr_buffer
    }

    ## STDOUT -- if there is no more data on stderr, we should be able to read from stdout
    elsif ($self->{'select_out'}->can_read(0)) {
      sysread $self->{'out'}, $self->{'buffer'}, $self->{'block_size'}, length($self->{'buffer'}); # append to object's input buffer
      if ($self->{'buffer'} =~ /\n/) {
        ## if there's a complete line in the input buffer, split off all lines
        my @new_lines = split /\n/, $self->{'buffer'}, -1; # make sure that last line is handled correctly if buffer ends in \n
        $self->{'buffer'} = pop @new_lines; # last entry is incomplete line ("" if buffer ended in \n) => return to input buffer
        foreach my $line (@new_lines) {
          ## skip blank line printed after each CQP command
          next if $line eq "";
          ## handle progress messages if ProgressBar has been activated
          if ($self->{'progress'} and $line =~ /^-::-PROGRESS-::-/) {
              my ($pass, $n_passes, $message); 
              (undef, $pass, $n_passes, $message) = split /\t/, $line;
              my $percent = ($message =~ /([0-9]+)\%\s*complete/) ? $1+0 : undef; # extract progress percentage, if present
              my $total_percent = (100 * ($pass - 1) + ($percent || 0)) / $n_passes; # estimate total progress ($percent assumed to be 0% if not given)
              $self->{'progress_info'} = [$total_percent, $pass, $n_passes, $message, $percent];
              my $handler = $self->{'progress_handler'};
              if (ref($handler) eq 'CODE') {
                $handler->($total_percent, $pass, $n_passes, $message, $percent); # call user-defined progress handler
              }
          }
          ## regular output lines are collected in object's line buffer
          else {
            push @{$self->{'lines'}}, $line;
            $lines++;
          }
        }
      }
      last if $lines > 0;       # if we have read a line and there is no output on stderr, return from function
    }

    ## ERROR -- we should never reach this point
    else {
      die "CWB::CQP: INTERNAL ERROR in _update() -- no data on stdout or stderr of CQP child process";
    }
  }

  if ($stderr_buffer ne "") {
    $self->{'status'} = 'error'; # any output on stderr indicates that something went wrong
    push @{$self->{'error_message'}}, split /\n/, $stderr_buffer;  # append output on stderr to error message
    $self->error(@{$self->{'error_message'}});                     # may call error handler and abort, or print message and continue
  }
  return $lines;
}

=item I<$cqp>->B<run>(I<$cmd>);

Start a single CQP command I<$cmd> in the background.  This method returns immediately.
Command output can then be read with the B<getline>, B<getlines> and B<getrow> methods.
If asynchronous communication is desired, use B<ready> to check whether output is available.

It is an error to B<run> a new command before the output of the previous command has completely
been processed.

=cut

sub run {
  croak 'USAGE:  $cqp->run($cmd]);'
    unless @_ == 2;
  my $self = shift;
  my $cmd = shift;
  my $debug = $self->{'debug'};

  $cmd =~ s/\n+/ /g;            # make sure there are no newline characters (to be on the safe side)
  $cmd =~ s/(;\s*)+$//;         # ";" will be added when $cmd is sent to CQP

  my $active_cmd = $self->{'command'};
  croak "Error: new CQP command issued while '$active_cmd' is still being processed"
    if $active_cmd;

  $self->{'command'} = "$cmd;";
  $self->{'status'} = 'ok';
  $self->{'buffer'} = "";
  $self->{'lines'} = [];
  $self->{'error_message'} = [];

  print "CQP << $cmd;\n"
    if $debug;
  $self->{'in'}->print("$cmd;\n .EOL.;\n"); # append .EOL. command to mark end of CQP output
}

=item I<$num_of_lines> = I<$cqp>->B<ready>;

=item I<$num_of_lines> = I<$cqp>->B<ready>(I<$timeout>);

Check if output from current CQP command is available for reading with B<getline> etc.,
returning the number of lines currently held in the input buffer (possibly including an
end-of-output marker line that will not be returned by B<getline> etc.).  If there is no
active command, returns B<undef>.

The first form of the command returns immediately.  The second form waits up to I<$timeout>
seconds for CQP output to become available.  Use a negative I<$timeout> for blocking mode.

=cut

sub ready {
  my $self = shift;
  my $timeout = shift;

  my $lines = @{$self->{'lines'}};
  return $lines            # output has already been buffered => ready to read
    if $lines > 0;
  return undef             # no command active => undefined state
    unless $self->{'command'};
  return $self->_update($timeout); # try to read from CQP process & return number of lines available (NB: line buffer was empty before)
}

## INTERNAL: reset internal status after command has been completed, check that there is no extra output



( run in 1.622 second using v1.01-cache-2.11-cpan-d8267643d1d )