App-Diskd
view release on metacpan or search on metacpan
lib/App/Diskd.pm view on Meta::CPAN
# Do some basic type checking on the unpacked object. We expect an
# array of arrays.
unless (ref($objref) eq "ARRAY") {
warn "unpacked disk list is not an ARRAY";
return undef;
for (@$objref) {
unless (ref($_) eq "ARRAY") {
warn "unpacked disk element is not an ARRAY";
return undef;
}
}
}
$self->{update_time}->{$host} = time();
return $self->{disks_by_ip}->{$host} = $objref;
}
#
# The remaining packages are used simply to achieve a clean separation
# between different POE sessions and to encapsulate related methods
# without having to worry about namespace issues (like ensuring event
# names and handler routines are unique across all sessions). As a
# consequence of having distinct sessions for each program area, when
# we need to have inter-session communication, we need to use POE's
# post method. An alias is also used to identify each of the sessions.
#
##
## The DiskWatcher package sets up a session to periodically run
## blkid, parse the results and store them in our Info object. Since
## blkid can sometimes hang (due to expected devices or media not
## being present), a timer is set and if the command hasn't completed
## within that timeout, the child process is killed and the child
## session garbage collected.
##
package Local::DiskWatcher;
use POE qw(Wheel::Run Filter::Line);
# Constructor: creates the POE session that drives the disk watcher.
#
# Named options (all optional except info):
#   program   => path of the command to run
#   frequency => seconds between runs
#   timeout   => seconds to wait before killing a run
#   info      => object that receives the results (required)
#
# Dies if info is not supplied. Returns a blessed hash holding the
# newly created session.
sub new {
    my ($class, @overrides) = @_;

    # Defaults come first so that caller-supplied pairs win.
    my %args = (
        program   => '/sbin/blkid',
        frequency => 600,    # seconds between runs (10 minutes)
        timeout   => 15,
        info      => undef,
        @overrides,
    );

    defined $args{info}
        or die "DiskWatcher needs info => ref argument\n";

    # package_states maps each POE event name onto the method of the
    # same name in this package.
    my $session = POE::Session->create(
        package_states => [
            $class => [
                qw(
                    _start start_child child_timeout
                    got_child_stdout got_child_stderr
                    child_cleanup
                )
            ],
        ],
        args => [%args],
    );

    my $self = { session => $session };
    return bless $self, $class;
}
# _start handler: copies the constructor's options (delivered as
# key/value pairs from ARG0 onwards) into the session heap, marks that
# no child is running yet, and then queues start_child to launch the
# first run together with its timeout watcher.
sub _start {
    my ($kernel, $heap) = @_[KERNEL, HEAP];
    my %args = @_[ARG0 .. $#_];

    # These three keep the same name in the heap as in the options.
    @{$heap}{qw(timeout info program)} = @args{qw(timeout info program)};

    # The run frequency is stored under "delay".
    $heap->{delay} = $args{frequency};
    $heap->{child} = undef;

    $kernel->yield('start_child');
}
# start_child handler: launches the configured program under a kill
# timer and schedules the next run of this same event.
sub start_child {
    my $kernel = $_[KERNEL];
    my $heap   = $_[HEAP];

    # Named timer used as the timeout; setting it to undef later
    # deactivates it.
    $kernel->delay(child_timeout => $heap->{timeout});

    # Both output streams are delivered one line at a time.
    my $wheel = POE::Wheel::Run->new(
        Program      => [ $heap->{program} ],
        StdioFilter  => POE::Filter::Line->new(),
        StderrFilter => POE::Filter::Line->new(),
        StdoutEvent  => "got_child_stdout",
        StderrEvent  => "got_child_stderr",
        CloseEvent   => "child_cleanup",
    );
    $heap->{child} = $wheel;

    # Process exit is routed to the same handler as output close.
    $kernel->sig_child($wheel->PID, "child_cleanup");

    # Queue up the next run of this event.
    $kernel->delay(start_child => $heap->{delay});
}
# child_timeout handler: the child did not finish within the timeout,
# so send it a kill signal. A warning is printed if the signal could
# not be sent.
sub child_timeout {
    my $heap = $_[HEAP];

    warn "CHILD KILL TIMEOUT";

    # The kernel should eventually receive a SIGCHLD after this.
    $heap->{child}->kill()
        or warn "diskid failed to send kill signal\n";
}
# child_cleanup handler: registered both as the child's close event and
# as its SIGCHLD handler, so it may run twice for one child. Whichever
# call arrives first does the work; the defined() test turns the second
# call into a no-op.
sub child_cleanup {
    my $kernel = $_[KERNEL];
    my $heap   = $_[HEAP];

    # The child is finished, so the kill timer is no longer wanted.
    $kernel->delay(child_timeout => undef);

    # Commit the freshly collected disk list and drop our reference to
    # the child object, exactly once per child.
    if (defined $heap->{child}) {
        $heap->{info}->commit_our_disk_info();
        delete $heap->{child};
    }
}
# Consume a single line of child output (one line per event, thanks to
# using POE::Filter::Line) and queue the disk it describes.
#
# POE arguments used: HEAP (holds the info object) and ARG0 (the line).
#
# Lines that do not start with a "device:" prefix, or that carry neither
# a LABEL nor a UUID tag, are silently ignored.
sub got_child_stdout {
    # The line is kept in a named lexical: declaring "my $_" was only
    # ever an experimental feature and is a compile-time error on
    # perl 5.24 and later.
    my ($heap, $line) = @_[HEAP, ARG0];
    return unless defined $line;

    # \b stops these patterns from matching inside longer tag names
    # such as PARTUUID= or PARTLABEL=.
    my ($uuid)   = $line =~ /\bUUID=\"([^\"]+)/;
    my ($label)  = $line =~ /\bLABEL=\"([^\"]+)/;
    my ($device) = $line =~ /^(.*?):/;

    # We silently fail if the output format is not as expected.
    return unless defined($device);
    return unless defined($label) or defined($uuid);

    # The call to add_our_disk_info just queues the update; when this
    # child is cleaned up, info is told to "commit" it. This is needed
    # to take care of removing old disks that are no longer attached.
    $heap->{info}->add_our_disk_info($uuid, $label, $device);
}
# got_child_stderr handler: echoes one line of the child's stderr as a
# warning, prefixed with the child's process id.
sub got_child_stderr {
    my ($heap, $text) = @_[HEAP, ARG0];

    warn "blkid " . $heap->{child}->PID . "> $text\n";
}
##
## The MountWatcher package will be responsible for periodically
## running mount to determine which of the known disks are actually
## mounted. It will follow pretty much the same approach as for the
## DiskWatcher package.
##
package Local::MountWatcher;
use POE qw(Wheel::Run);
##
## The MulticastServer package handles connection to a multicast group
## and sending and receiving messages across it.
##
package Local::MulticastServer;
use POE;
use IO::Socket::Multicast;
# Datagram size limit and the multicast group/port this package uses.
use constant {
    DATAGRAM_MAXLEN => 1500,
    MCAST_PORT      => 32003,
    MCAST_GROUP     => '230.1.2.3',
};

# Destination in "group:port" form, built from the two values above.
use constant MCAST_DESTINATION => join ':', MCAST_GROUP, MCAST_PORT;
sub new {
my $class = shift;
my %opts = (
initial_delay => 5,
frequency => 10 * 60,
info => undef,
ttl => 1, # set >1 to traverse routers
@_
);
die "UnixSocketServer::new requires info => \$var option\n"
unless defined($opts{info});
my $session =
POE::Session->create(
inline_states => {
_start => \&peer_start,
get_datagram => \&peer_read,
send_something => \&send_something,
},
heap => {
initial_delay => $opts{initial_delay},
frequency => $opts{frequency},
info => $opts{info},
ttl => $opts{ttl},
},
);
return bless { session => $session }, $class;
( run in 0.359 second using v1.01-cache-2.11-cpan-39bf76dae61 )