Cluster-Init

 view release on metacpan or  search on metacpan

lib/Cluster/Init/Util.pm  view on Meta::CPAN

      $pending,
      $cars,
      $ran,
      $died,
      $elapsed);
    }
  }
}

# Return a multi-line string describing the call stack above the sub
# that called _stacktrace().  Frame 0 (the call to _stacktrace itself)
# is skipped; every remaining frame contributes one line of the form
#   "<subroutine> <file> line <number>\n"
# Returns "" when there are no such frames.
sub _stacktrace
{
  my @rows;
  my $depth = 1;
  # caller() yields an empty list once we run off the top of the stack
  while (my (undef, $file, $line, $subname) = caller($depth++))
  {
    push @rows, "$subname $file line $line\n";
  }
  return join "", @rows;
}

# Watcher callback: watcher() registers every watcher with
# cb => [$self,'dq'], so this is what runs when one of them fires.
#
#   $e - the event object handed to the callback; $e->w must be the
#        watcher that fired.
#
# Pulls the '_dfa_event' name out of the watcher's data hash and feeds
# it, together with that data, to $self->tick().  Returns 0 when there
# is no watcher or no event name; otherwise returns whatever tick()
# returns.
sub dq
{
  my ($self, $e) = @_;
  if (!ref $e->w)
  {
    debug "skipping $e -- no watcher";
    return 0;
  }
  my $data  = $e->w->data || {};
  # warn dump $data;
  my $event = $data->{_dfa_event};
  my $desc  = $e->w->desc;
  debug "$desc: isactive: ". $e->w->is_active;
  # a watcher that is no longer active is dropped from our bookkeeping
  if (!$e->w->is_active)
  {
    $self->killwatcher($e->w);
  }
  # delete $data->{_dfa_event};
  # $self->history($event,$data);
  if (!$event)
  {
    # dump everything we know so the missing event can be diagnosed
    my $complaint = "ouch -- somehow there's no _dfa_event in \$data:\n"
    .(dump $data)."\n"
    .(dump $self)."\n"
    .(dump $e)."\n"
    ;
    debug $complaint;
    return 0;
  }
  debug "$desc: calling tick($event,$data)";
  return $self->tick($event,$data);
}

# Queue $event for delivery by creating a timer watcher whose 'at'
# time is the current time.
#
#   $event - event name
#   $data  - optional hash ref passed along with the event (default {})
#
# Returns whatever $self->timer() returns.
sub event
{
  my ($self, $event, $data) = @_;
  debug "queue event $event";
  $data ||= {};
  my %when = (at => time());
  return $self->timer($event, \%when, $data);
}

# Create a watcher of the given type and remember it in
# $self->{watchers}.
#
#   $type    - name of the Event class method used as constructor
#              ('idle', 'timer', 'io', 'var', 'signal', ...)
#   $event   - event name; stored as '_dfa_event' in the watcher's data
#              so that dq() can hand it to tick() when the watcher fires
#   $parm    - optional hash ref of extra constructor arguments; it is
#              read but not modified
#   $olddata - optional data hash ref; a copy is attached to the watcher
#
# Returns the new watcher, or 0 when no event name was given.
sub watcher
{
  my ($self, $type, $event, $parm, $olddata) = @_;
  # interpolate "" for a missing event, without an 'uninitialized' warning
  my $shown = defined $event ? $event : "";
  debug "create $type $shown";
  $parm    ||= {};
  $olddata ||= {};
  unless ($event)
  {
    # Force verbose debugging for this one message.  local() puts the
    # previous state back on scope exit -- including "not set at all",
    # so DEBUG is never left behind as an empty string.
    local $ENV{DEBUG} = 3;
    debug "oooh -- $type has no event".(dump $self);
    return 0;
  }
  # make a copy so it doesn't go 'round and 'round
  my $data = _copy($olddata);
  $data->{_dfa_event} = $event;
  # Build the constructor arguments in a fresh hash so the caller's
  # $parm is left untouched; our keys override any supplied ones.
  my %args = (
    %$parm,
    desc => "$self $type $event",
    cb   => [$self, 'dq'],
    data => $data,
  );
  my $w = Event->$type(%args);
  $self->watchers($w);
  return $w;
}

