App-Diskd

 view release on metacpan or  search on metacpan

lib/App/Diskd.pm  view on Meta::CPAN


  # Do some basic type checking on the unpacked object. We expect an
  # array of arrays.
  unless (ref($objref) eq "ARRAY") {
    warn "unpacked disk list is not an ARRAY";
    return undef;

    for (@$objref) {
      unless (ref($_) eq "ARRAY") {
	warn "unpacked disk element is not an ARRAY";
	return undef;
      }
    }
  }

  $self->{update_time}->{$host} = time();

  return $self->{disks_by_ip}->{$host} = $objref;
}

#
# The remaining packages are used simply to achieve a clean separation
# between different POE sessions and to encapsulate related methods
# without having to worry about namespace issues (like ensuring event
# names and handler routines are unique across all sessions). As a
# consequence of having distinct sessions for each program area, when
# we need to have inter-session communication, we need to use POE's
# post method. An alias is also used to identify each of the sessions.
#


##
## The DiskWatcher package sets up a session to periodically run
## blkid, parse the results and store them in our Info object. Since
## blkid can sometimes hang (due to expected devices or media not
## being present), a timer is set and if the command hasn't completed
## within that timeout, the child process is killed and the child
## session garbage collected.
##

package Local::DiskWatcher;

use POE qw(Wheel::Run Filter::Line);

sub new {

  my $class = shift;
  my %args = (
	      program   => '/sbin/blkid',
	      frequency => 10 * 60 * 1, # seconds between runs
	      timeout   => 15,
	      info      => undef,
	      @_
	     );

  die "DiskWatcher needs info => ref argument\n" unless defined($args{info});

  # by using package_states, POE event names will eq package methods
  my @events =
    qw(
	_start start_child child_timeout got_child_stdout got_child_stderr
	child_cleanup
     );
  my $session = POE::Session->create
    (
     package_states => [$class => \@events],
     args => [%args],
    );

  return bless { session => $session }, $class;
}


# Our _start event is solely concerned with extracting args and saving
# them in the heap. It then queues start_child to run the actual child
# process and timeout watcher.
sub _start {

  #print "DiskWatcher: _start args: ". (join ", ", @_). "\n";

  my ($kernel, $heap, %args) = @_[KERNEL, HEAP, ARG0 .. $#_];

  $heap->{timeout} = $args{timeout};
  $heap->{info}    = $args{info};
  $heap->{program} = $args{program};
  $heap->{delay}   = $args{frequency};
  $heap->{child}   = undef;

  $kernel->yield('start_child');
}

# start_child is responsible for running the program with a timeout
sub start_child {
  my ($kernel, $heap) = @_[KERNEL, HEAP];

  # Using a named timer for timeouts. Set it to undef to deactivate.
  $kernel->delay(child_timeout => $heap->{timeout});

  $heap->{child} = POE::Wheel::Run->new(
    Program      => [$heap->{program}],
    StdioFilter  => POE::Filter::Line->new(),
    StderrFilter => POE::Filter::Line->new(),
    StdoutEvent  => "got_child_stdout",
    StderrEvent  => "got_child_stderr",
    CloseEvent   => "child_cleanup",
  );
  $kernel->sig_child($heap->{child}->PID, "child_cleanup");

  # queue up the next run of this event
  $kernel->delay(start_child => $heap->{delay});
}

# if the child process didn't complete within the timeout, we kill it
sub child_timeout {
  my ($heap) = $_[HEAP];
  my $child  = $heap->{child};

  warn "CHILD KILL TIMEOUT";
  warn "diskid failed to send kill signal\n" unless $child->kill();

  # The kernel should eventually receive a SIGCHLD after this
}

# For our purposes, we don't care whether the child exited by closing
# its output or throwing a SIGCHLD. Wrap the deletion of references to
# the child in if(defined()) to avoid warnings.
sub child_cleanup {

  #print "DiskWatcher: child_cleanup args: ". (join ", ", @_). "\n";

  my ($heap,$kernel) = @_[HEAP,KERNEL];

  # Deactivate the kill timer
  $kernel->delay(child_timeout => undef);

  # We need to commit the new list of disks and recycle the child
  # object. Both of these should only be called once, even if this
  # routine is called twice.
  if (defined($heap->{child})) {
    my $info = $heap->{info};
    $info->commit_our_disk_info;

    delete $heap->{child};
  }
}

# Consume a single line of output (thanks to using POE::Filter::Line)
sub got_child_stdout {
  my ($heap,$_) = @_[HEAP,ARG0];

  my ($uuid,$label,$device) = ();

  $uuid   = $1 if /UUID=\"([^\"]+)/;
  $label  = $1 if /LABEL=\"([^\"]+)/;
  $device = $1 if /^(.*?):/;

  return unless defined($device); # we'll silently fail if blkid
                                  # output format is not as expected.
  return unless defined($label) or defined($uuid);

  my $info = $heap->{info};

  # the call to add_our_disk_info just queues the update, then when we
  # clean up this child, we'll instruct info to "commit" the update.
  # This is needed to take care of removing old disks that are no
  # longer attached.
  $info->add_our_disk_info($uuid,$label,$device);

  #  print "STDOUT: $_\n";
}

# Echo any stderr from the child
sub got_child_stderr {
  my ($heap,$stderr,$wheel) = @_[HEAP, ARG0, ARG1];
  my $child = $heap->{child};
  my $pid   = $child->PID;
  warn "blkid $pid> $stderr\n";
}

##
## The MountWatcher package will be responsible for periodically
## running mount to determine which of the known disks are actually
## mounted. It will follow pretty much the same approach as for the
## DiskWatcher package.
##

package Local::MountWatcher;

use POE qw(Wheel::Run);



##
## The MulticastServer package handles connection to a multicast group
## and sending and receving messages across it.
##

package Local::MulticastServer;

use POE;
use IO::Socket::Multicast;

use constant DATAGRAM_MAXLEN   => 1500;
use constant MCAST_PORT        => 32003;
use constant MCAST_GROUP       => '230.1.2.3';
use constant MCAST_DESTINATION => MCAST_GROUP . ':' . MCAST_PORT;

sub new {

  my $class = shift;
  my %opts = (
	      initial_delay => 5,
	      frequency => 10 * 60,
	      info => undef,
	      ttl => 1,		# set >1 to traverse routers
	      @_
	     );

  die "UnixSocketServer::new requires info => \$var option\n"
    unless defined($opts{info});

  my $session =
    POE::Session->create(
	inline_states => {
	       	   _start         => \&peer_start,
	       	   get_datagram   => \&peer_read,
	       	   send_something => \&send_something,
	       	  },
	heap => {
		 initial_delay => $opts{initial_delay},
		 frequency     => $opts{frequency},
		 info          => $opts{info},
		 ttl           => $opts{ttl},
		},
    );

  return bless { session => $session }, $class;



( run in 0.359 second using v1.01-cache-2.11-cpan-39bf76dae61 )