Commit 13e3dd98 authored by Patrick Wendell

Removing mesos support

Drops the Mesos-specific paths from the EC2 scripts: the --cluster-type and
--old-scripts options, the ZooKeeper node and security-group handling, and the
mesos deploy modules. A standalone Spark cluster is now the only supported mode.

parent d5f74aa9
 #!/bin/bash
 # These variables are automatically filled in by the mesos-ec2 script.
 export MESOS_MASTERS="{{master_list}}"
 export MESOS_SLAVES="{{slave_list}}"
-export MESOS_ZOO_LIST="{{zoo_list}}"
 export MESOS_HDFS_DATA_DIRS="{{hdfs_data_dirs}}"
 export MESOS_MAPRED_LOCAL_DIRS="{{mapred_local_dirs}}"
 export MESOS_SPARK_LOCAL_DIRS="{{spark_local_dirs}}"
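Note: the {{...}} placeholders in this template are filled in by deploy_files() in spark_ec2.py (see the template_vars dict further down this diff). A minimal sketch of that substitution, assuming a plain string replace over each template file (fill_template is a hypothetical helper, not the script's actual code):

def fill_template(text, template_vars):
  # Replace each {{key}} placeholder with its value.
  for key, value in template_vars.iteritems():
    text = text.replace("{{" + key + "}}", value)
  return text

print fill_template('export MESOS_MASTERS="{{master_list}}"',
                    {"master_list": "ec2-1-2-3-4.compute-1.amazonaws.com"})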
@@ -94,17 +94,11 @@ def parse_args():
   parser.add_option("--spot-price", metavar="PRICE", type="float",
       help="If specified, launch slaves as spot instances with the given " +
            "maximum price (in dollars)")
-  parser.add_option("--cluster-type", type="choice", metavar="TYPE",
-      choices=["mesos", "standalone"], default="standalone",
-      help="'mesos' for a Mesos cluster, 'standalone' for a standalone " +
-           "Spark cluster (default: standalone)")
   parser.add_option("--ganglia", action="store_true", default=True,
       help="Setup Ganglia monitoring on cluster (default: on). NOTE: " +
            "the Ganglia page will be publicly accessible")
   parser.add_option("--no-ganglia", action="store_false", dest="ganglia",
       help="Disable Ganglia monitoring for the cluster")
-  parser.add_option("--old-scripts", action="store_true", default=False,
-      help="Use old mesos-ec2 scripts, for Spark <= 0.6 AMIs")
   parser.add_option("-u", "--user", default="root",
       help="The SSH user you want to connect as (default: root)")
   parser.add_option("--delete-groups", action="store_true", default=False,
@@ -119,9 +113,6 @@ def parse_args():
     print >> stderr, ("ERROR: The -i or --identity-file argument is " +
                       "required for " + action)
     sys.exit(1)
-  if opts.cluster_type not in ["mesos", "standalone"] and action == "launch":
-    print >> stderr, ("ERROR: Invalid cluster type: " + opts.cluster_type)
-    sys.exit(1)
 
   # Boto config check
   # http://boto.cloudhackers.com/en/latest/boto_config_tut.html
@@ -219,52 +210,38 @@ def get_spark_ami(opts):
 # Launch a cluster of the given name, by setting up its security groups,
 # and then starting new instances in them.
-# Returns a tuple of EC2 reservation objects for the master, slave
-# and zookeeper instances (in that order).
+# Returns a tuple of EC2 reservation objects for the master and slaves
 # Fails if there are already instances running in the cluster's groups.
 def launch_cluster(conn, opts, cluster_name):
   print "Setting up security groups..."
   master_group = get_or_make_group(conn, cluster_name + "-master")
   slave_group = get_or_make_group(conn, cluster_name + "-slaves")
-  zoo_group = get_or_make_group(conn, cluster_name + "-zoo")
   if master_group.rules == []: # Group was just now created
     master_group.authorize(src_group=master_group)
     master_group.authorize(src_group=slave_group)
-    master_group.authorize(src_group=zoo_group)
     master_group.authorize('tcp', 22, 22, '0.0.0.0/0')
     master_group.authorize('tcp', 8080, 8081, '0.0.0.0/0')
     master_group.authorize('tcp', 50030, 50030, '0.0.0.0/0')
     master_group.authorize('tcp', 50070, 50070, '0.0.0.0/0')
     master_group.authorize('tcp', 60070, 60070, '0.0.0.0/0')
-    if opts.cluster_type == "mesos":
-      master_group.authorize('tcp', 38090, 38090, '0.0.0.0/0')
     if opts.ganglia:
       master_group.authorize('tcp', 5080, 5080, '0.0.0.0/0')
   if slave_group.rules == []: # Group was just now created
     slave_group.authorize(src_group=master_group)
     slave_group.authorize(src_group=slave_group)
-    slave_group.authorize(src_group=zoo_group)
     slave_group.authorize('tcp', 22, 22, '0.0.0.0/0')
     slave_group.authorize('tcp', 8080, 8081, '0.0.0.0/0')
     slave_group.authorize('tcp', 50060, 50060, '0.0.0.0/0')
     slave_group.authorize('tcp', 50075, 50075, '0.0.0.0/0')
     slave_group.authorize('tcp', 60060, 60060, '0.0.0.0/0')
     slave_group.authorize('tcp', 60075, 60075, '0.0.0.0/0')
-  if zoo_group.rules == []: # Group was just now created
-    zoo_group.authorize(src_group=master_group)
-    zoo_group.authorize(src_group=slave_group)
-    zoo_group.authorize(src_group=zoo_group)
-    zoo_group.authorize('tcp', 22, 22, '0.0.0.0/0')
-    zoo_group.authorize('tcp', 2181, 2181, '0.0.0.0/0')
-    zoo_group.authorize('tcp', 2888, 2888, '0.0.0.0/0')
-    zoo_group.authorize('tcp', 3888, 3888, '0.0.0.0/0')
 
   # Check if instances are already running in our groups
   active_nodes = get_existing_cluster(conn, opts, cluster_name,
                                       die_on_error=False)
   if any(active_nodes):
     print >> stderr, ("ERROR: There are already instances running in " +
-                      "group %s, %s or %s" % (master_group.name, slave_group.name, zoo_group.name))
+                      "group %s or %s" % (master_group.name, slave_group.name))
     sys.exit(1)
 
   # Figure out Spark AMI
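Note: get_or_make_group() is used throughout this hunk but lies outside the diff. For context, a plausible boto 2 implementation looks roughly like this (a sketch of the usual shape, not necessarily the script's exact code):

def get_or_make_group(conn, name):
  # Reuse an existing EC2 security group with this name, or create one.
  groups = [g for g in conn.get_all_security_groups() if g.name == name]
  if len(groups) > 0:
    return groups[0]
  print "Creating security group " + name
  return conn.create_security_group(name, "Spark EC2 group")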
@@ -336,9 +313,9 @@ def launch_cluster(conn, opts, cluster_name):
       print "Canceling spot instance requests"
       conn.cancel_spot_instance_requests(my_req_ids)
       # Log a warning if any of these requests actually launched instances:
-      (master_nodes, slave_nodes, zoo_nodes) = get_existing_cluster(
+      (master_nodes, slave_nodes) = get_existing_cluster(
           conn, opts, cluster_name, die_on_error=False)
-      running = len(master_nodes) + len(slave_nodes) + len(zoo_nodes)
+      running = len(master_nodes) + len(slave_nodes)
       if running:
         print >> stderr, ("WARNING: %d instances are still running" % running)
       sys.exit(0)
@@ -379,21 +356,17 @@ def launch_cluster(conn, opts, cluster_name):
     master_nodes = master_res.instances
     print "Launched master in %s, regid = %s" % (zone, master_res.id)
 
-  zoo_nodes = []
-
   # Return all the instances
-  return (master_nodes, slave_nodes, zoo_nodes)
+  return (master_nodes, slave_nodes)
 
 
 # Get the EC2 instances in an existing cluster if available.
-# Returns a tuple of lists of EC2 instance objects for the masters,
-# slaves and zookeeper nodes (in that order).
+# Returns a tuple of lists of EC2 instance objects for the masters and slaves
 def get_existing_cluster(conn, opts, cluster_name, die_on_error=True):
   print "Searching for existing cluster " + cluster_name + "..."
   reservations = conn.get_all_instances()
   master_nodes = []
   slave_nodes = []
-  zoo_nodes = []
   for res in reservations:
     active = [i for i in res.instances if is_active(i)]
     if len(active) > 0:
@@ -402,13 +375,11 @@ def get_existing_cluster(conn, opts, cluster_name, die_on_error=True):
       master_nodes += res.instances
     elif group_names == [cluster_name + "-slaves"]:
       slave_nodes += res.instances
-    elif group_names == [cluster_name + "-zoo"]:
-      zoo_nodes += res.instances
-  if any((master_nodes, slave_nodes, zoo_nodes)):
-    print ("Found %d master(s), %d slaves, %d ZooKeeper nodes" %
-           (len(master_nodes), len(slave_nodes), len(zoo_nodes)))
+  if any((master_nodes, slave_nodes)):
+    print ("Found %d master(s), %d slaves" %
+           (len(master_nodes), len(slave_nodes)))
   if (master_nodes != [] and slave_nodes != []) or not die_on_error:
-    return (master_nodes, slave_nodes, zoo_nodes)
+    return (master_nodes, slave_nodes)
   else:
     if master_nodes == [] and slave_nodes != []:
       print "ERROR: Could not find master in group " + cluster_name + "-master"
@@ -421,7 +392,7 @@ def get_existing_cluster(conn, opts, cluster_name, die_on_error=True):
 
 # Deploy configuration files and run setup scripts on a newly launched
 # or started EC2 cluster.
-def setup_cluster(conn, master_nodes, slave_nodes, zoo_nodes, opts, deploy_ssh_key):
+def setup_cluster(conn, master_nodes, slave_nodes, opts, deploy_ssh_key):
   master = master_nodes[0].public_dns_name
   if deploy_ssh_key:
     print "Copying SSH key %s to master..." % opts.identity_file
@@ -429,12 +400,8 @@ def setup_cluster(conn, master_nodes, slave_nodes, zoo_nodes, opts, deploy_ssh_key):
     scp(master, opts, opts.identity_file, '~/.ssh/id_rsa')
     ssh(master, opts, 'chmod 600 ~/.ssh/id_rsa')
 
-  if opts.cluster_type == "mesos":
-    modules = ['spark', 'shark', 'ephemeral-hdfs', 'persistent-hdfs',
-               'mapreduce', 'mesos']
-  elif opts.cluster_type == "standalone":
-    modules = ['spark', 'shark', 'ephemeral-hdfs', 'persistent-hdfs',
-               'mapreduce', 'spark-standalone']
+  modules = ['spark', 'shark', 'ephemeral-hdfs', 'persistent-hdfs',
+             'mapreduce', 'spark-standalone']
 
   if opts.ganglia:
     modules.append('ganglia')
@@ -446,18 +413,12 @@ def setup_cluster(conn, master_nodes, slave_nodes, zoo_nodes, opts, deploy_ssh_key):
   ssh(master, opts, "rm -rf spark-ec2 && git clone https://github.com/pwendell/spark-ec2.git -b ec2-updates")
 
   print "Deploying files to master..."
-  deploy_files(conn, "deploy.generic", opts, master_nodes, slave_nodes,
-               zoo_nodes, modules)
+  deploy_files(conn, "deploy.generic", opts, master_nodes, slave_nodes, modules)
 
   print "Running setup on master..."
   setup_spark_cluster(master, opts)
   print "Done!"
 
-def setup_mesos_cluster(master, opts):
-  ssh(master, opts, "chmod u+x mesos-ec2/setup")
-  ssh(master, opts, "mesos-ec2/setup %s %s %s %s" %
-      ("generic", "none", "master", opts.swap))
-
 def setup_standalone_cluster(master, slave_nodes, opts):
   slave_ips = '\n'.join([i.public_dns_name for i in slave_nodes])
   ssh(master, opts, "echo \"%s\" > spark/conf/slaves" % (slave_ips))
@@ -466,23 +427,18 @@ def setup_standalone_cluster(master, slave_nodes, opts):
 
 def setup_spark_cluster(master, opts):
   ssh(master, opts, "chmod u+x spark-ec2/setup.sh")
   ssh(master, opts, "spark-ec2/setup.sh")
-  if opts.cluster_type == "mesos":
-    print "Mesos cluster started at http://%s:8080" % master
-  elif opts.cluster_type == "standalone":
-    print "Spark standalone cluster started at http://%s:8080" % master
+  print "Spark standalone cluster started at http://%s:8080" % master
   if opts.ganglia:
     print "Ganglia started at http://%s:5080/ganglia" % master
 
 
 # Wait for a whole cluster (masters, slaves and ZooKeeper) to start up
-def wait_for_cluster(conn, wait_secs, master_nodes, slave_nodes, zoo_nodes):
+def wait_for_cluster(conn, wait_secs, master_nodes, slave_nodes):
   print "Waiting for instances to start up..."
   time.sleep(5)
   wait_for_instances(conn, master_nodes)
   wait_for_instances(conn, slave_nodes)
-  if zoo_nodes != []:
-    wait_for_instances(conn, zoo_nodes)
   print "Waiting %d more seconds..." % wait_secs
   time.sleep(wait_secs)
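Note: wait_for_cluster() now polls only the master and slave lists. wait_for_instances() sits outside the diff; with boto 2 it is typically a loop that refreshes each instance until none is still pending (a sketch under that assumption, not the script's exact code):

import time

def wait_for_instances(conn, instances):
  # Poll EC2 until every instance has left the 'pending' state.
  while True:
    for i in instances:
      i.update()  # refresh the cached state from EC2
    if all(i.state != 'pending' for i in instances):
      return
    time.sleep(5)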
@@ -523,8 +479,7 @@ def get_num_disks(instance_type):
 # cluster (e.g. lists of masters and slaves). Files are only deployed to
 # the first master instance in the cluster, and we expect the setup
 # script to be run on that instance to copy them to other nodes.
-def deploy_files(conn, root_dir, opts, master_nodes, slave_nodes, zoo_nodes,
-                 modules):
+def deploy_files(conn, root_dir, opts, master_nodes, slave_nodes, modules):
   active_master = master_nodes[0].public_dns_name
 
   num_disks = get_num_disks(opts.instance_type)
@@ -537,16 +492,7 @@ def deploy_files(conn, root_dir, opts, master_nodes, slave_nodes, zoo_nodes,
       mapred_local_dirs += ",/mnt%d/hadoop/mrlocal" % i
       spark_local_dirs += ",/mnt%d/spark" % i
 
-  if zoo_nodes != []:
-    zoo_list = '\n'.join([i.public_dns_name for i in zoo_nodes])
-    cluster_url = "zoo://" + ",".join(
-        ["%s:2181/mesos" % i.public_dns_name for i in zoo_nodes])
-  elif opts.cluster_type == "mesos":
-    zoo_list = "NONE"
-    cluster_url = "%s:5050" % active_master
-  elif opts.cluster_type == "standalone":
-    zoo_list = "NONE"
-    cluster_url = "%s:7077" % active_master
+  cluster_url = "%s:7077" % active_master
 
   if "." in opts.spark_version:
     (spark_v, shark_v) = get_spark_shark_version(opts)
@@ -558,7 +504,6 @@ def deploy_files(conn, root_dir, opts, master_nodes, slave_nodes, zoo_nodes,
     "master_list": '\n'.join([i.public_dns_name for i in master_nodes]),
     "active_master": active_master,
     "slave_list": '\n'.join([i.public_dns_name for i in slave_nodes]),
-    "zoo_list": zoo_list,
     "cluster_url": cluster_url,
     "hdfs_data_dirs": hdfs_data_dirs,
     "mapred_local_dirs": mapred_local_dirs,
@@ -656,20 +601,20 @@ def main():
   if action == "launch":
     if opts.resume:
-      (master_nodes, slave_nodes, zoo_nodes) = get_existing_cluster(
+      (master_nodes, slave_nodes) = get_existing_cluster(
           conn, opts, cluster_name)
     else:
-      (master_nodes, slave_nodes, zoo_nodes) = launch_cluster(
+      (master_nodes, slave_nodes) = launch_cluster(
          conn, opts, cluster_name)
-    wait_for_cluster(conn, opts.wait, master_nodes, slave_nodes, zoo_nodes)
-    setup_cluster(conn, master_nodes, slave_nodes, zoo_nodes, opts, True)
+    wait_for_cluster(conn, opts.wait, master_nodes, slave_nodes)
+    setup_cluster(conn, master_nodes, slave_nodes, opts, True)
 
   elif action == "destroy":
     response = raw_input("Are you sure you want to destroy the cluster " +
         cluster_name + "?\nALL DATA ON ALL NODES WILL BE LOST!!\n" +
         "Destroy cluster " + cluster_name + " (y/N): ")
     if response == "y":
-      (master_nodes, slave_nodes, zoo_nodes) = get_existing_cluster(
+      (master_nodes, slave_nodes) = get_existing_cluster(
          conn, opts, cluster_name, die_on_error=False)
       print "Terminating master..."
       for inst in master_nodes:
@@ -677,15 +622,11 @@ def main():
       print "Terminating slaves..."
       for inst in slave_nodes:
         inst.terminate()
-      if zoo_nodes != []:
-        print "Terminating zoo..."
-        for inst in zoo_nodes:
-          inst.terminate()
 
       # Delete security groups as well
       if opts.delete_groups:
         print "Deleting security groups (this will take some time)..."
-        group_names = [cluster_name + "-master", cluster_name + "-slaves", cluster_name + "-zoo"]
+        group_names = [cluster_name + "-master", cluster_name + "-slaves"]
         attempt = 1;
         while attempt <= 3:
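Note: EC2 security groups cannot be deleted while instances in them are still shutting down, which is why the destroy path retries up to three times. The loop body is cut off by the diff context; the pattern it implements is roughly this (a sketch, not the script's exact code):

import time

def delete_groups_with_retry(conn, group_names, attempts=3):
  # Deletion fails until all member instances are gone, so retry with a pause.
  for attempt in range(1, attempts + 1):
    try:
      for name in group_names:
        conn.delete_security_group(name)
      return True
    except Exception, e:
      print "Attempt %d failed: %s" % (attempt, e)
      time.sleep(30)
  return False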
@@ -725,7 +666,7 @@ def main():
         print "Try re-running in a few minutes."
 
   elif action == "login":
-    (master_nodes, slave_nodes, zoo_nodes) = get_existing_cluster(
+    (master_nodes, slave_nodes) = get_existing_cluster(
         conn, opts, cluster_name)
     master = master_nodes[0].public_dns_name
     print "Logging into master " + master + "..."
@@ -736,7 +677,7 @@ def main():
         (opts.identity_file, proxy_opt, opts.user, master), shell=True)
 
   elif action == "get-master":
-    (master_nodes, slave_nodes, zoo_nodes) = get_existing_cluster(conn, opts, cluster_name)
+    (master_nodes, slave_nodes) = get_existing_cluster(conn, opts, cluster_name)
     print master_nodes[0].public_dns_name
 
   elif action == "stop":
@@ -746,7 +687,7 @@ def main():
         "AMAZON EBS IF IT IS EBS-BACKED!!\n" +
         "Stop cluster " + cluster_name + " (y/N): ")
     if response == "y":
-      (master_nodes, slave_nodes, zoo_nodes) = get_existing_cluster(
+      (master_nodes, slave_nodes) = get_existing_cluster(
          conn, opts, cluster_name, die_on_error=False)
       print "Stopping master..."
       for inst in master_nodes:
@@ -756,15 +697,9 @@ def main():
       for inst in slave_nodes:
         if inst.state not in ["shutting-down", "terminated"]:
           inst.stop()
-      if zoo_nodes != []:
-        print "Stopping zoo..."
-        for inst in zoo_nodes:
-          if inst.state not in ["shutting-down", "terminated"]:
-            inst.stop()
 
   elif action == "start":
-    (master_nodes, slave_nodes, zoo_nodes) = get_existing_cluster(
-        conn, opts, cluster_name)
+    (master_nodes, slave_nodes) = get_existing_cluster(conn, opts, cluster_name)
     print "Starting slaves..."
     for inst in slave_nodes:
       if inst.state not in ["shutting-down", "terminated"]:
@@ -773,13 +708,8 @@ def main():
     print "Starting master..."
     for inst in master_nodes:
       if inst.state not in ["shutting-down", "terminated"]:
         inst.start()
-    if zoo_nodes != []:
-      print "Starting zoo..."
-      for inst in zoo_nodes:
-        if inst.state not in ["shutting-down", "terminated"]:
-          inst.start()
-    wait_for_cluster(conn, opts.wait, master_nodes, slave_nodes, zoo_nodes)
-    setup_cluster(conn, master_nodes, slave_nodes, zoo_nodes, opts, False)
+    wait_for_cluster(conn, opts.wait, master_nodes, slave_nodes)
+    setup_cluster(conn, master_nodes, slave_nodes, opts, False)
 
   else:
     print >> stderr, "Invalid action: %s" % action