AnyEvent-Eris

 view release on metacpan or  search on metacpan

lib/AnyEvent/eris/Client.pm  view on Meta::CPAN

package AnyEvent::eris::Client;
# ABSTRACT: eris pub/sub Client

use strict;
use warnings;
use Carp;
use AnyEvent;
use AnyEvent::Handle;
use AnyEvent::Socket;
use List::Util;
use Scalar::Util;
use Parse::Syslog::Line 'parse_syslog_line';

# we recognize these
my @PROTOCOL_LINE_PREFIXES = (
    'Subscribe to :',
    'Receiving ',
    'Full feed enabled',
    'EHLO Streamer',
);

sub new {
    my ( $class, %opts ) = @_;

    my $self = bless {
        RemoteAddress  => '127.0.0.1',
        RemotePort     => 9514,
        ReturnType     => 'hash',
        Subscribe      => undef,
        Match          => undef,
        MessageHandler => undef,
        %opts,
    }, $class;

    $opts{'MessageHandler'}
        or AE::log fatal => 'You must provide a MessageHandler';

    ref $opts{'MessageHandler'} eq 'CODE'
        or AE::log fatal => 'You need to specify a subroutine reference to the \'MessageHandler\' parameter.';

    $self->_connect;

    return $self;
}

sub _connect {
    my $self = shift;

    my $block           = $self->{'ReturnType'} eq 'block';
    my $separator       = $block ? "\n" : '';
    my ( $addr, $port ) = @{$self}{qw<RemoteAddress RemotePort>};

    # FIXME: TODO item for this
    #        in second thought, this should just be removed because
    #        it's meant for internal manual buffering, which we don't need
    $block
        and AE::log fatal => 'Block option not supported';

    Scalar::Util::weaken( my $inner_self = $self );

    $self->{'_client'} ||= tcp_connect $addr, $port, sub {
        my ($fh) = @_
            or AE::log fatal => "Connect failed: $!";

        my $hdl; $hdl = AnyEvent::Handle->new(
            fh       => $fh,
            on_error => sub {
                AE::log error => $_[2];
                $_[0]->destroy;
                $inner_self->{'_reconnect_timer'} = AE::timer 10, 0, sub {
                    undef $inner_self->{'_reconnect_timer'};
                    $inner_self->_connect;
                };
            },

            on_eof   => sub { $hdl->destroy; AE::log info => 'Done.' },

            on_read  => sub {
                $hdl->push_read (line => sub {
                    my ($hdl, $line) = @_;

                    List::Util::first {
                        substr( $line, 0, length $_ ) eq $_
                    } @PROTOCOL_LINE_PREFIXES and return;

                    $inner_self->handle_message( $line, $hdl );
                });
            },
        );

        $inner_self->{'buffer'} = '';

        # FIXME: should this really be in a timer?
        # all the actions relating to a socket are deferred anyway
        $inner_self->{'_setup_pipe_timer'} = AE::timer 0, 0, sub {
            undef $inner_self->{'_setup_pipe_timer'};
            $inner_self->setup_pipe($hdl);
        };
    };

    return $self;
}

sub setup_pipe {
    my ( $self, $handle ) = @_;

    # Parse for Subscriptions or Matches
    my %data;
    foreach my $target (qw(Subscribe Match)) {
        if ( defined $self->{$target} ) {
            my @data = ref $self->{$target} eq 'ARRAY'
                     ? @{ $self->{$target} }
                     : $self->{$target};

            @data = map lc, @data if $target eq 'Subscribe';
            next unless scalar @data > 0;
            $data{$target} = \@data;
        }
    }



( run in 1.595 second using v1.01-cache-2.11-cpan-39bf76dae61 )