AnyEvent-Eris

 view release on metacpan or  search on metacpan

MANIFEST  view on Meta::CPAN

META.yml
Makefile.PL
README
README.md
lib/AnyEvent/eris.pm
lib/AnyEvent/eris/Client.pm
lib/AnyEvent/eris/Server.pm
t/client/basic.t
t/lib/Eris/Test.pm
t/server/basic.t
t/server/debug.t
t/server/dispatching_messages.t
t/server/dump.t
t/server/flush_client.t
t/server/fullfeed.t
t/server/match.t
t/server/regex.t
t/server/register_client.t
t/server/run.t
t/server/status.t
t/server/subscribe.t

lib/AnyEvent/eris/Server.pm  view on Meta::CPAN

# ABSTRACT: eris pub/sub Server

use strict;
use warnings;
use Scalar::Util;
use Sys::Hostname;
use AnyEvent::Handle;
use AnyEvent::Socket;
use AnyEvent::Graphite;

# Names of the streams this module knows about, as an ordered list.
my @_STREAM_NAMES = (
    'subscription',
    'match',
    'debug',
    'full',
    'regex',
);

# Per-stream helper key, looked up by stream name in remove_stream().
# NOTE(review): presumably each value names the server-side table that
# backs the stream (e.g. 'words' for 'match') - confirm against the users.
my %_STREAM_ASSISTERS = (
    'match'        => 'words',
    'subscription' => 'programs',
);

# Patterns compiled once at load time.
# 'program' skips an H:M:S-style timestamp and one more whitespace-delimited
# field, then captures the following token up to a ':' or whitespace
# (presumably the program name of a syslog-style line).
my %_PRE = (
    'program' => qr/\s+\d+:\d+:\d+\s+\S+\s+([^:\s]+)(:|\s)/,
);

# Log a server-side error and, when it is fatal, signal the condvar
# stored in $self->{'_cv'}.
#
# Arguments:
#   $err_str - human-readable description of the error
#   $fatal   - true when the error should also signal $self->{'_cv'}
#
# The numeric value of $! is read first, before any other call can
# overwrite it, and is logged next to $err_str.
sub _server_error {
    my ( $self, $err_str, $fatal ) = @_;
    my $err_num = $!+0;
    AE::log debug => "SERVER ERROR: $err_num, $err_str";

    return unless $fatal;

    # '_cv' is assigned in run(). If the server is driven by an event loop
    # that never went through run(), there is nothing to signal - and
    # calling ->send on undef would die inside the handle callback.
    my $cv = $self->{'_cv'}
        or return;

    return $cv->send;
}

# Client command table: handler name => pattern tried against each line a
# client sends. The dispatcher matches case-insensitively and passes the
# first capture group (if any) to the handler as its arguments.
#
# Every pattern is anchored at the start of the line so that a command
# word appearing inside another command's arguments cannot trigger it.
my %client_commands = (
    fullfeed    => qr{^fullfeed},
    nofullfeed  => qr{^nofull(feed)?},
    subscribe   => qr{^sub(?:scribe)?\s(.*)},
    unsubscribe => qr{^unsub(?:scribe)?\s(.*)},
    match       => qr{^match (.*)},
    nomatch     => qr{^nomatch (.*)},
    debug       => qr{^debug},
    nobug       => qr{^no(de)?bug},
    regex       => qr{^re(?:gex)?\s(.*)},
    noregex     => qr{^nore(gex)?},
    status      => qr{^status},
    dump        => qr{^dump\s(\S+)},

    # Anchored and word-bounded: previously this matched any line that
    # merely contained "q" or "exit" (e.g. "subscribe mysqld"), and since
    # the table is walked in hash order such a line could be dispatched
    # to quit instead of its real command.
    quit        => qr{^(exit|q(uit)?)\b},
);

sub handle_subscribe {
    my ( $self, $handle, $SID, $args ) = @_;

lib/AnyEvent/eris/Server.pm  view on Meta::CPAN

            and delete $self->{'words'}{$word};
    }

    $handle->push_write(
        'No longer receiving messages matching : ' .
        join( ', ', @words )                       .
        "\n"
    );
}

