diff --git a/ec2/spark_ec2.py b/ec2/spark_ec2.py
index 485eea4f5e68395850b61e2ce4e1f45c912e5aa5..abab209a05ba033acc4fd1fbb91139dc112fb6eb 100755
--- a/ec2/spark_ec2.py
+++ b/ec2/spark_ec2.py
@@ -39,10 +39,26 @@ from datetime import datetime
 from optparse import OptionParser
 from sys import stderr
 
+VALID_SPARK_VERSIONS = set([
+    "0.7.3",
+    "0.8.0",
+    "0.8.1",
+    "0.9.0",
+    "0.9.1",
+    "0.9.2",
+    "1.0.0",
+    "1.0.1",
+    "1.0.2",
+    "1.1.0",
+    "1.1.1",
+    "1.2.0",
+])
+
 DEFAULT_SPARK_VERSION = "1.2.0"
+DEFAULT_SPARK_GITHUB_REPO = "https://github.com/apache/spark"
 SPARK_EC2_DIR = os.path.dirname(os.path.realpath(__file__))
-
 MESOS_SPARK_EC2_BRANCH = "branch-1.3"
+
 # A URL prefix from which to fetch AMI information
 AMI_PREFIX = "https://raw.github.com/mesos/spark-ec2/{b}/ami-list".format(b=MESOS_SPARK_EC2_BRANCH)
 
@@ -126,8 +142,8 @@ def parse_args():
         help="Version of Spark to use: 'X.Y.Z' or a specific git hash (default: %default)")
     parser.add_option(
         "--spark-git-repo",
-        default="https://github.com/apache/spark",
-        help="Github repo from which to checkout supplied commit hash")
+        default=DEFAULT_SPARK_GITHUB_REPO,
+        help="Github repo from which to checkout supplied commit hash (default: %default)")
     parser.add_option(
         "--hadoop-major-version", default="1",
         help="Major version of Hadoop (default: %default)")
@@ -236,6 +252,26 @@ def get_or_make_group(conn, name, vpc_id):
         return conn.create_security_group(name, "Spark EC2 group", vpc_id)
 
 
+def get_validate_spark_version(version, repo):
+    if "." in version:
+        version = version.replace("v", "")
+        if version not in VALID_SPARK_VERSIONS:
+            print >> stderr, "Don't know about Spark version: {v}".format(v=version)
+            sys.exit(1)
+        return version
+    else:
+        github_commit_url = "{repo}/commit/{commit_hash}".format(repo=repo, commit_hash=version)
+        request = urllib2.Request(github_commit_url)
+        request.get_method = lambda: 'HEAD'
+        try:
+            response = urllib2.urlopen(request)
+        except urllib2.HTTPError, e:
+            print >> stderr, "Couldn't validate Spark commit: {url}".format(url=github_commit_url)
+            print >> stderr, "Received HTTP response code of {code}.".format(code=e.code)
+            sys.exit(1)
+        return version
+
+
 # Check whether a given EC2 instance object is in a state we consider active,
 # i.e. not terminating or terminated. We count both stopping and stopped as
 # active since we can restart stopped clusters.
@@ -243,29 +279,6 @@ def is_active(instance):
     return (instance.state in ['pending', 'running', 'stopping', 'stopped'])
 
 
-# Return correct versions of Spark and Shark, given the supplied Spark version
-def get_spark_shark_version(opts):
-    spark_shark_map = {
-        "0.7.3": "0.7.1",
-        "0.8.0": "0.8.0",
-        "0.8.1": "0.8.1",
-        "0.9.0": "0.9.0",
-        "0.9.1": "0.9.1",
-        # These are dummy versions (no Shark versions after this)
-        "1.0.0": "1.0.0",
-        "1.0.1": "1.0.1",
-        "1.0.2": "1.0.2",
-        "1.1.0": "1.1.0",
-        "1.1.1": "1.1.1",
-        "1.2.0": "1.2.0",
-    }
-    version = opts.spark_version.replace("v", "")
-    if version not in spark_shark_map:
-        print >> stderr, "Don't know about Spark version: %s" % version
-        sys.exit(1)
-    return (version, spark_shark_map[version])
-
-
 # Attempt to resolve an appropriate AMI given the architecture and region of the request.
 # Source: http://aws.amazon.com/amazon-linux-ami/instance-type-matrix/
 # Last Updated: 2014-06-20
@@ -619,7 +632,7 @@ def setup_cluster(conn, master_nodes, slave_nodes, opts, deploy_ssh_key):
             print slave.public_dns_name
             ssh_write(slave.public_dns_name, opts, ['tar', 'x'], dot_ssh_tar)
 
-    modules = ['spark', 'shark', 'ephemeral-hdfs', 'persistent-hdfs',
+    modules = ['spark', 'ephemeral-hdfs', 'persistent-hdfs',
                'mapreduce', 'spark-standalone', 'tachyon']
 
     if opts.hadoop_major_version == "1":
@@ -706,9 +719,7 @@ def wait_for_cluster_state(conn, opts, cluster_instances, cluster_state):
     sys.stdout.flush()
 
     start_time = datetime.now()
-
     num_attempts = 0
-    conn = ec2.connect_to_region(opts.region)
 
     while True:
         time.sleep(5 * num_attempts)  # seconds
@@ -815,13 +826,11 @@ def deploy_files(conn, root_dir, opts, master_nodes, slave_nodes, modules):
     cluster_url = "%s:7077" % active_master
 
     if "." in opts.spark_version:
-        # Pre-built spark & shark deploy
-        (spark_v, shark_v) = get_spark_shark_version(opts)
+        # Pre-built Spark deploy
+        spark_v = get_validate_spark_version(opts.spark_version, opts.spark_git_repo)
     else:
         # Spark-only custom deploy
         spark_v = "%s|%s" % (opts.spark_git_repo, opts.spark_version)
-        shark_v = ""
-        modules = filter(lambda x: x != "shark", modules)
 
     template_vars = {
         "master_list": '\n'.join([i.public_dns_name for i in master_nodes]),
@@ -834,7 +843,6 @@ def deploy_files(conn, root_dir, opts, master_nodes, slave_nodes, modules):
         "swap": str(opts.swap),
         "modules": '\n'.join(modules),
         "spark_version": spark_v,
-        "shark_version": shark_v,
         "hadoop_major_version": opts.hadoop_major_version,
         "spark_worker_instances": "%d" % opts.worker_instances,
         "spark_master_opts": opts.master_opts
@@ -983,6 +991,8 @@ def real_main():
     (opts, action, cluster_name) = parse_args()
 
     # Input parameter validation
+    get_validate_spark_version(opts.spark_version, opts.spark_git_repo)
+
     if opts.wait is not None:
         # NOTE: DeprecationWarnings are silent in 2.7+ by default.
         # To show them, run Python with the -Wdefault switch.
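
For reviewers, a minimal sketch of how the new get_validate_spark_version
helper behaves. This is an illustration under assumptions, not part of the
patch: it presumes Python 2 with boto installed, that spark_ec2.py is
importable from the ec2/ directory, and network access to github.com; the
commit hash below is a placeholder, not a real commit.

# validate_sketch.py -- illustration only; run from ec2/ with Python 2.
from spark_ec2 import get_validate_spark_version, DEFAULT_SPARK_GITHUB_REPO

# Version strings containing "." are normalized ("v1.2.0" -> "1.2.0") and
# checked against VALID_SPARK_VERSIONS; unknown versions exit with status 1.
print get_validate_spark_version("v1.2.0", DEFAULT_SPARK_GITHUB_REPO)  # 1.2.0

# Anything without a "." is treated as a git hash: the helper sends an HTTP
# HEAD request to {repo}/commit/{hash}. A hash GitHub doesn't know (like this
# placeholder SHA) gets a 404, so the script prints an error and exits 1.
commit_hash = "0123456789abcdef0123456789abcdef01234567"  # placeholder SHA
print get_validate_spark_version(commit_hash, DEFAULT_SPARK_GITHUB_REPO)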