App-EventStreamr
view release on metacpan or search on metacpan
bin/station-mgr.pl view on Meta::CPAN
#!/usr/bin/perl
use strict;
use v5.10;
use FindBin qw($Bin);
use lib "$Bin/../lib";
use Proc::Daemon; # libproc-daemon-perl
use JSON; # libjson-perl
use Config::JSON; # libconfig-json-perl
use HTTP::Tiny; # libhttp-tiny-perl
use Log::Log4perl; # liblog-log4perl-perl
use POSIX;
use File::Path qw(make_path);
use File::Basename;
use experimental 'switch';
use Getopt::Long;
use Data::Dumper;
# PODNAME: station-mgr
# ABSTRACT: station-mgr - Core Station Manager script
our $VERSION = '0.5'; # VERSION
# Command-line switches: debugging defaults to off, daemonising to on.
# Both are negatable (--no-debug / --no-daemon); --help prints usage and exits.
my ($DEBUG, $DAEMON) = (0, 1);
my $getopts_rc = GetOptions(
    'debug!'  => \$DEBUG,
    'daemon!' => \$DAEMON,
    'help|?'  => \&print_usage,
);

# Signal handling. Handlers are installed through POSIX::sigaction; the
# original author notes that POSIX unmasks the sigprocmask properly.
$SIG{CHLD} = 'IGNORE';

my $sigset = POSIX::SigSet->new();

# One SigAction per handler sub, each built with the same (empty) signal
# set and the SA_NODEFER flag. The handlers are named by string.
my ($update, $exit, $pipe, $get, $post) = map {
    POSIX::SigAction->new($_, $sigset, POSIX::SA_NODEFER())
} qw(self_update sig_exit sig_pipe get_config post_config);

# TERM and INT both shut the manager down.
POSIX::sigaction(POSIX::SIGTERM(), $exit);
POSIX::sigaction(POSIX::SIGINT(),  $exit);

# HUP runs the self update and re-execs; PIPE is only logged.
POSIX::sigaction(POSIX::SIGHUP(),  $update);
POSIX::sigaction(POSIX::SIGPIPE(), $pipe);

# USR1 pulls the config from the API, USR2 pushes it.
POSIX::sigaction(POSIX::SIGUSR1(), $get);
POSIX::sigaction(POSIX::SIGUSR2(), $post);

# Daemon helper, rooted one directory above the script.
our $daemon = Proc::Daemon->new(work_dir => "$Bin/../");
our $daemons;

# Detach from the terminal unless --no-daemon was given.
$daemon->Init() if $DAEMON;

