AnyEvent-Eris
view release on metacpan or search on metacpan
lib/AnyEvent/eris/Server.pm view on Meta::CPAN
}
# Handle a client's request to quit.
#
# Arguments (positional):
#   $self   - the server object
#   $handle - the client's handle object; must support push_write()
#   $SID    - the session id of the client
#
# Writes a farewell message to the client, drops the client via
# hangup_client(), then signals the object stored in the '_cv' slot.
sub handle_quit {
    my $self   = shift;
    my $handle = shift;
    my $SID    = shift;

    $handle->push_write('Terminating connection on your request.');
    $self->hangup_client($SID);

    # NOTE(review): '_cv' is not set anywhere in the visible code; presumably
    # it is the condition variable the main loop waits on, in which case a
    # single client quitting stops the whole server - confirm this is intended.
    my $cv = $self->{'_cv'};
    $cv->send;
}
# Forget a client.
#
# Arguments (positional):
#   $self - the server object
#   $SID  - the session id whose entry is removed from the clients table
#
# Removes the entry from the hash returned by $self->clients and emits a
# debug-level log line.
sub hangup_client {
    my $self = shift;
    my ($SID) = @_;

    my $registry = $self->clients;
    delete $registry->{$SID};

    AE::log debug => "Client Termination Posted: $SID";
}
# Remove one stream registration from a client.
#
# Arguments (positional):
#   $self   - the server object
#   $SID    - the session id of the client
#   $stream - the name of the stream to remove
#
# Deletes $stream from the client's 'streams' table. If the stream has an
# entry in %_STREAM_ASSISTERS, every key of the removed registration has its
# counter under $self->{'assists'}{<assist>} decremented, and counters that
# reach zero (or below) are deleted.
#
# Returns nothing.
sub remove_stream {
    my ( $self, $SID, $stream ) = @_;
    AE::log debug => "Removing '$stream' for $SID";

    # Do not touch unknown clients: a nested delete on a missing $SID would
    # autovivify clients->{$SID}{streams} and so re-create a client that has
    # already been hung up.
    my $clients = $self->clients;
    return unless exists $clients->{$SID};

    my $client_streams = delete $clients->{$SID}{'streams'}{$stream};

    # FIXME:
    # I *think* what this is supposed to do is delete assists
    # that were registered for this client, which it doesn't
    # - it deletes global assists instead - this needs to be
    # looked into
    if ($client_streams) {
        if ( my $assist = $_STREAM_ASSISTERS{$stream} ) {
            foreach my $key ( keys %{$client_streams} ) {
                --$self->{'assists'}{$assist}{$key} <= 0
                    and delete $self->{'assists'}{$assist}{$key};
            }
        }
    }

    return;
}
# Remove every stream registration from a client.
#
# Arguments (positional):
#   $self - the server object
#   $SID  - the session id of the client
#
# Calls remove_stream() once for each name listed in @_STREAM_NAMES.
sub remove_all_streams {
    my $self = shift;
    my $SID  = shift;

    for my $name (@_STREAM_NAMES) {
        $self->remove_stream( $SID, $name );
    }
}
# Constructor: build the server object, start the TCP listener and arm the
# periodic timers.
#
# Arguments: the class name followed by a flat key/value list of options.
#   Options override the defaults below (ListenAddress, ListenPort,
#   GraphitePort, GraphitePrefix, hostname). The 'clients' and 'buffers'
#   slots are listed after the caller's options, so they always start out
#   as empty hashes.
#
# Returns: the blessed server object.
sub new {
    my $class = shift;

    # Short host name: everything before the first dot. split() treats its
    # first argument as a pattern, so the dot has to be escaped - an
    # unescaped '.' matches every character, produces only empty fields and
    # leaves the result undefined.
    my $hostname = ( split /\./, hostname() )[0];

    my $self = bless {
        ListenAddress  => '127.0.0.1',    # "localhost" doesn't work :/
        ListenPort     => 9514,
        GraphitePort   => 2003,
        GraphitePrefix => 'eris.dispatcher',
        hostname       => $hostname,
        @_,
        clients        => {},
        buffers        => {},
    }, $class;

    my ( $host, $port ) = @{$self}{qw<ListenAddress ListenPort>};

    # The callbacks below are stored (directly or indirectly) inside $self;
    # they close over a weakened copy so they do not keep the object alive.
    Scalar::Util::weaken( my $inner_self = $self );

    $self->{'_tcp_server_guard'} ||= tcp_server $host, $port, sub {
        # Called with no arguments when accepting failed; report $! then.
        my ($fh) = @_
            or return $inner_self->_server_error($!);

        my $handle; $handle = AnyEvent::Handle->new(
            fh       => $fh,

            # I/O error: drop the client, report the error, free the handle.
            on_error => sub {
                my ( $hdl, $fatal, $msg ) = @_;
                my $SID = $inner_self->_session_id($hdl);
                $inner_self->hangup_client($SID);
                $inner_self->_server_error( $msg, $fatal );
                $hdl->destroy;
            },

            # Peer closed the connection: drop the client, free the handle.
            on_eof => sub {
                my ($hdl) = @_;
                my $SID = $inner_self->_session_id($hdl);
                $inner_self->hangup_client($SID);
                $hdl->destroy;
                AE::log debug => "SERVER, client $SID disconnected.";
            },

            # Incoming data: the whole read buffer is taken as one command
            # line and matched against each pattern in %client_commands; the
            # first match is dispatched to the handle_<command> method.
            on_read => sub {
                my ($hdl) = @_;
                chomp( my $line = delete $hdl->{'rbuf'} );
                my $SID = $inner_self->_session_id($hdl);

                foreach my $command ( keys %client_commands ) {
                    my $regex = $client_commands{$command};
                    if ( my ($args) = ( $line =~ /$regex/i ) ) {
                        my $method = "handle_$command";
                        return $inner_self->$method( $hdl, $SID, $args );
                    }
                }

                $hdl->push_write("UNKNOWN COMMAND, Ignored.\015\012");
            },
        );

        # Greet the new client and record it under its session id.
        my $SID = $inner_self->_session_id($handle);
        $handle->push_write("EHLO Streamer (KERNEL: $$:$SID)\n");
        $inner_self->register_client( $SID, $handle );
    };

    # Call flush_client every 0.1 seconds.
    $self->{'_timers'}{'flush'} = AE::timer 0.1, 0.1, sub {
        $inner_self->flush_client;
    };

    # Call stats immediately and then every 60 seconds.
    $self->{'_timers'}{'stats'} = AE::timer 0, 60, sub {
        $inner_self->stats;
    };

    # Statistics Tracking
    # NOTE(review): the Graphite defaults above live at the top level of
    # $self, while this check reads $self->{'config'}{'GraphiteHost'} -
    # confirm which location callers are expected to populate.
    $self->{'config'}{'GraphiteHost'}
        and $self->graphite_connect;

    return $self;
}
( run in 0.417 second using v1.01-cache-2.11-cpan-39bf76dae61 )