Alt-CWB-ambs

lib/CWB/CQP.pm

  # close down CQP server (exits gracefully)
  undef $cqp;

=head1 DESCRIPTION

A B<CWB::CQP> object represents an instance of the corpus query processor CQP
running as a background process.  By calling suitable methods on this object,
arbitrary CQP commands can be executed and their output can be captured.   
The C<STDERR> stream of the CQP process is monitored for error messages,
which can automatically trigger an error handler.

Every B<CWB::CQP> object has its own CQP background process and communication is
fully asynchronous.  This enables scripts to perform other actions while a long
CQP command is executing, or to run multiple CQP instances in parallel.

=cut

use warnings;
use strict;

use sigtrap qw(die PIPE);       # catch write errors to background CQP process
## $SIG{'CHLD'} = 'IGNORE'; # it would be nice to reap child processes automatically, but this seems to mess up closing pipes

use CWB;
use Carp;
use FileHandle;
use IPC::Open3;
use IO::Select;

## package global variables
our @CQP_options = "-c";        # always run CQP in child mode
our $CQP_version = "2.2.101";    # required version of CQP (checked at startup)

=head1 METHODS

The following methods are available:

=over 4

=item I<$cqp> = B<new> CWB::CQP;

=item I<$cqp> = B<new> CWB::CQP '-r /corpora/registry', '-l /data/cqpresults';

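Spawn a new CQP background process.  The object I<$cqp> can then be used to communicate with
this CQP instance.  Optional arguments of the B<new> method are passed as command-line
options to CQP.  An option and its value may be given together in one string (as in the
second form above), but each string should contain only a single option.  Use at your own risk.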
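
The constructor splits an argument such as C<'-r /corpora/registry'> into an option and its
value, but only once per string.  The following stand-alone sketch applies the same pattern
to show the effect; the helper name C<split_options> is hypothetical and not part of the module.

  use strict;
  use warnings;

  # Hypothetical helper (not part of CWB::CQP): split "-opt value" strings
  # into separate arguments, using the same pattern as the constructor.
  sub split_options {
    return map { (/^(--?[A-Za-z0-9]+)\s+(.+)$/) ? ($1, $2) : $_ } @_;
  }

  my @argv = split_options('-r /corpora/registry', '-r /a -l /b', '-c');
  print join(" | ", @argv), "\n";
  # the second string is split only once, so "/a -l /b" stays a single argument

Passing each option in its own string avoids this pitfall.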

=cut

## CWB::CQP object constructor
sub new {
  my $class = shift;            # class name
  my $self = {};                # namespace for new CQP class object
  my @options = @_;             # CQP command-line options (use at your own risk)
  # split options with values, e.g. "-r /my/registry" => "-r", "/my/registry" (doesn't work for multiple options in one string)
  @options = map { (/^(--?[A-Za-z0-9]+)\s+(.+)$/) ? ($1, $2) : $_ } @options;

  ## run CQP server in the background
  my $in = $self->{'in'} = new FileHandle;   # stdin of CQP
  my $out = $self->{'out'} = new FileHandle; # stdout of CQP
  my $err = $self->{'err'} = new FileHandle; # stderr of CQP
  my $pid = open3($in, $out, $err, $CWB::CQP, @CQP_options, @options);
  $self->{'pid'} = $pid; # child process ID (so process can be killed if necessary)
  $in->autoflush(1); # make sure that commands sent to CQP are always flushed immediately

  my ($need_major, $need_minor, $need_beta) = split /\./, $CQP_version; # required CQP version
  $need_beta = 0 unless $need_beta;

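  my $version_string = $out->getline; # child mode (-c) should print version on startup
  croak "ERROR: CQP backend startup failed ('$CWB::CQP @CQP_options @options')\n"
    unless defined $version_string;   # no output at all (e.g. CQP binary could not be executed)
  chomp $version_string;
  croak "ERROR: CQP backend startup failed ('$CWB::CQP @CQP_options @options')\n"
    unless $version_string =~ /^CQP\s+(?:\w+\s+)*([0-9]+)\.([0-9]+)(?:\.b?([0-9]+))?(?:\s+(.*))?$/;
  $self->{'major_version'} = $1;
  $self->{'minor_version'} = $2;
  $self->{'beta_version'} = $3 || 0;
  $self->{'compile_date'} = $4 || "unknown";
  ## compare stored values rather than $1..$3, since $3 is undef for versions without a beta number
  croak "ERROR: CQP version too old, need at least v$CQP_version ($version_string)\n"
    unless ($self->{'major_version'} > $need_major or
            ($self->{'major_version'} == $need_major
             and ($self->{'minor_version'} > $need_minor or
                  ($self->{'minor_version'} == $need_minor
                   and $self->{'beta_version'} >= $need_beta))));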

  ## command execution
  $self->{'command'} = undef; # CQP command string that is currently being processed (undef = last command has been completed)
  $self->{'lines'} = [];      # array of output lines read from CQP process
  $self->{'buffer'} = "";     # read buffer for standard output from CQP process
  $self->{'block_size'} = 256;  # block size for reading from CQP's output and error streams
  $self->{'query_lock'} = undef;# holds random key while query lock mode is active

  ## error handling (messages on stderr)
  $self->{'error_handler'} = undef; # set to subref for user-defined error handler
  $self->{'status'} = 'ok';         # status of last executed command ('ok' or 'error')
  $self->{'error_message'} = [];    # arrayref to array containing message produced by last command (if any)

  ## handling of CQP progress messages
  $self->{'progress'} = 0;             # whether progress messages are activated
  $self->{'progress_handler'} = undef; # optional callback for progress messages
  $self->{'progress_info'} = [];       # contains last available progress information: [$total_percent, $pass, $n_passes, $message, $percent]

  ## debugging (prints more or less everything on stdout)
  $self->{'debug'} = 0;

  ## select vectors for CQP output (stdout, stderr, stdout|stderr)
  $self->{'select_err'} = new IO::Select($err);
  $self->{'select_out'} = new IO::Select($out);
  $self->{'select_any'} = new IO::Select($err, $out);

  ## CQP object setup complete
  bless($self, $class);

  ## the following command will collect and ignore any output which may have been produced during startup
  $self->exec("set PrettyPrint off"); # pretty-printing should be turned off for non-interactive use

  return $self;
}

=item B<undef> I<$cqp>;

Exit CQP background process gracefully by issuing an C<exit;> command.
This is done automatically when the variable I<$cqp> goes out of scope.
Note that there may be a slight delay while B<CWB::CQP> waits for the CQP
process to terminate.
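
A minimal sketch of this behaviour, assuming a working CQP installation; the C<show corpora>
command merely stands in for real work:

  use CWB::CQP;

  {
    my $cqp = CWB::CQP->new;
    $cqp->exec("show corpora");   # any CQP command
  }   # $cqp goes out of scope here, so the CQP process is shut down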

=cut

sub DESTROY {
  my $self = shift;

  if ($self->{'command'}) {
    while ($self->_update) {} # read pending output from active command
  }
  my $in = $self->{'in'};
  if (defined $in) {
    local $SIG{PIPE} = 'IGNORE'; # CQP may already have terminated
    $in->print("exit;\n");       # exit CQP backend (commands are written to its stdin)
    $in->close;
  }
  my $out = $self->{'out'};
  if (defined $out) {
    $out->close;
  }
  my $pid = $self->{'pid'};
  waitpid $pid, 0; # wait for CQP to exit and reap background process
  ## **TODO** -- this may hang in some cases; is there a safe workaround?
}

=item I<$ok> = I<$cqp>->B<check_version>(I<$major>, I<$minor>, I<$beta>);

Check for minimum required CQP version, i.e. the background process has
to be CQP version I<$major>.I<$minor>.I<$beta> or newer.
I<$minor> and I<$beta> may be omitted, in which case they default to 0.
Note that the B<CWB::CQP> module automatically checks whether the CQP version
is compatible with its own requirements when a new object is created.
The B<check_version> method can subsequently be used to check for a more
recent release that provides functionality needed by the Perl script.
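
For example, a script that relies on features of a newer release might guard itself as
follows.  This is only a sketch: the required version C<3.0> is an arbitrary placeholder.

  use CWB::CQP;

  my $cqp = CWB::CQP->new;
  die "This script needs CQP 3.0 or newer (found version ", $cqp->version, ")\n"
    unless $cqp->check_version(3, 0);   # $beta omitted, defaults to 0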

=cut

sub check_version {
  my $self = shift;
  my ($major, $minor, $beta) = @_;
  $minor = 0 unless defined $minor;
  $beta = 0 unless defined $beta;

  my $maj = $self->{'major_version'};
  my $min = $self->{'minor_version'};
  my $bet = $self->{'beta_version'};
  if ($maj > $major or
      ($maj == $major
       and ($min > $minor or
            ($min == $minor and $bet >= $beta)))
     ) {
    return 1;
  }
  else {
    return 0;
  }
}

=item I<$version_string> = I<$cqp>->B<version>;

Returns formatted version string for the CQP background process, e.g. C<2.2.99> or C<3.0>.

=cut

sub version {
  my $self = shift;
  my $version = $self->{'major_version'}.".".$self->{'minor_version'};
  my $beta = $self->{'beta_version'};
  $version .= ".$beta"
    if $beta > 0;
  return $version;
}

## INTERNAL:
##    $lines_read = $self->_update([$timeout]);
## This is the main "workhorse" of the CWB::CQP module.  It checks for output from CQP process
## (stdout and stderr), updates progress status, fills internal buffers, and calls error and
## progress handlers if necessary.  The optional $timeout specifies how many seconds to wait for
## output; the default is 0 seconds, i.e. non-blocking mode, while a negative value blocks.
## NB: $lines_read includes the .EOL. terminator line, so it is safe to keep calling _update()
## until a non-zero value is returned (even if a CQP command fails with an error message).
sub _update {
  my $self = shift;
  my $timeout = shift || 0;
  $timeout = undef
    if $timeout < 0;
  my $stderr_buffer = "";
  my $lines = 0; # how many lines have successfully been read from stdout

  while ($self->{'select_any'}->can_read($timeout)) {
    ## STDERR -- read all available output on stderr first
    if ($self->{'select_err'}->can_read(0)) {
      sysread $self->{'err'}, $stderr_buffer, $self->{'block_size'}, length($stderr_buffer); # append to $stderr_buffer
    }

    ## STDOUT -- if there is no more data on stderr, we should be able to read from stdout
    elsif ($self->{'select_out'}->can_read(0)) {
      sysread $self->{'out'}, $self->{'buffer'}, $self->{'block_size'}, length($self->{'buffer'}); # append to object's input buffer
      if ($self->{'buffer'} =~ /\n/) {
        ## if there's a complete line in the input buffer, split off all lines
        my @new_lines = split /\n/, $self->{'buffer'}, -1; # make sure that last line is handled correctly if buffer ends in \n
        $self->{'buffer'} = pop @new_lines; # last entry is incomplete line ("" if buffer ended in \n) => return to input buffer
        foreach my $line (@new_lines) {
          ## skip blank line printed after each CQP command
          next if $line eq "";
          ## handle progress messages if ProgressBar has been activated
          if ($self->{'progress'} and $line =~ /^-::-PROGRESS-::-/) {
              my ($pass, $n_passes, $message); 
              (undef, $pass, $n_passes, $message) = split /\t/, $line;
              my $percent = ($message =~ /([0-9]+)\%\s*complete/) ? $1+0 : undef; # extract progress percentage, if present
              my $total_percent = (100 * ($pass - 1) + ($percent || 0)) / $n_passes; # estimate total progress ($percent assumed to be 0% if not given)
              $self->{'progress_info'} = [$total_percent, $pass, $n_passes, $message, $percent];
              my $handler = $self->{'progress_handler'};
              if (ref($handler) eq 'CODE') {
                $handler->($total_percent, $pass, $n_passes, $message, $percent); # call user-defined progress handler
              }
          }
          ## regular output lines are collected in object's line buffer
          else {
            push @{$self->{'lines'}}, $line;
            $lines++;
          }
        }
      }
      last if $lines > 0;       # if we have read a line and there is no output on stderr, return from function
    }

    ## ERROR -- we should never reach this point
    else {
      die "CWB::CQP: INTERNAL ERROR in _update() -- no data on stdout or stderr of CQP child process";
    }
  }

  if ($stderr_buffer ne "") {
    $self->{'status'} = 'error'; # any output on stderr indicates that something went wrong
    push @{$self->{'error_message'}}, split /\n/, $stderr_buffer;  # append output on stderr to error message
    $self->error(@{$self->{'error_message'}});                     # may call error handler and abort, or print message and continue
  }
  return $lines;
}

=item I<$cqp>->B<run>(I<$cmd>);

Start a single CQP command I<$cmd> in the background.  This method returns immediately.
Command output can then be read with the B<getline>, B<getlines> and B<getrow> methods.
If asynchronous communication is desired, use B<ready> to check whether output is available.

It is an error to B<run> a new command before the output of the previous command has been
completely processed.
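
A minimal sketch of the B<run> / B<getline> cycle follows.  The corpus name C<DICKENS> and
the query are placeholders, and it is assumed here that B<getline> returns B<undef> once the
output of the command is exhausted.

  use CWB::CQP;

  my $cqp = CWB::CQP->new;
  $cqp->exec("DICKENS");            # activate corpus (placeholder name)
  $cqp->exec('A = "house"');        # store query matches in named query A
  $cqp->run("cat A");               # returns immediately
  while (defined(my $line = $cqp->getline)) {   # assumption: undef marks end of output
    print "$line\n";
  }
  # all output has been read, so a new command may be started now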

=cut

sub run {
  croak 'USAGE:  $cqp->run($cmd);'
    unless @_ == 2;
  my $self = shift;
  my $cmd = shift;
  my $debug = $self->{'debug'};

  $cmd =~ s/\n+/ /g;            # make sure there are no newline characters (to be on the safe side)
  $cmd =~ s/(;\s*)+$//;         # ";" will be added when $cmd is sent to CQP

  my $active_cmd = $self->{'command'};
  croak "Error: new CQP command issued while '$active_cmd' is still being processed"
    if $active_cmd;

  $self->{'command'} = "$cmd;";
  $self->{'status'} = 'ok';
  $self->{'buffer'} = "";
  $self->{'lines'} = [];
  $self->{'error_message'} = [];

  print "CQP << $cmd;\n"
    if $debug;
  $self->{'in'}->print("$cmd;\n .EOL.;\n"); # append .EOL. command to mark end of CQP output
}

=item I<$num_of_lines> = I<$cqp>->B<ready>;

=item I<$num_of_lines> = I<$cqp>->B<ready>(I<$timeout>);

Check if output from current CQP command is available for reading with B<getline> etc.,
returning the number of lines currently held in the input buffer (possibly including an
end-of-output marker line that will not be returned by B<getline> etc.).  If there is no
active command, returns B<undef>.

The first form of the command returns immediately.  The second form waits up to I<$timeout>
seconds for CQP output to become available.  Use a negative I<$timeout> for blocking mode.
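
The following sketch polls with a timeout so that the script can do other work while a slow
query is running.  Corpus name and query are placeholders, and B<getline> is assumed to
return B<undef> at the end of the output.

  use CWB::CQP;

  my $cqp = CWB::CQP->new;
  $cqp->exec("DICKENS");                          # placeholder corpus
  $cqp->run('[pos = "NN.*"] [pos = "VB.*"]');     # placeholder query
  until ($cqp->ready(1)) {                        # wait up to 1 second per iteration
    print STDERR ".";                             # other work could be done here
  }
  while (defined(my $line = $cqp->getline)) {     # drain output to complete the command
    print "$line\n";
  }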

=cut

sub ready {
  my $self = shift;
  my $timeout = shift;

  my $lines = @{$self->{'lines'}};

lib/CWB/CQP.pm

        if $n_el >= 4;
    }
    else {
      croak "CQP: all rows in undump table must have the same length (first row = $n_el, this row = $row_el)"
        unless $row_el == $n_el;
    }
    $tf->write(join("\t", @$row), "\n");
  }
  $tf->finish;

  # now send undump command with filename of temporary file
  my $tempfile = $tf->name;
  $self->exec("undump $nqr $with < 'gzip -cd $tempfile |'");

  $tf->close;                                   # delete temporary file

  return $self->ok;                             # return success status of undump command
}

=item I<$status> = I<$cqp>->B<status>;  # "ok" or "error"

=item I<$ok> = I<$cqp>->B<ok>;

=item I<@lines> = I<$cqp>->B<error_message>;

=item I<$cqp>->B<error>(I<@message>);

Error handling functions.  B<status> returns the status of the last CQP command executed, which is either C<'ok'> or C<'error'>.  B<ok> returns B<true> or B<false>, depending on whether the last command was completed successfully (i.e., it is a simpl...

B<error> is an internal function used to report CQP errors.  It may also be of interest to application programs if a suitable error handler has been defined (see below).
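
A typical pattern is to check the status after each command.  The sketch below sends a
deliberately malformed query (with the placeholder corpus C<DICKENS>) and prints the
messages collected from CQP.

  use CWB::CQP;

  my $cqp = CWB::CQP->new;
  $cqp->set_error_handler('ignore');      # suppress the default warning
  $cqp->exec("DICKENS");
  $cqp->exec('A = [lemma = "house"');     # closing bracket is missing
  unless ($cqp->ok) {
    print STDERR "CQP: $_\n" foreach $cqp->error_message;
  }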

=cut

## query CQP object's status and error messages
sub status {
  my $self = shift;

  return $self->{'status'};
}

sub ok {
  my $self = shift;
  return ($self->status eq 'ok'); # convenient wrapper function to check for CQP errors
}

sub error_message {
  my $self = shift;
  my $aref = $self->{'error_message'};

  return @{$aref};
}

## throw CQP error (optionally through user-defined error handler)
sub error {
  my $self = shift;

  if (ref $self->{'error_handler'} eq 'CODE') {
    $self->{'error_handler'}->(@_); # call error handler if a suitable subref has been installed
  }
  else {
    warn "\n", "=+===CWB::CQP ERROR=====\n", # default behaviour is to issue a warning on stderr
      (map {" | $_\n"} @_), "=+======================\n"; 
  }
}

=item I<$cqp>->B<set_error_handler>(I<&my_error_handler>);

=item I<$cqp>->B<set_error_handler>('die' | 'warn' | 'ignore');

The first form of the B<set_error_handler> method activates a user-defined error handler.  The argument is a reference to a named or anonymous subroutine, which will be called whenever a CQP error is detected (or an error is raised explicitly with th...

The second form of the method activates one of the built-in error handlers:

=over 4

=item *

B<C<'die'>> aborts program execution with an error message; this handler is particularly convenient for one-off scripts or command-line utilities that do not need to recover from error conditions.

=item *

B<C<'warn'>> prints the error message on STDERR, but continues program execution.  This is the default error handler of a new B<CWB::CQP> object.

=item *

B<C<'ignore'>> silently ignores all errors.  The application script should check for error conditions after every CQP command, using the B<ok> or B<status> method.

=back 
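
A short sketch of both forms; the handler shown here is only an example and receives the
lines of the error message as its arguments.

  use CWB::CQP;

  my $cqp = CWB::CQP->new;

  # user-defined handler: collect error messages instead of printing them
  my @log;
  $cqp->set_error_handler(sub { push @log, @_ });

  # built-in handlers
  $cqp->set_error_handler('die');     # abort on the first CQP error
  $cqp->set_error_handler('warn');    # restore the default behaviour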

=cut

## set user-defined error handler (or built-in handlers 'die', 'warn' [default], 'ignore')
sub set_error_handler {
  my $self = shift;
  my $handler = shift;

  if (defined $handler) {
    my $type = ref $handler;
    if ($type ne 'CODE') {
      $handler = lc($handler);
      croak 'USAGE:  $cqp->set_error_handler( \&my_error_handler | "die" | "warn" | "ignore" );'
        unless $handler =~ /^(die|warn|ignore)$/;
      if ($handler eq 'die') {
        $handler = \&_error_handler_die;
      }
      elsif ($handler eq 'warn') {
        $handler = undef;       # default behaviour if no error handler is specified
      }
      elsif ($handler eq 'ignore') {
        $handler = \&_error_handler_ignore;
      }
    }
  }
  $self->{'error_handler'} = $handler;
}

## INTERNAL: built in error handlers for 'die' and 'ignore' modes
sub _error_handler_die {
  croak "\n", "=+===CWB::CQP ERROR=====\n", (map {" | $_\n"} @_), "=+== occurred";
}


