Alvis-Pipeline

 view release on metacpan or  search on metacpan

lib/Alvis/Pipeline/Read.pm  view on Meta::CPAN

	return undef if !$block;
	$this->log(2, "no document yet, sleeping");
	sleep $this->option("sleep");
	($fh, $lastread, $lastwrite) = $this->_lock_and_read($dir);
    }

    $lastread++;
    my $filename = "$dir/$lastread";
    my $f2 = new IO::File("<$filename")
	or die "can't read file '$filename': $!";
    binmode $f2, ":utf8";
    my $doc = join("", <$f2>);
    $f2->close();
    unlink($filename);

    $this->_write_and_unlock($dir, $fh, $lastread, $lastwrite);
    return $doc;
}


sub close {

lib/Alvis/Pipeline/Read.pm  view on Meta::CPAN

					LocalPort => $this->{port},
					Proto => "tcp",
					ReuseAddr => 1)
	or die("can't listen on port '" . $this->{port} . "': $!");

    while (1) {
	$this->log(1, "accepting connection");
	$this->{socket} = $listener->accept()
	    or die "can't accept connection: $!";

	binmode $this->{socket}, ":utf8";
	$this->log(1, "started background process, pid $$");
	while (1) {
	    my $doc = $this->_read();
	    last if !defined $doc;
	    $this->_store_file($doc);
	}
    }
}


lib/Alvis/Pipeline/Read.pm  view on Meta::CPAN

    my($doc) = @_;

    $this->log(2, "child writing to spooldir");
    my $dir = $this->{spooldir};
    my($fh, $lastread, $lastwrite) = $this->_lock_and_read($dir);

    $lastwrite++;
    my $filename = "$dir/$lastwrite";
    my $f2 = new IO::File(">$filename")
	or die "can't create new file '$filename': $!";
    binmode $f2, ":utf8";
    $f2->print($doc) or die "can't write '$filename': $!";
    $f2->close() or die "can't close '$filename': $!";

    $this->_write_and_unlock($dir, $fh, $lastread, $lastwrite);
}


# A sequence file called "seq" is maintained in the spool directory,
# and is always locked when read and rewritten.  The invariant it
# preserves between lock-read-write operations is that it contains two

lib/Alvis/Pipeline/Write.pm  view on Meta::CPAN

    $this->{port} = delete $opts{port}
	or die "new($class) with no port";

    $this->_setopts(%opts);
    $this->{socket} = new IO::Socket::INET(PeerAddr => $this->{host},
					   PeerPort => $this->{port},
					   Proto => "tcp")
	or die("can't connect to '" . $this->{host} . "', ",
	       "port '" . $this->{port} . "': $!");

    binmode $this->{socket}, ":utf8";
    return $this;
}


# Protocol.  Each packet consists of the following:
# 1. Magic string "Alvis::Pipeline\n"
# 2. Decimal-rendered protocol version-number [initially 1] followed by "\n"
# 3. Decimal-rendered integer byte-count, followed by "\n"
# 4. Binary object of length specified in #2.
# 5. Magic string "--end--\n";

t/2-unicode.t  view on Meta::CPAN

}

ok(1, "reading parent $$ spawned writing child with pid=$pid");

# Parent: read documents generated by child
my $pipe = new Alvis::Pipeline::Read(spooldir => $spooldir, port => $port,
				     loglevel => 0, sleep => 1)
    or die "can't make read-pipe with spooldir='$spooldir', port='$port': $!";

# It's very, very stupid that we have to do this.
binmode Test::Builder::new()->output(), ":utf8";

for my $i (1..$ndocs) {
    my $doc = $pipe->read(1);
    my $nb;
    { use bytes; $nb = length($doc) }
    my $nc = length($doc);
    ok($doc eq $docs[$i-1],
       "read document $i of $ndocs ($nb bytes, $nc chars) '$doc'");
}



( run in 1.123 second using v1.01-cache-2.11-cpan-49f99fa48dc )