# Handler for the client "debug" command.
#
# Removes the client's 'full' stream, flags the client record as being in
# debug mode and acknowledges on the client's handle.
#
# Arguments:
#   $handle - object the acknowledgement is written to (via push_write)
#   $SID    - session id of the client issuing the command
sub handle_debug {
    my $self   = shift;
    my $handle = shift;
    my $SID    = shift;

    $self->remove_stream( $SID, 'full' );

    my $client_for = $self->clients;
    $client_for->{$SID}{'debug'} = 1;

    return $handle->push_write("Debugging enabled.\n");
}

# Handler for the client "nodebug" / "nobug" command.
#
# Removes the client's 'debug' stream, clears the debug flag from the
# client record and acknowledges on the client's handle.
#
# Arguments:
#   $handle - object the acknowledgement is written to (via push_write)
#   $SID    - session id of the client issuing the command
sub handle_nobug {
    my $self   = shift;
    my $handle = shift;
    my $SID    = shift;

    $self->remove_stream( $SID, 'debug' );

    my $client_for = $self->clients;
    delete $client_for->{$SID}{'debug'};

    return $handle->push_write("Debugging disabled.\n");
}

sub handle_regex {
    my ( $self, $handle, $SID, $args ) = @_;

    # do not handle a regex if it's already full subscription
    $self->clients->{$SID}{'full'}
        and return;

lib/AnyEvent/eris/Server.pm  view on Meta::CPAN

# Handler for the client "quit" / "exit" command.
#
# Writes a farewell to the client's handle, drops the client's record via
# hangup_client() and then signals the condvar in $self->{'_cv'} when one
# is present.
#
# Arguments:
#   $handle - object the farewell is written to (via push_write)
#   $SID    - session id of the client issuing the command
#
# NOTE(review): '_cv' is the condvar run() waits on, so a single client's
# quit also releases run() - confirm this is intended.
# NOTE(review): unlike the other replies, the farewell has no trailing
# newline - confirm whether line-oriented clients rely on that.
sub handle_quit {
    my ( $self, $handle, $SID ) = @_;
    $handle->push_write('Terminating connection on your request.');
    $self->hangup_client($SID);

    # '_cv' is assigned in run(). When the server is embedded in a loop
    # that never called run() it is unset, and calling ->send on undef
    # would die inside the read callback.
    my $cv = $self->{'_cv'}
        or return;

    return $cv->send;
}

# Drop the record kept for the client with session id $SID and log that
# the client was terminated.
sub hangup_client {
    my $self = shift;
    my $SID  = shift;

    my $client_for = $self->clients;
    delete $client_for->{$SID};

    AE::log debug => "Client Termination Posted: $SID";
}

sub remove_stream {
    my ( $self, $SID, $stream ) = @_;
    AE::log debug => "Removing '$stream' for $SID";

    my $client_streams = delete $self->clients->{$SID}{'streams'}{$stream};

    # FIXME:
    # I *think* what this is supposed to do is delete assists
    # that were registered for this client, which it doesn't
    # - it deletes global assists instead - this needs to be
    # looked into
    if ($client_streams) {
        if ( my $assist = $_STREAM_ASSISTERS{$stream} ) {

lib/AnyEvent/eris/Server.pm  view on Meta::CPAN

                $inner_self->hangup_client($SID);
                $inner_self->_server_error( $msg, $fatal );
                $hdl->destroy;
            },

            on_eof => sub {
                my ($hdl) = @_;
                my $SID = $inner_self->_session_id($hdl);
                $inner_self->hangup_client($SID);
                $hdl->destroy;
                AE::log debug => "SERVER, client $SID disconnected.";
            },

            on_read => sub {
                my ($hdl) = @_;
                chomp( my $line = delete $hdl->{'rbuf'} );
                my $SID = $inner_self->_session_id($hdl);

                foreach my $command ( keys %client_commands ) {
                    my $regex = $client_commands{$command};
                    if ( my ($args) = ( $line =~ /$regex/i ) ) {

lib/AnyEvent/eris/Server.pm  view on Meta::CPAN


    eval {
        $self->{'_graphite'} = AnyEvent::Graphite->new(
            host => $self->{'config'}{'GraphiteHost'},
            port => $self->{'config'}{'GraphitePort'},
        );

        1;
    } or do {
        my $error = $@ || 'Zombie error';
        AE::log debug => "Graphite server setup failed: $error";
    }
}

sub stats {
    my $self = shift;

    if ( ! exists $self->{'stats'} ) {
        $self->{'stats'} = {
            map +( $_ => 0 ), qw<
                received received_bytes dispatched dispatched _bytes

lib/AnyEvent/eris/Server.pm  view on Meta::CPAN

        my $time = AE::now;
        foreach my $stat ( keys %{$stats}) {
            my $metric = join '.', $self->{'config'}{'GraphitePrefix'},
                                   $self->{'hostname'},
                                   $stat;
            eval {
                $self->{'_graphite'}->send($metric, $stats->{$stat}, $time);
                1;
            } or do {
                my $error = $@ || 'Zombie error';
                AE::log debug => 'Error sending statistics, reconnecting.';
                $self->graphite_connect;
                last;
            }
        }
    }

    AE::log debug => 'STATS: ' .
                     join ', ', map "$_:$stats->{$_}", keys %{$stats};
}

# Store a condition variable in $self->{'_cv'} and wait on it.
#
# Arguments:
#   (optional) a condvar to wait on; when omitted or false, a new one is
#   obtained from AE::cv.
#
# Returns whatever the condvar's recv returns.
sub run {
    my ( $self, $cv ) = @_;

    $self->{'_cv'} = $cv || AE::cv;

    return $self->{'_cv'}->recv;
}

sub clients {

t/server/debug.t  view on Meta::CPAN

    my $hdl; $hdl = AnyEvent::Handle->new(
        fh       => $fh,
        on_error => sub { AE::log error => $_[2]; $_[0]->destroy },
        on_eof   => sub { $hdl->destroy; AE::log info => 'Done.' },
        on_read  => sub {
            my ($hdl) = @_;
            chomp( my $line = delete $hdl->{'rbuf'} );

            if ( $line =~ /^EHLO Streamer \(KERNEL: \d+:(.+)\)/ ) {
                $SID = $1;
                $hdl->push_write("debug\n");

                is(
                    $server->clients->{$SID}{'debug'},
                    undef,
                    'No debugging for client yet',
                );
            } elsif ( $line =~ /^Debugging enabled/ ) {
                is(
                    $server->clients->{$SID}{'debug'},
                    1,
                    'Client has debugging enabled',
                );

                $hdl->push_write("nodebug\n");
            } elsif ( $line =~ /^Debugging disabled/ ) {
                is(
                    $server->clients->{$SID}{'debug'},
                    undef,
                    'Client does not have debugging enabled',
                );

                $cv->send('OK');
            } else {
                $cv->send("Unknown response: $line");
            }
        },
    );
};

t/server/dump.t  view on Meta::CPAN

            chomp( my $line = delete $hdl->{'rbuf'} );

            if ( $line =~ /^EHLO/ ) {
                ($SID) = $line =~ /\(KERNEL:\s\d+:([a-fA-F0-9]+)\)/;
                $hdl->push_write("fullfeed\n");
            } elsif ( $line =~ /^Full feed enabled/ ) {
                $hdl->push_write("dump streams\n");
            } else {
                is(
                    $line,
                    "subscription -> \nmatch -> \ndebug -> \n" .
                    "full -> $SID\n" .
                    "regex -> ",
                    'Correct dump output'
                );

                $cv->send('OK');
            }
        },
    );
};



( run in 1.738 second using v1.01-cache-2.11-cpan-3cd7ad12f66 )