ApacheLog-Compressor
view release on metacpan or search on metacpan
lib/ApacheLog/Compressor.pm view on Meta::CPAN
Returns the default format used for parsing log lines.
This is an arrayref containing key => value pairs; see L</FORMAT SPECIFICATION> for
more details.
=cut
# Returns the built-in format specification as an arrayref of key => spec pairs.
# The order of the pairs matters: update_mapping() walks them in sequence to
# build both the line-matching regex and the pack() template.
#
# Keys seen in each spec hashref, as used elsewhere in this file:
#   type        - pack() template fragment for the field (update_mapping only
#                 understands C, n and N); fields without a type are left out
#                 of the fixed-size log record.
#   id          - packet type byte for the field's lookup packets; compress()
#                 replaces the value of any typed field that has an id with
#                 its cache index.
#   regex       - fragment matching the field in a log line; the fragments are
#                 joined with single spaces by update_mapping().
#   process_in  - callback ($self, $data) run by compress() after the regex
#                 match, before packing.
#   process_out - callback ($self, $data) run by handle_log() after unpacking.
sub default_format {
my $self = shift;
return [
# Packet type byte; it has no regex, compress() sets it to 0 for log records.
type => { type => 'C1' },
vhost => { id => 0x03, type => 'n1', regex => qr{([^ ]+)} },
duration => { type => 'N1', regex => qr{(\d+)} },
# The regex captures the first token and skips the one that follows it
# (presumably the identd field - TODO confirm against the log format in use).
ip => {
type => 'N1',
regex => qr{(\S+)\s+\S+},
process_in => sub {
my ($self, $data) = @_;
# Dotted quad -> 32-bit integer so it fits the N1 slot.
$data->{ip} = unpack('N1', inet_aton($data->{ip}));
},
process_out => sub {
my ($self, $data) = @_;
# 32-bit integer -> dotted quad.
$data->{ip} = inet_ntoa(pack('N1', $data->{ip}));
}
},
user => { id => 0x04, type => 'n1', regex => qr{(\S+)} },
# No type here, so the timestamp is not part of the packed log record;
# compress() sends a separate timestamp packet whenever the value changes.
timestamp => {
id => 0x02,
regex => qr{\[([^\]]+)\]},
process_in => sub {
my ($self, $data) = @_;
# Replace the bracketed date text with whatever str2time() returns for it.
$data->{timestamp} = str2time($data->{timestamp});
}
},
# Stored as a single byte looked up through %HTTP_METHOD / @HTTP_METHOD_LIST,
# which are defined outside this sub.
method => {
type => 'C1',
regex => qr{"([^ ]+)},
process_in => sub {
my ($self, $data) = @_;
$data->{method} = $HTTP_METHOD{$data->{method}};
},
process_out => sub {
my ($self, $data) = @_;
$data->{method} = $HTTP_METHOD_LIST[$data->{method}];
}
},
url => {
id => 0x07,
type => 'N1',
regex => qr{([^ ]+)},
process_in => sub {
my ($self, $data) = @_;
return $data->{url} = '' unless defined $data->{url};
# Split on the first '?' only: the path stays in url, the rest becomes query.
($data->{url}, $data->{query}) = split /\?/, $data->{url}, 2;
# Dodgy UTF8 handling, currently disabled - no guarantee that URLs are UTF8 anyway
# if(length $data->{url}) {
# URI::Escape's uri_unescape but in byte mode so we can check utf8 decoding manually
# my $txt = $data->{url};
# $txt = encode_utf8($txt); # turn OFF utf8
# $txt =~ s/%([0-9A-Fa-f]{2})/pack("C1", hex($1))/ge; # expand
# $txt = decode_utf8($txt); # turn ON utf8 where applicable
# $data->{url} = $txt;
# }
# if(defined $data->{query} && length $data->{query}) {
# URI::Escape's uri_unescape but in byte mode so we can check utf8 decoding manually
# (my $txt = $data->{query}) =~ s/%([0-9A-Fa-f]{2})/pack("C1", hex($1))/eg;
# $data->{query} = decode_utf8($txt, FB_DEFAULT);
# }
}
},
# No regex of its own: the value is filled in by the url process_in above.
query => { id => 0x0A, type => 'N1', },
ver => {
type => 'C1',
regex => qr{HTTP/(\d+\.\d+)"},
process_in => sub {
my ($self, $data) = @_;
# '1.0' is stored as 0; every other version is stored as 1.
$data->{ver} = ($data->{ver} eq '1.0' ? 0 : 1);
}, process_out => sub {
my ($self, $data) = @_;
# Anything stored as non-zero is reported as '1.1'.
$data->{ver} = ($data->{ver} ? '1.1' : '1.0');
}
},
result => { type => 'n1', regex => qr{(\d+)} },
size => {
type => 'N1',
regex => qr{(\d+|-)},
process_in => sub {
my ($self, $data) = @_;
# A '-' size is replaced by the sentinel -1 before packing.
$data->{size} = ($data->{size} eq '-') ? -1 : $data->{size};
}, process_out => sub {
my ($self, $data) = @_;
# 4294967295 is the unsigned 32-bit form of the -1 sentinel; map it back to '-'.
$data->{size} = ($data->{size} == 4294967295) ? '-' : $data->{size};
}
},
# Both of these match a complete double-quoted string, which may be empty.
refer => { id => 0x06, type => 'n1', regex => qr{"([^"]*)"} },
useragent => { id => 0x05, type => 'n1', regex => qr{"([^"]*)"} },
];
}
=head2 update_mapping
Refresh the mapping from format keys and internal definitions.
=cut
sub update_mapping {
my $self = shift;
my %fmt = @{ $self->{format} };
$self->{format_hash} = \%fmt;
$self->{packet_handler} = {
0x00 => 'log',
0x01 => 'server',
0x80 => 'reset',
map { $fmt{$_}->{id} => $_ } grep { exists $fmt{$_}->{id} } keys %fmt
};
# Extract information from format strings so that we know how big the packets are and where the data goes
my @fmt = @{$self->{format}};
my $pack_str = '';
my $log_len = 0;
lib/ApacheLog/Compressor.pm view on Meta::CPAN
my $v = shift(@fmt);
$v = { type => $v } unless ref $v;
if(exists $v->{regex}) {
push @regex, $v->{regex};
push @{$self->{log_regex_keys}}, $k;
}
if(exists $v->{process_in}) {
push @{$self->{log_process}}, $v->{process_in};
}
if(exists $v->{process_out}) {
push @{$self->{log_process_out}}, $v->{process_out};
}
my $type = $v->{type};
next ITEM unless $type;
push @format_keys, $k;
$pack_str .= $type;
# Obviously these will need updating if we use any other pack() datatypes
if($type =~ /^C(\d+)/) {
$log_len += $1;
} elsif($type =~ /^n(\d+)/) {
$log_len += 2 * $1;
} elsif($type =~ /^N(\d+)/) {
$log_len += 4 * $1;
} else {
die "no idea what $type is";
}
}
my $regex = join(' ', @regex);
$self->{log_regex} = qr{^$regex};
$self->{log_format} = $pack_str;
$self->{log_record_length} = $log_len;
$self->{format_keys} = \@format_keys;
return $self;
}
=head2 cached
Returns the index for the given type and value, generating a packet if no previous value was found.
=cut
# Look up the cache index for a ($type, $v) pair, registering the value and
# announcing it with a lookup packet the first time it is seen.
#
# $type - format key the value belongs to
# $v    - the value itself; undef is treated as the empty string
#
# Returns the zero-based index of the value within that type's index list.
sub cached {
    my ($self, $type, $v) = @_;
    $v = '' unless defined $v;

    my $known = $self->{entry_cache}->{$type}->{$v};
    return $known if defined $known;

    # First sighting: append to the per-type list, so the new index is the
    # position of the last element.
    push @{ $self->{entry_index}->{$type} }, $v;
    $self->{entry_count}->{$type}++;
    my $id = $#{ $self->{entry_index}->{$type} };
    $self->{entry_cache}->{$type}->{$v} = $id;

    # Announce the new id => value mapping, with the value encoded as UTF-8 bytes.
    $self->send_packet($type, id => $id, data => encode_utf8($v));
    return $id;
}
=head2 from_cache
Read a value from the cache, for expanding compressed log format entries.
=cut
# Fetch the value stored under index $id for the given $type.
# Dies with "ID <id> not found for <type>" when no defined value is held there.
sub from_cache {
    my ($self, $type, $id) = @_;
    my $value = $self->{entry_index}->{$type}->[$id];
    die "ID $id not found for $type\n" unless defined $value;
    return $value;
}
=head2 set_key
Set a cache index key to a value when expanding a packet stream.
=cut
# Record an id => value mapping received while expanding a packet stream.
#
# $type - format key the mapping belongs to
# %args - id:   index to store the value under
#         data: UTF-8 encoded bytes of the value
#
# Fires the optional on_set_<type> callback and then the optional on_set_key
# callback. Returns $self.
sub set_key {
    my ($self, $type, %args) = @_;
    my $id    = $args{id};
    my $value = decode_utf8($args{data});

    # Keep both directions in step: index -> value and value -> index.
    $self->{entry_index}->{$type}->[$id]    = $value;
    $self->{entry_cache}->{$type}->{$value} = $id;

    my $typed_hook = $self->{"on_set_$type"};
    if($typed_hook) {
        $typed_hook->($self, $id, $value);
    }
    if($self->{on_set_key}) {
        $self->{"on_set_key"}->($self, $type, $id, $value);
    }
    return $self;
}
=head2 compress
General compression function. Given a line of data, sends packets as required to transmit that information.
=cut
# Compress one log line into packets.
#
# $txt - a single line of log text
#
# The line is matched against the compiled log regex; on failure the bad_data
# event is invoked with the line and its result is returned. Otherwise the
# process_in callbacks run, the optional filter may reject the entry (returns
# nothing in that case), a timestamp packet is sent if the timestamp differs
# from the last one sent, and the packed log record is written.
#
# Returns $self after a record has been written.
sub compress {
    my $self = shift;
    my $txt = shift;

    my %data;
    # The list assignment evaluates to the number of captures in scalar
    # context, so a failed match (zero captures) takes the 'or' branch.
    @data{@{$self->{log_regex_keys}}} = $txt =~ m!$self->{log_regex}!
        or return $self->invoke_event(bad_data => $txt);
    $data{type} = 0;

    $_->($self, \%data) for @{$self->{log_process}};
    return if exists($self->{filter}) && !$self->{filter}->($self, \%data);

    # Timestamps travel in their own packet, sent only when the value changes.
    if(!defined($self->{timestamp}) || $data{timestamp} != $self->{timestamp}) {
        $self->{timestamp} = $data{timestamp};
        $self->send_packet('timestamp', timestamp => $self->{timestamp});
    }

    my @fmt = @{$self->{format}};
    my @data;
    while(@fmt) {
        my $k = shift(@fmt);
        my $v = shift(@fmt);
        # update_mapping accepts a bare pack type string as shorthand for
        # { type => ... }; normalise it here too so the values line up with
        # the pack template instead of dying on a non-reference.
        $v = { type => $v } unless ref $v;
        next unless $v->{type};
        # Fields with a packet id are sent as cache indexes, not raw values.
        $data{$k} = $self->cached($k, $data{$k}) if exists $v->{id};
        push @data, $data{$k};
    }
    $self->write_packet(pack($self->{log_format}, @data));

    # Recycle everything after 5m entries (the counter is compared before it
    # is incremented).
    if($self->{log_packet_count}++ >= 5000000) {
        $self->send_packet('reset');
        $self->{log_packet_count} = 0;
    }
    return $self;
}
=head2 send_packet
Generate and send a packet for the given type.
lib/ApacheLog/Compressor.pm view on Meta::CPAN
my $type = shift;
# Try the specific method for this packet if we have one
my $method = "packet_$type";
return $self->write_packet($self->$method(@_)) if $self->can($method);
# Otherwise use the generic format for ASCIIZ mapping
my %args = @_;
return $self->write_packet(pack('C1N1Z*', $self->{format_hash}->{$type}->{id}, $args{id}, $args{data}));
}
=head2 packet_reset
Generate a reset packet and clear internal caches in the process.
=cut
# Build the reset packet (the single byte 0x80), emptying the value caches
# on this side as part of producing it.
sub packet_reset {
    my ($self) = @_;
    $self->{$_} = {} for qw(entry_cache entry_index);
    my $packet = pack('C1', 0x80);
    return $packet;
}
=head2 packet_server
Generate a server packet.
=cut
# Build a server packet: type byte 1 followed by the NUL-terminated hostname.
#
# %args - hostname: name to embed in the packet
sub packet_server {
    my ($self, %args) = @_;
    my $hostname = $args{hostname};
    return pack('C1Z*', 1, $hostname);
}
=head2 packet_timestamp
Generate the timestamp packet.
=cut
# Build a timestamp packet: type byte 2 followed by the timestamp as a
# 32-bit big-endian unsigned integer.
#
# %args - timestamp: value to embed in the packet
sub packet_timestamp {
    my ($self, %args) = @_;
    my $when = $args{timestamp};
    return pack('C1N1', 2, $when);
}
=head2 write_packet
Write a packet to the output handler.
=cut
# Hand a finished packet to the on_write callback, which is called as
# ($self, $pkt). Returns $self.
sub write_packet {
    my $self = shift;
    my $pkt = shift;
    my $writer = $self->{on_write};
    $writer->($self, $pkt);
    return $self;
}
=head2 expand
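Expand incoming data. Takes a reference to a buffer of packed data, dispatches on the leading packet type byte, and dies if that type is unknown.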
=cut
# Expand the packet at the front of a buffer.
#
# $pkt - reference to a scalar holding the incoming packed data
#
# The first byte selects the packet type. If this object has a handle_<name>
# method for that type, the packet is passed to it and its result returned.
# Otherwise the packet is read as a generic lookup packet (type byte, 32-bit
# id, NUL-terminated value), removed from the buffer and stored via set_key.
#
# Returns nothing when the buffer is empty or the generic packet's value is
# not yet NUL-terminated. Dies with "what is <type>?" plus a hex dump of the
# first bytes when the type byte is not in the packet handler table.
sub expand {
    my $self = shift;
    my $pkt = shift;

    # Nothing buffered yet, so there is no type byte to dispatch on.
    return unless defined $$pkt && length $$pkt;

    my $type = unpack('C1', $$pkt);
    my $name = $self->{packet_handler}->{$type};
    unless($name) {
        # Put the offending bytes in the error itself, as hex, rather than
        # printing raw binary to the selected output handle.
        die "what is $type? (buffer starts with 0x" . unpack('H*', substr($$pkt, 0, 16)) . ")";
    }

    my $method = 'handle_' . $name;
    return $self->$method($pkt) if $self->can($method);

    # The value starts at offset 5, after the type byte and the 4-byte id;
    # wait until its terminating NUL has arrived.
    return unless index($$pkt, "\0", 5) >= 0;
    (undef, my $id, my $data) = unpack('C1N1Z*', $$pkt);
    # 1 type byte + 4 id bytes + the value + 1 terminating NUL.
    substr $$pkt, 0, 6 + length($data), '';
    return $self->set_key($name, data => $data, id => $id);
}
=head2 handle_reset
Handle an incoming reset packet.
=cut
# Process a reset packet: forget every cached value for all types, then
# strip the one-byte packet from the front of the buffer.
#
# $pkt - reference to the incoming data buffer
#
# Returns the byte that was removed.
sub handle_reset {
    my ($self, $pkt) = @_;
    @{$self}{qw(entry_cache entry_index)} = ({ }, { });
    return substr(${$pkt}, 0, 1, '');
}
=head2 handle_log
Handle an incoming log packet.
=cut
# Process a log packet at the front of the buffer.
#
# $pkt - reference to the incoming data buffer
#
# Returns nothing until the buffer holds a full fixed-size record. Otherwise
# the record is unpacked into a hash keyed by the format keys, the process_out
# callbacks are run on it, the on_log_line callback (if set) receives it, and
# the record is removed from the buffer. Dies with "No timestamp" if no
# timestamp is currently held.
#
# Returns the bytes that were removed.
sub handle_log {
    my ($self, $pkt) = @_;
    return if length($$pkt) < $self->{log_record_length};

    my @values = unpack($self->{log_format}, $$pkt);
    my %data;
    @data{ @{ $self->{format_keys} } } = @values;

    foreach my $hook (@{ $self->{log_process_out} }) {
        $hook->($self, \%data);
    }

    die "No timestamp" unless $self->{timestamp};

    if(exists $self->{on_log_line}) {
        $self->{on_log_line}->($self, \%data);
    }
    return substr($$pkt, 0, $self->{log_record_length}, '');
}
=head2 data_hashref
Convert logline data to a hashref.
=cut
sub data_hashref {
my $self = shift;
( run in 0.834 second using v1.01-cache-2.11-cpan-5b529ec07f3 )