AnyEvent-Eris

 view release on metacpan or  search on metacpan

lib/AnyEvent/eris/Server.pm  view on Meta::CPAN

}

# Handler for a client's "quit" request.
#
# Arguments:
#   $handle - the client's handle object; must support push_write
#   $SID    - session id of the client that asked to quit
#
# Writes a farewell message to the client, drops the client record via
# hangup_client() and, if a condition variable is stored in
# $self->{'_cv'}, signals it.
#
# NOTE(review): signalling a condvar stored on the server object when a
# single client quits looks like it affects whatever is waiting on that
# condvar, not just this client -- confirm that this is intended.
sub handle_quit {
    my ( $self, $handle, $SID ) = @_;
    $handle->push_write('Terminating connection on your request.');
    $self->hangup_client($SID);

    # new() does not create '_cv' itself, so it may be missing; calling
    # ->send on undef would die inside the read callback.
    my $cv = $self->{'_cv'};
    $cv->send if $cv;
}

# Forget a client: removes the entry stored under $SID from the clients
# hash and logs the termination at debug level.
sub hangup_client {
    my ( $self, $SID ) = @_;

    my $registry = $self->clients;
    delete $registry->{$SID};

    AE::log debug => "Client Termination Posted: $SID";
}

# Detach one stream from a client.
#
# Arguments:
#   $SID    - session id of the client
#   $stream - name of the stream to remove
#
# Deletes the client's entry for $stream and, when that stream has an
# entry in %_STREAM_ASSISTERS, decrements the matching counters under
# $self->{'assists'}, deleting each counter that reaches zero or below.
sub remove_stream {
    my ( $self, $SID, $stream ) = @_;
    AE::log debug => "Removing '$stream' for $SID";

    my $dropped = delete $self->clients->{$SID}{'streams'}{$stream};

    # FIXME:
    # I *think* what this is supposed to do is delete assists
    # that were registered for this client, which it doesn't
    # - it deletes global assists instead - this needs to be
    # looked into
    if ($dropped) {
        if ( my $assist_type = $_STREAM_ASSISTERS{$stream} ) {
            for my $key ( keys %{$dropped} ) {
                my $remaining = --$self->{'assists'}{$assist_type}{$key};
                delete $self->{'assists'}{$assist_type}{$key}
                    if $remaining <= 0;
            }
        }
    }
}

# Detach every known stream from a client by calling remove_stream()
# once per name in @_STREAM_NAMES.
sub remove_all_streams {
    my $self = shift;
    my ($SID) = @_;

    for my $name (@_STREAM_NAMES) {
        $self->remove_stream( $SID, $name );
    }
}

# Constructor: builds the server object, starts the TCP listener and
# arms the periodic timers.
#
# Arguments are key/value pairs that override the defaults listed in the
# hash below (ListenAddress, ListenPort, GraphitePort, GraphitePrefix,
# hostname). 'clients' and 'buffers' always start out as empty hashes
# because they are listed after @_.
#
# Returns the blessed object.
sub new {
    my $class = shift;

    # split() treats its first argument as a pattern, so the dot has to
    # be escaped: a bare '.' matches every character, which leaves no
    # fields at all and made the short hostname always undefined.
    my $hostname = ( split /\./, hostname() )[0];

    my $self     = bless {
        ListenAddress  => '127.0.0.1', # "localhost" doesn't work :/
        ListenPort     => 9514,
        GraphitePort   => 2003,
        GraphitePrefix => 'eris.dispatcher',
        hostname       => $hostname,

        # caller-supplied overrides
        @_,

        # per-instance state, deliberately placed after @_ so it cannot
        # be overridden
        clients        => {},
        buffers        => {},
    }, $class;

    my ( $host, $port ) = @{$self}{qw<ListenAddress ListenPort>};

    # The callbacks below only hold a weakened reference to the object.
    Scalar::Util::weaken( my $inner_self = $self );

    $self->{'_tcp_server_guard'} ||= tcp_server $host, $port, sub {
        my ($fh) = @_
           or return $inner_self->_server_error($!);

        my $handle; $handle = AnyEvent::Handle->new(
            fh       => $fh,

            # On error: forget the client, report the error, drop the handle.
            on_error => sub {
                my ( $hdl, $fatal, $msg ) = @_;
                my $SID = $inner_self->_session_id($hdl);
                $inner_self->hangup_client($SID);
                $inner_self->_server_error( $msg, $fatal );
                $hdl->destroy;
            },

            # Peer closed the connection: forget the client, drop the handle.
            on_eof => sub {
                my ($hdl) = @_;
                my $SID = $inner_self->_session_id($hdl);
                $inner_self->hangup_client($SID);
                $hdl->destroy;
                AE::log debug => "SERVER, client $SID disconnected.";
            },

            # Incoming data: the whole read buffer is taken as one command
            # line and matched against %client_commands.
            # NOTE(review): several commands arriving in one read would be
            # treated as a single line -- confirm this is acceptable.
            on_read => sub {
                my ($hdl) = @_;
                chomp( my $line = delete $hdl->{'rbuf'} );
                my $SID = $inner_self->_session_id($hdl);

                # The first matching command wins; hash order decides which
                # pattern is tried first.
                foreach my $command ( keys %client_commands ) {
                    my $regex = $client_commands{$command};
                    if ( my ($args) = ( $line =~ /$regex/i ) ) {
                        my $method = "handle_$command";
                        return $inner_self->$method( $hdl, $SID, $args );
                    }
                }

                $hdl->push_write("UNKNOWN COMMAND, Ignored.\015\012");
            },
        );

        # Greet the new client and record it.
        my $SID = $inner_self->_session_id($handle);
        $handle->push_write("EHLO Streamer (KERNEL: $$:$SID)\n");
        $inner_self->register_client( $SID, $handle );
    };

    # Call flush_client every 0.1 seconds.
    $self->{'_timers'}{'flush'} = AE::timer 0.1, 0.1, sub {
        $inner_self->flush_client;
    };

    # Call stats immediately and then every 60 seconds.
    $self->{'_timers'}{'stats'} = AE::timer 0, 60, sub {
        $inner_self->stats;
    };

    # Statistics Tracking
    # NOTE(review): this reads GraphiteHost from $self->{'config'}, while
    # GraphitePort/GraphitePrefix live at the top level of the object --
    # confirm which location is intended.
    $self->{'config'}{'GraphiteHost'}
        and $self->graphite_connect;

    return $self;
}



( run in 0.417 second using v1.01-cache-2.11-cpan-39bf76dae61 )