ApacheLog-Compressor

 view release on metacpan or  search on metacpan

lib/ApacheLog/Compressor.pm  view on Meta::CPAN

package ApacheLog::Compressor;
# ABSTRACT: Convert Apache/CLF data to binary format
use strict;
use warnings;

use Socket qw(inet_aton inet_ntoa);
use Date::Parse qw(str2time);
use List::Util qw(min);
use URI;
use URI::Escape qw(uri_unescape);
use DateTime;
use Encode qw(encode_utf8 decode_utf8 FB_DEFAULT is_utf8 FB_CROAK);
use POSIX qw{strftime};

our $VERSION = '0.005';

=head1 NAME

ApacheLog::Compressor - convert Apache / CLF log files into a binary format for transfer

=head1 VERSION

version 0.005

=head1 SYNOPSIS

 use ApacheLog::Compressor;
 use IO::Compress::Bzip2;
 use Sys::Hostname qw(hostname);

 # Write all data to bzip2-compressed output file
 open my $out_fh, '>', 'compressed.log.bz2' or die "Failed to create output file: $!";
 binmode $out_fh;
 my $zip = IO::Compress::Bzip2->new($out_fh, BlockSize100K => 9);

 # Provide a callback to send data through to the file
 my $alc = ApacheLog::Compressor->new(
	on_write	=> sub {
		my ($self, $pkt) = @_;
		$zip->write($pkt);
	}
 );

 # Input file - normally use whichever one's just been closed + rotated
 open my $fh, '<', '/var/log/apache2/access.log.1' or die "Failed to open log: $!";

 # Initial packet to identify which server this came from
 $alc->send_packet('server',
 	hostname	=> hostname(),
 );

 # Read and compress all the lines in the files
 while(my $line = <$fh>) {
	 $alc->compress($line);
 }
 close $fh or die $!;
 $zip->close;

 # Dump the stats in case anyone finds them useful
 $alc->stats;

=head1 DESCRIPTION

Converts data from standard Apache log format into a binary stream which is typically 20% - 60% the size of the original file.
Intended for cases where log data needs transferring from multiple high-volume servers for analysis (potentially in realtime
via tail -f).

The log format uses a simple dictionary replacement scheme: each field that cannot be represented in a fixed-width datatype
is replaced with an indexed value, so that the basic log line packet has a fixed size, while additional packets carry the
first instance of each variable-width data item.

Example:

lib/ApacheLog/Compressor.pm  view on Meta::CPAN

	my $method = 'handle_' . $self->{packet_handler}->{$type};
	return $self->$method($pkt) if $self->can($method);

	return unless index($$pkt, "\0", 5) >= 0;

	(undef, my $id, my $data) = unpack('C1N1Z*', $$pkt);
	substr $$pkt, 0, 6 + length($data), '';
	$self->set_key($self->{packet_handler}->{$type}, data => $data, id => $id);
}

=head2 handle_reset

Handle an incoming reset packet.

=cut

# Process a reset packet.
#
# Replaces both the entry cache and the entry index with new empty hashes,
# then removes the first byte of the pending buffer.
#
# Parameters:
#   $pkt - reference to the pending packet buffer; modified in place.
#
# Returns the byte that was removed from the front of the buffer.
sub handle_reset {
	my ($self, $pkt) = @_;

	# Each table gets its own fresh hash so nothing cached so far survives
	$self->{$_} = {} for qw(entry_cache entry_index);

	# Drop the leading byte now that the packet has been dealt with
	return substr($$pkt, 0, 1, '');
}

=head2 handle_log

Handle an incoming log packet.

=cut

# Process one log record from the front of the pending buffer.
#
# The record is unpacked with $self->{log_format} into a hash keyed by
# $self->{format_keys}, passed through every coderef in
# $self->{log_process_out}, and finally handed to the on_log_line callback
# when one is registered. The record is removed from the buffer only after
# all of that has succeeded.
#
# Parameters:
#   $pkt - reference to the pending packet buffer; modified in place.
#
# Returns nothing when the buffer holds fewer than
# $self->{log_record_length} bytes, otherwise the bytes that were removed.
# Dies with "No timestamp" if $self->{timestamp} is not set once the
# processing hooks have run.
sub handle_log {
	my ($self, $pkt) = @_;

	# A partial record stays in the buffer until the rest arrives
	return if length($$pkt) < $self->{log_record_length};

	my @values = unpack($self->{log_format}, $$pkt);
	my %data;
	@data{ @{ $self->{format_keys} } } = @values;

	foreach my $hook (@{ $self->{log_process_out} }) {
		$hook->($self, \%data);
	}

	die "No timestamp" unless $self->{timestamp};

	if(exists $self->{on_log_line}) {
		$self->{on_log_line}->($self, \%data);
	}

	# Consume the record we have just handled
	return substr($$pkt, 0, $self->{log_record_length}, '');
}

=head2 data_hashref

Convert logline data to a hashref.

=cut

