ControlFreak

 view release on metacpan or  search on metacpan

lib/ControlFreak/Proxy/Process.pm  view on Meta::CPAN

package ControlFreak::Proxy::Process;

use strict;
use warnings;

use JSON::XS;
use Try::Tiny;
use POSIX 'SIGTERM';
use IO::Select;

$SIG{PIPE} = 'IGNORE';

=head1 NAME

ControlFreak::Proxy::Process - The Perl implementation of a proxy process.

=head1 DESCRIPTION

This class is used by L<cfk-share-mem-proxy.pl> to implement the controlling
process of proxied services.

=cut

sub new {
    my $class = shift;
    my %param = @_;
    my $proxy = bless { %param }, ref $class || $class;
    $proxy->init;
    return $proxy;
}

sub log {
    my $proxy = shift;
    my ($type, $msg) = @_;
    my $pipe = $proxy->{log_hdl} or return;
    $proxy->write_log("$type:-:$msg");
}

sub write_log {
    my $proxy = shift;
    my $fh = $proxy->{log_fh};
    return unless $fh;
    ## check buffer size XXX
    push @{ $proxy->{log_buffer} }, shift;
    $proxy->{write_select}->add($fh);
    return;
}

sub init {
    my $proxy = shift;

    #set_nonblocking($proxy->{$_}) for (qw/command_fh status_fh log_fh/);

    ## where we buffer the writes until our wout fh are ready
    $proxy->{log_buffer}    = [];
    $proxy->{status_buffer} = [];

    ## callbacks
    $proxy->{readers}{ $proxy->{command_fh} } = sub { $proxy->command_cb(@_) };
    $proxy->{writers}{ $proxy->{log_fh}     } = $proxy->{log_buffer}
        if $proxy->{log_fh};
    $proxy->{writers}{ $proxy->{status_fh}  } = $proxy->{status_buffer};

    my $fh = $proxy->{command_fh};
    $proxy->{read_select} = IO::Select->new;
    $proxy->{write_select} = IO::Select->new;
    $proxy->{read_select}->add($proxy->{command_fh});
    $proxy->{write_select}->add($proxy->{log_fh})
        if $proxy->{log_fh};
    $proxy->{write_select}->add($proxy->{status_fh});
}

sub command_cb {
    my $proxy = shift;
    my $command = shift;
    chomp $command;
    $proxy->process_command($_) for (split /\n/, $$command);
}

sub process_command {
    my $proxy = shift;
    my $command = shift;

    my $param = try {
        decode_json($command)
    } catch {
        $proxy->log('err', "parse error in command $command: $_");
        return;
    };
    my $c = $param->{command};
    if ($c && $c eq 'start') {
        $proxy->start_service($param);
    }
    elsif ($c && $c eq 'stop') {
        $proxy->stop_service($param);
    }
    else {
        $proxy->log('err', "couldn't understand command $command: $_");
    }
    return;
}

sub xfer_log {
    my $proxy = shift;
    my ($type, $svc) = @_;
    my $watcher_cb = sub {
        my $msg = shift;
        return unless defined $msg;
        chomp $$msg if $$msg;
        my $name = $svc->{name} || "";
        my @msgs = split /\n/, $$msg;
        $proxy->write_log("$type:$name:$_") for @msgs;
        return;
    };
    return $watcher_cb;
}

sub start_service {



( run in 1.237 second using v1.01-cache-2.11-cpan-140bd7fdf52 )