App-EventStreamr

 view release on metacpan or  search on metacpan

bin/station-mgr.pl  view on Meta::CPAN

#!/usr/bin/perl

use strict;
use v5.10;
use FindBin qw($Bin);
use lib "$Bin/../lib";
use Proc::Daemon; # libproc-daemon-perl
use JSON; # libjson-perl
use Config::JSON; # libconfig-json-perl
use HTTP::Tiny; # libhttp-tiny-perl
use Log::Log4perl; # liblog-log4perl-perl
use POSIX;
use File::Path qw(make_path);
use File::Basename;
use experimental 'switch';
use Getopt::Long;
use Data::Dumper;

# PODNAME: station-mgr

# ABSTRACT: station-mgr - Core Station Manager script

our $VERSION = '0.5'; # VERSION


my $DEBUG  = 0;
my $DAEMON = 1;

my $getopts_rc = GetOptions(
    "debug!"        => \$DEBUG,
    "daemon!"       => \$DAEMON,

    "help|?"        => \&print_usage,
);

# setup signal handlers and daemon stuff
# POSIX unmasks the sigprocmask properly
$SIG{CHLD} = 'IGNORE';
my $sigset = POSIX::SigSet->new();
my $update = POSIX::SigAction->new( 'self_update',
                                    $sigset,
                                    &POSIX::SA_NODEFER);
my $exit = POSIX::SigAction->new(   'sig_exit',
                                    $sigset,
                                    &POSIX::SA_NODEFER);
my $pipe = POSIX::SigAction->new(   'sig_pipe',
                                    $sigset,
                                    &POSIX::SA_NODEFER);
my $get = POSIX::SigAction->new(   'get_config',
                                    $sigset,
                                    &POSIX::SA_NODEFER);
my $post = POSIX::SigAction->new(   'post_config',
                                    $sigset,
                                    &POSIX::SA_NODEFER);
# Handle INT/Term
POSIX::sigaction(&POSIX::SIGTERM, $exit);
POSIX::sigaction(&POSIX::SIGINT, $exit);

# SIGHUP updates and execs
POSIX::sigaction(&POSIX::SIGHUP, $update);
POSIX::sigaction(&POSIX::SIGPIPE, $pipe);

# USR1/2 for get/post config
POSIX::sigaction(&POSIX::SIGUSR1, $get);
POSIX::sigaction(&POSIX::SIGUSR2, $post);

our $daemon = Proc::Daemon->new(
  work_dir => "$Bin/../",
);

our $daemons;
if ( $DAEMON ) {
  $daemon->Init();
}

# set umask
umask 0027;

bin/station-mgr.pl  view on Meta::CPAN

if ($self->{config}{run} == 2) {$self->{config}{run} = 1;}

# Logging
unless ( $DEBUG ) {
  $self->{loglevel} = 'INFO, LOG1' ;
} else {
  $self->{loglevel} = 'DEBUG, LOG1, SCREEN' ;
}

unless (-d "$Bin/../logs/") {
 make_path("$Bin/../logs/"); 
}

my $log_conf = qq(
  log4perl.rootLogger              = $self->{loglevel}
  log4perl.appender.SCREEN         = Log::Log4perl::Appender::Screen
  log4perl.appender.SCREEN.stderr  = 0
  log4perl.appender.SCREEN.layout  = Log::Log4perl::Layout::PatternLayout
  log4perl.appender.SCREEN.layout.ConversionPattern = %m %n
  log4perl.appender.LOG1           = Log::Log4perl::Appender::File
  log4perl.appender.LOG1.utf8      = 1
  log4perl.appender.LOG1.filename  = $Bin/../logs/station-mgr.log
  log4perl.appender.LOG1.mode      = append
  log4perl.appender.LOG1.layout    = Log::Log4perl::Layout::PatternLayout
  log4perl.appender.LOG1.layout.ConversionPattern = %d %p %m %n
);

Log::Log4perl::init(\$log_conf);
our $logger = Log::Log4perl->get_logger();
$logger->info("manager starting: pid=$$, station_id=$self->{config}->{macaddress}");

