Parallel-Dragons

 view release on metacpan or  search on metacpan

lib/Parallel/Dragons.pm  view on Meta::CPAN


    my ($fname) = $ENV{DRAGON_SOCKET_FILE};
    if ($fname) {
        $fname =~ s{\W+}{-}g;
    } else {
        ($fname) = $0 =~ m{([^/]+)(?:.pl)$};
        $fname = 'there-are-dragons' unless $fname;
    }

    return "/tmp/$fname";
}

sub get_client_socket {
    my $self = shift;

    return $self->{client_socket}
        if $self->{client_socket};

    my $sockfile = $self->sockfile;
    $sockfile .= '.sock';
    return unless -e $sockfile;

    return $self->{client_socket}
        = IO::Socket::UNIX->new(
                Peer    => $sockfile,
                Timeout => 3,
            );
}

sub get_listen_socket {
    my $self = shift;
    return $self->{server_socket}
        if $self->{server_socket};

    my $sfile = $self->sockfile;
    $sfile .= '.sock';

    unlink $sfile if -e $sfile;

    $self->{server_socket} = IO::Socket::UNIX->new(
            Local   => $sfile,
            Type    => SOCK_STREAM,
            Listen  => 5
        );

    chmod 0777, $sfile;

    my $pfile = $self->sockfile;
    $pfile .= '.pid';
    open my $fh, '>', $pfile;
    print $fh $$;
    close $pfile;

    return $self->{server_socket};
}

sub daemon_name {
  my $self = shift;

  my $name = $self->sockfile;
  ($name) = (split m{/}, $name)[-1];
  return "Dragons of $name";
}

sub check_daemon {
    my $self = shift;

    if (my $pid = $self->_get_pid()) {
        my $cnt = kill 0, $pid;
        return $cnt;

    } else {
        my $sock = $self->get_client_socket();

        if ($sock) {
            print $sock "ping\n";
            while ( my $res = <$sock> ) {
                if ($res =~ m{^pong\s*$}i) {
                    return 1;
                }
            }
        }
    }

    return 0;
}

sub stop {
    my $self = shift;
    $self->{stopping} = 1;

    my @pids_to_stop = map { $_->{pid} } @{$self->{_childs}};
    TRACE "Notifying the family: @pids_to_stop";
    kill 'USR1', @pids_to_stop if @pids_to_stop;
}

sub fork_child {
	my ($self, $sub) = @_;

    my $pid= fork;
    if ($pid) {
        # we are in the parent, and we got a kid
        my %pid = (
            pid         => $pid,
            start_time  => time,
        );

        push @{ $self->{_childs} }, \%pid;
    
        return \%pid;
    } elsif (defined $pid) {
        srand();
        eval {
            $sub->();
            1;
        } or do {
            my $error = $@ || 'Zombie error';
            FATAL "Error in child process: %s", $error;
        };

        exit 0;

lib/Parallel/Dragons.pm  view on Meta::CPAN


            TRACE "Starting child_main_run";

            $self->child_main_run();
        }
    );

    if ($self->can('post_child_start') ) {
        TRACE "Calling post_child_start";
        eval {
            $self->post_child_start( $child );
            1;
        } or do {
            my $error = $@;
            FATAL "error on post_child_start: $error\n";
        }
    }

    if ($self->{_dynamic_wait}) {
        $self->{_restart_next} = $self->restart_next;
    } else {
        $self->{_restart_next} = time + $self->{_wait};
    }
}

sub _idle {
  my ($self,$done) = @_;

  return if $done;

  $self->idle()
    if $self->can('idle');

  return;
}

#######################################################################
# UnixSocket Client Session
#######################################################################

my %daemon_commands = (
    stop    => \&_us_cmd_stop_daemon,
    pause   => \&_us_cmd_pause_daemon,
    play    => \&_us_cmd_play_daemon,
    ping    => \&_us_cmd_ping,
    info    => \&_us_cmd_info,
);

sub communicate {
    my $self = shift;

    my $socket  = $self->get_listen_socket();
    my $conn    = $socket->accept();

    local $SIG{ALRM} = sub { TRACE "Timeout in communication attempt. Killing server process\n"; exit 1; };

    alarm(10);

    my $input = <$conn>;

    my @args = split m{\s+}, $input;

    while (my $cmd = shift @args) {
        $cmd = lc $cmd;
        if ($daemon_commands{ $cmd } ) {
            my @res = $daemon_commands{ $cmd }->(
                    $self, $cmd, \@args
                );

            TRACE "Replying to $cmd: ", @res;

            print $conn "$cmd reply:\n";
            print $conn $_,"\n" for @res;
        } else {
            print $conn "Command '$cmd' is not known\n";
        }

        if ($self->{stopping}) {
            TRACE "Stopping now";
            exit 0;
        }
    }

    alarm 0;
    $conn->close;
}

######################################################################
# UnixSocket command handlers - handle piped commands
######################################################################

sub _us_cmd_stop_daemon {
    my $self    = shift;
    my $cmd     = shift;
    my $args    = shift;

    INFO "Got a request to stop - waiting for childs and exiting";
    $self->stop;

    return ('stopping');
}

sub _us_cmd_ping {
    my $self = shift;
    my ($cmd, $args) = @_;

    return ('pong');
}

sub _us_cmd_info {
    my $self = shift;

    my $name    = $self->daemon_name;
    my $childs  = scalar @{ $self->{_childs} };

    my $next = localtime($self->{_restart_next});
    my $stopping = $self->{stopping} ? "\nDaemon is Stopping\n":"";

    my $res = <<EoI;
$name [server pid: $$]



( run in 0.975 second using v1.01-cache-2.11-cpan-71847e10f99 )