GBrowse

 view release on metacpan or  search on metacpan

lib/Bio/Graphics/Browser2/Render/Slave/AWS_Balancer.pm  view on Meta::CPAN

	$sg->authorize_incoming(-protocol => 'tcp',
				-port     => 22,
				-source_ip=> "$ip/32"))
	if $self->slave_ssh_key;
    
    $sg->update or croak $ec2->error_str;
    return $self->{slave_security_group} = $sg;
}

sub ec2 {
    my $self = shift;
    # create a new ec2 each time because security credentials may expire
    my @credentials = $self->ec2_credentials;
    return $self->{ec2} = VM::EC2->new(-endpoint    => $self->slave_endpoint,
				       -raise_error => 1,
				       @credentials);
}

sub internal_ip {
    my $self = shift;
    return unless $self->running_as_instance;
    return $self->{instance_metadata}->privateIpAddress;
}

sub master_security_group {
    my $self = shift;
    return unless $self->running_as_instance;
    my $sg = ($self->{instance_metadata}->securityGroups)[0];
    $sg    =~ s/%([0-9A-Fa-f]{2})/chr(hex($1))/eg;
    return $sg;
}

sub master_ip {
    my $self = shift;
    my $ip   = $self->option('MASTER','external_ip');
    $ip ||= $self->_get_external_ip;
    return $ip;
}

# poll interval in seconds
sub master_poll {
    my $self = shift;
    my $pi   = $self->option('MASTER','poll_interval');
    return $pi * 60;
}

sub master_server_status_url {
    my $self = shift;
    return $self->option('MASTER','server_status_url') 
	|| 'http://localhost/server-status';
}

sub running_as_instance {
    my $self = shift;
    return -e '/var/lib/cloud/data/previous-instance-id' 
	&& head('http://169.254.169.254');
}


# update conf file with new snapshot images
sub update_data_snapshots {
    my $self = shift;
    my @snapshot_ids = @_;
    my $timestamp     = 'synchronized with local filesystem on '.localtime;
    my $conf_file     = $self->conf_file;
    my ($user,$group) = (stat($conf_file))[4,5];
    open my $in,'<',$conf_file        or die "Couldn't open $conf_file: $!";
    open my $out,'>',"$conf_file.new" or die "Couldn't open $conf_file: $!";
    while (<$in>) {
	chomp;
	s/^(data_snapshots\s*=).*/$1 @snapshot_ids # $timestamp/;
	print $out "$_\n";
    }
    close $in;
    close $out;
    rename "$conf_file","$conf_file.bak" or die "Can't rename $conf_file: $!";
    rename "$conf_file.new","$conf_file" or die "Can't rename $conf_file.new: $!";
    chown $user,$group,$conf_file;
}

#######################
# status
######################

# return true if slave is listening on at least one of the designated ports
sub ping_slave {
    my $self      = shift;
    my $instance  = shift;
    my $ip        = $self->running_as_instance?$instance->privateIpAddress:$instance->ipAddress;
    my ($port) = $self->slave_ports;
    my $ua     = LWP::UserAgent->new;
    my $req    = HTTP::Request->new(HEAD => "http://$ip:$port");
    my $res    = $ua->request($req);
    return $res->code == 403;
}

# returns list of slave instances as VM::EC2::Instance objects
sub running_slaves {
    my $self = shift;
    $self->{running_slaves} ||= {};
    return values %{$self->{running_slaves}};
}

sub add_slave {
    my $self = shift;
    my $instance = shift;
    $self->{running_slaves}{$instance}=$instance;
}

sub remove_slave {
    my $self = shift;
    my $instance = shift;
    delete $self->{running_slaves}{$instance};
}

# given an instance ID, returns the slave VM::EC2::Instance object
sub id2slave {
    my $self = shift;
    my $id   = shift;
    return $self->{running_slaves}{$id};
}

lib/Bio/Graphics/Browser2/Render/Slave/AWS_Balancer.pm  view on Meta::CPAN

    my $self = shift;
    my $id   = shift;
    return $self->{pending_requests}{$id};
}