$daemons->{main}{run} = 1;

# HTTP
our $http = HTTP::Tiny->new(timeout => 15);

# Start the API
api();

# Register with controller
$logger->info("Registering with controller $localconfig->{controller}/api/station/$self->{config}{macaddress}");
my $response =  $http->post("$localconfig->{controller}/api/station/$self->{config}{macaddress}");

# Controller responds with created 201, post our config 
if ($response->{status} == 201) {
  $logger->info("Posting config $localconfig->{controller}");
  
  # Status Post Data
  my $json = to_json($self->{config});
  my %headers = (
        'station-mgr' => 1,
        'Content-Type' => 'application/json', 
  );
  my %post_data = ( 
        content => $json, 
        headers => \%headers,
  );

  $response =  $http->post("$localconfig->{controller}/api/station", \%post_data);
  
  $logger->debug({filter => \&Data::Dumper::Dumper,
                  value  => $response}) if ($logger->is_debug());
}


if ($response->{status} == 200 ) {
  my $content = from_json($response->{content});
  $self->{controller}{running} = 1;
  $logger->debug({filter => \&Data::Dumper::Dumper,
                  value  => $content}) if ($logger->is_debug());

  if (defined $content && $content ne 'true') {
    $self->{config} = $content->{settings};
    write_config();
  } else {
    write_config();
  }

  # Run all connected devices - need to get devices to return an array
  if ($self->{config}{devices} eq 'all') {
    $self->{config}{devices} = $self->{devices}{array};
  }

  $self->{config}{manager}{pid} = $$;
  post_config();
} elsif ($response->{status} == 204){
  $self->{controller}{running} = 1;
  $self->{config}{manager}{pid} = $$;
  $logger->warn("Connected but not registered");
  $logger->info("Falling back to local config");
  $logger->debug({filter => \&Data::Dumper::Dumper,
                  value  => $response}) if ($logger->is_debug());

  # Run all connected devices - need to get devices to return an array
  if ($self->{config}{devices} eq 'all') {
    $self->{config}{devices} = $self->{devices}{array};
  }
  post_config();
} else {
  chomp $response->{content};
  $self->{controller}{running} = 0;
  $self->{config}{manager}{pid} = $$;
  $logger->warn("Failed to connect: $response->{content}");
  $logger->info("Falling back to local config");
  $logger->debug({filter => \&Data::Dumper::Dumper,
                  value  => $response}) if ($logger->is_debug());

  # Run all connected devices - need to get devices to return an array
  if ($self->{config}{devices} eq 'all') {
    $self->{config}{devices} = $self->{devices}{array};
  }
  post_config();
}

# Debug logging of data
$logger->debug({filter => \&Data::Dumper::Dumper,
                value  => $self}) if ($logger->is_debug());

# Log when started
if ($self->{config}{run}) {
  $logger->info("Manager started, starting devices");
} else {
  $logger->info("Manager started, configuration set to not start devices.");
}

# Post start clearing of data
$self->{device_control}{record}{recordpath} = 0;

