Alvis-Pipeline
view release on metacpan or search on metacpan
lib/Alvis/Pipeline/Read.pm view on Meta::CPAN
return undef if !$block;
$this->log(2, "no document yet, sleeping");
sleep $this->option("sleep");
($fh, $lastread, $lastwrite) = $this->_lock_and_read($dir);
}
$lastread++;
my $filename = "$dir/$lastread";
my $f2 = new IO::File("<$filename")
or die "can't read file '$filename': $!";
binmode $f2, ":utf8";
my $doc = join("", <$f2>);
$f2->close();
unlink($filename);
$this->_write_and_unlock($dir, $fh, $lastread, $lastwrite);
return $doc;
}
sub close {
lib/Alvis/Pipeline/Read.pm view on Meta::CPAN
LocalPort => $this->{port},
Proto => "tcp",
ReuseAddr => 1)
or die("can't listen on port '" . $this->{port} . "': $!");
while (1) {
$this->log(1, "accepting connection");
$this->{socket} = $listener->accept()
or die "can't accept connection: $!";
binmode $this->{socket}, ":utf8";
$this->log(1, "started background process, pid $$");
while (1) {
my $doc = $this->_read();
last if !defined $doc;
$this->_store_file($doc);
}
}
}
lib/Alvis/Pipeline/Read.pm view on Meta::CPAN
my($doc) = @_;
$this->log(2, "child writing to spooldir");
my $dir = $this->{spooldir};
my($fh, $lastread, $lastwrite) = $this->_lock_and_read($dir);
$lastwrite++;
my $filename = "$dir/$lastwrite";
my $f2 = new IO::File(">$filename")
or die "can't create new file '$filename': $!";
binmode $f2, ":utf8";
$f2->print($doc) or die "can't write '$filename': $!";
$f2->close() or die "can't close '$filename': $!";
$this->_write_and_unlock($dir, $fh, $lastread, $lastwrite);
}
# A sequence file called "seq" is maintained in the spool directory,
# and is always locked when read and rewritten. The invariant it
# preserves between lock-read-write operations is that it contains two
lib/Alvis/Pipeline/Write.pm view on Meta::CPAN
$this->{port} = delete $opts{port}
or die "new($class) with no port";
$this->_setopts(%opts);
$this->{socket} = new IO::Socket::INET(PeerAddr => $this->{host},
PeerPort => $this->{port},
Proto => "tcp")
or die("can't connect to '" . $this->{host} . "', ",
"port '" . $this->{port} . "': $!");
binmode $this->{socket}, ":utf8";
return $this;
}
# Protocol. Each packet consists of the following:
# 1. Magic string "Alvis::Pipeline\n"
# 2. Decimal-rendered protocol version-number [initially 1] followed by "\n"
# 3. Decimal-rendered integer byte-count, followed by "\n"
# 4. Binary object of length specified in #3.
# 5. Magic string "--end--\n";
t/2-unicode.t view on Meta::CPAN
}
ok(1, "reading parent $$ spawned writing child with pid=$pid");
# Parent: read documents generated by child
my $pipe = new Alvis::Pipeline::Read(spooldir => $spooldir, port => $port,
loglevel => 0, sleep => 1)
or die "can't make read-pipe with spooldir='$spooldir', port='$port': $!";
# It's very, very stupid that we have to do this.
binmode Test::Builder::new()->output(), ":utf8";
for my $i (1..$ndocs) {
my $doc = $pipe->read(1);
my $nb;
{ use bytes; $nb = length($doc) }
my $nc = length($doc);
ok($doc eq $docs[$i-1],
"read document $i of $ndocs ($nb bytes, $nc chars) '$doc'");
}
( run in 1.123 second using v1.01-cache-2.11-cpan-49f99fa48dc )