AI-Evolve-Befunge
view release on metacpan or search on metacpan
lib/AI/Evolve/Befunge/Migrator.pm view on Meta::CPAN
my $pid = fork();
if($pid) {
close($sock2);
return $sock1;
}
close($sock1);
for my $fd (0..sysconf(_SC_OPEN_MAX)-1) {
next if $fd == $sock2->fileno();
next if $fd == STDERR->fileno();
POSIX::close($fd);
}
$sock2->blocking(0);
my $migrator = AI::Evolve::Befunge::Migrator->new(Local => $sock2);
$migrator->spin() while $migrator->alive();
exit(0);
}
=head1 METHODS
=head2 spin
$migrator->spin();
This is the main control component of this module. It looks for
incoming events and responds to them.
=cut
sub spin {
my $self = shift;
$self->spin_reads();
$self->spin_writes();
$self->spin_exceptions();
}
=head2 spin_reads
$migrator->spin_reads();
Handle read-related events. This method will delay for up to 2
seconds if no reading is necessary.
=cut
sub spin_reads {
my $self = shift;
$self->try_connect() unless defined $$self{sock};
my $select = IO::Select->new($$self{loc});
$select->add($$self{sock}) if defined $$self{sock};
my @sockets = $select->can_read(2);
foreach my $socket (@sockets) {
if($socket == $$self{loc}) {
my $rv = $socket->sysread($$self{txbuf}, 4096, length($$self{txbuf}));
$$self{dead} = 1 unless $rv;
} else {
my $rv = $socket->sysread($$self{rxbuf}, 4096, length($$self{rxbuf}));
if(!defined($rv) || $rv < 0) {
debug("Migrator: closing socket due to read error: $!\n");
undef $$self{sock};
next;
}
if(!$rv) {
debug("Migrator: closing socket due to EOF\n");
undef $$self{sock};
}
}
}
}
=head2 spin_writes
$migrator->spin_writes();
Handle write-related events. This method will not block.
=cut
sub spin_writes {
my $self = shift;
$self->try_connect() unless defined $$self{sock};
return unless length($$self{txbuf} . $$self{rxbuf});
my $select = IO::Select->new();
$select->add($$self{loc}) if length $$self{rxbuf};
$select->add($$self{sock}) if(length $$self{txbuf} && defined($$self{sock}));
my @sockets = $select->can_write(0);
foreach my $socket (@sockets) {
if($socket == $$self{loc}) {
my $rv = $socket->syswrite($$self{rxbuf}, length($$self{rxbuf}));
if($rv > 0) {
substr($$self{rxbuf}, 0, $rv, '');
}
debug("Migrator: write on loc socket reported error $!\n") if($rv < 0);
}
if($socket == $$self{sock}) {
my $rv = $socket->syswrite($$self{txbuf}, length($$self{txbuf}));
if(!defined($rv)) {
debug("Migrator: closing socket due to undefined syswrite retval\n");
undef $$self{sock};
next;
}
if($rv > 0) {
substr($$self{txbuf}, 0, $rv, '');
}
if($rv < 0) {
debug("Migrator: closing socket due to write error $!\n");
undef $$self{sock};
}
}
}
}
=head2 spin_exceptions
$migrator->spin_exceptions();
Handle exception-related events. This method will not block.
=cut
sub spin_exceptions {
my $self = shift;
my $select = IO::Select->new();
$select->add($$self{loc});
$select->add($$self{sock}) if defined($$self{sock});
my @sockets = $select->has_exception(0);
foreach my $socket (@sockets) {
if($socket == $$self{loc}) {
debug("Migrator: dying: select exception on loc socket\n");
$$self{dead} = 1;
}
if($socket == $$self{sock}) {
debug("Migrator: closing socket due to select exception\n");
undef $$self{sock};
}
}
}
=head2 alive
exit unless $migrator->alive();
Returns true while migrator still wants to live.
=cut
sub alive {
my $self = shift;
return !$$self{dead};
}
=head2 try_connect
$migrator->try_connect();
Try to establish a new connection to migrationd.
=cut
sub try_connect {
my $self = shift;
my $host = $$self{host};
my $port = $$self{port};
my $last = $$self{lastc};
return if $last > (time() - 2);
return if $$self{dead};
debug("Migrator: attempting to connect to $host:$port\n");
$$self{lastc} = time();
$$self{sock} = IO::Socket::INET->new(
Proto => 'tcp',
PeerAddr => $host,
PeerPort => $port,
Blocking => 0,
);
}
1;
( run in 1.317 second using v1.01-cache-2.11-cpan-39bf76dae61 )