# Files created from here on are not accessible to "other", nor writable by group.
umask 0027;
bin/station-mgr.pl view on Meta::CPAN
# ---- Startup: logging, API, controller registration ------------------------
# A run value of 2 is the "restart all processes" trigger used by the main
# loop; if the config still carries it at startup, normalise it to 1.
if ($self->{config}{run} == 2) {$self->{config}{run} = 1;}
# Logging
# Without --debug: INFO level, file appender only.
# With --debug: DEBUG level, file appender plus the screen appender.
unless ( $DEBUG ) {
$self->{loglevel} = 'INFO, LOG1' ;
} else {
$self->{loglevel} = 'DEBUG, LOG1, SCREEN' ;
}
# Make sure the log directory exists before log4perl opens the log file.
unless (-d "$Bin/../logs/") {
make_path("$Bin/../logs/");
}
# Inline log4perl configuration; $self->{loglevel} and $Bin are interpolated.
my $log_conf = qq(
log4perl.rootLogger = $self->{loglevel}
log4perl.appender.SCREEN = Log::Log4perl::Appender::Screen
log4perl.appender.SCREEN.stderr = 0
log4perl.appender.SCREEN.layout = Log::Log4perl::Layout::PatternLayout
log4perl.appender.SCREEN.layout.ConversionPattern = %m %n
log4perl.appender.LOG1 = Log::Log4perl::Appender::File
log4perl.appender.LOG1.utf8 = 1
log4perl.appender.LOG1.filename = $Bin/../logs/station-mgr.log
log4perl.appender.LOG1.mode = append
log4perl.appender.LOG1.layout = Log::Log4perl::Layout::PatternLayout
log4perl.appender.LOG1.layout.ConversionPattern = %d %p %m %n
);
Log::Log4perl::init(\$log_conf);
our $logger = Log::Log4perl->get_logger();
$logger->info("manager starting: pid=$$, station_id=$self->{config}->{macaddress}");
# Flag polled by the main loop; sig_exit() clears it to stop the manager.
$daemons->{main}{run} = 1;
# HTTP
# Shared client for all API/controller/mixer requests, 15 second timeout.
our $http = HTTP::Tiny->new(timeout => 15);
# Start the API
api();
# Register with controller
# The registration itself is a POST with no body to /api/station/<macaddress>.
$logger->info("Registering with controller $localconfig->{controller}/api/station/$self->{config}{macaddress}");
my $response = $http->post("$localconfig->{controller}/api/station/$self->{config}{macaddress}");
# Controller responds with created 201, post our config
# $response is then replaced by the result of that second post, so the
# status checks below act on whichever request was made last.
if ($response->{status} == 201) {
$logger->info("Posting config $localconfig->{controller}");
# Status Post Data
my $json = to_json($self->{config});
my %headers = (
'station-mgr' => 1,
'Content-Type' => 'application/json',
);
my %post_data = (
content => $json,
headers => \%headers,
);
$response = $http->post("$localconfig->{controller}/api/station", \%post_data);
$logger->debug({filter => \&Data::Dumper::Dumper,
value => $response}) if ($logger->is_debug());
}
# 200: the controller answered with a JSON body.
if ($response->{status} == 200 ) {
my $content = from_json($response->{content});
$self->{controller}{running} = 1;
$logger->debug({filter => \&Data::Dumper::Dumper,
value => $content}) if ($logger->is_debug());
# NOTE(review): $content is whatever from_json returned and is compared as a
# string against 'true'; presumably the controller may answer with a bare
# "true" instead of a settings document - confirm. When the test passes the
# local config is replaced by $content->{settings}, even if that key is absent.
# Both branches write the config to disk.
if (defined $content && $content ne 'true') {
$self->{config} = $content->{settings};
write_config();
} else {
write_config();
}
# Run all connected devices - need to get devices to return an array
if ($self->{config}{devices} eq 'all') {
$self->{config}{devices} = $self->{devices}{array};
}
$self->{config}{manager}{pid} = $$;
post_config();
# 204: the controller is reachable but has no registration for this station;
# keep the local config.
} elsif ($response->{status} == 204){
$self->{controller}{running} = 1;
$self->{config}{manager}{pid} = $$;
$logger->warn("Connected but not registered");
$logger->info("Falling back to local config");
$logger->debug({filter => \&Data::Dumper::Dumper,
value => $response}) if ($logger->is_debug());
# Run all connected devices - need to get devices to return an array
if ($self->{config}{devices} eq 'all') {
$self->{config}{devices} = $self->{devices}{array};
}
post_config();
# Any other status is treated as a failed connection: the controller is
# marked as not running and the local config is used.
} else {
chomp $response->{content};
$self->{controller}{running} = 0;
$self->{config}{manager}{pid} = $$;
$logger->warn("Failed to connect: $response->{content}");
$logger->info("Falling back to local config");
$logger->debug({filter => \&Data::Dumper::Dumper,
value => $response}) if ($logger->is_debug());
# Run all connected devices - need to get devices to return an array
if ($self->{config}{devices} eq 'all') {
$self->{config}{devices} = $self->{devices}{array};
}
post_config();
}
# Debug logging of data
$logger->debug({filter => \&Data::Dumper::Dumper,
value => $self}) if ($logger->is_debug());
# Log when started
if ($self->{config}{run}) {
$logger->info("Manager started, starting devices");
} else {
$logger->info("Manager started, configuration set to not start devices.");
}
# Post start clearing of data
$self->{device_control}{record}{recordpath} = 0;
# Main Daemon Loop
while ($daemons->{main}{run}) {
# If we're restarting, we should trigger check for dvswitch
if ($self->{config}{run} == 2) {
$logger->info("Restart Trigged");
$self->{dvswitch}{check} = 1;
}
# Process the internal commands
api();
devmon();
# Process the roles
foreach my $role (@{$self->{config}->{roles}}) {
given ( $role ) {
when ("mixer") { mixer(); }
when ("ingest") { ingest(); }
when ("stream") { stream(); }
when ("record") { record(); }
}
}
# 2 is the restart all processes trigger
# $daemon->Kill_Daemon does a kill -9, so if we get here the procs should be dead.
if ($self->{config}{run} == 2) {
$self->{config}{run} = 1;
}
# Until found check for dvswitch - continuously hitting dvswitch with an unknown client caused high cpu load
unless ( $self->{dvswitch}{running} && ! $self->{dvswitch}{check} ) {
if ( $utils->port($self->{config}->{mixer}{host},$self->{config}->{mixer}{port}) ) {
$logger->info("DVswitch found Running");
$self->{dvswitch}{running} = 1;
$self->{dvswitch}{check} = 0; # We can set this to 1 and it will check dvswitch again.
}
}
# Uncomment to enable heartbeat
## Post a heartbeat to the controller/mixer
#if ((time % 10) == 0) {
# $logger->debug("Heartbeat!") if ($logger->is_debug());
# $self->{heartbeat} = time;
# post_config();
#}
# Update date if it's changed - I wonder if there is a better way to trigger this? Cron (requires more OS config)?
unless ( $self->{date} == strftime "%Y%m%d", localtime) {
$self->{date} = strftime "%Y%m%d", localtime;
bin/station-mgr.pl view on Meta::CPAN
sleep 1;
}
# ---- SUBROUTINES ----------------------------------------------------------
# Shutdown handler, installed for SIGTERM and SIGINT and also called by
# self_update(). Clears the main-loop run flag and asks Proc::Daemon to kill
# the three internal helper processes (api, devmon, sync) by recorded pid.
sub sig_exit {
    $logger->info("manager exiting...");
    $daemons->{main}{run} = 0;

    # Same order as the processes are listed: api, then devmon, then sync.
    for my $internal (qw(api devmon sync)) {
        $daemon->Kill_Daemon($self->{device_control}{$internal}{pid});
    }
}
# SIGPIPE handler: the signal is swallowed, leaving only a trace in the log
# when debug logging is enabled.
sub sig_pipe {
    if ( $logger->is_debug() ) {
        $logger->debug( "caught SIGPIPE" );
    }
}
# SIGHUP handler: run the host update script, stop the helper processes and
# replace this process with a fresh copy of the manager.
#
# The restarted manager receives --debug and/or --no-daemon so that it comes
# back in the same mode it was started in. Does not return on success; if the
# exec fails the error is logged and the process dies via logdie().
sub self_update {
    $logger->info("Performing self update");
    $logger->debug("Update host: $Bin/../../baseimage/update-host.sh") if ($logger->is_debug());

    # List form (with an explicit program block) keeps the shell out of it, so
    # an install path containing spaces or metacharacters is passed intact.
    # The exit status is deliberately not inspected: SIGCHLD is set to IGNORE
    # at the top of the file, so the child is typically auto-reaped and
    # system() cannot be relied on to report a meaningful status.
    my $updater = "$Bin/../../baseimage/update-host.sh";
    system { $updater } $updater;

    sig_exit();

    # Collect the flags as a list; the previous string building interpolated
    # an undefined value and produced a stray leading space.
    my @options;
    push @options, "--debug"     if $DEBUG;
    push @options, "--no-daemon" unless $DAEMON;

    my $script = File::Basename::basename($0);
    $logger->debug("Restart Manger: $Bin/$script @options") if ($logger->is_debug());

    # exec only returns when it fails.
    my $program = "$Bin/$script";
    exec { $program } $program, @options
        or $logger->logdie("Couldn't restart: $!");
}
# Getopt::Long callback for --help / -?: print the usage text to STDOUT and
# exit with status 0. Any arguments passed by Getopt::Long are ignored.
# The option names listed must match those given to GetOptions
# ("daemon!" is negated as --no-daemon).
sub print_usage {
say "
Usage: station-mgr.pl [OPTIONS]
Options:
--no-daemon disable daemon
--debug turn on debugging
--help this help text
";
exit 0;
}
# Config triggers
# Publish the manager's state to the local API, the mixer and the controller.
# Also installed as the SIGUSR2 handler.
#
# Takes no arguments and returns nothing. Three posts are made, in order:
#   1. the whole $self structure to the local API (/internal/settings),
#   2. a status summary (status, macaddress, nickname) to the mixer,
#   3. status plus the device list to the controller, but only when
#      $self->{controller}{running} is true.
# Each response is dumped at debug level; none of them is otherwise checked.
sub post_config {
    # Refresh devices
    $self->{devices} = $devices->all();

    # Post to manager api
    my $json = to_json($self);

    # HTTP::Tiny only takes request headers from the 'headers' option, so the
    # content type has to live there to actually be sent. Content-Length is
    # worked out by HTTP::Tiny for a string body and need not be supplied.
    my %post_data = (
        content => $json,
        headers => { 'Content-Type' => 'application/json' },
    );
    my $post = $http->post("http://127.0.0.1:3000/internal/settings", \%post_data);
    $logger->info("Config Posted to API");
    $logger->debug({filter => \&Data::Dumper::Dumper,
                    value => $post}) if ($logger->is_debug());

    # Status information
    my $status;
    $status->{status}     = $self->{status};
    $status->{macaddress} = $self->{config}{macaddress};
    $status->{nickname}   = $self->{config}{nickname};
    # Uncomment for heartbeat
    #$status->{heartbeat} = $self->{heartbeat};

    # Post Headers (shared by the mixer and controller posts)
    my %headers = (
        'station-mgr'  => 1,
        'Content-Type' => 'application/json',
    );
    $logger->debug({filter => \&Data::Dumper::Dumper,
                    value => $status}) if ($logger->is_debug());

    # Status Post Data
    $json = to_json($status);
    %post_data = (
        content => $json,
        headers => \%headers,
    );

    # Post Status to Mixer
    $post = $http->post("http://$self->{config}{mixer}{host}:3000/status/$self->{config}{macaddress}", \%post_data);
    $logger->info("Status Posted to Mixer API -> http://$self->{config}{mixer}{host}:3000/status/$self->{config}{macaddress}");
    $logger->debug({filter => \&Data::Dumper::Dumper,
                    value => $post}) if ($logger->is_debug());

    # Post Status + devices to Controller
    if ($self->{controller}{running}) {
        # Build post object: a list of key/value pairs
        my $data;
        $data->[0]{key}   = "status";
        $data->[0]{value} = $status->{status};
        $data->[1]{key}   = "devices";
        # 'value' is the same hash as $self->{devices}{all}, so the delete
        # below also removes the 'all' entry from that shared structure.
        $data->[1]{value} = $self->{devices}{all};
        delete $data->[1]{value}{all};

        # Post data
        $json = to_json($data);
        # some bug and it's late this could cause hideous issues if
        # a device id has a / in it, but this should be unlikely
        # (every "/" and "." is stripped from the serialised JSON)
        $json =~ s{/|\.}{}g;
        %post_data = (
            content => $json,
            headers => \%headers,
        );
        $logger->debug({filter => \&Data::Dumper::Dumper,
                        value => $json}) if ($logger->is_debug());
        $post = $http->post("$localconfig->{controller}/api/stations/$self->{config}{macaddress}/partial", \%post_data);
        $logger->info("Status Posted to Controller API - $localconfig->{controller}/api/stations/$self->{config}{macaddress}/partial");
        $logger->debug({filter => \&Data::Dumper::Dumper,
                        value => $post}) if ($logger->is_debug());
    }
    return;
}
# Pull the configuration from the local API and persist it.
# Also installed as the SIGUSR1 handler.
#
# Takes no arguments and returns nothing. On success $self->{config} is
# replaced by the "config" member of the API response and written to disk.
# If the request fails, the body is not valid JSON, or it carries no config
# hash, a warning is logged and the current configuration is left untouched.
# Because this runs as a signal handler, an uncaught die here would take the
# manager down.
sub get_config {
    my $get = $http->get("http://127.0.0.1:3000/internal/settings");
    $logger->debug({filter => \&Data::Dumper::Dumper,
                    value => $get}) if ($logger->is_debug());

    # HTTP::Tiny reports transport errors through the response hash as well,
    # so 'success' covers both HTTP errors and connection failures.
    unless ($get->{success}) {
        $logger->warn("Failed to get config from API: $get->{status} $get->{reason}");
        return;
    }

    # from_json dies on malformed input; trap that instead of propagating it.
    my $content;
    my $parsed = eval { $content = from_json($get->{content}); 1 };
    unless ($parsed) {
        $logger->warn("Config from API is not valid JSON: $@");
        return;
    }

    # Never overwrite the live config with something that is not a config hash.
    unless (ref $content eq 'HASH' && ref $content->{config} eq 'HASH') {
        $logger->warn("Config from API has no config section, keeping current config");
        return;
    }

    $self->{config} = $content->{config};
    $logger->debug({filter => \&Data::Dumper::Dumper,
                    value => $self}) if ($logger->is_debug());
    $logger->info("Config recieved from API");
    write_config();
    return;
}
# Copy the in-memory configuration onto the $stationconfig object and ask it
# to write itself out, then log that this happened. Takes no arguments and
# returns nothing.
sub write_config {
    my $current = $self->{config};
    $stationconfig->{config} = $current;
    $stationconfig->write();
    $logger->info("Config written to disk");
    return;
}
## api
# Register the command line for the station API (plackup/Twiggy on port 3000)
# and hand an internal "api" device to run_stop(). When debug logging is off
# the API is launched with --daemon in the production environment; with debug
# logging on it is launched with neither option. Returns nothing.
sub api {
    $self->{device_commands}{api}{command} = $logger->is_debug()
        ? "/usr/bin/plackup -s Twiggy -p 3000 $Bin/station-api.pl"
        : "/usr/bin/plackup -s Twiggy -p 3000 $Bin/station-api.pl --daemon --environment production";

    my %service = (
        role => "api",
        id   => "api",
        type => "internal",
    );
    run_stop(\%service);
    return;
}
## devmon
# Register the command for the device monitor script and hand an internal
# "devmon" device to run_stop(). Returns nothing.
sub devmon {
    $self->{device_commands}{devmon}{command} = "$Bin/station-devmon.pl";

    my %monitor = (
        role => "devmon",
        id   => "devmon",
        type => "internal",
    );
    run_stop(\%monitor);
    return;
}
## Ingest
sub ingest {
if ($self->{dvswitch}{running} == 1) {
foreach my $device (@{$self->{config}{devices}}) {
# Set Role
$device->{role} = "ingest";
if ($device->{type} eq "dv") {
# Check dv exists
if (-e $self->{devices}{dv}{$device->{id}}{path}) {
run_stop($device);
# If we're restarting we should refresh the devices and try again
} elsif ($self->{config}{device_control}{$device->{id}}{run} == 1) {
$logger->warn("$device->{id} has been disconnected");
# It's not ideal, but dvgrab hangs if no camera exist. devmon will restart it when it's plugged in again.
$self->{config}{device_control}{$device->{id}}{run} = 0;
run_stop($device);
# Set status
$self->{device_control}{$device->{id}}{timestamp} = time;
$self->{status}{$device->{id}}{running} = 0;
bin/station-mgr.pl view on Meta::CPAN
$self->{config}{device_control}{$device->{id}}{run} = 1;
# Get the running state + pid if it exists
my $state;
if ($self->{device_control}{$device->{id}}{pid}) {
$state = $utils->get_pid_state($self->{device_control}{$device->{id}}{pid});
} else {
$state = $utils->get_pid_command($device->{id},$self->{device_commands}{$device->{id}}{command},$device->{type});
}
unless ($state->{running}) {
# notice process is down, record timestamp when it went down
if ( ! defined $self->{device_control}{$device->{id}}{timestamp} ) {
$self->{device_control}{$device->{id}}{timestamp} = $time;
$self->{device_control}{$device->{id}}{runcount} = 0;
$self->{status}{$device->{id}}{running} = 0;
$self->{status}{$device->{id}}{status} = "starting";
$self->{status}{$device->{id}}{state} = "soft";
$self->{status}{$device->{id}}{type} = $device->{type};
$self->{status}{$device->{id}}{timestamp} = $self->{device_control}{$device->{id}}{timestamp};
$logger->debug("Timestamp and Run Count initialised for $device->{id}");
}
# if above restart threshold then slow down restarts to every 10 seconds
if ( $self->{device_control}{$device->{id}}{runcount} > 5 && ($time % 10) != 0 ) {
return;
}
# log dvswitch start or device connecting
if ($device->{type} eq "mixer") {
$logger->info("Starting DVswitch");
} elsif ($device->{type} eq "internal") {
$logger->info("Starting $device->{id}");
} else {
$logger->info("Connect $device->{id} to DVswitch");
}
# build daemon option
my %proc_opts;
unless ($logger->is_debug()) {
%proc_opts = (
exec_command => $self->{device_commands}{$device->{id}}{command},
);
} else {
%proc_opts = (
child_STDOUT => "/tmp/$device->{id}-STDOUT.log",
child_STDERR => "/tmp/$device->{id}-STDERR.log",
exec_command => $self->{device_commands}{$device->{id}}{command},
);
}
# run process
my $proc = $daemon->Init( \%proc_opts );
# give the process some time to settle
sleep 1;
# Set the running state + pid
$state = $utils->get_pid_command($device->{id},$self->{device_commands}{$device->{id}}{command},$device->{type});
$logger->debug({filter => \&Data::Dumper::Dumper,
value => $state}) if ($logger->is_debug());
# Increase runcount
$self->{device_control}{$device->{id}}{runcount}++;
my $age = $time - $self->{device_control}{$device->{id}}{timestamp};
if ($age > 1 && ! $state->{running}) {
# Log!
$logger->warn("$device->{id} failed to start (count=$self->{device_control}{$device->{id}}{runcount}, died=$age secs ago)");
# Refresh devices
$self->{devices} = $devices->all();
# Force command rebuild
$self->{device_commands}{$device->{id}}{command} = undef;
# post to the api/controller
post_config();
}
}
# If state has changed set it and post the config
if (! defined $self->{device_control}{$device->{id}}{running} ||
($self->{device_control}{$device->{id}}{running} != $state->{running} ||
$self->{device_control}{$device->{id}}{pid} != $state->{pid})) {
# Log
$logger->debug("$device->{id} has changed state");
# Set state
$self->{device_control}{$device->{id}}{pid} = $state->{pid};
$self->{device_control}{$device->{id}}{running} = $state->{running};
# Status Defaults
$self->{status}{$device->{id}}{type} = $device->{type};
$self->{status}{$device->{id}}{timestamp} = $time;
# Status flag
if ($state->{running}) {
$self->{status}{$device->{id}}{running} = 1;
$self->{status}{$device->{id}}{status} = "started";
$self->{status}{$device->{id}}{state} = "hard";
$self->{device_control}{$device->{id}}{timestamp} = $time;
} else {
$self->{status}{$device->{id}}{running} = 0;
$self->{status}{$device->{id}}{status} = "stopped";
$self->{status}{$device->{id}}{state} = "hard";
}
post_config();
}
} elsif (defined $self->{device_control}{$device->{id}}{pid}) {
# Kill The Child
if ($daemon->Kill_Daemon($self->{device_control}{$device->{id}}{pid})) {
# Log
$logger->info("Stop $device->{id}");
# Set device state
$self->{device_control}{$device->{id}}{running} = 0;
$self->{device_control}{$device->{id}}{pid} = undef;
# Set device status
( run in 1.030 second using v1.01-cache-2.11-cpan-39bf76dae61 )