Skip to content
Snippets Groups Projects
Commit 2a907dce authored by Matei Zaharia's avatar Matei Zaharia
Browse files

Merge pull request #421 from shivaram/spark-ec2-change

Switch spark_ec2.py to use the new spark-ec2 scripts. 
parents 340cc54e bf675ab4
No related branches found
No related tags found
No related merge requests found
#!/bin/bash
# These variables are automatically filled in by the mesos-ec2 script.
export MESOS_MASTERS="{{master_list}}"
export MESOS_SLAVES="{{slave_list}}"
export MESOS_ZOO_LIST="{{zoo_list}}"
export MESOS_HDFS_DATA_DIRS="{{hdfs_data_dirs}}"
export MESOS_MAPRED_LOCAL_DIRS="{{mapred_local_dirs}}"
export MESOS_SPARK_LOCAL_DIRS="{{spark_local_dirs}}"
export MODULES="{{modules}}"
export SWAP_MB="{{swap}}"
......@@ -84,6 +84,12 @@ def parse_args():
"maximum price (in dollars)")
parser.add_option("-c", "--cluster-type", default="mesos",
help="'mesos' for a mesos cluster, 'standalone' for a standalone spark cluster (default: mesos)")
parser.add_option("-g", "--ganglia", action="store_true", default=True,
help="Setup ganglia monitoring for the cluster. NOTE: The ganglia " +
"monitoring page will be publicly accessible")
parser.add_option("--mesos-scripts", action="store_true", default=False,
help="Use older mesos-ec2 scripts to setup the cluster. NOTE: Ganglia " +
"will not be setup with this option")
parser.add_option("-u", "--user", default="root",
help="The ssh user you want to connect as (default: root)")
parser.add_option("--delete-groups", action="store_true", default=False,
......@@ -164,22 +170,23 @@ def launch_cluster(conn, opts, cluster_name):
master_group.authorize(src_group=zoo_group)
master_group.authorize('tcp', 22, 22, '0.0.0.0/0')
master_group.authorize('tcp', 8080, 8081, '0.0.0.0/0')
master_group.authorize('tcp', 50030, 50030, '0.0.0.0/0')
master_group.authorize('tcp', 50070, 50070, '0.0.0.0/0')
master_group.authorize('tcp', 60070, 60070, '0.0.0.0/0')
if opts.cluster_type == "mesos":
master_group.authorize('tcp', 50030, 50030, '0.0.0.0/0')
master_group.authorize('tcp', 50070, 50070, '0.0.0.0/0')
master_group.authorize('tcp', 60070, 60070, '0.0.0.0/0')
master_group.authorize('tcp', 38090, 38090, '0.0.0.0/0')
if opts.ganglia:
master_group.authorize('tcp', 80, 80, '0.0.0.0/0')
if slave_group.rules == []: # Group was just now created
slave_group.authorize(src_group=master_group)
slave_group.authorize(src_group=slave_group)
slave_group.authorize(src_group=zoo_group)
slave_group.authorize('tcp', 22, 22, '0.0.0.0/0')
slave_group.authorize('tcp', 8080, 8081, '0.0.0.0/0')
if opts.cluster_type == "mesos":
slave_group.authorize('tcp', 50060, 50060, '0.0.0.0/0')
slave_group.authorize('tcp', 50075, 50075, '0.0.0.0/0')
slave_group.authorize('tcp', 60060, 60060, '0.0.0.0/0')
slave_group.authorize('tcp', 60075, 60075, '0.0.0.0/0')
slave_group.authorize('tcp', 50060, 50060, '0.0.0.0/0')
slave_group.authorize('tcp', 50075, 50075, '0.0.0.0/0')
slave_group.authorize('tcp', 60060, 60060, '0.0.0.0/0')
slave_group.authorize('tcp', 60075, 60075, '0.0.0.0/0')
if zoo_group.rules == []: # Group was just now created
zoo_group.authorize(src_group=master_group)
zoo_group.authorize(src_group=slave_group)
......@@ -358,19 +365,38 @@ def get_existing_cluster(conn, opts, cluster_name, die_on_error=True):
# Deploy configuration files and run setup scripts on a newly launched
# or started EC2 cluster.
def setup_cluster(conn, master_nodes, slave_nodes, zoo_nodes, opts, deploy_ssh_key):
print "Deploying files to master..."
deploy_files(conn, "deploy.generic", opts, master_nodes, slave_nodes, zoo_nodes)
master = master_nodes[0].public_dns_name
if deploy_ssh_key:
print "Copying SSH key %s to master..." % opts.identity_file
ssh(master, opts, 'mkdir -p ~/.ssh')
scp(master, opts, opts.identity_file, '~/.ssh/id_rsa')
ssh(master, opts, 'chmod 600 ~/.ssh/id_rsa')
print "Running setup on master..."
if opts.cluster_type == "mesos":
setup_mesos_cluster(master, opts)
modules = ['ephemeral-hdfs', 'persistent-hdfs', 'mesos']
elif opts.cluster_type == "standalone":
setup_standalone_cluster(master, slave_nodes, opts)
modules = ['ephemeral-hdfs', 'persistent-hdfs', 'spark-standalone']
if opts.ganglia:
modules.append('ganglia')
if not opts.mesos_scripts:
# NOTE: We should clone the repository before running deploy_files to
# prevent ec2-variables.sh from being overwritten
ssh(master, opts, "rm -rf spark-ec2 && git clone https://github.com/shivaram/spark-ec2.git")
print "Deploying files to master..."
deploy_files(conn, "deploy.generic", opts, master_nodes, slave_nodes,
zoo_nodes, modules)
print "Running setup on master..."
if opts.mesos_scripts:
if opts.cluster_type == "mesos":
setup_mesos_cluster(master, opts)
elif opts.cluster_type == "standalone":
setup_standalone_cluster(master, slave_nodes, opts)
else:
setup_spark_cluster(master, opts)
print "Done!"
def setup_mesos_cluster(master, opts):
......@@ -383,6 +409,10 @@ def setup_standalone_cluster(master, slave_nodes, opts):
ssh(master, opts, "echo \"%s\" > spark/conf/slaves" % (slave_ips))
ssh(master, opts, "/root/spark/bin/start-all.sh")
def setup_spark_cluster(master, opts):
ssh(master, opts, "chmod u+x spark-ec2/setup.sh")
ssh(master, opts, "spark-ec2/setup.sh")
# Wait for a whole cluster (masters, slaves and ZooKeeper) to start up
def wait_for_cluster(conn, wait_secs, master_nodes, slave_nodes, zoo_nodes):
......@@ -427,7 +457,8 @@ def get_num_disks(instance_type):
# cluster (e.g. lists of masters and slaves). Files are only deployed to
# the first master instance in the cluster, and we expect the setup
# script to be run on that instance to copy them to other nodes.
def deploy_files(conn, root_dir, opts, master_nodes, slave_nodes, zoo_nodes):
def deploy_files(conn, root_dir, opts, master_nodes, slave_nodes, zoo_nodes,
modules):
active_master = master_nodes[0].public_dns_name
num_disks = get_num_disks(opts.instance_type)
......@@ -459,7 +490,9 @@ def deploy_files(conn, root_dir, opts, master_nodes, slave_nodes, zoo_nodes):
"cluster_url": cluster_url,
"hdfs_data_dirs": hdfs_data_dirs,
"mapred_local_dirs": mapred_local_dirs,
"spark_local_dirs": spark_local_dirs
"spark_local_dirs": spark_local_dirs,
"swap": str(opts.swap),
"modules": '\n'.join(modules)
}
# Create a temp directory in which we will place all the files to be
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment