App-RabbitTail

 view release on metacpan or  search on metacpan

lib/App/RabbitTail.pm  view on Meta::CPAN

                on_close => sub {
                    warn(sprintf("RabbitMQ connection to %s:%s closed!\n", $self->host, $self->port));
                    $self->_clear_ch;
                    $self->_clear_rf;
                    $self->_cv->send("ARGH");
                },
                map { $_ => $self->$_ }
                qw/ host port user pass vhost /
            );
        }
        catch {
            warn($_);
            sleep 2;
        };
    }
    return $rf_conn;
}

# Connection and exchange settings, each paired with the value used when the
# caller does not supply one at construction time.
my %defaults = (
    host             => 'localhost',
    port             => 5672,
    user             => 'guest',
    pass             => 'guest',
    vhost            => '/',
    exchange_type    => 'direct',
    exchange_name    => 'logs',
    exchange_durable => 0,
);

# Declare one read-only attribute per setting above. Every attribute is typed
# Str, including the numeric-looking port and exchange_durable.
for my $setting (keys %defaults) {
    has $setting => (
        is      => 'ro',
        isa     => Str,
        default => $defaults{$setting},
    );
}

# The channel used for publishing. It is built on first access by _build_ch.
# The clearer lets the connection's on_close handler discard a dead channel,
# and the predicate lets callers check for a channel without triggering a
# (re)build.
has _ch => (
    is        => 'ro',
    lazy      => 1,
    builder   => '_build_ch',
    predicate => '_has_ch',
    clearer   => '_clear_ch',
);

# Builder for the _ch attribute.
#
# Opens a channel on the current connection (_rf) and declares the configured
# exchange on it. Dies with a dump of the reply frame unless the broker
# answered with Exchange.DeclareOk. Returns the opened channel.
sub _build_ch {
    my $self = shift;

    my $channel = $self->_rf->open_channel;

    my $reply = $channel->declare_exchange(
        type => $self->exchange_type,
        durable => $self->exchange_durable,
        exchange => $self->exchange_name,
    );
    my $frame = $reply->method_frame;

    # Anything other than a DeclareOk object means the declaration failed.
    my $declare_ok = blessed($frame)
        && $frame->isa('Net::AMQP::Protocol::Exchange::DeclareOk');
    die Dumper($frame) unless $declare_ok;

    return $channel;
}

# Main loop; never returns normally.
#
# Each pass resets the condition variable, makes sure a channel exists, and
# then blocks on the condition variable. The file tailers are started on the
# first pass only; later passes just re-establish the channel.
sub run {
    my ($self) = @_;
    my $tailers_running = 0;

    while (1) {
        $self->_clear_cv;

        # Force the lazy channel to be built before entering the event loop.
        $self->_ch;

        # The tail timers only need to be set up once.
        if (!$tailers_running) {
            $tailers_running = 1;
            $self->tail;
        }

        # Enter the event loop. The connection's on_close handler sends on
        # this condvar, which brings us back here to loop and rebuild.
        $self->_cv->recv;
    }
}

# Start tailing every configured file, pairing each with a routing key.
#
# Files and routing keys are matched up in order. When there are more files
# than keys, the last key is reused for all remaining files.
#
# The key list is walked on a private copy, so the arrayref held in the
# routing_key attribute is left intact. Previously the attribute's array was
# consumed in place, so after one call it held only the last key and a second
# call to tail() would pair every file with that key.
sub tail {
    my $self = shift;
    # Shallow copy: the shift/unshift below must not touch the attribute.
    my $rkeys = [ $self->routing_key->flatten ];
    foreach my $fn ($self->filename->flatten) {
        my $rk = $rkeys->shift;
        # If that was the last key, put it back so it keeps being reused.
        $rkeys->unshift($rk) unless $rkeys->length;
#        warn("Setup tail for $fn on $rk");
        my $ft = $self->setup_tail($fn, $rk);
        $ft->tail;
    }
    return;
}

# Build (but do not start) a file tailer for one file.
#
#   $file        - name of the file to follow
#   $routing_key - routing key used for every line published from that file
#
# Returns the new App::RabbitTail::FileTailer. Its callback publishes each
# line, minus the trailing newline, to the configured exchange.
sub setup_tail {
    my ($self, $file, $routing_key) = @_;

    my $on_line = sub {
        my ($message) = @_;
        chomp $message;
#        warn("SENT $message to " . $self->exchange_name . " with " . $routing_key);

        # Only the predicate is consulted here, so a missing channel is not
        # lazily rebuilt from inside the callback.
        if ($self->_has_ch) {
            return $self->_ch->publish(
                body => $message,
                exchange => $self->exchange_name,
                routing_key => $routing_key,
            );
        }

        # No channel right now: report the line as dropped and move on.
        warn("DROPPED $message to " . $self->exchange_name . " with " . $routing_key . "\n");
        return;
    };

    return App::RabbitTail::FileTailer->new(
        max_sleep => $self->max_sleep,
        cb => $on_line,
        fn => $file,
    );
}

# All attributes and methods are declared above; make the metaclass immutable.
__PACKAGE__->meta->make_immutable;
__END__

=head1 NAME

App::RabbitTail - Log tailer which broadcasts log lines into RabbitMQ exchanges.

=head1 SYNOPSIS

    See the rabbit_tail script shipped with the distribution for simple CLI usage.

    use App::RabbitTail;
    use AnyEvent; # Not strictly needed, but you probably want to
                  # use it yourself if you're doing this manually.

    my $tailer = App::RabbitTail->new(
        # At least 1 filename must be supplied
        filename => [qw/ file1 file2 /],
        # Optional args, defaults below
        routing_key => [qw/ # /],
        host => 'localhost',



( run in 0.671 second using v1.01-cache-2.11-cpan-adec679a428 )