Mail-SpamAssassin

 view release on metacpan or  search on metacpan

lib/Mail/SpamAssassin/SpamdForkScaling.pm  view on Meta::CPAN

# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# </@LICENSE>

package Mail::SpamAssassin::SpamdForkScaling;

use strict;
use warnings;
# use bytes;
use re 'taint';
use Errno qw();
use Scalar::Util qw(blessed);

use Mail::SpamAssassin::Util qw(am_running_on_windows);
use Mail::SpamAssassin::Logger;
use Mail::SpamAssassin::Timeout;

use base qw( Exporter );

# Names of every state/order constant this module can export.  The same
# list backs both the ':pfstates' tag and @EXPORT_OK.
our @PFSTATE_VARS = qw(
  PFSTATE_ERROR PFSTATE_STARTING PFSTATE_IDLE PFSTATE_BUSY PFSTATE_KILLED
  PFORDER_ACCEPT PFSTATE_GOT_SIGCHLD
);

our @EXPORT_OK   = @PFSTATE_VARS;
our %EXPORT_TAGS = ( 'pfstates' => [ @PFSTATE_VARS ] );

# Per-child state codes, plus the single order code handed back to a child
# that has been told to accept.
use constant {
  PFSTATE_ERROR       => -1,
  PFSTATE_STARTING    => 0,
  PFSTATE_IDLE        => 1,
  PFSTATE_BUSY        => 2,
  PFSTATE_KILLED      => 3,
  PFSTATE_GOT_SIGCHLD => 4,

  PFORDER_ACCEPT      => 10,
};

###########################################################################

# switch for the test instrumentation points declared below; change it to 1
# to enable them
use constant SUPPORT_TEST_INSTRUMENTATION => 0;

# test instrumentation points, each expressed as "once in every N events"
our (
  # simulate random child failures in 1 in every N lookups
  $TEST_MODE_CAUSE_RANDOM_KID_FAILURES,

  # simulate child->parent and parent->child write failures (needing
  # retries) once in every N syswrite()s
  $TEST_MODE_CAUSE_RANDOM_WRITE_RETRIES,

  # simulate ping failures (for unspecified reasons) once in every N pings
  $TEST_MODE_CAUSE_RANDOM_PING_FAILURES,
) = (0, 0, 0);

###########################################################################

# Protocol spoken between the master and child processes to control when
# they accept/who accepts:
#
#   - the server tells a child to accept with a PF_ACCEPT_ORDER;
#   - the child responds with "B$pid\n" when it's busy, and "I$pid\n" once
#     it's idle again;
#   - in addition, the parent sends PF_PING_ORDER periodically to ping the
#     child processes.
#
# Very simple protocol.  Note that the $pid values are packed into 4 bytes so
# that the buffers are always of a known length; if you need to transfer
# longer data, assign a new protocol verb (the first char) and use the length
# of the following data buffer as the packed value.
use constant {
  PF_ACCEPT_ORDER => "A....\n",
  PF_PING_ORDER   => "P....\n",
};

use constant {
  # timeout for a sysread() on the command channel.  if we go this long
  # without a message from the spamd parent or child, it's an error.
  TOUT_READ_MAX      => 300,

  # interval between "ping" messages from the spamd parent to all children,
  # used as a sanity check to ensure TOUT_READ_MAX isn't hit when things
  # are functional.
  TOUT_PING_INTERVAL => 150,
};

###########################################################################

# Constructor.  May be called on a class name or on an existing instance
# (whose class is then reused).  The optional second argument is a hash
# reference that becomes the object itself; a fresh hash is used when it is
# omitted.
#
# {kids}, {overloaded} and {server_last_ping} are always (re)initialised;
# {min_children} is only defaulted to 1 when the caller left it false.
sub new {
  my ($proto, $self) = @_;
  my $class = ref($proto) || $proto;

  $self = { } unless defined $self;
  bless $self, $class;

  $self->{kids}             = { };
  $self->{overloaded}       = 0;
  $self->{min_children}     = 1 unless $self->{min_children};
  $self->{server_last_ping} = time;

  return $self;
}

###########################################################################
# Parent methods

# Register child $pid by recording it in the PFSTATE_STARTING state.
sub add_child {
  my $self = shift;
  my $pid  = shift;
  return $self->set_child_state($pid, PFSTATE_STARTING);
}

# this is called by the SIGCHLD handler in spamd.  The idea is that
# main_ping_kids etc. can mark a child as probably dead ("K" state), but until
# SIGCHLD is received, the process is still around (in some form), so it
# shouldn't be removed from the list until it's confirmed dead.
#
sub child_exited {
  my ($self, $pid) = @_;

  dbg("prefork: child $pid: just exited");

  # defer removal from the list until after return from the signal
  # handler; it seems that we may be corrupting the list structure
  # by deleting the {kids} hash entry from a sig handler. (bug 5422)
  $self->set_child_state ($pid, PFSTATE_GOT_SIGCHLD);

  # note this for the select()-caller's benefit

lib/Mail/SpamAssassin/SpamdForkScaling.pm  view on Meta::CPAN

    }

  }
  else {
    dbg("prefork: no spare children to accept, waiting for one to complete");
    return;
  }
}

