AnyEvent-Eris

 view release on metacpan or  search on metacpan

lib/AnyEvent/eris/Server.pm  view on Meta::CPAN

package AnyEvent::eris::Server;
# ABSTRACT: eris pub/sub Server

use strict;
use warnings;
use Scalar::Util;
use Sys::Hostname;
use AnyEvent::Handle;
use AnyEvent::Socket;
use AnyEvent::Graphite;

# Every kind of per-client stream this module iterates over.
my @_STREAM_NAMES = (
    'subscription', 'match', 'debug', 'full', 'regex',
);

# Streams that also keep a server-wide lookup table; the value is the key
# of that table inside the server object.
my %_STREAM_ASSISTERS = (
    'subscription', 'programs',
    'match',        'words',
);

# Precompiled Regular Expressions
my %_PRE = (
    # captures the token that follows "HH:MM:SS <word>" in a message
    'program', qr/\s+\d+:\d+:\d+\s+\S+\s+([^:\s]+)(:|\s)/,
);

# Log a server-side error and, when it is flagged fatal, signal the
# condition variable stored in $self->{'_cv'}.
#
#   $err_str - description of the failure (an undefined value is logged as '')
#   $fatal   - true when the main condition variable should be signalled
#
# The numeric value of $! at call time is logged next to the description.
sub _server_error {
    my ( $self, $err_str, $fatal ) = @_;
    my $err_num = $! + 0;

    defined $err_str
        or $err_str = '';

    AE::log debug => "SERVER ERROR: $err_num, $err_str";

    # '_cv' is only assigned by run(); when the object is driven by some
    # other loop there is nothing to signal and calling ->send would die.
    $fatal && $self->{'_cv'}
        and $self->{'_cv'}->send;

    return;
}

# Command name => pattern recognising that command on a client line.
# The listener tries every entry (case-insensitively) and calls
# "handle_<name>" for one that matches, passing the first capture as the
# command's argument string. Hash order is not defined, so the patterns
# must not overlap.
my %client_commands = (
    fullfeed    => qr{^fullfeed},
    nofullfeed  => qr{^nofull(feed)?},
    subscribe   => qr{^sub(?:scribe)?\s(.*)},
    unsubscribe => qr{^unsub(?:scribe)?\s(.*)},
    match       => qr{^match (.*)},
    nomatch     => qr{^nomatch (.*)},
    debug       => qr{^debug},
    nobug       => qr{^no(de)?bug},
    regex       => qr{^re(?:gex)?\s(.*)},
    noregex     => qr{^nore(gex)?},
    status      => qr{^status},
    dump        => qr{^dump\s(\S+)},

    # Anchored on both ends: an unanchored pattern matched every line that
    # merely contained "q" or "exit" (e.g. "subscribe mysql"), which could
    # be dispatched as a quit.
    quit        => qr{^(?:exit|q(?:uit)?)\s*$},
);

# Subscribe client $SID to one or more programs.
#
#   $handle - client handle; receives the confirmation line
#   $SID    - session id of the client
#   $args   - program names separated by whitespace and/or commas
#
# Names are lower-cased. The 'full' stream is removed first. The
# server-wide counter in $self->{'programs'} is raised once per client and
# program, so subscribing twice does not inflate it.
sub handle_subscribe {
    my ( $self, $handle, $SID, $args ) = @_;
    defined $args
        or $args = '';

    $self->remove_stream( $SID, 'full' );

    # split yields an empty leading field for input such as " foo"
    my @programs = grep { length } map lc, split /[\s,]+/, $args;

    foreach my $program (@programs) {
        my $subscribed = $self->clients->{$SID}{'subscription'} ||= {};

        # already counted for this client
        exists $subscribed->{$program}
            and next;

        $subscribed->{$program} = 1;

        # number of registered programs
        $self->{'programs'}{$program}++;
    }

    $handle->push_write(
        'Subscribed to : '     .
        join( ',', @programs ) .
        "\n"
    );

    return;
}

# Remove program subscriptions from client $SID.
#
#   $handle - client handle; receives the confirmation line
#   $SID    - session id of the client
#   $args   - program names separated by whitespace and/or commas
#
# Only the named programs are removed, and the server-wide counter in
# $self->{'programs'} is lowered only for programs this client really was
# subscribed to. The client's subscription table is dropped once empty.
sub handle_unsubscribe {
    my ( $self, $handle, $SID, $args ) = @_;
    defined $args
        or $args = '';

    my @programs      = grep { length } map lc, split /[\s,]+/, $args;
    my $subscriptions = $self->clients->{$SID}{'subscription'};

    foreach my $program (@programs) {
        # never touch counters owned by other clients
        $subscriptions && exists $subscriptions->{$program}
            or next;

        delete $subscriptions->{$program};

        --$self->{'programs'}{$program} <= 0
            and delete $self->{'programs'}{$program};
    }

    # an empty table would still be truthy and so look like an active stream
    $subscriptions && !%{$subscriptions}
        and delete $self->clients->{$SID}{'subscription'};

    $handle->push_write(
        'Subscription removed for : ' .
        join( ',', @programs )        .
        "\n"
    );

    return;
}

# Switch client $SID to the full feed: every stream is removed through
# remove_all_streams(), the 'full' flag is set, and the client is told.
sub handle_fullfeed {
    my ( $self, $handle, $SID ) = @_;

    $self->remove_all_streams($SID);

    my $registry = $self->clients;
    $registry->{$SID}{'full'} = 1;

    my $notice = "Full feed enabled, all other functions disabled.\n";
    return $handle->push_write($notice);
}

# Turn the full feed off for client $SID: every stream is removed through
# remove_all_streams(), the 'full' flag is deleted, and the client is told.
sub handle_nofullfeed {
    my ( $self, $handle, $SID ) = @_;

    $self->remove_all_streams($SID);

    # XXX: Not in original implementation
    my $registry = $self->clients;
    delete $registry->{$SID}{'full'};

    return $handle->push_write("Full feed disabled.\n");
}

# Register word matches for client $SID.
#
#   $handle - client handle; receives the confirmation line
#   $SID    - session id of the client
#   $args   - words separated by whitespace and/or commas
#
# Words are lower-cased. The 'full' stream is removed first. The
# server-wide counter in $self->{'words'} is raised once per client and
# word, so repeating a word does not inflate it.
sub handle_match {
    my ( $self, $handle, $SID, $args ) = @_;
    defined $args
        or $args = '';

    $self->remove_stream( $SID, 'full' );

    # split yields an empty leading field for input such as " foo"
    my @words = grep { length } map lc, split /[\s,]+/, $args;

    foreach my $word (@words) {
        my $matching = $self->clients->{$SID}{'match'} ||= {};

        # already counted for this client
        exists $matching->{$word}
            and next;

        $matching->{$word} = 1;
        $self->{'words'}{$word}++;
    }

    $handle->push_write(
        'Receiving messages matching : ' .
        join( ', ', @words )             .
        "\n"
    );

    return;
}

# Remove word matches from client $SID.
#
#   $handle - client handle; receives the confirmation line
#   $SID    - session id of the client
#   $args   - words separated by whitespace and/or commas
#
# The server-wide counter in $self->{'words'} is lowered only for words
# this client really was matching, so another client's registration is
# never released by mistake. The client's match table is dropped once empty.
sub handle_nomatch {
    my ( $self, $handle, $SID, $args ) = @_;
    defined $args
        or $args = '';

    my @words   = grep { length } map lc, split /[\s,]+/, $args;
    my $matches = $self->clients->{$SID}{'match'};

    foreach my $word (@words) {
        $matches && exists $matches->{$word}
            or next;

        delete $matches->{$word};

        # Remove the word from searching if this was the last client
        --$self->{'words'}{$word} <= 0
            and delete $self->{'words'}{$word};
    }

    # an empty table would still be truthy and so look like an active stream
    $matches && !%{$matches}
        and delete $self->clients->{$SID}{'match'};

    $handle->push_write(
        'No longer receiving messages matching : ' .
        join( ', ', @words )                       .
        "\n"
    );

    return;
}

# Enable the debug stream for client $SID. The 'full' stream is removed
# first, then the 'debug' flag is set and the client is told.
sub handle_debug {
    my ( $self, $handle, $SID ) = @_;

    $self->remove_stream( $SID, 'full' );

    my $registry = $self->clients;
    $registry->{$SID}{'debug'} = 1;

    return $handle->push_write("Debugging enabled.\n");
}

