AnyEvent-Eris
view release on metacpan or search on metacpan
lib/AnyEvent/eris/Server.pm view on Meta::CPAN
my $self = bless {
ListenAddress => '127.0.0.1', # "localhost" doesn't work :/
ListenPort => 9514,
GraphitePort => 2003,
GraphitePrefix => 'eris.dispatcher',
hostname => $hostname,
@_,
clients => {},
buffers => {},
}, $class;
my ( $host, $port ) = @{$self}{qw<ListenAddress ListenPort>};
Scalar::Util::weaken( my $inner_self = $self );
$self->{'_tcp_server_guard'} ||= tcp_server $host, $port, sub {
my ($fh) = @_
or return $inner_self->_server_error($!);
my $handle; $handle = AnyEvent::Handle->new(
fh => $fh,
on_error => sub {
my ( $hdl, $fatal, $msg ) = @_;
my $SID = $inner_self->_session_id($hdl);
$inner_self->hangup_client($SID);
$inner_self->_server_error( $msg, $fatal );
$hdl->destroy;
},
on_eof => sub {
my ($hdl) = @_;
my $SID = $inner_self->_session_id($hdl);
$inner_self->hangup_client($SID);
$hdl->destroy;
AE::log debug => "SERVER, client $SID disconnected.";
},
on_read => sub {
my ($hdl) = @_;
chomp( my $line = delete $hdl->{'rbuf'} );
my $SID = $inner_self->_session_id($hdl);
foreach my $command ( keys %client_commands ) {
my $regex = $client_commands{$command};
if ( my ($args) = ( $line =~ /$regex/i ) ) {
my $method = "handle_$command";
return $inner_self->$method( $hdl, $SID, $args );
}
}
$hdl->push_write("UNKNOWN COMMAND, Ignored.\015\012");
},
);
my $SID = $inner_self->_session_id($handle);
$handle->push_write("EHLO Streamer (KERNEL: $$:$SID)\n");
$inner_self->register_client( $SID, $handle );
};
$self->{'_timers'}{'flush'} = AE::timer 0.1, 0.1, sub {
$inner_self->flush_client;
};
$self->{'_timers'}{'stats'} = AE::timer 0, 60, sub {
$inner_self->stats;
};
# Statistics Tracking
$self->{'config'}{'GraphiteHost'}
and $self->graphite_connect;
return $self;
}
sub flush_client {
my $self = shift;
my $clients = $self->{'clients'};
my $buffers = $self->{'buffers'};
foreach my $SID ( keys %{$buffers} ) {
my $msgs = $buffers->{$SID};
@{$msgs} > 0 or next;
# write the messages to the SID
my $msgs_str = join "\n", @{$msgs};
$clients->{$SID}{'handle'}->push_write("$msgs_str\n");
$buffers->{$SID} = [];
}
}
sub graphite_connect {
my $self = shift;
eval {
$self->{'_graphite'} = AnyEvent::Graphite->new(
host => $self->{'config'}{'GraphiteHost'},
port => $self->{'config'}{'GraphitePort'},
);
1;
} or do {
my $error = $@ || 'Zombie error';
AE::log debug => "Graphite server setup failed: $error";
}
}
sub stats {
my $self = shift;
if ( ! exists $self->{'stats'} ) {
$self->{'stats'} = {
map +( $_ => 0 ), qw<
received received_bytes dispatched dispatched _bytes
>
};
return;
}
my $stats = delete $self->{'stats'};
if ( $self->{'_graphite'} ) {
my $time = AE::now;
( run in 1.345 second using v1.01-cache-2.11-cpan-13bb782fe5a )