AnyEvent-Task

 view release on metacpan or  search on metacpan

lib/AnyEvent/Task/Server.pm  view on Meta::CPAN

    my $interface = $arg{interface};

    if (ref $interface eq 'CODE') {
      $self->{interface} = $interface;
    } elsif (ref $interface eq 'HASH') {
      $self->{interface} = sub {
        my $method = shift;
        $interface->{$method}->(@_);
      };
    } else {
      die "interface must be a sub or a hash";
    }
  } else {
    die "unspecified interface";
  }


  return $self;
}


## Start a task server in a subprocess.
##
## All arguments are forwarded unchanged to AnyEvent::Task::Server->new inside
## the callback handed to AnyEvent::Task::Util::fork_anyevent_subprocess; the
## callback then calls run on the new server.
##
## List context:  returns whatever fork_anyevent_subprocess returns.
## Other contexts: fork_anyevent_subprocess is invoked for its effect only
##                 (its callback returns undef) and this sub returns undef.
sub fork_task_server {
  my @server_args = @_;

  ## Scalar/void caller: nothing from the helper is passed back.
  unless (wantarray) {
    AnyEvent::Task::Util::fork_anyevent_subprocess(sub {
      AnyEvent::Task::Server->new(@server_args)->run;
      return undef;
    });

    return undef;
  }

  ## List caller: hand back the helper's return values as-is.
  return AnyEvent::Task::Util::fork_anyevent_subprocess(sub {
    AnyEvent::Task::Server->new(@server_args)->run;
  });
}




## Fork a dedicated worker process for a newly accepted client connection.
##
## Arguments:
##   $self - the server object; this sub updates $self->{curr_worker_id} and
##           $self->{children}, and reads $self->{name}
##   $fh   - the client connection handle to be served by the worker
##
## Parent process: closes its copies of $fh and of the worker's end of the
## monitor socketpair, then records its own end in $self->{children}, keyed by
## the worker's pid.
##
## Child process: closes the handles that belong to the parent and to other
## workers, optionally renames itself via $0, and hands control to
## AnyEvent::Task::Server::Worker::handle_worker. It does not return.
##
## Dies with "fork failed: ..." if fork is unsuccessful.
sub handle_new_connection {
  my ($self, $fh) = @_;

  my ($monitor_fh1, $monitor_fh2) = AnyEvent::Util::portable_socketpair;

  $self->{curr_worker_id}++;

  my $rv = fork;

  if (!defined $rv) {
    ## fork returns undef on failure. This has to be tested before any numeric
    ## comparison: undef == 0 is true, so a "$rv == 0" check would send the
    ## parent process down the worker code path.
    my $fork_error = "$!"; ## capture now; the close() calls below may overwrite $!

    close($fh);
    close($monitor_fh1);
    close($monitor_fh2);
    die "fork failed: $fork_error";
  } elsif ($rv) {
    ## Parent: $rv is the worker's pid.
    close($fh);
    close($monitor_fh2);

    $self->{children}->{$rv} = {
      monitor_fh => $monitor_fh1,
    };
  } else {
    ## Child: $rv is 0.
    close($monitor_fh1);

    ## Don't want keep-alive pipes of other workers open in this worker
    foreach my $child (keys %{$self->{children}}) {
      close($self->{children}->{$child}->{monitor_fh});
    }

    if (defined $self->{name}) {
      $0 = "AET-Worker:$self->{name}($self->{curr_worker_id})";
    }

    AnyEvent::Task::Server::Worker::handle_worker($self, $fh, $monitor_fh2);
    die "handle_worker should never return";
  }
}


## Call recv on the object stored in $self->{all_done_cv} and pass its result
## back to the caller (in the caller's own context).
sub run {
  my $self = $_[0];

  my $all_done = $self->{all_done_cv};

  return $all_done->recv;
}


1;



( run in 0.443 second using v1.01-cache-2.11-cpan-df04353d9ac )