# Disable the debug stream for client $SID: the stream is removed, the
# 'debug' flag is deleted from the client entry, and the client is told.
sub handle_nobug {
    my ( $self, $handle, $SID ) = @_;

    $self->remove_stream( $SID, 'debug' );

    my $registry = $self->clients;
    delete $registry->{$SID}{'debug'};

    return $handle->push_write("Debugging disabled.\n");
}

# Register a regular-expression stream for client $SID.
#
#   $handle - client handle; receives the confirmation or the error line
#   $SID    - session id of the client
#   $args   - the pattern text exactly as sent by the client
#
# Nothing happens (and nothing is written) when the client is on the full
# feed. An empty, missing or uncompilable pattern is reported back as
# invalid and not stored. A valid pattern is stored under its stringified
# compiled form in the client's 'regex' table.
sub handle_regex {
    my ( $self, $handle, $SID, $args ) = @_;

    # do not handle a regex if it's already full subscription
    $self->clients->{$SID}{'full'}
        and return;

    # NOTE(review): the pattern comes straight from the client. Code blocks
    # are refused by perl for patterns interpolated at run time, but nothing
    # here limits patterns that backtrack excessively.
    my $regex;
    defined $args && length $args
        and $regex = eval { qr{$args} };

    if ( !defined $regex ) {
        # previously an empty pattern fell through and was stored under an
        # undefined hash key
        my $shown = defined $args ? $args : '';

        $handle->push_write(
            "Invalid regular expression '$shown', see: perldoc perlre\n"
        );

        return;
    }

    $self->clients->{$SID}{'regex'}{$regex} = 1;
    $handle->push_write(
        "Receiving messages matching regex : $args\n"
    );

    return;
}

# Drop every regex stream of client $SID: the stream is removed, the
# 'regex' table is deleted from the client entry, and the client is told.
sub handle_noregex {
    my ( $self, $handle, $SID ) = @_;

    $self->remove_stream( $SID, 'regex' );

    my $registry = $self->clients;
    delete $registry->{$SID}{'regex'};

    return $handle->push_write("No longer receiving regex-based matches\n");
}

sub handle_status {
    my ( $self, $handle, $SID ) = @_;
    my $clients      = $self->clients;
    my $client_count = scalar keys %{$clients};

    my @details = ();
    foreach my $stream (@_STREAM_NAMES) {
        # add streams from all SIDs
        my $stream_count = 0;
        my $assist_count = 0;
        foreach my $SID ( keys %{$clients} ) {
            $clients->{$SID}{$stream}
                and $stream_count++;

            my $assist; $assist = $_STREAM_ASSISTERS{$stream}
                and $assist_count += scalar keys %{ $self->{$assist} || {} };
        }

        $stream_count == 0
            and next;

lib/AnyEvent/eris/Server.pm  view on Meta::CPAN

            my @details = ();
            foreach my $asst ( values %_STREAM_ASSISTERS ) {
                $self->{$asst} or next;
                my @SIDs = grep $clients->{$_}{$asst}, keys %{$clients};
                push @details,
                     "$asst -> " . join ',', keys %{ $self->{$asst} };
            }

            return @details;
        },

        stats => sub {
            my @details = map +(
                "$_ -> $self->{'stats'}{$_}"
            ), keys %{ $self->{'stats'} };

            return @details;
        },

        streams => sub {
            my @details = ();

            foreach my $stream (@_STREAM_NAMES) {
                my @SIDs;
                foreach my $SID ( keys %{$clients} ) {
                    $clients->{$SID}{$stream}
                        or next;

                    my $stream_data = $clients->{$SID}{$stream};
                    push @SIDs, ref $stream_data eq 'HASH'
                              ? "$SID:" . join ',', keys %{$stream_data}
                              : $SID;
                }

                push @details, "$stream -> " . join '; ', @SIDs;
            }

            return @details;
        },
    );

    if ( my $cb = $dispatch{$type} ) {
        my @msgs = $cb->();
        my $msgs = join( "\n", @msgs ) . "\n";
        $handle->push_write($msgs);
    } else {
        $handle->push_write("DUMP[-1]: No comprende.\n");
    }
}

