DTA-CAB

 view release on metacpan or  search on metacpan

CAB/Queue/Server.pm  view on Meta::CPAN

}
sub addcounts {
  $_[0]{ntok} += $_[1] if (defined($_[1]));
  $_[0]{nchr} += $_[2] if (defined($_[2]));
  return @{$_[0]}{qw(ntok nchr)};
}

## $qs = $qs->addblock(\%blk)
##   + append block \%blk to appropriate output file if possible, or save it for later
##   + %blk should have keys: (off=>$nbytes, len=>$nbytes, ofile=>$ofilename, fmt=>$class, data=>\$data, ...)
sub addblock {
  my ($qs,$blk) = @_;

  ##-- push block to block-tracker's ($bt) pending list
  my ($bt);
  $bt = $qs->{blocks}{$blk->{ofile}} = {cur=>0,pending=>[]} if (!defined($bt=$qs->{blocks}{$blk->{ofile}}));
  push(@{$bt->{pending}}, $blk);

  ##-- greedy append
  if ($blk->{id}[0] == $bt->{cur}) {
    my $fmt = DTA::CAB::Format->newFormat($blk->{ofmt} || $DTA::CAB::Format::CLASS_DEFAULT);
    @{$bt->{pending}} = sort {$a->{id}[0]<=>$b->{id}[0]} @{$bt->{pending}};
    while (@{$bt->{pending}} && $bt->{pending}[0]{id}[0]==$bt->{cur}) {
      $blk=shift(@{$bt->{pending}});

      $qs->vlog($qs->{logBlock}, "BLOCK_APPEND(ofile=$blk->{ofile}, id=$blk->{id}[0]/$blk->{id}[1], ioff=$blk->{ioff}, ilen=$blk->{ilen}, iend=".($blk->{ioff}+$blk->{ilen}).")");
      $fmt->blockAppend($blk);
      $bt->{cur}++;
    }
  } else {
    $qs->vlog($qs->{logBlock}, "BLOCK_DELAY(ofile=$blk->{ofile}, id=$blk->{id}[0]/$blk->{id}[1])");
  }

  return $qs;
}


##==============================================================================
## Server Methods

## $class = $CLASS_OR_OBJECT->clientClass()
##  + default client class, used by newClient()
sub clientClass {
  return 'DTA::CAB::Queue::ClientConn';
}

## $client = $CLASS_OR_OBJECT->newClient(%args)
##  + wrapper for clients, called by $s->accept()
##  + default just calls $CLASS_OR_OBJECT->clientClass->new(%args)
sub newClient {
  my $that = shift;
  return $that->clientClass->new(@_, logSocket=>$that->{logSocket});
}

## $cli_or_undef = $qs->accept()
## $cli_or_undef = $qs->accept($timeout_secs)
##  + accept incoming client connections with optional timeout
##  + INHERITED from DTA::CAB::Socket

## $rc = $qs->handleClient($cli)
## $rc = $qs->handleClient($cli, %callbacks)
##  + handle a single client request
##  + INHERITED from DTA::CAB::Socket

##--------------------------------------------------------------
## Server Methods: Request Handling
##
##  + request commands (case-insensitive) handled here:
##     ADDCOUNTS $NN : add to total number of (tokens,characters) processed; arg (string) $NN=pack('NN',$ntok,$chr); no response
##     ADDBLOCK $blk : block output; $blk is a HASH-ref passed to $qs->block($blk); no response
##     DEQ           : dequeue the first item in the queue; response: $cli->put($item)
##     DEQ_STR       : dequeue a string reference; response: $cli->put_str(\$item)
##     DEQ_REF       : dequeue a reference; response: $cli->put_ref($item)
##     ENQ $item     : enqueue an item; no response
##     ENQ_STR $str  : enqueue a string-reference; no response
##     ENQ_REF $ref  : enqueue a reference; no response
##     SIZE          : get current queue size; response=STRING $qs->size()
##     STATUS        : get queue status response: STRING $qs->{status}
##     CLEAR         : clear queue; no response
##     QUIT          : close client connection; no response
##     ...           : other messages are passed to $callback->(\$request,$cli) or produce an error
##  + returns: same as $callback->() if called, otherwise $qs


## $qs = $qs->handle_deq($cli,\$cmd)
## $qs = $qs->handle_deq_str($cli,\$cmd)
## $qs = $qs->handle_deq_ref($cli,\$cmd)
##  + implements "$item = DEQ", "\$str = DEQ_STR", "$ref = DEQ_REF"
BEGIN {
  *handle_deq_str = *handle_deq_ref = \&handle_deq;
}
sub handle_deq {
  my ($qs,$cli,$creq) = @_;
  my $cmd = lc($$creq);
  my $qu  = $qs->{queue};
  if ($cmd =~ /^deq(?:_ref|_str)?$/) {
    ##-- DEQ: dequeue an item
    if    (!@{$qs->{queue}})  { $cli->put_eoq(); }
    elsif ($cmd eq 'deq')     { $cli->put( $qu->[0] ); }
    elsif ($cmd eq 'deq_str') { $cli->put_str( ref($qu->[0]) ? $qu->[0] : \$qu->[0] ); }
    elsif ($cmd eq 'deq_ref') { $cli->put_ref( ref($qu->[0]) ? $qu->[0] : \$qu->[0] ); }
    shift(@$qu);
  }
  return $qs;
}

## $qs = $qs->handle_enq($cli,\$cmd)
##  + implements "ENQ $item"
sub handle_enq {
  my ($qs,$cli,$creq) = @_;
  my $buf = undef;
  my $ref = $cli->get(\$buf);
  push(@{$qs->{queue}}, ($ref eq \$buf ? $buf : $ref));
  return $qs;
}

## $qs = $qs->handle_enq_str($cli,\$cmd)
## $qs = $qs->handle_enq_ref($cli,\$cmd)
##  + implements "ENQ_STR \$str", "ENQ_REF $ref"
BEGIN {
  *handle_enq_str = *handle_enq_ref;



( run in 1.655 second using v1.01-cache-2.11-cpan-437f7b0c052 )