AnyEvent-Eris
view release on metacpan or search on metacpan
META.yml
Makefile.PL
README
README.md
lib/AnyEvent/eris.pm
lib/AnyEvent/eris/Client.pm
lib/AnyEvent/eris/Server.pm
t/client/basic.t
t/lib/Eris/Test.pm
t/server/basic.t
t/server/debug.t
t/server/dispatching_messages.t
t/server/dump.t
t/server/flush_client.t
t/server/fullfeed.t
t/server/match.t
t/server/regex.t
t/server/register_client.t
t/server/run.t
t/server/status.t
t/server/subscribe.t
lib/AnyEvent/eris/Server.pm view on Meta::CPAN
# ABSTRACT: eris pub/sub Server
use strict;
use warnings;
use Scalar::Util;
use Sys::Hostname;
use AnyEvent::Handle;
use AnyEvent::Socket;
use AnyEvent::Graphite;
# Names of the stream types known to this module.
my @_STREAM_NAMES = ( 'subscription', 'match', 'debug', 'full', 'regex' );

# For the stream types that have one, the name of the companion ("assist")
# entry looked up when such a stream is removed.
# NOTE(review): these look like keys on the server object (e.g. $self->{'words'});
# confirm against the code that consumes this table.
my %_STREAM_ASSISTERS = (
    'subscription' => 'programs',
    'match'        => 'words',
);

