DTA-CAB
view release on metacpan or search on metacpan
CAB/Queue/Server.pm view on Meta::CPAN
}
sub addcounts {
$_[0]{ntok} += $_[1] if (defined($_[1]));
$_[0]{nchr} += $_[2] if (defined($_[2]));
return @{$_[0]}{qw(ntok nchr)};
}
## $qs = $qs->addblock(\%blk)
## + append block \%blk to appropriate output file if possible, or save it for later
## + %blk should have keys: (off=>$nbytes, len=>$nbytes, ofile=>$ofilename, fmt=>$class, data=>\$data, ...)
sub addblock {
my ($qs,$blk) = @_;
##-- push block to block-tracker's ($bt) pending list
my ($bt);
$bt = $qs->{blocks}{$blk->{ofile}} = {cur=>0,pending=>[]} if (!defined($bt=$qs->{blocks}{$blk->{ofile}}));
push(@{$bt->{pending}}, $blk);
##-- greedy append
if ($blk->{id}[0] == $bt->{cur}) {
my $fmt = DTA::CAB::Format->newFormat($blk->{ofmt} || $DTA::CAB::Format::CLASS_DEFAULT);
@{$bt->{pending}} = sort {$a->{id}[0]<=>$b->{id}[0]} @{$bt->{pending}};
while (@{$bt->{pending}} && $bt->{pending}[0]{id}[0]==$bt->{cur}) {
$blk=shift(@{$bt->{pending}});
$qs->vlog($qs->{logBlock}, "BLOCK_APPEND(ofile=$blk->{ofile}, id=$blk->{id}[0]/$blk->{id}[1], ioff=$blk->{ioff}, ilen=$blk->{ilen}, iend=".($blk->{ioff}+$blk->{ilen}).")");
$fmt->blockAppend($blk);
$bt->{cur}++;
}
} else {
$qs->vlog($qs->{logBlock}, "BLOCK_DELAY(ofile=$blk->{ofile}, id=$blk->{id}[0]/$blk->{id}[1])");
}
return $qs;
}
##==============================================================================
## Server Methods
## $class = $CLASS_OR_OBJECT->clientClass()
## + default client class, used by newClient()
sub clientClass {
return 'DTA::CAB::Queue::ClientConn';
}
## $client = $CLASS_OR_OBJECT->newClient(%args)
## + wrapper for clients, called by $s->accept()
## + default just calls $CLASS_OR_OBJECT->clientClass->new(%args)
sub newClient {
my $that = shift;
return $that->clientClass->new(@_, logSocket=>$that->{logSocket});
}
## $cli_or_undef = $qs->accept()
## $cli_or_undef = $qs->accept($timeout_secs)
## + accept incoming client connections with optional timeout
## + INHERITED from DTA::CAB::Socket
## $rc = $qs->handleClient($cli)
## $rc = $qs->handleClient($cli, %callbacks)
## + handle a single client request
## + INHERITED from DTA::CAB::Socket
##--------------------------------------------------------------
## Server Methods: Request Handling
##
## + request commands (case-insensitive) handled here:
## ADDCOUNTS $NN : add to total number of (tokens,characters) processed; arg (string) $NN=pack('NN',$ntok,$chr); no response
## ADDBLOCK $blk : block output; $blk is a HASH-ref passed to $qs->block($blk); no response
## DEQ : dequeue the first item in the queue; response: $cli->put($item)
## DEQ_STR : dequeue a string reference; response: $cli->put_str(\$item)
## DEQ_REF : dequeue a reference; response: $cli->put_ref($item)
## ENQ $item : enqueue an item; no response
## ENQ_STR $str : enqueue a string-reference; no response
## ENQ_REF $ref : enqueue a reference; no response
## SIZE : get current queue size; response=STRING $qs->size()
## STATUS : get queue status response: STRING $qs->{status}
## CLEAR : clear queue; no response
## QUIT : close client connection; no response
## ... : other messages are passed to $callback->(\$request,$cli) or produce an error
## + returns: same as $callback->() if called, otherwise $qs
## $qs = $qs->handle_deq($cli,\$cmd)
## $qs = $qs->handle_deq_str($cli,\$cmd)
## $qs = $qs->handle_deq_ref($cli,\$cmd)
## + implements "$item = DEQ", "\$str = DEQ_STR", "$ref = DEQ_REF"
BEGIN {
*handle_deq_str = *handle_deq_ref = \&handle_deq;
}
sub handle_deq {
my ($qs,$cli,$creq) = @_;
my $cmd = lc($$creq);
my $qu = $qs->{queue};
if ($cmd =~ /^deq(?:_ref|_str)?$/) {
##-- DEQ: dequeue an item
if (!@{$qs->{queue}}) { $cli->put_eoq(); }
elsif ($cmd eq 'deq') { $cli->put( $qu->[0] ); }
elsif ($cmd eq 'deq_str') { $cli->put_str( ref($qu->[0]) ? $qu->[0] : \$qu->[0] ); }
elsif ($cmd eq 'deq_ref') { $cli->put_ref( ref($qu->[0]) ? $qu->[0] : \$qu->[0] ); }
shift(@$qu);
}
return $qs;
}
## $qs = $qs->handle_enq($cli,\$cmd)
## + implements "ENQ $item"
sub handle_enq {
my ($qs,$cli,$creq) = @_;
my $buf = undef;
my $ref = $cli->get(\$buf);
push(@{$qs->{queue}}, ($ref eq \$buf ? $buf : $ref));
return $qs;
}
## $qs = $qs->handle_enq_str($cli,\$cmd)
## $qs = $qs->handle_enq_ref($cli,\$cmd)
## + implements "ENQ_STR \$str", "ENQ_REF $ref"
BEGIN {
*handle_enq_str = *handle_enq_ref;
( run in 1.655 second using v1.01-cache-2.11-cpan-437f7b0c052 )