# Build a hashref describing one log line.
#
# Works on a shallow copy of the supplied record, so the caller's hash is
# not modified.
#
# Parameters:
#   $data - hashref holding the unpacked log record.
#
# Returns a new hashref in which:
#   * vhost, user, url, query, useragent and refer have been replaced by
#     the result of from_cache() for that field,
#   * server is taken from $self->{server},
#   * user, refer, size and useragent are undef when their value is '-',
#   * query is undef when it is missing or empty,
#   * timestamp is $self->{timestamp} rendered in UTC as
#     "dd/Mon/yyyy:HH:MM:SS +0000".
sub data_hashref {
	my $self = shift;
	my $data = shift;
	my %info = %$data;

	$info{$_} = $self->from_cache($_, $info{$_}) for qw(vhost user url query useragent refer);
	$info{server} = $self->{server};

	# Guard with defined() so absent values do not trigger "uninitialized"
	# warnings in the string comparison.
	undef $info{$_} for grep { defined $info{$_} && $info{$_} eq '-' } qw(user refer size useragent);
	undef $info{query} unless defined $info{query} && length $info{query};

	# The broken-down time comes from gmtime(), so the offset is always UTC.
	# POSIX::strftime's %z would report the *local* zone's offset instead,
	# which yields a wrong timestamp on any host not running in UTC, so the
	# offset is written literally.
	$info{timestamp} = strftime("%d/%b/%Y:%H:%M:%S", gmtime($self->{timestamp})) . ' +0000';
	return \%info;
}

=head2 data_to_text

Internal method for converting the current log entry to a text string in
something approaching the 'standard' Apache log format (almost, but not quite,
CLF).

=cut

# Render one log record as a single space-separated line of text.
#
# Field order: vhost, duration, ip, a literal '-', user, the bracketed
# timestamp, the quoted request line, result, size, quoted user agent and
# quoted referrer. The vhost, user, url, query, useragent and refer fields
# are passed through from_cache() first.
#
# Parameters:
#   $data - hashref holding the unpacked log record.
#
# Returns the formatted line as a string.
sub data_to_text {
	my ($self, $data) = @_;

	# A missing query string is treated as empty
	my $query = $self->from_cache('query', $data->{query});
	$query = '' unless defined $query;

	my @fields;
	push @fields, $self->from_cache('vhost', $data->{vhost});
	push @fields, $data->{duration}, $data->{ip}, '-';
	push @fields, $self->from_cache('user', $data->{user});

	my $when = DateTime->from_epoch(epoch => $self->{timestamp});
	push @fields, '[' . $when->strftime("%d/%b/%Y:%H:%M:%S %z") . ']';

	# Request line: method, path with optional "?query", protocol version
	my $path     = $self->from_cache('url', $data->{url});
	my $suffix   = length $query ? "?$query" : "";
	my $protocol = $data->{ver} ? '1.1' : '1.0';
	push @fields, '"' . $data->{method} . ' ' . $path . $suffix . ' HTTP/' . $protocol . '"';

	push @fields, $data->{result}, $data->{size};
	push @fields, '"' . $self->from_cache($_, $data->{$_}) . '"' for qw(useragent refer);

	return join(' ', @fields);
}

=head2 handle_server

Internal method for processing a server record (used to indicate the server
name subsequent records apply to).

=cut

# Process a server packet: one leading byte followed by a NUL-terminated
# server name. The name is stored in $self->{server}.
#
# Parameters:
#   $pkt - reference to the pending packet buffer; modified in place.
#
# Returns nothing when no NUL terminator is present after the first byte
# (the buffer is then left untouched), otherwise $self.
sub handle_server {
	my ($self, $pkt) = @_;

	# No terminator yet means the name has not fully arrived
	return if index($$pkt, "\0", 1) < 0;

	# Second unpacked field is the name; the leading byte is discarded
	my $name = (unpack('C1Z*', $$pkt))[1];

	# Consume leading byte + name + trailing NUL
	substr($$pkt, 0, length($name) + 2, '');

	$self->{server} = $name;
	return $self;
}

=head2 handle_timestamp

Internal method for processing a timestamp entry.

=cut

# Process a timestamp packet: one leading byte followed by a 32-bit
# big-endian unsigned integer, which is stored in $self->{timestamp}.
#
# Parameters:
#   $pkt - reference to the pending packet buffer; modified in place.
#
# Returns nothing when fewer than 5 bytes are available (the buffer is then
# left untouched), otherwise $self. Emits a warning when the decoded value
# is zero.
sub handle_timestamp {
	my ($self, $pkt) = @_;

	# Need the leading byte plus four bytes of integer
	return if length($$pkt) < 5;

	my $epoch = (unpack('C1N1', $$pkt))[1];
	substr($$pkt, 0, 5, '');

	$self->{timestamp} = $epoch;
	warn "Zero timestamp?" if !$self->{timestamp};
	return $self;
}

=head2 invoke_event

Internal method for invoking an event.

=cut

# Invoke the handler for a named event, if there is one.
#
# The handler is looked up first as a true value under "on_<event>" in the
# object hash and, failing that, as a method of that name. It is called
# with the remaining arguments only (the object itself is not prepended).
#
# Parameters:
#   $event - event name without the "on_" prefix.
#   ...    - arguments passed straight through to the handler.
#
# Returns whatever the handler returns, or nothing when no handler exists.
sub invoke_event {
	# splice leaves the remaining arguments in @_ for the handler
	my ($self, $event) = splice(@_, 0, 2);

	my $handler_name = "on_" . $event;
	my $code = $self->{$handler_name} || $self->can($handler_name);
	return unless $code;

	return $code->(@_);
}

=head2 stats

Print current stats - not all that useful since we clear cached values regularly.

=cut



( run in 0.679 second using v1.01-cache-2.11-cpan-39bf76dae61 )