# Precompiled Regular Expressions
my %_PRE = (
    # Captures the token that follows an H:M:S timestamp and one further
    # whitespace-delimited field, up to the next colon or whitespace.
    # NOTE(review): presumably the program name of a syslog line - confirm.
    'program' => qr/\s+\d+:\d+:\d+\s+\S+\s+([^:\s]+)(:|\s)/,
);
# Log a server-side error and, when asked, signal the server's condvar.
#
# Arguments:
#   $err_str - description of what went wrong
#   $fatal   - when true, send() is called on $self->{'_cv'}
#
# The numeric value of $! is captured before anything else is called so
# that the logged errno cannot be overwritten by later calls.
#
# Returns $fatal unchanged when it is false, otherwise the result of send().
sub _server_error {
    my $self    = shift;
    my $err_str = shift;
    my $fatal   = shift;

    my $err_num = $! + 0;
    AE::log debug => "SERVER ERROR: $err_num, $err_str";

    return $fatal unless $fatal;
    return $self->{'_cv'}->send;
}
# Client command table: command name => pattern matched (case-insensitively,
# by the dispatcher) against a line received from the client. The first
# capture group, where present, is handed to the handler as its argument.
#
# The dispatcher walks this hash in unspecified key order, so every pattern
# must be anchored tightly enough that a line can match one command only.
my %client_commands = (
    fullfeed    => qr{^fullfeed},
    nofullfeed  => qr{^nofull(feed)?},
    subscribe   => qr{^sub(?:scribe)?\s(.*)},
    unsubscribe => qr{^unsub(?:scribe)?\s(.*)},
    match       => qr{^match (.*)},
    nomatch     => qr{^nomatch (.*)},
    debug       => qr{^debug},
    nobug       => qr{^no(de)?bug},
    regex       => qr{^re(?:gex)?\s(.*)},
    noregex     => qr{^nore(gex)?},
    status      => qr{^status},
    dump        => qr{^dump\s(\S+)},

    # Anchored at the start and closed with a word boundary: the previous
    # unanchored form matched any line that merely contained "q" or "exit"
    # (for example "subscribe squid"), so such lines could be dispatched as
    # a quit request depending on hash ordering.
    quit        => qr{^(exit|q(uit)?)\b},
);
sub handle_subscribe {
my ( $self, $handle, $SID, $args ) = @_;
lib/AnyEvent/eris/Server.pm view on Meta::CPAN
and delete $self->{'words'}{$word};
}
$handle->push_write(
'No longer receiving messages matching : ' .
join( ', ', @words ) .
"\n"
);
}
# Handler for the "debug" client command.
#
# Arguments:
#   $handle - object whose push_write() delivers the reply to the client
#   $SID    - session id of the requesting client
#
# Removes the client's 'full' stream, flags the client record with
# debug => 1 and confirms to the client. Returns what push_write() returns.
sub handle_debug {
    my $self   = shift;
    my $handle = shift;
    my $SID    = shift;

    # Drop the 'full' stream before the debug flag is raised.
    $self->remove_stream( $SID, 'full' );

    my $clients = $self->clients;
    $clients->{$SID}{'debug'} = 1;

    return $handle->push_write("Debugging enabled.\n");
}
# Handler for the "nobug" / "nodebug" client command.
#
# Arguments:
#   $handle - object whose push_write() delivers the reply to the client
#   $SID    - session id of the requesting client
#
# Removes the client's 'debug' stream, clears the debug flag on the client
# record and confirms to the client. Returns what push_write() returns.
sub handle_nobug {
    my $self   = shift;
    my $handle = shift;
    my $SID    = shift;

    $self->remove_stream( $SID, 'debug' );

    my $clients = $self->clients;
    delete $clients->{$SID}{'debug'};

    return $handle->push_write("Debugging disabled.\n");
}
sub handle_regex {
my ( $self, $handle, $SID, $args ) = @_;
# do not handle a regex if it's already full subscription
$self->clients->{$SID}{'full'}
and return;
lib/AnyEvent/eris/Server.pm view on Meta::CPAN
# Handler for the "quit" client command.
#
# Arguments:
#   $handle - object whose push_write() delivers the reply to the client
#   $SID    - session id of the requesting client
#
# Writes a farewell to the client, hangs the client up and then calls
# send() on $self->{'_cv'}. Returns what send() returns.
#
# NOTE(review): run() blocks on this same $self->{'_cv'}, so signalling it
# here presumably ends run() for the whole server rather than only this
# client - confirm that this is intended.
# NOTE(review): the farewell carries no trailing newline, unlike the
# "Debugging enabled/disabled" replies - confirm that clients expect this.
sub handle_quit {
    my $self   = shift;
    my $handle = shift;
    my $SID    = shift;

    $handle->push_write('Terminating connection on your request.');
    $self->hangup_client($SID);

    my $condvar = $self->{'_cv'};
    return $condvar->send;
}
# Forget a client.
#
# Arguments:
#   $SID - session id of the client to drop
#
# Deletes the client's record from the table returned by clients() and
# logs the termination at debug level.
sub hangup_client {
    my $self = shift;
    my $SID  = shift;

    my $clients = $self->clients;
    delete $clients->{$SID};

    AE::log debug => "Client Termination Posted: $SID";
}
sub remove_stream {
my ( $self, $SID, $stream ) = @_;
AE::log debug => "Removing '$stream' for $SID";
my $client_streams = delete $self->clients->{$SID}{'streams'}{$stream};
# FIXME:
# I *think* what this is supposed to do is delete assists
# that were registered for this client, which it doesn't
# - it deletes global assists instead - this needs to be
# looked into
if ($client_streams) {
if ( my $assist = $_STREAM_ASSISTERS{$stream} ) {
lib/AnyEvent/eris/Server.pm view on Meta::CPAN
$inner_self->hangup_client($SID);
$inner_self->_server_error( $msg, $fatal );
$hdl->destroy;
},
on_eof => sub {
my ($hdl) = @_;
my $SID = $inner_self->_session_id($hdl);
$inner_self->hangup_client($SID);
$hdl->destroy;
AE::log debug => "SERVER, client $SID disconnected.";
},
on_read => sub {
my ($hdl) = @_;
chomp( my $line = delete $hdl->{'rbuf'} );
my $SID = $inner_self->_session_id($hdl);
foreach my $command ( keys %client_commands ) {
my $regex = $client_commands{$command};
if ( my ($args) = ( $line =~ /$regex/i ) ) {
lib/AnyEvent/eris/Server.pm view on Meta::CPAN
eval {
$self->{'_graphite'} = AnyEvent::Graphite->new(
host => $self->{'config'}{'GraphiteHost'},
port => $self->{'config'}{'GraphitePort'},
);
1;
} or do {
my $error = $@ || 'Zombie error';
AE::log debug => "Graphite server setup failed: $error";
}
}
sub stats {
my $self = shift;
if ( ! exists $self->{'stats'} ) {
$self->{'stats'} = {
map +( $_ => 0 ), qw<
received received_bytes dispatched dispatched _bytes
lib/AnyEvent/eris/Server.pm view on Meta::CPAN
my $time = AE::now;
foreach my $stat ( keys %{$stats}) {
my $metric = join '.', $self->{'config'}{'GraphitePrefix'},
$self->{'hostname'},
$stat;
eval {
$self->{'_graphite'}->send($metric, $stats->{$stat}, $time);
1;
} or do {
my $error = $@ || 'Zombie error';
AE::log debug => 'Error sending statistics, reconnecting.';
$self->graphite_connect;
last;
}
}
}
AE::log debug => 'STATS: ' .
join ', ', map "$_:$stats->{$_}", keys %{$stats};
}
# Block until the server's condition variable is signalled.
#
# Arguments:
#   $cv - optional condition variable to wait on; when omitted (or false)
#         a fresh one is created with AE::cv
#
# The condition variable is stored in $self->{'_cv'} before recv() is
# called on it. Returns what recv() returns.
sub run {
    my ( $self, $cv ) = @_;

    $self->{'_cv'} = $cv || AE::cv;

    return $self->{'_cv'}->recv;
}
sub clients {
t/server/debug.t view on Meta::CPAN
my $hdl; $hdl = AnyEvent::Handle->new(
fh => $fh,
on_error => sub { AE::log error => $_[2]; $_[0]->destroy },
on_eof => sub { $hdl->destroy; AE::log info => 'Done.' },
on_read => sub {
my ($hdl) = @_;
chomp( my $line = delete $hdl->{'rbuf'} );
if ( $line =~ /^EHLO Streamer \(KERNEL: \d+:(.+)\)/ ) {
$SID = $1;
$hdl->push_write("debug\n");
is(
$server->clients->{$SID}{'debug'},
undef,
'No debugging for client yet',
);
} elsif ( $line =~ /^Debugging enabled/ ) {
is(
$server->clients->{$SID}{'debug'},
1,
'Client has debugging enabled',
);
$hdl->push_write("nodebug\n");
} elsif ( $line =~ /^Debugging disabled/ ) {
is(
$server->clients->{$SID}{'debug'},
undef,
'Client does not have debugging enabled',
);
$cv->send('OK');
} else {
$cv->send("Unknown response: $line");
}
},
);
};
t/server/dump.t view on Meta::CPAN
chomp( my $line = delete $hdl->{'rbuf'} );
if ( $line =~ /^EHLO/ ) {
($SID) = $line =~ /\(KERNEL:\s\d+:([a-fA-F0-9]+)\)/;
$hdl->push_write("fullfeed\n");
} elsif ( $line =~ /^Full feed enabled/ ) {
$hdl->push_write("dump streams\n");
} else {
is(
$line,
"subscription -> \nmatch -> \ndebug -> \n" .
"full -> $SID\n" .
"regex -> ",
'Correct dump output'
);
$cv->send('OK');
}
},
);
};
( run in 1.738 second using v1.01-cache-2.11-cpan-3cd7ad12f66 )