AnyEvent-Eris
view release on metacpan or search on metacpan
lib/AnyEvent/eris/Client.pm view on Meta::CPAN
use List::Util;
use Scalar::Util;
use Parse::Syslog::Line 'parse_syslog_line';
# we recognize these
my @PROTOCOL_LINE_PREFIXES = (
'Subscribe to :',
'Receiving ',
'Full feed enabled',
'EHLO Streamer',
);
sub new {
my ( $class, %opts ) = @_;
my $self = bless {
RemoteAddress => '127.0.0.1',
RemotePort => 9514,
ReturnType => 'hash',
Subscribe => undef,
Match => undef,
MessageHandler => undef,
%opts,
}, $class;
$opts{'MessageHandler'}
or AE::log fatal => 'You must provide a MessageHandler';
ref $opts{'MessageHandler'} eq 'CODE'
or AE::log fatal => 'You need to specify a subroutine reference to the \'MessageHandler\' parameter.';
$self->_connect;
return $self;
}
sub _connect {
my $self = shift;
my $block = $self->{'ReturnType'} eq 'block';
my $separator = $block ? "\n" : '';
my ( $addr, $port ) = @{$self}{qw<RemoteAddress RemotePort>};
# FIXME: TODO item for this
# in second thought, this should just be removed because
# it's meant for internal manual buffering, which we don't need
$block
and AE::log fatal => 'Block option not supported';
Scalar::Util::weaken( my $inner_self = $self );
$self->{'_client'} ||= tcp_connect $addr, $port, sub {
my ($fh) = @_
or AE::log fatal => "Connect failed: $!";
my $hdl; $hdl = AnyEvent::Handle->new(
fh => $fh,
on_error => sub {
AE::log error => $_[2];
$_[0]->destroy;
$inner_self->{'_reconnect_timer'} = AE::timer 10, 0, sub {
undef $inner_self->{'_reconnect_timer'};
$inner_self->_connect;
};
},
on_eof => sub { $hdl->destroy; AE::log info => 'Done.' },
on_read => sub {
$hdl->push_read (line => sub {
my ($hdl, $line) = @_;
List::Util::first {
substr( $line, 0, length $_ ) eq $_
} @PROTOCOL_LINE_PREFIXES and return;
$inner_self->handle_message( $line, $hdl );
});
},
);
$inner_self->{'buffer'} = '';
# FIXME: should this really be in a timer?
# all the actions relating to a socket are deferred anyway
$inner_self->{'_setup_pipe_timer'} = AE::timer 0, 0, sub {
undef $inner_self->{'_setup_pipe_timer'};
$inner_self->setup_pipe($hdl);
};
};
return $self;
}
sub setup_pipe {
my ( $self, $handle ) = @_;
# Parse for Subscriptions or Matches
my %data;
foreach my $target (qw(Subscribe Match)) {
if ( defined $self->{$target} ) {
my @data = ref $self->{$target} eq 'ARRAY'
? @{ $self->{$target} }
: $self->{$target};
@data = map lc, @data if $target eq 'Subscribe';
next unless scalar @data > 0;
$data{$target} = \@data;
}
}
# Check to make sure we're doing something
keys %data
or AE::log fatal => 'Must specify a subscription or a match parameters!';
# Send the Subscription
foreach my $target ( sort keys %data ) {
my $subname = 'do_' . lc $target;
$self->$subname( $handle, $data{$target} );
}
}
sub do_subscribe {
my ( $self, $handle, $subs ) = @_;
if ( List::Util::first { $_ eq 'fullfeed' } @{$subs} ) {
$handle->push_write("fullfeed\n");
} else {
$handle->push_write(
'sub ' .
join( ', ', @{$subs} ) .
"\n"
);
}
}
sub do_match {
my ( $self, $handle, $matches ) = @_;
$handle->push_write(
'match ' .
join( ', ', @{$matches} ) .
"\n"
);
}
sub handle_message {
my ( $self, $line, $handle ) = @_;
( run in 0.825 second using v1.01-cache-2.11-cpan-39bf76dae61 )