App-RabbitTail
view release on metacpan or search on metacpan
lib/App/RabbitTail.pm view on Meta::CPAN
on_close => sub {
warn(sprintf("RabbitMQ connection to %s:%s closed!\n", $self->host, $self->port));
$self->_clear_ch;
$self->_clear_rf;
$self->_cv->send("ARGH");
},
map { $_ => $self->$_ }
qw/ host port user pass vhost /
);
}
catch {
warn($_);
sleep 2;
};
}
return $rf_conn;
}
# Connection and exchange settings, keyed by attribute name, together with
# the value each one takes when the caller does not supply it.
my %defaults = (
    # Broker connection
    host  => 'localhost',
    port  => 5672,
    user  => 'guest',
    pass  => 'guest',
    vhost => '/',

    # Exchange that log lines are published to
    exchange_name    => 'logs',
    exchange_type    => 'direct',
    exchange_durable => 0,
);

# Declare one read-only Str attribute per entry above, defaulting to the
# value from the table.
while ( my ($attr_name, $fallback) = each %defaults ) {
    has $attr_name => ( is => 'ro', isa => Str, default => $fallback );
}
# The channel that messages are published on.
#
# Built on first access by _build_ch. The connection's on_close handler
# calls _clear_ch to drop it, and the tail callback checks _has_ch before
# publishing so that it never triggers a rebuild itself.
has '_ch' => (
    is        => 'ro',
    lazy      => 1,
    predicate => '_has_ch',
    clearer   => '_clear_ch',
    builder   => '_build_ch',
);
# Builder for the _ch attribute.
#
# Opens a channel on the connection returned by _rf, declares the configured
# exchange on it and returns the channel. Dies with a dump of the broker's
# reply unless that reply is a Net::AMQP::Protocol::Exchange::DeclareOk frame.
sub _build_ch {
    my $self = shift;

    my $channel = $self->_rf->open_channel;

    # Kept as a list (not a hash) so the arguments reach declare_exchange
    # in exactly this order.
    my @exchange_spec = (
        type     => $self->exchange_type,
        durable  => $self->exchange_durable,
        exchange => $self->exchange_name,
    );
    my $reply = $channel->declare_exchange(@exchange_spec)->method_frame;

    my $declared_ok = blessed($reply)
        && $reply->isa('Net::AMQP::Protocol::Exchange::DeclareOk');
    die Dumper($reply) unless $declared_ok;

    return $channel;
}
# Main loop; does not return normally.
#
# Every pass discards the previous condition variable, makes sure the channel
# exists, and then blocks on a fresh condition variable. The file tails are
# installed on the first pass only. The connection's on_close handler sends
# on the condition variable, which ends the recv and starts the next pass.
sub run {
    my ($self) = @_;

    my $tails_installed;
    for (;;) {
        $self->_clear_cv;

        # Force the lazy channel to be built before entering the event loop.
        $self->_ch;

        # Set up all the timers, once.
        if (!$tails_installed) {
            $self->tail;
            $tails_installed = 1;
        }

        # Enter the event loop; we come back here if the channel dies.
        $self->_cv->recv;
    }
}
# Start tailing every configured file.
#
# Files are paired with routing keys in order. When there are more files
# than routing keys, the last routing key is reused for all remaining files.
# For each pair a tailer is created via setup_tail and started with ->tail.
#
# The routing keys are walked on a private copy: consuming the array behind
# $self->routing_key directly would leave the attribute holding only its
# last element after the first call.
sub tail {
    my $self = shift;

    my $configured_keys = $self->routing_key;
    my @remaining_keys  = @{$configured_keys};

    foreach my $fn ($self->filename->flatten) {
        my $rk = shift @remaining_keys;

        # Put the key back when it was the last one, so it is reused.
        unshift @remaining_keys, $rk unless @remaining_keys;

        # warn("Setup tail for $fn on $rk");
        my $ft = $self->setup_tail($fn, $rk);
        $ft->tail;
    }

    return;
}
# Create (but do not start) a file tailer for one file.
#
#   $file        - name of the file to follow
#   $routing_key - routing key used for every line read from that file
#
# Returns a new App::RabbitTail::FileTailer whose callback publishes each
# line, minus its trailing newline, to the configured exchange. While no
# channel exists the line is reported with a warning and dropped.
sub setup_tail {
    my ($self, $file, $routing_key) = @_;

    my $publish_line = sub {
        my ($message) = @_;
        chomp $message;
        # warn("SENT $message to " . $self->exchange_name . " with " . $routing_key);

        if ($self->_has_ch) {
            return $self->_ch->publish(
                body        => $message,
                exchange    => $self->exchange_name,
                routing_key => $routing_key,
            );
        }

        # No channel right now: checking the predicate (rather than calling
        # _ch) avoids building one from inside the callback.
        warn("DROPPED $message to " . $self->exchange_name . " with " . $routing_key . "\n");
        return;
    };

    return App::RabbitTail::FileTailer->new(
        max_sleep => $self->max_sleep,
        cb        => $publish_line,
        fn        => $file,
    );
}
# All attributes and methods are declared above; make the class's metaclass
# immutable.
__PACKAGE__->meta->make_immutable;
__END__
=head1 NAME
App::RabbitTail - Log tailer which broadcasts log lines into RabbitMQ exchanges.
=head1 SYNOPSIS
See the rabbit_tail script shipped with the distribution for simple CLI usage.
use App::RabbitTail;
use AnyEvent; # Not strictly needed, but you probably want to
# use it yourself if you're doing this manually.
my $tailer = App::RabbitTail->new(
# At least 1 filename must be supplied
filename => [qw/ file1 file2 /],
# Optional args, defaults below
routing_key => [qw/ # /],
host => 'localhost',
( run in 0.671 second using v1.01-cache-2.11-cpan-adec679a428 )