Net-IMP
view release on metacpan or search on metacpan
lib/Net/IMP/Cascade.pm view on Meta::CPAN
for my $if ( $p->get_interface(@_)) {
# $if should require only return types I support
push @if,$if
if ! grep { ! $rtypes_implemented_myself{$_} } @{ $if->[1] };
}
@if or return; # nothing in common
push @if4part,\@if
}
# find interfaces which are supported by all parts
my @common;
for( my $i=0;$i<@if4part;$i++ ) {
for my $if_i ( @{ $if4part[$i] } ) {
my ($in_i,$out_i) = @$if_i;
# check if $if_i matches at least one interface description in
# all other parts, i.e. $if_i is the same as or included in $if_k
# - data type/proto: $in_k should be undef or same as $in_i
# - return types: $out_i should include $out_k
for( my $k=0;$k<@if4part;$k++ ) {
next if $i == $k; # same
for my $if_k ( @{ $if4part[$k] } ) {
my ($in_k,$out_k) = @$if_k;
# should be same data type or $in_k undef
next if $in_k and ( ! $in_i or $in_k != $in_i );
# $out_i should include all of $out_k
my %out_k = map { $_ => 1 } @$out_k;
delete @out_k{ @$out_i };
next if %out_k; # some in k are not in i
# junction if i and k
push @common,[ $in_k,$out_i ];
}
}
}
}
# remove duplicates from match
my (@uniq,%m);
for( @common ) {
my $key = ( $_->[0] // '<undef>' )."\0".join("\0",sort @{$_->[1]});
push @uniq,$_ if ! $m{$key}++;
}
return @uniq;
}
# Apply the interface description given in @_ to every part of the cascade.
#
# Returns:
#   - nothing, if at least one part returns a false value from its own
#     set_interface (the interface cannot be used)
#   - the factory itself, if every part returned itself (nothing changed)
#   - otherwise a new factory of the same class, built from the parts
#     returned by the individual set_interface calls
sub set_interface {
    my Net::IMP::Cascade $factory = shift;
    my @wanted = @_;
    my $parts  = $factory->{factory_args}{parts};

    # sparse list: only slots of parts which returned a different factory
    # get filled
    my @replaced;
    for my $idx ( 0 .. $#$parts ) {
        my $old = $parts->[$idx];
        my $new = $old->set_interface(@wanted);
        return if ! $new;          # this part cannot use the interface
        next if $new == $old;      # part is fine as it is
        $replaced[$idx] = $new;    # part handed back a new factory
    }

    # no slot filled means the original factory already supports the interface
    return $factory if ! @replaced;

    # build the full part list: changed parts where we have them,
    # the original parts everywhere else
    my @all = map { $replaced[$_] || $parts->[$_] } 0 .. $#$parts;
    return ref($factory)->new_factory( parts => \@all );
}
sub new_analyzer {
my ($factory,%args) = @_;
my $p = $factory->{factory_args}{parts};
my $self = $factory->SUPER::new_analyzer(%args);
my @imp = map { $_->new_analyzer(%args) } @$p;
# $parts[$dir][$pi] is the part for direction $dir, analyzer $pi
# if part is optimized away due to IMP_PASS with IMP_MAXOFFSET
# $parts[$dir][$pi] contains instead an integer for adjustments
# from this part
my @parts;
# pause/continue handling
# maintains pause status per part
my @pause;
# to make sure we don't leak due to cross-references
weaken( my $wself = $self );
# Create a new buffer hash.
# Arguments are optional key/value pairs which override the defaults below.
# The key set gets locked before the overrides are applied, so an unknown
# key results in an exception instead of a silently added field.
my $new_buf = sub {
    my %override = @_;

    my $buf = {
        start  => 0,         # start of buf relative to part
        end    => 0,         # end of buf relative to part
        data   => '',        # data or undef for replace_later
        dtype  => 0,         # data type
        rtype  => IMP_PASS,  # IMP_PASS < IMP_PREPASS < IMP_REPLACE
        gap    => 0,         # size of gap before buf?
        gstart => 0,         # start of buf relative to cascade
        gend   => 0,         # end of buf relative to cascade
        eof    => 0,         # flag if last buf in this direction
    };
    lock_ref_keys($buf);

    # apply caller supplied values on top of the defaults
    @{$buf}{ keys %override } = values %override if %override;

    return $buf;
};
# Create the state hash for a single part in a single direction.
# Any arguments are handed through unchanged to $new_buf, which creates the
# initial (and always present) input buffer.
# The key set of the returned hash is locked.
my $new_part = sub {
    my $first_buf = $new_buf->(@_);

    my $part = {
        ibuf          => [ $first_buf ],  # buffers, at least one
        pass          => 0,  # can pass up to ..
        prepass       => 0,  # can prepass up to ..
        replace_later => 0,  # will replace_later up to ..
        adjust        => 0,  # total local adjustments from forwarded bufs
    };
    lock_ref_keys($part);

    return $part;
};
# initialize @parts
for( my $i=0;$i<@imp;$i++ ) {
$parts[0][$i] = $new_part->(); # client -> server, flow 0>1>2>..
$parts[1][$#imp-$i] = $new_part->(); # server -> client, flow 9>8>7>..
}
# Debugging helper: render buffers as text, one line per buffer.
#   $bufs - array reference of buffer hashes
#   @want - optional list of indices into $bufs, default is all buffers
# Each line shows index, start..end ('$' appended on eof), the gap if any,
# 'RL' if data is undef, dtype, rtype, [gstart,gend] and the data.
# Data longer than 30 characters get cut to 27 characters plus '...',
# backslash, newline, carriage return, tab and non-printable characters
# are shown as octal escapes.
my $dump_bufs = sub {
    my ($bufs,@want) = @_;
    @want = 0 .. $#$bufs if ! @want;

    my @lines = map {
        my $idx = $_;
        my $buf = $bufs->[$idx];
        my $has_data = defined( $buf->{data} );

        my $shown = '<undef>';
        if ( $has_data ) {
            $shown = $buf->{data};
            $shown = substr($shown,0,27).'...' if length($shown)>30;
            $shown =~ s{([\\\n\r\t[:^print:]])}{ sprintf("\\%03o",ord($1)) }esg;
        }

        sprintf("#%02d %d..%d%s%s%s %s %s [%d,%d] '%s'",
            $idx,
            $buf->{start},$buf->{end}, $buf->{eof} ? '$':'',
            $buf->{gap} ? " +$buf->{gap}":"",
            $has_data ? '':' RL',
            $buf->{dtype},$buf->{rtype},
            $buf->{gstart},$buf->{gend},
            $shown
        );
    } @want;

    return join("\n",@lines);
};
my $dump_parts = sub {
my $dir = shift;
my $out = '';
for my $pi (@_ ? @_ : 0..$#imp) {
my $part = $parts[$dir][$pi];
if ( ! $part ) {
$out .= "part[$dir][$pi] - skip\n";
next;
}
$out .= sprintf("part[%d][%d] p|pp|rl=%d|%d|%d ibuf:\n",
$dir,$pi,$part->{pass},$part->{prepass},$part->{replace_later});
my $ib = $part->{ibuf};
$out .= $dump_bufs->( $part->{ibuf});
}
lib/Net/IMP/Cascade.pm view on Meta::CPAN
if ( $lbuf->{gend} == $buf->{gend} ) {
# same global end, merge data
$lbuf->{data} .= $buf->{data};
} elsif ( $buf->{data} ne '' or $lbuf->{data} ne '' ) {
# either one not empty, no merge
next;
}
} else {
# unchanged, append
$lbuf->{data} .= $buf->{data};
}
$DEBUG && debug("merge bufs ".$dump_bufs->([$lbuf,$buf]));
$lbuf->{gend} = $buf->{gend};
$lbuf->{end} = $buf->{end};
$lbuf->{eof} = $buf->{eof};
splice(@fwd,$i,1,());
$i--;
next;
} else {
($lpi,$ldir,$lbuf) = @{$fwd[$i]};
next;
}
}
}
while ( my $fwd = shift(@fwd)) {
my $npi = my $pi = shift(@$fwd);
if ( ! defined $npi ) {
# propagate prepared IMP callback
$wself->run_callback($fwd);
next;
}
my ($dir,$buf) = @$fwd;
if ( $buf ) {
my $np;
my $adjust = 0;
while (1) {
$npi += $dir?-1:+1;
last if $npi<0 or $npi>=@imp;
last if ref( $np = $parts[$dir][$npi] );
$adjust += $np;
$DEBUG && debug("skipping pi=$npi");
}
if ( $buf->{eof} ) {
# add pass infinite to fwd to propagate eof
push @fwd,[ $pi,$dir,undef ];
}
if ( $np ) {
# feed into next part
my $nib = $np->{ibuf};
# adjust start,end based on end of npi and gap
$buf->{start} = $nib->[-1]{end} + $buf->{gap} + $adjust;
$buf->{end} = $buf->{start} + length($buf->{data});
$DEBUG && debug(
"fwd_next[$dir][$pi>$npi] ".$dump_bufs->([$buf]));
$part_in->($npi,$dir,$buf);
} else {
# output from cascade
my $cb = $fwd_up->($dir,$buf) or next;
$DEBUG && debug(
"fwd_up[$dir][$pi>>] ".$dump_bufs->([$buf]));
$wself->run_callback($cb);
}
# special - part is done with IMP_PASS IMP_MAXOFFSET
} else {
# skip if we had a pass infinite already
next if ! ref $parts[$dir][$pi];
$parts[$dir][$pi] = $parts[$dir][$pi]->{adjust};
if ( grep { ref($_) } @{ $parts[$dir] } ) {
# we have other unfinished parts, skip only this part
$DEBUG && debug(
"part[$dir][$pi>$npi] will be skipped in future, adjust=$parts[$dir][$pi]");
} else {
# everything can be skipped
$DEBUG && debug(
"part[$dir][$pi>>] all parts will be skipped in future");
# pass rest
$wself->run_callback([ IMP_PASS,$dir,IMP_MAXOFFSET ]);
}
}
}
};
# the data function
# called from sub data on new data and from $process when data are finished
# in one part and should be transferred into the next part
# $pi - index into parts
# $dir - direction (e.g. target part is $parts[$dir][$pi])
# $buf - new buffer from $new_buf->() which might be merged with existing
$part_in = sub {
my ($pi,$dir,$buf) = @_;
$DEBUG && debug( "part_in[$dir][$pi]: ".$dump_bufs->([$buf]));
my $part = $parts[$dir][$pi];
my $ibuf = $part->{ibuf};
my $lbuf = $ibuf->[-1];
my $lend = $lbuf->{end};
# some sanity checks
if(1) {
die "data after eof [$dir][$pi] ".$dump_bufs->([$lbuf,$buf])
if $lbuf->{eof};
if ( $buf->{start} != $lend ) {
if ( $buf->{start} < $lend ) {
die "overlapping data off($buf->{start})<last.end($lend) in part[$dir][$pi]";
} elsif ( ! $buf->{gap} ) {
die "gap should be set because off($buf->{start})>last.end($lend) in part[$dir][$pi]"
}
} elsif ( $buf->{gap} ) {
die "gap specified even if off($buf->{start}) == last.end"
}
$part->{pass} == IMP_MAXOFFSET and die
"pass infinite should have been optimized by removing part[$dir][$pi]";
}
# add data to buf
lib/Net/IMP/Cascade.pm view on Meta::CPAN
} elsif ( $offset > $ibuf_end ) {
# gap between data
$buf{gstart} = $offset;
$buf{gap} = $offset - $ibuf_end;
} else {
# there was no need for giving offset
$buf{gstart} = $ibuf_end;
}
$buf{gend} = $buf{gstart} + length($data);
$buf{start} = $buf{gstart} + $adjust;
$buf{end} = $buf{gend} + $adjust;
$collect_callbacks ||= [];
$part_in->( $pi,$dir, $new_buf->(%buf));
while ( my $cb = shift(@$collect_callbacks)) {
$_imp_callback->(@$cb);
}
$collect_callbacks = undef
};
# Callback wrapper handed to the parts.
# While $collect_callbacks is set the arguments only get queued there,
# otherwise they are handed directly to $_imp_callback.
$imp_callback = sub {
    # not collecting: dispatch right away
    return $_imp_callback->(@_) if ! $collect_callbacks;

    # collecting: keep a copy of the arguments for later execution
    push @$collect_callbacks, [ @_ ];
    return;
};
# setup callbacks
$imp[$_]->set_callback( $imp_callback,$_ ) for (0..$#imp);
# make some closures available within methods
$self->{dataf} = $global_in;
# Cleanup hook, stored in the object and called from DESTROY.
# Resets the shared closures and the per-part state - presumably to break
# the references these closures hold to each other (TODO confirm).
$self->{closef} = sub {
    $_imp_callback = undef;
    $imp_callback  = undef;
    $part_in       = undef;
    $global_in     = undef;
    @parts = ();
};
return $self;
}
# Feed data into the cascade.
# All arguments after the object are handed unchanged to the closure stored
# in $self->{dataf}; the result of this closure gets returned.
sub data {
    my $self = shift;
    my $handler = $self->{dataf};
    return $handler->(@_);
}
# Destructor: run the cleanup closure stored in $self->{closef}, if any.
sub DESTROY {
    my ($self) = @_;
    my $cleanup = $self->{closef};
    $cleanup and $cleanup->();
}
1;
__END__
=head1 NAME
Net::IMP::Cascade - manages cascade of IMP filters
=head1 SYNOPSIS
use Net::IMP::Cascade;
use Net::IMP::Pattern;
use Net::IMP::SessionLog;
...
my $imp = Net::IMP::Cascade->new_factory( parts => [
Net::IMP::Pattern->new_factory..,
Net::IMP::SessionLog->new_factory..,
]);
=head1 DESCRIPTION
C<Net::IMP::Cascade> puts multiple IMP analyzers into a cascade.
Data get analyzed starting with part#0, then part#1... etc for direction 0
(client to server), while for direction 1 (server to client) the data get
analyzed the opposite way, ending in part#0.
The only argument special to C<new_factory> is C<parts>, which is an array
reference of L<Net::IMP> factory objects.
When C<new_analyzer> gets called on the L<Net::IMP::Cascade> factory,
C<new_analyzer> will be called on the factory objects of the parts too, keeping
all arguments, except C<parts>.
=head1 TODO
Currently IMP_TOSENDER is not supported
=head1 BUGS
The code is way more complex than I originally hoped, even after a nearly
complete rewrite of the innards. So probably the problem itself is complex.
For debugging help see comments on top of code.
=head1 AUTHOR
Steffen Ullrich <sullr@cpan.org>
=head1 COPYRIGHT
Copyright by Steffen Ullrich.
This module is free software; you can redistribute it and/or
modify it under the same terms as Perl itself.
( run in 0.889 second using v1.01-cache-2.11-cpan-39bf76dae61 )