Net-IMP
view release on metacpan or search on metacpan
lib/Net/IMP/Cascade.pm view on Meta::CPAN
my $part_in; # internal feed into each part
my $imp_callback; # synchronization wrapper around callback for analyzers
my $_imp_callback; # real callback for the analyzers
# fwd_collect($dir,$pi,$i,$r_passed)
# Collect the buffers of part $parts[$dir][$pi] which can be forwarded
# because of the part's current 'pass' or 'prepass' offset, scanning the
# part's ibuf starting with ibuf[i].
#
# Arguments:
#   $dir      - first index into @parts (presumably the data direction)
#   $pi       - second index into @parts, i.e. which part to look at
#   $i        - index into the part's ibuf where scanning starts
#   $r_passed - optional reference to a scalar. If given, it gets updated
#               with the {end} of every buffer forwarded because of 'pass',
#               and buffers forwarded because of 'prepass' are kept in ibuf.
#
# Returns a list of [ $pi,$dir,$buf ] entries, one per buffer which can be
# forwarded. An extra entry [ $pi,$dir,undef ] is appended if ibuf ran empty
# and either the part's pass is IMP_MAXOFFSET or the last buffer had {eof}.
#
# Side effects:
#   - forwarded buffers get stripped from part.ibuf (see $r_passed above);
#     if ibuf runs empty it is refilled with a single dummy buffer
#   - part.{pass} / part.{prepass} are reset to 0 once a buffer is found
#     which starts at or after the offset
#   - {rtype} of a buffer is changed from IMP_PASS to IMP_PREPASS if the
#     buffer is forwarded because of 'prepass'
#   - a buffer which crosses the offset is split using $split_buf
#
# Dies if a buffer which should be forwarded has no defined {data}.
$fwd_collect = sub {
my ($dir,$pi,$i,$r_passed) = @_;
my $part = $parts[$dir][$pi];
my $ibuf = $part->{ibuf};
$DEBUG && debug(
"fwd_collect[$dir][$pi]: p=$part->{pass} pp=$part->{prepass} "
.$dump_bufs->($ibuf));
my @fwd;
# handle 'pass' first, then 'prepass'. $i is not reset in between, so the
# prepass scan continues at the index where the pass scan stopped.
for my $pp (qw(pass prepass)) {
# nothing to do for this kind if no offset is set (undef or 0)
my $pass = $part->{$pp} or next;
for( ;$i<@$ibuf;$i++ ) {
my $buf = $ibuf->[$i];
last if ! $buf->{dtype}; # dummy buf
if ( $pass != IMP_MAXOFFSET and $buf->{start} >= $pass ) {
# buffer starts at or after the offset, i.e. the offset is used up
$DEBUG && debug(
"fwd_collect[$dir][$pi]: reset $pp due to start[$i]($buf->{start})>=$pp($pass)");
$part->{$pp} = 0;
last;
}
# from here on at least the start of the buffer is covered by the
# offset, which requires the data to be available
die "cannot pass bufs with replace_later"
if ! defined $buf->{data};
if ( $pass == IMP_MAXOFFSET or $buf->{end} <= $pass ) {
# whole buffer can be passed
$DEBUG && debug(
"fwd_collect[$dir][$pi]: pass whole buffer[$i] $buf->{start}..$buf->{end}");
# downgrade pass to prepass if forwarded only because of prepass
$buf->{rtype} = IMP_PREPASS if $pp eq 'prepass'
and $buf->{rtype} == IMP_PASS;
push @fwd,[ $pi,$dir,$buf ];
# r_passed is set from part_in to track position if data
# are passed. In case of prepass we don't pass data but
# only put them into fwd
# (the buffer stays in ibuf and 'next' advances $i past it)
next if $r_passed && $pp eq 'prepass';
# track what got passed for part_in
$$r_passed = $buf->{end} if $r_passed;
# remove passed data from ibuf, if ! r_passed also prepassed
# data (called from imp_callback)
# NOTE(review): shift removes ibuf[0], which is the current buffer
# only if $i is 0 at this point - confirm against the callers
shift(@$ibuf);
# compensate for the $i++ of the loop, since ibuf got shorter
$i--;
if ( ! @$ibuf ) {
if ( $part->{pass} == IMP_MAXOFFSET || $buf->{eof} ) {
# part done, skip it in the future
push @fwd,[$pi,$dir,undef]; # buf = undef is special
}
# insert dummy
# the dummy is an empty buffer located at the end of the last
# forwarded buffer, so that ibuf never stays empty
@$ibuf = $new_buf->(
start => $buf->{end},
end => $buf->{end},
gstart => $buf->{gend},
gend => $buf->{gend},
# keep type for streaming data
$buf->{dtype} < 0 ? ( dtype => $buf->{dtype} ):(),
);
last;
}
} else {
# only part of buffer can be passed
# split buffer and re-enter loop, this will forward the
# first part and keep the later part
$DEBUG && debug(
"fwd_collect[$dir][$pi]: need to split buffer[$i]: $buf->{start}..$pass..$buf->{end}");
# split at the number of bytes of this buffer covered by the offset
$split_buf->($ibuf,$i,$pass - $buf->{start});
redo; # don't increase $i!
}
}
}
return @fwd;
};
# fwd_up($dir,$buf)
# Build the result tuple for a buffer which gets forwarded upwards.
#
# Returns an array reference [ rtype, $dir, gend ] with the buffer's {data}
# appended as fourth element if rtype is IMP_REPLACE.
# Returns nothing if the buffer is empty (gstart == gend), has no {gap} and
# its rtype is IMP_PASS or IMP_PREPASS, because such a buffer would only
# repeat the last (pre)pass.
$fwd_up = sub {
    my ($dir,$buf) = @_;
    my $rtype = $buf->{rtype};

    # empty buffer without gap: nothing new to report for (pre)pass
    my $nothing_new = $buf->{gstart} == $buf->{gend} && ! $buf->{gap};
    return if $nothing_new
        && ( $rtype == IMP_PASS || $rtype == IMP_PREPASS );

    my @result = ( $rtype, $dir, $buf->{gend} );
    # only a replacement carries data
    push @result, $buf->{data} if $rtype == IMP_REPLACE;
    return \@result;
};
$exec_fwd = sub {
my @fwd = @_;
if (@fwd>1) {
$DEBUG && debug("trying to merge\n".join("\n", map {
! defined $_->[0]
? "<cb>"
: "fwd[$_->[1]][$_->[0]] " .
( $_->[2] ? $dump_bufs->([$_->[2]]) : '<pass infinite>')
} @fwd));
# try to compress
my ($lpi,$ldir,$lbuf);
for( my $i=0;$i<@fwd;$i++ ) {
if ( ! defined $fwd[$i][0] || ! defined $fwd[$i][2]) {
$lpi = undef;
next;
}
if ( ! defined $lpi
or $lpi != $fwd[$i][0]
or $ldir != $fwd[$i][1] ) {
($lpi,$ldir,$lbuf) = @{$fwd[$i]};
next;
}
my $buf = $fwd[$i][2];
( run in 0.515 second using v1.01-cache-2.11-cpan-39bf76dae61 )