ControlFreak
view release on metacpan or search on metacpan
lib/ControlFreak/Proxy/Process.pm view on Meta::CPAN
package ControlFreak::Proxy::Process;
use strict;
use warnings;
use JSON::XS;
use Try::Tiny;
use POSIX 'SIGTERM';
use IO::Select;
# Ignore SIGPIPE process-wide (this runs as soon as the module is loaded), so
# that writing to a pipe whose reading end has been closed makes the write
# fail with an error instead of terminating the process.
$SIG{PIPE} = 'IGNORE';
=head1 NAME
ControlFreak::Proxy::Process - The Perl implementation of a proxy process.
=head1 DESCRIPTION
This class is used by L<cfk-share-mem-proxy.pl> to implement the controlling
process of proxied services.
=cut
# Constructor.
#
# Takes a flat list of key/value pairs, stores a shallow copy of them as the
# fields of a new object, calls init() on that object and returns it.
# Can be invoked on a class name or on an existing instance; in the latter
# case the new object is blessed into the instance's class.
sub new {
    my ($class, %param) = @_;

    my $package = ref($class) || $class;
    my $proxy   = bless {%param}, $package;

    $proxy->init;
    return $proxy;
}
# Queue a message generated by the proxy itself on the log channel.
#
#   $type - tag placed in the first field of the record (callers in this
#           file pass 'err')
#   $msg  - free-form message text
#
# The record handed to write_log() is "$type:-:$msg"; the middle field, which
# xfer_log() fills with a service name, is a literal "-" here.
# Returns nothing. Does nothing when no log handle is configured.
sub log {
    my ($proxy, $type, $msg) = @_;

    # Guard on log_fh, the handle init() and write_log() actually use. The
    # previous check looked at a "log_hdl" field instead, so messages were
    # silently discarded whenever only log_fh was configured.
    return unless $proxy->{log_fh};

    $proxy->write_log("$type:-:$msg");
    return;
}
# Append one already-formatted record to the pending log buffer and register
# the log handle with the write selector.
#
#   $record - the line to buffer (stored as given)
#
# Returns nothing. When the object has no log_fh the record is discarded.
sub write_log {
    my ($proxy, $record) = @_;

    my $log_fh = $proxy->{log_fh} or return;

    ## check buffer size XXX
    push @{ $proxy->{log_buffer} }, $record;
    $proxy->{write_select}->add($log_fh);

    return;
}
# Prepare the bookkeeping for the object's three handles (command_fh,
# status_fh and the optional log_fh):
#
#   * log_buffer / status_buffer - fresh, empty arrays for pending output
#   * readers - maps the command handle to a callback that forwards its
#               arguments to command_cb()
#   * writers - maps each output handle to its buffer
#   * read_select / write_select - IO::Select sets holding the command handle
#               and the output handle(s) respectively
#
# Everything concerning log_fh is skipped when that field is false.
sub init {
    my ($proxy) = @_;

    my ($command_fh, $status_fh, $log_fh)
        = @{$proxy}{qw(command_fh status_fh log_fh)};

    #set_nonblocking($proxy->{$_}) for (qw/command_fh status_fh log_fh/);

    ## where we buffer the writes until our wout fh are ready
    $proxy->{log_buffer}    = [];
    $proxy->{status_buffer} = [];

    ## callbacks
    $proxy->{readers}{$command_fh} = sub { $proxy->command_cb(@_) };
    $proxy->{writers}{$log_fh}     = $proxy->{log_buffer} if $log_fh;
    $proxy->{writers}{$status_fh}  = $proxy->{status_buffer};

    my $readable = $proxy->{read_select}  = IO::Select->new;
    my $writable = $proxy->{write_select} = IO::Select->new;

    $readable->add($command_fh);
    $writable->add($log_fh) if $log_fh;
    $writable->add($status_fh);
}
# Reader callback for the command handle.
#
#   $command - reference to a string holding zero or more newline-separated
#              commands
#
# Each line is passed to process_command() in order. An undefined reference
# (or a reference to an undefined value) is ignored, matching the guard in
# the callback built by xfer_log(). The caller's buffer is not modified.
# Returns nothing.
sub command_cb {
    my ($proxy, $command) = @_;

    # Without this guard, dereferencing an undefined $command dies under
    # "use strict".
    return unless defined($command) && defined($$command);

    # No chomp needed: split drops trailing empty fields, so a final newline
    # does not produce an extra empty command. (The former "chomp $command"
    # acted on the reference itself and had no effect.)
    for my $line (split /\n/, $$command) {
        $proxy->process_command($line);
    }

    return;
}
# Decode one JSON-encoded command and dispatch it.
#
#   $command - JSON text; expected to decode to a hash whose "command" key
#              is 'start' or 'stop'
#
# 'start' is handed to start_service() and 'stop' to stop_service(), each
# with the decoded hash as argument. Anything else is reported through
# log('err', ...): text that fails to decode as a parse error, and decoded
# data that is not a hash or carries no known command as "couldn't
# understand". Exactly one message is logged per rejected command.
# Returns nothing.
sub process_command {
    my ($proxy, $command) = @_;

    # A block eval is used on purpose: a "return" inside a Try::Tiny catch
    # block only leaves that block, which used to let parse failures fall
    # through and be reported a second time below.
    my $param;
    my $decoded = eval { $param = decode_json($command); 1 };
    if (!$decoded) {
        my $error = $@ || 'unknown error';
        $proxy->log('err', "parse error in command $command: $error");
        return;
    }

    # Valid JSON is not necessarily an object; only look up the command name
    # in a hash so that e.g. an array does not raise a fatal error.
    my $c = ref($param) eq 'HASH' ? $param->{command} : undef;

    if ($c && $c eq 'start') {
        $proxy->start_service($param);
    }
    elsif ($c && $c eq 'stop') {
        $proxy->stop_service($param);
    }
    else {
        $proxy->log('err', "couldn't understand command $command");
    }

    return;
}
# Build a callback that relays output read from a service to the log.
#
#   $type - tag placed in the first field of every record
#   $svc  - hash whose "name" entry (looked up each time the callback runs,
#           empty string when false) goes in the second field
#
# The returned code ref takes a reference to a string. It strips the trailing
# newline from that string in place, then passes each line to write_log() as
# "$type:$name:$line". An undefined argument is ignored. The callback
# returns nothing.
sub xfer_log {
    my ($proxy, $type, $svc) = @_;

    return sub {
        my ($chunk) = @_;
        defined $chunk or return;

        chomp ${$chunk} if ${$chunk};
        my $svc_name = $svc->{name} || "";

        for my $line (split /\n/, ${$chunk}) {
            $proxy->write_log("$type:$svc_name:$line");
        }
        return;
    };
}
sub start_service {
( run in 1.237 second using v1.01-cache-2.11-cpan-140bd7fdf52 )