IPDR
view release on metacpan or search on metacpan
lib/IPDR/Collection/Client.pm view on Meta::CPAN
package IPDR::Collection::Client;
use warnings;
use strict;
use IO::Select;
use IO::Socket;
use IO::Socket::SSL qw(debug3);
use Unicode::MapUTF8 qw(to_utf8 from_utf8 utf8_supported_charset);
use Time::localtime;
use Time::HiRes qw( usleep ualarm gettimeofday tv_interval clock_gettime clock_getres );
use Math::BigInt;
$SIG{CHLD}="IGNORE";
=head1 NAME
IPDR::Collection::Client - IPDR Collection Client
=head1 VERSION
Version 0.41
=cut
our $VERSION = '0.41';
=head1 SYNOPSIS
This is a IPDR module primarily written to connect and collect data
using IPDR from a Motorola BSR6400 CMTS. Some work is still required.
It is not very pretty code, nor perhaps the best approach for some of
the code, but it does work and will hopefully save time for other people
attempting to decode the IPDR protocol (even using the specification it
is hard work).
An example configuration for Cisco is
cable metering destination 192.168.1.1 5000 192.168.1.2 4000 1 15 non-secure
The IP addresses and ports specified are those of a collector that
the CMTS will send data to. The Cisco implementation does not provide
all IPDR functionality. Setting up a secure connection is not too difficult
(this release does not support it) from a collector point of view however
the Cisco implementation for secure keys is somewhat painful.
This Cisco module opens a socket on the local server waiting for a connection
from a Cisco router.
An example configuration for Motorola BSR is
ipdr enable
ipdr collector 192.168.1.1 5000 3
ipdr collector 192.168.1.2 4000 2
The IP addresses and ports specicified are those of a collector that will
connect to the CMTS. You can have multiple collectors connected but only
the highest priority collector will receive data, all others will received
keep alives.
The Client module makes a connection to the destination IP/Port specified.
An example on how to use this module is shown below. It is relatively simple
use the different module for Cisco, all others use Client.
#!/usr/local/bin/perl
use strict;
use IPDR::Collection::Client;
my $ipdr_client = new IPDR::Collection::Client (
[
VendorID => 'IPDR Client',
ServerIP => '192.168.1.1',
ServerPort => '5000',
KeepAlive => 60,
Capabilities => 0x01,
DataHandler => \&display_data,
Timeout => 2,
]
);
# We send a connect message to the IPDR server
$ipdr_client->connect();
# If we do not connect stop.
if ( !$ipdr_client->connected )
{
print "Can not connect to destination.\n";
exit(0);
}
# We now send a connect message
$ipdr_client->check_data_available();
print "Error was '".$ipdr_client->get_error()."'\n";
exit(0);
sub display_data
{
my ( $remote_ip ) = shift;
my ( $remote_port ) = shift;
my ( $data ) = shift;
my ( $self ) = shift;
foreach my $sequence ( sort { $a<=>$b } keys %{$data} )
{
print "Sequence is '$sequence'\n";
foreach my $attribute ( keys %{${$data}{$sequence}} )
{
print "Sequence '$sequence' attribute '$attribute'";
print " value '${$data}{$sequence}{$attribute}'\n";
}
}
}
This is the most basic way to access the data. There are multiple scripts in
the examples directory which will allow you to collect and process the IPDR
lib/IPDR/Collection/Client.pm view on Meta::CPAN
if ( !$self->{_GLOBAL}{'RemoteSpeed'} )
{ $self->{_GLOBAL}{'RemoteSpeed'}=10; }
if ( !$self->{_GLOBAL}{'PacketDirectory'} )
{ $self->{_GLOBAL}{'PacketDirectory'}=""; }
if ( !$self->{_GLOBAL}{'XMLDirectory'} )
{ $self->{_GLOBAL}{'XMLDirectory'}=""; }
if ( !$self->{_GLOBAL}{'AckTimeOverride'} )
{ $self->{_GLOBAL}{'AckTimeOverride'}=0; }
if ( !$self->{_GLOBAL}{'AckSequenceOverride'} )
{ $self->{_GLOBAL}{'AckSequenceOverride'}=0; }
if ( !$self->{_GLOBAL}{'PollTime'} )
{ $self->{_GLOBAL}{'PollTime'}=900 }
if ( !$self->{_GLOBAL}{'MACFormat'} )
{ $self->{_GLOBAL}{'MACFormat'}=1 }
if ( !$self->{_GLOBAL}{'LogDirectory'} )
{ $self->{_GLOBAL}{'LogDirectory'}=""; }
if ( !$self->{_GLOBAL}{'LocalAddr'} )
{ $self->{_GLOBAL}{'LocalAddr'}=""; }
if ( !$self->{_GLOBAL}{'LogEnabled'} )
{ $self->{_GLOBAL}{'LogEnabled'}=0; }
if ( !$self->{_GLOBAL}{'BigLittleEndian'} )
{ $self->{_GLOBAL}{'BigLittleEndian'}=0; }
if ( !$self->{_GLOBAL}{'Warning64BitOff'} )
{ $self->{_GLOBAL}{'Warning64BitOff'}=0; }
if ( !$self->{_GLOBAL}{'hexBinarySingle'} )
{ $self->{_GLOBAL}{'hexBinarySingle'}=0; }
if ( !$self->{_GLOBAL}{'InitiatorID'} )
{ $self->{_GLOBAL}{'InitiatorID'}=""; }
if ( !$self->{_GLOBAL}{'OutputVersion'} )
{ $self->{_GLOBAL}{'OutputVersion'}="Basic"; }
$self->{_GLOBAL}{'data_ack'}=0;
$self->{_GLOBAL}{'ERROR'}="" ;
$self->{_GLOBAL}{'data_processing'}=0;
$self->{_GLOBAL}{'template'}= \%template;
$self->{_GLOBAL}{'sessioninfo'}= \%session;
$self->{_GLOBAL}{'current_data'}= \%current_data;
$self->{_GLOBAL}{'complete_decoded_data'} = \%complete_decoded_data;
$self->{_GLOBAL}{'AckTime'}=0;
$self->{_GLOBAL}{'AckSequence'}=0;
$self->{_GLOBAL}{'data_capture_running'}=0;
$self->{_GLOBAL}{'data_capture_running_time'}=0;
$self->{_GLOBAL}{'data_capture_data_count'}=0;
$self->{_GLOBAL}{'data_capture_keep_alive'}=0;
$self->{_GLOBAL}{'Session'}=0;
return $self;
}
sub return_keep_alive
{
my ( $self ) = shift;
return $self->{_GLOBAL}{'KeepAlive'};
}
sub construct_capabilities
{
my ( $self ) = shift;
my ( $required_capabilities ) = shift;
my ($set_capabilities);
# This must be a hash pointer, so that we can then
# generate the value required.
my ( %capabilities ) = (
'STRUCTURE' => 0x01,
'MULTISESSION' => 0x02,
'TEMPLATENEGO' => 0x03,
'REQUESTRESPONSE' => 0x04
);
foreach my $requested ( keys %{$required_capabilities} )
{ $set_capabilities+=$capabilities{$requested}; }
return $set_capabilities;
}
sub create_vendor_id
{
my ($vendor_name) =@_;
my $utf8string = to_utf8({ -string => $vendor_name, -charset => 'ISO-8859-1' });
return $utf8string;
}
sub generate_ipdr_message_header
{
my ( $self ) = shift;
my ( $version ) = shift;
my ( $message_id ) = shift;
my ( $length ) = shift;
# now we assume the length given is that of the payload
# we return the header, with the new length in the header.
my ( $session_id );
if ( $self->{_GLOBAL}{'Session'}>0 )
{
print "IPDR Header session is greater than 0 of '".$self->{_GLOBAL}{'Session'}."'\n" if $self->{_GLOBAL}{'DEBUG'}>0;
$session_id = $self->{_GLOBAL}{'Session'};
}
else
{
$session_id=0;
}
print "IPDR Header session id is '".$session_id."'\n" if $self->{_GLOBAL}{'DEBUG'}>0;
$message_id = _transpose_message_names($message_id);
# We know the header is 8 long, so we need to add that to
# the length of the payload size, thus making the total
lib/IPDR/Collection/Client.pm view on Meta::CPAN
}
sub decode_message_type
{
my ( $self ) = shift;
$self->{_GLOBAL}{'current_data'}={};
my ( $decode_data ) = $self->{_GLOBAL}{'current_data'};
# First we get the version and type
# version is not important ( but might be later )
# type is the message ID
# session is the current session ID
# flags should always be 0 at the moment
# length is the total message length
my ( $message ) = $self->{_GLOBAL}{'data_received'};
if ( !$message ) { return 0; }
if ( length($message)<8 ) { return 0; }
if ( $self->{_GLOBAL}{'DEBUG'}>0 )
{ ${$decode_data}{'RAWDATARETURNED'}=$message; }
my ( $version, $type, $session, $flags, $length ) = unpack ("CCCCN",$message);
${$decode_data}{'Version'}=$version;
${$decode_data}{'Type'}=_transpose_message_numbers($type);
${$decode_data}{'Session'}=$session;
${$decode_data}{'Flags'}=$flags;
${$decode_data}{'Length'}=$length;
$self->{_GLOBAL}{'data_processing'}=0;
if ( !${$decode_data}{'Type'} )
{
${$decode_data}{'Type'}="";
}
print "Message type in decoder is '".${$decode_data}{'Type'}."'\n" if $self->{_GLOBAL}{'DEBUG'}>0;
print "Message length in decode is '".${$decode_data}{'Length'}."'\n" if $self->{_GLOBAL}{'DEBUG'}>0;
print "Message length is '".length($message)."'\n" if $self->{_GLOBAL}{'DEBUG'}>0;
if ( length($message)<${$decode_data}{'Length'} )
{
print "Data lengths are incorrect skipping data.\n" if $self->{_GLOBAL}{'DEBUG'}>0;
${$decode_data}{'Type'}="DEAD";
return 1;
}
print "Length of data received is '".length( $self->{_GLOBAL}{'data_received'} )."'\n" if $self->{_GLOBAL}{'DEBUG'}>0;
$self->{_GLOBAL}{'data_received'} = substr( $self->{_GLOBAL}{'data_received'}, ${$decode_data}{'Length'},
length($self->{_GLOBAL}{'data_received'})-(${$decode_data}{'Length'}) );
print "Length of data after new block is '".length( $self->{_GLOBAL}{'data_received'} )."'\n" if $self->{_GLOBAL}{'DEBUG'}>0;
if ( length($message)>${$decode_data}{'Length'} )
{
$self->{_GLOBAL}{'data_processing'}=1;
}
$message=substr($message,8,length($message)-8);
if ( ${$decode_data}{'Type'}=~/^connect_response$/i )
{
my ( $caps, $keepalive ) = unpack ( "SN",$message );
my ( $vendor ) = substr($message,6,length($message)-6);
${$decode_data}{'Capabilities'}=$caps;
${$decode_data}{'KeepAlive'}=$keepalive;
${$decode_data}{'VendorID'}=$vendor;
if ( $self->{_GLOBAL}{'DEBUG'}>0 )
{
print "Connect response decoded.\n";
foreach my $key ( keys %{$decode_data} )
{
next if $key=~/^RAWDATARETURNED$/i;
next if $key=~/^Next_Message$/i;
print "Variable is '$key' value is '${$decode_data}{$key}'\n";
}
}
return 1;
}
if ( ${$decode_data}{'Type'}=~/^template_data$/i )
{
my ( $config, $template_flags,$something ) = unpack ( "SCN",$message );
${$decode_data}{'Template_Config'} = $config;
${$decode_data}{'Template_Flags'} = $template_flags;
${$decode_data}{'Template_PreData'} = $something;
#${$decode_data}{'Template_Data'} = substr($message,7,length($message)-7);
$self->_extract_template_data( substr($message,7,length($message)-7), $self->{_GLOBAL}{'template'} );
#$self->_extract_template_data( substr($message,7,length($message)-7), $decode_data );
if ( $self->{_GLOBAL}{'DEBUG'}>0 )
{
print "Template Data response decoded.\n";
foreach my $key ( keys %{$decode_data} )
{
next if $key=~/^RAWDATARETURNED$/i;
print "Variable is '$key' value is '${$decode_data}{$key}'\n";
}
foreach my $key ( keys %{$self->{_GLOBAL}{'template'}} )
{
print "Key is '$key'\n";
}
}
#$self->template_store( $template_info );
return 1;
}
if ( ${$decode_data}{'Type'}=~/^session_start$/i )
{
my ( $uptime ) = unpack("N",$message); $message = substr($message,4,length($message)-4);
my ( $records ) = decode_64bit_number($message); $message = substr($message,8,length($message)-8);
my ( $gap_records ) = decode_64bit_number($message); $message = substr($message,8,length($message)-8);
my ( $primary, $ack_time, $ack_sequence, $document_id ) = unpack ( "CNNS",$message );
${$decode_data}{'Uptime'} = $uptime;
${$decode_data}{'Records'} = $records;
${$decode_data}{'GapRecords'} = $gap_records;
${$decode_data}{'Primary'} = $primary;
${$decode_data}{'AckTime'} = $ack_time;
${$decode_data}{'AckSequence'} = $ack_sequence;
${$decode_data}{'DocumentID'} = $document_id;
# added timer for acktime
# Added some timer margin so AckTime should not fail
my ( $margin_time ) = $ack_time*0.05;
if ( $margin_time>15 ) { $margin_time=15; }
$ack_time = $ack_time-$margin_time;
if ( !$self->{_GLOBAL}{'AckTime'} || $self->{_GLOBAL}{'AckTime'}==0 || $ack_time<$self->{_GLOBAL}{'AckTime'})
{
$self->{_GLOBAL}{'AckTime'} = $ack_time;
print "Ack time is set to '".$self->{_GLOBAL}{'AckTime'}."'\n" if $self->{_GLOBAL}{'DEBUG'}>0;
}
if ( !$self->{_GLOBAL}{'AckSequence'} || $self->{_GLOBAL}{'AckSequence'}==0 || $ack_sequence<$self->{_GLOBAL}{'AckSequence'} )
{
$self->{_GLOBAL}{'AckSequence'} = $ack_sequence;
print "Ack time is set to '".$self->{_GLOBAL}{'AckSequence'}."'\n" if $self->{_GLOBAL}{'DEBUG'}>0;
}
if ( $self->{_GLOBAL}{'AckSequenceOverride'}>0 )
{
$self->{_GLOBAL}{'AckSequence'} = $self->{_GLOBAL}{'AckSequenceOverride'};
}
if ( $self->{_GLOBAL}{'AckTimeOverride'} > 0 )
{
$self->{_GLOBAL}{'AckTime'} = $self->{_GLOBAL}{'AckTimeOverride'};
}
if ( $self->{_GLOBAL}{'DEBUG'}>0 )
{
print "Session start decoded.\n";
foreach my $key ( keys %{$decode_data} )
{
next if $key=~/^RAWDATARETURNED$/i;
print "Variable is '$key' value is '${$decode_data}{$key}'\n";
}
}
return 1;
}
if ( ${$decode_data}{'Type'}=~/^get_sessions_response$/i )
{
# There is something odd here, the spec says it should be a short
# the data returned signifies an int ...
my ( $request_id ) = unpack ("S",$message );
print "Request id is '$request_id'\n" if $self->{_GLOBAL}{'DEBUG'}>0;
${$decode_data}{'SESSIONS_RequestID'} = $request_id;
$self->_extract_session_data( substr($message,2,length($message)-2), $self->{_GLOBAL}{'sessioninfo'} );
$self->update_session_parameters();
return 1;
}
if ( ${$decode_data}{'Type'}=~/^data$/i )
{
if ( !$self->{_GLOBAL}{'data_capture_running'} )
{
$self->{_GLOBAL}{'data_capture_running_time'}=time();
$self->{_GLOBAL}{'data_capture_running'}=0;
}
if ( !$self->{_GLBOAL}{'data_capture_keep_alive'} )
{
$self->{_GLBOAL}{'data_capture_keep_alive'}=time();
}
$self->{_GLOBAL}{'data_capture_running'}++;
$self->{_GLOBAL}{'data_capture_data_count'}++;
my ( $template_id, $config_id, $flags ) = unpack("SSC",$message);
$message = substr($message,5,length($message)-5);
my ( $sequence_num ) = decode_64bit_number($message); $message = substr($message,8,length($message)-8);
my ( $record_type );
${$decode_data}{'DATA_TemplateID'}=$template_id;
${$decode_data}{'DATA_ConfigID'}=$config_id;
${$decode_data}{'DATA_Flags'}=$flags;
${$decode_data}{'DATA_Sequence'}=$sequence_num;
${$decode_data}{'DATA_Data'} = $message;
print "Data Epoch is '".time()."'\n" if $self->{_GLOBAL}{'DEBUG'}>0;
print "TemplateID is '${$decode_data}{'DATA_TemplateID'}'\n" if $self->{_GLOBAL}{'DEBUG'}>0;
print "ConfigID is '${$decode_data}{'DATA_ConfigID'}'\n" if $self->{_GLOBAL}{'DEBUG'}>0;
print "Flags is '${$decode_data}{'DATA_Flags'}'\n" if $self->{_GLOBAL}{'DEBUG'}>0;
print "Sequence is '${$decode_data}{'DATA_Sequence'}'\n" if $self->{_GLOBAL}{'DEBUG'}>0;
#${$decode_data}{'records'}=_decode_data_record( ${$decode_data}{'DATA_Data'} );
return 1;
}
if ( ${$decode_data}{'Type'}=~/^session_stop$/i )
{
my ( $reason_code ) = unpack ("S", $message ); $message = substr($message,2,length($message)-2);
my ( $reason , $message ) = _extract_utf8_string ( $message );
${$decode_data}{'reasonCode'} = $reason_code;
${$decode_data}{'reason'} = $reason;
if ( $self->{_GLOBAL}{'DEBUG'}>0 )
{
print "SessionStop response decoded.\n";
foreach my $key ( keys %{$decode_data} )
{
next if $key=~/^RAWDATARETURNED$/i;
print "Variable is '$key' value is '${$decode_data}{$key}'\n";
}
}
return 1;
}
if ( ${$decode_data}{'Type'}=~/^error$/i )
{
my ( $time, $error_code ) = unpack ("NS",$message ) ; $message = substr($message,6,length($message)-6);
my ( $reason , $message ) = _extract_utf8_string ( $message );
${$decode_data}{'timeStamp'} = $time;
${$decode_data}{'errorCode'} = $error_code;
${$decode_data}{'reason'} = $reason;
if ( $self->{_GLOBAL}{'DEBUG'}>0 )
{
print "Error response decoded.\n";
foreach my $key ( keys %{$decode_data} )
{
next if $key=~/^RAWDATARETURNED$/i;
print "Variable is '$key' value is '${$decode_data}{$key}'\n";
}
}
return 1;
}
if ( $self->{_GLOBAL}{'DEBUG'}>0 )
lib/IPDR/Collection/Client.pm view on Meta::CPAN
my ( $self ) = shift;
my ( $data ) = shift;
my ( $result ) = $self->send_message( $self->construct_disconnect() );
return $result;
}
sub send_flow_stop
{
my ( $self ) = shift;
my ( $data ) = shift;
my ( $code ) = shift;
my ( $reason ) = shift;
my ( $result ) = $self->send_message( $self->construct_flow_stop($code,$reason) );
return $result;
}
sub max_records_segment
{
my ( $self ) = shift;
$self->{_GLOBAL}{'data_capture_running'}=0;
my $child;
if ($child=fork)
{ } elsif (defined $child)
{
my $xml_transform;
#print "Remote IP is '".$self->{_GLOBAL}{'RemoteIP'}."'\n";
#print "Remote Port is '".$self->{_GLOBAL}{'RemotePort'}."'\n";
if ( ($self->{_GLOBAL}{'RemoteIP'} &&
$self->{_GLOBAL}{'RemotePort'}) || length($self->{_GLOBAL}{'XMLDirectory'})>5 )
{
print "Transformed into XML\n\n$xml_transform\n\n" if $self->{_GLOBAL}{'DEBUG'}>0;
$xml_transform = $self->_transform_into_xml($self->{_GLOBAL}{'complete_decoded_data'});
}
if ( $self->{_GLOBAL}{'RemoteIP'} && $self->{_GLOBAL}{'RemotePort'} )
{
$self->_send_to_clear_destination ($xml_transform);
}
if ( length($self->{_GLOBAL}{'XMLDirectory'})>5 )
{
if ( open (__FILE,">".$self->{_GLOBAL}{'XMLDirectory'}."/".$self->{_GLOBAL}{'ServerIP'} ) )
{
print __FILE $xml_transform;
close __FILE;
}
}
$self->{_GLOBAL}{'DataHandler'}->(
$self->{_GLOBAL}{'ServerIP'},
$self->{_GLOBAL}{'ServerPort'},
$self->{_GLOBAL}{'complete_decoded_data'},
$self
);
waitpid($child,0);
exit(0);
}
$self->{_GLOBAL}{'current_data'}={};
$self->{_GLOBAL}{'complete_decoded_data'}={};
return 1;
}
sub send_get_keepalive
{
my ( $self ) = shift;
my ( $data ) = shift;
if ( $self->get_internal_value('data_ack') )
{
print "Data ACK is set\n" if $self->{_GLOBAL}{'DEBUG'}>0;
$self->send_data_ack(
$self->get_internal_value('dsn_configID'),
$self->get_internal_value('dsn_sequence')
);
# we also need to reset the capture_count
$self->{_GLOBAL}{'data_capture_running'}=0;
$self->{_GLOBAL}{'data_capture_running_time'}=0;
$self->{_GLOBAL}{'data_capture_data_count'}=0;
# here we need to add the remote sending of the extracted
# data. More than likely a fork is required so not to stall
# the collection process. A fork maybe needed anyway as if
# the dataset exceeds, say 10,000 entries ( easily done )
# and being processed locally, any data store *may* not be
# quick enough.
my $child;
if ($child=fork)
{ } elsif (defined $child)
{
my $xml_transform="";
#print "Remote IP is '".$self->{_GLOBAL}{'RemoteIP'}."'\n";
#print "Remote Port is '".$self->{_GLOBAL}{'RemotePort'}."'\n";
if ( ($self->{_GLOBAL}{'RemoteIP'} &&
$self->{_GLOBAL}{'RemotePort'}) || length($self->{_GLOBAL}{'XMLDirectory'})>5 )
{
print "Transformed into XML\n\n$xml_transform\n\n" if $self->{_GLOBAL}{'DEBUG'}>0;
$xml_transform = $self->_transform_into_xml($self->{_GLOBAL}{'complete_decoded_data'});
}
if ( $self->{_GLOBAL}{'RemoteIP'} && $self->{_GLOBAL}{'RemotePort'} )
{
$self->_send_to_clear_destination ($xml_transform);
}
if ( length($self->{_GLOBAL}{'XMLDirectory'})>5 )
{
if ( open (__FILE,">".$self->{_GLOBAL}{'XMLDirectory'}."/".$self->{_GLOBAL}{'ServerIP'} ) )
{
print __FILE $xml_transform;
close __FILE;
}
}
$self->{_GLOBAL}{'DataHandler'}->(
$self->{_GLOBAL}{'ServerIP'},
$self->{_GLOBAL}{'ServerPort'},
$self->{_GLOBAL}{'complete_decoded_data'},
$self
);
waitpid($child,0);
exit(0);
}
$self->{_GLOBAL}{'complete_decoded_data'}={};
$self->set_internal_value('data_ack',0);
$self->{_GLOBAL}{'current_data'}={};
}
my ( $result ) = $self->send_message( $self->construct_get_keepalive() );
return $result;
}
sub send_get_sessions
{
my ( $self ) = shift;
my ( $data ) = shift;
my ( $result ) = $self->send_message( $self->construct_get_sessions() );
return $result;
}
sub send_data_ack
{
my ( $self ) = shift;
my ( $config_id ) = shift;
my ( $seq_number ) = shift;
print "ACK data config_id is '$config_id' sequence number is '$seq_number'\n" if $self->{_GLOBAL}{'DEBUG'}>0;
my ( $result ) = $self->send_message( $self->construct_data_ack($config_id,$seq_number) );
return $result;
}
sub send_final_template_data_ack
{
my ( $self ) = shift;
my ( $data ) = shift;
my ( $result ) = $self->send_message( $self->construct_final_template_data_ack() );
return $result;
}
sub send_flow_start_message
{
my ( $self ) = shift;
my ( $data ) = shift;
my ( $result ) = $self->send_message( $self->construct_flow_start() );
return $result;
}
sub send_connect_message
{
my ( $self ) = shift;
my $result = $self->send_message( $self->construct_connect_message() );
return $result;
}
sub construct_data_ack
{
my ( $self ) = shift;
my ( $config_id ) = shift;
my ( $sequence ) = shift;
print "Constructed id is '$config_id'\n" if $self->{_GLOBAL}{'DEBUG'}>0;
print "Constructed sequence us '$sequence'\n" if $self->{_GLOBAL}{'DEBUG'}>0;
my ( $message ) = pack("S",$config_id);
my ( $sequence_encode ) = encode_64bit_number($sequence);
$message.=$sequence_encode;
if ( $self->{_GLOBAL}{'DEBUG'}>0 )
{
print "Packed message is - " if $self->{_GLOBAL}{'DEBUG'}>0;
for($a=0;$a<length($message);$a++)
{
print ord (substr($message,$a,1))." ";
}
print "\n";
}
my ( $header ) = $self->generate_ipdr_message_header(
2,"DATA_ACK",length($message));
$header.=$message;
return $header;
}
sub construct_final_template_data_ack
{
my ( $self ) = shift;
my ( $header ) = $self->generate_ipdr_message_header(
2,"FINAL_TEMPLATE_DATA_ACK",0);
return $header;
}
sub construct_flow_stop
{
my ( $self ) = shift;
my ( $code ) = shift;
my ( $reason ) = shift;
my ( $message ) = pack("S",$code); $message.=$reason;
my ( $header ) = $self->generate_ipdr_message_header(
2,"FLOW_STOP",length($message));
$header.=$message;
return $header;
}
sub construct_disconnect
{
my ( $self ) = shift;
my ( $header ) = $self->generate_ipdr_message_header(
2,"DISCONNECT",0);
return $header;
}
sub construct_get_sessions
{
my ( $self ) = shift;
my ( $message ) = pack("S",4096);
my ( $header ) = $self->generate_ipdr_message_header(
2,"GET_SESSIONS",length($message));
$header.=$message;
return $header;
}
sub construct_get_keepalive
{
my ( $self ) = shift;
my ( $header ) = $self->generate_ipdr_message_header(
2,"KEEP_ALIVE",0);
return $header;
}
sub construct_flow_start
{
my ( $self ) = shift;
if ( !$self->create_initiator_id() )
{ return 0; }
my ( $header ) = $self->generate_ipdr_message_header(
2,"FLOW_START",0);
return $header;
}
sub construct_connect_message
{
my ( $self ) = shift;
if ( !$self->create_initiator_id() )
{
return 0;
}
# so we know all the below
my ( $message ) = pack("NSNN",
$self->create_initiator_id(),
$self->{_GLOBAL}{'LocalPort'},
$self->{_GLOBAL}{'Capabilities'},
$self->{_GLOBAL}{'KeepAlive'} );
$message.=$self->{_GLOBAL}{'VendorID'};
my ( $header ) = $self->generate_ipdr_message_header(
2,"CONNECT",length($message));
$header.=$message;
return $header;
}
sub disconnect
{
my ( $self ) = shift;
$self->{_GLOBAL}{'Handle'}->close();
return 1;
}
sub connect
{
my ( $self ) = shift;
if ( !$self->test_64_bit() )
{
# if you forgot to run make test, this will clobber
# your run anyway.
if ( $self->{_GLOBAL}{'Warning64BitOff'}!=1 )
{
warn '64Bit support not available using BigInt - Milleage will vary! Turn off with Warning64BitOff => 1.';
}
}
my $lsn;
if ( length($self->{_GLOBAL}{'LocalAddr'})>0 )
{
$lsn = IO::Socket::INET->new
(
PeerAddr => $self->{_GLOBAL}{'ServerIP'},
PeerPort => $self->{_GLOBAL}{'ServerPort'},
LocalAddr => $self->{_GLOBAL}{'LocalAddr'},
ReuseAddr => 1,
Proto => 'tcp',
Timeout => $self->{_GLOBAL}{'Timeout'}
);
}
else
{
$lsn = IO::Socket::INET->new
(
PeerAddr => $self->{_GLOBAL}{'ServerIP'},
PeerPort => $self->{_GLOBAL}{'ServerPort'},
ReuseAddr => 1,
Proto => 'tcp',
Timeout => $self->{_GLOBAL}{'Timeout'}
);
}
if (!$lsn)
{
$self->{_GLOBAL}{'STATUS'}="Failed To Connect";
$self->{_GLOBAL}{'ERROR'}=$!;
return 0;
}
if ( length($self->{_GLOBAL}{'InitiatorID'})>0 )
{
$self->{_GLOBAL}{'LocalIP'}=$self->{_GLOBAL}{'InitiatorID'};
}
else
{
$self->{_GLOBAL}{'LocalIP'}=$lsn->sockhost();
}
$self->{_GLOBAL}{'LocalPort'}=$lsn->sockport();
$self->{_GLOBAL}{'Handle'} = $lsn;
$self->{_GLOBAL}{'Selector'}=new IO::Select( $lsn );
$self->{_GLOBAL}{'STATUS'}="Success Connected";
$self->{_GLOBAL}{'data_ack'}=0;
$self->{_GLOBAL}{'ERROR'}="" ;
$self->{_GLOBAL}{'data_processing'}=0;
$self->{_GLOBAL}{'template'}= {};
$self->{_GLOBAL}{'sessioninfo'}= {};
$self->{_GLOBAL}{'current_data'}= {};
$self->{_GLOBAL}{'complete_decoded_data'} = {};
$self->{_GLOBAL}{'AckTime'}=0;
$self->{_GLOBAL}{'AckSequence'}=0;
$self->{_GLOBAL}{'data_capture_running'}=0;
$self->{_GLOBAL}{'data_capture_running_time'}=0;
$self->{_GLOBAL}{'data_capture_data_count'}=0;
$self->{_GLOBAL}{'data_capture_keep_alive'}=0;
$self->{_GLOBAL}{'Session'}=0;
if ( $self->{_GLOBAL}{'DEBUG'} > 0 )
{
my $test = $self->{_GLOBAL};
foreach my $setting ( keys %{$test} )
{
print "Global setting '$setting' value is '${$test}{$setting}'\n";
}
}
return 1;
}
sub connected
{
my ( $self ) = shift;
return $self->{_GLOBAL}{'Selector'};
}
sub send_message
{
my ( $self ) = shift;
my ( $message ) = shift;
if ( !$self->{_GLOBAL}{'Handle'} ) { return 0; }
my ( $length_sent ) = 0;
eval {
local $SIG{ALRM} = sub { die "alarm\n" };
alarm 5;
$length_sent = syswrite ( $self->{_GLOBAL}{'Handle'}, $message );
alarm 0;
};
if ( $@=~/alarm/i )
{ return 0; }
print "Sending message of size '".length($message)."'\n" if $self->{_GLOBAL}{'DEBUG'}>0;
if ( $self->{_GLOBAL}{'DEBUG'}>4 )
{
for($a=0;$a<length($message);$a++)
{
printf("%02x-", ord(substr($message,$a,2)));
}
print "\n";
}
if ( $length_sent==length($message) )
{ return 1; }
return 0;
}
sub create_initiator_id
{
my ( $self ) = @_;
my ( $initiator_id ) = $self->_IpQuadToInt( $self->{_GLOBAL}{'LocalIP'} );
return $initiator_id;
}
sub _IpQuadToInt
lib/IPDR/Collection/Client.pm view on Meta::CPAN
## It seems Q does not work, well not for me
## and this is the quickest way to fix it.
## You STILL NEED 64 BIT SUPPORT!!
#my ( $number ) = @_;
## any bit to 64bit number in.
#my($test1) = $number & 0xFFFFFFFF; $number >>= 32;
#my($test2) = $number & 0xFFFFFFFF;
#my $message = pack("NN",$test2,$test1);
#return $message;
#}
sub check_data_available
{
my ( $self ) = shift;
$self->send_connect_message();
# Check for data from the IPDR server.
while ( $self->check_data_handles && $self->{_GLOBAL}{'ERROR'}!~/not connected/i )
{
$self->get_data_segment();
while ( $self->{_GLOBAL}{'data_processing'}==1 )
{
# If we manage to get some data correctly, decode the message
# during decoding we may also store information, such as template
# and data sequencing, however this is done internally to avoid
# complex code here.
$self->decode_message_type();
my $last_message = $self->return_current_type();
print "Last message was '$last_message'\n" if $self->{_GLOBAL}{'DEBUG'}>0;
if ( $last_message=~/NULL/i || !$last_message )
{
$self->{_GLOBAL}{'data_processing'}=0;
}
# If the message is a connect_response, send a flow_start
if ( $last_message=~/^CONNECT_RESPONSE$/i )
{
$self->log("CONNECT_RESPONSE");
$self->send_get_sessions();
# $self->update_session_parameters();
}
if ( $last_message=~/^GET_SESSIONS_RESPONSE$/i )
{ $self->send_flow_start_message(); }
# If the message is a template data, store the template
# and ack the template
if ( $last_message=~/^TEMPLATE_DATA$/i )
{
$self->log("TEMPLATE_DATA");
$self->send_final_template_data_ack();
}
# If the message is a session_start just send a keep
# alive.
if ( $last_message=~/^SESSION_START$/i )
{
$self->log("SESSION_START");
$self->send_get_keepalive();
}
# If the message is a keep alive, send one back.
# This function does a little more, but has been
# made a wrapper to keep the code clean.
if ( $self->return_current_type()=~/^KEEP_ALIVE$/i )
{
$self->log("KEEP_ALIVE");
$self->send_get_keepalive();
}
# If the message is a data message, process it.
# This also sends one keepalive upon receipt
# of the first data segment, so keeping to the
# specification and allowing DSN generation.
if ( $self->return_current_type()=~/^DATA$/i )
{
$self->decode_data( );
}
# We need to make sure we decoded the last message
# before checking if we can throw it out.
if ( $self->{_GLOBAL}{'data_capture_running'}>=$self->{_GLOBAL}{'MaxRecords'}
&& $self->{_GLOBAL}{'MaxRecords'}>0)
{
print "Max records reached was '".$self->{_GLOBAL}{'data_capture_running'}."'\n";
print "Max records limit was '".$self->{_GLOBAL}{'MaxRecords'}."\n\n" if $self->{_GLOBAL}{'DEBUG'}>0;
$self->{_GLOBAL}{'data_capture_running'}=0;
$self->max_records_segment();
}
# so if you are receiving more data than a keepalive you may need
# to send a data_ack
if ( ((time()-$self->{_GLOBAL}{'data_capture_running_time'})
> $self->{_GLOBAL}{'AckTime'})
&& $self->return_current_type()=~/^DATA$/i )
{
if ( defined( $self->get_internal_value('dsn_sequence')) )
{
$self->{_GLOBAL}{'data_capture_running_time'}=time();
$self->{_GLOBAL}{'data_capture_data_count'}=0;
print "Sending AckTime data ack.\n\n" if $self->{_GLOBAL}{'DEBUG'}>0;
$self->send_data_ack(
$self->get_internal_value('dsn_configID'),
$self->get_internal_value('dsn_sequence')
);
}
}
if ( ($self->{_GLOBAL}{'data_capture_data_count'}
>= $self->{_GLOBAL}{'AckSequence'})
&& $self->return_current_type()=~/^DATA$/i )
{
if ( defined( $self->get_internal_value('dsn_sequence')) )
{
$self->{_GLOBAL}{'data_capture_data_count'}=0;
print "Sending AckSequence data ack.\n\n" if $self->{_GLOBAL}{'DEBUG'}>0;
$self->send_data_ack(
$self->get_internal_value('dsn_configID'),
$self->get_internal_value('dsn_sequence')
);
}
}
if ( (time()-$self->{_GLOBAL}{'data_capture_keep_alive'})
> $self->{_GLOBAL}{'KeepAlive'} )
{
$self->send_message( $self->construct_get_keepalive() );
$self->{_GLOBAL}{'data_capture_keep_alive'}=time();
}
# If the message is a session_stop, we should probably
# send a disconnect, but we dont as yet.
# with session stop you need to send a keepalive, as
# session stop is not always a disconnect.
if ( $self->return_current_type()=~/^SESSION_STOP$/i )
{
$self->log("SESSION_STOP");
$self->send_get_keepalive();
#$ipdr_client->{_GLOBAL}{'Selector'}->remove( $ipdr_client->{_GLOBAL}{'Handle'} );
}
# If the message is an error message, stop, something
# went wrong somewhere.
if ( $self->return_current_type()=~/^ERROR$/i )
{
$self->log("ERROR");
print "Disconnect and closed TCP.\n" if $self->{_GLOBAL}{'DEBUG'}>0;
return 0;
}
}
}
print "Disconnect and closed TCP.\n" if $self->{_GLOBAL}{'DEBUG'}>0;
return 1;
}
# ***************************************************************
sub log
{
my ( $self ) = shift;
my ( $message ) = shift;
my ( $decode_data ) = $self->{_GLOBAL}{'current_data'};
my ( $time ) = time();
return 1;
}
sub check_data_handles
{
my ( $self ) = shift;
my ( @handle ) = $self->{_GLOBAL}{'Selector'}->can_read( $self->{_GLOBAL}{'KeepAlive'} );
if ( !@handle ) { $self->{_GLOBAL}{'ERROR'}="Not Connected"; }
$self->{_GLOBAL}{'ready_handles'}=\@handle;
}
sub get_data_segment
{
my ( $self ) = shift;
my ( $header );
my ( $buffer ) = "";
my ( $dataset ) ;
#$self->{_GLOBAL}{'data_received'} = "";
my $link;
my ( $version, $type, $session, $flags, $length );
my ( $handles ) = $self->{_GLOBAL}{'ready_handles'};
foreach my $handle ( @{$handles} )
{
eval {
local $SIG{ALRM} = sub { die "alarm\n" };
alarm 5;
$link = sysread($handle,$buffer,1024);
alarm 0;
lib/IPDR/Collection/Client.pm view on Meta::CPAN
}
print "Length in buffer is '".length($self->{_GLOBAL}{'data_received'})."'\n" if $self->{_GLOBAL}{'DEBUG'}>0;
$self->{_GLOBAL}{'data_processing'}=1;
}
sub ReturnPollTime
{
my ( $self ) = shift;
return $self->{_GLOBAL}{'PollTime'};
}
sub get_error
{
my ( $self ) = shift;
return $self->{_GLOBAL}{'ERROR'};
}
sub get_internal_value
{
my ( $self ) = shift;
my ( $attribute ) = shift;
return $self->{_GLOBAL}{$attribute};
}
sub set_internal_value
{
my ( $self ) = shift;
my ( $attrib ) = shift;
my ( $value ) = shift;
$self->{_GLOBAL}{$attrib}=$value;
}
sub decode_data
{
my ( $self ) = shift;
my ( %template_params ) = template_value_definitions();
my ( $resulting_value ) = "";
my ( $exported_data ) = $self->{_GLOBAL}{'complete_decoded_data'};
my ( $record ) = $self->{_GLOBAL}{'current_data'};
my ( $template ) = $self->{_GLOBAL}{'template'};
my ( $template_id ) = ${$record}{'DATA_TemplateID'};
my ( $data ) = ${$record}{'DATA_Data'};
if ( length( $self->{_GLOBAL}{'PacketDirectory'} ) > 0 )
{
my $location = $self->{_GLOBAL}{'PacketDirectory'};
my $epoch = time();
my $rand = int(rand(100000));
open (__PACKET_DATA,">$location/packet_$epoch\_$rand");
print __PACKET_DATA $data;
close (__PACKET_DATA);
}
$self->set_internal_value('dsn_sequence',${$record}{'DATA_Sequence'} );
$self->set_internal_value('dsn_configID',${$record}{'DATA_ConfigID'} );
if ( !$self->get_internal_value('data_ack') )
{
$self->set_internal_value('data_ack',1);
$self->send_message( $self->construct_get_keepalive() );
}
my ( $int_or_dir ) = unpack("N",$data);
# If you can figure out the first line, better person than I
# All i figured out was 'possibly' direction, but this
# might also be interface number so it has not been added
$data = substr($data,4,length($data)-4);
#${$template}{'Templates'}{$template_id}{'fields'}{$a}{'name'}=$field_name;
foreach my $variable ( sort {$a<=> $b } keys %{${$template}{'Templates'}{$template_id}{'fields'}} )
{
print "Type id is '${$template}{'Templates'}{$template_id}{'fields'}{$variable}{'typeID'}' field is '$variable'\n" if $self->{_GLOBAL}{'DEBUG'}>0;
my $type = $template_params{ ${$template}{'Templates'}{$template_id}{'fields'}{$variable}{'typeID'} };
my $template_type = ${$template}{'Templates'}{$template_id}{'fields'}{$variable}{'name'};
print "Type name is '".$type."'\n" if $self->{_GLOBAL}{'DEBUG'}>0;
print "Template variable name is '".$template_type."'\n" if $self->{_GLOBAL}{'DEBUG'}>0;
if ( $type=~/^string$/i )
{ ( $resulting_value, $data ) = _extract_utf8_string ( $data ); }
if ( $type=~/^network_ip$/i )
{ ( $resulting_value, $data ) = _extract_ip_string ( $data ); }
if ( $type=~/^network_int_u$/i )
{ ( $resulting_value, $data ) = _extract_int_u ( $data ); }
if ( $type=~/^network_int$/i )
{ ( $resulting_value, $data ) = $self->_extract_int ( $data ); }
if ( $type=~/^unknown_/i )
{ ( $data ) = _extract_unknown ( $data, $type ); }
if ( $type=~/^mac$/i )
{ ( $resulting_value, $data ) = _extract_mac ( $self, $data ); }
if ( $type=~/^long$/i )
{ ( $resulting_value, $data ) = _extract_long ( $data ); }
if ( $type=~/^long_u$/i )
{ ( $resulting_value, $data ) = _extract_long_u ( $data ); }
if ( $type=~/^float$/i )
{ ( $resulting_value, $data ) = _extract_float ( $data ); }
if ( $type=~/^double$/i )
{ ( $resulting_value, $data ) = _extract_double ( $data ); }
if ( $type=~/^boolean$/i )
{ ( $resulting_value, $data ) = _extract_boolean ( $data ); }
if ( $type=~/^byte$/i )
{ ( $resulting_value, $data ) = _extract_char ( $data ); }
if ( $type=~/^byte_u$/i )
{ ( $resulting_value, $data ) = _extract_char_u ( $data ); }
if ( $type=~/^short$/i )
{ ( $resulting_value, $data ) = _extract_short ( $data ); }
if ( $type=~/^short_u$/i )
{ ( $resulting_value, $data ) = _extract_short_u ( $data ); }
( run in 2.137 seconds using v1.01-cache-2.11-cpan-39bf76dae61 )