Net-IMP

 view release on metacpan or  search on metacpan

lib/Net/IMP/Cascade.pm  view on Meta::CPAN

    my $part_in;    # internal feed into each part

    my $imp_callback;   # synchronization wrapper around callback for analyzers
    my $_imp_callback;  # real callback for the analyzers

    # Collect the bufs of part [$dir][$pi] which are covered by the part's
    # pass/prepass offsets, starting with ibuf[$i].
    #
    # Arguments:
    #   $dir      - direction, first index into @parts
    #   $pi       - index of the part within the direction
    #   $i        - index into the part's ibuf where collecting starts
    #   $r_passed - optional reference to a scalar; if given it gets set to
    #               the end offset of the last buf which was passed, and
    #               prepassed bufs are only collected but stay in ibuf
    #
    # Returns a list of [ $pi,$dir,$buf ] entries for all bufs which can be
    # forwarded. An entry with $buf = undef marks the part as done.
    # Passed bufs (and without $r_passed also prepassed bufs) are stripped
    # from the part's ibuf. Dies if a buf without defined data would have
    # to be passed.
    $fwd_collect = sub {
	my ($dir,$pi,$i,$r_passed) = @_;
	my $part = $parts[$dir][$pi];
	my $ibuf = $part->{ibuf};
	$DEBUG && debug(
	    "fwd_collect[$dir][$pi]: p=$part->{pass} pp=$part->{prepass} "
	    .$dump_bufs->($ibuf));
	my @fwd;
	# handle the pass offset first, then the prepass offset; $i is shared
	# between both rounds and not reset in between
	for my $pp (qw(pass prepass)) {
	    # nothing to do for this kind if the offset is not set (0)
	    my $pass = $part->{$pp} or next;
	    for( ;$i<@$ibuf;$i++ ) {
		my $buf = $ibuf->[$i];
		last if ! $buf->{dtype}; # dummy buf
		if ( $pass != IMP_MAXOFFSET and $buf->{start} >= $pass ) {
		    # buf starts at or behind the offset: the offset is used
		    # up, so clear it in the part and stop this round
		    $DEBUG && debug(
			"fwd_collect[$dir][$pi]: reset $pp due to start[$i]($buf->{start})>=$pp($pass)");
		    $part->{$pp} = 0;
		    last;
		}
		# a buf without data cannot be forwarded
		die "cannot pass bufs with replace_later"
		    if ! defined $buf->{data};
		if ( $pass == IMP_MAXOFFSET or $buf->{end} <= $pass ) {
		    # whole buffer can be passed
		    $DEBUG && debug(
			"fwd_collect[$dir][$pi]: pass whole buffer[$i] $buf->{start}..$buf->{end}");
		    # in the prepass round a buf marked as pass gets
		    # downgraded to prepass, other rtypes are kept
		    $buf->{rtype} = IMP_PREPASS if $pp eq 'prepass'
			and $buf->{rtype} == IMP_PASS;
		    push @fwd,[ $pi,$dir,$buf ];

		    # r_passed is set from part_in to track the position if
		    # data are passed. In case of prepass we don't pass data
		    # but only put them into fwd, i.e. the buf stays in ibuf.
		    next if $r_passed && $pp eq 'prepass';

		    # track what got passed for part_in
		    $$r_passed = $buf->{end} if $r_passed;

		    # remove passed data from ibuf, if ! r_passed also prepassed
		    # data (called from imp_callback)
		    # NOTE(review): shift removes ibuf[0], which is the current
		    # buf only if $i is 0 at this point - confirm that this is
		    # always the case.
		    shift(@$ibuf);
		    # compensate for the $i++ of the loop since the remaining
		    # bufs moved down by one
		    $i--;

		    if ( ! @$ibuf ) {
			if ( $part->{pass} == IMP_MAXOFFSET || $buf->{eof} ) {
			    # part done, skip it in the future
			    push @fwd,[$pi,$dir,undef]; # buf = undef is special
			}
			# insert dummy, an empty buf positioned at the end of
			# the buf which was just removed
			@$ibuf = $new_buf->(
			    start  => $buf->{end},
			    end    => $buf->{end},
			    gstart => $buf->{gend},
			    gend   => $buf->{gend},
			    # keep type for streaming data
			    $buf->{dtype} < 0 ? ( dtype => $buf->{dtype} ):(),
			);
			# NOTE(review): last skips the $i++ of the loop, so $i
			# keeps its decremented value for a following prepass
			# round - confirm this is intended.
			last;
		    }

		} else {
		    # only part of buffer can be passed
		    # split buffer and re-enter loop, this will forward the
		    # first part and keep the later part
		    $DEBUG && debug(
			"fwd_collect[$dir][$pi]: need to split buffer[$i]: $buf->{start}..$pass..$buf->{end}");
		    $split_buf->($ibuf,$i,$pass - $buf->{start});
		    redo; # don't increase $i!
		}
	    }
	}
	return @fwd;
    };

    # Build the result which gets propagated upwards for a buf.
    # Arguments: $dir - direction, $buf - buf to report
    # Returns nothing if the buf is empty (no gap) and would only repeat a
    # (pre)pass, otherwise [ rtype, dir, gend ] with the buf data appended
    # in case of a replacement.
    $fwd_up = sub {
	my ($dir,$buf) = @_;

	# an empty buf without gap does not move the position, so a (pre)pass
	# for it would only repeat the last one
	my $empty = $buf->{gstart} == $buf->{gend} && ! $buf->{gap};
	return if $empty
	    and ( $buf->{rtype} == IMP_PASS or $buf->{rtype} == IMP_PREPASS );

	my $rtype  = $buf->{rtype};
	my @result = ( $rtype, $dir, $buf->{gend} );

	# only replacements carry data upwards
	push @result, $buf->{data} if $rtype == IMP_REPLACE;
	return \@result;
    };

    $exec_fwd = sub {
	my @fwd = @_;
	if (@fwd>1) {
	    $DEBUG && debug("trying to merge\n".join("\n", map {
		! defined $_->[0]
		    ? "<cb>"
		    : "fwd[$_->[1]][$_->[0]] " .
			( $_->[2] ? $dump_bufs->([$_->[2]]) : '<pass infinite>')
	    } @fwd));
	    # try to compress
	    my ($lpi,$ldir,$lbuf);
	    for( my $i=0;$i<@fwd;$i++ ) {
		if ( ! defined $fwd[$i][0] || ! defined $fwd[$i][2]) {
		    $lpi = undef;
		    next;
		}
		if ( ! defined $lpi
		    or $lpi  != $fwd[$i][0]
		    or $ldir != $fwd[$i][1] ) {
		    ($lpi,$ldir,$lbuf) = @{$fwd[$i]};
		    next;
		}

		my $buf = $fwd[$i][2];



( run in 0.515 second using v1.01-cache-2.11-cpan-39bf76dae61 )