# Deep copy of plain data: unblessed SCALAR, ARRAY and HASH references
# are duplicated recursively.  Everything else -- non-references,
# blessed objects, code refs, refs to refs, globs -- is passed through
# unchanged.
#
#   $in - value to copy
#
# Returns the copy (or $in itself for pass-through values).
# Note: circular structures are not detected and would recurse forever.
sub _copy
{
  my $in  = shift;
  my $ref = ref $in;
  return $in unless $ref;
  if ($ref eq "SCALAR")
  {
    my $value = $$in;
    return \$value;
  }
  if ($ref eq "ARRAY")
  {
    return [ map { _copy($_) } @$in ];
  }
  if ($ref eq "HASH")
  {
    my %out;
    # Iterate over keys(), not each(): each() resumes from wherever the
    # hash's iterator was last left, so a hash that somebody else had
    # partially walked would be copied incompletely.
    for my $key (keys %$in)
    {
      $out{$key} = _copy($in->{$key});
    }
    return \%out;
  }
  return $in;
}

# Accessor for the list of watchers kept in $self->{watchers}.
#
#   $w - optional watcher; when given (and true) it must be a
#        reference and is appended to the list
#
# Returns the current list of watchers.
sub watchers
{
  my ($self, $w) = @_;
  if ($w)
  {
    affirm { ref $w };
    push @{$self->{watchers}}, $w;
  }
  # Human-readable listing, only consumed by the commented-out warn
  # below.  NOTE(review): this loop may also be what creates
  # $self->{watchers} when nothing has been registered yet -- check the
  # final dereference still works before removing it.
  my $listing="watchers:\n";
  foreach my $item (@{$self->{watchers}})
  {
    $listing .= "\t".$item->desc."\n" if ref $item;
  }
  # warn $listing;
  return @{$self->{watchers}};
}

# Cancel watcher $w and remove it from $self->{watchers}.
# When $w is not a reference nothing is cancelled.
#
# Returns the remaining watchers, as reported by $self->watchers.
sub killwatcher
{
  my ($self, $w) = @_;
  return $self->watchers unless ref $w;

  debug "killwatcher ".$w->desc;
  # let it finish any pending requests -- primarily catching CHLD
  # sweep() while $w->pending;
  $w->cancel;
  # keep every true entry that is not the watcher just cancelled
  $self->{watchers} = [ grep { $_ && $_ != $w } $self->watchers ];
  return $self->watchers;
}

# Convenience wrappers around watcher(): each one forwards its
# arguments with the watcher type filled in.  Note that sigevent()
# maps to type 'signal'.
sub idle
{
  my $self = shift;
  return $self->watcher('idle', @_);
}
sub timer
{
  my $self = shift;
  return $self->watcher('timer', @_);
}
sub io
{
  my $self = shift;
  return $self->watcher('io', @_);
}
sub var
{
  my $self = shift;
  return $self->watcher('var', @_);
}
sub sigevent
{
  my $self = shift;
  return $self->watcher('signal', @_);
}

# Generate simple get/set accessors for the named fields.
#
#   @_ (after $self) - field names
#
# A name the object can already respond to is skipped.  Each generated
# accessor stores its value in the object's hash under the key
# "<class>::<field>", where <class> is the class of the object that
# fields() was called on; passing a defined value sets the field, and
# the current value is always returned.
#
# NOTE(review): the accessor is assigned to the unqualified glob
# *$name, i.e. presumably into the package this file is compiled in
# rather than into <class> -- confirm that this is intended.
sub fields
{
  my ($self, @names) = @_;
  my $class = ref $self;
  affirm { $class };
  foreach my $name (@names)
  {
    next if $self->can($name);
    my $slot = $class."::".$name;
    debug "$slot";
    my $accessor = sub
    {
      my ($obj, $val) = @_;
      # an undefined argument means "just read"
      $obj->{$slot} = $val if defined $val;
      return $obj->{$slot};
    };
    no strict 'refs';
    *$name = $accessor;
  }
}