# Handle a client's request to leave: write the farewell text, drop the
# client through hangup_client(), then signal the condition variable in
# $self->{'_cv'} when one exists.
#
# NOTE(review): signalling '_cv' ends run() for the whole server, not just
# this client - confirm that this is intended before relying on it.
sub handle_quit {
    my ( $self, $handle, $SID ) = @_;
    $handle->push_write('Terminating connection on your request.');
    $self->hangup_client($SID);

    # '_cv' is only assigned by run(); without it ->send would die on undef
    $self->{'_cv'}
        and $self->{'_cv'}->send;

    return;
}

# Forget client $SID: remove its registry entry, discard messages still
# queued for it, and release the server-wide counters (the tables named in
# %_STREAM_ASSISTERS) held by its subscriptions and matches.
sub hangup_client {
    my ( $self, $SID ) = @_;

    my $client = delete $self->clients->{$SID};

    # flush_client() walks the buffers and looks up each client's handle,
    # so a buffer must not outlive its client
    delete $self->{'buffers'}{$SID};

    if ($client) {
        foreach my $stream ( keys %_STREAM_ASSISTERS ) {
            my $entries = $client->{$stream};
            ref $entries eq 'HASH'
                or next;

            my $assist = $_STREAM_ASSISTERS{$stream};
            foreach my $key ( keys %{$entries} ) {
                exists $self->{$assist}{$key}
                    or next;

                --$self->{$assist}{$key} <= 0
                    and delete $self->{$assist}{$key};
            }
        }
    }

    AE::log debug => "Client Termination Posted: $SID";
}

# Remove one stream kind from client $SID.
#
#   $SID    - session id of the client
#   $stream - stream name, e.g. 'full', 'debug', 'subscription'
#
# The stream is deleted from the client entry, where the handle_* methods
# store it. For streams listed in %_STREAM_ASSISTERS the matching
# server-wide counters (e.g. $self->{'programs'}) are lowered for every key
# the client held and dropped once they reach zero. Unknown clients are
# left alone rather than being created as a side effect.
#
# This used to operate on {'streams'} and {'assists'} sub-hashes that no
# other code writes, so it never removed anything.
sub remove_stream {
    my ( $self, $SID, $stream ) = @_;
    AE::log debug => "Removing '$stream' for $SID";

    my $client = $self->clients->{$SID}
        or return;

    my $client_streams = delete $client->{$stream};

    # flag streams ('full', 'debug') hold a plain true value, not a table
    ref $client_streams eq 'HASH'
        or return;

    my $assist = $_STREAM_ASSISTERS{$stream}
        or return;

    foreach my $key ( keys %{$client_streams} ) {
        exists $self->{$assist}{$key}
            or next;

        --$self->{$assist}{$key} <= 0
            and delete $self->{$assist}{$key};
    }

    return;
}

# Call remove_stream() for client $SID once per entry of @_STREAM_NAMES,
# in list order.
sub remove_all_streams {
    my ( $self, $SID ) = @_;

    my @pending = @_STREAM_NAMES;
    while ( defined( my $name = shift @pending ) ) {
        $self->remove_stream( $SID, $name );
    }
}

