AnyEvent-Eris

 view release on metacpan or  search on metacpan

lib/AnyEvent/eris/Server.pm  view on Meta::CPAN

    my $self     = bless {
        ListenAddress  => '127.0.0.1', # "localhost" doesn't work :/
        ListenPort     => 9514,
        GraphitePort   => 2003,
        GraphitePrefix => 'eris.dispatcher',
        hostname       => $hostname,

        @_,

        clients        => {},
        buffers        => {},
    }, $class;

    my ( $host, $port ) = @{$self}{qw<ListenAddress ListenPort>};
    Scalar::Util::weaken( my $inner_self = $self );

    $self->{'_tcp_server_guard'} ||= tcp_server $host, $port, sub {
        my ($fh) = @_
           or return $inner_self->_server_error($!);

        my $handle; $handle = AnyEvent::Handle->new(
            fh       => $fh,
            on_error => sub {
                my ( $hdl, $fatal, $msg ) = @_;
                my $SID = $inner_self->_session_id($hdl);
                $inner_self->hangup_client($SID);
                $inner_self->_server_error( $msg, $fatal );
                $hdl->destroy;
            },

            on_eof => sub {
                my ($hdl) = @_;
                my $SID = $inner_self->_session_id($hdl);
                $inner_self->hangup_client($SID);
                $hdl->destroy;
                AE::log debug => "SERVER, client $SID disconnected.";
            },

            on_read => sub {
                my ($hdl) = @_;
                chomp( my $line = delete $hdl->{'rbuf'} );
                my $SID = $inner_self->_session_id($hdl);

                foreach my $command ( keys %client_commands ) {
                    my $regex = $client_commands{$command};
                    if ( my ($args) = ( $line =~ /$regex/i ) ) {
                        my $method = "handle_$command";
                        return $inner_self->$method( $hdl, $SID, $args );
                    }
                }

                $hdl->push_write("UNKNOWN COMMAND, Ignored.\015\012");
            },
        );

        my $SID = $inner_self->_session_id($handle);
        $handle->push_write("EHLO Streamer (KERNEL: $$:$SID)\n");
        $inner_self->register_client( $SID, $handle );
    };

    $self->{'_timers'}{'flush'} = AE::timer 0.1, 0.1, sub {
        $inner_self->flush_client;
    };

    $self->{'_timers'}{'stats'} = AE::timer 0, 60, sub {
        $inner_self->stats;
    };

    # Statistics Tracking
    $self->{'config'}{'GraphiteHost'}
        and $self->graphite_connect;

    return $self;
}

# Deliver every client's pending messages, one write per client.
#
# Walks $self->{'buffers'} (session id => array ref of queued messages).
# Each non-empty queue is joined with "\n", terminated with a final "\n",
# pushed to the handle stored in $self->{'clients'}{$SID}{'handle'}, and
# then reset to an empty array.  Empty queues are left untouched.
#
# A non-empty queue whose session has no handle cannot be delivered.  It is
# discarded instead of calling push_write on an undefined value (which would
# die), and the lookup is done without autovivifying a stub entry in
# $self->{'clients'}.
#
# Takes no arguments besides the invocant and returns nothing meaningful.
sub flush_client {
    my $self    = shift;
    my $clients = $self->{'clients'};
    my $buffers = $self->{'buffers'};

    # keys() builds its list up front, so deleting entries below is safe.
    foreach my $SID ( keys %{$buffers} ) {
        my $msgs = $buffers->{$SID};

        # Nothing queued (or no queue at all) for this session.
        next unless $msgs && @{$msgs};

        # exists() first: a plain $clients->{$SID}{'handle'} lookup would
        # create an empty client record for an unknown session id.
        my $client = exists $clients->{$SID} ? $clients->{$SID} : undef;
        my $handle = $client ? $client->{'handle'} : undef;

        if ( !$handle ) {
            # No destination for these messages; drop the orphaned queue so
            # it is not retried on every flush.
            delete $buffers->{$SID};
            next;
        }

        # write the messages to the SID
        $handle->push_write( join( "\n", @{$msgs} ) . "\n" );
        $buffers->{$SID} = [];
    }

    return;
}

# Create the AnyEvent::Graphite client used for statistics reporting and
# store it in $self->{'_graphite'}.
#
# Host and port are taken from $self->{'config'}.  When a value is not
# defined there, the same-named top-level attribute of the object is used
# instead, because the constructor places its GraphitePort default and the
# caller-supplied options directly on the object rather than under 'config'.
#
# Any exception raised while building the client is caught and logged at
# debug level; it is never rethrown.
sub graphite_connect {
    my $self   = shift;
    my $config = $self->{'config'} || {};

    my $host = defined $config->{'GraphiteHost'}
        ? $config->{'GraphiteHost'}
        : $self->{'GraphiteHost'};

    my $port = defined $config->{'GraphitePort'}
        ? $config->{'GraphitePort'}
        : $self->{'GraphitePort'};

    eval {
        $self->{'_graphite'} = AnyEvent::Graphite->new(
            host => $host,
            port => $port,
        );

        # Explicit true value so a successful eval never falls into the
        # error branch below.
        1;
    } or do {
        # $@ can be empty even after a failure, hence the fallback text.
        my $error = $@ || 'Zombie error';
        AE::log debug => "Graphite server setup failed: $error";
    };
}

sub stats {
    my $self = shift;

    if ( ! exists $self->{'stats'} ) {
        $self->{'stats'} = {
            map +( $_ => 0 ), qw<
                received received_bytes dispatched dispatched _bytes
            >
        };

        return;
    }

    my $stats = delete $self->{'stats'};

    if ( $self->{'_graphite'} ) {
        my $time = AE::now;



( run in 1.345 second using v1.01-cache-2.11-cpan-13bb782fe5a )