Skip to content
Snippets Groups Projects
Commit 207548b6 authored by Matei Zaharia
Browse files

Open up Job UI ports (33000-33010) on EC2 clusters

parent c99b6744
No related branches found
No related tags found
No related merge requests found
...@@ -9,9 +9,9 @@ ...@@ -9,9 +9,9 @@
# to you under the Apache License, Version 2.0 (the # to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance # "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at # with the License. You may obtain a copy of the License at
# #
# http://www.apache.org/licenses/LICENSE-2.0 # http://www.apache.org/licenses/LICENSE-2.0
# #
# Unless required by applicable law or agreed to in writing, software # Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, # distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
...@@ -53,7 +53,7 @@ def parse_args(): ...@@ -53,7 +53,7 @@ def parse_args():
help="Seconds to wait for nodes to start (default: 120)") help="Seconds to wait for nodes to start (default: 120)")
parser.add_option("-k", "--key-pair", parser.add_option("-k", "--key-pair",
help="Key pair to use on instances") help="Key pair to use on instances")
parser.add_option("-i", "--identity-file", parser.add_option("-i", "--identity-file",
help="SSH private key file to use for logging into instances") help="SSH private key file to use for logging into instances")
parser.add_option("-t", "--instance-type", default="m1.large", parser.add_option("-t", "--instance-type", default="m1.large",
help="Type of instance to launch (default: m1.large). " + help="Type of instance to launch (default: m1.large). " +
...@@ -69,7 +69,7 @@ def parse_args(): ...@@ -69,7 +69,7 @@ def parse_args():
parser.add_option("-a", "--ami", default="latest", parser.add_option("-a", "--ami", default="latest",
help="Amazon Machine Image ID to use, or 'latest' to use latest " + help="Amazon Machine Image ID to use, or 'latest' to use latest " +
"available AMI (default: latest)") "available AMI (default: latest)")
parser.add_option("-D", metavar="[ADDRESS:]PORT", dest="proxy_port", parser.add_option("-D", metavar="[ADDRESS:]PORT", dest="proxy_port",
help="Use SSH dynamic port forwarding to create a SOCKS proxy at " + help="Use SSH dynamic port forwarding to create a SOCKS proxy at " +
"the given local address (for use with login)") "the given local address (for use with login)")
parser.add_option("--resume", action="store_true", default=False, parser.add_option("--resume", action="store_true", default=False,
...@@ -99,7 +99,7 @@ def parse_args(): ...@@ -99,7 +99,7 @@ def parse_args():
help="The SSH user you want to connect as (default: root)") help="The SSH user you want to connect as (default: root)")
parser.add_option("--delete-groups", action="store_true", default=False, parser.add_option("--delete-groups", action="store_true", default=False,
help="When destroying a cluster, delete the security groups that were created") help="When destroying a cluster, delete the security groups that were created")
(opts, args) = parser.parse_args() (opts, args) = parser.parse_args()
if len(args) != 2: if len(args) != 2:
parser.print_help() parser.print_help()
...@@ -112,7 +112,7 @@ def parse_args(): ...@@ -112,7 +112,7 @@ def parse_args():
if opts.cluster_type not in ["mesos", "standalone"] and action == "launch": if opts.cluster_type not in ["mesos", "standalone"] and action == "launch":
print >> stderr, ("ERROR: Invalid cluster type: " + opts.cluster_type) print >> stderr, ("ERROR: Invalid cluster type: " + opts.cluster_type)
sys.exit(1) sys.exit(1)
# Boto config check # Boto config check
# http://boto.cloudhackers.com/en/latest/boto_config_tut.html # http://boto.cloudhackers.com/en/latest/boto_config_tut.html
home_dir = os.getenv('HOME') home_dir = os.getenv('HOME')
...@@ -178,6 +178,7 @@ def launch_cluster(conn, opts, cluster_name): ...@@ -178,6 +178,7 @@ def launch_cluster(conn, opts, cluster_name):
master_group.authorize('tcp', 50030, 50030, '0.0.0.0/0') master_group.authorize('tcp', 50030, 50030, '0.0.0.0/0')
master_group.authorize('tcp', 50070, 50070, '0.0.0.0/0') master_group.authorize('tcp', 50070, 50070, '0.0.0.0/0')
master_group.authorize('tcp', 60070, 60070, '0.0.0.0/0') master_group.authorize('tcp', 60070, 60070, '0.0.0.0/0')
master_group.authorize('tcp', 33000, 33010, '0.0.0.0/0')
if opts.cluster_type == "mesos": if opts.cluster_type == "mesos":
master_group.authorize('tcp', 38090, 38090, '0.0.0.0/0') master_group.authorize('tcp', 38090, 38090, '0.0.0.0/0')
if opts.ganglia: if opts.ganglia:
...@@ -257,7 +258,7 @@ def launch_cluster(conn, opts, cluster_name): ...@@ -257,7 +258,7 @@ def launch_cluster(conn, opts, cluster_name):
block_device_map = block_map) block_device_map = block_map)
my_req_ids += [req.id for req in slave_reqs] my_req_ids += [req.id for req in slave_reqs]
i += 1 i += 1
print "Waiting for spot instances to be granted..." print "Waiting for spot instances to be granted..."
try: try:
while True: while True:
...@@ -413,7 +414,7 @@ def setup_standalone_cluster(master, slave_nodes, opts): ...@@ -413,7 +414,7 @@ def setup_standalone_cluster(master, slave_nodes, opts):
slave_ips = '\n'.join([i.public_dns_name for i in slave_nodes]) slave_ips = '\n'.join([i.public_dns_name for i in slave_nodes])
ssh(master, opts, "echo \"%s\" > spark/conf/slaves" % (slave_ips)) ssh(master, opts, "echo \"%s\" > spark/conf/slaves" % (slave_ips))
ssh(master, opts, "/root/spark/bin/start-all.sh") ssh(master, opts, "/root/spark/bin/start-all.sh")
def setup_spark_cluster(master, opts): def setup_spark_cluster(master, opts):
ssh(master, opts, "chmod u+x spark-ec2/setup.sh") ssh(master, opts, "chmod u+x spark-ec2/setup.sh")
ssh(master, opts, "spark-ec2/setup.sh") ssh(master, opts, "spark-ec2/setup.sh")
...@@ -528,7 +529,7 @@ def deploy_files(conn, root_dir, opts, master_nodes, slave_nodes, zoo_nodes, ...@@ -528,7 +529,7 @@ def deploy_files(conn, root_dir, opts, master_nodes, slave_nodes, zoo_nodes,
dest.write(text) dest.write(text)
dest.close() dest.close()
# rsync the whole directory over to the master machine # rsync the whole directory over to the master machine
command = (("rsync -rv -e 'ssh -o StrictHostKeyChecking=no -i %s' " + command = (("rsync -rv -e 'ssh -o StrictHostKeyChecking=no -i %s' " +
"'%s/' '%s@%s:/'") % (opts.identity_file, tmp_dir, opts.user, active_master)) "'%s/' '%s@%s:/'") % (opts.identity_file, tmp_dir, opts.user, active_master))
subprocess.check_call(command, shell=True) subprocess.check_call(command, shell=True)
# Remove the temp directory we created above # Remove the temp directory we created above
...@@ -557,9 +558,9 @@ def ssh(host, opts, command): ...@@ -557,9 +558,9 @@ def ssh(host, opts, command):
print "Error connecting to host {0}, sleeping 30".format(e) print "Error connecting to host {0}, sleeping 30".format(e)
time.sleep(30) time.sleep(30)
tries = tries + 1 tries = tries + 1
# Gets a list of zones to launch instances in # Gets a list of zones to launch instances in
...@@ -618,12 +619,12 @@ def main(): ...@@ -618,12 +619,12 @@ def main():
print "Terminating zoo..." print "Terminating zoo..."
for inst in zoo_nodes: for inst in zoo_nodes:
inst.terminate() inst.terminate()
# Delete security groups as well # Delete security groups as well
if opts.delete_groups: if opts.delete_groups:
print "Deleting security groups (this will take some time)..." print "Deleting security groups (this will take some time)..."
group_names = [cluster_name + "-master", cluster_name + "-slaves", cluster_name + "-zoo"] group_names = [cluster_name + "-master", cluster_name + "-slaves", cluster_name + "-zoo"]
attempt = 1; attempt = 1;
while attempt <= 3: while attempt <= 3:
print "Attempt %d" % attempt print "Attempt %d" % attempt
...@@ -639,7 +640,7 @@ def main(): ...@@ -639,7 +640,7 @@ def main():
from_port=rule.from_port, from_port=rule.from_port,
to_port=rule.to_port, to_port=rule.to_port,
src_group=grant) src_group=grant)
# Sleep for AWS eventual-consistency to catch up, and for instances # Sleep for AWS eventual-consistency to catch up, and for instances
# to terminate # to terminate
time.sleep(30) # Yes, it does have to be this long :-( time.sleep(30) # Yes, it does have to be this long :-(
...@@ -650,13 +651,13 @@ def main(): ...@@ -650,13 +651,13 @@ def main():
except boto.exception.EC2ResponseError: except boto.exception.EC2ResponseError:
success = False; success = False;
print "Failed to delete security group " + group.name print "Failed to delete security group " + group.name
# Unfortunately, group.revoke() returns True even if a rule was not # Unfortunately, group.revoke() returns True even if a rule was not
# deleted, so this needs to be rerun if something fails # deleted, so this needs to be rerun if something fails
if success: break; if success: break;
attempt += 1 attempt += 1
if not success: if not success:
print "Failed to delete all security groups after 3 tries." print "Failed to delete all security groups after 3 tries."
print "Try re-running in a few minutes." print "Try re-running in a few minutes."
...@@ -679,7 +680,7 @@ def main(): ...@@ -679,7 +680,7 @@ def main():
elif action == "stop": elif action == "stop":
response = raw_input("Are you sure you want to stop the cluster " + response = raw_input("Are you sure you want to stop the cluster " +
cluster_name + "?\nDATA ON EPHEMERAL DISKS WILL BE LOST, " + cluster_name + "?\nDATA ON EPHEMERAL DISKS WILL BE LOST, " +
"BUT THE CLUSTER WILL KEEP USING SPACE ON\n" + "BUT THE CLUSTER WILL KEEP USING SPACE ON\n" +
"AMAZON EBS IF IT IS EBS-BACKED!!\n" + "AMAZON EBS IF IT IS EBS-BACKED!!\n" +
"Stop cluster " + cluster_name + " (y/N): ") "Stop cluster " + cluster_name + " (y/N): ")
if response == "y": if response == "y":
......
0% Loading. Try again or attach a new file.
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment