From 317e114e11669899618c7c06bbc0091b36618f36 Mon Sep 17 00:00:00 2001
From: Nicholas Chammas <nicholas.chammas@gmail.com>
Date: Sat, 29 Nov 2014 00:31:06 -0800
Subject: [PATCH] [SPARK-3398] [SPARK-4325] [EC2] Use EC2 status checks.

This PR re-introduces [0e648bc](https://github.com/apache/spark/commit/0e648bc2bedcbeb55fce5efac04f6dbad9f063b4) from PR #2339, which somehow never made it into the codebase.

Additionally, it removes a now-unnecessary linear backoff on the SSH checks since we are blocking on EC2 status checks before testing SSH.

Author: Nicholas Chammas <nicholas.chammas@gmail.com>

Closes #3195 from nchammas/remove-ec2-ssh-backoff and squashes the following commits:

efb29e1 [Nicholas Chammas] Revert "Remove linear backoff."
ef3ca99 [Nicholas Chammas] reuse conn
adb4eaa [Nicholas Chammas] Remove linear backoff.
55caa24 [Nicholas Chammas] Check EC2 status checks before SSH.
---
 ec2/spark_ec2.py | 48 ++++++++++++++++++++++++++++++++++++------------
 1 file changed, 36 insertions(+), 12 deletions(-)

diff --git a/ec2/spark_ec2.py b/ec2/spark_ec2.py
index 742c7765e7..b83decadc2 100755
--- a/ec2/spark_ec2.py
+++ b/ec2/spark_ec2.py
@@ -33,6 +33,7 @@ import tempfile
 import time
 import urllib2
 import warnings
+from datetime import datetime
 from optparse import OptionParser
 from sys import stderr
 import boto
@@ -589,7 +590,9 @@ def setup_spark_cluster(master, opts):
 
 
 def is_ssh_available(host, opts):
-    "Checks if SSH is available on the host."
+    """
+    Check if SSH is available on a host.
+    """
     try:
         with open(os.devnull, 'w') as devnull:
             ret = subprocess.check_call(
@@ -604,6 +607,9 @@ def is_ssh_available(host, opts):
 
 
 def is_cluster_ssh_available(cluster_instances, opts):
+    """
+    Check if SSH is available on all the instances in a cluster.
+    """
     for i in cluster_instances:
         if not is_ssh_available(host=i.ip_address, opts=opts):
             return False
@@ -611,8 +617,10 @@ def is_cluster_ssh_available(cluster_instances, opts):
         return True
 
 
-def wait_for_cluster_state(cluster_instances, cluster_state, opts):
+def wait_for_cluster_state(conn, opts, cluster_instances, cluster_state):
     """
+    Wait for all the instances in the cluster to reach a designated state.
+
     cluster_instances: a list of boto.ec2.instance.Instance
     cluster_state: a string representing the desired state of all the instances in the cluster
            value can be 'ssh-ready' or a valid value from boto.ec2.instance.InstanceState such as
@@ -620,20 +628,27 @@ def wait_for_cluster_state(cluster_instances, cluster_state, opts):
            (would be nice to replace this with a proper enum: http://stackoverflow.com/a/1695250)
     """
     sys.stdout.write(
-        "Waiting for all instances in cluster to enter '{s}' state.".format(s=cluster_state)
+        "Waiting for cluster to enter '{s}' state.".format(s=cluster_state)
     )
     sys.stdout.flush()
 
+    start_time = datetime.now()
+
     num_attempts = 0
+    conn = ec2.connect_to_region(opts.region)
 
     while True:
-        time.sleep(3 * num_attempts)
+        time.sleep(5 * num_attempts)  # seconds
 
         for i in cluster_instances:
-            s = i.update()  # capture output to suppress print to screen in newer versions of boto
+            i.update()
+
+        statuses = conn.get_all_instance_status(instance_ids=[i.id for i in cluster_instances])
 
         if cluster_state == 'ssh-ready':
             if all(i.state == 'running' for i in cluster_instances) and \
+               all(s.system_status.status == 'ok' for s in statuses) and \
+               all(s.instance_status.status == 'ok' for s in statuses) and \
                is_cluster_ssh_available(cluster_instances, opts):
                 break
         else:
@@ -647,6 +662,12 @@ def wait_for_cluster_state(cluster_instances, cluster_state, opts):
 
     sys.stdout.write("\n")
 
+    end_time = datetime.now()
+    print "Cluster is now in '{s}' state. Waited {t} seconds.".format(
+        s=cluster_state,
+        t=(end_time - start_time).seconds
+    )
+
 
 # Get number of local disks available for a given EC2 instance type.
 def get_num_disks(instance_type):
@@ -895,7 +916,7 @@ def real_main():
         # See: https://docs.python.org/3.5/whatsnew/2.7.html
         warnings.warn(
             "This option is deprecated and has no effect. "
-            "spark-ec2 automatically waits as long as necessary for clusters to startup.",
+            "spark-ec2 automatically waits as long as necessary for clusters to start up.",
             DeprecationWarning
         )
 
@@ -922,9 +943,10 @@ def real_main():
         else:
             (master_nodes, slave_nodes) = launch_cluster(conn, opts, cluster_name)
         wait_for_cluster_state(
+            conn=conn,
+            opts=opts,
             cluster_instances=(master_nodes + slave_nodes),
-            cluster_state='ssh-ready',
-            opts=opts
+            cluster_state='ssh-ready'
         )
         setup_cluster(conn, master_nodes, slave_nodes, opts, True)
 
@@ -951,9 +973,10 @@ def real_main():
                 print "Deleting security groups (this will take some time)..."
                 group_names = [cluster_name + "-master", cluster_name + "-slaves"]
                 wait_for_cluster_state(
+                    conn=conn,
+                    opts=opts,
                     cluster_instances=(master_nodes + slave_nodes),
-                    cluster_state='terminated',
-                    opts=opts
+                    cluster_state='terminated'
                 )
                 attempt = 1
                 while attempt <= 3:
@@ -1055,9 +1078,10 @@ def real_main():
             if inst.state not in ["shutting-down", "terminated"]:
                 inst.start()
         wait_for_cluster_state(
+            conn=conn,
+            opts=opts,
             cluster_instances=(master_nodes + slave_nodes),
-            cluster_state='ssh-ready',
-            opts=opts
+            cluster_state='ssh-ready'
         )
         setup_cluster(conn, master_nodes, slave_nodes, opts, False)
 
-- 
GitLab