Net-IMP

 view release on metacpan or  search on metacpan

lib/Net/IMP/Cascade.pm  view on Meta::CPAN

	for my $if ( $p->get_interface(@_)) {
	    # $if should require only return types I support
	    push @if,$if
		if ! grep { ! $rtypes_implemented_myself{$_} } @{ $if->[1] };
	}
	@if or return; # nothing in common
	push @if4part,\@if
    }

    # find interfaces which are supported by all parts
    my @common;
    for( my $i=0;$i<@if4part;$i++ ) {
	for my $if_i ( @{ $if4part[$i] } ) {
	    my ($in_i,$out_i) = @$if_i;
	    # check if $if_i matches at least on interface description in
	    # all other parts, e.g. $if_i is same or included in $if_k
	    # - data type/proto: $in_k should be undef or same as $in_i
	    # - return types: $out_i should include $out_k
	    for( my $k=0;$k<@if4part;$k++ ) {
		next if $i == $k; # same
		for my $if_k ( @{  $if4part[$k] } ) {
		    my ($in_k,$out_k) = @$if_k;
		    # should be same data type or $in_k undef
		    next if $in_k and ( ! $in_i or $in_k != $in_i );
		    # $out_i should include all of $out_k
		    my %out_k = map { $_ => 1 } @$out_k;
		    delete @out_k{ @$out_i };
		    next if %out_k; # some in k are not in i

		    # junction if i and k
		    push @common,[ $in_k,$out_i ];
		}
	    }
	}
    }

    # remove duplicates from match
    my (@uniq,%m);
    for( @common ) {
	my $key = ( $_->[0] // '<undef>' )."\0".join("\0",sort @{$_->[1]});
	push @uniq,$_ if ! $m{$key}++;
    }
    return @uniq;
}

sub set_interface {
    my Net::IMP::Cascade $factory = shift;
    my @if = @_;
    my $parts = $factory->{factory_args}{parts};

    my @new_parts;
    for(my $i=0;$i<@$parts;$i++) {
	my $np = $parts->[$i]->set_interface(@if)
	    or return; # cannot use interface
	$np == $parts->[$i] and next; # no change of part
	$new_parts[$i] = $np; # got new factory for part
    }

    return $factory if ! @new_parts; # interface supported by original factory

    # some parts changed, create new factory for this cascade
    for(my $i=0;$i<@$parts;$i++) {
	$new_parts[$i] ||= $parts->[$i]; # copy parts which did not change
    }

    return ref($factory)->new_factory( parts => \@new_parts );
}

sub new_analyzer {
    my ($factory,%args) = @_;

    my $p     = $factory->{factory_args}{parts};
    my $self  = $factory->SUPER::new_analyzer(%args);
    my @imp = map { $_->new_analyzer(%args) } @$p;

    # $parts[$dir][$pi] is the part for direction $dir, analyzer $pi
    # if part is optimized away due to IMP_PASS with IMP_MAXOFFSET
    # $parts[$dir][$pi] contains instead an integer for adjustments
    # from this part
    my @parts;

    # pause/continue handling
    # maintains pause status per part
    my @pause;

    # to make sure we don't leak due to cross-references
    weaken( my $wself = $self );

    my $new_buf = sub {
	lock_ref_keys( my $buf = {
	    start   => 0,  # start of buf relativ to part
	    end     => 0,  # end of buf relativ to part
	    data    => '', # data or undef for replace_later
	    dtype   => 0,  # data type
	    rtype   => IMP_PASS,  # IMP_PASS < IMP_PREPASS < IMP_REPLACE
	    gap     => 0,  # size of gap before buf?
	    gstart  => 0,  # start of buf relativ to cascade
	    gend    => 0,  # end of buf relativ to cascade
	    eof     => 0   # flag if last buf in this direction
	});
	%$buf = ( %$buf, @_ ) if @_;
	return $buf;
    };

    my $new_part = sub {
	lock_ref_keys( my $p = {
	    ibuf => [ &$new_buf ], # buffers, at least one
	    pass          => 0,    # can pass up to ..
	    prepass       => 0,    # can prepass up to ..
	    replace_later => 0,    # will replace_later up to ..
	    adjust        => 0,    # total local adjustments from forwarded bufs
	});
	return $p;
    };

    # initialize @parts
    for( my $i=0;$i<@imp;$i++ ) {
	$parts[0][$i] = $new_part->();       # client -> server, flow 0>1>2>..
	$parts[1][$#imp-$i] = $new_part->(); # server -> client, flow 9>8>7>..
    }

    my $dump_bufs = sub {
	my $bufs = shift;
	my @out;
	for my $i (@_ ? @_: 0..$#$bufs) {
	    my $buf = $bufs->[$i];
	    my $str = ! defined( $buf->{data} ) ? '<undef>' : do {
		local $_ = $buf->{data};
		$_ = substr($_,0,27).'...' if length($_)>30;
		s{([\\\n\r\t[:^print:]])}{ sprintf("\\%03o",ord($1)) }esg;
		$_
	    };
	    push @out, sprintf("#%02d %d..%d%s%s%s %s %s [%d,%d] '%s'",
		$i,
		$buf->{start},$buf->{end}, $buf->{eof} ? '$':'',
		$buf->{gap} ? " +$buf->{gap}":"",
		defined($buf->{data}) ? '':' RL',
		$buf->{dtype},$buf->{rtype},
		$buf->{gstart},$buf->{gend},
		$str
	    );
	}
	return join("\n",@out);
    };
    my $dump_parts = sub {
	my $dir = shift;
	my $out = '';
	for my $pi (@_ ? @_ : 0..$#imp) {
	    my $part = $parts[$dir][$pi];
	    if ( ! $part ) {
		$out .= "part[$dir][$pi] - skip\n";
		next;
	    }
	    $out .= sprintf("part[%d][%d] p|pp|rl=%d|%d|%d ibuf:\n",
		$dir,$pi,$part->{pass},$part->{prepass},$part->{replace_later});
	    my $ib = $part->{ibuf};
	    $out .= $dump_bufs->( $part->{ibuf});
	}

lib/Net/IMP/Cascade.pm  view on Meta::CPAN

			if ( $lbuf->{gend} == $buf->{gend} ) {
			    # same global end, merge data
			    $lbuf->{data} .= $buf->{data};
			} elsif ( $buf->{data} ne '' or $lbuf->{data} ne '' ) {
			    # either one not empty, no merge
			    next;
			}
		    } else {
			# unchanged, append
			$lbuf->{data} .= $buf->{data};
		    }
		    $DEBUG && debug("merge bufs ".$dump_bufs->([$lbuf,$buf]));
		    $lbuf->{gend} = $buf->{gend};
		    $lbuf->{end}  = $buf->{end};
		    $lbuf->{eof}  = $buf->{eof};
		    splice(@fwd,$i,1,());
		    $i--;
		    next;

		} else {
		    ($lpi,$ldir,$lbuf) = @{$fwd[$i]};
		    next;
		}
	    }
	}
	while ( my $fwd = shift(@fwd)) {
	    my $npi = my $pi = shift(@$fwd);
	    if ( ! defined $npi ) {
		# propagate prepared IMP callback
		$wself->run_callback($fwd);
		next;
	    }

	    my ($dir,$buf) = @$fwd;

	    if ( $buf ) {
		my $np;
		my $adjust = 0;
		while (1) {
		    $npi += $dir?-1:+1;
		    last if $npi<0 or $npi>=@imp;
		    last if ref( $np = $parts[$dir][$npi] );
		    $adjust += $np;
		    $DEBUG && debug("skipping pi=$npi");
		}

		if ( $buf->{eof} ) {
		    # add pass infinite to fwd to propagate eof
		    push @fwd,[ $pi,$dir,undef ];
		}
		if ( $np ) {
		    # feed into next part
		    my $nib = $np->{ibuf};
		    # adjust start,end based on end of npi and gap
		    $buf->{start} = $nib->[-1]{end} + $buf->{gap} + $adjust;
		    $buf->{end} = $buf->{start} + length($buf->{data});
		    $DEBUG && debug(
			"fwd_next[$dir][$pi>$npi] ".$dump_bufs->([$buf]));
		    $part_in->($npi,$dir,$buf);
		} else {
		    # output from cascade
		    my $cb = $fwd_up->($dir,$buf) or next;
		    $DEBUG && debug(
			"fwd_up[$dir][$pi>>] ".$dump_bufs->([$buf]));
		    $wself->run_callback($cb);
		}

	    # special - part is done with IMP_PASS IMP_MAXOFFSET
	    } else {
		# skip if we had a pass infinite already
		next if ! ref $parts[$dir][$pi];

		$parts[$dir][$pi] = $parts[$dir][$pi]->{adjust};
		if ( grep { ref($_) } @{ $parts[$dir] } ) {
		    # we have other unfinished parts, skip only this part
		    $DEBUG && debug(
			"part[$dir][$pi>$npi] will be skipped in future, adjust=$parts[$dir][$pi]");
		} else {
		    # everything can be skipped
		    $DEBUG && debug(
			"part[$dir][$pi>>] all parts will be skipped in future");
		    # pass rest
		    $wself->run_callback([ IMP_PASS,$dir,IMP_MAXOFFSET ]);
		}
	    }
	}
    };

    # the data function
    # called from sub data on new data and from $process when data are finished
    # in on part and should be transferred into the next part
    #  $pi    - index into parts
    #  $dir   - direction (e.g. target part is $parts[$dir][$pi])
    #  $buf   - new buffer from $new_buf->() which might be merged with existing
    $part_in = sub {
	my ($pi,$dir,$buf) = @_;
	$DEBUG && debug( "part_in[$dir][$pi]: ".$dump_bufs->([$buf]));

	my $part = $parts[$dir][$pi];
	my $ibuf = $part->{ibuf};
	my $lbuf = $ibuf->[-1];
	my $lend = $lbuf->{end};

	# some sanity checks
	if(1) {
	    die "data after eof [$dir][$pi] ".$dump_bufs->([$lbuf,$buf])
		if $lbuf->{eof};
	    if ( $buf->{start} != $lend ) {
		if ( $buf->{start} < $lend ) {
		    die "overlapping data off($buf->{start})<last.end($lend) in part[$dir][$pi]";
		} elsif ( ! $buf->{gap} ) {
		    die "gap should be set because off($buf->{start})>last.end($lend) in part[$dir][$pi]"
		}
	    } elsif ( $buf->{gap} ) {
		die "gap specified even if off($buf->{start}) == last.end"
	    }
	    $part->{pass} == IMP_MAXOFFSET and die
		"pass infinite should have been optimized by removing part[$dir][$pi]";
	}

	# add data to buf

lib/Net/IMP/Cascade.pm  view on Meta::CPAN

	} elsif ( $offset > $ibuf_end ) {
	    # gap between data
	    $buf{gstart} = $offset;
	    $buf{gap} = $offset - $ibuf_end;
	} else {
	    # there was no need for giving offset
	    $buf{gstart} = $ibuf_end;
	}
	$buf{gend}  = $buf{gstart} + length($data);
	$buf{start} = $buf{gstart} + $adjust;
	$buf{end}   = $buf{gend} + $adjust;

	$collect_callbacks ||= [];
	$part_in->( $pi,$dir, $new_buf->(%buf));

	while ( my $cb = shift(@$collect_callbacks)) {
	    $_imp_callback->(@$cb);
	}
	$collect_callbacks = undef
    };

    # wrapper which spools callbacks if within dataf
    $imp_callback = sub {
	if ( $collect_callbacks ) {
	    # only spool and execute later
	    push @$collect_callbacks, [ @_ ];
	    return;
	}
	return $_imp_callback->(@_)
    };

    # setup callbacks
    $imp[$_]->set_callback( $imp_callback,$_ ) for (0..$#imp);

    # make some closures available within methods
    $self->{dataf} = $global_in;
    $self->{closef} = sub {
	$global_in = $part_in = $imp_callback = $_imp_callback = undef;
	@parts = ();
    };
    return $self;
}

sub data {
    my $self = shift;
    $self->{dataf}(@_);
}

sub DESTROY {
    my $closef = shift->{closef};
    $closef->() if $closef;
}


1;

__END__

=head1 NAME

Net::IMP::Cascade - manages cascade of IMP filters

=head1 SYNOPSIS

    use Net::IMP::Cascade;
    use Net::IMP::Pattern;
    use Net::IMP::SessionLog;
    ...
    my $imp = Net::IMP::Cascade->new_factory( parts => [
	Net::IMP::Pattern->new_factory..,
	Net::IMP::SessionLog->new_factory..,
    ]);

=head1 DESCRIPTION

C<Net::IMP::Cascade> puts multiple IMP analyzers into a cascade.
Data get analyzed starting with part#0, then part#1... etc for direction 0
(client to server), while for direction 1 (server to client) the data get
analyzed the opposite way, ending in part#0.

The only argument special to C<new_factory> is C<parts>, which is an array
reference of L<Net::IMP> factory objects.
When C<new_analyzer> gets called on the L<Net::IMP::Cascade>> factory,
C<new_analyzer> will be called on the factory objects of the parts too, keeping
all arguments, except C<parts>.

=head1 TODO

Currently IMP_TOSENDER is not supported

=head1 BUGS

The code is way more complex than I originally hoped, even after a nearly
complete rewrite of the innards. So probably the problem itself is complex.
For debugging help see comments on top of code.

=head1 AUTHOR

Steffen Ullrich <sullr@cpan.org>

=head1 COPYRIGHT

Copyright by Steffen Ullrich.

This module is free software; you can redistribute it and/or
modify it under the same terms as Perl itself.



( run in 0.889 second using v1.01-cache-2.11-cpan-39bf76dae61 )