Alvis-Pipeline
view release on metacpan or search on metacpan
lib/Alvis/Pipeline/Read.pm view on Meta::CPAN
# $Id: Read.pm,v 1.12 2006/08/14 17:24:04 mike Exp $
package Alvis::Pipeline::Read;
use vars qw(@ISA);
@ISA = qw(Alvis::Pipeline);
use strict;
use warnings;
use IO::File;
use IO::Socket::INET;
use Fcntl qw(:flock);
# Constructor: builds the reader object and forks a child process that
# runs the receiving server via _start_server().
#
# Arguments (key => value list after the class name):
#   spooldir - mandatory; stored in $this->{spooldir}
#   port     - mandatory; stored in $this->{port}
#   Anything else is forwarded to _setopts(), preceded by a
#   "sleep => 10" pair.
#
# Returns the new object in the parent process, with the child's
# process ID recorded in $this->{pid}.  Dies if a mandatory option is
# missing (or false) or if fork() fails.  The child process is not
# expected to return from _start_server().
sub new {
    my ($class, %opts) = @_;
    my $this = bless {}, $class;

    # Both of these must be supplied (and true); they are removed from
    # %opts so that only the optional settings reach _setopts().
    foreach my $required (qw(spooldir port)) {
        $this->{$required} = delete $opts{$required}
            or die "new($class) with no $required";
    }
    $this->_setopts(sleep => 10, %opts);

    # Asynchronicity: a separate server process accepts pushes and
    # stores them, while this process carries on as the reader.
    $this->log(1, "forking");
    my $pid = fork();
    defined $pid
        or die "couldn't fork: $!";

    if (!$pid) {
        # We are the child: hand control to the server loop.
        $this->_start_server();
        die "_start_server() returned! It should never do that";
    }

    # We are the parent.  Installing 'IGNORE' for SIGCHLD asks for
    # automatic reaping, so the child does not linger as a zombie once
    # it is killed ("IGNORE" has a special-case meaning for SIGCHLD).
    $SIG{CHLD} = 'IGNORE';
    $this->{pid} = $pid;
    $this->log(1, "parent $$ spawned harvesting child, pid=$pid");

    return $this;
}
# Fetches the next unread document from the spool directory.
#
# Arguments:
#   $block - if true, wait (sleeping for the "sleep" option's number of
#            seconds between polls) until a document is available; if
#            false, return immediately when nothing is queued.
#
# Returns the document's text as a single string, or undef when $block
# is false and no document is waiting.  Dies if the spooled file cannot
# be opened, or if the sequence file cannot be read or rewritten.
#
# The sequence file is locked for the whole time a document is being
# consumed, and is unlocked while sleeping so that other users of the
# spool directory can make progress.
sub read {
    my $this = shift();
    my($block) = @_;

    $this->log(2, "parent reading from spooldir");
    my $dir = $this->{spooldir};
    my($fh, $lastread, $lastwrite) = $this->_lock_and_read($dir);

    # Equal counters mean every written document has already been read.
    while ($lastread == $lastwrite) {
        # Release the lock (rewriting the unchanged counters) before
        # either giving up or going to sleep.
        $this->_write_and_unlock($dir, $fh, $lastread, $lastwrite);
        # Explicit undef is kept deliberately so that existing callers
        # see exactly the same return value as before.
        return undef if !$block;
        $this->log(2, "no document yet, sleeping");
        sleep $this->option("sleep");
        ($fh, $lastread, $lastwrite) = $this->_lock_and_read($dir);
    }

    # Documents are stored in files named after their sequence number.
    $lastread++;
    my $filename = "$dir/$lastread";

    # Pass the mode separately so that the path is used verbatim:
    # embedding it in a "<$filename" string lets open() strip
    # whitespace from, or otherwise reinterpret, an unusual path.
    my $f2 = IO::File->new($filename, "<")
        or die "can't read file '$filename': $!";
    binmode $f2, ":utf8";
    my $doc = join("", <$f2>);
    $f2->close();

    # A failed removal is not fatal (the document has been read and the
    # counter is about to advance past it), but it should not go
    # unnoticed, since the stale file stays in the spool directory.
    unlink($filename)
        or warn "can't remove spooled file '$filename': $!";

    $this->_write_and_unlock($dir, $fh, $lastread, $lastwrite);
    return $doc;
}
sub close {
my $this = shift();
# We need to kill the child process that is running the OAI server
lib/Alvis/Pipeline/Read.pm view on Meta::CPAN
# number is that of the last document written to the spool directory,
# or zero if no document has yet been written. If the two numbers are
# equal, there are no documents available to be read.
#
# _lock_and_read() and _write_and_unlock() together implement safe
# maintenance of the sequence file. The former returns a filehandle,
# locked; it is the caller's responsibility to unlock the returned
# filehandle using _write_and_unlock(), like this:
# ($fh, $lastread, $lastwrite) = $this->_lock_and_read($dir);
# # Do some stuff
# $this->_write_and_unlock($dir, $fh, $lastread, $lastwrite);
#
# Opens and exclusively locks the sequence file "$dir/seq", creating
# the spool directory and/or the sequence file if they do not exist.
#
# Arguments:
#   $dir - path of the spool directory
#
# Returns a three-element list: the open, locked filehandle, then the
# "last read" and "last written" sequence numbers.  A freshly created
# (empty) sequence file yields 0 for both numbers.
#
# Dies if the directory or file cannot be created or opened, if the
# lock or seek fails, or if the sequence file's first line does not
# contain two space-separated numbers.
#
# The caller must release the lock with _write_and_unlock().
sub _lock_and_read {
    my $this = shift();
    my($dir) = @_;

    my $seqfile = "$dir/seq";

    if (! -d $dir) {
        # Another process using the same spool directory may create it
        # between the test above and this mkdir(); that is not an
        # error, so only give up if the directory still does not exist.
        if (!mkdir($dir, 0777)) {
            my $err = $!;
            die "can't create directory '$dir': $err" if ! -d $dir;
        }
    }

    # O_CREAT makes "create if missing" and "open for update" a single
    # step, so there is no window in which the directory exists but the
    # sequence file does not, and an existing file is never truncated.
    my $fh = IO::File->new($seqfile, O_RDWR | O_CREAT, 0666)
        or die "can't read '$seqfile': $!";
    flock($fh, LOCK_EX) or die "can't lock '$seqfile': $!";
    seek($fh, 0, SEEK_SET) or die "can't seek to start of '$seqfile': $!";

    my($lastread, $lastwrite);
    my $line = $fh->getline();
    if (defined $line) {
        # Refuse to continue with a corrupt file: undefined counters
        # would compare equal to each other and then be written back.
        if ($line =~ /(\d+) (\d+)/) {
            ($lastread, $lastwrite) = ($1, $2);
        } else {
            chomp(my $shown = $line);
            die "malformed sequence file '$seqfile': '$shown'";
        }
    } else {
        # File is empty: must have just been created
        $lastread = $lastwrite = 0;
    }

    $this->log(3, "got lastread='$lastread', lastwrite='$lastwrite'");
    return ($fh, $lastread, $lastwrite);
}
# Counterpart of _lock_and_read(): stores the two sequence numbers at
# the start of the sequence file, then unlocks and closes the handle.
#
# Arguments:
#   $dir       - spool directory (used only to name the file in messages)
#   $fh        - the locked filehandle obtained from _lock_and_read()
#   $lastread  - number of the last document read
#   $lastwrite - number of the last document written
#
# Dies (with a stack trace, in the case of the seek) if any step fails.
sub _write_and_unlock {
    my ($this, $dir, $fh, $lastread, $lastwrite) = @_;
    use Carp;

    my $seqfile = "$dir/seq";

    # Go back to the beginning so the new record replaces the old one.
    unless (seek($fh, 0, SEEK_SET)) {
        confess "can't seek to start of '$seqfile': $!";
    }
    unless ($fh->print("$lastread $lastwrite\n")) {
        die "can't rewrite '$seqfile': $!";
    }
    unless (flock($fh, LOCK_UN)) {
        die "can't unlock '$seqfile': $!";
    }
    unless ($fh->close()) {
        die "Truly unbelievable";
    }

    $this->log(3, "put lastread='$lastread', lastwrite='$lastwrite'");
}
# Test harness follows
# my $p = bless {
# spooldir => "/tmp/ap",
# }, "Alvis::Pipeline::Read";
#
# if (@ARGV) {
# $p->_store_file(join("", @ARGV));
# } else {
# my $doc = $p->read();
# die "no document queued" if !defined $doc;
# print $doc;
# }
1;
( run in 3.272 seconds using v1.01-cache-2.11-cpan-39bf76dae61 )