Net-Z3950-AsyncZ

 view release on metacpan or  search on metacpan

AsyncZ.pm  view on Meta::CPAN

  $raw = delZ_serverName($raw,1);
  $raw = delZ_header($raw,1,'<!##!>');
  return $raw;
}

# param:  ref to string of raw records
# return next record
sub get_ZRawRec {
  my $raw = shift;
  return undef if ! $raw;

  if ($$raw !~ /<!##!>/) {  # presumed last record
   my $rec =  $$raw;
   $$raw = "";
   return $rec; 
  }
  $$raw =~ s/(.*?)<!##!>//;    
  return $1;
}

# tests whether line is our substitue for absence of Report:
#	 {!-- library.anu.edu.au --}
#  It reports previous server's name in curlies, substituted for angle brackets
#  (like HTML comment) which hold server name in header of each report item
sub noZ_Response {  $_[0]=~/\{!--\s+.*\s+--\}/; }

# tests if line contains server name
sub isZ_ServerName { $_[0] =~ /<!--(.*)-->/; }
sub isZ_PID { $_[0] =~ /<#--\d+-->/; }
sub isZ_Info { &isZ_PID || &noZ_Response; }
# returns server name
sub Z_serverName {   
         if( $_[0] =~ /<!--(.*?)-->/){
           return $1 if $1;
         }
	 return undef;
}

# returns 0 if not an error
# returns 2 if cycle 2 error
# returns 1 if non-recoverable cycle 1 error
 sub isZ_Error {
    my $err = shift;
    return 0 if !$err; 
    return 2 if $err->[0] && $err->[1];
    return 1 if $err->[0] && !$err->[0]->{retry};
    return 0;
}

# tests return value of isZ_Error()
# returns true if the error was a cycle 1 fatal error
sub isZ_nonRetryable {  $_[0] == 1;  }



{

my @results=();
my @errors=();
my @recSize = ();
my $busy = 0;
my $utf8_init = 0;

 sub is_utf8_init {      
   $utf8_init;
 }

 sub set_uft8_init {
   $utf8_init = 1;
 }

 sub _utf8 {
    my $index = shift;    

    _setupUTF8() if !$utf8_init;
    return if !$utf8_init;

    my $cs = MARC::Charset->new();
    for(my $i = 0; $i < scalar(@{$results[$index]}); $i++) {
                 $results[$index]->[$i] = $cs->to_utf8($results[$index]->[$i]);
    }
 }

 sub _saveResults {
    $busy = 1; 
    my ($arr, $index) = @_;
    $results[$index] = $arr;      
    $busy = 0;
 }

 sub _saveErrors {
    @errors = @_; 
 }

 sub _isBusy { return $busy; }

# returns reference to results array
 sub getResult {
    my ($self,$index) = @_; 
    _utf8($index) if $self->{options}[$index]-> _getFieldValue('utf8');
    return $results[$index];
 }

sub getZ_RecSize { $recSize[$_[0]];  }

 sub getErrors {
    my ($self,$index) = @_; 
    return [$errors[$index]->[0], $errors[$index]->[1]] if $errors[$index];
    return undef;
 }


sub getMaxErrors { return scalar @errors; }

sub _callback {  
  $busy = 1;
  my ($self, $index) = @_;
  _utf8($index) if $self->{options}[$index]-> _getFieldValue('utf8');
  my $cb = $self->{options}[$index]-> _getFieldValue('cb');
  $cb = $self->{cb} if !$cb;  

  my $last_el = scalar(@{$results[$index]})-1;
  my $size = $results[$index]->[$last_el]; 
  $size =~ /\*==(\d+)==\*/;
  $recSize[$index] = $1 ? $1 : 0;  
  $results[$index]->[$last_el] =~s/\*==(\d+)==\*//;  
  
  &$cb($index, $results[$index]) if $cb;
  $busy = 0;
}

}



#-------------------------------------------------------------------#
# private paramaters:
#       start:     start time for timers
#	zl:	   array of forked processes 
#	errors:    reference to Net::Z3950::AsyncZ::Errors object for main process
#	share:     reference to IPC::ShareLite
#       timer:	   reference to timer watcher 
#	unlooped:  notifies DESTROY when all pipes have been processed,		
#		   because DESTROY is called for each closed pipe--hence
#		   makes it safe to do cleanup that applies to main process
#      monitor_pid:  pid of the monitor, for killing it
#--------------------------------------------------------------------#
sub new {
my($class, %args) = @_;
my $index = 0;

my $self = { 
	          start => time(), zl => [], query=>$args{query}, errors=>undef,
        	  log=>$args{log} || undef, cb=>$args{cb}, timer => undef,
                  timeout=>$args{timeout}  || 25, timeout_min=>$args{timeout_min} || 5,
                  interval => $args{interval} || 1, servers=>$args{servers},
                  options=>$args{options}, unlooped=>0, maxpipes=>$args{maxpipes} || 4, 
                  share => undef, monitor => 0 || $args{monitor}, monitor_pid=>undef,
 		  swap_check => $args{swap_check} || 0, swap_attempts => $args{swap_attempts} || 5 
          };

	bless $self,$class;        
        $self->{ errors } = Net::Z3950::AsyncZ::Errors->new($self->{log});

%forkedPID=(); 
%exitCode=();  
%resultTable = ();
        
	my $incr = $self->{maxpipes};
        $self->{share} = new IPC::ShareLite( -key => $$ +  5000, 
                                -create  => 'yes', 
                                -destroy => 'yes');
      $self->{monitor_pid} = $self->_monitor() if $self->{monitor};
      
      $SIG{HUP} = sub {             
          $self->{abort} = 1;          
          $self->{unlooped} = 1;   # notify DESTROY that it's safe to kill outstanding processes              
          $! = 227;
          die "Aborting." 
       };
      

      $self->processHosts(-1,%args);

		#  retry servers that returned without error fatal codes
      my @retries = $self->_getReTries();  
      $args{'servers'} =  \@retries;
      $self->{'servers'} = $args{'servers'};
      $self->processHosts(-2, %args);      



( run in 0.908 second using v1.01-cache-2.11-cpan-39bf76dae61 )