sub get_load {
    my $self      = shift;
    $self->{pr} ||= Parse::Apache::ServerStatus->new(url=>$self->master_server_status_url);
    if (-e '/tmp/gbrowse_load') {
	open my $fh,'/tmp/gbrowse_load';
	chomp (my $load = <$fh>);
	return $load;
    }
    my $stats = $self->{pr}->get or $self->fatal("couldn't fetch load from Apache status: ",$self->{pr}->errstr);
    return $stats->{rs};
}


###########################################
# state change
###########################################

# this is called to update the number of live and pending slaves
# according to the load
sub adjust_instances {
    my $self = shift;
    my $load = shift;
    my ($min,$max) = $self->slaves_wanted($load);
    my $current    = $self->pending_spot_requests + $self->running_slaves;
    
    if ($current < $min) {
	$self->log_debug("Need to add more slave spot instances (have $current, wanted $min)\n");
	$self->request_spot_instance while $current++ < $min;
    }

    elsif ($current > $max) {
	$self->log_debug("Need to delete some slave spot instances (have $current, wanted $max)\n");
	my $reconfigure;
	my $ec2        = $self->ec2;
	my @candidates = ($self->pending_spot_requests,$self->running_slaves);
	while ($current-- > $max) {
	    my $c = shift @candidates;
	    if ($c->isa('VM::EC2::Spot::InstanceRequest')) {
		$self->log_debug("Cancelling spot instance request $c\n");
		$ec2->cancel_spot_instance_requests($c);
		$self->remove_spot_request($c);
		$reconfigure++;
	    } elsif ($c->isa('VM::EC2::Instance')) {
		$self->log_debug("Terminating slave instance $c\n");
		$ec2->terminate_instances($c);
		$self->remove_slave($c);
		$reconfigure++;
	    }
	}
	# we reconfigure master immediately to avoid calling instance that were terminated
	$self->reconfigure_master() if $reconfigure;
    }
}

# this is called to act on state changes in spot requests and instances
sub update_requests {
    my $self = shift;
    my @requests = $self->pending_spot_requests;
    for my $sr (@requests) {
	my $state    = $sr->current_status;
	$self->log_debug("Status of $sr is $state");
	my $instance = $sr->instance;
	if ($state eq 'fulfilled' && $instance && $instance->instanceState eq 'running') {
	    $instance->add_tag(Name => 'GBrowse Slave');
	    $self->log_debug("New instance $instance; testing readiness");
	    next unless $self->ping_slave($instance);   # not ready - try again on next poll
	    $self->log_debug("New slave instance is ready");
	    $self->add_slave($instance);
	    $self->remove_spot_request($sr);            # we will never check this request again
	    $self->reconfigure_master();
	} elsif ($sr->current_state =~ /cancelled|failed/ ) {
	    $self->remove_spot_request($sr);
	}
    }
}

# launch a spot instance request
sub request_spot_instance {
    my $self = shift;
    my $ec2  = $self->ec2;

    my $subnet = $self->slave_subnet;
    my @ports  = $self->slave_ports;
    my $key    = $self->slave_ssh_key;

    my @options = (
	-image_id             => $self->slave_image_id,
	-instance_type        => $self->slave_instance_type,
	-instance_count       => 1,
	-security_group_id    => $self->slave_security_group,
	-spot_price           => $self->slave_spot_bid,
	-block_device_mapping => $self->slave_block_device_mapping,
	-user_data         => "#!/bin/sh\nexec /opt/gbrowse/etc/init.d/gbrowse-slave start @ports",
	$subnet? (-subnet_id  => $subnet)          : (),
	$key   ? (-key_name   => $key)             : (),
	);

    my @debug_options;
    for (my $i = 0;$i<@options;$i+=2) {
	my $a  = $options[$i];
	my $v  = $options[$i+1];
	if (ref $v && ref $v eq 'ARRAY') {
	    push @debug_options,($a=>$_) foreach @$v;
	} else {
	    push @debug_options,($a=>$v);
	}
    }
    

    my $debug_options = "@debug_options";
    $debug_options    =~ tr/\n/ /;

    $self->log_debug("Launching a spot request with options: $debug_options\n");
    my @requests = $ec2->request_spot_instances(@options);
    @requests or croak $ec2->error_str;



( run in 0.611 second using v1.01-cache-2.11-cpan-8f98c5d2c55 )