# Build a server object and start listening.
#
# Takes a flat list of key/value options that override the defaults below
# (ListenAddress, ListenPort, GraphitePort, GraphitePrefix, hostname);
# 'clients' and 'buffers' always start out empty. Graphite settings may be
# given as top-level keys or inside a 'config' hash reference.
#
# Side effects: creates the TCP listener, a 0.1 second timer that calls
# flush_client(), a 60 second timer that calls stats(), and connects to
# Graphite when a GraphiteHost is configured.
#
# Returns the new object.
sub new {
    my $class = shift;

    # split takes a pattern: an unescaped '.' matches every character and
    # leaves no fields at all, which made the short hostname undefined
    my $hostname = ( split /\./, hostname() )[0];

    my $self = bless {
        ListenAddress  => '127.0.0.1', # "localhost" doesn't work :/
        ListenPort     => 9514,
        GraphitePort   => 2003,
        GraphitePrefix => 'eris.dispatcher',
        hostname       => $hostname,

        @_,

        clients        => {},
        buffers        => {},
    }, $class;

    # graphite_connect() and stats() read these settings from 'config';
    # mirror the top-level values (including the defaults above) there
    # unless the caller supplied a 'config' entry of their own
    foreach my $key (qw<GraphiteHost GraphitePort GraphitePrefix>) {
        defined $self->{$key}
            or next;

        exists $self->{'config'}{$key}
            and next;

        $self->{'config'}{$key} = $self->{$key};
    }

    my ( $host, $port ) = @{$self}{qw<ListenAddress ListenPort>};

    # the callbacks below are stored inside $self; a weak copy avoids a
    # reference cycle
    Scalar::Util::weaken( my $inner_self = $self );

    $self->{'_tcp_server_guard'} ||= tcp_server $host, $port, sub {
        my ($fh) = @_
           or return $inner_self->_server_error($!);

        my $handle; $handle = AnyEvent::Handle->new(
            fh       => $fh,
            on_error => sub {
                my ( $hdl, $fatal, $msg ) = @_;
                my $SID = $inner_self->_session_id($hdl);
                $inner_self->hangup_client($SID);
                $inner_self->_server_error( $msg, $fatal );
                $hdl->destroy;
            },

            on_eof => sub {
                my ($hdl) = @_;
                my $SID = $inner_self->_session_id($hdl);
                $inner_self->hangup_client($SID);
                $hdl->destroy;
                AE::log debug => "SERVER, client $SID disconnected.";
            },

            on_read => sub {
                my ($hdl) = @_;

                # the whole read buffer is treated as one command line
                my $line = delete $hdl->{'rbuf'};

                # strip the line ending, including the CR of "\r\n" clients,
                # so it does not end up inside command arguments
                $line =~ s/\015?\012\z//;

                my $SID = $inner_self->_session_id($hdl);

                foreach my $command ( keys %client_commands ) {
                    my $regex = $client_commands{$command};
                    if ( my ($args) = ( $line =~ /$regex/i ) ) {
                        my $method = "handle_$command";
                        return $inner_self->$method( $hdl, $SID, $args );
                    }
                }

                $hdl->push_write("UNKNOWN COMMAND, Ignored.\015\012");
            },
        );

        my $SID = $inner_self->_session_id($handle);
        $handle->push_write("EHLO Streamer (KERNEL: $$:$SID)\n");
        $inner_self->register_client( $SID, $handle );
    };

    $self->{'_timers'}{'flush'} = AE::timer 0.1, 0.1, sub {
        $inner_self->flush_client;
    };

    $self->{'_timers'}{'stats'} = AE::timer 0, 60, sub {
        $inner_self->stats;
    };

    # Statistics Tracking
    $self->{'config'}{'GraphiteHost'}
        and $self->graphite_connect;

    return $self;
}

# Write every queued message to its client and empty the queue.
#
# Messages of one client are joined with newlines and sent in a single
# push_write() call, terminated by a newline. A buffer whose client is no
# longer registered (or has no handle) is discarded instead of being
# written; looking up its handle used to create a stub client entry and die
# on an undefined handle.
sub flush_client {
    my $self    = shift;
    my $clients = $self->{'clients'} || {};
    my $buffers = $self->{'buffers'} || {};

    foreach my $SID ( keys %{$buffers} ) {
        my $handle = exists $clients->{$SID}
                   ? $clients->{$SID}{'handle'}
                   : undef;

        if ( !$handle ) {
            delete $buffers->{$SID};
            next;
        }

        my $msgs = $buffers->{$SID};
        $msgs && @{$msgs} > 0
            or next;

        # write the messages to the SID
        my $msgs_str = join "\n", @{$msgs};

        $handle->push_write("$msgs_str\n");
        $buffers->{$SID} = [];
    }

    return;
}

# (Re)create the AnyEvent::Graphite client and keep it in
# $self->{'_graphite'}.
#
# Host and port are read from $self->{'config'} and fall back to the
# top-level GraphiteHost / GraphitePort entries of the object, which is
# where the constructor keeps its defaults. A failure while building the
# client is logged and otherwise ignored.
sub graphite_connect {
    my $self   = shift;
    my $config = $self->{'config'} || {};

    my %setting;
    foreach my $name (qw<Host Port>) {
        my $key = "Graphite$name";
        $setting{$name} = defined $config->{$key}
                        ? $config->{$key}
                        : $self->{$key};
    }

    eval {
        $self->{'_graphite'} = AnyEvent::Graphite->new(
            host => $setting{'Host'},
            port => $setting{'Port'},
        );

        1;
    } or do {
        my $error = $@ || 'Zombie error';
        AE::log debug => "Graphite server setup failed: $error";
    };
}

