Alvis-Pipeline
view release on metacpan or search on metacpan
lib/Alvis/Pipeline/Read.pm view on Meta::CPAN
$this->_write_and_unlock($dir, $fh, $lastread, $lastwrite);
return undef if !$block;
$this->log(2, "no document yet, sleeping");
sleep $this->option("sleep");
($fh, $lastread, $lastwrite) = $this->_lock_and_read($dir);
}
$lastread++;
my $filename = "$dir/$lastread";
my $f2 = new IO::File("<$filename")
or die "can't read file '$filename': $!";
binmode $f2, ":utf8";
my $doc = join("", <$f2>);
$f2->close();
unlink($filename);
$this->_write_and_unlock($dir, $fh, $lastread, $lastwrite);
return $doc;
}
# Shuts down the child server process whose pid is stored in $this->{pid}.
#
# The child has to go so that its Internet port is cleared for
# subsequent invocations, and so that the parent can exit cleanly.
#
# SIGTERM is tried first; if the child is still present about a second
# later, SIGKILL is sent.  Dies if the child survives both signals.
# Does nothing when no pid is recorded.
sub close {
    my $this = shift();

    my $pid = $this->{pid};

    # An undefined pid would numify to 0, and kill() with pid 0 signals
    # every process in our own process group -- never do that.
    return if !$pid;

    require POSIX;

    # Polls for up to roughly one second, returning true as soon as the
    # child is gone.  The child must be reaped with waitpid(): a dead but
    # unreaped child is a zombie, and "kill 0" still succeeds for zombies.
    my $wait_for_exit = sub {
        for my $attempt (1 .. 10) {
            select(undef, undef, undef, 0.1);
            my $reaped = waitpid($pid, POSIX::WNOHANG());
            return 1 if $reaped == $pid;

            # waitpid() yields -1 when there is nothing for us to reap
            # (e.g. SIGCHLD is ignored), so fall back to probing the pid.
            return 1 if !kill(0, $pid);
        }
        return 0;
    };

    kill 15, $pid;
    return if $wait_for_exit->();

    warn "kill -15 failed; killing $pid with rude signal 9";
    kill 9, $pid;
    return if $wait_for_exit->();

    die "can't kill child server with pid $pid";
}
# Body of the background server: listens on $this->{port}, accepts one
# connection at a time, and stores every document read from it via
# _store_file() until _read() reports end of stream, then waits for
# the next connection.  Never returns normally; dies if the listener
# cannot be opened or a connection cannot be accepted.
sub _start_server {
    my ($this) = @_;

    $this->log(1, "opening listener on port ", $this->{port});
    my $server = IO::Socket::INET->new(
        Listen    => 1,
        LocalPort => $this->{port},
        Proto     => "tcp",
        ReuseAddr => 1,
    );
    die("can't listen on port '$this->{port}': $!") if !$server;

    for (;;) {
        $this->log(1, "accepting connection");
        $this->{socket} = $server->accept();
        die "can't accept connection: $!" if !$this->{socket};

        binmode $this->{socket}, ":utf8";
        $this->log(1, "started background process, pid $$");

        # Drain this connection; an undefined document means the peer is done.
        while (defined(my $doc = $this->_read())) {
            $this->_store_file($doc);
        }
    }
}
# Reads one framed document from $this->{socket}.
#
# The frame, as parsed below, is:
#     "Alvis::Pipeline\n"     magic line
#     "<version>\n"           must compare numerically equal to 1
#     "<count>\n"             length of the document body
#     <count units of body>
#     "--end--\n"             terminator line
#
# Returns the document body, or undef if the stream ends where the next
# magic line would start.  Dies on a missing socket or on any violation
# of the framing, including a length field that is not a plain
# non-negative integer.
#
# NOTE(review): _start_server puts the socket in ":utf8" mode, in which
# read() counts characters rather than bytes -- confirm that the writer
# sends a character count.
sub _read {
    my $this = shift();

    my $s = $this->{socket}
        or die "$this reading from non-existent socket";

    my $magic = $s->getline();
    return undef if !defined $magic;    # end of stream between documents
    $magic eq "Alvis::Pipeline\n" or die "incorrect magic '$magic'";

    my $version = $s->getline() or die "can't get protocol version: $!";
    $version == 1 or die "unsupported protocol version '$version'";

    my $count = $s->getline() or die "can't get object-length byte-count: $!";
    chomp($count);

    # The length comes straight off the network.  A non-numeric value
    # would otherwise numify to 0 and be accepted as an empty document.
    if ($count !~ /\A\s*[0-9]+\s*\z/) {
        die "invalid object-length byte-count '$count'";
    }
    $count += 0;

    # Keep reading until the whole body has arrived or the stream ends;
    # a single read() is not relied upon to deliver everything at once.
    my $buf   = "";
    my $nread = 0;
    while ($nread < $count) {
        my $n = $s->read($buf, $count - $nread, $nread);
        die "can't read document: $!" if !defined $n;
        last if $n == 0;                # end of stream: body is short
        $nread += $n;
    }
    die "document was short: $nread of $count bytes" if $nread != $count;

    my $term = $s->getline() or die "can't get terminator: $!";
    $term eq "--end--\n" or die "incorrect terminator '$term'";

    return $buf;
}
# Writes one document into the spool directory $this->{spooldir}.
#
# The sequence state is obtained from _lock_and_read(); the document is
# written, in ":utf8" mode, to a new file named after the incremented
# last-written number, and the updated numbers are handed back to
# _write_and_unlock().  Dies if the file cannot be created, written or
# closed.
sub _store_file {
    my ($this, $doc) = @_;

    $this->log(2, "child writing to spooldir");

    my $spool = $this->{spooldir};
    my ($lock, $lastread, $lastwrite) = $this->_lock_and_read($spool);

    # The new document takes the next number after the last one written.
    my $path = join("/", $spool, ++$lastwrite);

    my $out = IO::File->new(">$path");
    die "can't create new file '$path': $!" if !$out;
    binmode $out, ":utf8";

    if (!$out->print($doc)) {
        die "can't write '$path': $!";
    }
    if (!$out->close()) {
        die "can't close '$path': $!";
    }

    return $this->_write_and_unlock($spool, $lock, $lastread, $lastwrite);
}
# A sequence file called "seq" is maintained in the spool directory,
# and is always locked when read and rewritten. The invariant it
# preserves between lock-read-write operations is that it contains two
# numbers, space-separated, followed by a newline.  The first number is
# that of the last document read from the spool directory. The second
# number is that of the last document written to the spool directory,
# or zero if no document has yet been written. If the two numbers are
( run in 1.170 second using v1.01-cache-2.11-cpan-f56aa216473 )