# Parent side: read one status message from child $kid on $sock and check
# that the child reports itself busy.
#
# Returns 1 if the child reported PFSTATE_BUSY.  Returns nothing (undef or
# the empty list) if the read produced PFSTATE_ERROR, or if the child
# reported any other state.  In the latter case a warning is emitted, the
# child is passed to child_error_kill(), adapt_num_children() is called, and
# we sleep for one second before returning.
sub wait_for_child_to_accept {
  my ($self, $kid, $sock) = @_;

  # every outcome below returns, so a single read is all that is needed
  my $state = $self->read_one_message_from_child_socket($sock);

  return 1 if $state == PFSTATE_BUSY;     # 1 == success
  return   if $state == PFSTATE_ERROR;

  # blessed() yields undef for anything that is not an object (e.g. a plain
  # filehandle), so test for definedness before the string comparison to
  # avoid an "uninitialized value" warning; also avoid autovivifying
  # {server_fh} when it has not been set
  my $listeners = $self->{server_fh};
  my $fh_class  = $listeners ? Scalar::Util::blessed($listeners->[0]) : undef;
  if (defined $fh_class && $fh_class eq 'IO::Socket::SSL') {
    warn "prefork: SSL connection protocol error";
  }

  warn "prefork: ordered child $kid to accept, but they reported state '$state', killing rogue";
  $self->child_error_kill($kid, $sock);
  $self->adapt_num_children();
  sleep 1;

  return;
}

# Parent side: called when child $kid has become ready.  If the parent is
# currently flagged as waiting for an idle child, send this child a
# PF_ACCEPT_ORDER over its backchannel socket and clear the flag.  Dies if
# the order cannot be written.
sub child_now_ready_to_accept {
  my ($self, $kid) = @_;
  if ($self->{waiting_for_idle_child}) {
    my $sock = $self->{backchannel}->get_socket_for_child($kid);
    my $sent = $self->syswrite_with_retry($sock, PF_ACCEPT_ORDER, $kid);
    if (!$sent) {
      die "prefork: $kid claimed it was ready, but write failed on fd ".
                            $sock->fileno.": ".$!;
    }
    $self->{waiting_for_idle_child} = 0;
  }
}

###########################################################################
# Child methods

# Child side: remember this process's pid in {pid}, to save calling $$ all
# the time.
sub set_my_pid {
  my $self = shift;
  my $pid  = shift;
  return $self->{pid} = $pid;
}

# Child side: tell the parent that this child is idle.
sub update_child_status_idle {
  my $self = shift;
  # message layout: "I  b1 b2 b3 b4 \n " -- the verb, {pid} packed with
  # pack("l"), then a newline
  my $msg = join('', "I", pack("l", $self->{pid}), "\n");
  return $self->report_backchannel_socket($msg);
}

# Child side: tell the parent that this child is busy.
sub update_child_status_busy {
  my $self = shift;
  # message layout: "B  b1 b2 b3 b4 \n " -- the verb, {pid} packed with
  # pack("l"), then a newline
  my $msg = join('', "B", pack("l", $self->{pid}), "\n");
  return $self->report_backchannel_socket($msg);
}

# Child side: write $str to the parent over the backchannel socket using
# syswrite_with_retry().  Returns that call's (true) result; dies if it
# reports failure.
sub report_backchannel_socket {
  my ($self, $str) = @_;

  my $parent_sock = $self->{backchannel}->get_parent_socket();
  my $written = $self->syswrite_with_retry($parent_sock, $str, 'parent');
  die "syswrite() to parent failed: $!" unless $written;

  return $written;
}

# Child side: block on the parent's backchannel socket until an accept order
# arrives, and return PFORDER_ACCEPT.  Ping orders are logged and skipped.
#
# Exits the process if nothing could be read and the socket is at EOF (the
# parent has closed it).  Dies on an empty read without EOF, and on any
# order that starts with neither "P" nor "A".
sub wait_for_orders {
  my ($self) = @_;

  my $parent_sock = $self->{backchannel}->get_parent_socket();

  ORDER: for (;;) {
    # "A  .  .  .  .  \n "
    my $line;
    my $nbytes = $self->sysread_with_timeout($parent_sock, \$line, 6,
                                             TOUT_READ_MAX);

    my $nothing_read = !defined($nbytes) || $nbytes == 0;
    if ($nothing_read) {
      if ($parent_sock->eof()) {
        dbg("prefork: parent closed, exiting");
        exit;
      }
      die "prefork: empty order from parent";
    }

    # a short read is reported but still interpreted below
    if ($nbytes < 6) {
      my @octets = unpack "C*", $line;
      warn("prefork: parent gave short message: len=$nbytes bytes=".
           join(" ", @octets));
    }

    chomp $line;
    my $verb = substr($line, 0, 1);   # the first char selects the order

    if ($verb eq "P") {               # ping
      dbg("prefork: periodic ping from spamd parent");
      # need this on win32 so that a child can get a signal
      sleep 2 if am_running_on_windows();
      next ORDER;
    }

    return PFORDER_ACCEPT if $verb eq "A";   # accept

    die "prefork: unknown order from parent: '$line'";
  }
}

###########################################################################

sub sysread_with_timeout {
  my ($self, $sock, $lineref, $toread, $timeout) = @_;

  $$lineref = '';   # clear the output buffer
  my $readsofar = 0;
  my $deadline; # we only set this if the first read fails
  my $buf;

retry_read:



( run in 2.907 seconds using v1.01-cache-2.11-cpan-0d23b851a93 )