# Periodic statistics handler.
#
# When no counters exist yet they are created with a value of zero and
# nothing else happens. Otherwise the current counters are taken out of the
# object, sent to Graphite when a client is available (one metric per
# counter, named "<prefix>.<hostname>.<counter>"), and logged. A failing
# send triggers a reconnect and abandons the remaining counters of this
# round.
sub stats {
    my $self = shift;

    if ( ! exists $self->{'stats'} ) {
        # the list used to read "dispatched _bytes", which created a stray
        # '_bytes' counter instead of 'dispatched_bytes'
        $self->{'stats'} = {
            map +( $_ => 0 ), qw<
                received received_bytes dispatched dispatched_bytes
            >
        };

        return;
    }

    my $stats = delete $self->{'stats'};

    if ( $self->{'_graphite'} ) {
        my $time = AE::now;

        # the prefix may live in 'config' or, as the constructor default
        # does, at the top level of the object
        my $prefix = defined $self->{'config'}{'GraphitePrefix'}
                   ? $self->{'config'}{'GraphitePrefix'}
                   : $self->{'GraphitePrefix'};

        foreach my $stat ( keys %{$stats}) {
            # undefined parts are left out rather than joined as ''
            my $metric = join '.', grep { defined }
                                   $prefix,
                                   $self->{'hostname'},
                                   $stat;
            eval {
                $self->{'_graphite'}->send($metric, $stats->{$stat}, $time);
                1;
            } or do {
                AE::log debug => 'Error sending statistics, reconnecting.';
                $self->graphite_connect;
                last;
            }
        }
    }

    AE::log debug => 'STATS: ' .
                     join ', ', map "$_:$stats->{$_}", keys %{$stats};
}

# Store the given condition variable (or a fresh AE::cv when none, or a
# false value, is passed) in $self->{'_cv'} and wait on it.
sub run {
    my ( $self, $cv ) = @_;

    $cv ||= AE::cv;
    $self->{'_cv'} = $cv;

    return $self->{'_cv'}->recv;
}

# Return the client registry hash reference, creating an empty one on
# first use.
sub clients {
    my ($self) = @_;

    $self->{'clients'} = {}
        unless $self->{'clients'};

    return $self->{'clients'};
}

# Create (or replace) the registry entry of client $SID; the new entry
# holds nothing but the client's handle.
sub register_client {
    my ( $self, $SID, $handle ) = @_;

    my %entry = ( handle => $handle );

    return $self->clients->{$SID} = \%entry;
}

# Dispatch a single message: wraps it in a one-element array reference and
# hands it to _dispatch_messages().
sub dispatch_message {
    my ( $self, $msg ) = @_;

    my @batch = ($msg);

    return $self->_dispatch_messages( \@batch );
}

# Dispatch several messages given as one newline-separated string: the
# string is split on "\n" and the pieces are handed to
# _dispatch_messages() as an array reference.
sub dispatch_messages {
    my ( $self, $msgs ) = @_;

    my @batch = split /\n/, $msgs;

    return $self->_dispatch_messages( \@batch );
}

sub _dispatch_messages {
    my ( $self, $msgs ) = @_;

    my $clients    = $self->{'clients'};
    my $buffers    = $self->{'buffers'};
    my $dispatched = 0;
    my $bytes      = 0;

    # Handle fullfeeds
    foreach my $SID ( keys %{$clients} ) {
        push @{ $buffers->{$SID} }, @{$msgs};
        $dispatched += scalar @{$msgs};
        $bytes      += length $_ for @{$msgs};
    }

    foreach my $msg ( @{$msgs} ) {
        # Grab statitics;
        $self->{'stats'}{'received'}++;
        $self->{'stats'}{'received_bytes'} += length $msg;

        # Program based subscriptions
        if ( my ($program) = map lc, ( $msg =~ $_PRE{'program'} ) ) {
            # remove the sub process and PID from the program
            $program =~ s/\(.*//g;
            $program =~ s/\[.*//g;

            if ( exists $self->{'programs'}{$program} && $self->{'programs'}{$program} > 0 ) {
                foreach my $SID ( keys %{$clients} ) {
                    exists $clients->{$SID}{'subscription'}{$program}
                        or next;



( run in 1.173 second using v1.01-cache-2.11-cpan-e93a5daba3e )