# Main Daemon Loop
while ($daemons->{main}{run}) {
  # If we're restarting, we should trigger check for dvswitch
  if ($self->{config}{run} == 2) {
    $logger->info("Restart Trigged");
    $self->{dvswitch}{check} = 1;
  }

  # Process the internal commands
  api();
  devmon();

  # Process the roles
  foreach my $role (@{$self->{config}->{roles}}) {
    given ( $role ) {
      when ("mixer")    { mixer();  }
      when ("ingest")   { ingest(); }
      when ("stream")   { stream(); }
      when ("record")   { record(); }
    }
  }

  # 2 is the restart all processes trigger
  # $daemon->Kill_Daemon does a Kill -9, so if we get here they procs should be dead. 
  if ($self->{config}{run} == 2) {
    $self->{config}{run} = 1;
  }

  # Until found check for dvswitch - continuously hitting dvswitch with an unknown client caused high cpu load
  unless ( $self->{dvswitch}{running} && ! $self->{dvswitch}{check} ) {
    if ( $utils->port($self->{config}->{mixer}{host},$self->{config}->{mixer}{port}) ) {
      $logger->info("DVswitch found Running");
      $self->{dvswitch}{running} = 1;
      $self->{dvswitch}{check} = 0; # We can set this to 1 and it will check dvswitch again.
    }
  }

  # Uncomment to enable heartbeat
  ## Post a hearbeat to the controller/mixer
  #if ((time % 10) == 0) {
  #  $logger->debug("Heartbeat!") if ($logger->is_debug());
  #  $self->{heartbeat} = time;
  #  post_config();
  #}
  
  # Update date if it's changed - I wonder if there is a better way to trigger this? Cron (requires more OS config)?
  unless ( $self->{date} == strftime "%Y%m%d", localtime) {
    $self->{date} = strftime "%Y%m%d", localtime;

bin/station-mgr.pl  view on Meta::CPAN

  sleep 1;
}



# ---- SUBROUTINES ----------------------------------------------------------

sub sig_exit {
      $logger->info("manager exiting...");
      $daemons->{main}{run} = 0;
      $daemon->Kill_Daemon($self->{device_control}{api}{pid}); 
      $daemon->Kill_Daemon($self->{device_control}{devmon}{pid}); 
      $daemon->Kill_Daemon($self->{device_control}{sync}{pid}); 
}

sub sig_pipe {
    $logger->debug( "caught SIGPIPE" ) if ( $logger->is_debug() );
}

sub self_update {
  $logger->info("Performing self update");
  $logger->debug("Update host: $Bin/../../baseimage/update-host.sh") if ($logger->is_debug());
  system("$Bin/../../baseimage/update-host.sh");
  sig_exit();
  my $options;
  $options = "--debug" if $DEBUG;
  $options = "$options --no-daemon" unless $DAEMON;
  my $script = File::Basename::basename($0);
  $logger->debug("Restart Manger: $Bin/$script $options") if ($logger->is_debug());
  exec("$Bin/$script $options") or $logger->logdie("Couldn't restart: $!");
}

sub print_usage {
  say "
Usage: station-mgr.pl [OPTIONS]

Options:
  --no-deaemon  disable daemon

  --debug       turn on debugging
  --help        this help text
";
  exit 0;
}

# Config triggers
sub post_config {
  # Refresh devices 
  $self->{devices} = $devices->all();

  # Post to manager api
  my $json = to_json($self);
  my %post_data = ( 
        content => $json, 
        'content-type' => 'application/json', 
        'content-length' => length($json),
  );

  my $post = $http->post("http://127.0.0.1:3000/internal/settings", \%post_data);
  $logger->info("Config Posted to API");
  $logger->debug({filter => \&Data::Dumper::Dumper,
                value  => $post}) if ($logger->is_debug());

  # Status information
  my $status;
  $status->{status} = $self->{status};
  $status->{macaddress} = $self->{config}{macaddress};
  $status->{nickname} = $self->{config}{nickname};
  # Uncomment for heartbeat
  #$status->{heartbeat} = $self->{heartbeat};

  # Post Headers
  my %headers = (
        'station-mgr' => 1,
        'Content-Type' => 'application/json',
  );

  $logger->debug({filter => \&Data::Dumper::Dumper,
                value  => $status}) if ($logger->is_debug());

  # Status Post Data
  $json = to_json($status);
  %post_data = ( 
        content => $json, 
        headers => \%headers, 
  );

  # Post Status to Mixer
  $post = $http->post("http://$self->{config}{mixer}{host}:3000/status/$self->{config}{macaddress}", \%post_data);
  $logger->info("Status Posted to Mixer API -> http://$self->{config}{mixer}{host}:3000/status/$self->{config}{macaddress}");
  $logger->debug({filter => \&Data::Dumper::Dumper,
                value  => $post}) if ($logger->is_debug());

  # Post Status + devices to Controller
  if ($self->{controller}{running}) {
    # Build post object
    my $data;
    $data->[0]{key} = "status";
    $data->[0]{value} = $status->{status};
    $data->[1]{key} = "devices";
    $data->[1]{value} = $self->{devices}{all};
    delete $data->[1]{value}{all};

    # Post data
    $json = to_json($data);
    # some bug and it's late this could cause hideous issues if 
    # a device id has a / in it, but this should be unlikely
    $json =~ s{/|\.}{}g;

    %post_data = ( 
          content => $json, 
          headers => \%headers, 
    );
    $logger->debug({filter => \&Data::Dumper::Dumper,
                  value  => $json}) if ($logger->is_debug());
    $post = $http->post("$localconfig->{controller}/api/stations/$self->{config}{macaddress}/partial", \%post_data);
    $logger->info("Status Posted to Controller API - $localconfig->{controller}/api/stations/$self->{config}{macaddress}/partial");
    $logger->debug({filter => \&Data::Dumper::Dumper,
                  value  => $post}) if ($logger->is_debug());
  }
  
  return;
}

sub get_config {
  my $get = $http->get("http://127.0.0.1:3000/internal/settings");
  my $content = from_json($get->{content});
  $self->{config} = $content->{config};
  $logger->debug({filter => \&Data::Dumper::Dumper,
                value  => $get}) if ($logger->is_debug());
  $logger->debug({filter => \&Data::Dumper::Dumper,
                value  => $self}) if ($logger->is_debug());
  $logger->info("Config recieved from API");
  write_config();
  return;
}

sub write_config {
  $stationconfig->{config} = $self->{config};
  $stationconfig->write;
  $logger->info("Config written to disk");
  return;
}

## api 
sub api {
  my $device;
  unless ($logger->is_debug()) {
    $self->{device_commands}{api}{command} = "/usr/bin/plackup -s Twiggy -p 3000 $Bin/station-api.pl --daemon --environment production";
  } else{
    $self->{device_commands}{api}{command} = "/usr/bin/plackup -s Twiggy -p 3000 $Bin/station-api.pl";
  }
  $device->{role} = "api";
  $device->{id} = "api";
  $device->{type} = "internal";
  run_stop($device);
  return;
}

## devmon 
sub devmon {
  my $device;
  $self->{device_commands}{devmon}{command} = "$Bin/station-devmon.pl";
  $device->{role} = "devmon";
  $device->{id} = "devmon";
  $device->{type} = "internal";
  run_stop($device);
  return;
}

## Ingest
sub ingest {
  if ($self->{dvswitch}{running} == 1) {
    foreach my $device (@{$self->{config}{devices}}) {
      # Set Role
      $device->{role} = "ingest";

      if ($device->{type} eq "dv") {
        # Check dv exists
        if (-e $self->{devices}{dv}{$device->{id}}{path}) {
          run_stop($device);
        # If we're restarting we should refresh the devices and try again
        } elsif ($self->{config}{device_control}{$device->{id}}{run} == 1) {
          $logger->warn("$device->{id} has been disconnected");
          # It's not ideal, but dvgrab hangs if no camera exist. devmon will restart it when it's plugged in again.
          $self->{config}{device_control}{$device->{id}}{run} = 0;
          run_stop($device);

          # Set status
          $self->{device_control}{$device->{id}}{timestamp} = time;
          $self->{status}{$device->{id}}{running} = 0;

bin/station-mgr.pl  view on Meta::CPAN

    $self->{config}{device_control}{$device->{id}}{run} = 1;

    # Get the running state + pid if it exists
    my $state;
    if ($self->{device_control}{$device->{id}}{pid}) {
      $state = $utils->get_pid_state($self->{device_control}{$device->{id}}{pid}); 
    } else {
      $state = $utils->get_pid_command($device->{id},$self->{device_commands}{$device->{id}}{command},$device->{type}); 
    }

    unless ($state->{running}) {

      # notice process is down, record timestamp when it went down
      if ( ! defined $self->{device_control}{$device->{id}}{timestamp} ) {
        $self->{device_control}{$device->{id}}{timestamp} = $time;
        $self->{device_control}{$device->{id}}{runcount} = 0;

        $self->{status}{$device->{id}}{running} = 0;
        $self->{status}{$device->{id}}{status} = "starting";
        $self->{status}{$device->{id}}{state} = "soft";
        $self->{status}{$device->{id}}{type} = $device->{type};
        $self->{status}{$device->{id}}{timestamp} = $self->{device_control}{$device->{id}}{timestamp};

        $logger->debug("Timestamp and Run Count initialised for $device->{id}");
      }

      # if above restart threshold then slow down restarts to every 10 seconds
      if ( $self->{device_control}{$device->{id}}{runcount} > 5 && ($time % 10) != 0 ) {
        return;
      }

      # log dvswitch start or device connecting 
      if ($device->{type} eq "mixer") {
        $logger->info("Starting DVswitch");
      } elsif ($device->{type} eq "internal") {
        $logger->info("Starting $device->{id}");
      } else {
        $logger->info("Connect $device->{id} to DVswitch");
      }
      
      # build daemon option
      my %proc_opts;
      unless ($logger->is_debug()) {
        %proc_opts = (
           exec_command => $self->{device_commands}{$device->{id}}{command},
        );
      } else {
        %proc_opts = (
           child_STDOUT => "/tmp/$device->{id}-STDOUT.log",
           child_STDERR => "/tmp/$device->{id}-STDERR.log", 
           exec_command => $self->{device_commands}{$device->{id}}{command},
        );
      }       
      # run process
      my $proc = $daemon->Init( \%proc_opts );

      # give the process some time to settle
      sleep 1;
      # Set the running state + pid
      $state = $utils->get_pid_command($device->{id},$self->{device_commands}{$device->{id}}{command},$device->{type}); 
      $logger->debug({filter => \&Data::Dumper::Dumper,
                      value  => $state}) if ($logger->is_debug());
      
      # Increase runcount
      $self->{device_control}{$device->{id}}{runcount}++;
      my $age = $time - $self->{device_control}{$device->{id}}{timestamp};
      if ($age > 1 && ! $state->{running}) {
        # Log!
        $logger->warn("$device->{id} failed to start (count=$self->{device_control}{$device->{id}}{runcount}, died=$age secs ago)");

        # Refresh devices
        $self->{devices} = $devices->all();
        
        # Force command rebuild
        $self->{device_commands}{$device->{id}}{command} = undef;

        # post to the api/controller
        post_config();
      }
    }
    
    # If state has changed set it and post the config
    if (! defined $self->{device_control}{$device->{id}}{running} || 
      ($self->{device_control}{$device->{id}}{running} != $state->{running} ||
      $self->{device_control}{$device->{id}}{pid} != $state->{pid})) {
      # Log
      $logger->debug("$device->{id} has changed state");

      # Set state
      $self->{device_control}{$device->{id}}{pid} = $state->{pid};
      $self->{device_control}{$device->{id}}{running} = $state->{running};
      
      # Status Defaults
      $self->{status}{$device->{id}}{type} = $device->{type};
      $self->{status}{$device->{id}}{timestamp} = $time;

      # Status flag
      if ($state->{running}) {
        $self->{status}{$device->{id}}{running} = 1;
        $self->{status}{$device->{id}}{status} = "started";
        $self->{status}{$device->{id}}{state} = "hard";
        $self->{device_control}{$device->{id}}{timestamp} = $time;
      } else {
        $self->{status}{$device->{id}}{running} = 0;
        $self->{status}{$device->{id}}{status} = "stopped";
        $self->{status}{$device->{id}}{state} = "hard";
      }
      post_config();
    }

  } elsif (defined $self->{device_control}{$device->{id}}{pid}) {
    # Kill The Child
    if ($daemon->Kill_Daemon($self->{device_control}{$device->{id}}{pid})) { 
      # Log
      $logger->info("Stop $device->{id}");
      
      # Set device state
      $self->{device_control}{$device->{id}}{running} = 0;
      $self->{device_control}{$device->{id}}{pid} = undef; 
      
      # Set device status



( run in 1.030 second using v1.01-cache-2.11-cpan-39bf76dae61 )