# Perform a state transition: report the new state and run the action.
#
#   $oldstate - state being left (not used here)
#   $newstate - state being entered; reported through
#               $self->{status}->newstate() when status, name and
#               level are all set on the object
#   $action   - optional action name; its lower-cased form is called
#               as a method on $self
#   @arg      - arguments passed to the action method
#
# The action method must return a list ($event, @res) with a defined
# $event.  A true $event is queued with $self->event($event, @res); a
# defined-but-false one means "no follow-up event".
#
# Warns and returns undef when the action method does not exist.
# Dies when the action method dies or returns no defined event.
sub transit
{
  my ($self,$oldstate,$newstate,$action,@arg)=@_;
  my $class = ref $self;
  my $tag = $self->{tag} || "";
  debug("$class: $tag: newstate=>'$newstate', action=>'$action'\n");
  $self->{status}->newstate($self,$self->{name},$self->{level},$newstate)
    if $self->{status} && $self->{name} && $self->{level};
  if ($action)
  {
    my $method=lc($action);
    # textual form of the call, used in messages only
    my $code='$self->'.$method.'(@arg)';
    unless ($self->can($method))
    {
      warn "$code not implemented\n";
      return undef;
    }
    # Call the method directly inside a block eval; there is no need to
    # compile generated source text just to dispatch on a method name.
    my ($event,@res) = eval { $self->$method(@arg) };
    unless (defined $event)
    {
      die "$class: '$code' died: $@\n";
    }
    debug("$class: '$code' returned '$event'\n");
    $self->event($event,@res) if $event; # =~ /^[A-Z]+[A-Z0-9]+$/;
  }
  # $self->timer("foo",{at=>time});
  # $DB::single=1 if $newstate eq "DONE";
  # `strace -o /tmp/t1 -p $$` if $newstate eq "DONE";
}

# Plain function (not a method): enter the event loop with a timer
# armed $seconds from now whose callback calls unloop().
#
#   $seconds - delay before the timer fires
#
# Returns whatever loop() returns.
# NOTE(review): loop()/unloop() are not defined in the visible part of
# this file -- presumably imported from the Event module; confirm.
sub run
{
  my ($seconds) = @_;
  my $deadline = time() + $seconds;
  Event->timer(at => $deadline, cb => sub { unloop() });
  return loop();
}

# Tear the object down: kill and reap its child process (if any),
# cancel all of its watchers and remove it from the status object.
# Always returns 1.
sub destruct
{
  my $self=shift;
  my $pid=$self->{pid};
  # "destruct <tag or object> <name> <pid>", blanks for missing parts
  my $summary=join " ",
    "destruct",
    ($self->{tag} || $self),
    ($self->{name} || " "),
    ($pid || " ");
  debug $summary;
  if ($pid)
  {
    debug "killing ".$pid;
    # a negative signal number addresses the process group, so hit the
    # group first and then the process itself
    kill(-9, $pid);
    kill(9, $pid);
    # the following line is dangerous -- could hang on hung umount
    # requests etc.
    waitpid($pid,0);
  }
  foreach my $watcher ($self->watchers)
  {
    $self->killwatcher($watcher);
  }
  if ($self->{status} && $self->{name})
  {
    $self->{status}->remove($self,$self->{name});
  }
  return 1;
}

# Destructor: delegates all cleanup to destruct().
sub DESTROY
{
  my $self=shift;
  # A destructor can run at any moment, including inside somebody
  # else's eval or during global destruction, and destruct() calls
  # kill/waitpid.  Localize the global status variables so that cleanup
  # cannot clobber the program's exit status ($?), a pending exception
  # ($@) or the caller's errno ($!).
  local($., $@, $!, $^E, $?);
  return $self->destruct;
}

1;



( run in 1.200 second using v1.01-cache-2.11-cpan-39bf76dae61 )