Skip to content
Snippets Groups Projects
Commit 5912ca67 authored by Nicholas Chammas's avatar Nicholas Chammas Committed by Josh Rosen
Browse files

[SPARK-3398] [EC2] Have spark-ec2 intelligently wait for specific cluster states

Instead of waiting arbitrary amounts of time for the cluster to reach a specific state, this patch lets `spark-ec2` explicitly wait for a cluster to reach a desired state.

This is useful in a couple of situations:
* The cluster is launching and you want to wait until SSH is available before installing stuff.
* The cluster is being terminated and you want to wait until all the instances are terminated before trying to delete security groups.

This patch removes the need for the `--wait` option and removes some of the time-based retry logic that was being used.

Author: Nicholas Chammas <nicholas.chammas@gmail.com>

Closes #2339 from nchammas/spark-ec2-wait-properly and squashes the following commits:

43a69f0 [Nicholas Chammas] short-circuit SSH check; linear backoff
9a9e035 [Nicholas Chammas] remove extraneous comment
26c5ed0 [Nicholas Chammas] replace print with write()
bb67c06 [Nicholas Chammas] deprecate wait option; remove dead code
7969265 [Nicholas Chammas] fix long line (PEP 8)
126e4cf [Nicholas Chammas] wait for specific cluster states
parent b32bb72e
No related branches found
No related tags found
No related merge requests found
...@@ -32,6 +32,7 @@ import sys ...@@ -32,6 +32,7 @@ import sys
import tempfile import tempfile
import time import time
import urllib2 import urllib2
import warnings
from optparse import OptionParser from optparse import OptionParser
from sys import stderr from sys import stderr
import boto import boto
...@@ -61,8 +62,8 @@ def parse_args(): ...@@ -61,8 +62,8 @@ def parse_args():
"-s", "--slaves", type="int", default=1, "-s", "--slaves", type="int", default=1,
help="Number of slaves to launch (default: %default)") help="Number of slaves to launch (default: %default)")
parser.add_option( parser.add_option(
"-w", "--wait", type="int", default=120, "-w", "--wait", type="int",
help="Seconds to wait for nodes to start (default: %default)") help="DEPRECATED (no longer necessary) - Seconds to wait for nodes to start")
parser.add_option( parser.add_option(
"-k", "--key-pair", "-k", "--key-pair",
help="Key pair to use on instances") help="Key pair to use on instances")
...@@ -195,18 +196,6 @@ def get_or_make_group(conn, name): ...@@ -195,18 +196,6 @@ def get_or_make_group(conn, name):
return conn.create_security_group(name, "Spark EC2 group") return conn.create_security_group(name, "Spark EC2 group")
# Wait for a set of launched instances to exit the "pending" state
# (i.e. either to start running or to fail and be terminated)
def wait_for_instances(conn, instances):
while True:
for i in instances:
i.update()
if len([i for i in instances if i.state == 'pending']) > 0:
time.sleep(5)
else:
return
# Check whether a given EC2 instance object is in a state we consider active, # Check whether a given EC2 instance object is in a state we consider active,
# i.e. not terminating or terminated. We count both stopping and stopped as # i.e. not terminating or terminated. We count both stopping and stopped as
# active since we can restart stopped clusters. # active since we can restart stopped clusters.
...@@ -619,14 +608,64 @@ def setup_spark_cluster(master, opts): ...@@ -619,14 +608,64 @@ def setup_spark_cluster(master, opts):
print "Ganglia started at http://%s:5080/ganglia" % master print "Ganglia started at http://%s:5080/ganglia" % master
# Wait for a whole cluster (masters, slaves and ZooKeeper) to start up def is_ssh_available(host, opts):
def wait_for_cluster(conn, wait_secs, master_nodes, slave_nodes): "Checks if SSH is available on the host."
print "Waiting for instances to start up..." try:
time.sleep(5) with open(os.devnull, 'w') as devnull:
wait_for_instances(conn, master_nodes) ret = subprocess.check_call(
wait_for_instances(conn, slave_nodes) ssh_command(opts) + ['-t', '-t', '-o', 'ConnectTimeout=3',
print "Waiting %d more seconds..." % wait_secs '%s@%s' % (opts.user, host), stringify_command('true')],
time.sleep(wait_secs) stdout=devnull,
stderr=devnull
)
return ret == 0
except subprocess.CalledProcessError as e:
return False
def is_cluster_ssh_available(cluster_instances, opts):
for i in cluster_instances:
if not is_ssh_available(host=i.ip_address, opts=opts):
return False
else:
return True
def wait_for_cluster_state(cluster_instances, cluster_state, opts):
"""
cluster_instances: a list of boto.ec2.instance.Instance
cluster_state: a string representing the desired state of all the instances in the cluster
value can be 'ssh-ready' or a valid value from boto.ec2.instance.InstanceState such as
'running', 'terminated', etc.
(would be nice to replace this with a proper enum: http://stackoverflow.com/a/1695250)
"""
sys.stdout.write(
"Waiting for all instances in cluster to enter '{s}' state.".format(s=cluster_state)
)
sys.stdout.flush()
num_attempts = 0
while True:
time.sleep(3 * num_attempts)
for i in cluster_instances:
s = i.update() # capture output to suppress print to screen in newer versions of boto
if cluster_state == 'ssh-ready':
if all(i.state == 'running' for i in cluster_instances) and \
is_cluster_ssh_available(cluster_instances, opts):
break
else:
if all(i.state == cluster_state for i in cluster_instances):
break
num_attempts += 1
sys.stdout.write(".")
sys.stdout.flush()
sys.stdout.write("\n")
# Get number of local disks available for a given EC2 instance type. # Get number of local disks available for a given EC2 instance type.
...@@ -868,6 +907,16 @@ def real_main(): ...@@ -868,6 +907,16 @@ def real_main():
(opts, action, cluster_name) = parse_args() (opts, action, cluster_name) = parse_args()
# Input parameter validation # Input parameter validation
if opts.wait is not None:
# NOTE: DeprecationWarnings are silent in 2.7+ by default.
# To show them, run Python with the -Wdefault switch.
# See: https://docs.python.org/3.5/whatsnew/2.7.html
warnings.warn(
"This option is deprecated and has no effect. "
"spark-ec2 automatically waits as long as necessary for clusters to startup.",
DeprecationWarning
)
if opts.ebs_vol_num > 8: if opts.ebs_vol_num > 8:
print >> stderr, "ebs-vol-num cannot be greater than 8" print >> stderr, "ebs-vol-num cannot be greater than 8"
sys.exit(1) sys.exit(1)
...@@ -890,7 +939,11 @@ def real_main(): ...@@ -890,7 +939,11 @@ def real_main():
(master_nodes, slave_nodes) = get_existing_cluster(conn, opts, cluster_name) (master_nodes, slave_nodes) = get_existing_cluster(conn, opts, cluster_name)
else: else:
(master_nodes, slave_nodes) = launch_cluster(conn, opts, cluster_name) (master_nodes, slave_nodes) = launch_cluster(conn, opts, cluster_name)
wait_for_cluster(conn, opts.wait, master_nodes, slave_nodes) wait_for_cluster_state(
cluster_instances=(master_nodes + slave_nodes),
cluster_state='ssh-ready',
opts=opts
)
setup_cluster(conn, master_nodes, slave_nodes, opts, True) setup_cluster(conn, master_nodes, slave_nodes, opts, True)
elif action == "destroy": elif action == "destroy":
...@@ -919,7 +972,11 @@ def real_main(): ...@@ -919,7 +972,11 @@ def real_main():
else: else:
group_names = [opts.security_group_prefix + "-master", group_names = [opts.security_group_prefix + "-master",
opts.security_group_prefix + "-slaves"] opts.security_group_prefix + "-slaves"]
wait_for_cluster_state(
cluster_instances=(master_nodes + slave_nodes),
cluster_state='terminated',
opts=opts
)
attempt = 1 attempt = 1
while attempt <= 3: while attempt <= 3:
print "Attempt %d" % attempt print "Attempt %d" % attempt
...@@ -1019,7 +1076,11 @@ def real_main(): ...@@ -1019,7 +1076,11 @@ def real_main():
for inst in master_nodes: for inst in master_nodes:
if inst.state not in ["shutting-down", "terminated"]: if inst.state not in ["shutting-down", "terminated"]:
inst.start() inst.start()
wait_for_cluster(conn, opts.wait, master_nodes, slave_nodes) wait_for_cluster_state(
cluster_instances=(master_nodes + slave_nodes),
cluster_state='ssh-ready',
opts=opts
)
setup_cluster(conn, master_nodes, slave_nodes, opts, False) setup_cluster(conn, master_nodes, slave_nodes, opts, False)
else: else:
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment